Skip to content

Commit

Permalink
[IND-481] Remove non-SQL implementation of creating initial rows (#816)
Browse files Browse the repository at this point in the history
More clean-up before migration to using a SQL based block processor.
  • Loading branch information
lcwik authored Nov 29, 2023
1 parent 3f5d937 commit f9b92e5
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 157 deletions.
66 changes: 5 additions & 61 deletions indexer/services/ender/__tests__/lib/on-message.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,20 +166,7 @@ describe('on-message', () => {
defaultDeleveragingEvent,
).finish());

it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])('successfully processes block with transaction event (%s)', async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_SQL_FUNCTION_TO_CREATE_INITIAL_ROWS = useSqlFunction;
it('successfully processes block with transaction event', async () => {
const transactionIndex: number = 0;
const eventIndex: number = 0;
const events: IndexerTendermintEvent[] = [
Expand Down Expand Up @@ -232,11 +219,7 @@ describe('on-message', () => {
'via SQL function',
true,
],
])('successfully processes block with transaction event with unset version (%s)', async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_SQL_FUNCTION_TO_CREATE_INITIAL_ROWS = useSqlFunction;
])('successfully processes block with transaction event with unset version', async () => {
const transactionIndex: number = 0;
const eventIndex: number = 0;
const events: IndexerTendermintEvent[] = [
Expand Down Expand Up @@ -561,20 +544,7 @@ describe('on-message', () => {
expect.any(Number), 1, { success: 'true' });
});

it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])('successfully processes block with block event (%s)', async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_SQL_FUNCTION_TO_CREATE_INITIAL_ROWS = useSqlFunction;
it('successfully processes block with block event', async () => {
// -1 so that createIndexerTendermintEvent creates a block event
const transactionIndex: number = -1;
const eventIndex: number = 0;
Expand Down Expand Up @@ -616,20 +586,7 @@ describe('on-message', () => {
expect.any(Number), 1, { success: 'true' });
});

it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])('successfully processes block with transaction event and block event (%s)', async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_SQL_FUNCTION_TO_CREATE_INITIAL_ROWS = useSqlFunction;
it('successfully processes block with transaction event and block event', async () => {
const transactionIndex: number = 0;
const eventIndex: number = 0;

Expand Down Expand Up @@ -682,20 +639,7 @@ describe('on-message', () => {
expect.any(Number), 1, { success: 'true' });
});

it.each([
[
'via knex',
false,
],
[
'via SQL function',
true,
],
])('successfully processes block with multiple transactions (%s)', async (
_name: string,
useSqlFunction: boolean,
) => {
config.USE_SQL_FUNCTION_TO_CREATE_INITIAL_ROWS = useSqlFunction;
it('successfully processes block with multiple transactions', async () => {
const transactionIndex0: number = 0;
const transactionIndex1: number = 1;
const eventIndex0: number = 0;
Expand Down
3 changes: 0 additions & 3 deletions indexer/services/ender/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ export const configSchema = {
SEND_WEBSOCKET_MESSAGES: parseBoolean({
default: true,
}),
USE_SQL_FUNCTION_TO_CREATE_INITIAL_ROWS: parseBoolean({
default: true,
}),
};

export default parseSchema(configSchema);
102 changes: 9 additions & 93 deletions indexer/services/ender/src/lib/on-message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,12 @@ import {
import { KafkaTopics } from '@dydxprotocol-indexer/kafka';
import {
Transaction,
BlockTable,
TransactionTable,
TendermintEventTable,
TendermintEventFromDatabase,
TransactionFromDatabase,
IsolationLevel,
CandleFromDatabase,
storeHelpers,
} from '@dydxprotocol-indexer/postgres';
import {
IndexerTendermintBlock,
IndexerTendermintEvent,
} from '@dydxprotocol-indexer/v4-protos';
import {
KafkaMessage,
Expand All @@ -38,7 +32,6 @@ import { refreshDataCaches } from './cache-manager';
import { CandlesGenerator } from './candles-generator';
import {
dateToDateTime,
indexerTendermintEventToTransactionIndex,
} from './helper';
import { KafkaPublisher } from './kafka-publisher';

Expand Down Expand Up @@ -75,10 +68,14 @@ export async function onMessage(message: KafkaMessage): Promise<void> {
try {
validateIndexerTendermintBlock(indexerTendermintBlock);

await createInitialRows(
blockHeight,
txId,
indexerTendermintBlock,
await runFuncWithTimingStat(
createInitialRows(
blockHeight,
txId,
indexerTendermintBlock,
),
{},
'create_initial_rows',
);
const blockProcessor: BlockProcessor = new BlockProcessor(
indexerTendermintBlock,
Expand Down Expand Up @@ -218,49 +215,7 @@ function validateIndexerTendermintBlock(
}
}

function createTendermintEvents(
events: IndexerTendermintEvent[],
blockHeight: string,
txId: number,
): Promise<TendermintEventFromDatabase>[] {
return _.map(events, (event: IndexerTendermintEvent) => {
return createTendermintEvent(event, blockHeight, txId);
});
}

function createTendermintEvent(
event: IndexerTendermintEvent,
blockHeight: string,
txId: number,
): Promise<TendermintEventFromDatabase> {
return TendermintEventTable.create({
blockHeight,
transactionIndex: indexerTendermintEventToTransactionIndex(event),
eventIndex: event.eventIndex,
}, { txId });
}

function createTransactions(
transactionHashes: string[],
blockHeight: string,
txId: number,
): Promise<TransactionFromDatabase>[] {
return _.map(
transactionHashes,
(transactionHash: string, transactionIndex: number) => {
return TransactionTable.create(
{
blockHeight,
transactionIndex,
transactionHash,
},
{ txId },
);
},
);
}

async function createInitialRowsViaSqlFunction(
async function createInitialRows(
blockHeight: string,
txId: number,
block: IndexerTendermintBlock,
Expand All @@ -286,42 +241,3 @@ async function createInitialRowsViaSqlFunction(
throw error;
});
}

async function createInitialRows(
blockHeight: string,
txId: number,
indexerTendermintBlock: IndexerTendermintBlock,
): Promise<void> {
if (config.USE_SQL_FUNCTION_TO_CREATE_INITIAL_ROWS) {
return runFuncWithTimingStat(
createInitialRowsViaSqlFunction(
blockHeight,
txId,
indexerTendermintBlock,
),
{},
'create_initial_rows',
);
} else {
await runFuncWithTimingStat(
Promise.all([
BlockTable.create({
blockHeight,
time: indexerTendermintBlock.time!.toISOString(),
}, { txId }),
...createTransactions(
indexerTendermintBlock.txHashes,
blockHeight,
txId,
),
...createTendermintEvents(
indexerTendermintBlock.events,
blockHeight,
txId,
),
]),
{},
'create_initial_rows',
);
}
}

0 comments on commit f9b92e5

Please sign in to comment.