From 6b4d132f3ba04ff0531d3b6e5aaae2ec16cd82bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Nowosielski?= Date: Mon, 23 Dec 2024 11:03:19 +0100 Subject: [PATCH] Add `starknet_subscribeTransactionStatus` Co-authored-by: IronGauntlets --- blockchain/blockchain.go | 22 +- blockchain/blockchain_test.go | 17 ++ .../transaction/0x1001.json | 11 + .../transaction/0x1010.json | 12 + .../transaction/0x1111.json | 13 + docs/docs/websocket.md | 55 +++- mocks/mock_blockchain.go | 14 + mocks/mock_plugin.go | 4 +- mocks/mock_subscriber.go | 33 +- rpc/events.go | 8 +- rpc/handlers.go | 10 + rpc/subscriptions.go | 161 +++++++++- rpc/subscriptions_test.go | 286 ++++++++++++++++-- rpc/transaction.go | 19 +- sync/sync.go | 12 +- utils/{check.go => nil.go} | 0 utils/{check_test.go => nil_test.go} | 0 17 files changed, 601 insertions(+), 76 deletions(-) create mode 100644 clients/feeder/testdata/sepolia-integration/transaction/0x1001.json create mode 100644 clients/feeder/testdata/sepolia-integration/transaction/0x1010.json create mode 100644 clients/feeder/testdata/sepolia-integration/transaction/0x1111.json rename utils/{check.go => nil.go} (100%) rename utils/{check_test.go => nil_test.go} (100%) diff --git a/blockchain/blockchain.go b/blockchain/blockchain.go index 108c63c423..4a4619385f 100644 --- a/blockchain/blockchain.go +++ b/blockchain/blockchain.go @@ -11,16 +11,22 @@ import ( "github.com/NethermindEth/juno/core/felt" "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/encoder" + "github.com/NethermindEth/juno/feed" "github.com/NethermindEth/juno/utils" "github.com/ethereum/go-ethereum/common" ) +type L1HeadSubscription struct { + *feed.Subscription[*core.L1Head] +} + //go:generate mockgen -destination=../mocks/mock_blockchain.go -package=mocks github.com/NethermindEth/juno/blockchain Reader type Reader interface { Height() (height uint64, err error) Head() (head *core.Block, err error) L1Head() (*core.L1Head, error) + SubscribeL1Head() L1HeadSubscription BlockByNumber(number uint64) (block *core.Block, err error) BlockByHash(hash *felt.Felt) (block *core.Block, err error) @@ -81,6 +87,7 @@ type Blockchain struct { network *utils.Network database db.DB listener EventListener + l1HeadFeed *feed.Feed[*core.L1Head] pendingBlockFn func() *core.Block } @@ -90,6 +97,7 @@ func New(database db.DB, network *utils.Network, pendingBlockFn func() *core.Blo database: database, network: network, listener: &SelectiveListener{}, + l1HeadFeed: feed.New[*core.L1Head](), pendingBlockFn: pendingBlockFn, } } @@ -279,6 +287,10 @@ func (b *Blockchain) Receipt(hash *felt.Felt) (*core.TransactionReceipt, *felt.F }) } +func (b *Blockchain) SubscribeL1Head() L1HeadSubscription { + return L1HeadSubscription{b.l1HeadFeed.Subscribe()} +} + func (b *Blockchain) L1Head() (*core.L1Head, error) { b.listener.OnRead("L1Head") var update *core.L1Head @@ -305,9 +317,15 @@ func (b *Blockchain) SetL1Head(update *core.L1Head) error { if err != nil { return err } - return b.database.Update(func(txn db.Transaction) error { + + if err := b.database.Update(func(txn db.Transaction) error { return txn.Set(db.L1Height.Key(), updateBytes) - }) + }); err != nil { + return err + } + + b.l1HeadFeed.Send(update) + return nil } // Store takes a block and state update and performs sanity checks before putting in the database. diff --git a/blockchain/blockchain_test.go b/blockchain/blockchain_test.go index 1e1e07ada9..7b046b0a53 100644 --- a/blockchain/blockchain_test.go +++ b/blockchain/blockchain_test.go @@ -690,3 +690,20 @@ func TestL1Update(t *testing.T) { }) } } + +func TestSubscribeL1Head(t *testing.T) { + l1Head := &core.L1Head{ + BlockNumber: 1, + StateRoot: new(felt.Felt).SetUint64(2), + } + + chain := blockchain.New(pebble.NewMemTest(t), &utils.Mainnet, nil) + sub := chain.SubscribeL1Head() + t.Cleanup(sub.Unsubscribe) + + require.NoError(t, chain.SetL1Head(l1Head)) + + got, ok := <-sub.Recv() + require.True(t, ok) + assert.Equal(t, l1Head, got) +} diff --git a/clients/feeder/testdata/sepolia-integration/transaction/0x1001.json b/clients/feeder/testdata/sepolia-integration/transaction/0x1001.json new file mode 100644 index 0000000000..f83607937f --- /dev/null +++ b/clients/feeder/testdata/sepolia-integration/transaction/0x1001.json @@ -0,0 +1,11 @@ +{ + "finality_status": "RECEIVED", + "status": "this is deprecated", + "block_hash": "0x01010101", + "block_number": 304740, + "transaction_index": 1, + "transaction_hash": "0x1001", + "l2_to_l1_messages": [], + "events": [], + "actual_fee": "0x247aff6e224" +} \ No newline at end of file diff --git a/clients/feeder/testdata/sepolia-integration/transaction/0x1010.json b/clients/feeder/testdata/sepolia-integration/transaction/0x1010.json new file mode 100644 index 0000000000..45f9d94cb0 --- /dev/null +++ b/clients/feeder/testdata/sepolia-integration/transaction/0x1010.json @@ -0,0 +1,12 @@ +{ + "execution_status": "SUCCEEDED", + "finality_status": "ACCEPTED_ON_L1", + "status": "this is deprecated", + "block_hash": "0x01010101", + "block_number": 304740, + "transaction_index": 1, + "transaction_hash": "0x1010", + "l2_to_l1_messages": [], + "events": [], + "actual_fee": "0x247aff6e224" +} \ No newline at end of file diff --git a/clients/feeder/testdata/sepolia-integration/transaction/0x1111.json b/clients/feeder/testdata/sepolia-integration/transaction/0x1111.json new file mode 100644 index 0000000000..86bd286ab0 --- /dev/null +++ b/clients/feeder/testdata/sepolia-integration/transaction/0x1111.json @@ -0,0 +1,13 @@ +{ + "revert_error": "some error", + "execution_status": "REJECTED", + "finality_status": "RECEIVED", + "status": "this is deprecated", + "block_hash": "0x01010101", + "block_number": 304740, + "transaction_index": 1, + "transaction_hash": "0x1111", + "l2_to_l1_messages": [], + "events": [], + "actual_fee": "0x247aff6e224" +} \ No newline at end of file diff --git a/docs/docs/websocket.md b/docs/docs/websocket.md index f31e369811..2d75d38e87 100644 --- a/docs/docs/websocket.md +++ b/docs/docs/websocket.md @@ -153,7 +153,60 @@ When a new block is added, you will receive a message like this: } ``` -## Unsubscribe from newly created blocks +## Subscribe to transaction status changes + +The WebSocket server provides a `starknet_subscribeTransactionStatus` method that emits an event when a transaction status changes: + + + + +```json +{ + "jsonrpc": "2.0", + "method": "starknet_subscribeTransactionStatus", + "params": [ + { + "transaction_hash": "0x631333277e88053336d8c302630b4420dc3ff24018a1c464da37d5e36ea19df" + } + ], + "id": 1 +} +``` + + + + +```json +{ + "jsonrpc": "2.0", + "result": 16570962336122680234, + "id": 1 +} +``` + + + + +When a transaction get a new status, you will receive a message like this: + +```json +{ + "jsonrpc": "2.0", + "method": "starknet_subscriptionTransactionsStatus", + "params": { + "result": { + "transaction_hash": "0x631333277e88053336d8c302630b4420dc3ff24018a1c464da37d5e36ea19df", + "status": { + "finality_status": "ACCEPTED_ON_L2", + "execution_status": "SUCCEEDED" + } + }, + "subscription_id": 16570962336122680234 + } +} +``` + +## Unsubscribe from previous subscription Use the `starknet_unsubscribe` method with the `result` value from the subscription response or the `subscription` field from any new block event to stop receiving updates for new blocks: diff --git a/mocks/mock_blockchain.go b/mocks/mock_blockchain.go index 05ec6b7f9c..a7de660233 100644 --- a/mocks/mock_blockchain.go +++ b/mocks/mock_blockchain.go @@ -317,6 +317,20 @@ func (mr *MockReaderMockRecorder) StateUpdateByNumber(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateUpdateByNumber", reflect.TypeOf((*MockReader)(nil).StateUpdateByNumber), arg0) } +// SubscribeL1Head mocks base method. +func (m *MockReader) SubscribeL1Head() blockchain.L1HeadSubscription { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SubscribeL1Head") + ret0, _ := ret[0].(blockchain.L1HeadSubscription) + return ret0 +} + +// SubscribeL1Head indicates an expected call of SubscribeL1Head. +func (mr *MockReaderMockRecorder) SubscribeL1Head() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeL1Head", reflect.TypeOf((*MockReader)(nil).SubscribeL1Head)) +} + // TransactionByBlockNumberAndIndex mocks base method. func (m *MockReader) TransactionByBlockNumberAndIndex(arg0, arg1 uint64) (core.Transaction, error) { m.ctrl.T.Helper() diff --git a/mocks/mock_plugin.go b/mocks/mock_plugin.go index 7c1d4a4391..678b745220 100644 --- a/mocks/mock_plugin.go +++ b/mocks/mock_plugin.go @@ -14,7 +14,7 @@ import ( core "github.com/NethermindEth/juno/core" felt "github.com/NethermindEth/juno/core/felt" - junoplugin "github.com/NethermindEth/juno/plugin" + plugin "github.com/NethermindEth/juno/plugin" gomock "go.uber.org/mock/gomock" ) @@ -70,7 +70,7 @@ func (mr *MockJunoPluginMockRecorder) NewBlock(arg0, arg1, arg2 any) *gomock.Cal } // RevertBlock mocks base method. -func (m *MockJunoPlugin) RevertBlock(arg0, arg1 *junoplugin.BlockAndStateUpdate, arg2 *core.StateDiff) error { +func (m *MockJunoPlugin) RevertBlock(arg0, arg1 *plugin.BlockAndStateUpdate, arg2 *core.StateDiff) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RevertBlock", arg0, arg1, arg2) ret0, _ := ret[0].(error) diff --git a/mocks/mock_subscriber.go b/mocks/mock_subscriber.go index 103c988fa2..6babde7d1f 100644 --- a/mocks/mock_subscriber.go +++ b/mocks/mock_subscriber.go @@ -25,7 +25,6 @@ import ( type MockSubscriber struct { ctrl *gomock.Controller recorder *MockSubscriberMockRecorder - isgomock struct{} } // MockSubscriberMockRecorder is the mock recorder for MockSubscriber. @@ -46,18 +45,18 @@ func (m *MockSubscriber) EXPECT() *MockSubscriberMockRecorder { } // ChainID mocks base method. -func (m *MockSubscriber) ChainID(ctx context.Context) (*big.Int, error) { +func (m *MockSubscriber) ChainID(arg0 context.Context) (*big.Int, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ChainID", ctx) + ret := m.ctrl.Call(m, "ChainID", arg0) ret0, _ := ret[0].(*big.Int) ret1, _ := ret[1].(error) return ret0, ret1 } // ChainID indicates an expected call of ChainID. -func (mr *MockSubscriberMockRecorder) ChainID(ctx any) *gomock.Call { +func (mr *MockSubscriberMockRecorder) ChainID(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainID", reflect.TypeOf((*MockSubscriber)(nil).ChainID), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ChainID", reflect.TypeOf((*MockSubscriber)(nil).ChainID), arg0) } // Close mocks base method. @@ -73,46 +72,46 @@ func (mr *MockSubscriberMockRecorder) Close() *gomock.Call { } // FinalisedHeight mocks base method. -func (m *MockSubscriber) FinalisedHeight(ctx context.Context) (uint64, error) { +func (m *MockSubscriber) FinalisedHeight(arg0 context.Context) (uint64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FinalisedHeight", ctx) + ret := m.ctrl.Call(m, "FinalisedHeight", arg0) ret0, _ := ret[0].(uint64) ret1, _ := ret[1].(error) return ret0, ret1 } // FinalisedHeight indicates an expected call of FinalisedHeight. -func (mr *MockSubscriberMockRecorder) FinalisedHeight(ctx any) *gomock.Call { +func (mr *MockSubscriberMockRecorder) FinalisedHeight(arg0 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FinalisedHeight", reflect.TypeOf((*MockSubscriber)(nil).FinalisedHeight), ctx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FinalisedHeight", reflect.TypeOf((*MockSubscriber)(nil).FinalisedHeight), arg0) } // TransactionReceipt mocks base method. -func (m *MockSubscriber) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { +func (m *MockSubscriber) TransactionReceipt(arg0 context.Context, arg1 common.Hash) (*types.Receipt, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "TransactionReceipt", ctx, txHash) + ret := m.ctrl.Call(m, "TransactionReceipt", arg0, arg1) ret0, _ := ret[0].(*types.Receipt) ret1, _ := ret[1].(error) return ret0, ret1 } // TransactionReceipt indicates an expected call of TransactionReceipt. -func (mr *MockSubscriberMockRecorder) TransactionReceipt(ctx, txHash any) *gomock.Call { +func (mr *MockSubscriberMockRecorder) TransactionReceipt(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionReceipt", reflect.TypeOf((*MockSubscriber)(nil).TransactionReceipt), ctx, txHash) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TransactionReceipt", reflect.TypeOf((*MockSubscriber)(nil).TransactionReceipt), arg0, arg1) } // WatchLogStateUpdate mocks base method. -func (m *MockSubscriber) WatchLogStateUpdate(ctx context.Context, sink chan<- *contract.StarknetLogStateUpdate) (event.Subscription, error) { +func (m *MockSubscriber) WatchLogStateUpdate(arg0 context.Context, arg1 chan<- *contract.StarknetLogStateUpdate) (event.Subscription, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WatchLogStateUpdate", ctx, sink) + ret := m.ctrl.Call(m, "WatchLogStateUpdate", arg0, arg1) ret0, _ := ret[0].(event.Subscription) ret1, _ := ret[1].(error) return ret0, ret1 } // WatchLogStateUpdate indicates an expected call of WatchLogStateUpdate. -func (mr *MockSubscriberMockRecorder) WatchLogStateUpdate(ctx, sink any) *gomock.Call { +func (mr *MockSubscriberMockRecorder) WatchLogStateUpdate(arg0, arg1 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WatchLogStateUpdate", reflect.TypeOf((*MockSubscriber)(nil).WatchLogStateUpdate), ctx, sink) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WatchLogStateUpdate", reflect.TypeOf((*MockSubscriber)(nil).WatchLogStateUpdate), arg0, arg1) } diff --git a/rpc/events.go b/rpc/events.go index 0f403c546f..bbc6453936 100644 --- a/rpc/events.go +++ b/rpc/events.go @@ -11,6 +11,10 @@ type EventsArg struct { ResultPageRequest } +type SubscriptionID struct { + ID uint64 `json:"subscription_id"` +} + type EventFilter struct { FromBlock *BlockID `json:"from_block"` ToBlock *BlockID `json:"to_block"` @@ -41,10 +45,6 @@ type EventsChunk struct { ContinuationToken string `json:"continuation_token,omitempty"` } -type SubscriptionID struct { - ID uint64 `json:"subscription_id"` -} - /**************************************************** Events Handlers *****************************************************/ diff --git a/rpc/handlers.go b/rpc/handlers.go index 83795b666f..310776599d 100644 --- a/rpc/handlers.go +++ b/rpc/handlers.go @@ -96,6 +96,7 @@ type Handler struct { newHeads *feed.Feed[*core.Header] reorgs *feed.Feed[*sync.ReorgBlockRange] pendingTxs *feed.Feed[[]core.Transaction] + l1Heads *feed.Feed[*core.L1Head] idgen func() uint64 mu stdsync.Mutex // protects subscriptions. @@ -138,6 +139,7 @@ func New(bcReader blockchain.Reader, syncReader sync.Reader, virtualMachine vm.V newHeads: feed.New[*core.Header](), reorgs: feed.New[*sync.ReorgBlockRange](), pendingTxs: feed.New[[]core.Transaction](), + l1Heads: feed.New[*core.L1Head](), subscriptions: make(map[uint64]*subscription), blockTraceCache: lru.NewCache[traceCacheKey, []TracedBlockTransaction](traceCacheSize), @@ -181,12 +183,15 @@ func (h *Handler) Run(ctx context.Context) error { newHeadsSub := h.syncReader.SubscribeNewHeads().Subscription reorgsSub := h.syncReader.SubscribeReorg().Subscription pendingTxsSub := h.syncReader.SubscribePendingTxs().Subscription + l1HeadsSub := h.bcReader.SubscribeL1Head().Subscription defer newHeadsSub.Unsubscribe() defer reorgsSub.Unsubscribe() defer pendingTxsSub.Unsubscribe() + defer l1HeadsSub.Unsubscribe() feed.Tee(newHeadsSub, h.newHeads) feed.Tee(reorgsSub, h.reorgs) feed.Tee(pendingTxsSub, h.pendingTxs) + feed.Tee(l1HeadsSub, h.l1Heads) <-ctx.Done() for _, sub := range h.subscriptions { @@ -358,6 +363,11 @@ func (h *Handler) Methods() ([]jsonrpc.Method, string) { //nolint: funlen Params: []jsonrpc.Parameter{{Name: "block", Optional: true}}, Handler: h.SubscribeNewHeads, }, + { + Name: "starknet_subscribeTransactionStatus", + Params: []jsonrpc.Parameter{{Name: "transaction_hash"}, {Name: "block", Optional: true}}, + Handler: h.SubscribeTransactionStatus, + }, { Name: "starknet_subscribePendingTransactions", Params: []jsonrpc.Parameter{{Name: "transaction_details", Optional: true}, {Name: "sender_address", Optional: true}}, diff --git a/rpc/subscriptions.go b/rpc/subscriptions.go index 7a57981261..94eb0ae42d 100644 --- a/rpc/subscriptions.go +++ b/rpc/subscriptions.go @@ -38,6 +38,10 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys return nil, ErrTooManyKeysInFilter } + if blockID != nil && blockID.Pending { + return nil, ErrCallOnPending + } + requestedHeader, headHeader, rpcErr := h.resolveBlockRange(blockID) if rpcErr != nil { return nil, rpcErr @@ -90,6 +94,133 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys return &SubscriptionID{ID: id}, nil } +// SubscribeTransactionStatus subscribes to status changes of a transaction. It checks for updates each time a new block is added. +// Later updates are sent only when the transaction status changes. +// The optional block_id parameter is ignored, as status changes are not stored and historical data cannot be sent. +// +//nolint:gocyclo +func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Felt, blockID *BlockID) (*SubscriptionID, + *jsonrpc.Error, +) { + w, ok := jsonrpc.ConnFromContext(ctx) + if !ok { + return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil) + } + + // resolveBlockRange is only used to make sure that the requested block id is not older than 1024 block and check + // if the requested block is found. The range is inconsequential since we assume the provided transaction hash + // of a transaction is included in the block range: latest/pending - 1024. + _, _, rpcErr := h.resolveBlockRange(blockID) + if rpcErr != nil { + return nil, rpcErr + } + + curStatus, rpcErr := h.TransactionStatus(context.Background(), txHash) + if rpcErr != nil { + return nil, rpcErr + } + + id := h.idgen() + subscriptionCtx, subscriptionCtxCancel := context.WithCancel(ctx) + sub := &subscription{ + cancel: subscriptionCtxCancel, + conn: w, + } + h.mu.Lock() + h.subscriptions[id] = sub + h.mu.Unlock() + + l2HeadSub := h.newHeads.Subscribe() + l1HeadSub := h.l1Heads.Subscribe() + reorgSub := h.reorgs.Subscribe() + + sub.wg.Go(func() { + defer func() { + h.unsubscribe(sub, id) + l2HeadSub.Unsubscribe() + l1HeadSub.Unsubscribe() + reorgSub.Unsubscribe() + }() + + var wg conc.WaitGroup + + err := h.sendTxnStatus(w, SubscriptionTransactionStatus{&txHash, *curStatus}, id) + if err != nil { + h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err) + return + } + + // Check if the requested transaction is already final. + // A transaction is considered to be final if it has been rejected or accepted on l1 + if curStatus.Finality == TxnStatusRejected || curStatus.Finality == TxnStatusAcceptedOnL1 { + return + } + + // At this point, the transaction has not reached finality. + wg.Go(func() { + for { + select { + case <-subscriptionCtx.Done(): + return + case <-l2HeadSub.Recv(): + // A new block has been added to the DB, hence, check if transaction has reached l2 finality, + // if not, check feeder. + // We could use a separate timer to periodically check for the transaction status at feeder + // gateway, however, for the time being new l2 head update is sufficient. + if curStatus.Finality < TxnStatusAcceptedOnL2 { + prevStatus := curStatus + curStatus, rpcErr = h.TransactionStatus(subscriptionCtx, txHash) + + if rpcErr != nil { + h.log.Errorw("Error while getting Txn status", "txHash", txHash, "err", rpcErr) + return + } + + if curStatus.Finality > prevStatus.Finality { + err := h.sendTxnStatus(w, SubscriptionTransactionStatus{&txHash, *curStatus}, id) + if err != nil { + h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err) + return + } + if curStatus.Finality == TxnStatusRejected || curStatus.Finality == TxnStatusAcceptedOnL1 { + return + } + } + } + case <-l1HeadSub.Recv(): + receipt, rpcErr := h.TransactionReceiptByHash(txHash) + if rpcErr != nil { + h.log.Errorw("Error while getting Receipt", "txHash", txHash, "err", rpcErr) + return + } + + if receipt.FinalityStatus == TxnAcceptedOnL1 { + s := &TransactionStatus{ + Finality: TxnStatus(receipt.FinalityStatus), + Execution: receipt.ExecutionStatus, + FailureReason: receipt.RevertReason, + } + + err := h.sendTxnStatus(w, SubscriptionTransactionStatus{&txHash, *s}, id) + if err != nil { + h.log.Errorw("Error while sending Txn status", "txHash", txHash, "err", err) + } + return + } + } + } + }) + + wg.Go(func() { + h.processReorgs(subscriptionCtx, reorgSub, w, id) + }) + + wg.Wait() + }) + + return &SubscriptionID{ID: id}, nil +} + func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, to uint64, fromAddr *felt.Felt, keys [][]felt.Felt) { filter, err := h.bcReader.EventFilter(fromAddr, keys) if err != nil { @@ -176,6 +307,10 @@ func (h *Handler) SubscribeNewHeads(ctx context.Context, blockID *BlockID) (*Sub return nil, jsonrpc.Err(jsonrpc.MethodNotFound, nil) } + if blockID != nil && blockID.Pending { + return nil, ErrCallOnPending + } + startHeader, latestHeader, rpcErr := h.resolveBlockRange(blockID) if rpcErr != nil { return nil, rpcErr @@ -364,10 +499,6 @@ func (h *Handler) resolveBlockRange(blockID *BlockID) (*core.Header, *core.Heade return latestHeader, latestHeader, nil } - if blockID.Pending { - return nil, nil, ErrCallOnPending - } - startHeader, rpcErr := h.blockHeaderByID(blockID) if rpcErr != nil { return nil, nil, rpcErr @@ -501,3 +632,25 @@ func (h *Handler) Unsubscribe(ctx context.Context, id uint64) (bool, *jsonrpc.Er sub.wg.Wait() // Let the subscription finish before responding. return true, nil } + +type SubscriptionTransactionStatus struct { + TransactionHash *felt.Felt `json:"transaction_hash"` + Status TransactionStatus `json:"status"` +} + +// sendTxnStatus creates a response and sends it to the client +func (h *Handler) sendTxnStatus(w jsonrpc.Conn, status SubscriptionTransactionStatus, id uint64) error { + resp, err := json.Marshal(SubscriptionResponse{ + Version: "2.0", + Method: "starknet_subscriptionTransactionsStatus", + Params: map[string]any{ + "subscription_id": id, + "result": status, + }, + }) + if err != nil { + return err + } + _, err = w.Write(resp) + return err +} diff --git a/rpc/subscriptions_test.go b/rpc/subscriptions_test.go index 4757ab73d8..9ba587ca42 100644 --- a/rpc/subscriptions_test.go +++ b/rpc/subscriptions_test.go @@ -14,6 +14,7 @@ import ( "github.com/NethermindEth/juno/clients/feeder" "github.com/NethermindEth/juno/core" "github.com/NethermindEth/juno/core/felt" + "github.com/NethermindEth/juno/db" "github.com/NethermindEth/juno/db/pebble" "github.com/NethermindEth/juno/feed" "github.com/NethermindEth/juno/jsonrpc" @@ -29,9 +30,6 @@ import ( var emptyCommitments = core.BlockCommitments{} -// Due to the difference in how some test files in rpc use "package rpc" vs "package rpc_test" it was easiest to copy -// the fakeConn here. -// Todo: move all the subscription related test here type fakeConn struct { w io.Writer } @@ -93,8 +91,6 @@ func TestSubscribeEvents(t *testing.T) { subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) - mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 1}, nil) - id, rpcErr := handler.SubscribeEvents(subCtx, fromAddr, keys, blockID) assert.Zero(t, id) assert.Equal(t, ErrCallOnPending, rpcErr) @@ -330,6 +326,226 @@ func TestSubscribeEvents(t *testing.T) { }) } +func TestSubscribeTxnStatus(t *testing.T) { + log := utils.NewNopZapLogger() + txHash := new(felt.Felt).SetUint64(1) + + t.Run("Return error when transaction not found", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", log) + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 1}, nil) + mockChain.EXPECT().TransactionByHash(txHash).Return(nil, db.ErrKeyNotFound) + mockSyncer.EXPECT().PendingBlock().Return(nil) + + serverConn, _ := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + }) + + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + + id, rpcErr := handler.SubscribeTransactionStatus(subCtx, *txHash, nil) + assert.Nil(t, id) + assert.Equal(t, ErrTxnHashNotFound, rpcErr) + }) + + t.Run("Return error if block is too far back", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", log) + + blockID := &BlockID{Number: 0} + + serverConn, _ := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + }) + + subCtx := context.WithValue(context.Background(), jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + + // Note the end of the window doesn't need to be tested because if requested block number is more than the + // head, a block not found error will be returned. This behaviour has been tested in various other tests, and we + // don't need to test it here again. + t.Run("head is 1024", func(t *testing.T) { + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 1024}, nil) + mockChain.EXPECT().BlockHeaderByNumber(blockID.Number).Return(&core.Header{Number: 0}, nil) + + id, rpcErr := handler.SubscribeTransactionStatus(subCtx, *txHash, blockID) + assert.Zero(t, id) + assert.Equal(t, ErrTooManyBlocksBack, rpcErr) + }) + + t.Run("head is more than 1024", func(t *testing.T) { + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 2024}, nil) + mockChain.EXPECT().BlockHeaderByNumber(blockID.Number).Return(&core.Header{Number: 0}, nil) + + id, rpcErr := handler.SubscribeTransactionStatus(subCtx, *txHash, blockID) + assert.Zero(t, id) + assert.Equal(t, ErrTooManyBlocksBack, rpcErr) + }) + }) + + t.Run("Transaction status is final", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", log) + handler.WithFeeder(feeder.NewTestClient(t, &utils.SepoliaIntegration)) + + t.Run("rejected", func(t *testing.T) { //nolint:dupl + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + respStr := `{"jsonrpc":"2.0","method":"starknet_subscriptionTransactionsStatus","params":{"result":{"transaction_hash":"%v","status":{"finality_status":"%s","failure_reason":"some error"}},"subscription_id":%v}}` + txHash, err := new(felt.Felt).SetString("0x1111") + require.NoError(t, err) + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 1}, nil) + mockChain.EXPECT().TransactionByHash(txHash).Return(nil, db.ErrKeyNotFound) + mockSyncer.EXPECT().PendingBlock().Return(nil) + + ctx, cancel := context.WithCancel(context.Background()) + subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + id, rpcErr := handler.SubscribeTransactionStatus(subCtx, *txHash, nil) + require.Nil(t, rpcErr) + + b, err := TxnStatusRejected.MarshalText() + require.NoError(t, err) + + resp := fmt.Sprintf(respStr, txHash, b, id.ID) + got := make([]byte, len(resp)) + _, err = clientConn.Read(got) + require.NoError(t, err) + assert.Equal(t, resp, string(got)) + cancel() + }) + + t.Run("accepted on L1", func(t *testing.T) { //nolint:dupl + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + respStr := `{"jsonrpc":"2.0","method":"starknet_subscriptionTransactionsStatus","params":{"result":{"transaction_hash":"%v","status":{"finality_status":"%s","execution_status":"SUCCEEDED"}},"subscription_id":%v}}` + txHash, err := new(felt.Felt).SetString("0x1010") + require.NoError(t, err) + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 1}, nil) + mockChain.EXPECT().TransactionByHash(txHash).Return(nil, db.ErrKeyNotFound) + mockSyncer.EXPECT().PendingBlock().Return(nil) + + ctx, cancel := context.WithCancel(context.Background()) + subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + id, rpcErr := handler.SubscribeTransactionStatus(subCtx, *txHash, nil) + require.Nil(t, rpcErr) + + b, err := TxnStatusAcceptedOnL1.MarshalText() + require.NoError(t, err) + + resp := fmt.Sprintf(respStr, txHash, b, id.ID) + got := make([]byte, len(resp)) + _, err = clientConn.Read(got) + require.NoError(t, err) + assert.Equal(t, resp, string(got)) + cancel() + }) + }) + + t.Run("Multiple transaction status update", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + t.Cleanup(mockCtrl.Finish) + + client := feeder.NewTestClient(t, &utils.SepoliaIntegration) + gw := adaptfeeder.New(client) + mockChain := mocks.NewMockReader(mockCtrl) + mockSyncer := mocks.NewMockSyncReader(mockCtrl) + handler := New(mockChain, mockSyncer, nil, "", log) + handler.WithFeeder(client) + l2Feed := feed.New[*core.Header]() + l1Feed := feed.New[*core.L1Head]() + handler.newHeads = l2Feed + handler.l1Heads = l1Feed + + block, err := gw.BlockByNumber(context.Background(), 38748) + require.NoError(t, err) + + serverConn, clientConn := net.Pipe() + t.Cleanup(func() { + require.NoError(t, serverConn.Close()) + require.NoError(t, clientConn.Close()) + }) + + receivedRespStr := `{"jsonrpc":"2.0","method":"starknet_subscriptionTransactionsStatus","params":{"result":{"transaction_hash":"%v","status":{"finality_status":"%s"}},"subscription_id":%v}}` + txHash, err := new(felt.Felt).SetString("0x1001") + require.NoError(t, err) + + mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: block.Number}, nil) + mockChain.EXPECT().TransactionByHash(txHash).Return(nil, db.ErrKeyNotFound) + mockSyncer.EXPECT().PendingBlock().Return(nil) + + ctx, cancel := context.WithCancel(context.Background()) + subCtx := context.WithValue(ctx, jsonrpc.ConnKey{}, &fakeConn{w: serverConn}) + id, rpcErr := handler.SubscribeTransactionStatus(subCtx, *txHash, nil) + require.Nil(t, rpcErr) + + b, err := TxnStatusReceived.MarshalText() + require.NoError(t, err) + + resp := fmt.Sprintf(receivedRespStr, txHash, b, id.ID) + got := make([]byte, len(resp)) + _, err = clientConn.Read(got) + require.NoError(t, err) + assert.Equal(t, resp, string(got)) + + mockChain.EXPECT().TransactionByHash(txHash).Return(block.Transactions[0], nil) + mockChain.EXPECT().Receipt(txHash).Return(block.Receipts[0], block.Hash, block.Number, nil) + mockChain.EXPECT().L1Head().Return(nil, db.ErrKeyNotFound) + + l2Feed.Send(&core.Header{Number: block.Number + 1}) + + b, err = TxnStatusAcceptedOnL2.MarshalText() + require.NoError(t, err) + + l1AndL2RespStr := `{"jsonrpc":"2.0","method":"starknet_subscriptionTransactionsStatus","params":{"result":{"transaction_hash":"%v","status":{"finality_status":"%s","execution_status":"SUCCEEDED"}},"subscription_id":%v}}` + resp = fmt.Sprintf(l1AndL2RespStr, txHash, b, id.ID) + got = make([]byte, len(resp)) + _, err = clientConn.Read(got) + require.NoError(t, err) + assert.Equal(t, resp, string(got)) + + l1Head := &core.L1Head{BlockNumber: block.Number} + mockChain.EXPECT().TransactionByHash(txHash).Return(block.Transactions[0], nil) + mockChain.EXPECT().Receipt(txHash).Return(block.Receipts[0], block.Hash, block.Number, nil) + mockChain.EXPECT().L1Head().Return(l1Head, nil) + + l1Feed.Send(l1Head) + + b, err = TxnStatusAcceptedOnL1.MarshalText() + require.NoError(t, err) + + resp = fmt.Sprintf(l1AndL2RespStr, txHash, b, id.ID) + got = make([]byte, len(resp)) + _, err = clientConn.Read(got) + require.NoError(t, err) + assert.Equal(t, resp, string(got)) + cancel() + }) +} + type fakeSyncer struct { newHeads *feed.Feed[*core.Header] reorgs *feed.Feed[*sync.ReorgBlockRange] @@ -379,8 +595,6 @@ func TestSubscribeNewHeads(t *testing.T) { mockSyncer := mocks.NewMockSyncReader(mockCtrl) handler := New(mockChain, mockSyncer, nil, "", log) - mockChain.EXPECT().HeadsHeader().Return(&core.Header{}, nil) - serverConn, _ := net.Pipe() t.Cleanup(func() { require.NoError(t, serverConn.Close()) @@ -438,10 +652,12 @@ func TestSubscribeNewHeads(t *testing.T) { mockChain := mocks.NewMockReader(mockCtrl) syncer := newFakeSyncer() - handler, server := setupRPC(t, ctx, mockChain, syncer) + l1Feed := feed.New[*core.L1Head]() mockChain.EXPECT().HeadsHeader().Return(&core.Header{}, nil) + mockChain.EXPECT().SubscribeL1Head().Return(blockchain.L1HeadSubscription{Subscription: l1Feed.Subscribe()}) + handler, server := setupRPC(t, ctx, mockChain, syncer) conn := createWsConn(t, ctx, server) id := uint64(1) @@ -520,6 +736,10 @@ func TestMultipleSubscribeNewHeadsAndUnsubscribe(t *testing.T) { mockChain := mocks.NewMockReader(mockCtrl) syncer := newFakeSyncer() + + l1Feed := feed.New[*core.L1Head]() + mockChain.EXPECT().SubscribeL1Head().Return(blockchain.L1HeadSubscription{Subscription: l1Feed.Subscribe()}) + handler, server := setupRPC(t, ctx, mockChain, syncer) mockChain.EXPECT().HeadsHeader().Return(&core.Header{}, nil).Times(2) @@ -584,6 +804,9 @@ func TestSubscriptionReorg(t *testing.T) { t.Cleanup(mockCtrl.Finish) mockChain := mocks.NewMockReader(mockCtrl) + l1Feed := feed.New[*core.L1Head]() + mockChain.EXPECT().SubscribeL1Head().Return(blockchain.L1HeadSubscription{Subscription: l1Feed.Subscribe()}) + syncer := newFakeSyncer() handler, server := setupRPC(t, ctx, mockChain, syncer) @@ -648,6 +871,9 @@ func TestSubscribePendingTxs(t *testing.T) { t.Cleanup(mockCtrl.Finish) mockChain := mocks.NewMockReader(mockCtrl) + l1Feed := feed.New[*core.L1Head]() + mockChain.EXPECT().SubscribeL1Head().Return(blockchain.L1HeadSubscription{Subscription: l1Feed.Subscribe()}) + syncer := newFakeSyncer() handler, server := setupRPC(t, ctx, mockChain, syncer) @@ -799,28 +1025,6 @@ func subMsg(method string) string { return fmt.Sprintf(`{"jsonrpc":"2.0","id":1,"method":%q}`, method) } -func testHeader(t *testing.T) *core.Header { - t.Helper() - - header := &core.Header{ - Hash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"), - ParentHash: utils.HexToFelt(t, "0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb"), - Number: 2, - GlobalStateRoot: utils.HexToFelt(t, "0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9"), - Timestamp: 1637084470, - SequencerAddress: utils.HexToFelt(t, "0x0"), - L1DataGasPrice: &core.GasPrice{ - PriceInFri: utils.HexToFelt(t, "0x0"), - PriceInWei: utils.HexToFelt(t, "0x0"), - }, - GasPrice: utils.HexToFelt(t, "0x0"), - GasPriceSTRK: utils.HexToFelt(t, "0x0"), - L1DAMode: core.Calldata, - ProtocolVersion: "", - } - return header -} - func newHeadsResponse(id uint64) string { return fmt.Sprintf(`{"jsonrpc":"2.0","method":"starknet_subscriptionNewHeads","params":{"result":{"block_hash":"0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6","parent_hash":"0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb","block_number":2,"new_root":"0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9","timestamp":1637084470,"sequencer_address":"0x0","l1_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_data_gas_price":{"price_in_fri":"0x0","price_in_wei":"0x0"},"l1_da_mode":"CALLDATA","starknet_version":""},"subscription_id":%d}}`, id) } @@ -866,3 +1070,25 @@ func marshalSubEventsResp(e *EmittedEvent, id uint64) ([]byte, error) { }, }) } + +func testHeader(t *testing.T) *core.Header { + t.Helper() + + header := &core.Header{ + Hash: utils.HexToFelt(t, "0x4e1f77f39545afe866ac151ac908bd1a347a2a8a7d58bef1276db4f06fdf2f6"), + ParentHash: utils.HexToFelt(t, "0x2a70fb03fe363a2d6be843343a1d81ce6abeda1e9bd5cc6ad8fa9f45e30fdeb"), + Number: 2, + GlobalStateRoot: utils.HexToFelt(t, "0x3ceee867d50b5926bb88c0ec7e0b9c20ae6b537e74aac44b8fcf6bb6da138d9"), + Timestamp: 1637084470, + SequencerAddress: utils.HexToFelt(t, "0x0"), + L1DataGasPrice: &core.GasPrice{ + PriceInFri: utils.HexToFelt(t, "0x0"), + PriceInWei: utils.HexToFelt(t, "0x0"), + }, + GasPrice: utils.HexToFelt(t, "0x0"), + GasPriceSTRK: utils.HexToFelt(t, "0x0"), + L1DAMode: core.Calldata, + ProtocolVersion: "", + } + return header +} diff --git a/rpc/transaction.go b/rpc/transaction.go index 0610671299..561f10a005 100644 --- a/rpc/transaction.go +++ b/rpc/transaction.go @@ -72,10 +72,10 @@ func (t *TransactionType) UnmarshalJSON(data []byte) error { type TxnStatus uint8 const ( - TxnStatusAcceptedOnL1 TxnStatus = iota + 1 - TxnStatusAcceptedOnL2 - TxnStatusReceived + TxnStatusReceived TxnStatus = iota + 1 TxnStatusRejected + TxnStatusAcceptedOnL2 + TxnStatusAcceptedOnL1 ) func (s TxnStatus) MarshalText() ([]byte, error) { @@ -114,8 +114,8 @@ func (es TxnExecutionStatus) MarshalText() ([]byte, error) { type TxnFinalityStatus uint8 const ( - TxnAcceptedOnL1 TxnFinalityStatus = iota + 1 - TxnAcceptedOnL2 + TxnAcceptedOnL2 TxnFinalityStatus = iota + 3 + TxnAcceptedOnL1 ) func (fs TxnFinalityStatus) MarshalText() ([]byte, error) { @@ -510,6 +510,10 @@ func (h *Handler) TransactionReceiptByHash(hash felt.Felt) (*TransactionReceipt, break } } + + if txn == nil { + return nil, ErrTxnHashNotFound + } } var ( @@ -616,8 +620,9 @@ func (h *Handler) TransactionStatus(ctx context.Context, hash felt.Felt) (*Trans switch txErr { case nil: return &TransactionStatus{ - Finality: TxnStatus(receipt.FinalityStatus), - Execution: receipt.ExecutionStatus, + Finality: TxnStatus(receipt.FinalityStatus), + Execution: receipt.ExecutionStatus, + FailureReason: receipt.RevertReason, }, nil case ErrTxnHashNotFound: if h.feederClient == nil { diff --git a/sync/sync.go b/sync/sync.go index 499bd396ba..2b25ef03fe 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -568,21 +568,15 @@ func (s *Synchronizer) HighestBlockHeader() *core.Header { } func (s *Synchronizer) SubscribeNewHeads() HeaderSubscription { - return HeaderSubscription{ - Subscription: s.newHeads.Subscribe(), - } + return HeaderSubscription{s.newHeads.Subscribe()} } func (s *Synchronizer) SubscribeReorg() ReorgSubscription { - return ReorgSubscription{ - Subscription: s.reorgFeed.Subscribe(), - } + return ReorgSubscription{s.reorgFeed.Subscribe()} } func (s *Synchronizer) SubscribePendingTxs() PendingTxSubscription { - return PendingTxSubscription{ - Subscription: s.pendingTxsFeed.Subscribe(), - } + return PendingTxSubscription{s.pendingTxsFeed.Subscribe()} } // StorePending stores a pending block given that it is for the next height diff --git a/utils/check.go b/utils/nil.go similarity index 100% rename from utils/check.go rename to utils/nil.go diff --git a/utils/check_test.go b/utils/nil_test.go similarity index 100% rename from utils/check_test.go rename to utils/nil_test.go