Skip to content

Commit

Permalink
Use redis for storing multiplayer info
Browse files Browse the repository at this point in the history
  • Loading branch information
Marvin Schürz committed Jan 22, 2025
1 parent 81e4f80 commit ef95838
Show file tree
Hide file tree
Showing 13 changed files with 337 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ class MultiplayerCursor extends CompositeDrawable {
override update() {
super.update();

this.position = Vec2.lerp(this.#targetPosition, this.position, Math.exp(-0.05 * this.time.elapsed));
this.position = Vec2.lerp(this.#targetPosition, this.position, Math.exp(-0.1 * this.time.elapsed));
}

override dispose(isDisposing: boolean = true) {
Expand Down
7 changes: 6 additions & 1 deletion packages/core/src/utils/vec2Serializer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,12 @@ import type { ISerializer } from '@osucad/multiplayer';
import { Vec2 } from '@osucad/framework';

export const vec2Serializer: ISerializer<Vec2, [number, number]> = {
serialize: (value: Vec2) => [value.x, value.y],
serialize: (value: Vec2) => {
return [
Math.round(value.x * 1000) / 1000,
Math.round(value.y * 1000) / 1000,
];
},
deserialize(plain: [number, number]): Vec2 {
return new Vec2(plain[0], plain[1]);
},
Expand Down
6 changes: 3 additions & 3 deletions packages/multiplayer/src/protocol/ServerMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ export interface ServerMessages {
}

export type SummaryResponse =
| { summary: any; sequenceNumber: number }
| { summary: any; sequenceNumber: string }
| { error: 'has-pending-ops' };

export interface InitialStateServerMessage {
clientId: number;
document: {
summary: any;
ops: MutationsSubmittedMessage[];
sequenceNumber: number;
sequenceNumber: string;
};
assets: AssetInfo[];
connectedUsers: ClientInfo[];
Expand All @@ -59,5 +59,5 @@ export interface MutationsSubmittedMessage {
version: number;
clientId: number;
mutations: string[];
sequenceNumber: number;
sequenceNumber: string;
}
4 changes: 4 additions & 0 deletions packages/server/config/default.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,6 @@
deployment:
port: 3000

redis:
host: localhost
port: 6379
4 changes: 3 additions & 1 deletion packages/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@
"vite-node": "^2.1.6"
},
"dependencies": {
"@msgpack/msgpack": "3.0.0-beta2",
"config": "^3.3.12",
"cors": "^2.8.5",
"express": "^4.21.1",
"ioredis": "^5.4.2",
"request": "^2.88.2",
"socket.io": "^4.8.1"
"socket.io": "^4.8.1",
"socket.io-adapter": "^2.5.5"
}
}
7 changes: 5 additions & 2 deletions packages/server/src/Gateway.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { OsuBeatmap } from '@osucad/ruleset-osu';
import type Redis from 'ioredis';
import type { Server } from 'socket.io';
import { randomUUID } from 'node:crypto';
import fs from 'node:fs/promises';
Expand All @@ -16,7 +17,7 @@ export class Gateway {

room!: Room;

async init() {
async init(redis: Redis) {
RulesetStore.register(new OsuRuleset().rulesetInfo);

const ruleset = new OsuRuleset();
Expand All @@ -27,7 +28,9 @@ export class Gateway {

const roomId = randomUUID();

const orderingService = new OrderingService(new BoxedBeatmap(beatmap).createSummary());
const orderingService = new OrderingService(redis, 'edit:1:ops', new BoxedBeatmap(beatmap).createSummary());

await orderingService.init();

const broadcast = this.io.to(roomId);

Expand Down
16 changes: 9 additions & 7 deletions packages/server/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import config from 'config';
import cors from 'cors';
import express from 'express';
import request from 'request';
import { Server } from 'socket.io';
import msgpackParser from 'socket.io-msgpack-parser';
import { getAssetPath, loadAssets } from './assets';
import { Gateway } from './Gateway';
import { createRedis } from './redis';
import { createSocketIo } from './socketIo';

// region config

Expand All @@ -25,13 +25,15 @@ app.use((req, res, next) => {
});

const server = http.createServer(app);
const io = new Server(server, {
transports: ['websocket'],
parser: msgpackParser,

const redis = createRedis(config.get('redis'));

const io = createSocketIo({
server,
redis,
});

const gateway = new Gateway(io);

app.get('/api/assets/:id', (req, res) => {
const path = getAssetPath(req.params.id);
if (!path) {
Expand All @@ -48,7 +50,7 @@ app.get('/api/users/:id/avatar', async (req, res) => {

async function run() {
await loadAssets();
await gateway.init();
await gateway.init(redis);

server.listen(port, () => {
console.log(`Listening on port ${port}`);
Expand Down
14 changes: 6 additions & 8 deletions packages/server/src/multiplayer/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ export class Room {
0xE329AB,
];

accept(socket: Socket<ClientMessages, ServerMessages>) {
async accept(socket: Socket<ClientMessages, ServerMessages>) {
const clientId = nextClientId();

socket.join(this.id);
Expand All @@ -45,14 +45,14 @@ export class Room {
color,
);

const { summary, ops } = this.orderingService.getMessagesSinceLastSummary();
const { summary, ops } = await this.orderingService.getMessagesSinceLastSummary();

socket.emit('initialData', {
clientId,
document: {
summary: summary.summary,
ops,
sequenceNumber: this.orderingService.sequenceNumber,
sequenceNumber: summary.sequenceNumber,
},
assets: this.assets,
connectedUsers: [...this.users.values()].map(it => it.getInfo()),
Expand Down Expand Up @@ -93,14 +93,12 @@ export class Room {
this.broadcast.emit('presenceUpdated', user.clientId, key, data);
}

private handleSubmitMutations(user: RoomUser, message: SubmitMutationsMessage) {
const start = performance.now();

const sequencedMessage = this.orderingService.appendOps(user.clientId, message);
private async handleSubmitMutations(user: RoomUser, message: SubmitMutationsMessage) {
const { sequencedMessage, mutationCount } = await this.orderingService.appendOps(user.clientId, message);

this.broadcast.emit('mutationsSubmitted', sequencedMessage);

if (this.orderingService.mutationCount > 1000)
if (mutationCount > 1000)
this.requestSummary().then();
}

Expand Down
16 changes: 16 additions & 0 deletions packages/server/src/redis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import Redis from 'ioredis';

export interface RedisConfig {
host: string;
port: number;
}

export function createRedis(config: RedisConfig) {
const redis = new Redis();

redis.on('connect', () => console.log('Connected to redis'));

redis.on('message', console.log);

return redis;
}
184 changes: 184 additions & 0 deletions packages/server/src/redisAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
import type Redis from 'ioredis';
import type { ClusterMessage, ClusterResponse, Offset, ServerId } from 'socket.io-adapter';
import type { ClusterAdapterOptions } from 'socket.io-adapter/dist/cluster-adapter';
import { decode, encode } from '@msgpack/msgpack';
import { hasBinary } from '@socket.io/redis-streams-adapter/dist/util';
import { ClusterAdapterWithHeartbeat, MessageType } from 'socket.io-adapter';

function mapResult(result: any[]) {
const id = result[0];
const inlineValues = result[1];
const message: any = {};
for (let i = 0; i < inlineValues.length; i += 2) {
message[inlineValues[i]] = inlineValues[i + 1];
}
return {
id,
message,
};
}

export function redisAdapter(redis: Redis, streamName: string = 'osucad') {
const namespaceToAdapters = new Map<string, RedisStreamsAdapter>();

let offset = '$';
let polling = false;
let shouldClose = false;

async function poll() {
try {
const response = await redis.xread('BLOCK', 10, 'STREAMS', streamName, offset)
.then((results) => {
if (results === null)
return null;
return [
{
messages: results[0][1].map(mapResult),
},
];
});

if (response) {
for (const entry of response[0].messages) {
console.debug('reading entry %s', entry.id);
const message = entry.message;

if (message.nsp) {
namespaceToAdapters
.get(message.nsp)
?.onRawMessage(message, entry.id);
}

offset = entry.id;
}
}
}
catch (e: any) {
console.debug('something went wrong while consuming the stream: %s', e.message);
}

if (namespaceToAdapters.size > 0 && !shouldClose) {
poll();
}
else {
polling = false;
}
}

return function (nsp: any) {
const adapter = new RedisStreamsAdapter(redis, streamName, nsp, {});
namespaceToAdapters.set(nsp.name, adapter);

if (!polling) {
polling = true;
shouldClose = false;
poll();
}

const defaultClose = adapter.close;

adapter.close = () => {
namespaceToAdapters.delete(nsp.name);

if (namespaceToAdapters.size === 0) {
shouldClose = true;
}

defaultClose.call(adapter);
};

return adapter;
};
}

class RedisStreamsAdapter extends ClusterAdapterWithHeartbeat {
constructor(
readonly redis: Redis,
readonly streamName: string,
nsp: any,
opts: ClusterAdapterOptions,
) {
super(nsp, opts);
}

protected doPublish(message: ClusterMessage): Promise<Offset> {
const args = [this.streamName, 'MAXLEN', '~', 10_000, '*'] as any;
const payload = RedisStreamsAdapter.encode(message);

for (const key in payload)
args.push(key, payload[key as keyof typeof payload]);

return this.redis.xadd(args) as Promise<string>;
}

protected async doPublishResponse(requesterUid: ServerId, response: ClusterResponse): Promise<void> {
await this.doPublish(response as unknown as ClusterMessage);
}

static encode(message: ClusterMessage): RawClusterMessage {
const rawMessage: RawClusterMessage = {
uid: message.uid,
nsp: message.nsp,
type: message.type.toString(),
};

if ('data' in message) {
const mayContainBinary = [
MessageType.BROADCAST,
MessageType.FETCH_SOCKETS_RESPONSE,
MessageType.SERVER_SIDE_EMIT,
MessageType.SERVER_SIDE_EMIT_RESPONSE,
MessageType.BROADCAST_ACK,
].includes(message.type);

if (mayContainBinary && hasBinary(message.data)) {
// eslint-disable-next-line node/prefer-global/buffer
rawMessage.data = Buffer.from(encode(message.data)).toString('base64');
}
else {
rawMessage.data = JSON.stringify(message.data);
}
}

return rawMessage;
}

public onRawMessage(rawMessage: RawClusterMessage, offset: string) {
let message;
try {
message = RedisStreamsAdapter.decode(rawMessage);
}
catch (e: any) {
return console.debug('invalid format: %s', e.message);
}

this.onMessage(message, offset);
}

static decode(rawMessage: RawClusterMessage): ClusterMessage {
const message: ClusterMessage = {
uid: rawMessage.uid,
nsp: rawMessage.nsp,
type: Number.parseInt(rawMessage.type, 10),
};

if (rawMessage.data) {
if (rawMessage.data.startsWith('{')) {
(message as any).data = JSON.parse(rawMessage.data);
}
else {
// eslint-disable-next-line node/prefer-global/buffer
(message as any).data = decode(Buffer.from(rawMessage.data, 'base64')) as Record<string, unknown>;
}
}

return message;
}
}

interface RawClusterMessage {
uid: string;
nsp: string;
type: string;
data?: string;
}
Loading

0 comments on commit ef95838

Please sign in to comment.