const std = @import("std");

const Worker = @This();

const Connection = @import("Connection.zig");
const FileDescriptor = @import("FileDescriptor.zig").FileDescriptor;
const http = @import("http.zig");
const Request = @import("Request.zig");
const RequestHandler = @import("RequestHandler.zig");
const Response = @import("Response.zig");
const Server = @import("Server.zig");

const log = std.log.scoped(.Worker);

/// Integer unique for this worker. Has no functional meaning. Can be used for
/// debugging and profiling.
worker_id: usize,
/// Base pointer of the read buffer. Reads are issued into the window
/// `[read_tail, read_head + read_buffer_size)`, which can extend past
/// `read_buffer_size`.
/// NOTE(review): presumably a mirrored (double-mapped) ring buffer whose
/// mapping covers twice `read_buffer_size`; confirm at the allocation site.
read_buffer_ptr: [*]u8,
/// Logical capacity of the read buffer in bytes.
read_buffer_size: usize,
/// Offset of the first byte belonging to the request currently being parsed.
read_head: usize,
/// Offset one past the last byte read from the connection.
read_tail: usize,
/// Caller-provided scratch space handed to `Response.init` for headers.
header_write_buffer: []u8,
/// Caller-provided scratch space handed to `Response.init` for the body.
body_write_buffer: []u8,
/// Header lookup table, cleared (capacity retained) at the start of each request.
header_hash_map: Request.HeaderHashMap,
/// Storage for the header value nodes linked into `header_hash_map` entries.
header_value_buffer: []Request.HeaderValue,

/// Construction parameters for a `Worker`. The read and write buffers are
/// supplied by the caller and are not freed by `deinit`.
pub const Options = struct {
    worker_id: usize,
    /// Capacity reserved in the header hash map and the header value buffer.
    max_header_fields: u32,
    read_buffer_ptr: [*]u8,
    read_buffer_size: usize,
    header_write_buffer: []u8,
    body_write_buffer: []u8,
};

/// Creates a worker, allocating the header hash map capacity and the header
/// value buffer from `allocator`. Release them with `deinit` using the same
/// allocator. Returns any allocation error.
pub fn init(allocator: std.mem.Allocator, options: Options) !Worker {
    var header_hash_map: Request.HeaderHashMap = .empty;
    // Registered before the first fallible call so every error path cleans up.
    errdefer header_hash_map.deinit(allocator);
    try header_hash_map.ensureTotalCapacity(allocator, options.max_header_fields);

    const header_value_buffer = try allocator.alloc(Request.HeaderValue, options.max_header_fields);
    errdefer allocator.free(header_value_buffer);

    return .{
        .worker_id = options.worker_id,
        .read_buffer_ptr = options.read_buffer_ptr,
        .read_buffer_size = options.read_buffer_size,
        .read_head = 0,
        .read_tail = 0,
        .header_write_buffer = options.header_write_buffer,
        .body_write_buffer = options.body_write_buffer,
        .header_hash_map = header_hash_map,
        .header_value_buffer = header_value_buffer,
    };
}

/// Frees the memory allocated by `init` and invalidates the worker.
/// `allocator` must be the allocator that was passed to `init`.
pub fn deinit(self: *Worker, allocator: std.mem.Allocator) void {
    log.debug("[#{d}] Deinitializing Worker.", .{self.worker_id});
    allocator.free(self.header_value_buffer);
    self.header_hash_map.deinit(allocator);
    self.* = undefined;
}

/// Worker main loop. While `running` is `true`, pops connections from
/// `server.connection_queue` and handles them; when the queue is empty, waits
/// on `server.cond_connection_queued`.
///
/// `server.mutex` is held while the queue and the pool are accessed and is
/// released for the duration of connection handling. Once handling finishes,
/// the connection node is appended to `server.connection_pool` and
/// `server.cond_connection_freed` is signaled.
pub fn worker(
    self: *Worker,
    server: *Server,
    running: *const std.atomic.Value(bool),
) void {
    log.debug("[#{d}] Acquiring mutex.", .{self.worker_id});
    server.mutex.lock();
    log.debug("[#{d}] Acquired mutex.", .{self.worker_id});
    defer {
        log.debug("[#{d}] Unlocking mutex.", .{self.worker_id});
        server.mutex.unlock();
    }

    while (running.load(.acquire)) {
        if (server.connection_queue.pop()) |node| {
            const connection: *Connection = @fieldParentPtr("node", node);
            log.debug("[#{d}] Popped connection to {f} from the connection queue.", .{ self.worker_id, connection.address });

            // Release the mutex while the connection is being served; the
            // deferred block re-acquires it before touching the pool, so the
            // loop condition and the outer `defer` always see it locked.
            log.debug("[#{d}] Unlocking mutex.", .{self.worker_id});
            server.mutex.unlock();
            defer {
                log.debug("[#{d}] Acquiring mutex.", .{self.worker_id});
                server.mutex.lock();
                log.debug("[#{d}] Acquired mutex.", .{self.worker_id});
                log.debug("[#{d}] Returning connection to connection pool.", .{self.worker_id});
                server.connection_pool.append(&connection.node);
                log.debug("[#{d}] Signaling connection freed condition variable.", .{self.worker_id});
                server.cond_connection_freed.signal();
            }

            log.debug("[#{d}] Handling connection to {f}.", .{ self.worker_id, connection.address });
            self.handleConnection(server.request_handler, connection, running) catch |err| {
                log.err("[#{d}] Error while handling connection: {}", .{ self.worker_id, err });
            };
        } else {
            log.debug("[#{d}] Waiting on connection queued condition variable.", .{self.worker_id});
            server.cond_connection_queued.wait(&server.mutex);
            log.debug("[#{d}] Woken up on connection queued condition variable.", .{self.worker_id});
        }
    } else {
        log.debug("[#{d}] Loaded `false` from running, the worker loop exited.", .{self.worker_id});
    }
}

