diff --git a/group_options.go b/group_options.go index 1344335..2b3029d 100644 --- a/group_options.go +++ b/group_options.go @@ -3,10 +3,11 @@ package syncs import "context" type options struct { - ctx context.Context - preLock bool - termOnError bool - discardIfFull bool + ctx context.Context + preLock bool + termOnError bool + discardIfFull bool + tresholdDiscard int } // GroupOption functional option type @@ -34,3 +35,19 @@ func Discard(o *options) { o.discardIfFull = true o.preLock = true // discard implies preemptive } + +// DiscardAfterTreshold works similarly to Discard, but buffers task until buffer treshold reach +// For example, if 10 gouroutines are allowed and bufferTreshold is equal to 5, then 10 tasks +// can run simultaneously in gouroutines and 5 tasks can be kept in buffer until gouroutines become +// available. +func DiscardAfterTreshold(bufferSize int) GroupOption { + return func(o *options) { + o.discardIfFull = true + o.preLock = true + + if (bufferSize < 1) { + bufferSize = 0 + } + o.tresholdDiscard = bufferSize + } +} diff --git a/sizedgroup.go b/sizedgroup.go index b73f77a..2b1aa5f 100644 --- a/sizedgroup.go +++ b/sizedgroup.go @@ -10,62 +10,110 @@ import ( // SizedGroup interface enforces constructor usage and doesn't allow direct creation of sizedGroup type SizedGroup struct { options - wg sync.WaitGroup - sema Locker + wg sync.WaitGroup + workers chan struct{} + scheduledJobs chan struct{} + jobQueue chan func(ctx context.Context) + workersMutex sync.Mutex } // NewSizedGroup makes wait group with limited size alive goroutines func NewSizedGroup(size int, opts ...GroupOption) *SizedGroup { - res := SizedGroup{sema: NewSemaphore(size)} + if size < 0 { + size = 1 + } + res := SizedGroup{workers: make(chan struct{}, size)} res.options.ctx = context.Background() for _, opt := range opts { opt(&res.options) } + + // queue size either equal to number of workers or larger, otherwise does not make sense + queueSize := size + if res.tresholdDiscard > 0 { + queueSize += res.tresholdDiscard + } + + res.jobQueue = make(chan func(ctx context.Context), queueSize) + res.scheduledJobs = make(chan struct{}, queueSize) return &res } // Go calls the given function in a new goroutine. // Every call will be unblocked, but some goroutines may wait if semaphore locked. func (g *SizedGroup) Go(fn func(ctx context.Context)) { - canceled := func() bool { - select { - case <-g.ctx.Done(): - return true - default: - return false - } + if g.canceled() { + return } - if canceled() { + g.wg.Add(1) + if !g.preLock { + go func() { + defer g.wg.Done() + if g.canceled() { + return + } + g.scheduledJobs <- struct{}{} + fn(g.ctx) + <-g.scheduledJobs + }() return } - - if g.preLock { - lockOk := g.sema.TryLock() - if !lockOk && g.discardIfFull { - // lock failed and discardIfFull is set, discard this goroutine + + toRun := func(job func(ctx context.Context)) { + defer g.wg.Done() + if g.canceled() { return } - if !lockOk && !g.discardIfFull { - g.sema.Lock() // make sure we have block until lock is acquired - } + job(g.ctx) + <- g.scheduledJobs } - g.wg.Add(1) - go func() { - defer g.wg.Done() - - if canceled() { - return + startWorkerIfNeeded := func() { + g.workersMutex.Lock() + select { + case g.workers <- struct{}{}: + g.workersMutex.Unlock() + go func() { + for { + select { + case job := <-g.jobQueue: + toRun(job) + default: + g.workersMutex.Lock() + select { + case job := <-g.jobQueue: + g.workersMutex.Unlock() + toRun(job) + continue + default: + <-g.workers + g.workersMutex.Unlock() + } + return + } + } + }() + default: + g.workersMutex.Unlock() } + } - if !g.preLock { - g.sema.Lock() + if g.discardIfFull { + select { + case g.scheduledJobs <- struct{}{}: + g.jobQueue <- fn + startWorkerIfNeeded() + default: + g.wg.Done() } - fn(g.ctx) - g.sema.Unlock() - }() + return + } + + g.scheduledJobs <- struct{}{} + g.jobQueue <- fn + startWorkerIfNeeded() } // Wait blocks until the SizedGroup counter is zero. @@ -73,3 +121,12 @@ func (g *SizedGroup) Go(fn func(ctx context.Context)) { func (g *SizedGroup) Wait() { g.wg.Wait() } + +func (g *SizedGroup) canceled() bool { + select { + case <-g.ctx.Done(): + return true + default: + return false + } +} \ No newline at end of file diff --git a/sizedgroup_test.go b/sizedgroup_test.go index 581d7e0..822a946 100644 --- a/sizedgroup_test.go +++ b/sizedgroup_test.go @@ -42,6 +42,21 @@ func TestSizedGroup_Discard(t *testing.T) { assert.Equal(t, uint32(10), c, fmt.Sprintf("%d, not all routines have been executed", c)) } +func TestSizedGroup_DiscardAfterTreshold(t *testing.T) { + swg := NewSizedGroup(10, DiscardAfterTreshold(10)) + var c uint32 + + for i := 0; i < 100; i++ { + swg.Go(func(ctx context.Context) { + time.Sleep(5 * time.Millisecond) + atomic.AddUint32(&c, 1) + }) + } + assert.True(t, runtime.NumGoroutine() < 15, "goroutines %d", runtime.NumGoroutine()) + swg.Wait() + assert.Equal(t, uint32(20), c, fmt.Sprintf("%d, wrong number of routines have been executed", c)) +} + func TestSizedGroup_Preemptive(t *testing.T) { swg := NewSizedGroup(10, Preemptive) var c uint32