Skip to content

Commit

Permalink
debug: support pprof2
Browse files Browse the repository at this point in the history
  • Loading branch information
yangkaa committed Oct 28, 2024
1 parent 2417722 commit 6431a2d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 17 deletions.
1 change: 1 addition & 0 deletions mq/api/grpc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (s *mqServer) Dequeue(ctx context.Context, in *pb.DequeueRequest) (*pb.Task
ctx, cancel := context.WithCancel(ctx)
defer cancel()
message, err := s.actionMQ.Dequeue(ctx, in.Topic)
logrus.Errorf("message %s, err %v", message, err)
if err != nil {
return nil, err
}
Expand Down
36 changes: 19 additions & 17 deletions mq/api/mq/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// KeyValueStore 是一个简单的键值存储结构,支持多值
type KeyValueStore struct {
data map[string][]string
mu sync.Mutex
mu sync.RWMutex
cond *sync.Cond
}

Expand All @@ -17,7 +17,7 @@ func NewKeyValueStore() *KeyValueStore {
kv := &KeyValueStore{
data: make(map[string][]string),
}
kv.cond = sync.NewCond(&kv.mu)
kv.cond = sync.NewCond(kv.mu.RLocker()) // 使用读锁初始化条件变量
return kv
}

Expand All @@ -37,26 +37,28 @@ func (kv *KeyValueStore) Put(key, value string) {

// Get 根据键获取第一个值并删除该键值对中的第一个值
func (kv *KeyValueStore) Get(key string) (string, bool) {
kv.mu.Lock()
defer kv.mu.Unlock()
// 如果值列表为空,等待通知或超时
timeout := time.Now().Add(5 * time.Second)
kv.mu.RLock() // 使用读锁开始
defer kv.mu.RUnlock()

timeout := time.After(5 * time.Second) // 预先创建一个定时器,避免高频创建
for len(kv.data[key]) == 0 {
kv.mu.RUnlock() // 在等待前释放读锁
select {
case <-time.After(timeout.Sub(time.Now())):
case <-timeout:
return "", false // 超时后返回空值
default:
waitChan := make(chan struct{})
kv.cond.L.Unlock()
go func() {
kv.cond.L.Lock()
close(waitChan)
}()
<-waitChan
kv.cond.L.Lock() // 锁住条件变量
kv.cond.Wait() // 等待条件变量通知
kv.cond.L.Unlock() // 释放条件变量的锁
}
kv.mu.RLock() // 重新获得读锁
}

// 获取值列表
// 获取值列表并切换到写锁模式
kv.mu.RUnlock()
kv.mu.Lock()
defer kv.mu.Unlock()

values, ok := kv.data[key]
if ok && len(values) > 0 {
// 获取第一个值并从切片中删除
Expand All @@ -74,8 +76,8 @@ func (kv *KeyValueStore) Get(key string) (string, bool) {

// Size 返回特定键的值列表大小
func (kv *KeyValueStore) Size(topic string) int64 {
kv.mu.Lock()
defer kv.mu.Unlock()
kv.mu.RLock()
defer kv.mu.RUnlock()

return int64(len(kv.data[topic]))
}

0 comments on commit 6431a2d

Please sign in to comment.