Skip to content

Commit

Permalink
Merge pull request #5 from project-kardeshev/kv-impl
Browse files Browse the repository at this point in the history
feat(kv): update kv impl
  • Loading branch information
atticusofsparta authored Nov 6, 2024
2 parents ce85717 + 08f26c7 commit ef87567
Show file tree
Hide file tree
Showing 5 changed files with 251 additions and 27 deletions.
199 changes: 199 additions & 0 deletions src/common/process/aoconnect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
import { connect } from '@permaweb/aoconnect';
import { EventEmitter } from 'eventemitter3';

import {
AoConnectProcessConfig,
AoProcess,
AoProcessRead,
AoProcessWrite,
AoResult,
AoSU,
AoSUMessageNode,
AoSigner,
AoWriteOptions,
WritableAoConnectProcessConfig,
isWritableAoConnectProcessConfig,
} from '../../types/ao.js';
import { Logger } from '../../utils/logger.js';
import { defaultLogger } from '../constants.js';
import { RemoteSU } from '../index.js';

export type AoClient = ReturnType<typeof connect>;

export class AoConnectProcess implements AoProcess {
readonly logger: Logger;
readonly ao: AoClient;
readonly processId: string;

constructor(config: { processId: string; ao: AoClient; logger: Logger }) {
this.logger = config.logger;
this.ao = config.ao;
this.processId = config.processId;
}

/**
*
* @param config
* @returns - ProcessReadable if no signer is provided, ProcessWritable if signer is provided
*/
static init(
config: AoConnectProcessConfig & { signer: undefined | AoSigner },
): AoConnectProcessReadable | AoConnectProcessWritable;
static init(
config: AoConnectProcessConfig & { signer?: undefined },
): AoConnectProcessReadable;
static init(
config: AoConnectProcessConfig & { signer: AoSigner },
): AoConnectProcessWritable;
static init(
config: AoConnectProcessConfig & { signer?: AoSigner | undefined },
): AoConnectProcessReadable | AoConnectProcessWritable {
if (
isWritableAoConnectProcessConfig(config as WritableAoConnectProcessConfig)
) {
return new AoConnectProcessWritable(
config as WritableAoConnectProcessConfig,
);
}
return new AoConnectProcessReadable(config);
}

static createRemoteProcess({
processId,
ao = connect(),
signer,
logger,
}: {
processId: string;
ao?: AoClient;
signer?: AoSigner;
logger?: Logger;
}): AoConnectProcessReadable | AoConnectProcessWritable {
return AoConnectProcess.init({
processId,
signer,
logger,
ao,
});
}
}

export class AoConnectProcessReadable
extends EventEmitter
implements AoProcessRead
{
readonly logger: Logger;
readonly ao: AoClient;
readonly su: AoSU;
readonly processId: string;
private pollInterval?: NodeJS.Timeout;
lastMessageId?: string;
constructor({
logger = defaultLogger,
ao,
processId,
}: AoConnectProcessConfig) {
super();
this.logger = logger;
this.ao = ao;
this.processId = processId;
this.su = new RemoteSU({ suUrl: 'https://su-router.ao-testnet.xyz' });
}

// Polling function to check for new transactions/messages
startPolling(interval = 5000) {
this.pollInterval = setInterval(async () => {
try {
const messages = await this.checkForNewMessages();
// sort messages oldest to newest
messages.sort((a, b) => a.timestamp - b.timestamp);
messages.forEach((message) => this.emit('message', message));
this.lastMessageId = messages.at(-1)?.message.id;
} catch (error) {
this.logger.error('Polling error:', error);
}
}, interval);
}

stopPolling() {
if (this.pollInterval) clearInterval(this.pollInterval);
}

private async checkForNewMessages(): Promise<AoSUMessageNode[]> {
const messages = await this.su.getProcessMessages({
processId: this.processId,
from: this.lastMessageId,
});
return messages.edges.map((suPage) => suPage.node);
}

/**
* @param param0 - the tags and data to be passed to dryrun
* @returns @type {Promise<AoResult>}
*/
async read({ tags, data }: AoWriteOptions) {
try {
this.logger.info('Dryrun', {
tags,
data,
target: this.processId,
});
const result = await this.ao.dryrun({
tags: tags,
data: data,
process: this.processId,
});
this.logger.info('Dryrun result', result);

return result as AoResult;
} catch (error) {
this.logger.error(error);
throw error;
}
}
}

export class AoConnectProcessWritable
extends AoConnectProcessReadable
implements AoProcessWrite
{
declare logger: Logger;
declare ao: AoClient;
declare processId: string;
readonly signer: AoSigner;
constructor({ signer, ...config }: WritableAoConnectProcessConfig) {
super(config);
this.signer = signer;
}

async write({
tags,
data,
}: AoWriteOptions): Promise<{ id: string; result: AoResult }> {
try {
this.logger.info('Message', {
tags,
data,
target: this.processId,
});
const messageId = await this.ao.message({
tags,
data: data?.toString(),
process: this.processId,
signer: this.signer,
});
this.logger.info('Message ID', messageId);

const result = await this.ao.result({
message: messageId,
process: this.processId,
});
this.logger.info(`Result for ${messageId}`, result);

return { id: messageId, result: result as AoResult };
} catch (error) {
this.logger.error(error);
throw error;
}
}
}
32 changes: 18 additions & 14 deletions src/common/process/impl/kv/registry.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { AoSigner, AoWriteOptions } from '../../../../types/ao.js';
import { findMessageByTag } from '../../../../utils/ao.js';
import { safeDecode } from '../../../../utils/json.js';
import { Process, ProcessReadable, ProcessWritable } from '../../process.js';
import {
AoConnectProcess,
AoConnectProcessReadable,
AoConnectProcessWritable,
} from '../../aoconnect.js';

