diff --git a/group_options.go b/group_options.go index e5fd26f..94d9679 100644 --- a/group_options.go +++ b/group_options.go @@ -3,11 +3,11 @@ package syncs import "context" type options struct { - ctx context.Context - preLock bool - termOnError bool - discardIfFull bool - tresholdDiscard int + ctx context.Context + preLock bool + termOnError bool + discardIfFull bool + tresholdSize int } // GroupOption functional option type @@ -36,18 +36,17 @@ func Discard(o *options) { o.preLock = true // discard implies preemptive } -// DiscardAfterTreshold works similarly to Discard, but buffers task until buffer treshold reach -// For example, if 10 gouroutines are allowed and bufferTreshold is equal to 5, then 10 tasks -// can run simultaneously in gouroutines and 5 tasks can be kept in buffer until gouroutines become -// available. -func DiscardAfterTreshold(bufferSize int) GroupOption { +// DiscardAfterTreshold works similarly to Discard, but buffers tasks if all goroutines are busy +// until the treshold size of 'active' tasks (i.e. executing and scheduled for execution) is achieved +// If this value is lower than size, it will be ignored and common Discard mode will is used +func DiscardAfterTreshold(tresholdSize int) GroupOption { return func(o *options) { o.discardIfFull = true o.preLock = true - if bufferSize < 1 { - bufferSize = 0 + if tresholdSize < 1 { + tresholdSize = 0 } - o.tresholdDiscard = bufferSize + o.tresholdSize = tresholdSize } } diff --git a/semaphore_test.go b/semaphore_test.go index 3757d2b..c01d00c 100644 --- a/semaphore_test.go +++ b/semaphore_test.go @@ -41,16 +41,16 @@ func TestSemaphore(t *testing.T) { // if number of locks are less than capacity, all should be acquired if tt.lockTimes <= tt.capacity { - assert.Equal(t, int32(tt.lockTimes), atomic.LoadInt32(&locks)) + assert.Equal(t, tt.lockTimes, int(atomic.LoadInt32(&locks))) wg.Wait() return } // if number of locks exceed capacity, it should hang after reaching the capacity - assert.Equal(t, int32(tt.capacity), atomic.LoadInt32(&locks)) + assert.Equal(t, tt.capacity, int(atomic.LoadInt32(&locks))) sema.Unlock() time.Sleep(10 * time.Millisecond) // after unlock, it should be able to acquire another lock - assert.Equal(t, int32(tt.capacity+1), atomic.LoadInt32(&locks)) + assert.Equal(t, tt.capacity+1, int(atomic.LoadInt32(&locks))) wg.Wait() }) } @@ -81,7 +81,7 @@ func TestSemaphore_TryLock(t *testing.T) { } // Check the acquired locks, it should not exceed capacity. - assert.Equal(t, int32(tt.expectedLocks), atomic.LoadInt32(&locks)) + assert.Equal(t, tt.expectedLocks, int(atomic.LoadInt32(&locks))) }) } } diff --git a/sizedgroup.go b/sizedgroup.go index d7bf190..e1d1e8c 100644 --- a/sizedgroup.go +++ b/sizedgroup.go @@ -30,8 +30,8 @@ func NewSizedGroup(size int, opts ...GroupOption) *SizedGroup { // queue size either equal to number of workers or larger, otherwise does not make sense queueSize := size - if res.tresholdDiscard > 0 { - queueSize += res.tresholdDiscard + if res.tresholdSize > size { + queueSize = res.tresholdSize } res.jobQueue = make(chan func(ctx context.Context), queueSize) diff --git a/sizedgroup_test.go b/sizedgroup_test.go index e8f4786..ff8537d 100644 --- a/sizedgroup_test.go +++ b/sizedgroup_test.go @@ -17,7 +17,7 @@ func TestSizedGroup(t *testing.T) { var c uint32 for i := 0; i < 1000; i++ { - swg.Go(func(ctx context.Context) { + swg.Go(func(context.Context) { time.Sleep(5 * time.Millisecond) atomic.AddUint32(&c, 1) }) @@ -32,7 +32,7 @@ func TestSizedGroup_Discard(t *testing.T) { var c uint32 for i := 0; i < 100; i++ { - swg.Go(func(ctx context.Context) { + swg.Go(func(context.Context) { time.Sleep(5 * time.Millisecond) atomic.AddUint32(&c, 1) }) @@ -42,21 +42,6 @@ func TestSizedGroup_Discard(t *testing.T) { assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, not all routines have been executed", c)) } -func TestSizedGroup_DiscardAfterTreshold(t *testing.T) { - swg := NewSizedGroup(10, DiscardAfterTreshold(10)) - var c uint32 - - for i := 0; i < 100; i++ { - swg.Go(func(ctx context.Context) { - time.Sleep(5 * time.Millisecond) - atomic.AddUint32(&c, 1) - }) - } - assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine()) - swg.Wait() - assert.Equal(t, uint32(20), c, fmt.Sprintf("%d, wrong number of routines have been executed", c)) -} - func TestSizedGroup_Preemptive(t *testing.T) { swg := NewSizedGroup(10, Preemptive) var c uint32 @@ -79,7 +64,7 @@ func TestSizedGroup_Canceled(t *testing.T) { var c uint32 for i := 0; i < 100; i++ { - swg.Go(func(ctx context.Context) { + swg.Go(func(context.Context) { select { case <-ctx.Done(): return @@ -92,6 +77,51 @@ func TestSizedGroup_Canceled(t *testing.T) { assert.True(t, c < 100) } +func TestSizedGroup_DiscardAfterTreshold(t *testing.T) { + swg := NewSizedGroup(10, DiscardAfterTreshold(20)) + var c uint32 + + for i := 0; i < 100; i++ { + swg.Go(func(context.Context) { + time.Sleep(5 * time.Millisecond) + atomic.AddUint32(&c, 1) + }) + } + assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine()) + swg.Wait() + assert.Equal(t, uint32(20), c, fmt.Sprintf("%d, wrong number of routines have been executed", c)) +} + +func TestSizedGroup_DiscardAfterTreshold_WithNegativeTreshold(t *testing.T) { + swg := NewSizedGroup(10, DiscardAfterTreshold(-1)) + var c uint32 + + for i := 0; i < 100; i++ { + swg.Go(func(context.Context) { + time.Sleep(5 * time.Millisecond) + atomic.AddUint32(&c, 1) + }) + } + assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine()) + swg.Wait() + assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, wrong number of routines have been executed", c)) +} + +func TestSizedGroup_DiscardAfterTreshold_WithTresholdNotAboveSize(t *testing.T) { + swg := NewSizedGroup(10, DiscardAfterTreshold(10)) + var c uint32 + + for i := 0; i < 100; i++ { + swg.Go(func(context.Context) { + time.Sleep(5 * time.Millisecond) + atomic.AddUint32(&c, 1) + }) + } + assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine()) + swg.Wait() + assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, wrong number of routines have been executed", c)) +} + // illustrates the use of a SizedGroup for concurrent, limited execution of goroutines. func ExampleSizedGroup_go() { @@ -99,7 +129,7 @@ func ExampleSizedGroup_go() { var c uint32 for i := 0; i < 1000; i++ { - grp.Go(func(ctx context.Context) { // Go call is non-blocking, like regular go statement + grp.Go(func(context.Context) { // Go call is non-blocking, like regular go statement // do some work in 10 goroutines in parallel atomic.AddUint32(&c, 1) time.Sleep(10 * time.Millisecond)