Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: add SubscriptionAwareGrpcClient #1054

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
npm i @drift-labs/sdk
```

If you are using a gRPC flavored account subscriber, you will need an updated `protoc` installation, please refer to the Protobuf [installation guide](https://grpc.io/docs/protoc-installation/)

## Getting Started

Documentation:
Expand Down
3 changes: 2 additions & 1 deletion sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
"@pythnetwork/client": "2.5.3",
"@solana/spl-token": "^0.3.7",
"@solana/web3.js": "1.91.7",
"@triton-one/yellowstone-grpc": "0.4.0",
"strict-event-emitter-types": "^2.0.0",
"uuid": "^8.3.2",
"zstddec": "^0.1.0"
Expand Down Expand Up @@ -68,4 +69,4 @@
"engines": {
"node": ">=18"
}
}
}
74 changes: 74 additions & 0 deletions sdk/src/accounts/grpcAccountSubscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import {
DataAndSlot,
BufferAndSlot,
AccountSubscriber,
ResubOpts,
} from './types';
import { AnchorProvider, Program } from '@coral-xyz/anchor';
import { AccountInfo, Commitment, Context, PublicKey } from '@solana/web3.js';

Check failure on line 8 in sdk/src/accounts/grpcAccountSubscriber.ts

View workflow job for this annotation

GitHub Actions / yarn-lint

'AccountInfo' is defined but never used. Allowed unused vars must match /^_/u

Check failure on line 8 in sdk/src/accounts/grpcAccountSubscriber.ts

View workflow job for this annotation

GitHub Actions / yarn-lint

'Context' is defined but never used. Allowed unused vars must match /^_/u
import { capitalize } from './utils';

Check failure on line 9 in sdk/src/accounts/grpcAccountSubscriber.ts

View workflow job for this annotation

GitHub Actions / yarn-lint

'capitalize' is defined but never used. Allowed unused vars must match /^_/u
import Client, {
CommitmentLevel,

Check failure on line 11 in sdk/src/accounts/grpcAccountSubscriber.ts

View workflow job for this annotation

GitHub Actions / yarn-lint

'CommitmentLevel' is defined but never used. Allowed unused vars must match /^_/u
SubscribeRequest,

Check failure on line 12 in sdk/src/accounts/grpcAccountSubscriber.ts

View workflow job for this annotation

GitHub Actions / yarn-lint

'SubscribeRequest' is defined but never used. Allowed unused vars must match /^_/u
SubscribeRequestFilterAccountsFilter,

Check failure on line 13 in sdk/src/accounts/grpcAccountSubscriber.ts

View workflow job for this annotation

GitHub Actions / yarn-lint

'SubscribeRequestFilterAccountsFilter' is defined but never used. Allowed unused vars must match /^_/u
} from '@triton-one/yellowstone-grpc';
import { ChannelOptions } from '@grpc/grpc-js';

export class GrpcAccountSubscriber<T> implements AccountSubscriber<T> {
dataAndSlot?: DataAndSlot<T>;
bufferAndSlot?: BufferAndSlot;
accountName: string;
program: Program;
client: Client;
accountPublicKey: PublicKey;
decodeBufferFn: (buffer: Buffer) => T;
onChange: (data: T) => void;
listenerId?: number;

resubOpts?: ResubOpts;

commitment?: Commitment;
isUnsubscribing = false;

timeoutId?: NodeJS.Timeout;

receivingData: boolean;

public constructor(
accountName: string,
program: Program,
accountPublicKey: PublicKey,
grpcEndpoint: string,
grpcXToken?: string,
grpcChannelOptions?: ChannelOptions,
// decodeBuffer?: (buffer: Buffer) => T,
commitment?: Commitment
) {
this.accountName = accountName;
this.program = program;
this.accountPublicKey = accountPublicKey;
this.receivingData = false;
this.commitment =
commitment ?? (this.program.provider as AnchorProvider).opts.commitment;
this.client = new Client(grpcEndpoint, grpcXToken, grpcChannelOptions);
}

async subscribe(onChange: (data: T) => void): Promise<void> {
if (this.listenerId != null || this.isUnsubscribing) {
return;
}
this.onChange = onChange;
if (!this.dataAndSlot) {
await this.fetch();
}
}
fetch(): Promise<void> {
throw new Error('Method not implemented.');
}
unsubscribe(): Promise<void> {
throw new Error('Method not implemented.');
}
setData(userAccount: T, slot?: number): void {

Check failure on line 71 in sdk/src/accounts/grpcAccountSubscriber.ts

View workflow job for this annotation

GitHub Actions / yarn-lint

'userAccount' is defined but never used. Allowed unused args must match /^_/u

Check failure on line 71 in sdk/src/accounts/grpcAccountSubscriber.ts

View workflow job for this annotation

GitHub Actions / yarn-lint

'slot' is defined but never used. Allowed unused args must match /^_/u
throw new Error('Method not implemented.');
}
}
272 changes: 272 additions & 0 deletions sdk/src/accounts/grpcClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
import Client, {
SubscribeRequest,
SubscribeUpdate,
SubscribeRequestFilterAccountsFilter,
SubscribeRequestFilterAccounts,
SubscribeRequestFilterSlots,
SubscribeRequestFilterTransactions,
SubscribeRequestFilterBlocks,
SubscribeRequestFilterBlocksMeta,
SubscribeRequestFilterEntry,
} from '@triton-one/yellowstone-grpc';
import { ChannelOptions, ClientDuplexStream } from '@grpc/grpc-js';
import {
SubscribeRequestPing,
SubscribeUpdateAccount,
SubscribeUpdateSlot,
} from '@triton-one/yellowstone-grpc/dist/grpc/geyser';

const emptyRequest: SubscribeRequest = {
slots: {},
accounts: {},
transactions: {},
blocks: {},
blocksMeta: {},
accountsDataSlice: [],
entry: {},
};

/// A single grpc connection can manage multiple subscriptions (accounts, programs, blocks, etc.)
/// This class is a wrapper around a grpc client to manage multiplexing these subscriptions
/// and avoid unintentionally unsubscribing when multiple account subscribers share a connection.
///
/// SubscriptionAwareGrpcClient can be safely shared across multiple objects.
export class SubscriptionAwareGrpcClient {
client: Client;
/// the latest set of subscriptions
callbacks: Map<
string,
[(key: string, data: any) => void, keyof SubscribeUpdate]
> = new Map();
subcribeRequest: SubscribeRequest;
stream?: ClientDuplexStream<SubscribeRequest, SubscribeUpdate>;

constructor(
grpcEndpoint: string,
grpcXToken?: string,
grpcChannelOptions?: ChannelOptions
) {
this.client = new Client(grpcEndpoint, grpcXToken, grpcChannelOptions);
this.subcribeRequest = emptyRequest;
}

async subscribe() {
if (this.stream) {
return;
}
this.stream = await this.client.subscribe();
this.stream.on('data', (data: SubscribeUpdate) => {
console.log('DATA:', data);
for (const filter of data.filters) {
if (this.callbacks.has(filter)) {
const [callback, payloadKey] = this.callbacks.get(filter);
callback(filter, data[payloadKey]);
}
}
});
}

async unsubscribe() {
if (!this.stream) {
return;
}

// https://docs.triton.one/project-yellowstone/dragons-mouth-grpc-subscriptions#unsubscribing
this.subcribeRequest = emptyRequest;
this.stream.write(this.subcribeRequest);
this.stream.end();
this.stream = undefined;
this.callbacks.clear();
}

addAccountSubscription(
key: string,
account: string[],
callback: (key: string, data: SubscribeUpdateAccount) => void
) {
if (!this.stream) {
throw new Error('must call subscribe() before adding subscriptions');
}
if (this.subcribeRequest.accounts[key]) {
throw new Error(
`Account subscription key ${key} already exists, existing subscription: ${JSON.stringify(
this.subcribeRequest.accounts[key]
)}, new subscription: ${JSON.stringify(account)}`
);
}
this.callbacks.set(key, [callback, 'account']);
this.subcribeRequest.accounts[key] = SubscribeRequestFilterAccounts.create({
account,
});
this.stream.write(this.subcribeRequest);
}

addProgramSubscription(
key: string,
owner: string[],
filters: SubscribeRequestFilterAccountsFilter[],
callback: (key: string, data: SubscribeUpdateAccount) => void
) {
if (!this.stream) {
throw new Error('must call subscribe() before adding subscriptions');
}
if (this.subcribeRequest.accounts[key]) {
throw new Error(
`Account subscription key ${key} already exists, existing subscription: ${JSON.stringify(
this.subcribeRequest.accounts[key]
)}, new subscription: ${JSON.stringify({ owner, filters })}`
);
}
this.callbacks.set(key, [callback, 'account']);
this.subcribeRequest.accounts[key] = SubscribeRequestFilterAccounts.create({
owner,
filters,
});
this.stream.write(this.subcribeRequest);
}

addSlotSubscription(
key: string,
callback: (key: string, data: SubscribeUpdateSlot) => void
) {
if (!this.stream) {
throw new Error('must call subscribe() before adding subscriptions');
}
if (this.subcribeRequest.slots[key]) {
throw new Error(`Slot subscription key ${key} already exists`);
}
this.callbacks.set(key, [callback, 'slot']);
this.subcribeRequest.slots[key] = SubscribeRequestFilterSlots.create({});
this.stream.write(this.subcribeRequest);
}

addTransactionSubscription(
key: string,
subscription: SubscribeRequestFilterTransactions,
callback: (key: string, data: any) => void
) {
if (!this.stream) {
throw new Error('must call subscribe() before adding subscriptions');
}
if (this.subcribeRequest.transactions[key]) {
throw new Error(
`Transaction subscription key ${key} already exists, existing subscription: ${JSON.stringify(
this.subcribeRequest.transactions[key]
)}, new subscription: ${JSON.stringify(subscription)}`
);
}
this.callbacks.set(key, [callback, 'transaction']);
this.subcribeRequest.transactions[key] = subscription;
}

addBlockSubscription(
key: string,
subscription: SubscribeRequestFilterBlocks,
callback: (key: string, data: any) => void
) {
if (!this.stream) {
throw new Error('must call subscribe() before adding subscriptions');
}
if (this.subcribeRequest.blocks[key]) {
throw new Error(
`Block subscription key ${key} already exists, existing subscription: ${JSON.stringify(
this.subcribeRequest.blocks[key]
)}, new subscription: ${JSON.stringify(subscription)}`
);
}
this.callbacks.set(key, [callback, 'block']);
this.subcribeRequest.blocks[key] = subscription;
this.stream.write(this.subcribeRequest);
}

addBlockMetaSubscription(
key: string,
subscription: SubscribeRequestFilterBlocksMeta,
callback: (key: string, data: any) => void
) {
if (!this.stream) {
throw new Error('must call subscribe() before adding subscriptions');
}
if (this.subcribeRequest.blocksMeta[key]) {
throw new Error(
`Block meta subscription key ${key} already exists, existing subscription: ${JSON.stringify(
this.subcribeRequest.blocksMeta[key]
)}, new subscription: ${JSON.stringify(subscription)}`
);
}
this.callbacks.set(key, [callback, 'blockMeta']);
this.subcribeRequest.blocksMeta[key] = subscription;
this.stream.write(this.subcribeRequest);
}

addEntrySubscription(
key: string,
subscription: SubscribeRequestFilterEntry,
callback: (key: string, data: any) => void
) {
if (!this.stream) {
throw new Error('must call subscribe() before adding subscriptions');
}
if (this.subcribeRequest.entry[key]) {
throw new Error(
`Entry subscription key ${key} already exists, existing subscription: ${JSON.stringify(
this.subcribeRequest.entry[key]
)}, new subscription: ${JSON.stringify(subscription)}`
);
}
this.callbacks.set(key, [callback, 'entry']);
this.subcribeRequest.entry[key] = subscription;
this.stream.write(this.subcribeRequest);
}

// addAccountsDataSliceSubscription(subscription: SubscribeRequestAccountsDataSlice, callback: (key: string, data: any) => void) {
// if (!this.stream) {
// throw new Error("must call subscribe() before adding subscriptions");
// }
// this.callbacks.set(key, callback);
// this.subcribeRequest.accountsDataSlice.push(subscription);
// this.stream.write(this.subcribeRequest);
// }

addPingSubscription(subscription: SubscribeRequestPing) {
this.subcribeRequest.ping = subscription;
this.stream.write(this.subcribeRequest);
}
}

const main = async () => {
const client = new SubscriptionAwareGrpcClient(
'https://api.rpcpool.com:443',
'<X_TOKEN>'
);

await client.subscribe();

client.addAccountSubscription(
'BRksHqLiq2gvQw1XxsZq6DXZjD3GB5a9J63tUBgd6QS9-account',
['BRksHqLiq2gvQw1XxsZq6DXZjD3GB5a9J63tUBgd6QS9'],
(key, data) => {
console.log('ACCOUNT', key, data);
}
);

client.addAccountSubscription(
'BRksHqLiq2gvQw1XxsZq6DXZjD3GB5a9J63tUBgd6QS9-account2',
['BRksHqLiq2gvQw1XxsZq6DXZjD3GB5a9J63tUBgd6QS9'],
(key, data) => {
console.log('ACCOUNT_2', key, data);
}
);

// client.addSlotSubscription('slots', (key, data) => {
// console.log("SLOT", key, data);
// })

console.log('waiting...');
await new Promise((resolve) => setTimeout(resolve, 30_000));
console.log('done!');
await client.unsubscribe();
process.exit(0);
};

main();
Loading
Loading