Skip to content

Commit

Permalink
Fix linter complains, test coverage, improve DiscardAfterThreshold API
Browse files Browse the repository at this point in the history
  • Loading branch information
Mykyta committed Aug 31, 2024
1 parent 2c26467 commit 46402bc
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 38 deletions.
25 changes: 12 additions & 13 deletions group_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package syncs
import "context"

type options struct {
ctx context.Context
preLock bool
termOnError bool
discardIfFull bool
tresholdDiscard int
ctx context.Context
preLock bool
termOnError bool
discardIfFull bool
tresholdSize int
}

// GroupOption functional option type
Expand Down Expand Up @@ -36,18 +36,17 @@ func Discard(o *options) {
o.preLock = true // discard implies preemptive
}

// DiscardAfterTreshold works similarly to Discard, but buffers task until buffer treshold reach
// For example, if 10 gouroutines are allowed and bufferTreshold is equal to 5, then 10 tasks
// can run simultaneously in gouroutines and 5 tasks can be kept in buffer until gouroutines become
// available.
func DiscardAfterTreshold(bufferSize int) GroupOption {
// DiscardAfterTreshold works similarly to Discard, but buffers tasks if all goroutines are busy
// until the treshold size of 'active' tasks (i.e. executing and scheduled for execution) is achieved
// If this value is lower than size, it will be ignored and common Discard mode will is used
func DiscardAfterTreshold(tresholdSize int) GroupOption {
return func(o *options) {
o.discardIfFull = true
o.preLock = true

if bufferSize < 1 {
bufferSize = 0
if tresholdSize < 1 {
tresholdSize = 0
}
o.tresholdDiscard = bufferSize
o.tresholdSize = tresholdSize
}
}
8 changes: 4 additions & 4 deletions semaphore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,16 @@ func TestSemaphore(t *testing.T) {

// if number of locks are less than capacity, all should be acquired
if tt.lockTimes <= tt.capacity {
assert.Equal(t, int32(tt.lockTimes), atomic.LoadInt32(&locks))
assert.Equal(t, tt.lockTimes, int(atomic.LoadInt32(&locks)))
wg.Wait()
return
}
// if number of locks exceed capacity, it should hang after reaching the capacity
assert.Equal(t, int32(tt.capacity), atomic.LoadInt32(&locks))
assert.Equal(t, tt.capacity, int(atomic.LoadInt32(&locks)))
sema.Unlock()
time.Sleep(10 * time.Millisecond)
// after unlock, it should be able to acquire another lock
assert.Equal(t, int32(tt.capacity+1), atomic.LoadInt32(&locks))
assert.Equal(t, tt.capacity+1, int(atomic.LoadInt32(&locks)))
wg.Wait()
})
}
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestSemaphore_TryLock(t *testing.T) {
}

// Check the acquired locks, it should not exceed capacity.
assert.Equal(t, int32(tt.expectedLocks), atomic.LoadInt32(&locks))
assert.Equal(t, tt.expectedLocks, int(atomic.LoadInt32(&locks)))
})
}
}
4 changes: 2 additions & 2 deletions sizedgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ func NewSizedGroup(size int, opts ...GroupOption) *SizedGroup {

// queue size either equal to number of workers or larger, otherwise does not make sense
queueSize := size
if res.tresholdDiscard > 0 {
queueSize += res.tresholdDiscard
if res.tresholdSize > size {
queueSize = res.tresholdSize
}

res.jobQueue = make(chan func(ctx context.Context), queueSize)
Expand Down
68 changes: 49 additions & 19 deletions sizedgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestSizedGroup(t *testing.T) {
var c uint32

for i := 0; i < 1000; i++ {
swg.Go(func(ctx context.Context) {
swg.Go(func(context.Context) {
time.Sleep(5 * time.Millisecond)
atomic.AddUint32(&c, 1)
})
Expand All @@ -32,7 +32,7 @@ func TestSizedGroup_Discard(t *testing.T) {
var c uint32

for i := 0; i < 100; i++ {
swg.Go(func(ctx context.Context) {
swg.Go(func(context.Context) {
time.Sleep(5 * time.Millisecond)
atomic.AddUint32(&c, 1)
})
Expand All @@ -42,21 +42,6 @@ func TestSizedGroup_Discard(t *testing.T) {
assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, not all routines have been executed", c))
}

func TestSizedGroup_DiscardAfterTreshold(t *testing.T) {
swg := NewSizedGroup(10, DiscardAfterTreshold(10))
var c uint32

for i := 0; i < 100; i++ {
swg.Go(func(ctx context.Context) {
time.Sleep(5 * time.Millisecond)
atomic.AddUint32(&c, 1)
})
}
assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine())
swg.Wait()
assert.Equal(t, uint32(20), c, fmt.Sprintf("%d, wrong number of routines have been executed", c))
}

func TestSizedGroup_Preemptive(t *testing.T) {
swg := NewSizedGroup(10, Preemptive)
var c uint32
Expand All @@ -79,7 +64,7 @@ func TestSizedGroup_Canceled(t *testing.T) {
var c uint32

for i := 0; i < 100; i++ {
swg.Go(func(ctx context.Context) {
swg.Go(func(context.Context) {
select {
case <-ctx.Done():
return
Expand All @@ -92,14 +77,59 @@ func TestSizedGroup_Canceled(t *testing.T) {
assert.True(t, c < 100)
}

func TestSizedGroup_DiscardAfterTreshold(t *testing.T) {
swg := NewSizedGroup(10, DiscardAfterTreshold(20))
var c uint32

for i := 0; i < 100; i++ {
swg.Go(func(context.Context) {
time.Sleep(5 * time.Millisecond)
atomic.AddUint32(&c, 1)
})
}
assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine())
swg.Wait()
assert.Equal(t, uint32(20), c, fmt.Sprintf("%d, wrong number of routines have been executed", c))
}

func TestSizedGroup_DiscardAfterTreshold_WithNegativeTreshold(t *testing.T) {
swg := NewSizedGroup(10, DiscardAfterTreshold(-1))
var c uint32

for i := 0; i < 100; i++ {
swg.Go(func(context.Context) {
time.Sleep(5 * time.Millisecond)
atomic.AddUint32(&c, 1)
})
}
assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine())
swg.Wait()
assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, wrong number of routines have been executed", c))
}

func TestSizedGroup_DiscardAfterTreshold_WithTresholdNotAboveSize(t *testing.T) {
swg := NewSizedGroup(10, DiscardAfterTreshold(10))
var c uint32

for i := 0; i < 100; i++ {
swg.Go(func(context.Context) {
time.Sleep(5 * time.Millisecond)
atomic.AddUint32(&c, 1)
})
}
assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine())
swg.Wait()
assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, wrong number of routines have been executed", c))
}

// illustrates the use of a SizedGroup for concurrent, limited execution of goroutines.
func ExampleSizedGroup_go() {

grp := NewSizedGroup(10) // create sized waiting group allowing maximum 10 goroutines

var c uint32
for i := 0; i < 1000; i++ {
grp.Go(func(ctx context.Context) { // Go call is non-blocking, like regular go statement
grp.Go(func(context.Context) { // Go call is non-blocking, like regular go statement
// do some work in 10 goroutines in parallel
atomic.AddUint32(&c, 1)
time.Sleep(10 * time.Millisecond)
Expand Down

0 comments on commit 46402bc

Please sign in to comment.