Skip to content

Commit

Permalink
Merge pull request cloudevents#1034 from embano1/ce-1275
Browse files Browse the repository at this point in the history
fix: support multiple amqp data fields
  • Loading branch information
embano1 authored Apr 22, 2024
2 parents 20bfd0a + 4cafcc3 commit 7774402
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 4 deletions.
23 changes: 19 additions & 4 deletions protocol/amqp/v2/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,10 @@ func NewMessage(message *amqp.Message, receiver *amqp.Receiver) *Message {
return &Message{AMQP: message, AMQPrcv: receiver, format: fmt, version: vn}
}

var _ binding.Message = (*Message)(nil)
var _ binding.MessageMetadataReader = (*Message)(nil)
var (
_ binding.Message = (*Message)(nil)
_ binding.MessageMetadataReader = (*Message)(nil)
)

func getSpecVersion(message *amqp.Message) spec.Version {
if sv, ok := message.ApplicationProperties[specs.PrefixedSpecVersionName()]; ok {
Expand All @@ -74,7 +76,8 @@ func (m *Message) ReadEncoding() binding.Encoding {

func (m *Message) ReadStructured(ctx context.Context, encoder binding.StructuredWriter) error {
if m.format != nil {
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(m.AMQP.GetData()))
data := m.getAmqpData()
return encoder.SetStructuredEvent(ctx, m.format, bytes.NewReader(data))
}
return binding.ErrNotStructured
}
Expand Down Expand Up @@ -106,7 +109,7 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter)
}
}

data := m.AMQP.GetData()
data := m.getAmqpData()
if len(data) != 0 { // Some data
err = encoder.SetData(bytes.NewBuffer(data))
if err != nil {
Expand Down Expand Up @@ -137,3 +140,15 @@ func (m *Message) Finish(err error) error {
}
return m.AMQPrcv.AcceptMessage(context.Background(), m.AMQP)
}

// fixes: github.com/cloudevents/spec/issues/1275
func (m *Message) getAmqpData() []byte {
var data []byte
amqpData := m.AMQP.Data

// TODO: replace with slices.Concat once go mod bumped to 1.22
for idx := range amqpData {
data = append(data, amqpData[idx]...)
}
return data
}
55 changes: 55 additions & 0 deletions protocol/amqp/v2/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,58 @@ func TestNewMessage_message_unknown(t *testing.T) {
got := NewMessage(message, &rcv)
require.Equal(t, binding.EncodingUnknown, got.ReadEncoding())
}

func TestMessage_getAmqpData(t *testing.T) {
tests := []struct {
name string
message *amqp.Message
want []byte
}{
{
name: "nil data",
message: amqp.NewMessage(nil),
want: nil,
},
{
name: "empty string",
message: amqp.NewMessage([]byte(`""`)),
want: []byte(`""`),
},
{
name: "simple string",
message: amqp.NewMessage([]byte("hello world")),
want: []byte("hello world"),
},
{
name: "multiple data with simple strings",
message: &amqp.Message{Data: [][]byte{
[]byte("hello"),
[]byte(" "),
[]byte("world"),
}},
want: []byte("hello world"),
},
{
name: "multiple data to build JSON array",
message: &amqp.Message{Data: [][]byte{
[]byte("["),
[]byte("Foo"),
[]byte(","),
[]byte("Bar"),
[]byte(","),
[]byte("Baz"),
[]byte("]"),
}},
want: []byte("[Foo,Bar,Baz]"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
m := &Message{
AMQP: tt.message,
}
got := m.getAmqpData()
require.Equal(t, tt.want, got)
})
}
}

0 comments on commit 7774402

Please sign in to comment.