From 18ad89f700c0737d1143af6fd0d045a2d1fbe547 Mon Sep 17 00:00:00 2001 From: LeeJongBeom <52884648+devleejb@users.noreply.github.com> Date: Tue, 3 Sep 2024 09:04:43 +0900 Subject: [PATCH 1/2] Lock version of `@codemirror/view` to prevent potential issues with Korean character input (#890) With recent updates to @codemirror/view, there is a potential issue where the input of Korean consonants and vowels may lead to incorrect position updates, even though it is not currently occurring. To mitigate the risk of future incidents, it is necessary to explicitly lock the version of @codemirror/view and related packages to prevent unintended upgrades due to Semantic Versioning. --- examples/vanilla-codemirror6/package.json | 6 +- pnpm-lock.yaml | 113 +++++++++++----------- 2 files changed, 62 insertions(+), 57 deletions(-) diff --git a/examples/vanilla-codemirror6/package.json b/examples/vanilla-codemirror6/package.json index 03d429684..d4039dd7e 100644 --- a/examples/vanilla-codemirror6/package.json +++ b/examples/vanilla-codemirror6/package.json @@ -13,12 +13,12 @@ "vite": "^5.0.12" }, "dependencies": { - "@codemirror/commands": "^6.1.2", + "@codemirror/commands": "6.1.2", "@codemirror/highlight": "^0.19.8", "@codemirror/lang-markdown": "^6.0.2", "@codemirror/language-data": "^6.1.0", - "@codemirror/state": "^6.1.2", - "@codemirror/view": "^6.3.1", + "@codemirror/state": "^6.4.1", + "@codemirror/view": "6.23.1", "codemirror": "^6.0.1", "yorkie-js-sdk": "workspace:*" } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d1ce7d1cb..3ce613690 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -185,7 +185,7 @@ importers: examples/vanilla-codemirror6: dependencies: '@codemirror/commands': - specifier: ^6.1.2 + specifier: 6.1.2 version: 6.1.2 '@codemirror/highlight': specifier: ^0.19.8 @@ -195,13 +195,13 @@ importers: version: 6.0.5 '@codemirror/language-data': specifier: ^6.1.0 - version: 6.1.0(@codemirror/state@6.1.4)(@codemirror/view@6.7.1)(@lezer/common@1.0.2) + version: 6.1.0(@codemirror/state@6.4.1)(@codemirror/view@6.23.1)(@lezer/common@1.0.2) '@codemirror/state': - specifier: ^6.1.2 - version: 6.1.4 + specifier: ^6.4.1 + version: 6.4.1 '@codemirror/view': - specifier: ^6.3.1 - version: 6.7.1 + specifier: 6.23.1 + version: 6.23.1 codemirror: specifier: ^6.0.1 version: 6.0.1(@lezer/common@1.0.2) @@ -764,8 +764,8 @@ packages: '@codemirror/state@0.19.9': resolution: {integrity: sha512-psOzDolKTZkx4CgUqhBQ8T8gBc0xN5z4gzed109aF6x7D7umpDRoimacI/O6d9UGuyl4eYuDCZmDFr2Rq7aGOw==} - '@codemirror/state@6.1.4': - resolution: {integrity: sha512-g+3OJuRylV5qsXuuhrc6Cvs1NQluNioepYMM2fhnpYkNk7NgX+j0AFuevKSVKzTDmDyt9+Puju+zPdHNECzCNQ==} + '@codemirror/state@6.4.1': + resolution: {integrity: sha512-QkEyUiLhsJoZkbumGZlswmAhA7CBU02Wrz7zvH4SrcifbsqwlXShVXg65f3v/ts57W3dqyamEriMhij1Z3Zz4A==} '@codemirror/text@0.19.6': resolution: {integrity: sha512-T9jnREMIygx+TPC1bOuepz18maGq/92q2a+n4qTqObKwvNMg+8cMTslb8yxeEDEq7S3kpgGWxgO1UWbQRij0dA==} @@ -774,8 +774,8 @@ packages: '@codemirror/view@0.19.48': resolution: {integrity: sha512-0eg7D2Nz4S8/caetCTz61rK0tkHI17V/d15Jy0kLOT8dTLGGNJUponDnW28h2B6bERmPlVHKh8MJIr5OCp1nGw==} - '@codemirror/view@6.7.1': - resolution: {integrity: sha512-kYtS+uqYw/q/0ytYxpkqE1JVuK5NsbmBklWYhwLFTKO9gVuTdh/kDEeZPKorbqHcJ+P+ucrhcsS1czVweOpT2g==} + '@codemirror/view@6.23.1': + resolution: {integrity: sha512-J2Xnn5lFYT1ZN/5ewEoMBCmLlL71lZ3mBdb7cUEuHhX2ESoSrNEucpsDXpX22EuTGm9LOgC9v4Z0wx+Ez8QmGA==} '@connectrpc/connect-web@1.4.0': resolution: {integrity: sha512-13aO4psFbbm7rdOFGV0De2Za64DY/acMspgloDlcOKzLPPs0yZkhp1OOzAQeiAIr7BM/VOHIA3p8mF0inxCYTA==} @@ -6097,6 +6097,9 @@ packages: style-mod@4.0.0: resolution: {integrity: sha512-OPhtyEjyyN9x3nhPsu76f52yUGXiZcgvsrFVtvTkyGRQJ0XK+GPc6ov1z+lRpbeabka+MYEQxOYRnt5nF30aMw==} + style-mod@4.1.2: + resolution: {integrity: sha512-wnD1HyVqpJUI2+eKZ+eo1UwghftP6yuFheBqqe+bWCotBjC2K1YnteJILRMs3SM4V/0dLEW1SC27MWP5y+mwmw==} + styled-jsx@5.1.1: resolution: {integrity: sha512-pW7uC1l4mBZ8ugbiZrcIsiIvVx1UmTfw7UkC3Um2tmfUq9Bhk8IiyEIPl6F8agHgjzku6j0xQEZbfA5uSgSaCw==} engines: {node: '>= 12.0.0'} @@ -7102,18 +7105,18 @@ snapshots: transitivePeerDependencies: - supports-color - '@codemirror/autocomplete@6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.1.4)(@codemirror/view@6.7.1)(@lezer/common@1.0.2)': + '@codemirror/autocomplete@6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.4.1)(@codemirror/view@6.23.1)(@lezer/common@1.0.2)': dependencies: '@codemirror/language': 6.3.1 - '@codemirror/state': 6.1.4 - '@codemirror/view': 6.7.1 + '@codemirror/state': 6.4.1 + '@codemirror/view': 6.23.1 '@lezer/common': 1.0.2 '@codemirror/commands@6.1.2': dependencies: '@codemirror/language': 6.3.1 - '@codemirror/state': 6.1.4 - '@codemirror/view': 6.7.1 + '@codemirror/state': 6.4.1 + '@codemirror/view': 6.23.1 '@lezer/common': 1.0.2 '@codemirror/highlight@0.19.8': @@ -7130,11 +7133,11 @@ snapshots: '@codemirror/language': 6.3.1 '@lezer/cpp': 1.0.0 - '@codemirror/lang-css@6.0.1(@codemirror/view@6.7.1)(@lezer/common@1.0.2)': + '@codemirror/lang-css@6.0.1(@codemirror/view@6.23.1)(@lezer/common@1.0.2)': dependencies: - '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.1.4)(@codemirror/view@6.7.1)(@lezer/common@1.0.2) + '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.4.1)(@codemirror/view@6.23.1)(@lezer/common@1.0.2) '@codemirror/language': 6.3.1 - '@codemirror/state': 6.1.4 + '@codemirror/state': 6.4.1 '@lezer/css': 1.1.1 transitivePeerDependencies: - '@codemirror/view' @@ -7142,12 +7145,12 @@ snapshots: '@codemirror/lang-html@6.4.0': dependencies: - '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.1.4)(@codemirror/view@6.7.1)(@lezer/common@1.0.2) - '@codemirror/lang-css': 6.0.1(@codemirror/view@6.7.1)(@lezer/common@1.0.2) + '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.4.1)(@codemirror/view@6.23.1)(@lezer/common@1.0.2) + '@codemirror/lang-css': 6.0.1(@codemirror/view@6.23.1)(@lezer/common@1.0.2) '@codemirror/lang-javascript': 6.1.2 '@codemirror/language': 6.3.1 - '@codemirror/state': 6.1.4 - '@codemirror/view': 6.7.1 + '@codemirror/state': 6.4.1 + '@codemirror/view': 6.23.1 '@lezer/common': 1.0.2 '@lezer/css': 1.1.1 '@lezer/html': 1.2.0 @@ -7159,11 +7162,11 @@ snapshots: '@codemirror/lang-javascript@6.1.2': dependencies: - '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.1.4)(@codemirror/view@6.7.1)(@lezer/common@1.0.2) + '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.4.1)(@codemirror/view@6.23.1)(@lezer/common@1.0.2) '@codemirror/language': 6.3.1 '@codemirror/lint': 6.1.0 - '@codemirror/state': 6.1.4 - '@codemirror/view': 6.7.1 + '@codemirror/state': 6.4.1 + '@codemirror/view': 6.23.1 '@lezer/common': 1.0.2 '@lezer/javascript': 1.3.1 @@ -7176,8 +7179,8 @@ snapshots: dependencies: '@codemirror/lang-html': 6.4.0 '@codemirror/language': 6.3.1 - '@codemirror/state': 6.1.4 - '@codemirror/view': 6.7.1 + '@codemirror/state': 6.4.1 + '@codemirror/view': 6.23.1 '@lezer/common': 1.0.2 '@lezer/markdown': 1.0.2 @@ -7185,13 +7188,13 @@ snapshots: dependencies: '@codemirror/lang-html': 6.4.0 '@codemirror/language': 6.3.1 - '@codemirror/state': 6.1.4 + '@codemirror/state': 6.4.1 '@lezer/common': 1.0.2 '@lezer/php': 1.0.0 - '@codemirror/lang-python@6.1.0(@codemirror/state@6.1.4)(@codemirror/view@6.7.1)(@lezer/common@1.0.2)': + '@codemirror/lang-python@6.1.0(@codemirror/state@6.4.1)(@codemirror/view@6.23.1)(@lezer/common@1.0.2)': dependencies: - '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.1.4)(@codemirror/view@6.7.1)(@lezer/common@1.0.2) + '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.4.1)(@codemirror/view@6.23.1)(@lezer/common@1.0.2) '@codemirror/language': 6.3.1 '@lezer/python': 1.1.1 transitivePeerDependencies: @@ -7204,11 +7207,11 @@ snapshots: '@codemirror/language': 6.3.1 '@lezer/rust': 1.0.0 - '@codemirror/lang-sql@6.3.3(@codemirror/view@6.7.1)(@lezer/common@1.0.2)': + '@codemirror/lang-sql@6.3.3(@codemirror/view@6.23.1)(@lezer/common@1.0.2)': dependencies: - '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.1.4)(@codemirror/view@6.7.1)(@lezer/common@1.0.2) + '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.4.1)(@codemirror/view@6.23.1)(@lezer/common@1.0.2) '@codemirror/language': 6.3.1 - '@codemirror/state': 6.1.4 + '@codemirror/state': 6.4.1 '@lezer/highlight': 1.1.3 '@lezer/lr': 1.2.5 transitivePeerDependencies: @@ -7221,31 +7224,31 @@ snapshots: '@lezer/highlight': 1.1.3 '@lezer/lr': 1.2.5 - '@codemirror/lang-xml@6.0.1(@codemirror/view@6.7.1)': + '@codemirror/lang-xml@6.0.1(@codemirror/view@6.23.1)': dependencies: - '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.1.4)(@codemirror/view@6.7.1)(@lezer/common@1.0.2) + '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.4.1)(@codemirror/view@6.23.1)(@lezer/common@1.0.2) '@codemirror/language': 6.3.1 - '@codemirror/state': 6.1.4 + '@codemirror/state': 6.4.1 '@lezer/common': 1.0.2 '@lezer/xml': 1.0.0 transitivePeerDependencies: - '@codemirror/view' - '@codemirror/language-data@6.1.0(@codemirror/state@6.1.4)(@codemirror/view@6.7.1)(@lezer/common@1.0.2)': + '@codemirror/language-data@6.1.0(@codemirror/state@6.4.1)(@codemirror/view@6.23.1)(@lezer/common@1.0.2)': dependencies: '@codemirror/lang-cpp': 6.0.2 - '@codemirror/lang-css': 6.0.1(@codemirror/view@6.7.1)(@lezer/common@1.0.2) + '@codemirror/lang-css': 6.0.1(@codemirror/view@6.23.1)(@lezer/common@1.0.2) '@codemirror/lang-html': 6.4.0 '@codemirror/lang-java': 6.0.1 '@codemirror/lang-javascript': 6.1.2 '@codemirror/lang-json': 6.0.1 '@codemirror/lang-markdown': 6.0.5 '@codemirror/lang-php': 6.0.1 - '@codemirror/lang-python': 6.1.0(@codemirror/state@6.1.4)(@codemirror/view@6.7.1)(@lezer/common@1.0.2) + '@codemirror/lang-python': 6.1.0(@codemirror/state@6.4.1)(@codemirror/view@6.23.1)(@lezer/common@1.0.2) '@codemirror/lang-rust': 6.0.1 - '@codemirror/lang-sql': 6.3.3(@codemirror/view@6.7.1)(@lezer/common@1.0.2) + '@codemirror/lang-sql': 6.3.3(@codemirror/view@6.23.1)(@lezer/common@1.0.2) '@codemirror/lang-wast': 6.0.1 - '@codemirror/lang-xml': 6.0.1(@codemirror/view@6.7.1) + '@codemirror/lang-xml': 6.0.1(@codemirror/view@6.23.1) '@codemirror/language': 6.3.1 '@codemirror/legacy-modes': 6.3.1 transitivePeerDependencies: @@ -7263,8 +7266,8 @@ snapshots: '@codemirror/language@6.3.1': dependencies: - '@codemirror/state': 6.1.4 - '@codemirror/view': 6.7.1 + '@codemirror/state': 6.4.1 + '@codemirror/view': 6.23.1 '@lezer/common': 1.0.2 '@lezer/highlight': 1.1.3 '@lezer/lr': 1.2.5 @@ -7276,8 +7279,8 @@ snapshots: '@codemirror/lint@6.1.0': dependencies: - '@codemirror/state': 6.1.4 - '@codemirror/view': 6.7.1 + '@codemirror/state': 6.4.1 + '@codemirror/view': 6.23.1 crelt: 1.0.5 '@codemirror/rangeset@0.19.9': @@ -7286,15 +7289,15 @@ snapshots: '@codemirror/search@6.2.3': dependencies: - '@codemirror/state': 6.1.4 - '@codemirror/view': 6.7.1 + '@codemirror/state': 6.4.1 + '@codemirror/view': 6.23.1 crelt: 1.0.5 '@codemirror/state@0.19.9': dependencies: '@codemirror/text': 0.19.6 - '@codemirror/state@6.1.4': {} + '@codemirror/state@6.4.1': {} '@codemirror/text@0.19.6': {} @@ -7306,10 +7309,10 @@ snapshots: style-mod: 4.0.0 w3c-keyname: 2.2.6 - '@codemirror/view@6.7.1': + '@codemirror/view@6.23.1': dependencies: - '@codemirror/state': 6.1.4 - style-mod: 4.0.0 + '@codemirror/state': 6.4.1 + style-mod: 4.1.2 w3c-keyname: 2.2.6 '@connectrpc/connect-web@1.4.0(@bufbuild/protobuf@1.10.0)(@connectrpc/connect@1.4.0(@bufbuild/protobuf@1.10.0))': @@ -10588,13 +10591,13 @@ snapshots: codemirror@6.0.1(@lezer/common@1.0.2): dependencies: - '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.1.4)(@codemirror/view@6.7.1)(@lezer/common@1.0.2) + '@codemirror/autocomplete': 6.3.4(@codemirror/language@6.3.1)(@codemirror/state@6.4.1)(@codemirror/view@6.23.1)(@lezer/common@1.0.2) '@codemirror/commands': 6.1.2 '@codemirror/language': 6.3.1 '@codemirror/lint': 6.1.0 '@codemirror/search': 6.2.3 - '@codemirror/state': 6.1.4 - '@codemirror/view': 6.7.1 + '@codemirror/state': 6.4.1 + '@codemirror/view': 6.23.1 transitivePeerDependencies: - '@lezer/common' @@ -13419,6 +13422,8 @@ snapshots: style-mod@4.0.0: {} + style-mod@4.1.2: {} + styled-jsx@5.1.1(react@18.2.0): dependencies: client-only: 0.0.1 From 2b2ee0ba0f891e048f6d90d9c76505589f486876 Mon Sep 17 00:00:00 2001 From: Gunwoo Baik Date: Tue, 3 Sep 2024 09:55:50 +0900 Subject: [PATCH 2/2] Introduce broadcast API for event sharing (#884) This commit implements broadcast API, which enables the sharing of a broader range of general events beyond the current document and presence events in Yorkie's Publish-Subscribe model. 1. Broadcast Events: Users can now broadcast custom events with a specified topic and payload. The payload can be of any type, as long as it is serializable. ```ts // Broadcast an event with a topic and payload const payload = 'hello'; doc.broadcast('TOPIC_NAME', payload); ``` 2. Subscribe to Broadcast Events: Users can subscribe to specific topics and handle the events via a callback function. The callback is triggered whenever an event with the corresponding topic is broadcast. ```ts // Subscribe to a specific topic for broadcast events doc.subscribe('broadcast', ({value: {topic, payload, clientID}}) => { // Handle the broadcast event for the specified topic }); ``` --- packages/sdk/src/client/attachment.ts | 5 + packages/sdk/src/client/client.ts | 71 +++++++ packages/sdk/src/document/document.ts | 101 ++++++++- packages/sdk/src/util/validator.ts | 31 +++ packages/sdk/test/integration/client_test.ts | 193 ++++++++++++++++++ .../test/integration/integration_helper.ts | 5 +- 6 files changed, 402 insertions(+), 4 deletions(-) create mode 100644 packages/sdk/src/util/validator.ts diff --git a/packages/sdk/src/client/attachment.ts b/packages/sdk/src/client/attachment.ts index 1a020570d..18aec7899 100644 --- a/packages/sdk/src/client/attachment.ts +++ b/packages/sdk/src/client/attachment.ts @@ -1,5 +1,6 @@ import { Document, Indexable } from '@yorkie-js-sdk/src/document/document'; import { SyncMode } from '@yorkie-js-sdk/src/client/client'; +import { Unsubscribe } from '../yorkie'; /** * `WatchStream` is a stream that watches the changes of the document. @@ -21,17 +22,21 @@ export class Attachment { watchLoopTimerID?: ReturnType; watchAbortController?: AbortController; + unsubscribeBroadcastEvent: Unsubscribe; + constructor( reconnectStreamDelay: number, doc: Document, docID: string, syncMode: SyncMode, + unsubscribeBroacastEvent: Unsubscribe, ) { this.reconnectStreamDelay = reconnectStreamDelay; this.doc = doc; this.docID = docID; this.syncMode = syncMode; this.remoteChangeEventReceived = false; + this.unsubscribeBroadcastEvent = unsubscribeBroacastEvent; } /** diff --git a/packages/sdk/src/client/client.ts b/packages/sdk/src/client/client.ts index 03338bc80..aa406ed40 100644 --- a/packages/sdk/src/client/client.ts +++ b/packages/sdk/src/client/client.ts @@ -42,6 +42,7 @@ import { import { OpSource } from '@yorkie-js-sdk/src/document/operation/operation'; import { createAuthInterceptor } from '@yorkie-js-sdk/src/client/auth_interceptor'; import { createMetricInterceptor } from '@yorkie-js-sdk/src/client/metric_interceptor'; +import { validateSerializable } from '../util/validator'; /** * `SyncMode` defines synchronization modes for the PushPullChanges API. @@ -303,6 +304,21 @@ export class Client { } doc.setActor(this.id!); doc.update((_, p) => p.set(options.initialPresence || {})); + const unsubscribeBroacastEvent = doc.subscribe( + 'local-broadcast', + (event) => { + const { topic, payload } = event.value; + const errorFn = event.error; + + try { + this.broadcast(doc.getKey(), topic, payload); + } catch (error: unknown) { + if (error instanceof Error) { + errorFn?.(error); + } + } + }, + ); const syncMode = options.syncMode ?? SyncMode.Realtime; return this.enqueueTask(async () => { @@ -329,6 +345,7 @@ export class Client { doc, res.documentId, syncMode, + unsubscribeBroacastEvent, ), ); @@ -584,6 +601,59 @@ export class Client { return this.conditions[condition]; } + /** + * `broadcast` broadcasts the given payload to the given topic. + */ + public broadcast( + docKey: DocumentKey, + topic: string, + payload: any, + ): Promise { + if (!this.isActive()) { + throw new YorkieError( + Code.ErrClientNotActivated, + `${this.key} is not active`, + ); + } + const attachment = this.attachmentMap.get(docKey); + if (!attachment) { + throw new YorkieError( + Code.ErrDocumentNotAttached, + `${docKey} is not attached`, + ); + } + + if (!validateSerializable(payload)) { + throw new YorkieError( + Code.ErrInvalidArgument, + 'payload is not serializable', + ); + } + + return this.enqueueTask(async () => { + return this.rpcClient + .broadcast( + { + clientId: this.id!, + documentId: attachment.docID, + topic, + payload: new TextEncoder().encode(JSON.stringify(payload)), + }, + { headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } }, + ) + .then(() => { + logger.info( + `[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`, + ); + }) + .catch((err) => { + logger.error(`[BC] c:"${this.getKey()}" err :`, err); + this.handleConnectError(err); + throw err; + }); + }); + } + /** * `runSyncLoop` runs the sync loop. The sync loop pushes local changes to * the server and pulls remote changes from the server. @@ -748,6 +818,7 @@ export class Client { } attachment.cancelWatchStream(); + attachment.unsubscribeBroadcastEvent(); this.attachmentMap.delete(docKey); } diff --git a/packages/sdk/src/document/document.ts b/packages/sdk/src/document/document.ts index 9343b6353..39007afe5 100644 --- a/packages/sdk/src/document/document.ts +++ b/packages/sdk/src/document/document.ts @@ -173,6 +173,16 @@ export enum DocEventType { * `PresenceChanged` means that the presences of the client has updated. */ PresenceChanged = 'presence-changed', + + /** + * `Broadcast` means that the broadcast event is received from the remote client. + */ + Broadcast = 'broadcast', + + /** + * `LocalBroadcast` means that the broadcast event is sent from the local client. + */ + LocalBroadcast = 'local-broadcast', } /** @@ -191,7 +201,9 @@ export type DocEvent

= | InitializedEvent

| WatchedEvent

| UnwatchedEvent

- | PresenceChangedEvent

; + | PresenceChangedEvent

+ | BroadcastEvent + | LocalBroadcastEvent; /** * `TransactionEvent` represents document events that occur within @@ -371,6 +383,18 @@ export interface PresenceChangedEvent

value: { clientID: ActorID; presence: P }; } +export interface BroadcastEvent extends BaseDocEvent { + type: DocEventType.Broadcast; + value: { clientID: ActorID; topic: string; payload: any }; + error?: ErrorFn; +} + +export interface LocalBroadcastEvent extends BaseDocEvent { + type: DocEventType.LocalBroadcast; + value: { topic: string; payload: any }; + error?: ErrorFn; +} + type DocEventCallbackMap

= { default: NextFn< | SnapshotEvent @@ -388,6 +412,8 @@ type DocEventCallbackMap

= { connection: NextFn; status: NextFn; sync: NextFn; + broadcast: NextFn; + 'local-broadcast': NextFn; all: NextFn>; }; export type DocEventTopic = keyof DocEventCallbackMap; @@ -818,6 +844,24 @@ export class Document { error?: ErrorFn, complete?: CompleteFn, ): Unsubscribe; + /** + * `subscribe` registers a callback to subscribe to events on the document. + * The callback will be called when the broadcast event is received from the remote client. + */ + public subscribe( + type: 'broadcast', + next: DocEventCallbackMap

['broadcast'], + error?: ErrorFn, + ): Unsubscribe; + /** + * `subscribe` registers a callback to subscribe to events on the document. + * The callback will be called when the local client sends a broadcast event. + */ + public subscribe( + type: 'local-broadcast', + next: DocEventCallbackMap

['local-broadcast'], + error?: ErrorFn, + ): Unsubscribe; /** * `subscribe` registers a callback to subscribe to events on the document. */ @@ -966,6 +1010,30 @@ export class Document { arg4, ); } + if (arg1 === 'local-broadcast') { + const callback = arg2 as DocEventCallbackMap

['local-broadcast']; + return this.eventStream.subscribe((event) => { + for (const docEvent of event) { + if (docEvent.type !== DocEventType.LocalBroadcast) { + continue; + } + + callback(docEvent); + } + }, arg3); + } + if (arg1 === 'broadcast') { + const callback = arg2 as DocEventCallbackMap

['broadcast']; + return this.eventStream.subscribe((event) => { + for (const docEvent of event) { + if (docEvent.type !== DocEventType.Broadcast) { + continue; + } + + callback(docEvent); + } + }, arg3); + } if (arg1 === 'all') { const callback = arg2 as DocEventCallbackMap

['all']; return this.eventStream.subscribe(callback, arg3, arg4); @@ -1024,6 +1092,7 @@ export class Document { complete, ); } + throw new YorkieError(Code.ErrInvalidArgument, `"${arg1}" is not a valid`); } @@ -1468,7 +1537,8 @@ export class Document { if (resp.body.case === 'event') { const { type, publisher } = resp.body.value; - const event: Array | UnwatchedEvent

> = []; + const event: Array | UnwatchedEvent

| BroadcastEvent> = + []; if (type === PbDocEventType.DOCUMENT_WATCHED) { this.addOnlineClient(publisher); // NOTE(chacha912): We added to onlineClients, but we won't trigger watched event @@ -1495,6 +1565,20 @@ export class Document { value: { clientID: publisher, presence }, }); } + } else if (type === PbDocEventType.DOCUMENT_BROADCAST) { + if (resp.body.value.body) { + const { topic, payload } = resp.body.value.body; + const decoder = new TextDecoder(); + + event.push({ + type: DocEventType.Broadcast, + value: { + clientID: publisher, + topic, + payload: JSON.parse(decoder.decode(payload)), + }, + }); + } } if (event.length > 0) { @@ -1970,4 +2054,17 @@ export class Document { public getRedoStackForTest(): Array>> { return this.internalHistory.getRedoStackForTest(); } + + /** + * `broadcast` the payload to the given topic. + */ + public broadcast(topic: string, payload: any, error?: ErrorFn) { + const broadcastEvent: LocalBroadcastEvent = { + type: DocEventType.LocalBroadcast, + value: { topic, payload }, + error, + }; + + this.publish([broadcastEvent]); + } } diff --git a/packages/sdk/src/util/validator.ts b/packages/sdk/src/util/validator.ts new file mode 100644 index 000000000..33625d837 --- /dev/null +++ b/packages/sdk/src/util/validator.ts @@ -0,0 +1,31 @@ +/* + * Copyright 2024 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * `validateSerializable` returns whether the given value is serializable or not. + */ +export const validateSerializable = (value: any): boolean => { + try { + const serialized = JSON.stringify(value); + + if (serialized === undefined) { + return false; + } + } catch (error) { + return false; + } + return true; +}; diff --git a/packages/sdk/test/integration/client_test.ts b/packages/sdk/test/integration/client_test.ts index b315408f4..004e0b506 100644 --- a/packages/sdk/test/integration/client_test.ts +++ b/packages/sdk/test/integration/client_test.ts @@ -863,4 +863,197 @@ describe.sequential('Client', function () { assert.equal(d1.toSortedJSON(), d2.toSortedJSON()); }, task.name); }); + + it('Should successfully broadcast serializeable payload', async ({ + task, + }) => { + const cli = new yorkie.Client(testRPCAddr); + await cli.activate(); + + const doc = new yorkie.Document<{ t: Text }>(toDocKey(`${task.name}`)); + await cli.attach(doc); + + const broadcastTopic = 'test'; + const payload = { a: 1, b: '2' }; + + expect(async () => doc.broadcast(broadcastTopic, payload)).not.toThrow(); + + await cli.deactivate(); + }); + + it('Should throw error when broadcasting unserializeable payload', async ({ + task, + }) => { + const eventCollector = new EventCollector(); + const cli = new yorkie.Client(testRPCAddr); + await cli.activate(); + + const doc = new yorkie.Document<{ t: Text }>(toDocKey(`${task.name}`)); + await cli.attach(doc); + + // broadcast unserializable payload + const payload = () => {}; + const broadcastTopic = 'test'; + const broadcastErrMessage = 'payload is not serializable'; + + const errorHandler = (error: Error) => { + eventCollector.add(error.message); + }; + + doc.broadcast(broadcastTopic, payload, errorHandler); + + await eventCollector.waitAndVerifyNthEvent(1, broadcastErrMessage); + + await cli.deactivate(); + }); + + it('Should trigger the handler for a subscribed broadcast event', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector = new EventCollector<[string, any]>(); + const broadcastTopic = 'test'; + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector.add([topic, payload]); + } + }); + + const payload = { a: 1, b: '2' }; + d1.broadcast(broadcastTopic, payload); + await eventCollector.waitAndVerifyNthEvent(1, [ + broadcastTopic, + payload, + ]); + + assert.equal(eventCollector.getLength(), 1); + + unsubscribe(); + }, + task.name, + SyncMode.Realtime, + ); + }); + + it('Should not trigger the handler for an unsubscribed broadcast event', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector = new EventCollector<[string, any]>(); + const broadcastTopic1 = 'test1'; + const broadcastTopic2 = 'test2'; + + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic1) { + eventCollector.add([topic, payload]); + } else if (topic === broadcastTopic2) { + eventCollector.add([topic, payload]); + } + }); + + const payload = { a: 1, b: '2' }; + d1.broadcast(broadcastTopic1, payload); + await eventCollector.waitAndVerifyNthEvent(1, [ + broadcastTopic1, + payload, + ]); + + assert.equal(eventCollector.getLength(), 1); + + unsubscribe(); + }, + task.name, + SyncMode.Realtime, + ); + }); + + it('Should not trigger the handler for a broadcast event after unsubscribing', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector = new EventCollector<[string, any]>(); + const broadcastTopic = 'test'; + const unsubscribe = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector.add([topic, payload]); + } + }); + + const payload = { a: 1, b: '2' }; + + d1.broadcast(broadcastTopic, payload); + await eventCollector.waitAndVerifyNthEvent(1, [ + broadcastTopic, + payload, + ]); + + unsubscribe(); + + d1.broadcast(broadcastTopic, payload); + + // Assuming that every subscriber can receive the broadcast event within 1000ms. + await new Promise((res) => setTimeout(res, 1000)); + + // No change in the number of calls + assert.equal(eventCollector.getLength(), 1); + }, + task.name, + SyncMode.Realtime, + ); + }); + + it('Should not trigger the handler for a broadcast event sent by the publisher to itself', async ({ + task, + }) => { + await withTwoClientsAndDocuments<{ t: Text }>( + async (c1, d1, c2, d2) => { + const eventCollector1 = new EventCollector<[string, any]>(); + const eventCollector2 = new EventCollector<[string, any]>(); + const broadcastTopic = 'test'; + const payload = { a: 1, b: '2' }; + + // Publisher subscribes to the broadcast event + const unsubscribe1 = d1.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector1.add([topic, payload]); + } + }); + + const unsubscribe2 = d2.subscribe('broadcast', (event) => { + const { topic, payload } = event.value; + + if (topic === broadcastTopic) { + eventCollector2.add([topic, payload]); + } + }); + + d1.broadcast(broadcastTopic, payload); + + // Assuming that D2 takes longer to receive the broadcast event compared to D1 + await eventCollector2.waitAndVerifyNthEvent(1, [ + broadcastTopic, + payload, + ]); + + unsubscribe1(); + unsubscribe2(); + + assert.equal(eventCollector1.getLength(), 0); + assert.equal(eventCollector2.getLength(), 1); + }, + task.name, + SyncMode.Realtime, + ); + }); }); diff --git a/packages/sdk/test/integration/integration_helper.ts b/packages/sdk/test/integration/integration_helper.ts index 7db44e2ad..f0189e390 100644 --- a/packages/sdk/test/integration/integration_helper.ts +++ b/packages/sdk/test/integration/integration_helper.ts @@ -21,6 +21,7 @@ export async function withTwoClientsAndDocuments( d2: Document, ) => Promise, title: string, + syncMode: SyncMode = SyncMode.Manual, ): Promise { const client1 = new yorkie.Client(testRPCAddr); const client2 = new yorkie.Client(testRPCAddr); @@ -31,8 +32,8 @@ export async function withTwoClientsAndDocuments( const doc1 = new yorkie.Document(docKey); const doc2 = new yorkie.Document(docKey); - await client1.attach(doc1, { syncMode: SyncMode.Manual }); - await client2.attach(doc2, { syncMode: SyncMode.Manual }); + await client1.attach(doc1, { syncMode }); + await client2.attach(doc2, { syncMode }); await callback(client1, doc1, client2, doc2);