Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
gwbaik9717 committed Sep 4, 2024
2 parents 14a2ae7 + 2b2ee0b commit 2493aba
Show file tree
Hide file tree
Showing 8 changed files with 464 additions and 61 deletions.
6 changes: 3 additions & 3 deletions examples/vanilla-codemirror6/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
"vite": "^5.0.12"
},
"dependencies": {
"@codemirror/commands": "^6.1.2",
"@codemirror/commands": "6.1.2",
"@codemirror/highlight": "^0.19.8",
"@codemirror/lang-markdown": "^6.0.2",
"@codemirror/language-data": "^6.1.0",
"@codemirror/state": "^6.1.2",
"@codemirror/view": "^6.3.1",
"@codemirror/state": "^6.4.1",
"@codemirror/view": "6.23.1",
"codemirror": "^6.0.1",
"yorkie-js-sdk": "workspace:*"
}
Expand Down
5 changes: 5 additions & 0 deletions packages/sdk/src/client/attachment.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Document, Indexable } from '@yorkie-js-sdk/src/document/document';
import { SyncMode } from '@yorkie-js-sdk/src/client/client';
import { Unsubscribe } from '../yorkie';

/**
* `WatchStream` is a stream that watches the changes of the document.
Expand All @@ -21,17 +22,21 @@ export class Attachment<T, P extends Indexable> {
watchLoopTimerID?: ReturnType<typeof setTimeout>;
watchAbortController?: AbortController;

unsubscribeBroadcastEvent: Unsubscribe;

constructor(
reconnectStreamDelay: number,
doc: Document<T, P>,
docID: string,
syncMode: SyncMode,
unsubscribeBroacastEvent: Unsubscribe,
) {
this.reconnectStreamDelay = reconnectStreamDelay;
this.doc = doc;
this.docID = docID;
this.syncMode = syncMode;
this.remoteChangeEventReceived = false;
this.unsubscribeBroadcastEvent = unsubscribeBroacastEvent;
}

/**
Expand Down
71 changes: 71 additions & 0 deletions packages/sdk/src/client/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import {
import { OpSource } from '@yorkie-js-sdk/src/document/operation/operation';
import { createAuthInterceptor } from '@yorkie-js-sdk/src/client/auth_interceptor';
import { createMetricInterceptor } from '@yorkie-js-sdk/src/client/metric_interceptor';
import { validateSerializable } from '../util/validator';

/**
* `SyncMode` defines synchronization modes for the PushPullChanges API.
Expand Down Expand Up @@ -303,6 +304,21 @@ export class Client {
}
doc.setActor(this.id!);
doc.update((_, p) => p.set(options.initialPresence || {}));
const unsubscribeBroacastEvent = doc.subscribe(
'local-broadcast',
(event) => {
const { topic, payload } = event.value;
const errorFn = event.error;

try {
this.broadcast(doc.getKey(), topic, payload);
} catch (error: unknown) {
if (error instanceof Error) {
errorFn?.(error);
}
}
},
);

const syncMode = options.syncMode ?? SyncMode.Realtime;
return this.enqueueTask(async () => {
Expand All @@ -329,6 +345,7 @@ export class Client {
doc,
res.documentId,
syncMode,
unsubscribeBroacastEvent,
),
);

Expand Down Expand Up @@ -584,6 +601,59 @@ export class Client {
return this.conditions[condition];
}

/**
* `broadcast` broadcasts the given payload to the given topic.
*/
public broadcast(
docKey: DocumentKey,
topic: string,
payload: any,
): Promise<void> {
if (!this.isActive()) {
throw new YorkieError(
Code.ErrClientNotActivated,
`${this.key} is not active`,
);
}
const attachment = this.attachmentMap.get(docKey);
if (!attachment) {
throw new YorkieError(
Code.ErrDocumentNotAttached,
`${docKey} is not attached`,
);
}

if (!validateSerializable(payload)) {
throw new YorkieError(
Code.ErrInvalidArgument,
'payload is not serializable',
);
}

return this.enqueueTask(async () => {
return this.rpcClient
.broadcast(
{
clientId: this.id!,
documentId: attachment.docID,
topic,
payload: new TextEncoder().encode(JSON.stringify(payload)),
},
{ headers: { 'x-shard-key': `${this.apiKey}/${docKey}` } },
)
.then(() => {
logger.info(
`[BC] c:"${this.getKey()}" broadcasts d:"${docKey}" t:"${topic}"`,
);
})
.catch((err) => {
logger.error(`[BC] c:"${this.getKey()}" err :`, err);
this.handleConnectError(err);
throw err;
});
});
}

/**
* `runSyncLoop` runs the sync loop. The sync loop pushes local changes to
* the server and pulls remote changes from the server.
Expand Down Expand Up @@ -748,6 +818,7 @@ export class Client {
}

attachment.cancelWatchStream();
attachment.unsubscribeBroadcastEvent();
this.attachmentMap.delete(docKey);
}

Expand Down
101 changes: 99 additions & 2 deletions packages/sdk/src/document/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,16 @@ export enum DocEventType {
* `PresenceChanged` means that the presences of the client has updated.
*/
PresenceChanged = 'presence-changed',

/**
* `Broadcast` means that the broadcast event is received from the remote client.
*/
Broadcast = 'broadcast',

/**
* `LocalBroadcast` means that the broadcast event is sent from the local client.
*/
LocalBroadcast = 'local-broadcast',
}

