-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathredis_broker.go
99 lines (79 loc) · 2.07 KB
/
redis_broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package gotasks
import (
"log"
"time"
redis "github.com/go-redis/redis/v7"
)
var (
_ Broker = &RedisBroker{}
// rc: RedisClient
rc *redis.Client
)
func genTaskName(taskID string) string {
return "gt:task:" + taskID
}
func genQueueName(queueName string) string {
return "gt:queue:" + queueName
}
type RedisBroker struct {
TaskTTL int
}
type RedisBrokerOption func(rb *RedisBroker)
func WithRedisTaskTTL(ttl int) RedisBrokerOption {
return func(rb *RedisBroker) {
rb.TaskTTL = ttl
}
}
func UseRedisBroker(redisURL string, brokerOptions ...RedisBrokerOption) {
options, err := redis.ParseURL(redisURL)
if err != nil {
log.Panicf("failed to parse redis URL %s: %s", redisURL, err)
}
rc = redis.NewClient(options)
rb := &RedisBroker{}
for _, o := range brokerOptions {
o(rb)
}
broker = rb
}
func (r *RedisBroker) Acquire(queueName string) *Task {
task := Task{}
vs, err := rc.BRPop(time.Duration(0), genQueueName(queueName)).Result()
if err != nil {
log.Panicf("failed to get task from redis: %s", err)
return nil // never executed
}
v := []byte(vs[1])
if err := json.Unmarshal(v, &task); err != nil {
log.Panicf("failed to get task from redis: %s", err)
return nil // never executed
}
return &task
}
func (r *RedisBroker) Ack(task *Task) bool {
// redis doesn't support ACK
return true
}
func (r *RedisBroker) Update(task *Task) {
task.UpdatedAt = time.Now()
taskBytes, err := json.Marshal(task)
if err != nil {
log.Panicf("failed to enquue task %+v: %s", task, err)
return // never executed here
}
rc.Set(genTaskName(task.ID), taskBytes, time.Duration(r.TaskTTL)*time.Second)
}
func (r *RedisBroker) Enqueue(task *Task) string {
taskBytes, err := json.Marshal(task)
if err != nil {
log.Panicf("failed to enquue task %+v: %s", task, err)
return "" // never executed here
}
rc.Set(genTaskName(task.ID), taskBytes, time.Duration(r.TaskTTL)*time.Second)
rc.LPush(genQueueName(task.QueueName), taskBytes)
return task.ID
}
func (r *RedisBroker) QueueLen(queueName string) int64 {
l, _ := rc.LLen(genQueueName(queueName)).Result()
return l
}