Skip to content

Commit

Permalink
Add a ticker and timer for subscribeTransactionStatus (#2376)
Browse files Browse the repository at this point in the history
If the transaction is not found within 10 seconds of creating the
subscription, then return a transaction not found error.
  • Loading branch information
IronGauntlets authored Jan 15, 2025
1 parent 06717e2 commit 2a572d9
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 6 deletions.
43 changes: 40 additions & 3 deletions rpc/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"context"
"encoding/json"
"time"

"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/core"
Expand All @@ -15,6 +16,14 @@ import (

const subscribeEventsChunkSize = 1024

// The function signature of SubscribeTransactionStatus cannot be changed since the jsonrpc package maps the number
// of argument in the function to the parameters in the starknet spec, therefore, the following variables are not passed
// as arguments, and they can be modified in the test to make them run faster.
var (
subscribeTxStatusTimeout = 5 * time.Minute
subscribeTxStatusTickerDuration = 5 * time.Second
)

type SubscriptionResponse struct {
Version string `json:"jsonrpc"`
Method string `json:"method"`
Expand Down Expand Up @@ -98,7 +107,7 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
// Later updates are sent only when the transaction status changes.
// The optional block_id parameter is ignored, as status changes are not stored and historical data cannot be sent.
//
//nolint:gocyclo
//nolint:gocyclo,funlen
func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Felt, blockID *BlockID) (*SubscriptionID,
*jsonrpc.Error,
) {
Expand All @@ -115,9 +124,37 @@ func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Fe
return nil, rpcErr
}

curStatus, rpcErr := h.TransactionStatus(context.Background(), txHash)
// If the error is transaction not found that means the transaction has not been submitted to the feeder gateway,
// therefore, we need to wait for a specified time and at regular interval check if the transaction has been found.
// If the transaction is found during the timout expiry, then we continue to keep track of its status otherwise the
// websocket connection is closed after the expiry.
curStatus, rpcErr := h.TransactionStatus(ctx, txHash)
if rpcErr != nil {
return nil, rpcErr
if rpcErr != ErrTxnHashNotFound {
return nil, rpcErr
}

timeout := time.NewTimer(subscribeTxStatusTimeout)
ticker := time.NewTicker(subscribeTxStatusTickerDuration)

txNotFoundLoop:
for {
select {
case <-timeout.C:
ticker.Stop()
return nil, rpcErr
case <-ticker.C:
curStatus, rpcErr = h.TransactionStatus(ctx, txHash)
if rpcErr != nil {
if rpcErr != ErrTxnHashNotFound {
return nil, rpcErr
}
continue
}
timeout.Stop()
break txNotFoundLoop
}
}
}

id := h.idgen()
Expand Down
9 changes: 6 additions & 3 deletions rpc/subscriptions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,17 +330,20 @@ func TestSubscribeTxnStatus(t *testing.T) {
log := utils.NewNopZapLogger()
txHash := new(felt.Felt).SetUint64(1)

t.Run("Return error when transaction not found", func(t *testing.T) {
t.Run("Return error when transaction not found after timeout expiry", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
t.Cleanup(mockCtrl.Finish)

subscribeTxStatusTimeout = 50 * time.Millisecond
subscribeTxStatusTickerDuration = 10 * time.Millisecond

mockChain := mocks.NewMockReader(mockCtrl)
mockSyncer := mocks.NewMockSyncReader(mockCtrl)
handler := New(mockChain, mockSyncer, nil, "", log)

mockChain.EXPECT().HeadsHeader().Return(&core.Header{Number: 1}, nil)
mockChain.EXPECT().TransactionByHash(txHash).Return(nil, db.ErrKeyNotFound)
mockSyncer.EXPECT().PendingBlock().Return(nil)
mockChain.EXPECT().TransactionByHash(txHash).Return(nil, db.ErrKeyNotFound).AnyTimes()
mockSyncer.EXPECT().PendingBlock().Return(nil).AnyTimes()

serverConn, _ := net.Pipe()
t.Cleanup(func() {
Expand Down

0 comments on commit 2a572d9

Please sign in to comment.