Skip to content

Commit

Permalink
Periodically request summaries new from clients
Browse files Browse the repository at this point in the history
  • Loading branch information
Marvin Schürz committed Jan 22, 2025
1 parent e8bbc22 commit 8d84209
Show file tree
Hide file tree
Showing 8 changed files with 118 additions and 25 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"pixi.js": "^8.4.1",
"reflect-metadata": "^0.1.13",
"regedit": "^5.1.3",
"socket.io-msgpack-parser": "^3.0.2",
"tslib": "^2.3.0",
"uuid": "^9.0.1",
"variant": "^2.1.0"
Expand Down
43 changes: 34 additions & 9 deletions packages/core/src/editor/multiplayer/MultiplayerClient.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import type { ClientSocket, IMutation, InitialStateServerMessage, MutationContext, MutationsSubmittedMessage } from '@osucad/multiplayer';
import type { ClientSocket, IMutation, InitialStateServerMessage, MutationContext, MutationsSubmittedMessage, SummaryResponse } from '@osucad/multiplayer';
import type { Beatmap } from '../../beatmap/Beatmap';
import type { FileStore } from '../../beatmap/io/FileStore';
import { Component } from '@osucad/framework';
import { MutationSource, UpdateHandler } from '@osucad/multiplayer';
import { io } from 'socket.io-client';
import msgPackParser from 'socket.io-msgpack-parser';
import { SimpleFile } from '../../beatmap/io/SimpleFile';
import { StaticFileStore } from '../../beatmap/io/StaticFileStore';
import { BoxedBeatmap } from './BoxedBeatmap';
Expand All @@ -15,9 +16,11 @@ export class MultiplayerClient extends Component {
this.socket = io(url, {
autoConnect: false,
transports: ['websocket'],
parser: msgPackParser,
});

this.socket.on('mutationsSubmitted', msg => this.mutationSubmitted(msg));
this.socket.on('requestSummary', cb => cb(this.createSummaryResponse()));

this.users = new ConnectedUsers(this);
}
Expand Down Expand Up @@ -54,6 +57,8 @@ export class MultiplayerClient extends Component {

beatmap.initializeFromSummary(initialState.document.summary);

this.#latestSequenceNumber = initialState.document.sequenceNumber;

if (!beatmap.beatmap)
throw new Error('Beatmap failed to initialize');

Expand All @@ -69,7 +74,7 @@ export class MultiplayerClient extends Component {
};

for (const mutation of op.mutations)
this.updateHandler.apply(mutation, ctx);
this.updateHandler.apply(JSON.parse(mutation), ctx);
}

