Skip to content

Commit

Permalink
refactor: simplify code
Browse files Browse the repository at this point in the history
  • Loading branch information
WqyJh committed Jan 29, 2024
1 parent b20aab9 commit 392b16d
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 51 deletions.
22 changes: 2 additions & 20 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,25 +164,16 @@ func (c *Client) weakFetchBatch(ctx context.Context, keys []string, expire time.
go func(i int) {
defer wg.Done()
r, err := c.luaGet(ctx, keys[i], owner)
ticker := time.NewTimer(c.Options.LockSleep)
defer ticker.Stop()
for err == nil && r[0] == nil && r[1].(string) != locked {
debugf("batch weak: empty result for %s locked by other, so sleep %s", keys[i], c.Options.LockSleep.String())
select {
case <-ctx.Done():
ch <- pair{idx: i, err: ctx.Err()}
return
case <-ticker.C:
case <-time.After(c.Options.LockSleep):
// equal to time.Sleep(c.Options.LockSleep) but can be canceled
}
r, err = c.luaGet(ctx, keys[i], owner)
// Reset ticker after luaGet
// If we reset ticker before luaGet, since luaGet takes a period of time,
// the actual sleep time will be shorter than expected
if !ticker.Stop() && len(ticker.C) > 0 {
<-ticker.C
}
ticker.Reset(c.Options.LockSleep)
}
if err != nil {
ch <- pair{idx: i, data: "", err: err}
Expand Down Expand Up @@ -289,25 +280,16 @@ func (c *Client) strongFetchBatch(ctx context.Context, keys []string, expire tim
go func(i int) {
defer wg.Done()
r, err := c.luaGet(ctx, keys[i], owner)
ticker := time.NewTimer(c.Options.LockSleep)
defer ticker.Stop()
for err == nil && r[1] != nil && r[1] != locked { // locked by other
debugf("batch: locked by other, so sleep %s", c.Options.LockSleep)
select {
case <-ctx.Done():
ch <- pair{idx: i, err: ctx.Err()}
return
case <-ticker.C:
case <-time.After(c.Options.LockSleep):
// equal to time.Sleep(c.Options.LockSleep) but can be canceled
}
r, err = c.luaGet(ctx, keys[i], owner)
// Reset ticker after luaGet
// If we reset ticker before luaGet, since luaGet takes a period of time,
// the actual sleep time will be shorter than expected
if !ticker.Stop() && len(ticker.C) > 0 {
<-ticker.C
}
ticker.Reset(c.Options.LockSleep)
}
if err != nil {
ch <- pair{idx: i, data: "", err: err}
Expand Down
8 changes: 4 additions & 4 deletions batch_cover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func TestWeakFetchBatchCanceled(t *testing.T) {
defer cancel()
_, err := rc.FetchBatch2(ctx, keys, 60*time.Second, genBatchDataFunc(values2, 200))
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.Less(t, time.Since(began), time.Duration(210)*time.Millisecond)
assertEqualDuration(t, time.Duration(200)*time.Millisecond, time.Since(began))

ctx, cancel = context.WithCancel(context.Background())
go func() {
Expand All @@ -154,7 +154,7 @@ func TestWeakFetchBatchCanceled(t *testing.T) {
began = time.Now()
_, err = rc.FetchBatch2(ctx, keys, 60*time.Second, genBatchDataFunc(values3, 200))
assert.ErrorIs(t, err, context.Canceled)
assert.Less(t, time.Since(began), time.Duration(210)*time.Millisecond)
assertEqualDuration(t, time.Duration(200)*time.Millisecond, time.Since(began))
}

func TestStrongFetchBatchCanceled(t *testing.T) {
Expand All @@ -178,7 +178,7 @@ func TestStrongFetchBatchCanceled(t *testing.T) {
defer cancel()
_, err := rc.FetchBatch2(ctx, keys, 60*time.Second, genBatchDataFunc(values2, 200))
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.Less(t, time.Since(began), time.Duration(210)*time.Millisecond)
assertEqualDuration(t, time.Duration(200)*time.Millisecond, time.Since(began))

ctx, cancel = context.WithCancel(context.Background())
go func() {
Expand All @@ -188,5 +188,5 @@ func TestStrongFetchBatchCanceled(t *testing.T) {
began = time.Now()
_, err = rc.FetchBatch2(ctx, keys, 60*time.Second, genBatchDataFunc(values3, 200))
assert.ErrorIs(t, err, context.Canceled)
assert.Less(t, time.Since(began), time.Duration(210)*time.Millisecond)
assertEqualDuration(t, time.Duration(200)*time.Millisecond, time.Since(began))
}
25 changes: 2 additions & 23 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package rockscache
import (
"context"
"fmt"
"log"
"math"
"math/rand"
"time"
Expand Down Expand Up @@ -172,25 +171,15 @@ func (c *Client) weakFetch(ctx context.Context, key string, expire time.Duration
debugf("weakFetch: key=%s", key)
owner := shortuuid.New()
r, err := c.luaGet(ctx, key, owner)
ticker := time.NewTimer(c.Options.LockSleep)
defer ticker.Stop()
for err == nil && r[0] == nil && r[1].(string) != locked {
debugf("empty result for %s locked by other, so sleep %s", key, c.Options.LockSleep.String())
select {
case <-ctx.Done():
return "", ctx.Err()
case <-ticker.C:
log.Printf("ticker")
case <-time.After(c.Options.LockSleep):
// equal to time.Sleep(c.Options.LockSleep) but can be canceled
}
r, err = c.luaGet(ctx, key, owner)
// Reset ticker after luaGet
// If we reset ticker before luaGet, since luaGet takes a period of time,
// the actual sleep time will be shorter than expected
if !ticker.Stop() && len(ticker.C) > 0 {
<-ticker.C
}
ticker.Reset(c.Options.LockSleep)
}
if err != nil {
return "", err
Expand All @@ -211,25 +200,15 @@ func (c *Client) strongFetch(ctx context.Context, key string, expire time.Durati
debugf("strongFetch: key=%s", key)
owner := shortuuid.New()
r, err := c.luaGet(ctx, key, owner)
ticker := time.NewTimer(c.Options.LockSleep)
defer ticker.Stop()
for err == nil && r[1] != nil && r[1] != locked { // locked by other
debugf("locked by other, so sleep %s", c.Options.LockSleep)
select {
case <-ctx.Done():
return "", ctx.Err()
case <-ticker.C:
log.Printf("ticker")
case <-time.After(c.Options.LockSleep):
// equal to time.Sleep(c.Options.LockSleep) but can be canceled
}
r, err = c.luaGet(ctx, key, owner)
// Reset ticker after luaGet
// If we reset ticker before luaGet, since luaGet takes a period of time,
// the actual sleep time will be shorter than expected
if !ticker.Stop() && len(ticker.C) > 0 {
<-ticker.C
}
ticker.Reset(c.Options.LockSleep)
}
if err != nil {
return "", err
Expand Down
18 changes: 14 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ func TestStrongErrorFetch(t *testing.T) {
assert.True(t, time.Since(began) < time.Duration(150)*time.Millisecond)
}

func assertEqualDuration(t *testing.T, expected, actual time.Duration) {

Check failure on line 151 in client_test.go

View workflow job for this annotation

GitHub Actions / CI

`assertEqualDuration` - `expected` always receives `time.Duration(200) * time.Millisecond (200000000)` (unparam)
t.Helper()
delta := expected - actual
if delta < 0 {
delta = -delta
}
t.Logf("expected=%s, actual=%s, delta=%s", expected, actual, delta)
assert.Less(t, delta, time.Duration(2)*time.Millisecond)
}

func TestStrongFetchCanceled(t *testing.T) {
clearCache()
rc := NewClient(rdb, NewDefaultOptions())
Expand All @@ -166,7 +176,7 @@ func TestStrongFetchCanceled(t *testing.T) {
defer cancel()
_, err := rc.Fetch2(ctx, rdbKey, 60*time.Second, genDataFunc(expected, 200))
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.Less(t, time.Since(began), time.Duration(210)*time.Millisecond)
assertEqualDuration(t, time.Duration(200)*time.Millisecond, time.Since(began))

ctx, cancel = context.WithCancel(context.Background())
go func() {
Expand All @@ -176,7 +186,7 @@ func TestStrongFetchCanceled(t *testing.T) {
began = time.Now()
_, err = rc.Fetch2(ctx, rdbKey, 60*time.Second, genDataFunc(expected, 200))
assert.ErrorIs(t, err, context.Canceled)
assert.Less(t, time.Since(began), time.Duration(210)*time.Millisecond)
assertEqualDuration(t, time.Duration(200)*time.Millisecond, time.Since(began))
}

func TestWeakErrorFetch(t *testing.T) {
Expand Down Expand Up @@ -215,7 +225,7 @@ func TestWeakFetchCanceled(t *testing.T) {
defer cancel()
_, err := rc.Fetch2(ctx, rdbKey, 60*time.Second, genDataFunc(expected, 200))
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.Less(t, time.Since(began), time.Duration(210)*time.Millisecond)
assertEqualDuration(t, time.Duration(200)*time.Millisecond, time.Since(began))

ctx, cancel = context.WithCancel(context.Background())
go func() {
Expand All @@ -225,7 +235,7 @@ func TestWeakFetchCanceled(t *testing.T) {
began = time.Now()
_, err = rc.Fetch2(ctx, rdbKey, 60*time.Second, genDataFunc(expected, 200))
assert.ErrorIs(t, err, context.Canceled)
assert.Less(t, time.Since(began), time.Duration(210)*time.Millisecond)
assertEqualDuration(t, time.Duration(200)*time.Millisecond, time.Since(began))
}

func TestRawGet(t *testing.T) {
Expand Down

0 comments on commit 392b16d

Please sign in to comment.