forked from tushar2708/conveyor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcustom_context.go
145 lines (123 loc) · 3.32 KB
/
custom_context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package conveyor
import (
"context"
"fmt"
"sync"
"time"
)
// Message struct stores one unit of message that conveyor passed back for logging
type Message struct {
Text string
LogLevel int32
// Err error
}
// CtxData stores the information that is stored inside a conveyor, useful for it's lifecycle.ConveyorData
// Any fields only useful for initialization shouldn't be here
type CtxData struct {
Name string
logs chan Message
status chan string
cancelProgress context.CancelFunc
// cancelAll context.CancelFunc
}
// CnvContext is an interface, which is satisfied by CnvContext.
// This interface is primarily to enabling mocking for unit-testing.CnvContextAble
// Or may be, something fancy that you might want to do.
type CnvContext interface {
context.Context
WithCancel() CnvContext
WithTimeout(time.Duration) CnvContext
Cancel()
SendLog(int32, string, error)
SendStatus(string)
GetData() interface{}
}
// cnvContext is a wrapper over context.Context
// To avoid sacrificing type checking with context.WithValue() wherever it's not needed
type cnvContext struct {
context.Context
cancelOnce sync.Once
Data CtxData
}
// GetData
func (ctx *cnvContext) GetData() interface{} {
return ctx.Data
}
// WithCancel is a wrapper on context.WithCancel() for CnvContext type,
// that also copies the Data to new context
func (ctx *cnvContext) WithCancel() CnvContext {
newctx, cancel := context.WithCancel(ctx.Context)
cnvContext := &cnvContext{
Context: newctx,
Data: ctx.Data,
}
cnvContext.Data.cancelProgress = cancel
return cnvContext
}
// WithTimeout is a wrapper on context.WithTimeout() for CnvContext type,
// that also copies the Data to new context
func (ctx *cnvContext) WithTimeout(timeout time.Duration) CnvContext {
newctx, cancel := context.WithTimeout(ctx.Context, timeout)
cnvContext := &cnvContext{
Context: newctx,
Data: ctx.Data,
}
cnvContext.Data.cancelProgress = cancel
return cnvContext
}
// Cancel the derived context, along with closing internal channels
// It has been made to follow the "non-panic multiple cancel" behaviour of built-in context
func (ctx *cnvContext) Cancel() {
if ctx.Data.cancelProgress != nil {
ctx.Data.cancelProgress()
}
ctx.cancelOnce.Do(func() {
if ctx.Data.logs != nil {
close(ctx.Data.logs)
}
if ctx.Data.logs != nil {
close(ctx.Data.status)
}
})
}
// SendLog sends conveyor's internal logs to be available on conveyor.Logs()
func (ctx *cnvContext) SendLog(logLevel int32, text string, err error) {
if err != nil {
text = fmt.Sprintf("conveyor: %s, [err: %s]\n", text, err)
} else {
text = fmt.Sprintf("conveyor: %s\n", text)
}
msg := Message{
LogLevel: logLevel,
Text: text,
}
select {
case <-ctx.Done():
return
default:
}
select {
case <-ctx.Done():
return
case ctx.Data.logs <- msg:
default:
<-ctx.Data.logs // If not consumed, throw away old status and update with new value
ctx.Data.logs <- msg
}
}
// SendStatus sends conveyor's internal logs to be available on conveyor.Status()
func (ctx *cnvContext) SendStatus(status string) {
select {
case <-ctx.Done():
return
default:
}
select {
case <-ctx.Done():
return
case ctx.Data.status <- status:
default:
<-ctx.Data.status // If not consumed, throw away old status and update with new value
ctx.Data.status <- status
}
}