web: before compile & fix marathon
This commit is contained in:
@@ -10,11 +10,11 @@ pub const State = union(enum) {
|
|||||||
errored: anyerror,
|
errored: anyerror,
|
||||||
};
|
};
|
||||||
|
|
||||||
connection: Connection,
|
connection: *Connection,
|
||||||
writer: std.Io.Writer,
|
writer: std.Io.Writer,
|
||||||
state: State,
|
state: State,
|
||||||
|
|
||||||
pub fn init(connection: Connection, write_buffer: []u8) Response {
|
pub fn init(connection: *Connection, write_buffer: []u8) Response {
|
||||||
return .{
|
return .{
|
||||||
.connection = connection,
|
.connection = connection,
|
||||||
.writer = .fixed(write_buffer),
|
.writer = .fixed(write_buffer),
|
||||||
@@ -32,14 +32,14 @@ pub const ResponseOptions = struct {
|
|||||||
response_body: []const u8,
|
response_body: []const u8,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn sendResponseEmpty(self: *Response, options: ResponseEmptyOptions) !void {
|
pub fn sendEmpty(self: *Response, options: ResponseEmptyOptions) !void {
|
||||||
try self.writer.print("{s}", .{options.status_text});
|
try self.writer.print("{s}", .{options.status_text});
|
||||||
try self.writer.print("\r\n", .{});
|
try self.writer.print("\r\n", .{});
|
||||||
|
|
||||||
self.finalize();
|
self.finalize();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn sendResponseClose(self: *Response, options: ResponseEmptyOptions) !void {
|
pub fn sendClose(self: *Response, options: ResponseEmptyOptions) !void {
|
||||||
try self.writer.print("{s}", .{options.status_text});
|
try self.writer.print("{s}", .{options.status_text});
|
||||||
try self.writer.print("Connection: close\r\n", .{});
|
try self.writer.print("Connection: close\r\n", .{});
|
||||||
try self.writer.print("\r\n", .{});
|
try self.writer.print("\r\n", .{});
|
||||||
|
|||||||
177
packages/web/src/Worker.zig
Normal file
177
packages/web/src/Worker.zig
Normal file
@@ -0,0 +1,177 @@
|
|||||||
|
const std = @import("std");
|
||||||
|
const Worker = @This();
|
||||||
|
|
||||||
|
const Connection = @import("Connection.zig");
|
||||||
|
const FileDescriptor = @import("FileDescriptor.zig").FileDescriptor;
|
||||||
|
const http = @import("http.zig");
|
||||||
|
const RequestHandler = @import("RequestHandler.zig");
|
||||||
|
const RequestRouter = @import("RequestRouter.zig");
|
||||||
|
const Response = @import("Response.zig");
|
||||||
|
const Server = @import("Server.zig");
|
||||||
|
|
||||||
|
read_buffer_ptr: [*]u8,
|
||||||
|
read_buffer_size: usize,
|
||||||
|
read_head: usize,
|
||||||
|
read_tail: usize,
|
||||||
|
|
||||||
|
write_buffer: []const u8,
|
||||||
|
|
||||||
|
pub fn worker(
|
||||||
|
self: *Worker,
|
||||||
|
server: *Server,
|
||||||
|
running: *const std.atomic.Value(bool),
|
||||||
|
) void {
|
||||||
|
server.mutex.lock();
|
||||||
|
defer server.mutex.unlock();
|
||||||
|
|
||||||
|
while (running.load(.acquire)) {
|
||||||
|
if (server.connection_queue.pop()) |node| {
|
||||||
|
const connection: *Connection = @fieldParentPtr("node", node);
|
||||||
|
|
||||||
|
server.mutex.unlock();
|
||||||
|
defer {
|
||||||
|
server.mutex.lock();
|
||||||
|
server.connection_pool.append(self.connection.node);
|
||||||
|
server.cond_connection_freed.signal();
|
||||||
|
}
|
||||||
|
|
||||||
|
self.handleConnection(server, server.request_router, connection, running) catch |err| {
|
||||||
|
std.log.err("Error while handling connection: {}", .{err});
|
||||||
|
};
|
||||||
|
} else {
|
||||||
|
server.cond_connection_queued.wait(&server.mutex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handleConnection(
|
||||||
|
self: *Worker,
|
||||||
|
request_router: RequestRouter,
|
||||||
|
connection: *Connection,
|
||||||
|
running: *const std.atomic.Value(bool),
|
||||||
|
) !void {
|
||||||
|
defer connection.deinit();
|
||||||
|
|
||||||
|
while (running.load(.acquire)) {
|
||||||
|
const res = self.handleRequest(request_router, running) catch |err| {
|
||||||
|
std.log.err("Error while handling request: {}", .{err});
|
||||||
|
};
|
||||||
|
|
||||||
|
if (!res) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handleRequest(
|
||||||
|
self: *Worker,
|
||||||
|
request_router: RequestRouter,
|
||||||
|
connection: *Connection,
|
||||||
|
) !bool {
|
||||||
|
var response: Response = .init(connection, self.write_buffer);
|
||||||
|
var parser: http.Parser = .init(request_router, &response);
|
||||||
|
|
||||||
|
var leftover_bytes = self.read_tail - self.read_head;
|
||||||
|
const max_read_tail = self.read_head + self.read_buffer_size;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
var bytes_read: usize = undefined;
|
||||||
|
var chunk: []const u8 = undefined;
|
||||||
|
|
||||||
|
if (leftover_bytes > 0) {
|
||||||
|
bytes_read = leftover_bytes;
|
||||||
|
chunk = self.read_buffer_ptr[self.read_head..self.read_tail];
|
||||||
|
leftover_bytes = 0;
|
||||||
|
} else {
|
||||||
|
const read_tail = self.read_tail;
|
||||||
|
bytes_read = try connection.fd.read(self.read_buffer_ptr[read_tail..max_read_tail]);
|
||||||
|
chunk = self.read_buffer_ptr[read_tail .. read_tail + bytes_read];
|
||||||
|
self.read_tail += bytes_read;
|
||||||
|
}
|
||||||
|
|
||||||
|
const res = parser.consume(chunk) catch |err| {
|
||||||
|
switch (err) {
|
||||||
|
error.MethodNotSupported => {
|
||||||
|
try response.sendClose(.{ .status_text = http.status.method_not_allowed });
|
||||||
|
},
|
||||||
|
error.HttpVersionNotSupported => {
|
||||||
|
try response.sendClose(.{ .status_text = http.status.http_version_not_supported });
|
||||||
|
},
|
||||||
|
error.MissingLineFeed => {
|
||||||
|
try response.sendClose(.{ .status_text = http.status.bad_request });
|
||||||
|
},
|
||||||
|
error.InvalidContentLength => {
|
||||||
|
try response.sendClose(.{ .status_text = http.status.bad_request });
|
||||||
|
},
|
||||||
|
error.RouterError => {
|
||||||
|
const cause = parser.last_router_error;
|
||||||
|
const cause_name = @errorName(cause);
|
||||||
|
|
||||||
|
// TODO Could really use separate header and content write
|
||||||
|
// buffers and this code shows why. We can't start writing
|
||||||
|
// until we know the exact byte length of the response body,
|
||||||
|
// because the value of "Content-Length" header depends on
|
||||||
|
// it. I guess we could do preemptive padding and in-place
|
||||||
|
// patch it later, but two buffers would be ideal anyway.
|
||||||
|
// We could then use writev syscall and it still will be
|
||||||
|
// exactly one syscall per response (unless we want to
|
||||||
|
// sendfile, but then it will be two syscalls either way).
|
||||||
|
|
||||||
|
try response.writer.print("{s}", .{http.status.internal_server_error});
|
||||||
|
try response.writer.print("Content-Type: {s}\r\n", .{"text/plain; charset=utf-8"});
|
||||||
|
try response.writer.print("Content-Length: {d}\r\n", .{cause_name.len});
|
||||||
|
try response.writer.print("\r\n", .{});
|
||||||
|
try response.writer.print("{s}", .{cause_name});
|
||||||
|
},
|
||||||
|
error.HandlerError => {
|
||||||
|
const cause = parser.last_handler_error;
|
||||||
|
const cause_name = @errorName(cause);
|
||||||
|
|
||||||
|
try response.writer.print("{s}", .{http.status.internal_server_error});
|
||||||
|
try response.writer.print("Content-Type: {s}\r\n", .{"text/plain; charset=utf-8"});
|
||||||
|
try response.writer.print("Content-Length: {d}\r\n", .{cause_name.len});
|
||||||
|
try response.writer.print("\r\n", .{});
|
||||||
|
try response.writer.print("{s}", .{cause_name});
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
};
|
||||||
|
|
||||||
|
if (self.read_tail - self.read_head >= self.read_buffer_size and !res.done) {
|
||||||
|
if (parser.state == .body) {
|
||||||
|
try response.sendClose(.{ .status_text = http.status.content_too_large });
|
||||||
|
} else {
|
||||||
|
try response.sendClose(.{ .status_text = http.status.request_header_fields_too_large });
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (res.done) {
|
||||||
|
leftover_bytes = bytes_read - res.consumed;
|
||||||
|
self.read_head = (self.read_tail - leftover_bytes) & ~(self.read_buffer_size - 1);
|
||||||
|
self.read_tail = self.read_head + leftover_bytes;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read(self: *Worker, fd: FileDescriptor) ![]const u8 {
|
||||||
|
const read_tail = self.read_tail;
|
||||||
|
const max_read_tail = self.read_head + self.read_buffer_size;
|
||||||
|
|
||||||
|
std.debug.assert(max_read_tail > read_tail);
|
||||||
|
|
||||||
|
const bytes_read = try fd.read(self.read_buffer_ptr[self.read_head .. self.read_head + self.read_buffer_size]);
|
||||||
|
const bytes = self.read_buffer_ptr[self.read_head .. self.read_head + bytes_read];
|
||||||
|
|
||||||
|
std.debug.assert(std.math.isPowerOfTwo(self.read_buffer_size));
|
||||||
|
self.read_head = (self.read_head + bytes_read) & ~(self.read_buffer_size - 1);
|
||||||
|
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn consumeReadHead(self: *Worker, consumed: usize) void {
|
||||||
|
const bytes_read = self.read_tail - self.read_head;
|
||||||
|
std.debug.assert(consumed <= bytes_read);
|
||||||
|
|
||||||
|
self.read_head = (self.read_head + consumed) & ~(self.read_buffer_size - 1);
|
||||||
|
self.read_tail = self.read_head + (bytes_read - consumed);
|
||||||
|
}
|
||||||
6
packages/web/src/http.zig
Normal file
6
packages/web/src/http.zig
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
const std = @import("std");
|
||||||
|
|
||||||
|
pub const Header = @import("http/Header.zig");
|
||||||
|
pub const Method = @import("http/Method.zig").Method;
|
||||||
|
pub const Parser = @import("http/Parser.zig");
|
||||||
|
pub const status = @import("http/status.zig");
|
||||||
@@ -124,9 +124,9 @@ const Handler = struct {
|
|||||||
const self: *Handler = @ptrCast(@alignCast(ctx));
|
const self: *Handler = @ptrCast(@alignCast(ctx));
|
||||||
|
|
||||||
const time_ns = self.timer.read();
|
const time_ns = self.timer.read();
|
||||||
const time_us = time_ns / std.time.ns_per_us;
|
const time_us_ceil = (time_ns + std.time.ns_per_us - 1) / std.time.ns_per_us;
|
||||||
|
|
||||||
std.log.info("{s} {s} [{d}]", .{ @tagName(self.route.method), self.route.pathname, time_us });
|
std.log.info("{s} {s} (<={} [µs])", .{ @tagName(self.route.method), self.route.pathname, time_us_ceil });
|
||||||
|
|
||||||
self.allocator.destroy(self);
|
self.allocator.destroy(self);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user