-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapp.ts
197 lines (173 loc) · 5.21 KB
/
app.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import { parseRequest, send } from './request.ts'
import type { JsonRpcResponse, RPCOptions } from './types.ts'
import { lazyJSONParse, paramsEncoder } from './utils.ts'
/**
 * JSON-RPC server that talks to its clients over WebSockets.
 *
 * Every accepted socket is stored in `socks` under a client ID. Text frames
 * are parsed with `parseRequest` and dispatched to a handler from `methods`,
 * or to a handler from `emitters` when the method name ends with `:`.
 */
export class App {
  /** Most recently accepted HTTP connection (set by `listen`). */
  httpConn?: Deno.HttpConn
  /** Listener created by `listen`; closed by `close`. */
  listener?: Deno.Listener
  options: RPCOptions
  /** Connected sockets, keyed by client ID. */
  socks: Map<string, WebSocket>
  /** Request/response handlers, keyed by method name. */
  methods: Map<
    string,
    (params: unknown[], clientId: string) => unknown | Promise<unknown>
  >
  /** Streaming handlers, keyed by method name (names end with `:`). */
  emitters: Map<
    string,
    (params: unknown[], emit: (data: unknown) => void, clientId: string) => void
  >
  /** Maximum lifetime of a socket in milliseconds. */
  #timeout: number
  /** Every HTTP connection that is currently being served by `listen`. */
  #httpConns = new Set<Deno.HttpConn>()

  /**
   * @param options server options; `timeout` (ms) falls back to 24 hours
   * when it is missing or falsy
   */
  constructor(options: RPCOptions = { path: '/' }) {
    this.options = options
    this.socks = new Map()
    this.methods = new Map()
    this.emitters = new Map()
    this.#timeout = options.timeout || 1000 * 60 * 60 * 24
  }

  /**
   * Upgrade a request to WebSocket and handle it
   * @param request request object
   * @returns response object that has to be sent back to complete the upgrade
   * @throws whatever `Deno.upgradeWebSocket` or `options.clientAdded` throws
   */
  async handle(request: Request) {
    const { socket, response } = Deno.upgradeWebSocket(request)

    const protocolHeader = request.headers.get('sec-websocket-protocol')
    const incomingParamaters = protocolHeader
      ? lazyJSONParse(paramsEncoder.decrypt(protocolHeader))
      : {}

    let clientId =
      await (this.options.clientAdded || (() => crypto.randomUUID()))(
        incomingParamaters,
        socket,
      )
    if (!clientId) clientId = crypto.randomUUID()

    if (typeof clientId === 'object') {
      // `clientAdded` rejected the client. The socket cannot carry data until
      // the upgrade response has been returned, so the error is delivered as
      // soon as the socket opens instead of right away.
      const rejection = clientId.error
      const rejectClient = () => {
        send(socket, { id: null, error: rejection })
        socket.close()
      }
      if (socket.readyState === socket.OPEN) rejectClient()
      else socket.onopen = rejectClient
      return response
    }

    const id = clientId as string
    this.socks.set(id, socket)

    // Close the socket after timeout
    const timer = setTimeout(() => socket.close(), this.#timeout)

    socket.onmessage = ({ data }) => {
      if (typeof data === 'string') {
        // The handler runs detached from the event, so failures are logged
        // here rather than surfacing as unhandled rejections.
        this.#handleRPCMethod(id, data).catch((err) => console.error(err))
      } else {
        console.warn('Warn: an invalid jsonrpc message was sent. Skipping.')
      }
    }

    socket.onclose = async () => {
      // The lifetime timer is pointless once the socket is gone.
      clearTimeout(timer)
      try {
        if (this.options.clientRemoved) {
          await this.options.clientRemoved(id)
        }
      } finally {
        // Always unregister, but never evict a newer socket that reuses the ID.
        if (this.socks.get(id) === socket) this.socks.delete(id)
      }
    }

    socket.onerror = (err) => {
      if (err instanceof Error) console.log(err.message)
      if (socket.readyState !== socket.CLOSED) {
        socket.close(1000)
      }
    }

    return response
  }

  /**
   * Add a method handler
   * @param method method name
   * @param handler method handler
   */
  method<T extends unknown[] = unknown[]>(
    method: string,
    handler: (params: T, clientId: string) => unknown | Promise<unknown>,
  ) {
    this.methods.set(
      method,
      handler as (
        params: unknown,
        clientId: string,
      ) => unknown | Promise<unknown>,
    )
  }

  /**
   * Handle a JSONRPC method
   * @param client client ID
   * @param data Received data
   */
  async #handleRPCMethod(client: string, data: string) {
    const sock = this.socks.get(client)
    if (!sock) {
      return console.warn(
        `Warn: recieved a request from and undefined connection`,
      )
    }

    const requests = parseRequest(data)
    if (requests === 'parse-error') {
      return send(sock, {
        id: null,
        error: { code: -32700, message: 'Parse error' },
      })
    }

    const responses: JsonRpcResponse[] = []

    await Promise.all(requests.map(async (request) => {
      if (request === 'invalid') {
        responses.push({
          id: null,
          error: { code: -32600, message: 'Invalid Request' },
        })
        return
      }

      if (!request.method.endsWith(':')) {
        const handler = this.methods.get(request.method)
        if (!handler) {
          responses.push({
            error: { code: -32601, message: 'Method not found' },
            id: request.id!,
          })
          return
        }
        try {
          const result = await handler(request.params, client)
          responses.push({ id: request.id!, result })
        } catch (err) {
          // A failing handler must not take the rest of the batch down.
          console.error(err)
          responses.push({
            error: { code: -32603, message: 'Internal error' },
            id: request.id!,
          })
        }
        return
      }

      // It's an emitter
      const handler = this.emitters.get(request.method)
      if (!handler) {
        responses.push({
          error: { code: -32601, message: 'Emitter not found' },
          id: request.id!,
        })
        return
      }
      // Because emitters can return a value at any time, we are going to have to send messages on their schedule.
      // This may break batches, but I don't think that is a big deal
      try {
        handler(
          request.params,
          (data) => {
            // An emitter may outlive its socket; drop data nobody can receive.
            if (sock.readyState !== sock.OPEN) return
            send(sock, { result: data, id: request.id || null })
          },
          client,
        )
      } catch (err) {
        console.error(err)
        responses.push({
          error: { code: -32603, message: 'Internal error' },
          id: request.id!,
        })
      }
    }))

    // Nothing to report (e.g. only emitters were called): send nothing rather
    // than an empty array. Also skip sockets that closed in the meantime.
    if (responses.length === 0 || sock.readyState !== sock.OPEN) return
    send(sock, responses)
  }

