diff --git a/store/store.go b/store/store.go
index 6ea126ff..fa0f244e 100644
--- a/store/store.go
+++ b/store/store.go
@@ -408,9 +408,12 @@ func (s *Store[H]) flushLoop() {
 			log.Errorw("writing header batch", "try", i+1, "from", from, "to", to, "err", err)
 			s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), true)
 
-			const maxRetrySleep = time.Second
-			sleep := min(10*time.Duration(i+1)*time.Millisecond, maxRetrySleep)
-			time.Sleep(sleep)
+			const maxRetrySleep = 100 * time.Millisecond
+			sleepDur := min(10*time.Duration(i+1)*time.Millisecond, maxRetrySleep)
+
+			if err := sleep(ctx, sleepDur); err != nil {
+				break
+			}
 		}
 
 		s.metrics.flush(ctx, time.Since(startTime), s.pending.Len(), false)
@@ -511,3 +514,16 @@ func indexTo[H header.Header[H]](ctx context.Context, batch datastore.Batch, hea
 	}
 	return nil
 }
+
+// sleep blocks for the given duration or until ctx is canceled; it returns nil when the timer fires and ctx.Err() otherwise.
+func sleep(ctx context.Context, duration time.Duration) error {
+	timer := time.NewTimer(duration)
+	defer timer.Stop()
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-timer.C:
+		return nil
+	}
+}
diff --git a/store/store_test.go b/store/store_test.go
index 53d40d55..b5173a99 100644
--- a/store/store_test.go
+++ b/store/store_test.go
@@ -2,6 +2,7 @@ package store
 
 import (
 	"context"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -89,6 +90,45 @@ func TestStore(t *testing.T) {
 	assert.Len(t, out, 12)
 }
 
+func TestStore_BadFlush(t *testing.T) {
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+	t.Cleanup(cancel)
+
+	suite := headertest.NewTestSuite(t)
+
+	var callCount atomic.Int32
+
+	rootDS := sync.MutexWrap(datastore.NewMapDatastore())
+	ds := &badBatchDatastore{
+		Datastore: rootDS,
+		BatchFn: func(ctx context.Context) (datastore.Batch, error) {
+			count := callCount.Add(1)
+			// fail only calls 2 through 4: the 1st call must succeed for store.Init, and later calls let the retried flush pass.
+			if count > 1 && count < 5 {
+				return nil, context.Canceled
+			}
+			return rootDS.Batch(ctx)
+		},
+	}
+	store := NewTestStore(t, ctx, ds, suite.Head())
+
+	head, err := store.Head(ctx)
+	require.NoError(t, err)
+	assert.EqualValues(t, suite.Head().Hash(), head.Hash())
+
+	in := suite.GenDummyHeaders(10)
+	err = store.Append(ctx, in...)
+	require.NoError(t, err)
+
+	assert.Eventually(t, func() bool {
+		// access store.ds directly: the Store wraps the given datastore in a wrappedStore,
+		// so checking ds or rootDS here would constantly fail due to the key prefix.
+		ok, err := store.ds.Has(ctx, headKey)
+		require.NoError(t, err)
+		return ok
+	}, 3*time.Second, 50*time.Millisecond)
+}
+
 // TestStore_GetRangeByHeight_ExpectedRange
 func TestStore_GetRangeByHeight_ExpectedRange(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
@@ -279,3 +319,18 @@ func TestStoreInit(t *testing.T) {
 	err = store.Init(ctx, headers[len(headers)-1]) // init should work with any height, not only 1
 	require.NoError(t, err)
 }
+
+var _ datastore.Batching = &badBatchDatastore{}
+
+type badBatchDatastore struct {
+	datastore.Datastore
+
+	BatchFn func(ctx context.Context) (datastore.Batch, error)
+}
+
+func (s *badBatchDatastore) Batch(ctx context.Context) (datastore.Batch, error) {
+	if s.BatchFn != nil {
+		return s.BatchFn(ctx)
+	}
+	return nil, nil
+}
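
Reviewer note: below is a minimal, self-contained sketch of the cancellation behavior the new sleep helper gives flushLoop. The sleep body is copied from the patch above; the package name, main function, and timings are illustrative assumptions only, not part of the change. Unlike the previous time.Sleep call, a canceled context now interrupts the retry backoff immediately instead of blocking shutdown for up to maxRetrySleep per retry.

package main

import (
	"context"
	"fmt"
	"time"
)

// sleep mirrors the helper added in store/store.go: it blocks for duration
// or until ctx is canceled, whichever happens first.
func sleep(ctx context.Context, duration time.Duration) error {
	timer := time.NewTimer(duration)
	defer timer.Stop()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-timer.C:
		return nil
	}
}

func main() {
	// Timer fires first: returns nil, behaving like a plain time.Sleep.
	fmt.Println(sleep(context.Background(), 10*time.Millisecond))

	// Context is done first: returns the context error, which is what lets
	// flushLoop break out of its retry backoff during shutdown.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	fmt.Println(sleep(ctx, time.Hour))
}

The deferred timer.Stop releases the timer when the context wins the select, so repeated retries do not accumulate live timers.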