diff --git a/concurrency-patterns-in-go/the-tee-channel/fig-tee-channel.go b/concurrency-patterns-in-go/the-tee-channel/fig-tee-channel.go new file mode 100644 index 0000000..c9afd1c --- /dev/null +++ b/concurrency-patterns-in-go/the-tee-channel/fig-tee-channel.go @@ -0,0 +1,98 @@ +package main + +import ( + "fmt" +) + +func main() { + repeat := func( + done <-chan interface{}, + values ...interface{}, + ) <-chan interface{} { + valueStream := make(chan interface{}) + go func() { + defer close(valueStream) + for { + for _, v := range values { + select { + case <-done: + return + case valueStream <- v: + } + } + } + }() + return valueStream + } + take := func( + done <-chan interface{}, + valueStream <-chan interface{}, + num int, + ) <-chan interface{} { + takeStream := make(chan interface{}) + go func() { + defer close(takeStream) + for i := 0; i < num; i++ { + select { + case <-done: + return + case takeStream <- <-valueStream: + } + } + }() + return takeStream + } + orDone := func(done, c <-chan interface{}) <-chan interface{} { + valStream := make(chan interface{}) + go func() { + defer close(valStream) + for { + select { + case <-done: + return + case v, ok := <-c: + if ok == false { + return + } + select { + case valStream <- v: + case <-done: + } + } + } + }() + return valStream + } + tee := func( + done <-chan interface{}, + in <-chan interface{}, + ) (_, _ <-chan interface{}) { + out1 := make(chan interface{}) + out2 := make(chan interface{}) + go func() { + defer close(out1) + defer close(out2) + for val := range orDone(done, in) { + var out1copy, out2copy = out1, out2 + for i := 0; i < 2; i++ { + select { + case <-done: + case out1copy <- val: + out1copy = nil + case out2copy <- val: + out2copy = nil + } + } + } + }() + return out1, out2 + } + done := make(chan interface{}) + defer close(done) + + out1, out2 := tee(done, take(done, repeat(done, 1, 2), 4)) + + for val1 := range out1 { + fmt.Printf("out1: %v, out2: %v\n", val1, <-out2) + } +}