Skip to content

Commit

Permalink
use in-house pubsub to refactor receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
yaacovCR committed Feb 6, 2021
1 parent 7188c81 commit b621e1b
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 56 deletions.
2 changes: 1 addition & 1 deletion packages/delegate/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
"dependencies": {
"@ardatan/aggregate-error": "0.0.6",
"@graphql-tools/batch-execute": "^7.0.0",
"@graphql-tools/pubsub": "^7.0.0",
"@graphql-tools/schema": "^7.0.0",
"@graphql-tools/utils": "^7.1.6",
"dataloader": "2.0.0",
"graphql-subscriptions": "^1.1.0",
"is-promise": "4.0.0",
"tslib": "~2.1.0"
},
Expand Down
122 changes: 68 additions & 54 deletions packages/delegate/src/Receiver.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,17 @@
import { AsyncExecutionResult, ExecutionPatchResult, mapAsyncIterator, mergeDeep } from '@graphql-tools/utils';
import { ExecutionResult } from 'graphql';
import { PubSub } from 'graphql-subscriptions';

const PATCH_TOPIC = 'PATCH';
import { AsyncExecutionResult, ExecutionPatchResult, mergeDeep } from '@graphql-tools/utils';

function updateObjectWithPatch(object: any, path: ReadonlyArray<string | number>, patch: Record<string, any>) {
const pathSegment = path[0];
if (path.length === 1) {
mergeDeep(object[pathSegment], patch);
} else {
updateObjectWithPatch(object[pathSegment], path.slice(1), patch);
}
}

function getDataAtPath(object: any, path: ReadonlyArray<string | number>): any {
const pathSegment = path[0];
const data = object[pathSegment];
if (path.length === 1 || data == null) {
return data;
} else {
getDataAtPath(data, path.slice(1));
}
}
import { InMemoryChannel } from '@graphql-tools/pubsub';