export interface KVRegistryReadable {
getKVStores(
Expand All @@ -21,24 +25,24 @@ export interface KVRegistryWritable {
): Promise<string>;
}

export class KVRegistryProcessReadable implements KVRegistryReadable {
readonly process: ProcessReadable;
export class KVRegistryAoConnectProcessReadable implements KVRegistryReadable {
readonly process: AoConnectProcessReadable;

constructor({
process,
processId,
}: {
process?: ProcessReadable;
process?: AoConnectProcessReadable;
processId?: string;
} = {}) {
if (!process && !processId) {
throw new Error('Either process or processId should be provided');
}
this.process =
process ??
(Process.createRemoteProcess({
(AoConnectProcess.createRemoteProcess({
processId: processId!,
}) as ProcessReadable);
}) as AoConnectProcessReadable);
}

async getKVStores(
Expand Down Expand Up @@ -66,31 +70,31 @@ export class KVRegistryProcessReadable implements KVRegistryReadable {
const message = findMessageByTag({
messages: res.Messages,
name: 'Action',
value: 'KV-Registry.Get-KV-Stores',
value: 'KV-Registry.Get-KV-Stores-Notice',
});
return message?.Data !== undefined
? safeDecode(message.Data)
: { Owned: [], Controlled: [] };
}
}

export class KVRegistryProcessWritable
extends KVRegistryProcessReadable
export class KVRegistryAoConnectProcessWritable
extends KVRegistryAoConnectProcessReadable
implements KVRegistryWritable
{
readonly process: ProcessWritable;
readonly process: AoConnectProcessWritable;

constructor({
signer,
processId,
process = Process.createRemoteProcess({
process = AoConnectProcess.createRemoteProcess({
signer,
processId,
}) as ProcessWritable,
}) as AoConnectProcessWritable,
}: {
signer: AoSigner;
processId: string;
process?: ProcessWritable;
process?: AoConnectProcessWritable;
}) {
super({ process });
this.process = process;
Expand All @@ -111,7 +115,7 @@ export class KVRegistryProcessWritable
...(name
? [
{
name: 'Name',
name: 'KV-Store-Name',
value: name,
},
]
Expand Down
29 changes: 17 additions & 12 deletions src/common/process/impl/kv/store.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { AoSigner, AoWriteOptions } from '../../../../types/ao.js';
import { findMessageByTag } from '../../../../utils/ao.js';
import { safeDecode } from '../../../../utils/json.js';
import { Process, ProcessReadable, ProcessWritable } from '../../process.js';
import {
AoConnectProcess,
AoConnectProcessReadable,
AoConnectProcessWritable,
} from '../../aoconnect.js';

export interface KVStoreReadable {
getInfo(options?: Omit<AoWriteOptions, 'target'>): Promise<{
Expand Down Expand Up @@ -74,24 +78,24 @@ export interface KVStoreWritable {
): Promise<string>;
}

export class KVStoreProcessReadable implements KVStoreReadable {
readonly process: ProcessReadable;
export class KVStoreAoConnectProcessReadable implements KVStoreReadable {
readonly process: AoConnectProcessReadable;

constructor({
process,
processId,
}: {
process?: ProcessReadable;
process?: AoConnectProcessReadable;
processId?: string;
} = {}) {
if (!process && !processId) {
throw new Error('Either process or processId should be provided');
}
this.process =
process ??
(Process.createRemoteProcess({
(AoConnectProcess.createRemoteProcess({
processId: processId!,
}) as ProcessReadable);
}) as AoConnectProcessReadable);
}

async getInfo(options?: Omit<AoWriteOptions, 'target'>): Promise<{
Expand All @@ -111,6 +115,7 @@ export class KVStoreProcessReadable implements KVStoreReadable {
...(options?.tags ?? []),
],
});
this.process.logger.info('KVStoreAoConnectProcessReadable getInfo', res);

const message = res.Messages[0];
const state = safeDecode(message.Data) as Record<string, any>;
Expand Down Expand Up @@ -204,23 +209,23 @@ export class KVStoreProcessReadable implements KVStoreReadable {
}
}

export class KVStoreProcessWritable
extends KVStoreProcessReadable
export class KVStoreAoConnectProcessWritable
extends KVStoreAoConnectProcessReadable
implements KVStoreWritable
{
readonly process: ProcessWritable;
readonly process: AoConnectProcessWritable;

constructor({
signer,
processId,
process = Process.createRemoteProcess({
process = AoConnectProcess.createRemoteProcess({
signer,
processId,
}) as ProcessWritable,
}) as AoConnectProcessWritable,
}: {
signer: AoSigner;
processId: string;
process?: ProcessWritable;
process?: AoConnectProcessWritable;
}) {
super({ process });
this.process = process;
Expand Down
1 change: 1 addition & 0 deletions src/common/process/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './process.js';
export * from './impl/index.js';
export * from './aoconnect.js';
Loading

0 comments on commit ef87567

Please sign in to comment.