-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathsend_info.go
147 lines (128 loc) · 3.58 KB
/
send_info.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package secureio
import (
"context"
"fmt"
"sync"
"sync/atomic"
)
// SendInfo contains information about the scheduled sending request.
// Its values should be read only after "<-(*SendInfo).Done()" has finished.
type SendInfo struct {
// Err contains the resulting error.
Err error
// N contains the number of bytes that were written to send the merged
// message through the backend.
N int
// sendID is the request identifier taken from the package-level
// nextSendID counter; it is what SendID() returns.
sendID uint64 // for debug only
// c is the channel returned by Done(). AcquireSendInfo replaces it with a
// fresh channel on every acquisition.
c chan struct{}
// ctx is the context passed to AcquireSendInfo. Wait and Release select on
// its Done() channel.
ctx context.Context
// refCount is the reference counter. In this file it is accessed through
// sync/atomic only; Release hands the object back to the pool when the
// counter drops to zero.
refCount int64
// isBusy is set by AcquireSendInfo and cleared by Put. Both panic when they
// find it in the opposite state from the one they expect.
isBusy bool
// sess is the session given to newSendInfoPool. SendNowAndWait uses its
// sendDelayedNowChan.
sess *Session
// pool is the pool Release puts this object back into.
pool *sendInfoPool
}
// nextSendID is the package-wide counter used to generate SendInfo
// identifiers. It is advanced with atomic.AddUint64, so the first issued
// identifier is 1.
var nextSendID uint64
// sendInfoPool is a pool of reusable *SendInfo objects backed by a sync.Pool.
// Objects are taken with AcquireSendInfo and given back with Put.
type sendInfoPool struct {
// storage keeps the released *SendInfo values. Its New function is
// installed by newSendInfoPool.
storage sync.Pool
}
// newSendInfoPool creates a pool of *SendInfo objects. Every object the pool
// allocates gets its own channel and identifier, and is bound to sess and to
// the pool itself, so that Release can return it to the same pool.
func newSendInfoPool(sess *Session) *sendInfoPool {
	p := new(sendInfoPool)

	// allocate is invoked by sync.Pool whenever it has no spare object.
	allocate := func() interface{} {
		info := new(SendInfo)
		info.c = make(chan struct{})
		info.sendID = atomic.AddUint64(&nextSendID, 1)
		info.sess = sess
		info.pool = p
		return info
	}

	p.storage = sync.Pool{New: allocate}
	return p
}
// AcquireSendInfo takes a *SendInfo from the pool and prepares it for a new
// sending request: the object is marked busy, receives one reference, a
// fresh Done() channel and a new send ID, and remembers ctx.
//
// It panics if the object obtained from the pool is still marked as busy.
func (pool *sendInfoPool) AcquireSendInfo(ctx context.Context) *SendInfo {
	item := pool.storage.Get().(*SendInfo)
	if item.isBusy {
		panic(`should not happened`)
	}

	item.isBusy = true
	// The reference held by the caller.
	atomic.AddInt64(&item.refCount, 1)
	item.c = make(chan struct{})
	item.sendID = atomic.AddUint64(&nextSendID, 1)
	item.ctx = ctx

	return item
}
// Put clears the result fields of freeSendInfo, marks it as not busy and
// stores it in the pool for later reuse.
//
// It panics if freeSendInfo is not marked as busy, i.e. if it was not
// obtained through AcquireSendInfo or has already been put back.
func (pool *sendInfoPool) Put(freeSendInfo *SendInfo) {
	if busy := freeSendInfo.isBusy; !busy {
		panic(fmt.Sprintf(`should not happened (isBusy == %v)`, busy))
	}

	freeSendInfo.isBusy = false
	freeSendInfo.reset()

	pool.storage.Put(freeSendInfo)
}
// Done returns the channel to wait on until the real sending has been
// performed. Once it is finished, `SendInfo.Err` and `SendInfo.N` may be
// read and `(*SendInfo).Release()` may be called.
func (sendInfo *SendInfo) Done() <-chan struct{} {
	var done <-chan struct{} = sendInfo.c
	return done
}
// SendID returns the unique ID of the sending request. It may be called at
// any moment before `(*SendInfo).Release()`.
func (sendInfo *SendInfo) SendID() uint64 {
	id := sendInfo.sendID
	return id
}
// reset clears the result fields (Err and N). All other fields are left
// untouched.
func (sendInfo *SendInfo) reset() {
	sendInfo.Err, sendInfo.N = nil, 0
}
// incRefCount atomically adds one reference and returns the new value of
// the reference counter.
func (sendInfo *SendInfo) incRefCount() int64 {
	const oneReference int64 = 1
	return atomic.AddInt64(&sendInfo.refCount, oneReference)
}
// duplicate returns a pointer to a shallow copy of sendInfo, wrapped into an
// empty interface. The value receiver means the original is never modified.
func (sendInfo SendInfo) duplicate() interface{} {
	clone := new(SendInfo)
	*clone = sendInfo
	return clone
}
// Release drops one reference to the `*SendInfo`; when the last reference is
// gone the object is put back to the memory pool to be re-used in future.
// It can be used to reduce the pressure on GC. It is **NOT** necessary to
// call this function: it is intended only for high-performance applications.
//
// Release panics if it is called before the sending has finished (and the
// context is not cancelled either), or if the reference counter becomes
// negative.
func (sendInfo *SendInfo) Release() {
	finished, cancelled := sendInfo.c, sendInfo.ctx.Done()
	select {
	case <-finished:
	case <-cancelled:
	default:
		panic("Release() was called on a non-finished sendInfo")
	}

	switch remaining := atomic.AddInt64(&sendInfo.refCount, -1); {
	case remaining < 0:
		panic(fmt.Sprintf(`should not happened (refCount == %v)`,
			remaining))
	case remaining == 0:
		// That was the last reference: the object may be reused.
		sendInfo.pool.Put(sendInfo)
	}
}
// String returns a human-readable dump of the channel, the result fields,
// the send ID and the current reference counter (the latter is read
// atomically).
func (sendInfo *SendInfo) String() string {
	const layout = "{c: %v; Err: %v: N: %v: sendID: %v, refCount: %v}"

	done, err, n, id := sendInfo.c, sendInfo.Err, sendInfo.N, sendInfo.sendID
	refs := atomic.LoadInt64(&sendInfo.refCount)

	return fmt.Sprintf(layout, done, err, n, id, refs)
}
// SendNowAndWait skips the rest of the send delay of the current send
// iteration: it asks the session to send the data as soon as possible and
// waits until that is done.
//
// An extra reference is taken before the request is handed over through
// sess.sendDelayedNowChan. The method panics if that reference turns out to
// be the first one, i.e. if sendInfo is not currently acquired.
//
// Like Wait, the method returns as soon as the context is cancelled (for
// example if the session is closed), so it cannot block forever on a channel
// that nobody reads anymore.
func (sendInfo *SendInfo) SendNowAndWait() {
	if sendInfo.incRefCount() == 1 {
		panic("should not happen")
	}

	select {
	case sendInfo.sess.sendDelayedNowChan <- sendInfo:
	case <-sendInfo.ctx.Done():
		// The request was never handed over, so drop the extra reference
		// taken above. Release() accepts a cancelled context as "finished".
		// NOTE(review): this assumes the consumer of sendDelayedNowChan
		// calls Release() on every request it receives — confirm.
		sendInfo.Release()
		return
	}

	sendInfo.Wait()
}
// Wait blocks until the message is sent or until the context is cancelled
// (for example if the session is closed), whichever happens first.
func (sendInfo *SendInfo) Wait() {
	sent, cancelled := sendInfo.Done(), sendInfo.ctx.Done()

	select {
	case <-sent:
	case <-cancelled:
	}
}