  /**
   * Serve every request of a single accepted connection.
   * @param conn connection accepted by the listener
   */
  async #serveConnection(conn: Deno.Conn) {
    const httpConn = Deno.serveHttp(conn)
    this.httpConn = httpConn
    this.#httpConns.add(httpConn)
    try {
      for await (const { request, respondWith } of httpConn) {
        let response: Response
        try {
          response = await this.handle(request)
        } catch (err) {
          // For example a plain HTTP request that cannot be upgraded.
          console.error(err)
          response = new Response('Bad Request', { status: 400 })
        }
        // Not awaited so the request loop is never held up by a response.
        respondWith(response).catch((err) => console.error(err))
      }
    } catch (err) {
      console.error(err)
    } finally {
      this.#httpConns.delete(httpConn)
      if (this.httpConn === httpConn) this.httpConn = undefined
    }
  }

  /**
   * Start a websocket server and listen it on a specified host/port
   * @param options `Deno.listen` options
   * @param cb Callback that triggers after HTTP server is started
   */
  async listen(options: Deno.ListenOptions, cb?: (addr: Deno.NetAddr) => void) {
    const listener = Deno.listen(options)
    // Remember the listener so that `close()` can actually stop the server.
    this.listener = listener
    cb?.(listener.addr as Deno.NetAddr)

    for await (const conn of listener) {
      // Each connection is served on its own so that one long-lived client
      // does not prevent the next one from being accepted.
      void this.#serveConnection(conn)
    }
  }

  /**
   * Close the server
   */
  close() {
    // Covers a connection assigned to the public field from outside as well.
    if (this.httpConn) this.#httpConns.add(this.httpConn)
    for (const conn of this.#httpConns) conn.close()
    this.#httpConns.clear()
    this.httpConn = undefined

    this.listener?.close()
    this.listener = undefined
  }
}