Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

removed websocket.zig dependency for the repeater to use SSE instead #115

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions build.zig.zon
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@
.url = "git+https://github.com/andrewrk/mime.git#0b676643886b1e2f19cf11b4e15b028768708342",
.hash = "12209083b0c43d0f68a26a48a7b26ad9f93b22c9cff710c78ddfebb47b89cfb9c7a4",
},
.ws = .{
.url = "git+https://github.com/karlseguin/websocket.zig.git#c77f87d0e6548865636eb9781106a8be72e5755a",
.hash = "12208720b772330f309cfb48957f4152ee0930b716837d0c1d07fee2dea2f4dc712e",
},
.zeit = .{
.url = "git+https://github.com/rockorager/zeit?ref=v0.4.4#5041c7cac243f0d05f4578a334f52e12bb450e55",
.hash = "12204b653c90b503f89e2f1a73c4754b83fb7275c100c81872deaca12c9f17e334ec",
Expand Down
5 changes: 0 additions & 5 deletions build/tools.zig
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,9 @@ fn setupServer(
.target = target,
.optimize = optimize,
});
const ws = b.dependency("ws", .{
.target = target,
.optimize = optimize,
});

server.root_module.addImport("options", options);
server.root_module.addImport("mime", mime.module("mime"));
server.root_module.addImport("ws", ws.module("websocket"));

b.installArtifact(server);
}
Expand Down
107 changes: 21 additions & 86 deletions src/exes/server/Reloader.zig
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
const Reloader = @This();
const std = @import("std");
const builtin = @import("builtin");
const ws = @import("ws");
const AnsiRenderer = @import("AnsiRenderer.zig");

const log = std.log.scoped(.watcher);
Expand All @@ -14,7 +13,6 @@ const Watcher = switch (builtin.target.os.tag) {
};

gpa: std.mem.Allocator,
ws_server: ws.Server,
zig_exe: []const u8,
out_dir_path: []const u8,
website_step_name: []const u8,
Expand All @@ -23,7 +21,7 @@ include_drafts: bool,
watcher: Watcher,

clients_lock: std.Thread.Mutex = .{},
clients: std.AutoArrayHashMapUnmanaged(*ws.Conn, void) = .{},
clients: std.AutoArrayHashMapUnmanaged(std.net.Stream, void) = .{},

