Massive logging, improve connection closing/timeout handling

This commit is contained in:
2026-03-13 02:47:36 +01:00
parent 712e214f61
commit cbf0d6a9da
5 changed files with 140 additions and 28 deletions

View File

@@ -33,13 +33,16 @@ pub const FileDescriptor = enum(i32) {
}
pub fn accept(self: FileDescriptor, noalias addr: ?*linux.sockaddr, noalias len: ?*linux.socklen_t) !FileDescriptor {
const rc = linux.accept(@intFromEnum(self), addr, len);
return switch (errno(rc)) {
.SUCCESS => @enumFromInt(@as(i32, @intCast(rc))),
.CONNABORTED => return error.ConnectionAborted,
.PERM => return error.BlockedByFirewall,
else => return error.SystemError,
};
while (true) {
const rc = linux.accept(@intFromEnum(self), addr, len);
switch (errno(rc)) {
.SUCCESS => return @enumFromInt(@as(i32, @intCast(rc))),
.INPROGRESS, .AGAIN => continue,
.CONNABORTED => return error.ConnectionAborted,
.PERM => return error.BlockedByFirewall,
else => return error.SystemError,
}
}
}
pub fn bind(self: FileDescriptor, addr: *const linux.sockaddr, len: linux.socklen_t) !void {
@@ -90,6 +93,7 @@ pub const FileDescriptor = enum(i32) {
switch (errno(rc)) {
.SUCCESS => return rc,
.INTR => continue,
.AGAIN => return error.Timeout,
else => return error.SystemError,
}
}
@@ -111,6 +115,7 @@ pub const FileDescriptor = enum(i32) {
switch (errno(rc)) {
.SUCCESS => return rc,
.INTR => continue,
.AGAIN, .WOULDBLOCK => error.Timeout,
else => return error.SystemError,
}
}

View File

@@ -9,6 +9,7 @@ const Request = @import("Request.zig");
const RequestHandler = @import("RequestHandler.zig");
const Worker = @import("Worker.zig");
const log = std.log.scoped(.Server);
const linux = std.os.linux;
const errno = linux.E.init;
@@ -18,6 +19,7 @@ ssl_ctx: ?*openssl.SslContext,
workers: []Worker,
threads: []std.Thread,
request_handler: RequestHandler,
read_timeout_us: u64,
connection_queue: std.DoublyLinkedList,
// NOTE Connection pool has no need for being doubly-linked, but the queue has
@@ -65,6 +67,10 @@ pub const Options = struct {
/// bodies generated with the body writer and not to bodies sent with
/// `sendfile`.
body_write_buffer_huge_pages: u32 = 1,
/// How much time should a worker wait on an idle connection before closing
/// it. Specifically, how much time can a `read` syscall block for, before
/// the connection is forcefully closed.
read_timeout_us: u64 = 1 * std.time.us_per_s,
};
pub fn init(allocator: std.mem.Allocator, options: Options) !Server {
@@ -220,6 +226,7 @@ pub fn init(allocator: std.mem.Allocator, options: Options) !Server {
.workers = workers,
.threads = threads,
.request_handler = options.request_handler,
.read_timeout_us = options.read_timeout_us,
.connection_queue = .{},
.connection_pool = connection_pool,
@@ -232,6 +239,7 @@ pub fn init(allocator: std.mem.Allocator, options: Options) !Server {
}
pub fn deinit(self: *Server, allocator: std.mem.Allocator) void {
log.debug("Deinitializing Server.", .{});
const worker_count = self.workers.len;
const single_read_buffers_size = self.workers[0].read_buffer_size;
@@ -273,14 +281,18 @@ pub fn listen(self: *Server, running: *const std.atomic.Value(bool)) !void {
var spawned: usize = 0;
defer {
log.debug("Storing `false` into worker_running.", .{});
worker_running.store(false, .release);
log.debug("Broadcasting connection queued condition variable.", .{});
self.cond_connection_queued.broadcast();
for (self.threads[0..spawned]) |*thread| {
for (self.threads[0..spawned], 0..) |*thread, i| {
log.debug("Joining the thread of worker #{d}.", .{i});
thread.join();
}
}
for (self.workers, 0..) |*worker, i| {
log.debug("Spawning thread for worker #{d}.", .{i});
self.threads[i] = try std.Thread.spawn(.{}, Worker.worker, .{ worker, self, &worker_running });
spawned += 1;
}
@@ -289,34 +301,53 @@ pub fn listen(self: *Server, running: *const std.atomic.Value(bool)) !void {
var address: std.net.Address = undefined;
var address_size: u32 = @sizeOf(std.net.Address);
log.debug("Accepting connection.", .{});
const fd = self.fd.accept(&address.any, &address_size) catch |e| {
std.log.err("Error while accepting connection: {}", .{e});
log.err("Error while accepting connection: {}", .{e});
continue;
};
log.debug("Accepted connection from {f}", .{address});
const timeout: linux.timeval = .{
.sec = @intCast(self.read_timeout_us / std.time.us_per_s),
.usec = @intCast(self.read_timeout_us % std.time.us_per_s),
};
try fd.setsockopt(linux.SOL.SOCKET, linux.SO.RCVTIMEO, std.mem.asBytes(&timeout));
const ssl: ?*openssl.Ssl = self.maybeInitSsl(fd) catch |e| {
std.log.err("Error while estabilishing SSL connection: {}", .{e});
log.err("Error while estabilishing SSL connection: {}", .{e});
fd.close();
continue;
};
{
log.debug("Acquiring mutex.", .{});
self.mutex.lock();
defer self.mutex.unlock();
log.debug("Acquired mutex.", .{});
defer {
log.debug("Unlocking mutex.", .{});
self.mutex.unlock();
}
while (true) {
if (self.connection_pool.pop()) |node| {
const connection: *Connection = @fieldParentPtr("node", node);
connection.reinit(address, fd, ssl);
log.debug("Adding connection to {f} to the connection queue.", .{connection.address});
self.connection_queue.prepend(node);
break;
}
log.debug("Waiting on connection freed condition variable.", .{});
self.cond_connection_freed.wait(&self.mutex);
log.debug("Woken up on connection freed condition variable.", .{});
}
}
log.debug("Signaling connection queued condition variable.", .{});
self.cond_connection_queued.signal();
} else {
log.debug("Loaded `false` from running, the accept loop exited.", .{});
}
}
@@ -324,7 +355,9 @@ fn maybeInitSsl(self: *const Server, fd: FileDescriptor) !?*openssl.Ssl {
if (self.ssl_ctx) |ssl_ctx| {
const ssl = try openssl.Ssl.new(ssl_ctx);
try ssl.setFd(fd);
log.debug("Accepting SSL layer.", .{});
try ssl.accept();
log.debug("Accepted SSL layer.", .{});
return ssl;
} else {
return null;

View File

@@ -9,6 +9,8 @@ const RequestHandler = @import("RequestHandler.zig");
const Response = @import("Response.zig");
const Server = @import("Server.zig");
const log = std.log.scoped(.Worker);
/// Integer unique for this worker. Has no functional meaning. Can be used for
/// debugging and profiling.
worker_id: usize,
@@ -61,6 +63,8 @@ pub fn init(allocator: std.mem.Allocator, options: Options) !Worker {
}
pub fn deinit(self: *Worker, allocator: std.mem.Allocator) void {
log.debug("[#{d}] Deinitializing Worker.", .{self.worker_id});
allocator.free(self.header_value_buffer);
self.header_hash_map.deinit(allocator);
@@ -72,26 +76,43 @@ pub fn worker(
server: *Server,
running: *const std.atomic.Value(bool),
) void {
log.debug("[#{d}] Acquiring mutex.", .{self.worker_id});
server.mutex.lock();
defer server.mutex.unlock();
log.debug("[#{d}] Acquired mutex.", .{self.worker_id});
defer {
log.debug("[#{d}] Unlocking mutex.", .{self.worker_id});
server.mutex.unlock();
}
while (running.load(.acquire)) {
if (server.connection_queue.pop()) |node| {
const connection: *Connection = @fieldParentPtr("node", node);
log.debug("[#{d}] Popped connection to {f} from the connection queue.", .{ self.worker_id, connection.address });
log.debug("[#{d}] Unlocking mutex.", .{self.worker_id});
server.mutex.unlock();
defer {
log.debug("[#{d}] Acquiring mutex.", .{self.worker_id});
server.mutex.lock();
log.debug("[#{d}] Acquired mutex.", .{self.worker_id});
log.debug("[#{d}] Returning connection to connection pool.", .{self.worker_id});
server.connection_pool.append(&connection.node);
log.debug("[#{d}] Signaling connection freed condition variable.", .{self.worker_id});
server.cond_connection_freed.signal();
}
log.debug("[#{d}] Handling connection to {f}.", .{ self.worker_id, connection.address });
self.handleConnection(server.request_handler, connection, running) catch |err| {
std.log.err("Error while handling connection: {}", .{err});
log.err("[#{d}] Error while handling connection: {}", .{ self.worker_id, err });
};
} else {
log.debug("[#{d}] Waiting on connection queued condition variable.", .{self.worker_id});
server.cond_connection_queued.wait(&server.mutex);
log.debug("[#{d}] Woken up on connection queued condition variable.", .{self.worker_id});
}
} else {
log.debug("[#{d}] Loaded `false` from running, the worker loop exited.", .{self.worker_id});
}
}
@@ -105,11 +126,16 @@ fn handleConnection(
while (running.load(.acquire)) {
const res = self.handleRequest(request_handler, connection) catch |err| {
std.log.err("Error while handling request: {}", .{err});
log.err("[#{d}] Error while handling request: {}", .{ self.worker_id, err });
return err;
};
if (!res) break;
if (!res) {
log.debug("[#{d}] Request handler indicated to stop handling the connection to {f}.", .{ self.worker_id, connection.address });
break;
}
} else {
log.debug("[#{d}] Loaded `false` from running, the connection handler loop exited.", .{self.worker_id});
}
}
@@ -131,6 +157,7 @@ fn handleRequest(
var next_header_index: usize = 0;
var ignore: bool = false;
var client_closed: bool = false;
var leftover_bytes = self.read_tail - self.read_head;
const max_read_tail = self.read_head + self.read_buffer_size;
@@ -145,7 +172,17 @@ fn handleRequest(
leftover_bytes = 0;
} else {
const read_tail = self.read_tail;
bytes_read = try connection.read(self.read_buffer_ptr[read_tail..max_read_tail]);
bytes_read = connection.read(self.read_buffer_ptr[read_tail..max_read_tail]) catch |err| switch (err) {
error.Timeout => {
log.debug("[#{d}] Connection to {f} timed out.", .{ self.worker_id, connection.address });
return false;
},
else => return err,
};
if (bytes_read == 0) {
log.debug("[#{d}] Read zero bytes from connection to {f}.", .{ self.worker_id, connection.address });
return false;
}
chunk = self.read_buffer_ptr[read_tail .. read_tail + bytes_read];
self.read_tail += bytes_read;
}
@@ -175,6 +212,10 @@ fn handleRequest(
.method => |method| request.method = method,
.pathname => |pathname| request.pathname = pathname,
.header => |header| blk: {
if (header.isNamedKnown(.Connection) and std.mem.eql(u8, header.value, "close")) {
client_closed = true;
}
if (ignore) {
break :blk;
}
@@ -258,7 +299,7 @@ fn handleRequest(
leftover_bytes = bytes_read - res.consumed;
self.read_head = (self.read_tail - leftover_bytes) & ~(self.read_buffer_size - 1);
self.read_tail = self.read_head + leftover_bytes;
return true;
return !client_closed;
},
}
}

View File

@@ -8,8 +8,24 @@ const UUID = web.UUID;
var running: std.atomic.Value(bool) = .init(true);
fn interruptionHandler(signal: i32) callconv(.c) void {
switch (signal) {
fn interruptionHandler(sig: i32) callconv(.c) void {
var buf: [32]u8 = undefined;
const signal_name = blk: inline for (@typeInfo(linux.SIG).@"struct".decls) |decl| {
if (comptime std.mem.eql(u8, decl.name, "BLOCK") or
std.mem.eql(u8, decl.name, "UNBLOCK") or
std.mem.eql(u8, decl.name, "SETMASK")) continue;
const decl_value = @field(linux.SIG, decl.name);
if (@TypeOf(decl_value) != comptime_int) continue;
if (decl_value == sig) break :blk "SIG" ++ decl.name;
} else {
break :blk std.fmt.bufPrint(&buf, "#{d}", .{sig}) catch unreachable;
};
std.log.debug("Interrupted with signal {s}.", .{signal_name});
switch (sig) {
linux.SIG.INT, linux.SIG.TERM => {
running.store(false, .release);
},
@@ -19,6 +35,12 @@ fn interruptionHandler(signal: i32) callconv(.c) void {
const Handler = struct {
fn handle(_: *anyopaque, request: *web.Request, response: *web.Response) !void {
std.log.info("{f} | {s} {s}", .{
response.connection.address,
@tagName(request.method),
request.pathname,
});
if (!std.mem.eql(u8, request.pathname, "/")) {
try response.body_writer.writeAll("Not Found\n");
@@ -96,14 +118,16 @@ pub fn main() !void {
.mask = linux.sigemptyset(),
.flags = linux.SA.RESETHAND,
};
const rc = linux.sigaction(linux.SIG.INT, &sigaction, null);
switch (errno(rc)) {
.SUCCESS => {},
else => |e| {
std.log.err("Error while estabilishing interruption handler: {s}", .{@tagName(e)});
return error.SystemError;
},
}
signal(linux.SIG.INT, &sigaction);
signal(linux.SIG.TERM, &sigaction);
try server.listen(&running);
}
fn signal(sig: u8, action: *const linux.Sigaction) void {
var old_action = std.mem.zeroes(linux.Sigaction);
_ = linux.sigaction(sig, null, &old_action);
if (old_action.handler.handler == linux.SIG.IGN) return;
_ = linux.sigaction(sig, action, null);
}

View File

@@ -19,6 +19,10 @@ pub const Ssl = opaque {
import.SSL_free(self);
}
pub inline fn getError(self: *const Ssl, ret_code: i32) i32 {
return import.SSL_get_error(self, ret_code);
}
pub inline fn new(ctx: *SslContext) !*Ssl {
return import.SSL_new(ctx) orelse error.OpenSslError;
}
@@ -26,7 +30,12 @@ pub const Ssl = opaque {
pub inline fn read(self: *Ssl, buf: []u8) !usize {
var bytes_read: usize = undefined;
const res = import.SSL_read_ex(self, buf.ptr, buf.len, &bytes_read);
return if (res <= 0) error.OpenSslError else bytes_read;
if (res <= 0) {
const err = self.getError(res);
return if (err == c_ssl.SSL_ERROR_ZERO_RETURN) 0 else error.OpenSslError;
} else {
return bytes_read;
}
}
pub fn readAll(self: *Ssl, buf: []u8) !void {