-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcond.go
143 lines (122 loc) · 4.06 KB
/
cond.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package cond
import (
"context"
"sync"
"github.com/nursik/wake"
)
type commonCond struct {
s *wake.Signaller
r *wake.Receiver
}
// Signal wakes n goroutines (if there are any) and reports how many goroutines were awoken.
// If n <= 0 it wakes all goroutines and returns 0 (same as [commonCond.Broadcast]).
func (c *commonCond) Signal(n int) int {
if n <= 0 {
c.s.Broadcast()
return 0
}
var x int
// we need to notify at least one receiver if we know that at least one is waiting.
// we are doing it in for loop, because unlike golang's sync.Cond we may start waiting after sending Signal.
// golang's sync.Cond Wait() appends to notification_list before unlocking.
for c.s.WaitCount() > 0 {
x = c.s.Signal(n)
n = n - x
if x > 0 {
break
}
}
// don't accidentally broadcast
if n == 0 {
return x
}
return x + c.s.Signal(n)
}
// SignalWithContext wakes n goroutines and reports how many goroutines were awoken and ctx.Err() if context was cancelled.
// It is a blocking operation and will be finished when all n goroutines are awoken, context is cancelled or Cond/RWCond was closed.
// If n <= 0, it wakes all goroutines (same as [commonCond.Broadcast]) regardless of context cancellation.
func (c *commonCond) SignalWithContext(ctx context.Context, n int) (int, error) {
return c.s.SignalWithContext(ctx, n)
}
// Broadcast wakes up all goroutines.
func (c *commonCond) Broadcast() {
c.s.Broadcast()
}
// Close closes Cond/RWCond and wakes all waiting goroutines.
// The first Close() returns true and subsequent calls always return false.
func (c *commonCond) Close() bool {
return c.s.Close()
}
// IsClosed reports if Cond/RWCond is closed.
func (c *commonCond) IsClosed() bool {
return c.s.IsClosed()
}
// WaitCount returns current number of goroutines waiting for signal.
func (c *commonCond) WaitCount() int {
return c.s.WaitCount()
}
type Cond struct {
L sync.Locker
commonCond
}
// Wait Unlocks locker, blocks until awaken (returns true) or Cond was closed (returns false), and at the end Locks locker again.
func (c *Cond) Wait() bool {
return wake.UnsafeWait(c.r, c.L)
}
// WaitWithContext Unlocks locker, blocks until awaken, context was cancelled or Cond was closed, and at the end Locks locker again.
// Returns true and nil, if awaken by signal/broadcast.
// Returns false and nil, if Cond was closed.
// Returns false and ctx.Err(), if context was cancelled.
func (c *Cond) WaitWithContext(ctx context.Context) (bool, error) {
return wake.UnsafeWaitContext(c.r, c.L, ctx)
}
// New returns Cond with associated locker. Same as sync.Cond in terms of usage, but has more functionality.
// Only Wait and WaitWithContext methods use associated locker and other methods do not use locker. Using closed Cond is safe.
// Slower than sync.Cond by ~3 times (sync.Cond's tests which only benchmarks broadcast).
func New(l sync.Locker) *Cond {
s, r := wake.New()
return &Cond{
L: l,
commonCond: commonCond{
s: s,
r: r,
},
}
}
type RWCond struct {
L *sync.RWMutex
rwl rlocker
commonCond
}
// Wait RUnlocks locker, blocks until awaken (returns true) or RWCond was closed (returns false), and at the end RLocks locker again.
func (c *RWCond) Wait() bool {
return wake.UnsafeWait(c.r, c.rwl)
}
// WaitWithContext RUnlocks locker, blocks until awaken, context was cancelled or RWCond was closed, and at the end RLocks locker again.
// Returns true and nil, if awaken by signal/broadcast.
// Returns false and nil, if RWCond was closed.
// Returns false and ctx.Err(), if context was cancelled.
func (c *RWCond) WaitWithContext(ctx context.Context) (bool, error) {
return wake.UnsafeWaitContext(c.r, c.rwl, ctx)
}
type rlocker struct {
mtx *sync.RWMutex
}
func (l rlocker) Lock() {
l.mtx.RLock()
}
func (l rlocker) Unlock() {
l.mtx.RUnlock()
}
// NewRW returns RWCond with associated sync.RWMutex. Uses RUnlock and RLock for Wait and WaitWithContext methods. Other methods do not use associated sync.RWMutex.
func NewRW(l *sync.RWMutex) *RWCond {
s, r := wake.New()
return &RWCond{
L: l,
rwl: rlocker{mtx: l},
commonCond: commonCond{
s: s,
r: r,
},
}
}