Skip to content

Commit

Permalink
Remove deprecated HTTPMessageSender usage (#1176)
Browse files Browse the repository at this point in the history
  • Loading branch information
creydr authored Jul 23, 2023
1 parent 49c6bb3 commit 2ab06f3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 42 deletions.
36 changes: 15 additions & 21 deletions pkg/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,12 @@ func NewEnvConfig() adapter.EnvConfigAccessor {
}

type Adapter struct {
config *adapterConfig
sink duckv1.Addressable
httpMessageSender *kncloudevents.HTTPMessageSender
reporter source.StatsReporter
logger *zap.Logger
context context.Context
rmqHelper rabbit.RabbitMQConnectionsHandlerInterface
config *adapterConfig
sink duckv1.Addressable
reporter source.StatsReporter
logger *zap.Logger
context context.Context
rmqHelper rabbit.RabbitMQConnectionsHandlerInterface
}

var _ adapter.MessageAdapter = (*Adapter)(nil)
Expand All @@ -77,17 +76,12 @@ var (
func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, sink duckv1.Addressable, reporter source.StatsReporter) adapter.MessageAdapter {
logger := logging.FromContext(ctx).Desugar()
config := processed.(*adapterConfig)
httpMessageSender, err := kncloudevents.NewHTTPMessageSenderWithTarget(sink.URL.String())
if err != nil {
logger.Sugar().Fatalf("Couldn't start message sender, %w", err)
}
return &Adapter{
config: config,
sink: sink,
httpMessageSender: httpMessageSender,
reporter: reporter,
logger: logger,
context: ctx,
config: config,
sink: sink,
reporter: reporter,
logger: logger,
context: ctx,
}
}

Expand Down Expand Up @@ -219,8 +213,8 @@ func (a *Adapter) processMessages(wg *sync.WaitGroup, queue <-chan amqp.Delivery
}

func (a *Adapter) postMessage(msg *amqp.Delivery) error {
a.logger.Info("target: " + a.httpMessageSender.Target)
req, err := a.httpMessageSender.NewCloudEventRequest(a.context)
a.logger.Info("target: " + a.sink.URL.String())
req, err := kncloudevents.NewCloudEventRequest(a.context, a.sink)
if err != nil {
return err
}
Expand All @@ -231,14 +225,14 @@ func (a *Adapter) postMessage(msg *amqp.Delivery) error {
a.config.Namespace,
a.config.QueueName,
msg,
req,
req.Request,
a.logger)
if err != nil {
a.logger.Error("error writing event to http", zap.Error(err))
return err
}

res, err := a.httpMessageSender.SendWithRetries(req, retryConfig)
res, err := req.SendWithRetries(retryConfig)
if err != nil {
a.logger.Error("error while sending the message", zap.Error(err))
return err
Expand Down
31 changes: 10 additions & 21 deletions pkg/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"knative.dev/eventing-rabbitmq/pkg/rabbit"
"knative.dev/eventing/pkg/adapter/v2"
v1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/metrics/source"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand Down Expand Up @@ -125,10 +124,6 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
}
sinkServer := httptest.NewServer(h)
defer sinkServer.Close()
s, err := kncloudevents.NewHTTPMessageSenderWithTarget(sinkServer.URL)
if err != nil {
t.Fatal(err)
}

target, err := apis.ParseURL(sinkServer.URL)
if err != nil {
Expand All @@ -144,12 +139,11 @@ func TestPostMessage_ServeHTTP(t *testing.T) {
config = adapterConfig{Retry: tc.retry, BackoffPolicy: string(v1.BackoffPolicyLinear), BackoffDelay: "PT0.1S"}
}
a := &Adapter{
config: &config,
context: context.TODO(),
sink: sink,
httpMessageSender: s,
logger: zap.NewNop(),
reporter: statsReporter,
config: &config,
context: context.TODO(),
sink: sink,
logger: zap.NewNop(),
reporter: statsReporter,
}

data, err := json.Marshal(tc.data)
Expand Down Expand Up @@ -286,10 +280,6 @@ func TestAdapter_NewAdapter(t *testing.T) {
sinkServer := httptest.NewServer(h)
defer sinkServer.Close()

s, err := kncloudevents.NewHTTPMessageSenderWithTarget(sinkServer.URL)
if err != nil {
t.Fatal(err)
}
target, err := apis.ParseURL(sinkServer.URL)
if err != nil {
t.Fatal(err)
Expand All @@ -301,12 +291,11 @@ func TestAdapter_NewAdapter(t *testing.T) {
statsReporter, _ := source.NewStatsReporter()
a := NewAdapter(ctx, env, sink, statsReporter)
cmpA := &Adapter{
config: env.(*adapterConfig),
sink: sink,
httpMessageSender: s,
reporter: statsReporter,
logger: logging.FromContext(ctx).Desugar(),
context: ctx,
config: env.(*adapterConfig),
sink: sink,
reporter: statsReporter,
logger: logging.FromContext(ctx).Desugar(),
context: ctx,
}

if a == cmpA {
Expand Down

0 comments on commit 2ab06f3

Please sign in to comment.