Skip to content

Commit

Permalink
refactor: refactor QueuedMessage to TaskMessage across the codebase
Browse files Browse the repository at this point in the history
- Rename `QueuedMessage` to `TaskMessage` throughout the codebase
- Change method `Bytes()` to `Payload()` for `TaskMessage`
- Update comments to reflect the new `TaskMessage` interface
- Add a new mock file `mock_task_message.go` for the `TaskMessage` interface
- Rename `mock_message.go` to `mock_queued_message.go`
- Update test cases to use `TaskMessage` instead of `QueuedMessage`
- Remove unused import `github.com/golang-queue/queue/job` from `ring.go`
- Comment out the `Data` struct and its methods in `ring.go`

Signed-off-by: Bo-Yi Wu <[email protected]>
  • Loading branch information
appleboy committed Jan 20, 2025
1 parent cc471ea commit 05b70f2
Show file tree
Hide file tree
Showing 13 changed files with 152 additions and 77 deletions.
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ func main() {
rets := make(chan string, taskN)

// initial queue pool
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.QueuedMessage) error {
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
}
Expand Down Expand Up @@ -207,10 +207,10 @@ func main() {
nsq.WithChannel("foobar"),
// concurrent job number
nsq.WithMaxInFlight(10),
nsq.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
nsq.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
}
Expand Down Expand Up @@ -286,10 +286,10 @@ func main() {
nats.WithAddr("127.0.0.1:4222"),
nats.WithSubj("example"),
nats.WithQueue("foobar"),
nats.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
nats.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
}
Expand Down Expand Up @@ -370,10 +370,10 @@ func main() {
w := redisdb.NewWorker(
redisdb.WithAddr("127.0.0.1:6379"),
redisdb.WithChannel("foobar"),
redisdb.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
redisdb.WithRunFunc(func(ctx context.Context, m core.TaskMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
}
Expand Down
4 changes: 2 additions & 2 deletions _example/example02/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ func main() {
rets := make(chan string, taskN)

// initial queue pool
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.QueuedMessage) error {
q := queue.NewPool(5, queue.WithFn(func(ctx context.Context, m core.TaskMessage) error {
v, ok := m.(*job)
if !ok {
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
if err := json.Unmarshal(m.Payload(), &v); err != nil {
return err
}
}
Expand Down
8 changes: 4 additions & 4 deletions benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
var count = 1

type testqueue interface {
Queue(task core.QueuedMessage) error
Request() (core.QueuedMessage, error)
Queue(task core.TaskMessage) error
Request() (core.TaskMessage, error)
}

func testQueue(b *testing.B, pool testqueue) {
Expand Down Expand Up @@ -94,7 +94,7 @@ func BenchmarkQueue(b *testing.B) {
// Payload: []byte(`{"timeout":3600000000000}`),
// }
// w := NewRing(
// WithFn(func(ctx context.Context, m core.QueuedMessage) error {
// WithFn(func(ctx context.Context, m core.TaskMessage) error {
// return nil
// }),
// )
Expand All @@ -119,7 +119,7 @@ func BenchmarkRingWithTask(b *testing.B) {
},
}
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
WithFn(func(ctx context.Context, m core.TaskMessage) error {
return nil
}),
)
Expand Down
12 changes: 7 additions & 5 deletions core/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,24 @@ import (
"context"
)

// Worker represents a worker that processes queued messages.
// It provides methods to run the worker, shut it down, queue messages, and request messages from the queue.
// Worker represents an interface for a worker that processes tasks.
// It provides methods to run tasks, shut down the worker, queue tasks, and request tasks from the queue.
type Worker interface {
// Run starts the worker and processes the given task in the provided context.
// It returns an error if the task cannot be processed.
Run(ctx context.Context, task QueuedMessage) error
Run(ctx context.Context, task TaskMessage) error

// Shutdown stops the worker and performs any necessary cleanup.
// It returns an error if the shutdown process fails.
Shutdown() error

// Queue adds a task to the worker's queue.
// It returns an error if the task cannot be added to the queue.
Queue(task QueuedMessage) error
Queue(task TaskMessage) error

// Request retrieves a task from the worker's queue.
// It returns the queued message and an error if the retrieval fails.
Request() (QueuedMessage, error)
Request() (TaskMessage, error)
}

// QueuedMessage represents an interface for a message that can be queued.
Expand All @@ -31,6 +31,8 @@ type QueuedMessage interface {
Bytes() []byte
}

// TaskMessage represents an interface for a task message that can be queued.
// It embeds the QueuedMessage interface and adds a method to retrieve the payload of the message.
type TaskMessage interface {
QueuedMessage
Payload() []byte
Expand Down
4 changes: 2 additions & 2 deletions metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (

func TestMetricData(t *testing.T) {
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
switch string(m.Bytes()) {
WithFn(func(ctx context.Context, m core.TaskMessage) error {
switch string(m.Payload()) {
case "foo1":
panic("missing something")
case "foo2":
Expand Down
2 changes: 1 addition & 1 deletion mocks/mock_message.go → mocks/mock_queued_message.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

67 changes: 67 additions & 0 deletions mocks/mock_task_message.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions mocks/mock_worker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion mocks/mocks.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package mocks

//go:generate mockgen -package=mocks -destination=mock_worker.go github.com/golang-queue/queue/core Worker
//go:generate mockgen -package=mocks -destination=mock_message.go github.com/golang-queue/queue/core QueuedMessage
//go:generate mockgen -package=mocks -destination=mock_queued_message.go github.com/golang-queue/queue/core QueuedMessage
//go:generate mockgen -package=mocks -destination=mock_task_message.go github.com/golang-queue/queue/core TaskMessage
6 changes: 3 additions & 3 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var (
defaultCapacity = 0
defaultWorkerCount = int64(runtime.NumCPU())
defaultNewLogger = NewLogger()
defaultFn = func(context.Context, core.QueuedMessage) error { return nil }
defaultFn = func(context.Context, core.TaskMessage) error { return nil }
defaultMetric = NewMetric()
)

Expand Down Expand Up @@ -67,7 +67,7 @@ func WithWorker(w core.Worker) Option {
}

// WithFn set custom job function
func WithFn(fn func(context.Context, core.QueuedMessage) error) Option {
func WithFn(fn func(context.Context, core.TaskMessage) error) Option {
return OptionFunc(func(q *Options) {
q.fn = fn
})
Expand All @@ -86,7 +86,7 @@ type Options struct {
logger Logger
queueSize int
worker core.Worker
fn func(context.Context, core.QueuedMessage) error
fn func(context.Context, core.TaskMessage) error
afterFn func()
metric Metric
}
Expand Down
17 changes: 11 additions & 6 deletions queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ func (m mockMessage) Bytes() []byte {
return bytesconv.StrToBytes(m.message)
}

func (m mockMessage) Payload() []byte {
return bytesconv.StrToBytes(m.message)
}

func TestNewQueueWithZeroWorker(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()
Expand Down Expand Up @@ -61,8 +65,9 @@ func TestNewQueueWithDefaultWorker(t *testing.T) {
assert.Nil(t, q)

w := mocks.NewMockWorker(controller)
m := mocks.NewMockQueuedMessage(controller)
m := mocks.NewMockTaskMessage(controller)
m.EXPECT().Bytes().Return([]byte("test")).AnyTimes()
m.EXPECT().Payload().Return([]byte("test")).AnyTimes()
w.EXPECT().Shutdown().Return(nil)
w.EXPECT().Request().Return(m, nil).AnyTimes()
w.EXPECT().Run(context.Background(), m).Return(nil).AnyTimes()
Expand All @@ -83,7 +88,7 @@ func TestHandleTimeout(t *testing.T) {
Body: []byte("foo"),
}
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
WithFn(func(ctx context.Context, m core.TaskMessage) error {
time.Sleep(200 * time.Millisecond)
return nil
}),
Expand Down Expand Up @@ -115,7 +120,7 @@ func TestJobComplete(t *testing.T) {
Body: []byte("foo"),
}
w := NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
WithFn(func(ctx context.Context, m core.TaskMessage) error {
return errors.New("job completed")
}),
)
Expand All @@ -136,7 +141,7 @@ func TestJobComplete(t *testing.T) {
}

w = NewRing(
WithFn(func(ctx context.Context, m core.QueuedMessage) error {
WithFn(func(ctx context.Context, m core.TaskMessage) error {
time.Sleep(200 * time.Millisecond)
return errors.New("job completed")
}),
Expand Down Expand Up @@ -196,11 +201,11 @@ func TestMockWorkerAndMessage(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

m := mocks.NewMockQueuedMessage(controller)
m := mocks.NewMockTaskMessage(controller)

w := mocks.NewMockWorker(controller)
w.EXPECT().Shutdown().Return(nil)
w.EXPECT().Request().DoAndReturn(func() (core.QueuedMessage, error) {
w.EXPECT().Request().DoAndReturn(func() (core.TaskMessage, error) {
return m, errors.New("nil")
})

Expand Down
Loading

0 comments on commit 05b70f2

Please sign in to comment.