pub fn init(
gpa: std.mem.Allocator,
Expand All @@ -34,13 +32,10 @@ pub fn init(
debug: bool,
include_drafts: bool,
) !Reloader {
const ws_server = try ws.Server.init(gpa, .{});

return .{
.gpa = gpa,
.zig_exe = zig_exe,
.out_dir_path = out_dir_path,
.ws_server = ws_server,
.website_step_name = website_step_name,
.debug = debug,
.include_drafts = include_drafts,
Expand Down Expand Up @@ -114,23 +109,16 @@ pub fn onInputChange(self: *Reloader, path: []const u8, name: []const u8) void {
while (idx < self.clients.entries.len) {
const conn = self.clients.entries.get(idx).key;

const BuildCommand = struct {
command: []const u8 = "build",
err: []const u8,
};

const cmd: BuildCommand = .{ .err = html_err };

var buf = std.ArrayList(u8).init(self.gpa);
defer buf.deinit();

std.json.stringify(cmd, .{}, buf.writer()) catch {
log.err("unable to generate ws message", .{});
return;
conn.writeAll("event: build\n") catch |err| {
log.debug("error writing to sse: {s}", .{
@errorName(err),
});
self.clients.swapRemoveAt(idx);
continue;
};

conn.write(buf.items) catch |err| {
log.debug("error writing to websocket: {s}", .{
conn.writer().print("data: {s}\n\n", .{html_err}) catch |err| {
log.debug("error writing to sse: {s}", .{
@errorName(err),
});
self.clients.swapRemoveAt(idx);
Expand All @@ -140,6 +128,7 @@ pub fn onInputChange(self: *Reloader, path: []const u8, name: []const u8) void {
idx += 1;
}
}

pub fn onOutputChange(self: *Reloader, path: []const u8, name: []const u8) void {
if (std.mem.indexOfScalar(u8, name, '.') == null) {
return;
Expand All @@ -153,27 +142,28 @@ pub fn onOutputChange(self: *Reloader, path: []const u8, name: []const u8) void
while (idx < self.clients.entries.len) {
const conn = self.clients.entries.get(idx).key;

const msg_fmt =
\\{{
\\ "command":"reload",
\\ "path":"{s}/{s}"
\\}}
;
conn.writeAll("event: reload\n") catch |err| {
log.debug("error writing to sse: {s}", .{
@errorName(err),
});
self.clients.swapRemoveAt(idx);
continue;
};

var buf: [4096]u8 = undefined;
const msg = std.fmt.bufPrint(&buf, msg_fmt, .{
const msg = std.fmt.bufPrint(&buf, "data: {s}/{s}\n\n", .{
path[self.out_dir_path.len..],
name,
}) catch {
log.err("unable to generate ws message", .{});
log.err("unable to generate sse message", .{});
return;
};
if (std.fs.path.sep != '/') {
std.mem.replaceScalar(u8, msg, std.fs.path.sep, '/');
}

conn.write(msg) catch |err| {
log.debug("error writing to websocket: {s}", .{
conn.writeAll(msg) catch |err| {
log.debug("error writing to sse: {s}", .{
@errorName(err),
});
self.clients.swapRemoveAt(idx);
Expand All @@ -183,58 +173,3 @@ pub fn onOutputChange(self: *Reloader, path: []const u8, name: []const u8) void
idx += 1;
}
}

pub fn handleWs(self: *Reloader, req: *std.http.Server.Request, h: [20]u8) void {
var buf =
("HTTP/1.1 101 Switching Protocols\r\n" ++
"Upgrade: websocket\r\n" ++
"Connection: upgrade\r\n" ++
"Sec-Websocket-Accept: 0000000000000000000000000000\r\n\r\n").*;

const key_pos = buf.len - 32;
_ = std.base64.standard.Encoder.encode(buf[key_pos .. key_pos + 28], h[0..]);

const stream = req.server.connection.stream;
stream.writeAll(&buf) catch return;

var conn = self.ws_server.newConn(stream);
var context: Handler.Context = .{ .watcher = self };
var handler = Handler.init(undefined, &conn, &context) catch return;
self.ws_server.handle(Handler, &handler, &conn);
}

const Handler = struct {
conn: *ws.Conn,
context: *Context,

const Context = struct {
watcher: *Reloader,
};

pub fn init(h: ws.Handshake, conn: *ws.Conn, context: *Context) !Handler {
_ = h;

const watcher = context.watcher;
watcher.clients_lock.lock();
defer watcher.clients_lock.unlock();
try watcher.clients.put(context.watcher.gpa, conn, {});

return Handler{
.conn = conn,
.context = context,
};
}

pub fn handle(self: *Handler, message: ws.Message) !void {
_ = self;
log.debug("ws message: {s}\n", .{message.data});
}

pub fn close(self: *Handler) void {
log.debug("ws connection was closed\n", .{});
const watcher = self.context.watcher;
watcher.clients_lock.lock();
defer watcher.clients_lock.unlock();
_ = watcher.clients.swapRemove(self.conn);
}
};
50 changes: 22 additions & 28 deletions src/exes/server/main.zig
Original file line number Diff line number Diff line change
Expand Up @@ -68,32 +68,26 @@ const Server = struct {
return false;
}

if (std.mem.eql(u8, path, "/__zine/ws")) {
var it = req.iterateHeaders();
const key = while (it.next()) |header| {
if (std.ascii.eqlIgnoreCase(header.name, "sec-websocket-key")) {
break header.value;
}
} else {
log.debug("couldn't find key header!\n", .{});
if (std.mem.eql(u8, path, "/__zine/sse")) {
var buf =
("HTTP/1.1 200 OK\r\n" ++
"Content-Type: text/event-stream; charset=UTF-8\r\n" ++
"Cache-Control: no-cache\r\n" ++
"Connection: keep-alive\r\n\r\n" ++
"data: connected\n\n").*;

const conn = req.server.connection.stream;
conn.writeAll(&buf) catch |err| {
log.debug("error responding to SSE: {s}", .{
@errorName(err),
});
return false;
};

log.debug("key = '{s}'", .{key});
s.watcher.clients_lock.lock();
defer s.watcher.clients_lock.unlock();
try s.watcher.clients.put(s.watcher.gpa, conn, {});

var hasher = std.crypto.hash.Sha1.init(.{});
hasher.update(key);
hasher.update("258EAFA5-E914-47DA-95CA-C5AB0DC85B11");

var h: [20]u8 = undefined;
hasher.final(&h);

const ws = try std.Thread.spawn(.{}, Reloader.handleWs, .{
s.watcher,
req,
h,
});
ws.detach();
return true;
}

Expand Down Expand Up @@ -277,13 +271,13 @@ fn serve(s: *Server, listen_port: u16) !void {
const conn = try tcp_server.accept();

var http_server = std.http.Server.init(conn, &buffer);
var became_websocket = false;
var became_sse = false;

defer {
if (!became_websocket) {
if (!became_sse) {
conn.stream.close();
} else {
log.debug("request became websocket\n", .{});
log.debug("request became sse\n", .{});
}
}

Expand All @@ -292,16 +286,16 @@ fn serve(s: *Server, listen_port: u16) !void {
if (err != error.HttpConnectionClosing) {
log.debug("connection error: {s}\n", .{@errorName(err)});
}
became_websocket = true;
became_sse = true;
continue :accept;
};

log.debug("request: {s}", .{request.head.target});
became_websocket = s.handleRequest(&request) catch |err| {
became_sse = s.handleRequest(&request) catch |err| {
log.debug("failed request: {s}", .{@errorName(err)});
continue :accept;
};
if (became_websocket) continue :accept;
if (became_sse) continue :accept;
}
}
}
Expand Down
Loading