Skip to content

Commit

Permalink
feat: max allowed latency to compute ack timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
ilbertt committed Dec 8, 2023
1 parent 2b46b15 commit b119c8b
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 37 deletions.
127 changes: 99 additions & 28 deletions src/ic-websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { setupServer } from "msw/node";
import { CallRequest, Cbor, fromHex } from "@dfinity/agent";
import { IDL } from "@dfinity/candid";

import IcWebSocket, { createWsConfig } from "./ic-websocket";
import IcWebSocket, { MAX_ALLOWED_NETWORK_LATENCY_MS, createWsConfig } from "./ic-websocket";
import { Principal } from "@dfinity/principal";
import { generateRandomIdentity } from "./identity";
import { CanisterWsMessageArguments, CanisterWsOpenArguments, ClientKey, WebsocketServiceMessageContent, _WS_CANISTER_SERVICE, decodeWebsocketServiceMessageContent, isClientKeyEq, wsMessageIdl, wsOpenIdl } from "./idl";
Expand Down Expand Up @@ -425,102 +425,173 @@ describe("Messages acknowledgement", () => {
});

afterEach(() => {
jest.useRealTimers();
mockWsServer.close();
});

it("fails if messages are not acknowledged in time", async () => {
const ackMessageTimeoutMs = 2000;
it("fails if messages are never acknowledged", async () => {
const ackMessageIntervalMs = 2000;
const icWsConfig = createWsConfig({
...icWebsocketConfig,
ackMessageTimeout: ackMessageTimeoutMs,
ackMessageIntervalMs,
});
const onError = jest.fn();
const onClose = jest.fn();

const icWs = new IcWebSocket(wsGatewayAddress, undefined, icWsConfig);
expect(icWs).toBeDefined();
// workaround: simulate the client identity
icWs["_clientKey"] = client1Key;

icWs.onerror = onError;
icWs.onclose = onClose;
await mockWsServer.connected;
await sendHandshakeMessage(VALID_HANDSHAKE_MESSAGE_FROM_GATEWAY);

// wait for the open message from the client
// wait for the ws_open message from the client
await mockWsServer.nextMessage;
// send the open message to the client
mockWsServer.send(Cbor.encode(VALID_OPEN_MESSAGE));

const originalClientKey = { ...icWs["_clientKey"] };
// workaround to simulate the client identity
// workaround: wait 100ms to make sure that
// the client processes the open message
await sleep(100);

// when the client sends a message, it makes the ack timeout start,
// so here we have to mock the timers
jest.useFakeTimers();

// send a random application message from the client,
// so that the ack timeout starts
icWs.send({ text: "test" });

// wait for the second message from the client
await jest.advanceTimersToNextTimerAsync(); // needed just to advance the mockWsServer timeouts
await mockWsServer.nextMessage;

// make the ack timeout expire
await jest.advanceTimersByTimeAsync(ackMessageIntervalMs + MAX_ALLOWED_NETWORK_LATENCY_MS);

const ackTimeoutError = new Error(`Ack message timeout. Not received ack for sequence numbers: ${[BigInt(1)]}`);
expect(onError).toHaveBeenCalledWith(new ErrorEvent("error", { error: ackTimeoutError }));
expect(onClose).toHaveBeenCalled();
});

it("fails if messages are not acknowledged in time", async () => {
const ackMessageIntervalMs = 2000;
const icWsConfig = createWsConfig({
...icWebsocketConfig,
ackMessageIntervalMs,
});
const onError = jest.fn();
const onClose = jest.fn();

const icWs = new IcWebSocket(wsGatewayAddress, undefined, icWsConfig);
expect(icWs).toBeDefined();
// workaround: simulate the client identity
icWs["_clientKey"] = client1Key;
// send the open confirmation message from the canister

icWs.onerror = onError;
icWs.onclose = onClose;
await mockWsServer.connected;
await sendHandshakeMessage(VALID_HANDSHAKE_MESSAGE_FROM_GATEWAY);

// wait for the ws_open message from the client
await mockWsServer.nextMessage;
// send the open message to the client
mockWsServer.send(Cbor.encode(VALID_OPEN_MESSAGE));

// workaround: wait 100ms to make sure that
// the client processes the open message
await sleep(100);
// set the client key back
icWs["_clientKey"] = originalClientKey;

// send a random application message
// when the client sends a message, it makes the ack timeout start,
// so here we have to mock the timers
jest.useFakeTimers();

// send a random application message from the client,
// so that the ack timeout starts
icWs.send({ text: "test" });

// wait for the second message from the client
await jest.advanceTimersToNextTimerAsync(); // needed just to advance the mockWsServer timeouts
await mockWsServer.nextMessage;

await sleep(ackMessageTimeoutMs);
// make the ack timeout expire
await jest.advanceTimersByTimeAsync(ackMessageIntervalMs + MAX_ALLOWED_NETWORK_LATENCY_MS);

// send the ack message from the canister
// when the ack timeout is already expired
mockWsServer.send(Cbor.encode(VALID_ACK_MESSAGE));

const ackTimeoutError = new Error(`Ack message timeout. Not received ack for sequence numbers: ${[BigInt(1)]}`);
expect(onError).toHaveBeenCalledWith(new ErrorEvent("error", { error: ackTimeoutError }));
await sleep(10);
expect(onClose).toHaveBeenCalled();
});

it("acknowledges messages", async () => {
const ackMessageTimeoutMs = 2000;
const ackMessageIntervalMs = 2000;
const icWsConfig = createWsConfig({
...icWebsocketConfig,
ackMessageTimeout: ackMessageTimeoutMs,
ackMessageIntervalMs,
});
const onMessage = jest.fn();
const onError = jest.fn();
const onClose = jest.fn();
const icWs = new IcWebSocket(wsGatewayAddress, undefined, icWsConfig);
expect(icWs).toBeDefined();
// workaround: simulate the client identity
icWs["_clientKey"] = client1Key;

icWs.onmessage = onMessage;
icWs.onerror = onError;
icWs.onclose = onClose;
await mockWsServer.connected;
await sendHandshakeMessage(VALID_HANDSHAKE_MESSAGE_FROM_GATEWAY);

// wait for the open message from the client
// wait for the ws_open message from the client
await mockWsServer.nextMessage;

const originalClientKey = { ...icWs["_clientKey"] };
// workaround to simulate the client identity
icWs["_clientKey"] = client1Key;
// send the open confirmation message from the canister
// send the open message to the client
mockWsServer.send(Cbor.encode(VALID_OPEN_MESSAGE));

// workaround: wait 100ms to make sure that
// the client processes the open message
await sleep(100);
// set the client key back
icWs["_clientKey"] = originalClientKey;

// when the client sends a message, it makes the ack timeout start,
// so here we have to mock the timers
jest.useFakeTimers();

// send a random application message
// so that the ack timeout starts
icWs.send({ text: "test" });

// wait for the second message from the client
await jest.advanceTimersToNextTimerAsync(); // needed just to advance the mockWsServer timeouts
await mockWsServer.nextMessage;

// send the ack message from the canister
mockWsServer.send(Cbor.encode(VALID_ACK_MESSAGE));

// wait until the ack timeout should expire
await sleep(ackMessageTimeoutMs);
console.log("sent ack message from canister");

expect(onError).not.toHaveBeenCalled();
// make the ack timeout expire
await jest.advanceTimersByTimeAsync(ackMessageIntervalMs + MAX_ALLOWED_NETWORK_LATENCY_MS);

// first message has been acknowledged correctly,
// as the error only reports the missing ack for the keep alive response
const ackTimeoutError = new Error(`Ack message timeout. Not received ack for sequence numbers: ${[BigInt(2)]}`);
expect(onError).toHaveBeenCalledWith(new ErrorEvent("error", { error: ackTimeoutError }));
expect(onClose).not.toHaveBeenCalled();
// make sure onmessage is not called for service messages
expect(onMessage).not.toHaveBeenCalled();
});

it("send an ack message after receiving the ack", async () => {
const ackMessageTimeoutMs = 2000;
const ackMessageIntervalMs = 2000;
const icWsConfig = createWsConfig({
...icWebsocketConfig,
ackMessageTimeout: ackMessageTimeoutMs,
ackMessageIntervalMs,
});
const onMessage = jest.fn();
const onError = jest.fn();
Expand Down
23 changes: 14 additions & 9 deletions src/ic-websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,15 @@ import {
import { WsAgent } from "./agent";

/**
* The default expiration time for receiving an ack message from the canister after sending a message.
* It's **3/2 times** the canister's default send ack period.
* The default interval (in milliseconds) at which the canister sends an ack message.
*/
const DEFAULT_ACK_MESSAGE_TIMEOUT_MS = 450_000;
const DEFAULT_ACK_MESSAGE_INTERVAL_MS = 300_000;
/**
* The maximum latency allowed between the client and the canister.
*
* Used to determine the ack message timeout.
*/
export const MAX_ALLOWED_NETWORK_LATENCY_MS = 30_000;

/**
* Interface to create a new IcWebSocketConfig. For a simple configuration, use {@link createWsConfig}.
Expand All @@ -62,12 +67,12 @@ export interface IcWebSocketConfig<S extends _WS_CANISTER_SERVICE> {
*/
networkUrl: string;
/**
* The expiration (in milliseconds) time for receiving an ack message from the canister after sending a message.
* If the ack message is not received within this time, the connection will be closed.
* This parameter should always me **3/2 times or more** the canister's send ack period.
* @default 450_000 (7.5 minutes = 3/2 default send ack period on the canister)
* The interval (in milliseconds) at which the canister sends an ack message.
* This parameter must be **equal** to the canister's send ack interval.
*
* @default 300_000 (default send ack period on the canister)
*/
ackMessageTimeout?: number;
ackMessageIntervalMs?: number;
/**
* The maximum age of the certificate received from the canister, in minutes. You won't likely need to set this parameter. Used in tests.
*
Expand Down Expand Up @@ -174,7 +179,7 @@ export default class IcWebSocket<
});

this._ackMessagesQueue = new AckMessagesQueue({
expirationMs: config.ackMessageTimeout || DEFAULT_ACK_MESSAGE_TIMEOUT_MS,
expirationMs: (config.ackMessageIntervalMs || DEFAULT_ACK_MESSAGE_INTERVAL_MS) + MAX_ALLOWED_NETWORK_LATENCY_MS,
timeoutExpiredCallback: this._onAckMessageTimeout.bind(this),
});

Expand Down

0 comments on commit b119c8b

Please sign in to comment.