Skip to content

Commit

Permalink
feat: timeout on connection opening
Browse files Browse the repository at this point in the history
  • Loading branch information
ilbertt committed Dec 8, 2023
1 parent b119c8b commit d304b03
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 5 deletions.
55 changes: 51 additions & 4 deletions src/ic-websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,37 @@ describe("IcWebsocket class", () => {
expect(icWs["_isConnectionEstablished"]).toEqual(false);
});

it("closes the connection if the open message is not received in time", async () => {
const onOpen = jest.fn();
const onError = jest.fn();
const onClose = jest.fn();
const icWs = new IcWebSocket(wsGatewayAddress, undefined, icWebsocketConfig);
expect(icWs).toBeDefined();
icWs.onopen = onOpen;
icWs.onerror = onError;
icWs.onclose = onClose;
await mockWsServer.connected;

jest.useFakeTimers();
mockWsServer.send(encodeHandshakeMessage(VALID_HANDSHAKE_MESSAGE_FROM_GATEWAY));

// advance the open timeout
await jest.advanceTimersByTimeAsync(2 * MAX_ALLOWED_NETWORK_LATENCY_MS);

expect(icWs["_isConnectionEstablished"]).toEqual(false);
expect(onOpen).not.toHaveBeenCalled();
const openError = new Error("Open timeout expired before receiving the open message");
expect(onError).toHaveBeenCalledWith(new ErrorEvent("error", { error: openError }));

await jest.runAllTimersAsync();
await expect(mockWsServer.closed).resolves.not.toThrow();

expect(onClose).toHaveBeenCalled();
expect(icWs.readyState).toEqual(WebSocket.CLOSED);

jest.useRealTimers();
});

it("creates a new instance and sends the open message", async () => {
const icWs = new IcWebSocket(wsGatewayAddress, undefined, icWebsocketConfig);
expect(icWs).toBeDefined();
Expand Down Expand Up @@ -230,30 +261,46 @@ describe("IcWebsocket class", () => {
it("onopen is called when open message from canister is received", async () => {
const onOpen = jest.fn();
const onMessage = jest.fn();
const onError = jest.fn();
const icWs = new IcWebSocket(wsGatewayAddress, undefined, icWebsocketConfig);
expect(icWs).toBeDefined();
// workaround: simulate the client identity
icWs["_clientKey"] = client1Key;
icWs.onopen = onOpen;
icWs.onmessage = onMessage;
icWs.onerror = onError;
await mockWsServer.connected;
await sendHandshakeMessage(VALID_HANDSHAKE_MESSAGE_FROM_GATEWAY);

jest.useFakeTimers();
mockWsServer.send(encodeHandshakeMessage(VALID_HANDSHAKE_MESSAGE_FROM_GATEWAY));

expect(onOpen).not.toHaveBeenCalled();
expect(icWs["_isConnectionEstablished"]).toEqual(false);
expect(onError).not.toHaveBeenCalled();
expect(onMessage).not.toHaveBeenCalled();

// wait for the open message from the client
await jest.advanceTimersToNextTimerAsync(); // needed just to advance the mockWsServer timeouts
await mockWsServer.nextMessage;

// workaround to simulate the client identity
icWs["_clientKey"] = client1Key;
// send the open confirmation message from the canister
mockWsServer.send(Cbor.encode(VALID_OPEN_MESSAGE));
await sleep(100);
jest.runAllTicks();

// advance the open timeout so that it expires
// workaround: call the advanceTimers twice
// to make message processing happening in the meantime
await jest.advanceTimersByTimeAsync(MAX_ALLOWED_NETWORK_LATENCY_MS);
await jest.advanceTimersByTimeAsync(MAX_ALLOWED_NETWORK_LATENCY_MS);

expect(onOpen).toHaveBeenCalled();
expect(icWs["_isConnectionEstablished"]).toEqual(true);
expect(onError).not.toHaveBeenCalled();
expect(icWs.readyState).toEqual(WebSocket.OPEN);
// make sure onmessage callback is not called when receiving the first message
expect(onMessage).not.toHaveBeenCalled();

jest.useRealTimers();
});

it("onmessage is called when a valid message is received", async () => {
Expand Down
25 changes: 25 additions & 0 deletions src/ic-websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ export default class IcWebSocket<
private _clientKey: ClientKey;
private _gatewayPrincipal: Principal | null = null;
private _maxCertificateAgeInMinutes = 5;
private _openTimeout: NodeJS.Timeout | null = null;

onclose: ((this: IcWebSocket<S, ApplicationMessageType>, ev: CloseEvent) => any) | null = null;
onerror: ((this: IcWebSocket<S, ApplicationMessageType>, ev: ErrorEvent) => any) | null = null;
Expand Down Expand Up @@ -231,6 +232,27 @@ export default class IcWebSocket<
this._incomingMessagesQueue.addAndProcess(event.data);
}

private _startOpenTimeout() {
// the timeout is double the maximum allowed network latency,
// because opening the connection involves a message sent by the client and one by the canister
this._openTimeout = setTimeout(() => {
if (!this._isConnectionEstablished) {
logger.error("[onWsOpen] Error: Open timeout expired before receiving the open message");
this._callOnErrorCallback(new Error("Open timeout expired before receiving the open message"));
this._wsInstance.close(4000, "Open connection timeout");
}

this._openTimeout = null;
}, 2 * MAX_ALLOWED_NETWORK_LATENCY_MS);
}

private _cancelOpenTimeout() {
if (this._openTimeout) {
clearTimeout(this._openTimeout);
this._openTimeout = null;
}
}

private async _handleHandshakeMessage(handshakeMessage: GatewayHandshakeMessage): Promise<boolean> {
// at this point, we're sure that the gateway_principal is valid
// because the isGatewayHandshakeMessage function checks it
Expand All @@ -239,6 +261,8 @@ export default class IcWebSocket<

try {
await this._sendOpenMessage();

this._startOpenTimeout();
} catch (error) {
logger.error("[onWsMessage] Handshake message error:", error);
// if a handshake message fails, we can't continue
Expand Down Expand Up @@ -340,6 +364,7 @@ export default class IcWebSocket<
}

this._isConnectionEstablished = true;
this._cancelOpenTimeout();

this._callOnOpenCallback();

Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ export const safeExecute = async <T>(
warnMessage: string
): Promise<T | undefined> => {
try {
return await fn();
return await Promise.resolve(fn());
} catch (error) {
logger.warn(warnMessage, error);
}
Expand Down

0 comments on commit d304b03

Please sign in to comment.