/// Serves requests on `connection` until `handleRequest` returns `false`, an
/// error occurs, or `running` becomes `false`. The connection is always
/// deinitialized on exit. Errors from `handleRequest` are logged and returned.
fn handleConnection(
    self: *Worker,
    request_handler: RequestHandler,
    connection: *Connection,
    running: *const std.atomic.Value(bool),
) !void {
    defer connection.deinit();

    // The read buffer is per-worker, not per-connection. Bytes left over from
    // a previous connection (for example after an early close or an error)
    // must never be parsed as part of this one.
    self.read_head = 0;
    self.read_tail = 0;

    while (running.load(.acquire)) {
        const res = self.handleRequest(request_handler, connection) catch |err| {
            log.err("[#{d}] Error while handling request: {}", .{ self.worker_id, err });
            return err;
        };
        if (!res) {
            log.debug("[#{d}] Request handler indicated to stop handling the connection to {f}.", .{ self.worker_id, connection.address });
            break;
        }
    } else {
        log.debug("[#{d}] Loaded `false` from running, the connection handler loop exited.", .{self.worker_id});
    }
}

/// Reads, parses and answers a single HTTP request on `connection`.
///
/// Returns `true` when another request may be served on the same connection
/// and `false` when the connection should be closed: read timeout, zero-byte
/// read, parser error, request exceeding the buffer or header limits, or the
/// client sent `Connection: close`.
///
/// Errors from reading the connection (other than `error.Timeout`) and from
/// writing into the response buffers are returned to the caller.
fn handleRequest(
    self: *Worker,
    request_handler: RequestHandler,
    connection: *Connection,
) !bool {
    self.header_hash_map.clearRetainingCapacity();

    var request: Request = .{
        .method = undefined,
        .pathname = undefined,
        .headers = &self.header_hash_map,
        .body = undefined,
    };
    var response: Response = .init(connection, self.header_write_buffer, self.body_write_buffer);
    var parser: http.Parser = .init();

    var next_header_index: usize = 0;
    var ignore: bool = false;
    var client_closed: bool = false;

    // Bytes already in the buffer that belong to this request (read together
    // with the previous request on the same connection).
    var leftover_bytes = self.read_tail - self.read_head;
    const max_read_tail = self.read_head + self.read_buffer_size;

    while (true) {
        var bytes_read: usize = undefined;
        var chunk: []const u8 = undefined;

        if (leftover_bytes > 0) {
            // Feed the parser from what is already buffered before reading more.
            bytes_read = leftover_bytes;
            chunk = self.read_buffer_ptr[self.read_tail - leftover_bytes .. self.read_tail];
            leftover_bytes = 0;
        } else {
            const read_tail = self.read_tail;
            bytes_read = connection.read(self.read_buffer_ptr[read_tail..max_read_tail]) catch |err| switch (err) {
                error.Timeout => {
                    log.debug("[#{d}] Connection to {f} timed out.", .{ self.worker_id, connection.address });
                    return false;
                },
                else => return err,
            };
            if (bytes_read == 0) {
                log.debug("[#{d}] Read zero bytes from connection to {f}.", .{ self.worker_id, connection.address });
                return false;
            }
            chunk = self.read_buffer_ptr[read_tail .. read_tail + bytes_read];
            self.read_tail += bytes_read;
        }

        const res = parser.consume(chunk) catch |err| {
            switch (err) {
                error.MethodNotSupported => try closeWith(&response, http.status.method_not_allowed),
                error.HttpVersionNotSupported => try closeWith(&response, http.status.http_version_not_supported),
                error.SyntaxError => try closeWith(&response, http.status.bad_request),
            }
            return false;
        };

        const done = if (res.result) |result| std.meta.activeTag(result) == .body else false;

        // The buffer is full and the request is still incomplete: reject it.
        if (self.read_tail - self.read_head >= self.read_buffer_size and !done) {
            if (parser.state == .body) {
                try closeWith(&response, http.status.content_too_large);
            } else {
                try closeWith(&response, http.status.request_header_fields_too_large);
            }
            return false;
        }

        if (res.result) |result| {
            switch (result) {
                .method => |method| request.method = method,
                .pathname => |pathname| request.pathname = pathname,
                .header => |header| blk: {
                    // Connection options are case-insensitive tokens.
                    if (header.isNamedKnown(.Connection) and std.ascii.eqlIgnoreCase(header.value, "close")) {
                        client_closed = true;
                    }
                    if (ignore) {
                        break :blk;
                    }
                    if (next_header_index >= self.header_value_buffer.len or self.header_hash_map.available == 0) {
                        // TODO Here, we could ignore, but make sure this does
                        // not clash with the other "request too long" checks
                        // (i.e. be careful not to double respond).
                        _ = &ignore;
                        try closeWith(&response, http.status.request_header_fields_too_large);
                        return false;
                    } else {
                        const entry = self.header_hash_map.getOrPutAssumeCapacity(header.name);
                        const header_value = &self.header_value_buffer[next_header_index];
                        header_value.* = .{ .node = .{}, .value = header.value };
                        next_header_index += 1;
                        if (!entry.found_existing) {
                            entry.value_ptr.* = .{
                                .len = 0,
                                .list = .{},
                            };
                        }
                        entry.value_ptr.list.prepend(&header_value.node);
                        entry.value_ptr.len += 1;
                    }
                },
                .end_of_headers => {},
                .body => |body| {
                    request.body = body;

                    if (!ignore) {
                        request_handler.handle(&request, &response) catch |err| {
                            log.err("[#{d}] Request handler failed: {}", .{ self.worker_id, err });
                            // Only replace the response if nothing was sent yet.
                            if (response.state == .init) {
                                response.header_writer.end = 0;
                                response.body_writer.end = 0;
                                const error_name = @errorName(err);
                                try response.body_writer.print("Internal Server Error\n{s}\n", .{error_name});
                                try response.header_writer.writeAll(http.status.internal_server_error);
                                try response.header_writer.writeAll("Content-Type: text/plain; charset=utf-8\r\n");
                                try response.header_writer.print("Content-Length: {d}\r\n", .{response.body_writer.end});
                                try response.header_writer.writeAll("\r\n");
                                response.sendHeadersAndBody();
                            }
                        };
                    }

                    // The handler did not send anything itself: send what it
                    // buffered, filling in a default status line and headers
                    // when it wrote none.
                    if (response.state == .init) {
                        const no_headers = response.header_writer.end == 0;
                        const no_body = response.body_writer.end == 0;
                        if (no_headers) {
                            if (no_body) {
                                try response.header_writer.writeAll(http.status.no_content);
                                try response.header_writer.writeAll("\r\n");
                                response.sendHeadersOnly();
                            } else {
                                try response.header_writer.writeAll(http.status.ok);
                                try response.header_writer.writeAll("Content-Type: application/octet-stream\r\n");
                                try response.header_writer.print("Content-Length: {d}\r\n", .{response.body_writer.end});
                                try response.header_writer.writeAll("\r\n");
                                response.sendHeadersAndBody();
                            }
                        } else {
                            if (no_body) {
                                response.sendHeadersOnly();
                            } else {
                                response.sendHeadersAndBody();
                            }
                        }
                    }

                    // Keep the bytes that follow this request for the next
                    // call. The new head is the position of the first
                    // leftover byte, wrapped into `[0, read_buffer_size)` so
                    // that the read window stays within the buffer mapping.
                    leftover_bytes = bytes_read - res.consumed;
                    self.read_head = (self.read_tail - leftover_bytes) % self.read_buffer_size;
                    self.read_tail = self.read_head + leftover_bytes;

                    return !client_closed;
                },
            }
        }

        leftover_bytes = bytes_read - res.consumed;
    }
}

/// Writes and sends a body-less response consisting of `status_line` and a
/// `Connection: close` header. Callers are expected to stop handling the
/// connection afterwards.
///
/// Must be called on an untouched response (asserted). Returns any error from
/// writing into the header buffer.
fn closeWith(response: *Response, status_line: []const u8) !void {
    // This function is meant to be called before a request handler gets to do
    // anything.
    std.debug.assert(response.header_writer.end == 0);
    std.debug.assert(response.body_writer.end == 0);
    std.debug.assert(response.state == .init);
    try response.header_writer.writeAll(status_line);
    try response.header_writer.writeAll("Connection: close\r\n");
    try response.header_writer.writeAll("\r\n");
    // Without this the buffered response is discarded when the caller returns.
    response.sendHeadersOnly();
}