diff --git a/committee/committee.go b/committee/committee.go index 2d82c36..e2bca2e 100644 --- a/committee/committee.go +++ b/committee/committee.go @@ -115,7 +115,8 @@ type ( currentHeight uint64 lastUpdateTimestamp int64 terminate chan bool - terminated bool + terminatedCarrier bool + terminatedArchive bool mutex sync.RWMutex gravityChainBatchSize uint64 ceilingHeight uint64 @@ -186,7 +187,8 @@ func NewCommittee(archive PollArchive, cfg Config) (Committee, error) { scoreThreshold: scoreThreshold, selfStakingThreshold: selfStakingThreshold, terminate: make(chan bool), - terminated: false, + terminatedCarrier: false, + terminatedArchive: false, startHeight: cfg.GravityChainStartHeight, ceilingHeight: cfg.GravityChainCeilingHeight, interval: cfg.GravityChainHeightInterval, @@ -201,6 +203,15 @@ func (ec *committee) Start(ctx context.Context) (err error) { if err = ec.archive.Start(ctx); err != nil { return err } + ceilingHeight := ec.ceilingHeight + if ceilingHeight >= ec.interval { + ceilingHeight -= ec.interval + } + if ec.latestHeightInArchive() >= ceilingHeight && ec.ceilingHeight != 0 { + zap.L().Info("stop syncing") + ec.terminateCarrier(ctx) + return nil + } tip, err := ec.carrier.Tip() if err != nil { return errors.Wrap(err, "failed to get tip height") @@ -234,10 +245,9 @@ func (ec *committee) Start(ctx context.Context) (err error) { case <-ec.terminate: return case tip := <-tipChan: - if ec.currentHeight >= ec.ceilingHeight { - if err := ec.Stop(context.Background()); err != nil { - zap.L().Error("failed to stop eth committee", zap.Error(err)) - } + if ec.currentHeight >= ec.ceilingHeight && ec.ceilingHeight != 0 { + zap.L().Info("stop syncing") + ec.terminateCarrier(ctx) return } @@ -253,18 +263,29 @@ func (ec *committee) Start(ctx context.Context) (err error) { return nil } +func (ec *committee) terminateCarrier(ctx context.Context) { + if !ec.terminatedCarrier { + close(ec.terminate) + ec.carrier.Close() + ec.terminatedCarrier = true + } +} + +func (ec *committee) terminateArchive(ctx context.Context) error { + if !ec.terminatedArchive { + ec.terminatedArchive = true + return ec.archive.Stop(ctx) + } + return nil +} + func (ec *committee) Stop(ctx context.Context) error { ec.mutex.Lock() defer ec.mutex.Unlock() - defer func() { ec.terminated = true }() - if ec.terminated { - return nil - } - close(ec.terminate) - ec.carrier.Close() + ec.terminateCarrier(ctx) + return ec.terminateArchive(ctx) - return ec.archive.Stop(ctx) } func (ec *committee) Status() STATUS {