const assets = initialState.assets.map(it => new SimpleFile(
Expand All @@ -83,31 +88,51 @@ export class MultiplayerClient extends Component {
}

private mutationSubmitted(msg: MutationsSubmittedMessage) {
const isAck = msg.clientId === this.clientId;

const ctx: MutationContext = {
version: msg.version,
source: msg.clientId === this.clientId
source: isAck
? MutationSource.Ack
: MutationSource.Remote,
};

if (isAck)
this.#latestAckVersion = msg.version;

this.#latestSequenceNumber = msg.sequenceNumber;

for (const mutation of msg.mutations)
this.updateHandler.apply(mutation, ctx);
this.updateHandler.apply(JSON.parse(mutation), ctx);
}

bufferedMutations: IMutation[] = [];
#latestAckVersion = 0;
#latestSequenceNumber = -1;

buffer: string[] = [];

onLocalMutation(mutation: IMutation) {
this.bufferedMutations.push(mutation);
this.buffer.push(JSON.stringify(mutation));
}

flush() {
if (this.bufferedMutations.length === 0)
if (this.buffer.length === 0)
return;

this.socket.emit('submitMutations', {
version: this.updateHandler.version++,
mutations: this.bufferedMutations,
mutations: this.buffer,
});
this.bufferedMutations = [];
this.buffer = [];
}

createSummaryResponse(): SummaryResponse {
if (this.#latestAckVersion !== this.updateHandler.version - 1)
return { error: 'has-pending-ops' };

return {
summary: new BoxedBeatmap(this.beatmap).createSummary(),
sequenceNumber: this.#latestSequenceNumber,
};
}
}
6 changes: 1 addition & 5 deletions packages/multiplayer/src/protocol/ClientMessage.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
import type { SignalKey } from '../client';
import type { IMutation } from './IMutation';
import type { UserPresence } from './types';

export type ClientMessage =
| never;

export interface ClientMessages {
createChatMessage(content: string): void;
submitSignal(key: SignalKey, data: any): void;
Expand All @@ -14,5 +10,5 @@ export interface ClientMessages {

export interface SubmitMutationsMessage {
version: number;
mutations: IMutation[];
mutations: string[];
}
10 changes: 8 additions & 2 deletions packages/multiplayer/src/protocol/ServerMessage.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { IVec2 } from '@osucad/framework';
import type { SignalKey } from '../client';
import type { ChatMessage } from './ChatMessage';
import type { IMutation } from './IMutation';
import type { ClientInfo, UserPresence } from './types';

export type ServerMessage =
Expand All @@ -21,13 +20,20 @@ export interface ServerMessages {
presenceUpdated<Key extends keyof UserPresence>(clientId: number, key: Key, value: UserPresence[Key]): void;

mutationsSubmitted(message: MutationsSubmittedMessage): void;

requestSummary(callback: (response: SummaryResponse) => void): void;
}

export type SummaryResponse =
| { summary: any; sequenceNumber: number }
| { error: 'has-pending-ops' };

export interface InitialStateServerMessage {
clientId: number;
document: {
summary: any;
ops: MutationsSubmittedMessage[];
sequenceNumber: number;
};
assets: AssetInfo[];
connectedUsers: ClientInfo[];
Expand All @@ -52,6 +58,6 @@ export interface UpdateCursorServerMessage {
export interface MutationsSubmittedMessage {
version: number;
clientId: number;
mutations: IMutation[];
mutations: string[];
sequenceNumber: number;
}
5 changes: 2 additions & 3 deletions packages/server/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cors from 'cors';
import express from 'express';
import request from 'request';
import { Server } from 'socket.io';
import msgpackParser from 'socket.io-msgpack-parser';
import { getAssetPath, loadAssets } from './assets';
import { Gateway } from './Gateway';

Expand All @@ -26,9 +27,7 @@ app.use((req, res, next) => {
const server = http.createServer(app);
const io = new Server(server, {
transports: ['websocket'],
cors: {
origin: true,
},
parser: msgpackParser,
});

const gateway = new Gateway(io);
Expand Down
45 changes: 42 additions & 3 deletions packages/server/src/multiplayer/Room.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import type { AssetInfo, ClientMessages, ServerMessages, SignalKey, SubmitMutationsMessage, UserPresence } from '@osucad/multiplayer';
import type { BroadcastOperator, Socket } from 'socket.io';
import type { OrderingService } from '../services/OrderingService';
import { Json } from '@osucad/serialization';
import { nextClientId } from './clientId';
import { RoomUser } from './RoomUser';

Expand All @@ -14,8 +13,6 @@ export class Room {
) {
}

private readonly json = new Json();

private readonly users = new Map<number, RoomUser>();

private readonly colors = [
Expand Down Expand Up @@ -55,6 +52,7 @@ export class Room {
document: {
summary: summary.summary,
ops,
sequenceNumber: this.orderingService.sequenceNumber,
},
assets: this.assets,
connectedUsers: [...this.users.values()].map(it => it.getInfo()),
Expand Down Expand Up @@ -96,8 +94,49 @@ export class Room {
}

private handleSubmitMutations(user: RoomUser, message: SubmitMutationsMessage) {
const start = performance.now();

const sequencedMessage = this.orderingService.appendOps(user.clientId, message);

console.log(performance.now() - start);

this.broadcast.emit('mutationsSubmitted', sequencedMessage);

console.log(performance.now() - start);

if (this.orderingService.mutationCount > 1000)
this.requestSummary().then();
}

private isRequestingSummary = false;

private async requestSummary() {
if (this.isRequestingSummary)
return;

this.isRequestingSummary = true;
try {
const users = [...this.users.values()];

const user = users[Math.floor(Math.random() * users.length)];

console.log(`Requesting summary from client ${user.clientId} (${user.username})`);

const response = await user.socket.emitWithAck('requestSummary');

if ('summary' in response) {
this.orderingService.appendSummary(user.clientId, response.sequenceNumber, response.summary);
return;
}

if ('error' in response) {
console.log(`Client ${user.clientId} could not generate summary with reason "${response.error}"`);
}
}
finally {
this.isRequestingSummary = false;
}

this.requestSummary().then();
}
}
17 changes: 14 additions & 3 deletions packages/server/src/services/OrderingService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,19 @@ export class OrderingService {

this.#ops.push(sequencedMessage);

this.#mutationCount += message.mutations.length;

return sequencedMessage;
}

appendSummary(clientId: number, summary: any) {
appendSummary(clientId: number, sequenceNumber: number, summary: any) {
this.#ops = this.#ops.filter(op => op.sequenceNumber > sequenceNumber);

this.#mutationCount = 0;

for (const op of this.#ops)
this.#mutationCount += op.mutations.length;

return this.#latestSummary = {
clientId,
summary,
Expand All @@ -48,7 +57,9 @@ export class OrderingService {
return { summary: this.#latestSummary, ops: [...this.#ops] };
}

get opCount() {
return this.#ops.length;
#mutationCount = 0;

get mutationCount() {
return this.#mutationCount;
}
}
16 changes: 16 additions & 0 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8d84209

Please sign in to comment.