diff --git a/mq/api/grpc/server/server.go b/mq/api/grpc/server/server.go index f097402800..c5400ca373 100644 --- a/mq/api/grpc/server/server.go +++ b/mq/api/grpc/server/server.go @@ -75,6 +75,7 @@ func (s *mqServer) Dequeue(ctx context.Context, in *pb.DequeueRequest) (*pb.Task ctx, cancel := context.WithCancel(ctx) defer cancel() message, err := s.actionMQ.Dequeue(ctx, in.Topic) + logrus.Errorf("message %s, err %v", message, err) if err != nil { return nil, err } diff --git a/mq/api/mq/store.go b/mq/api/mq/store.go index 46f5ce36f5..f5dd9a5e3a 100644 --- a/mq/api/mq/store.go +++ b/mq/api/mq/store.go @@ -8,7 +8,7 @@ import ( // KeyValueStore 是一个简单的键值存储结构,支持多值 type KeyValueStore struct { data map[string][]string - mu sync.Mutex + mu sync.RWMutex cond *sync.Cond } @@ -17,7 +17,7 @@ func NewKeyValueStore() *KeyValueStore { kv := &KeyValueStore{ data: make(map[string][]string), } - kv.cond = sync.NewCond(&kv.mu) + kv.cond = sync.NewCond(kv.mu.RLocker()) // 使用读锁初始化条件变量 return kv } @@ -37,26 +37,28 @@ func (kv *KeyValueStore) Put(key, value string) { // Get 根据键获取第一个值并删除该键值对中的第一个值 func (kv *KeyValueStore) Get(key string) (string, bool) { - kv.mu.Lock() - defer kv.mu.Unlock() - // 如果值列表为空,等待通知或超时 - timeout := time.Now().Add(5 * time.Second) + kv.mu.RLock() // 使用读锁开始 + defer kv.mu.RUnlock() + + timeout := time.After(5 * time.Second) // 预先创建一个定时器,避免高频创建 for len(kv.data[key]) == 0 { + kv.mu.RUnlock() // 在等待前释放读锁 select { - case <-time.After(timeout.Sub(time.Now())): + case <-timeout: return "", false // 超时后返回空值 default: - waitChan := make(chan struct{}) - kv.cond.L.Unlock() - go func() { - kv.cond.L.Lock() - close(waitChan) - }() - <-waitChan + kv.cond.L.Lock() // 锁住条件变量 + kv.cond.Wait() // 等待条件变量通知 + kv.cond.L.Unlock() // 释放条件变量的锁 } + kv.mu.RLock() // 重新获得读锁 } - // 获取值列表 + // 获取值列表并切换到写锁模式 + kv.mu.RUnlock() + kv.mu.Lock() + defer kv.mu.Unlock() + values, ok := kv.data[key] if ok && len(values) > 0 { // 获取第一个值并从切片中删除 @@ -74,8 +76,8 @@ func (kv *KeyValueStore) Get(key string) (string, bool) { // Size 返回特定键的值列表大小 func (kv *KeyValueStore) Size(topic string) int64 { - kv.mu.Lock() - defer kv.mu.Unlock() + kv.mu.RLock() + defer kv.mu.RUnlock() return int64(len(kv.data[topic])) }