-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdot_flush.go
145 lines (133 loc) · 3.12 KB
/
dot_flush.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package xtemplate
import (
"context"
"fmt"
"math"
"net/http"
"strings"
"time"
)
// dotFlushProvider is the stateless provider behind the "Flush" dot field.
type dotFlushProvider struct{}

// FieldName reports the name of the field this provider populates.
func (dotFlushProvider) FieldName() string {
	const name = "Flush"
	return name
}

// Init has nothing to set up; the context is ignored and the result is
// always nil.
func (dotFlushProvider) Init(_ context.Context) error {
	return nil
}
// Value builds the *DotFlush for one request.
//
// The request's response writer must also implement http.Flusher. When it
// does, the returned DotFlush carries that writer together with the server
// context and the request's own context. When it does not, an empty DotFlush
// is returned along with an error.
func (dotFlushProvider) Value(r Request) (any, error) {
	if fl, ok := r.W.(flusher); ok {
		dot := &DotFlush{
			flusher:    fl,
			serverCtx:  r.ServerCtx,
			requestCtx: r.R.Context(),
		}
		return dot, nil
	}
	return &DotFlush{}, fmt.Errorf("response writer could not cast to http.Flusher")
}
// Cleanup performs a final flush when the handler finished without error.
//
// v must be the *DotFlush produced by Value. When err is non-nil it is
// returned unchanged and v is not touched.
func (dotFlushProvider) Cleanup(v any, err error) error {
	if err != nil {
		return err
	}
	dot := v.(*DotFlush)
	dot.flusher.Flush()
	return nil
}
// Compile-time assertion that dotFlushProvider satisfies CleanupDotProvider.
var _ CleanupDotProvider = dotFlushProvider{}
// flusher is a response writer that can also push buffered output to the
// client on demand.
type flusher interface {
	http.Flusher
	http.ResponseWriter
}
// DotFlush backs the .Flush field available to flushing template handlers
// (SSE).
type DotFlush struct {
	// flusher is the response writer; it also supports flushing.
	flusher flusher
	// serverCtx is the server-level context.
	serverCtx context.Context
	// requestCtx is the context of the request being served.
	requestCtx context.Context
}
// SendSSE writes one server-sent event to the client and flushes it.
//
// It takes 1-4 positional args: event, data, id, retry. Omitted or empty
// args produce no field, and if every field is empty nothing is written or
// flushed. An error is returned only when the number of args is out of range.
//
// The event stream format treats CRLF, LF and a lone CR as line terminators,
// so all three are handled: event, id and retry are cut at the first line
// break of any kind, and data is emitted as one "data:" line per input line.
// This keeps a stray CR in a value from ending the field early, which would
// drop the rest of a data line or let the remainder be parsed as another
// field.
func (f *DotFlush) SendSSE(args ...string) error {
	if len(args) < 1 || len(args) > 4 {
		return fmt.Errorf("wrong number of args provided. got %d, need 1-4", len(args))
	}
	// Pad to four slots so that missing trailing args read as empty.
	var fields [4]string
	copy(fields[:], args)
	event, data, id, retry := fields[0], fields[1], fields[2], fields[3]

	// Assemble the whole event first so it is handed to the writer in a
	// single Write call.
	var msg strings.Builder
	if event != "" {
		fmt.Fprintf(&msg, "event: %s\n", sseFirstLine(event))
	}
	if data != "" {
		for _, line := range strings.Split(sseNormalizeLineBreaks(data), "\n") {
			fmt.Fprintf(&msg, "data: %s\n", line)
		}
	}
	if id != "" {
		fmt.Fprintf(&msg, "id: %s\n", sseFirstLine(id))
	}
	if retry != "" {
		fmt.Fprintf(&msg, "retry: %s\n", sseFirstLine(retry))
	}
	if msg.Len() == 0 {
		return nil
	}
	// Each field line already ends in "\n"; the blank lines end the event.
	msg.WriteString("\n\n")
	// Best effort, as before: a failed write is not reported to the caller.
	_, _ = f.flusher.Write([]byte(msg.String()))
	f.flusher.Flush()
	return nil
}

// sseFirstLine returns s up to, but not including, its first CR or LF.
func sseFirstLine(s string) string {
	if i := strings.IndexAny(s, "\r\n"); i >= 0 {
		return s[:i]
	}
	return s
}

// sseNormalizeLineBreaks rewrites CRLF and lone CR line breaks in s as LF.
func sseNormalizeLineBreaks(s string) string {
	return strings.ReplaceAll(strings.ReplaceAll(s, "\r\n", "\n"), "\r", "\n")
}
// Flush flushes any content waiting to be written to the client.
// It always returns the empty string.
func (f *DotFlush) Flush() string {
f.flusher.Flush()
return ""
}
// Repeat returns a channel that yields 0, 1, 2, ... up to and including max.
// A max below zero still yields 0 once. Only the first argument is used; when
// none is given the bound is math.MaxInt.
//
// The channel is closed after max has been sent, or as soon as the request
// or server context is done, whichever comes first.
func (f *DotFlush) Repeat(max_ ...int) <-chan int {
	// math.MaxInt rather than math.MaxInt64: the bound is an int, and the
	// 64-bit constant does not fit an int on 32-bit targets.
	limit := math.MaxInt
	if len(max_) > 0 {
		limit = max_[0]
	}
	out := make(chan int)
	go func() {
		defer close(out)
		for n := 0; ; n++ {
			select {
			case <-f.requestCtx.Done():
				return
			case <-f.serverCtx.Done():
				return
			case out <- n:
			}
			// Stop before the increment so n never passes the limit.
			if n >= limit {
				return
			}
		}
	}()
	return out
}
// Sleep pauses for ms milliseconds. If the request or server context is
// done first, it returns early with ReturnError{}. The string result is
// always empty.
func (f *DotFlush) Sleep(ms int) (string, error) {
	// An explicit timer, unlike time.After, can be stopped when a context
	// wins the race, so a long sleep does not leave its timer pending.
	timer := time.NewTimer(time.Duration(ms) * time.Millisecond)
	defer timer.Stop()

	select {
	case <-timer.C:
		return "", nil
	case <-f.requestCtx.Done():
		return "", ReturnError{}
	case <-f.serverCtx.Done():
		return "", ReturnError{}
	}
}
// WaitForServerStop blocks until either the server context is done or the
// client cancels the request.
//
// A server stop yields a nil error; a canceled request yields ReturnError{}.
// The string result is always empty.
func (f *DotFlush) WaitForServerStop() (string, error) {
	var err error
	select {
	case <-f.serverCtx.Done():
		// Server is shutting down: report success.
	case <-f.requestCtx.Done():
		err = ReturnError{}
	}
	return "", err
}