diff --git a/packages/web/src/Response.zig b/packages/web/src/Response.zig index 9d62336..427975d 100644 --- a/packages/web/src/Response.zig +++ b/packages/web/src/Response.zig @@ -10,11 +10,11 @@ pub const State = union(enum) { errored: anyerror, }; -connection: Connection, +connection: *Connection, writer: std.Io.Writer, state: State, -pub fn init(connection: Connection, write_buffer: []u8) Response { +pub fn init(connection: *Connection, write_buffer: []u8) Response { return .{ .connection = connection, .writer = .fixed(write_buffer), @@ -32,14 +32,14 @@ pub const ResponseOptions = struct { response_body: []const u8, }; -pub fn sendResponseEmpty(self: *Response, options: ResponseEmptyOptions) !void { +pub fn sendEmpty(self: *Response, options: ResponseEmptyOptions) !void { try self.writer.print("{s}", .{options.status_text}); try self.writer.print("\r\n", .{}); self.finalize(); } -pub fn sendResponseClose(self: *Response, options: ResponseEmptyOptions) !void { +pub fn sendClose(self: *Response, options: ResponseEmptyOptions) !void { try self.writer.print("{s}", .{options.status_text}); try self.writer.print("Connection: close\r\n", .{}); try self.writer.print("\r\n", .{}); diff --git a/packages/web/src/Worker.zig b/packages/web/src/Worker.zig new file mode 100644 index 0000000..6392e21 --- /dev/null +++ b/packages/web/src/Worker.zig @@ -0,0 +1,177 @@ +const std = @import("std"); +const Worker = @This(); + +const Connection = @import("Connection.zig"); +const FileDescriptor = @import("FileDescriptor.zig").FileDescriptor; +const http = @import("http.zig"); +const RequestHandler = @import("RequestHandler.zig"); +const RequestRouter = @import("RequestRouter.zig"); +const Response = @import("Response.zig"); +const Server = @import("Server.zig"); + +read_buffer_ptr: [*]u8, +read_buffer_size: usize, +read_head: usize, +read_tail: usize, + +write_buffer: []const u8, + +pub fn worker( + self: *Worker, + server: *Server, + running: *const std.atomic.Value(bool), +) void { + server.mutex.lock(); + defer server.mutex.unlock(); + + while (running.load(.acquire)) { + if (server.connection_queue.pop()) |node| { + const connection: *Connection = @fieldParentPtr("node", node); + + server.mutex.unlock(); + defer { + server.mutex.lock(); + server.connection_pool.append(self.connection.node); + server.cond_connection_freed.signal(); + } + + self.handleConnection(server, server.request_router, connection, running) catch |err| { + std.log.err("Error while handling connection: {}", .{err}); + }; + } else { + server.cond_connection_queued.wait(&server.mutex); + } + } +} + +fn handleConnection( + self: *Worker, + request_router: RequestRouter, + connection: *Connection, + running: *const std.atomic.Value(bool), +) !void { + defer connection.deinit(); + + while (running.load(.acquire)) { + const res = self.handleRequest(request_router, running) catch |err| { + std.log.err("Error while handling request: {}", .{err}); + }; + + if (!res) break; + } +} + +fn handleRequest( + self: *Worker, + request_router: RequestRouter, + connection: *Connection, +) !bool { + var response: Response = .init(connection, self.write_buffer); + var parser: http.Parser = .init(request_router, &response); + + var leftover_bytes = self.read_tail - self.read_head; + const max_read_tail = self.read_head + self.read_buffer_size; + + while (true) { + var bytes_read: usize = undefined; + var chunk: []const u8 = undefined; + + if (leftover_bytes > 0) { + bytes_read = leftover_bytes; + chunk = self.read_buffer_ptr[self.read_head..self.read_tail]; + leftover_bytes = 0; + } else { + const read_tail = self.read_tail; + bytes_read = try connection.fd.read(self.read_buffer_ptr[read_tail..max_read_tail]); + chunk = self.read_buffer_ptr[read_tail .. read_tail + bytes_read]; + self.read_tail += bytes_read; + } + + const res = parser.consume(chunk) catch |err| { + switch (err) { + error.MethodNotSupported => { + try response.sendClose(.{ .status_text = http.status.method_not_allowed }); + }, + error.HttpVersionNotSupported => { + try response.sendClose(.{ .status_text = http.status.http_version_not_supported }); + }, + error.MissingLineFeed => { + try response.sendClose(.{ .status_text = http.status.bad_request }); + }, + error.InvalidContentLength => { + try response.sendClose(.{ .status_text = http.status.bad_request }); + }, + error.RouterError => { + const cause = parser.last_router_error; + const cause_name = @errorName(cause); + + // TODO Could really use separate header and content write + // buffers and this code shows why. We can't start writing + // until we know the exact byte length of the response body, + // because the value of "Content-Length" header depends on + // it. I guess we could do preemptive padding and in-place + // patch it later, but two buffers would be ideal anyway. + // We could then use writev syscall and it still will be + // exactly one syscall per response (unless we want to + // sendfile, but then it will be two syscalls either way). + + try response.writer.print("{s}", .{http.status.internal_server_error}); + try response.writer.print("Content-Type: {s}\r\n", .{"text/plain; charset=utf-8"}); + try response.writer.print("Content-Length: {d}\r\n", .{cause_name.len}); + try response.writer.print("\r\n", .{}); + try response.writer.print("{s}", .{cause_name}); + }, + error.HandlerError => { + const cause = parser.last_handler_error; + const cause_name = @errorName(cause); + + try response.writer.print("{s}", .{http.status.internal_server_error}); + try response.writer.print("Content-Type: {s}\r\n", .{"text/plain; charset=utf-8"}); + try response.writer.print("Content-Length: {d}\r\n", .{cause_name.len}); + try response.writer.print("\r\n", .{}); + try response.writer.print("{s}", .{cause_name}); + }, + } + return false; + }; + + if (self.read_tail - self.read_head >= self.read_buffer_size and !res.done) { + if (parser.state == .body) { + try response.sendClose(.{ .status_text = http.status.content_too_large }); + } else { + try response.sendClose(.{ .status_text = http.status.request_header_fields_too_large }); + } + return false; + } + + if (res.done) { + leftover_bytes = bytes_read - res.consumed; + self.read_head = (self.read_tail - leftover_bytes) & ~(self.read_buffer_size - 1); + self.read_tail = self.read_head + leftover_bytes; + return true; + } + } +} + +fn read(self: *Worker, fd: FileDescriptor) ![]const u8 { + const read_tail = self.read_tail; + const max_read_tail = self.read_head + self.read_buffer_size; + + std.debug.assert(max_read_tail > read_tail); + + const bytes_read = try fd.read(self.read_buffer_ptr[self.read_head .. self.read_head + self.read_buffer_size]); + const bytes = self.read_buffer_ptr[self.read_head .. self.read_head + bytes_read]; + + std.debug.assert(std.math.isPowerOfTwo(self.read_buffer_size)); + self.read_head = (self.read_head + bytes_read) & ~(self.read_buffer_size - 1); + + return bytes; +} + +fn consumeReadHead(self: *Worker, consumed: usize) void { + const bytes_read = self.read_tail - self.read_head; + std.debug.assert(consumed <= bytes_read); + + self.read_head = (self.read_head + consumed) & ~(self.read_buffer_size - 1); + self.read_tail = self.read_head + (bytes_read - consumed); +} diff --git a/packages/web/src/http.zig b/packages/web/src/http.zig new file mode 100644 index 0000000..db520fb --- /dev/null +++ b/packages/web/src/http.zig @@ -0,0 +1,6 @@ +const std = @import("std"); + +pub const Header = @import("http/Header.zig"); +pub const Method = @import("http/Method.zig").Method; +pub const Parser = @import("http/Parser.zig"); +pub const status = @import("http/status.zig"); diff --git a/packages/web/src/main.zig b/packages/web/src/main.zig index 449c702..7fad64e 100644 --- a/packages/web/src/main.zig +++ b/packages/web/src/main.zig @@ -124,9 +124,9 @@ const Handler = struct { const self: *Handler = @ptrCast(@alignCast(ctx)); const time_ns = self.timer.read(); - const time_us = time_ns / std.time.ns_per_us; + const time_us_ceil = (time_ns + std.time.ns_per_us - 1) / std.time.ns_per_us; - std.log.info("{s} {s} [{d}]", .{ @tagName(self.route.method), self.route.pathname, time_us }); + std.log.info("{s} {s} (<={} [µs])", .{ @tagName(self.route.method), self.route.pathname, time_us_ceil }); self.allocator.destroy(self); }