Skip to content

Commit

Permalink
test(ntp): add tests for NTP Checker (#1358)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f authored Jun 22, 2024
1 parent be881df commit f3f451e
Show file tree
Hide file tree
Showing 10 changed files with 237 additions and 100 deletions.
12 changes: 8 additions & 4 deletions cmd/gtk/widget_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (wn *widgetNode) timeout10() bool {
if wn.model.node.ConsManager().HasActiveInstance() {
isInCommittee = "Yes"
}
offset, offsetErr := wn.model.node.Sync().GetClockOffset()
offset, offsetErr := wn.model.node.Sync().ClockOffset()

glib.IdleAdd(func() bool {
styleContext, err := wn.labelClockOffset.GetStyleContext()
Expand All @@ -147,11 +147,15 @@ func (wn *widgetNode) timeout10() bool {

if offsetErr != nil {
styleContext.AddClass("warning")
wn.labelClockOffset.SetText("Error response from NTP server.")
wn.labelClockOffset.SetText("N/A")
} else {
wn.labelClockOffset.SetText(fmt.Sprintf("%v second(s)", math.Round(offset.Seconds())))
offset := math.Round(offset.Seconds())
if offset == 0 {
offset = math.Abs(offset) // To fix "-0 second(s)" issue
}
wn.labelClockOffset.SetText(fmt.Sprintf("%v second(s)", offset))

if wn.model.node.Sync().OutOfSync(offset) {
if wn.model.node.Sync().IsClockOutOfSync() {
styleContext.AddClass("warning")
} else {
styleContext.RemoveClass("warning")
Expand Down
4 changes: 2 additions & 2 deletions sync/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ type Synchronizer interface {
SelfID() peer.ID
PeerSet() *peerset.PeerSet
Services() service.Services
GetClockOffset() (time.Duration, error)
OutOfSync(time.Duration) bool
ClockOffset() (time.Duration, error)
IsClockOutOfSync() bool
}
4 changes: 2 additions & 2 deletions sync/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,10 @@ func (m *MockSync) Services() service.Services {
return m.TestServices
}

func (*MockSync) GetClockOffset() (time.Duration, error) {
func (*MockSync) ClockOffset() (time.Duration, error) {
return 1 * time.Second, nil
}

func (*MockSync) OutOfSync(_ time.Duration) bool {
func (*MockSync) IsClockOutOfSync() bool {
return false
}
10 changes: 5 additions & 5 deletions sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewSynchronizer(
network: net,
broadcastCh: broadcastCh,
networkCh: net.EventChannel(),
ntp: ntp.NewNtpChecker(1*time.Minute, 1*time.Second),
ntp: ntp.NewNtpChecker(),
}

sync.peerSet = peerset.NewPeerSet(conf.SessionTimeout)
Expand Down Expand Up @@ -133,12 +133,12 @@ func (sync *synchronizer) Stop() {
sync.logger.Debug("context closed", "reason", sync.ctx.Err())
}

func (sync *synchronizer) GetClockOffset() (time.Duration, error) {
return sync.ntp.GetClockOffset()
func (sync *synchronizer) ClockOffset() (time.Duration, error) {
return sync.ntp.ClockOffset()
}

func (sync *synchronizer) OutOfSync(offset time.Duration) bool {
return sync.ntp.OutOfSync(offset)
func (sync *synchronizer) IsClockOutOfSync() bool {
return sync.ntp.IsOutOfSync()
}

func (sync *synchronizer) stateHeight() uint32 {
Expand Down
2 changes: 0 additions & 2 deletions util/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ const (
ErrDuplicateVote

ErrCount
ErrNtpError
)

var messages = map[int]string{
Expand All @@ -43,7 +42,6 @@ var messages = map[int]string{
ErrInvalidVote: "invalid vote",
ErrInvalidMessage: "invalid message",
ErrDuplicateVote: "duplicate vote",
ErrNtpError: "error on getting clock offset",
}

type withCodeError struct {
Expand Down
127 changes: 86 additions & 41 deletions util/ntp/ntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,21 @@ import (
"sync"
"time"

"github.com/beevik/ntp"
"github.com/pactus-project/pactus/util/errors"
"github.com/pactus-project/pactus/util/logger"
)

const (
maxClockOffset = time.Duration(math.MinInt64)
)

// QueryError is returned when a query from all NTP pools encounters an error
// and we have no valid response from them.
type QueryError struct{}

func (QueryError) Error() string {
return "failed to get NTP query from all pools"
}

var _pools = []string{
"pool.ntp.org",
"time.google.com",
Expand All @@ -24,28 +30,69 @@ var _pools = []string{
"ntp.ubuntu.com",
}

// Checker represents a NTP checker that periodically checks the system time against the network time.
type Checker struct {
lock sync.RWMutex
ctx context.Context
cancel func()
lk sync.RWMutex

ticker *time.Ticker
threshold time.Duration
ctx context.Context
cancel func()
querier Querier
offset time.Duration
interval time.Duration
threshold time.Duration
ticker *time.Ticker
}

// CheckerOption defines the type for functions that configure a Checker.
type CheckerOption func(*Checker)

// WithQuerier sets the Querier for the Checker.
func WithQuerier(querier Querier) CheckerOption {
return func(c *Checker) {
c.querier = querier
}
}

// WithInterval sets the interval at which the checker will run.
// Default interval is 1 minute.
func WithInterval(interval time.Duration) CheckerOption {
return func(c *Checker) {
c.interval = interval
c.ticker = time.NewTicker(interval)
}
}

// WithThreshold sets the threshold for determining if the system time is out of sync.
// Default threshold is 1 second.
func WithThreshold(threshold time.Duration) CheckerOption {
return func(c *Checker) {
c.threshold = threshold
}
}

func NewNtpChecker(interval, threshold time.Duration) *Checker {
// NewNtpChecker creates a new Checker with the provided options.
// If no options are provided, it uses default values for interval and threshold.
func NewNtpChecker(opts ...CheckerOption) *Checker {
ctxWithCancel, cancel := context.WithCancel(context.Background())
server := &Checker{
defaultInterval := time.Minute
defaultThreshold := time.Second

// Initialize the checker with default values.
checker := &Checker{
ctx: ctxWithCancel,
cancel: cancel,
interval: interval,
threshold: threshold,
ticker: time.NewTicker(interval),
interval: defaultInterval,
threshold: defaultThreshold,
querier: RemoteQuerier{},
ticker: time.NewTicker(defaultInterval),
}

return server
// Apply provided options to override default values.
for _, opt := range opts {
opt(checker)
}

return checker
}

func (c *Checker) Start() {
Expand All @@ -55,21 +102,16 @@ func (c *Checker) Start() {
return

case <-c.ticker.C:
offset := c.clockOffset()
c.lock.Lock()
offset, _ := c.queryClockOffset()

c.lk.Lock()
c.offset = offset
c.lock.Unlock()
c.lk.Unlock()

if c.offset == maxClockOffset {
logger.Error("error on getting clock offset")
} else if c.OutOfSync(offset) {
if c.offset != maxClockOffset && c.IsOutOfSync() {
logger.Error(
"The node is out of sync with the network time",
"threshold", c.threshold,
"offset", offset,
"threshold(secs)", c.threshold.Seconds(),
"offset(secs)", offset.Seconds(),
)
"the system time is out of sync with the network time by more than one second",
"threshold", c.threshold, "offset", offset)
}
}
}
Expand All @@ -80,28 +122,29 @@ func (c *Checker) Stop() {
c.ticker.Stop()
}

func (c *Checker) OutOfSync(offset time.Duration) bool {
return math.Abs(float64(offset)) > float64(c.threshold)
}
func (c *Checker) IsOutOfSync() bool {
c.lk.RLock()
defer c.lk.RUnlock()

func (c *Checker) GetClockOffset() (time.Duration, error) {
c.lock.RLock()
defer c.lock.RUnlock()
return c.offset.Abs() > c.threshold
}

offset := c.offset
func (c *Checker) ClockOffset() (time.Duration, error) {
c.lk.RLock()
defer c.lk.RUnlock()

if offset == maxClockOffset {
return 0, errors.Errorf(errors.ErrNtpError, "unable to get clock offset")
if c.offset == maxClockOffset {
return 0, QueryError{}
}

return offset, nil
return c.offset, nil
}

func (*Checker) clockOffset() time.Duration {
func (c *Checker) queryClockOffset() (time.Duration, error) {
for _, server := range _pools {
response, err := ntp.Query(server)
response, err := c.querier.Query(server)
if err != nil {
logger.Warn("ntp error", "server", server, "error", err)
logger.Warn("ntp query error", "server", server, "error", err)

continue
}
Expand All @@ -112,10 +155,12 @@ func (*Checker) clockOffset() time.Duration {
continue
}

return response.ClockOffset
logger.Debug("successful ntp query", "offset", response.ClockOffset, "RTT", response.RTT)

return response.ClockOffset, nil
}

logger.Error("failed to get ntp query from all pool, set default max clock offset")
logger.Error("failed to get ntp query from all pool")

return maxClockOffset
return maxClockOffset, QueryError{}
}
Loading

0 comments on commit f3f451e

Please sign in to comment.