From 6506ea1f9a10729408daa25a6e03fdd28460e02c Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 20 Jun 2024 17:25:50 +0300 Subject: [PATCH 1/2] Add functions for provide human readable output. --- blockchain/arbitrum_one/arbitrum_one.go | 34 ++++++++++++++ .../arbitrum_sepolia/arbitrum_sepolia.go | 34 ++++++++++++++ blockchain/ethereum/ethereum.go | 34 ++++++++++++++ .../game7_orbit_arbitrum_sepolia.go | 34 ++++++++++++++ blockchain/handlers.go | 4 ++ blockchain/mantle/mantle.go | 34 ++++++++++++++ blockchain/mantle_sepolia/mantle_sepolia.go | 34 ++++++++++++++ blockchain/polygon/polygon.go | 45 ++++++++++++++++++- blockchain/xai/xai.go | 34 ++++++++++++++ blockchain/xai_sepolia/xai_sepolia.go | 34 ++++++++++++++ cmd.go | 24 ++++++++-- indexer/db.go | 45 ++++++++++++++++++- synchronizer/synchronizer.go | 3 ++ 13 files changed, 387 insertions(+), 6 deletions(-) diff --git a/blockchain/arbitrum_one/arbitrum_one.go b/blockchain/arbitrum_one/arbitrum_one.go index e6d7a31..e8b49fb 100644 --- a/blockchain/arbitrum_one/arbitrum_one.go +++ b/blockchain/arbitrum_one/arbitrum_one.go @@ -683,3 +683,37 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa return labels, nil } + +func (c *Client) DecodeProtoTransactionsToData(data []string) ([]interface{}, error) { + + transactions, err := c.DecodeProtoTransactions(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range transactions { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} + +func (c *Client) DecodeProtoLogsToData(data []string) ([]interface{}, error) { + + events, err := c.DecodeProtoEventLogs(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range events { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} diff --git a/blockchain/arbitrum_sepolia/arbitrum_sepolia.go b/blockchain/arbitrum_sepolia/arbitrum_sepolia.go index 401c7ef..b293e27 100644 --- a/blockchain/arbitrum_sepolia/arbitrum_sepolia.go +++ b/blockchain/arbitrum_sepolia/arbitrum_sepolia.go @@ -683,3 +683,37 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa return labels, nil } + +func (c *Client) DecodeProtoTransactionsToData(data []string) ([]interface{}, error) { + + transactions, err := c.DecodeProtoTransactions(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range transactions { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} + +func (c *Client) DecodeProtoLogsToData(data []string) ([]interface{}, error) { + + events, err := c.DecodeProtoEventLogs(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range events { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} diff --git a/blockchain/ethereum/ethereum.go b/blockchain/ethereum/ethereum.go index 8f12f27..d7f1f86 100644 --- a/blockchain/ethereum/ethereum.go +++ b/blockchain/ethereum/ethereum.go @@ -678,3 +678,37 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa return labels, nil } + +func (c *Client) DecodeProtoTransactionsToData(data []string) ([]interface{}, error) { + + transactions, err := c.DecodeProtoTransactions(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range transactions { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} + +func (c *Client) DecodeProtoLogsToData(data []string) ([]interface{}, error) { + + events, err := c.DecodeProtoEventLogs(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range events { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} diff --git a/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go b/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go index ae4f9d5..0e1f103 100644 --- a/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go +++ b/blockchain/game7_orbit_arbitrum_sepolia/game7_orbit_arbitrum_sepolia.go @@ -683,3 +683,37 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa return labels, nil } + +func (c *Client) DecodeProtoTransactionsToData(data []string) ([]interface{}, error) { + + transactions, err := c.DecodeProtoTransactions(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range transactions { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} + +func (c *Client) DecodeProtoLogsToData(data []string) ([]interface{}, error) { + + events, err := c.DecodeProtoEventLogs(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range events { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} diff --git a/blockchain/handlers.go b/blockchain/handlers.go index e9701c1..df590ab 100644 --- a/blockchain/handlers.go +++ b/blockchain/handlers.go @@ -20,6 +20,8 @@ import ( "google.golang.org/protobuf/proto" ) +/// Define the interface for handling general index types such as blocks, transactions, and logs for fast access to blockchain data. + func NewClient(chain, url string, timeout int) (BlockchainClient, error) { if chain == "ethereum" { client, err := ethereum.NewClient(url, timeout) @@ -68,6 +70,8 @@ type BlockchainClient interface { FetchAsProtoBlocks(*big.Int, *big.Int, bool, int) ([]proto.Message, []proto.Message, []indexer.BlockIndex, []indexer.TransactionIndex, map[uint64]indexer.BlockCache, error) DecodeProtoEventsToLabels([]string, map[uint64]uint64, map[string]map[string]map[string]string) ([]indexer.EventLabel, error) DecodeProtoTransactionsToLabels([]string, map[uint64]uint64, map[string]map[string]map[string]string) ([]indexer.TransactionLabel, error) + DecodeProtoTransactionsToData([]string) ([]interface{}, error) + DecodeProtoLogsToData([]string) ([]interface{}, error) ChainType() string } diff --git a/blockchain/mantle/mantle.go b/blockchain/mantle/mantle.go index 35a832d..d1c60e8 100644 --- a/blockchain/mantle/mantle.go +++ b/blockchain/mantle/mantle.go @@ -678,3 +678,37 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa return labels, nil } + +func (c *Client) DecodeProtoTransactionsToData(data []string) ([]interface{}, error) { + + transactions, err := c.DecodeProtoTransactions(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range transactions { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} + +func (c *Client) DecodeProtoLogsToData(data []string) ([]interface{}, error) { + + events, err := c.DecodeProtoEventLogs(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range events { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} diff --git a/blockchain/mantle_sepolia/mantle_sepolia.go b/blockchain/mantle_sepolia/mantle_sepolia.go index 9355310..c364c0a 100644 --- a/blockchain/mantle_sepolia/mantle_sepolia.go +++ b/blockchain/mantle_sepolia/mantle_sepolia.go @@ -678,3 +678,37 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa return labels, nil } + +func (c *Client) DecodeProtoTransactionsToData(data []string) ([]interface{}, error) { + + transactions, err := c.DecodeProtoTransactions(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range transactions { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} + +func (c *Client) DecodeProtoLogsToData(data []string) ([]interface{}, error) { + + events, err := c.DecodeProtoEventLogs(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range events { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} diff --git a/blockchain/polygon/polygon.go b/blockchain/polygon/polygon.go index d156416..fcb4ea1 100644 --- a/blockchain/polygon/polygon.go +++ b/blockchain/polygon/polygon.go @@ -626,7 +626,16 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa var labels []indexer.TransactionLabel - for _, transaction := range decodedTransactions { + fmt.Println("Decoded transactions: ", decodedTransactions) + + for index, transaction := range decodedTransactions { + + fmt.Println("Transaction index: ", index) + if len(transaction.Input) < 10 { + fmt.Println("Transaction input data too short: ", transaction) + fmt.Println("Transaction input data: ", blocksCache[transaction.BlockNumber]) + fmt.Println("Transaction abi map: ", abiMap[transaction.ToAddress]) + } selector := transaction.Input[:10] @@ -678,3 +687,37 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa return labels, nil } + +func (c *Client) DecodeProtoTransactionsToData(data []string) ([]interface{}, error) { + + transactions, err := c.DecodeProtoTransactions(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range transactions { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} + +func (c *Client) DecodeProtoLogsToData(data []string) ([]interface{}, error) { + + events, err := c.DecodeProtoEventLogs(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range events { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} diff --git a/blockchain/xai/xai.go b/blockchain/xai/xai.go index fe91bdf..49b2777 100644 --- a/blockchain/xai/xai.go +++ b/blockchain/xai/xai.go @@ -683,3 +683,37 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa return labels, nil } + +func (c *Client) DecodeProtoTransactionsToData(data []string) ([]interface{}, error) { + + transactions, err := c.DecodeProtoTransactions(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range transactions { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} + +func (c *Client) DecodeProtoLogsToData(data []string) ([]interface{}, error) { + + events, err := c.DecodeProtoEventLogs(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range events { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} diff --git a/blockchain/xai_sepolia/xai_sepolia.go b/blockchain/xai_sepolia/xai_sepolia.go index dddbbcb..cbe103c 100644 --- a/blockchain/xai_sepolia/xai_sepolia.go +++ b/blockchain/xai_sepolia/xai_sepolia.go @@ -683,3 +683,37 @@ func (c *Client) DecodeProtoTransactionsToLabels(transactions []string, blocksCa return labels, nil } + +func (c *Client) DecodeProtoTransactionsToData(data []string) ([]interface{}, error) { + + transactions, err := c.DecodeProtoTransactions(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range transactions { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} + +func (c *Client) DecodeProtoLogsToData(data []string) ([]interface{}, error) { + + events, err := c.DecodeProtoEventLogs(data) + + if err != nil { + return nil, err + } + + var decodedData []interface{} + for _, d := range events { + decodedData = append(decodedData, d) + } + + return decodedData, nil + +} diff --git a/cmd.go b/cmd.go index 520a914..a58921b 100644 --- a/cmd.go +++ b/cmd.go @@ -392,6 +392,7 @@ func CreateInspectorCommand() *cobra.Command { targetFile := fmt.Sprintf("%s.proto", target) targetFilePath := filepath.Join(basePath, batch, targetFile) + fmt.Printf("Reading file: %s\n", targetFilePath) rawData, readErr := storageInstance.Read(targetFilePath) if readErr != nil { return readErr @@ -403,13 +404,30 @@ func CreateInspectorCommand() *cobra.Command { return nil } - _, cleintErr := blockchain.NewClient(chain, crawler.BlockchainURLs[chain], timeout) + client, cleintErr := blockchain.NewClient(chain, crawler.BlockchainURLs[chain], timeout) if cleintErr != nil { return cleintErr } - // TODO(kompotkot): Finish proto parsing + if target == "transactions" { + transactions, _ := client.DecodeProtoTransactionsToData(rawData) + if len(transactions) == 0 { + log.Printf("No transactions found in %s", targetFilePath) + return nil + } + fmt.Println("Decoded data:") + fmt.Println(transactions) + } else if target == "logs" { + logs, _ := client.DecodeProtoLogsToData(rawData) + if len(logs) == 0 { + log.Printf("No logs found in %s", targetFilePath) + } + fmt.Println("Decoded data:") + // transform logs to json + logsJSON, _ := json.Marshal(logs) + fmt.Println(string(logsJSON)) + } return nil }, } @@ -499,8 +517,6 @@ func CreateInspectorCommand() *cobra.Command { } } - // TODO(kompotkot): Write inspect of missing blocks in database - return nil }, } diff --git a/indexer/db.go b/indexer/db.go index 64f7474..6343c57 100644 --- a/indexer/db.go +++ b/indexer/db.go @@ -545,6 +545,49 @@ func (p *PostgreSQLpgx) GetEdgeDBBlock(ctx context.Context, blockchain, side str return blockIndex, nil } +func (p *PostgreSQLpgx) GetListOfFiles(ctx context.Context, blockchain string, object_type string) ([]string, error) { + pool := p.GetPool() + + conn, err := pool.Acquire(ctx) + if err != nil { + return nil, err + } + + func_of_object_type := func() string { + if object_type == "transactions" { + return TransactionsTableName(blockchain) + } else if object_type == "logs" { + return LogsTableName(blockchain) + } else if object_type == "blocks" { + return BlocksTableName(blockchain) + } else { + return "" + } + } + + defer conn.Release() + + var files []string + + query := fmt.Sprintf("SELECT path FROM %s group by path order by block_number", func_of_object_type()) + + rows, err := conn.Query(ctx, query) + if err != nil { + return nil, err + } + + for rows.Next() { + var path string + err = rows.Scan(&path) + if err != nil { + return nil, err + } + files = append(files, path) + } + + return files, nil +} + func (p *PostgreSQLpgx) GetLatestDBBlockNumber(blockchain string) (uint64, error) { pool := p.GetPool() @@ -712,7 +755,7 @@ func (p *PostgreSQLpgx) ReadUpdates(blockchain string, fromBlock uint64, toBlock FROM abi_jobs WHERE - chain = $3 + chain = $3 and ((abi)::jsonb ->> 'type' = 'function' or (abi)::jsonb ->> 'type' = 'event') ), address_abis AS ( SELECT diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index e57b9b8..268f3ee 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -380,6 +380,9 @@ func (d *Synchronizer) syncCycle() error { RowIds: rowIds, }) } + + fmt.Println("Reading transactions for customer ", update.CustomerID) + fmt.Println("Transactions read map: ", transactionsReadMap) encodedTransactions, err := d.StorageInstance.ReadBatch(transactionsReadMap) if err != nil { From a4b112c4f12184fc7ec7a0b6e1308b50845b42e0 Mon Sep 17 00:00:00 2001 From: Andrey Date: Thu, 20 Jun 2024 17:27:31 +0300 Subject: [PATCH 2/2] Add output. --- synchronizer/synchronizer.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/synchronizer/synchronizer.go b/synchronizer/synchronizer.go index 268f3ee..196a33b 100644 --- a/synchronizer/synchronizer.go +++ b/synchronizer/synchronizer.go @@ -381,8 +381,6 @@ func (d *Synchronizer) syncCycle() error { }) } - fmt.Println("Reading transactions for customer ", update.CustomerID) - fmt.Println("Transactions read map: ", transactionsReadMap) encodedTransactions, err := d.StorageInstance.ReadBatch(transactionsReadMap) if err != nil {