/**
Expand All @@ -191,7 +201,9 @@ export type DocEvent<P extends Indexable = Indexable, T = OperationInfo> =
| InitializedEvent<P>
| WatchedEvent<P>
| UnwatchedEvent<P>
| PresenceChangedEvent<P>;
| PresenceChangedEvent<P>
| BroadcastEvent
| LocalBroadcastEvent;

/**
* `TransactionEvent` represents document events that occur within
Expand Down Expand Up @@ -371,6 +383,18 @@ export interface PresenceChangedEvent<P extends Indexable>
value: { clientID: ActorID; presence: P };
}

export interface BroadcastEvent extends BaseDocEvent {
type: DocEventType.Broadcast;
value: { clientID: ActorID; topic: string; payload: any };
error?: ErrorFn;
}

export interface LocalBroadcastEvent extends BaseDocEvent {
type: DocEventType.LocalBroadcast;
value: { topic: string; payload: any };
error?: ErrorFn;
}

type DocEventCallbackMap<P extends Indexable> = {
default: NextFn<
| SnapshotEvent
Expand All @@ -388,6 +412,8 @@ type DocEventCallbackMap<P extends Indexable> = {
connection: NextFn<ConnectionChangedEvent>;
status: NextFn<StatusChangedEvent>;
sync: NextFn<SyncStatusChangedEvent>;
broadcast: NextFn<BroadcastEvent>;
'local-broadcast': NextFn<LocalBroadcastEvent>;
all: NextFn<TransactionEvent<P>>;
};
export type DocEventTopic = keyof DocEventCallbackMap<never>;
Expand Down Expand Up @@ -818,6 +844,24 @@ export class Document<T, P extends Indexable = Indexable> {
error?: ErrorFn,
complete?: CompleteFn,
): Unsubscribe;
/**
* `subscribe` registers a callback to subscribe to events on the document.
* The callback will be called when the broadcast event is received from the remote client.
*/
public subscribe(
type: 'broadcast',
next: DocEventCallbackMap<P>['broadcast'],
error?: ErrorFn,
): Unsubscribe;
/**
* `subscribe` registers a callback to subscribe to events on the document.
* The callback will be called when the local client sends a broadcast event.
*/
public subscribe(
type: 'local-broadcast',
next: DocEventCallbackMap<P>['local-broadcast'],
error?: ErrorFn,
): Unsubscribe;
/**
* `subscribe` registers a callback to subscribe to events on the document.
*/
Expand Down Expand Up @@ -966,6 +1010,30 @@ export class Document<T, P extends Indexable = Indexable> {
arg4,
);
}
if (arg1 === 'local-broadcast') {
const callback = arg2 as DocEventCallbackMap<P>['local-broadcast'];
return this.eventStream.subscribe((event) => {
for (const docEvent of event) {
if (docEvent.type !== DocEventType.LocalBroadcast) {
continue;
}

callback(docEvent);
}
}, arg3);
}
if (arg1 === 'broadcast') {
const callback = arg2 as DocEventCallbackMap<P>['broadcast'];
return this.eventStream.subscribe((event) => {
for (const docEvent of event) {
if (docEvent.type !== DocEventType.Broadcast) {
continue;
}

callback(docEvent);
}
}, arg3);
}
if (arg1 === 'all') {
const callback = arg2 as DocEventCallbackMap<P>['all'];
return this.eventStream.subscribe(callback, arg3, arg4);
Expand Down Expand Up @@ -1024,6 +1092,7 @@ export class Document<T, P extends Indexable = Indexable> {
complete,
);
}

throw new YorkieError(Code.ErrInvalidArgument, `"${arg1}" is not a valid`);
}

Expand Down Expand Up @@ -1468,7 +1537,8 @@ export class Document<T, P extends Indexable = Indexable> {

if (resp.body.case === 'event') {
const { type, publisher } = resp.body.value;
const event: Array<WatchedEvent<P> | UnwatchedEvent<P>> = [];
const event: Array<WatchedEvent<P> | UnwatchedEvent<P> | BroadcastEvent> =
[];
if (type === PbDocEventType.DOCUMENT_WATCHED) {
this.addOnlineClient(publisher);
// NOTE(chacha912): We added to onlineClients, but we won't trigger watched event
Expand All @@ -1495,6 +1565,20 @@ export class Document<T, P extends Indexable = Indexable> {
value: { clientID: publisher, presence },
});
}
} else if (type === PbDocEventType.DOCUMENT_BROADCAST) {
if (resp.body.value.body) {
const { topic, payload } = resp.body.value.body;
const decoder = new TextDecoder();

event.push({
type: DocEventType.Broadcast,
value: {
clientID: publisher,
topic,
payload: JSON.parse(decoder.decode(payload)),
},
});
}
}

if (event.length > 0) {
Expand Down Expand Up @@ -1970,4 +2054,17 @@ export class Document<T, P extends Indexable = Indexable> {
public getRedoStackForTest(): Array<Array<HistoryOperation<P>>> {
return this.internalHistory.getRedoStackForTest();
}

/**
* `broadcast` the payload to the given topic.
*/
public broadcast(topic: string, payload: any, error?: ErrorFn) {
const broadcastEvent: LocalBroadcastEvent = {
type: DocEventType.LocalBroadcast,
value: { topic, payload },
error,
};

this.publish([broadcastEvent]);
}
}
31 changes: 31 additions & 0 deletions packages/sdk/src/util/validator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* `validateSerializable` returns whether the given value is serializable or not.
*/
export const validateSerializable = (value: any): boolean => {
try {
const serialized = JSON.stringify(value);

if (serialized === undefined) {
return false;
}
} catch (error) {
return false;
}
return true;
};
Loading

0 comments on commit 2493aba

Please sign in to comment.