export class Receiver {
private readonly asyncIterable: AsyncIterable<AsyncExecutionResult>;
private readonly resultTransformer: (originalResult: ExecutionResult) => any;
private readonly initialResultDepth: number;
private readonly pubsub: PubSub;
private readonly channel: InMemoryChannel<ExecutionPatchResult>;
private result: any;
private iterating: boolean;
private numRequests: number;

constructor(
asyncIterable: AsyncIterable<AsyncExecutionResult>,
Expand All @@ -39,8 +21,9 @@ export class Receiver {
this.asyncIterable = asyncIterable;
this.resultTransformer = resultTransformer;
this.initialResultDepth = initialResultDepth;
this.pubsub = new PubSub();
this.channel = new InMemoryChannel();
this.iterating = false;
this.numRequests = 0;
}

public async getInitialResult() {
Expand All @@ -51,56 +34,87 @@ export class Receiver {
return transformedResult;
}

private async iterate() {
for await (const asyncResult of this.asyncIterable) {
if (isPatchResultWithData(asyncResult)) {
const transformedResult = this.resultTransformer(asyncResult);
updateObjectWithPatch(this.result, asyncResult.path, transformedResult);
this._publish(asyncResult);
}
public async request(requestedPath: Array<string | number>): Promise<any> {
const data = getDataAtPath(this.result, requestedPath.slice(this.initialResultDepth));
if (data !== undefined) {
return data;
}

const asyncIterable = this._subscribe();

this.numRequests++;
if (!this.iterating) {
setImmediate(() => this._iterate());
}

return this._reduce(asyncIterable, requestedPath);
}

private _publish(patchResult: ExecutionPatchResult<Record<string, any>>): void {
this.pubsub.publish(PATCH_TOPIC, patchResult);
private _publish(asyncResult: ExecutionPatchResult): void {
return this.channel.publish(asyncResult);
}

private _subscribe(): AsyncIterableIterator<ExecutionPatchResult<Record<string, any>>> {
const asyncIterator = this.pubsub.asyncIterator<ExecutionPatchResult>(PATCH_TOPIC);
return mapAsyncIterator(asyncIterator, value => value);
private _subscribe(): AsyncIterableIterator<ExecutionPatchResult> {
return this.channel.subscribe();
}

public async request(requestedPath: Array<string | number>) {
const data = getDataAtPath(this.result, requestedPath.slice(this.initialResultDepth));
if (data !== undefined) {
return data;
}
private async _iterate(): Promise<void> {
const iterator = this.asyncIterable[Symbol.asyncIterator]();

const asyncIterable = this._subscribe();
let hasNext = true;
while (hasNext && this.numRequests) {
const payload = await iterator.next();

if (!this.iterating) {
setTimeout(() => this.iterate(), 0);
hasNext = !payload.done;
const asyncResult = payload.value;

if (asyncResult != null && isPatchResultWithData(asyncResult)) {
const transformedResult = this.resultTransformer(asyncResult);
updateObjectWithPatch(this.result, asyncResult.path, transformedResult);
this._publish(asyncResult);
}
}
}

private async _reduce(
asyncIterable: AsyncIterableIterator<ExecutionPatchResult>,
requestedPath: Array<string | number>
): Promise<any> {
for await (const patchResult of asyncIterable) {
const receivedPath = patchResult.path;
const receivedPathLength = receivedPath.length;
if (receivedPathLength <= requestedPath.length) {
let match = true;
for (let i = 0; i < receivedPathLength; i++) {
if (receivedPath[i] !== requestedPath[i]) {
match = false;
break;
}
}
if (match) {
return getDataAtPath(patchResult.data, requestedPath.slice(receivedPathLength));
}

if (receivedPathLength > requestedPath.length) {
continue;
}

if (receivedPath.every((value, index) => value === requestedPath[index])) {
this.numRequests--;
return getDataAtPath(patchResult.data, requestedPath.slice(receivedPathLength));
}
}
}
}

function getDataAtPath(object: any, path: ReadonlyArray<string | number>): any {
const pathSegment = path[0];
const data = object[pathSegment];
if (path.length === 1 || data == null) {
return data;
} else {
getDataAtPath(data, path.slice(1));
}
}

function isPatchResultWithData(result: AsyncExecutionResult): result is ExecutionPatchResult {
return (result as ExecutionPatchResult).path != null;
}

function updateObjectWithPatch(object: any, path: ReadonlyArray<string | number>, patch: Record<string, any>) {
const pathSegment = path[0];
if (path.length === 1) {
mergeDeep(object[pathSegment], patch);
} else {
updateObjectWithPatch(object[pathSegment], path.slice(1), patch);
}
}
31 changes: 31 additions & 0 deletions packages/pubsub/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"name": "@graphql-tools/pubsub",
"version": "7.0.0",
"description": "A set of utils for faster development of GraphQL tools",
"repository": {
"type": "git",
"url": "ardatan/graphql-tools",
"directory": "packages/batch-delegate"
},
"license": "MIT",
"sideEffects": false,
"main": "dist/index.cjs.js",
"module": "dist/index.esm.js",
"typings": "dist/index.d.ts",
"typescript": {
"definition": "dist/index.d.ts"
},
"peerDependencies": {
"graphql": "^14.0.0 || ^15.0.0"
},
"buildOptions": {
"input": "./src/index.ts"
},
"dependencies": {
"@repeaterjs/repeater": "^3.0.4"
},
"publishConfig": {
"access": "public",
"directory": "dist"
}
}
48 changes: 48 additions & 0 deletions packages/pubsub/src/in-memory-channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// adapted from https://github.com/repeaterjs/repeater/blob/7b294acfa7e2c21721ff77018cf77c452c51dad9/packages/pubsub/src/pubsub.ts
// adapted rather than importing @repeaterjs/pubsub
// because of https://github.com/repeaterjs/repeater/issues/67 in which pubsub will be killed!

import { Repeater, RepeaterBuffer } from '@repeaterjs/repeater';

import { Channel } from './types';

interface Hooks<T> {
push(value: T): Promise<unknown>;
stop(reason?: any): unknown;
}

export class InMemoryChannel<T> implements Channel<T> {
protected hooks: Set<Hooks<T>> = new Set();

publish(value: T): void {
const hooks = this.hooks;

for (const { push, stop } of hooks) {
try {
push(value).catch(stop);
} catch (err) {
// push queue is full
stop(err);
}
}
}

unpublish(reason?: any): void {
const hooks = this.hooks;

for (const { stop } of hooks) {
stop(reason);
}

hooks.clear();
}

subscribe(buffer?: RepeaterBuffer): Repeater<T> {
return new Repeater<T>(async (push, stop) => {
const publisher = { push, stop };
this.hooks.add(publisher);
await stop;
this.hooks.delete(publisher);
}, buffer);
}
}
52 changes: 52 additions & 0 deletions packages/pubsub/src/in-memory-pubsub.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// adapted from https://github.com/repeaterjs/repeater/blob/7b294acfa7e2c21721ff77018cf77c452c51dad9/packages/pubsub/src/pubsub.ts
// adapted rather than importing @repeaterjs/pubsub
// because of https://github.com/repeaterjs/repeater/issues/67 in which pubsub will be killed!

import { Repeater, RepeaterBuffer } from '@repeaterjs/repeater';
import { InMemoryChannel } from './in-memory-channel';

import { PubSub } from './types';

export class InMemoryPubSub<T> implements PubSub<T> {
protected channels: Record<string, InMemoryChannel<T>> = Object.create(null);

publish(topic: string, value: T): void {
let channel = this.channels[topic];

if (channel == null) {
channel = this.channels[topic] = new InMemoryChannel();
}

channel.publish(value);
}

unpublish(topic: string, reason?: any): void {
const channel = this.channels[topic];

if (channel == null) {
return;
}

channel.unpublish(reason);

delete this.channels[topic];
}

subscribe(topic: string, buffer?: RepeaterBuffer): Repeater<T> {
let channel = this.channels[topic];

if (this.channels[topic] == null) {
channel = this.channels[topic] = new InMemoryChannel();
}

return channel.subscribe(buffer);
}

close(reason?: any): void {
for (const channel of Object.values(this.channels)) {
channel.unpublish(reason);
}

this.channels = Object.create(null);
}
}
4 changes: 4 additions & 0 deletions packages/pubsub/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * from './in-memory-channel';
export * from './in-memory-pubsub';

export * from './types';
14 changes: 14 additions & 0 deletions packages/pubsub/src/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { RepeaterBuffer } from '@repeaterjs/repeater';

export interface Channel<T> {
publish(value: T): Promise<unknown> | unknown;
unpublish(reason?: any): Promise<unknown> | unknown;
subscribe(buffer?: RepeaterBuffer): AsyncIterableIterator<T>;
}

export interface PubSub<T> {
publish(topic: string, value: T): Promise<unknown> | unknown;
unpublish(topic: string, reason?: any): Promise<unknown> | unknown;
subscribe(topic: string, buffer?: RepeaterBuffer): AsyncIterableIterator<T>;
close(reason?: any): Promise<unknown> | unknown;
}
7 changes: 6 additions & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2781,6 +2781,11 @@
resolved "https://registry.yarnpkg.com/@polka/url/-/url-1.0.0-next.11.tgz#aeb16f50649a91af79dbe36574b66d0f9e4d9f71"
integrity sha512-3NsZsJIA/22P3QUyrEDNA2D133H4j224twJrdipXN38dpnIOzAbUDtOwkcJ5pXmn75w7LSQDjA4tO9dm1XlqlA==

"@repeaterjs/repeater@^3.0.4":
version "3.0.4"
resolved "https://registry.yarnpkg.com/@repeaterjs/repeater/-/repeater-3.0.4.tgz#a04d63f4d1bf5540a41b01a921c9a7fddc3bd1ca"
integrity sha512-AW8PKd6iX3vAZ0vA43nOUOnbq/X5ihgU+mSXXqunMkeQADGiqw/PY0JNeYtD5sr0PAy51YPgAPbDoeapv9r8WA==

"@rollup/[email protected]":
version "7.1.1"
resolved "https://registry.yarnpkg.com/@rollup/plugin-node-resolve/-/plugin-node-resolve-7.1.1.tgz#8c6e59c4b28baf9d223028d0e450e06a485bb2b7"
Expand Down Expand Up @@ -7806,7 +7811,7 @@ [email protected]:
dependencies:
tslib "~2.0.3"

[email protected], graphql-subscriptions@^1.1.0:
[email protected]:
version "1.2.0"
resolved "https://registry.yarnpkg.com/graphql-subscriptions/-/graphql-subscriptions-1.2.0.tgz#d82ff76e7504ac91acbbea15f36cd3904043937b"
integrity sha512-uXvp729fztqwa7HFUFaAqKwNMwwOfsvu4HwOu7/35Cd44bNrMPCn97mNGN0ybuuZE36CPXBTaW/4U/xyOS4D9w==
Expand Down

0 comments on commit b621e1b

Please sign in to comment.