diff --git a/packages/web/src/FileDescriptor.zig b/packages/web/src/FileDescriptor.zig index 70bc788..22b964c 100644 --- a/packages/web/src/FileDescriptor.zig +++ b/packages/web/src/FileDescriptor.zig @@ -33,13 +33,16 @@ pub const FileDescriptor = enum(i32) { } pub fn accept(self: FileDescriptor, noalias addr: ?*linux.sockaddr, noalias len: ?*linux.socklen_t) !FileDescriptor { - const rc = linux.accept(@intFromEnum(self), addr, len); - return switch (errno(rc)) { - .SUCCESS => @enumFromInt(@as(i32, @intCast(rc))), - .CONNABORTED => return error.ConnectionAborted, - .PERM => return error.BlockedByFirewall, - else => return error.SystemError, - }; + while (true) { + const rc = linux.accept(@intFromEnum(self), addr, len); + switch (errno(rc)) { + .SUCCESS => return @enumFromInt(@as(i32, @intCast(rc))), + .INPROGRESS, .AGAIN => continue, + .CONNABORTED => return error.ConnectionAborted, + .PERM => return error.BlockedByFirewall, + else => return error.SystemError, + } + } } pub fn bind(self: FileDescriptor, addr: *const linux.sockaddr, len: linux.socklen_t) !void { @@ -90,6 +93,7 @@ pub const FileDescriptor = enum(i32) { switch (errno(rc)) { .SUCCESS => return rc, .INTR => continue, + .AGAIN => return error.Timeout, else => return error.SystemError, } } @@ -111,6 +115,7 @@ pub const FileDescriptor = enum(i32) { switch (errno(rc)) { .SUCCESS => return rc, .INTR => continue, + .AGAIN, .WOULDBLOCK => error.Timeout, else => return error.SystemError, } } diff --git a/packages/web/src/Server.zig b/packages/web/src/Server.zig index 708f16c..a0027d2 100644 --- a/packages/web/src/Server.zig +++ b/packages/web/src/Server.zig @@ -9,6 +9,7 @@ const Request = @import("Request.zig"); const RequestHandler = @import("RequestHandler.zig"); const Worker = @import("Worker.zig"); +const log = std.log.scoped(.Server); const linux = std.os.linux; const errno = linux.E.init; @@ -18,6 +19,7 @@ ssl_ctx: ?*openssl.SslContext, workers: []Worker, threads: []std.Thread, request_handler: RequestHandler, +read_timeout_us: u64, connection_queue: std.DoublyLinkedList, // NOTE Connection pool has no need for being doubly-linked, but the queue has @@ -65,6 +67,10 @@ pub const Options = struct { /// bodies generated with the body writer and not to bodies sent with /// `sendfile`. body_write_buffer_huge_pages: u32 = 1, + /// How much time should a worker wait on an idle connection before closing + /// it. Specifically, how much time can a `read` syscall block for, before + /// the connection is forcefully closed. + read_timeout_us: u64 = 1 * std.time.us_per_s, }; pub fn init(allocator: std.mem.Allocator, options: Options) !Server { @@ -220,6 +226,7 @@ pub fn init(allocator: std.mem.Allocator, options: Options) !Server { .workers = workers, .threads = threads, .request_handler = options.request_handler, + .read_timeout_us = options.read_timeout_us, .connection_queue = .{}, .connection_pool = connection_pool, @@ -232,6 +239,7 @@ pub fn init(allocator: std.mem.Allocator, options: Options) !Server { } pub fn deinit(self: *Server, allocator: std.mem.Allocator) void { + log.debug("Deinitializing Server.", .{}); const worker_count = self.workers.len; const single_read_buffers_size = self.workers[0].read_buffer_size; @@ -273,14 +281,18 @@ pub fn listen(self: *Server, running: *const std.atomic.Value(bool)) !void { var spawned: usize = 0; defer { + log.debug("Storing `false` into worker_running.", .{}); worker_running.store(false, .release); + log.debug("Broadcasting connection queued condition variable.", .{}); self.cond_connection_queued.broadcast(); - for (self.threads[0..spawned]) |*thread| { + for (self.threads[0..spawned], 0..) |*thread, i| { + log.debug("Joining the thread of worker #{d}.", .{i}); thread.join(); } } for (self.workers, 0..) |*worker, i| { + log.debug("Spawning thread for worker #{d}.", .{i}); self.threads[i] = try std.Thread.spawn(.{}, Worker.worker, .{ worker, self, &worker_running }); spawned += 1; } @@ -289,34 +301,53 @@ pub fn listen(self: *Server, running: *const std.atomic.Value(bool)) !void { var address: std.net.Address = undefined; var address_size: u32 = @sizeOf(std.net.Address); + log.debug("Accepting connection.", .{}); const fd = self.fd.accept(&address.any, &address_size) catch |e| { - std.log.err("Error while accepting connection: {}", .{e}); + log.err("Error while accepting connection: {}", .{e}); continue; }; + log.debug("Accepted connection from {f}", .{address}); + + const timeout: linux.timeval = .{ + .sec = @intCast(self.read_timeout_us / std.time.us_per_s), + .usec = @intCast(self.read_timeout_us % std.time.us_per_s), + }; + try fd.setsockopt(linux.SOL.SOCKET, linux.SO.RCVTIMEO, std.mem.asBytes(&timeout)); const ssl: ?*openssl.Ssl = self.maybeInitSsl(fd) catch |e| { - std.log.err("Error while estabilishing SSL connection: {}", .{e}); + log.err("Error while estabilishing SSL connection: {}", .{e}); fd.close(); continue; }; { + log.debug("Acquiring mutex.", .{}); self.mutex.lock(); - defer self.mutex.unlock(); + log.debug("Acquired mutex.", .{}); + defer { + log.debug("Unlocking mutex.", .{}); + self.mutex.unlock(); + } while (true) { if (self.connection_pool.pop()) |node| { const connection: *Connection = @fieldParentPtr("node", node); connection.reinit(address, fd, ssl); + log.debug("Adding connection to {f} to the connection queue.", .{connection.address}); self.connection_queue.prepend(node); break; } + log.debug("Waiting on connection freed condition variable.", .{}); self.cond_connection_freed.wait(&self.mutex); + log.debug("Woken up on connection freed condition variable.", .{}); } } + log.debug("Signaling connection queued condition variable.", .{}); self.cond_connection_queued.signal(); + } else { + log.debug("Loaded `false` from running, the accept loop exited.", .{}); } } @@ -324,7 +355,9 @@ fn maybeInitSsl(self: *const Server, fd: FileDescriptor) !?*openssl.Ssl { if (self.ssl_ctx) |ssl_ctx| { const ssl = try openssl.Ssl.new(ssl_ctx); try ssl.setFd(fd); + log.debug("Accepting SSL layer.", .{}); try ssl.accept(); + log.debug("Accepted SSL layer.", .{}); return ssl; } else { return null; diff --git a/packages/web/src/Worker.zig b/packages/web/src/Worker.zig index 94fb845..68ec75c 100644 --- a/packages/web/src/Worker.zig +++ b/packages/web/src/Worker.zig @@ -9,6 +9,8 @@ const RequestHandler = @import("RequestHandler.zig"); const Response = @import("Response.zig"); const Server = @import("Server.zig"); +const log = std.log.scoped(.Worker); + /// Integer unique for this worker. Has no functional meaning. Can be used for /// debugging and profiling. worker_id: usize, @@ -61,6 +63,8 @@ pub fn init(allocator: std.mem.Allocator, options: Options) !Worker { } pub fn deinit(self: *Worker, allocator: std.mem.Allocator) void { + log.debug("[#{d}] Deinitializing Worker.", .{self.worker_id}); + allocator.free(self.header_value_buffer); self.header_hash_map.deinit(allocator); @@ -72,26 +76,43 @@ pub fn worker( server: *Server, running: *const std.atomic.Value(bool), ) void { + log.debug("[#{d}] Acquiring mutex.", .{self.worker_id}); server.mutex.lock(); - defer server.mutex.unlock(); + log.debug("[#{d}] Acquired mutex.", .{self.worker_id}); + defer { + log.debug("[#{d}] Unlocking mutex.", .{self.worker_id}); + server.mutex.unlock(); + } while (running.load(.acquire)) { if (server.connection_queue.pop()) |node| { const connection: *Connection = @fieldParentPtr("node", node); + log.debug("[#{d}] Popped connection to {f} from the connection queue.", .{ self.worker_id, connection.address }); + log.debug("[#{d}] Unlocking mutex.", .{self.worker_id}); server.mutex.unlock(); defer { + log.debug("[#{d}] Acquiring mutex.", .{self.worker_id}); server.mutex.lock(); + log.debug("[#{d}] Acquired mutex.", .{self.worker_id}); + + log.debug("[#{d}] Returning connection to connection pool.", .{self.worker_id}); server.connection_pool.append(&connection.node); + log.debug("[#{d}] Signaling connection freed condition variable.", .{self.worker_id}); server.cond_connection_freed.signal(); } + log.debug("[#{d}] Handling connection to {f}.", .{ self.worker_id, connection.address }); self.handleConnection(server.request_handler, connection, running) catch |err| { - std.log.err("Error while handling connection: {}", .{err}); + log.err("[#{d}] Error while handling connection: {}", .{ self.worker_id, err }); }; } else { + log.debug("[#{d}] Waiting on connection queued condition variable.", .{self.worker_id}); server.cond_connection_queued.wait(&server.mutex); + log.debug("[#{d}] Woken up on connection queued condition variable.", .{self.worker_id}); } + } else { + log.debug("[#{d}] Loaded `false` from running, the worker loop exited.", .{self.worker_id}); } } @@ -105,11 +126,16 @@ fn handleConnection( while (running.load(.acquire)) { const res = self.handleRequest(request_handler, connection) catch |err| { - std.log.err("Error while handling request: {}", .{err}); + log.err("[#{d}] Error while handling request: {}", .{ self.worker_id, err }); return err; }; - if (!res) break; + if (!res) { + log.debug("[#{d}] Request handler indicated to stop handling the connection to {f}.", .{ self.worker_id, connection.address }); + break; + } + } else { + log.debug("[#{d}] Loaded `false` from running, the connection handler loop exited.", .{self.worker_id}); } } @@ -131,6 +157,7 @@ fn handleRequest( var next_header_index: usize = 0; var ignore: bool = false; + var client_closed: bool = false; var leftover_bytes = self.read_tail - self.read_head; const max_read_tail = self.read_head + self.read_buffer_size; @@ -145,7 +172,17 @@ fn handleRequest( leftover_bytes = 0; } else { const read_tail = self.read_tail; - bytes_read = try connection.read(self.read_buffer_ptr[read_tail..max_read_tail]); + bytes_read = connection.read(self.read_buffer_ptr[read_tail..max_read_tail]) catch |err| switch (err) { + error.Timeout => { + log.debug("[#{d}] Connection to {f} timed out.", .{ self.worker_id, connection.address }); + return false; + }, + else => return err, + }; + if (bytes_read == 0) { + log.debug("[#{d}] Read zero bytes from connection to {f}.", .{ self.worker_id, connection.address }); + return false; + } chunk = self.read_buffer_ptr[read_tail .. read_tail + bytes_read]; self.read_tail += bytes_read; } @@ -175,6 +212,10 @@ fn handleRequest( .method => |method| request.method = method, .pathname => |pathname| request.pathname = pathname, .header => |header| blk: { + if (header.isNamedKnown(.Connection) and std.mem.eql(u8, header.value, "close")) { + client_closed = true; + } + if (ignore) { break :blk; } @@ -258,7 +299,7 @@ fn handleRequest( leftover_bytes = bytes_read - res.consumed; self.read_head = (self.read_tail - leftover_bytes) & ~(self.read_buffer_size - 1); self.read_tail = self.read_head + leftover_bytes; - return true; + return !client_closed; }, } } diff --git a/packages/web/src/main.zig b/packages/web/src/main.zig index 123df8e..41f29a0 100644 --- a/packages/web/src/main.zig +++ b/packages/web/src/main.zig @@ -8,8 +8,24 @@ const UUID = web.UUID; var running: std.atomic.Value(bool) = .init(true); -fn interruptionHandler(signal: i32) callconv(.c) void { - switch (signal) { +fn interruptionHandler(sig: i32) callconv(.c) void { + var buf: [32]u8 = undefined; + + const signal_name = blk: inline for (@typeInfo(linux.SIG).@"struct".decls) |decl| { + if (comptime std.mem.eql(u8, decl.name, "BLOCK") or + std.mem.eql(u8, decl.name, "UNBLOCK") or + std.mem.eql(u8, decl.name, "SETMASK")) continue; + + const decl_value = @field(linux.SIG, decl.name); + if (@TypeOf(decl_value) != comptime_int) continue; + if (decl_value == sig) break :blk "SIG" ++ decl.name; + } else { + break :blk std.fmt.bufPrint(&buf, "#{d}", .{sig}) catch unreachable; + }; + + std.log.debug("Interrupted with signal {s}.", .{signal_name}); + + switch (sig) { linux.SIG.INT, linux.SIG.TERM => { running.store(false, .release); }, @@ -19,6 +35,12 @@ fn interruptionHandler(signal: i32) callconv(.c) void { const Handler = struct { fn handle(_: *anyopaque, request: *web.Request, response: *web.Response) !void { + std.log.info("{f} | {s} {s}", .{ + response.connection.address, + @tagName(request.method), + request.pathname, + }); + if (!std.mem.eql(u8, request.pathname, "/")) { try response.body_writer.writeAll("Not Found\n"); @@ -96,14 +118,16 @@ pub fn main() !void { .mask = linux.sigemptyset(), .flags = linux.SA.RESETHAND, }; - const rc = linux.sigaction(linux.SIG.INT, &sigaction, null); - switch (errno(rc)) { - .SUCCESS => {}, - else => |e| { - std.log.err("Error while estabilishing interruption handler: {s}", .{@tagName(e)}); - return error.SystemError; - }, - } + signal(linux.SIG.INT, &sigaction); + signal(linux.SIG.TERM, &sigaction); try server.listen(&running); } + +fn signal(sig: u8, action: *const linux.Sigaction) void { + var old_action = std.mem.zeroes(linux.Sigaction); + _ = linux.sigaction(sig, null, &old_action); + if (old_action.handler.handler == linux.SIG.IGN) return; + + _ = linux.sigaction(sig, action, null); +} diff --git a/packages/web/src/openssl/Ssl.zig b/packages/web/src/openssl/Ssl.zig index b487d6c..3a379a6 100644 --- a/packages/web/src/openssl/Ssl.zig +++ b/packages/web/src/openssl/Ssl.zig @@ -19,6 +19,10 @@ pub const Ssl = opaque { import.SSL_free(self); } + pub inline fn getError(self: *const Ssl, ret_code: i32) i32 { + return import.SSL_get_error(self, ret_code); + } + pub inline fn new(ctx: *SslContext) !*Ssl { return import.SSL_new(ctx) orelse error.OpenSslError; } @@ -26,7 +30,12 @@ pub const Ssl = opaque { pub inline fn read(self: *Ssl, buf: []u8) !usize { var bytes_read: usize = undefined; const res = import.SSL_read_ex(self, buf.ptr, buf.len, &bytes_read); - return if (res <= 0) error.OpenSslError else bytes_read; + if (res <= 0) { + const err = self.getError(res); + return if (err == c_ssl.SSL_ERROR_ZERO_RETURN) 0 else error.OpenSslError; + } else { + return bytes_read; + } } pub fn readAll(self: *Ssl, buf: []u8) !void {