Skip to content

Commit

Permalink
Add support to publish to exchange
Browse files Browse the repository at this point in the history
Add support for binding queues to exchanges
  • Loading branch information
jvanecek committed Sep 3, 2024
1 parent 28fe5ab commit 9cd7033
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 60 deletions.
83 changes: 72 additions & 11 deletions source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ RabbitMQClientTest >> assertQueueStatusAfterPublishing: messagesToSend on: queue

queue := publisher channel declareQueueApplying: [ :builder | builder name: queueName ].

publisher publishOnly: messagesToSend onQueueNamed: queueName.
publisher publish: messagesToSend toQueue: queueName.

self wait.

Expand Down Expand Up @@ -199,6 +199,38 @@ RabbitMQClientTest >> runMemoryLoggerDuring: aBlock assertingLogRecordsMatchRege
assertLogRecordsMatchUsing: expectedLogEntriesWithTimestamp
]

{ #category : 'running' }
RabbitMQClientTest >> runWorkerNamed: aName consumingFrom: aQueueName bindedTo: anExchange routedBy: aRoutingKey doing: aProcessingBlock during: aBlock [

| process |
process := [
| worker |
worker := RabbitMQWorker
configuredBy: [ :options |
options
at: #hostname put: 'localhost';
at: #queueName put: aQueueName;
at: #extraClientProperties put: ( Dictionary new
at: 'process' put: aName;
yourself )
]
processingPayloadWith: aProcessingBlock.
worker bindQueueTo: anExchange routedBy: aRoutingKey.
[ worker start ] ensure: [ worker stop ]
] newProcess.
process
name: aName;
priority: Processor userBackgroundPriority.

[
process resume.
Processor yield.
self wait.
aBlock value.
self wait
] ensure: [ process terminate ]
]

{ #category : 'running' }
RabbitMQClientTest >> setUp [

Expand Down Expand Up @@ -259,7 +291,7 @@ RabbitMQClientTest >> testDebuggingLogsEnabled [
self
runMemoryLoggerDuring: [
anotherPublisher start.
anotherPublisher publishOnly: 'Hello!' onQueueNamed: self queueName.
anotherPublisher publish: 'Hello!' toQueue: self queueName.
anotherPublisher stop
]
assertingLogRecordsMatchRegexes:
Expand Down Expand Up @@ -295,7 +327,7 @@ RabbitMQClientTest >> testDebuggingLogsTurnedOff [
self
runMemoryLoggerDuring: [
anotherPublisher start.
anotherPublisher publishOnly: 'Hello!' onQueueNamed: self queueName.
anotherPublisher publish: 'Hello!' toQueue: self queueName.
anotherPublisher stop
]
assertingLogRecordsMatchRegexes:
Expand All @@ -321,8 +353,8 @@ RabbitMQClientTest >> testPublishingMessageWhenClientUnexpectedlyClosesConnectio

self resumeWorkerDuring: [
publisher
publishOnly: 'Hello' onQueueNamed: self queueName;
publishOnly: 'World' onQueueNamed: self queueName.
publish: 'Hello' toQueue: self queueName;
publish: 'World' toQueue: self queueName.

self wait.

Expand All @@ -336,7 +368,7 @@ RabbitMQClientTest >> testPublishingMessageWhenClientUnexpectedlyClosesConnectio
self
closeAllConnectionsFromTheClientSide;
waitUntilAllRabbitMQConnectionsClose.
publisher publishOnly: 'Test connection restored' onQueueNamed: self queueName
publisher publish: 'Test connection restored' toQueue: self queueName
]
assertingLogRecordsMatchRegexes:
{ '\[INFO\] AMQP connection localhost\:(\d+)->localhost\:5672 closed due to (Connection close while waiting for data.|primitive #primSocketSendDone\: in Socket failed|primitive #primSocketReceiveDataAvailable\: in Socket failed)' .
Expand All @@ -356,8 +388,8 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [

self resumeWorkerDuring: [
publisher
publishOnly: 'Hello' onQueueNamed: self queueName;
publishOnly: 'World' onQueueNamed: self queueName.
publish: 'Hello' toQueue: self queueName;
publish: 'World' toQueue: self queueName.

self wait.

Expand All @@ -371,7 +403,7 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [
self
closeAllConnectionsFromTheBrokerSide;
waitUntilAllRabbitMQConnectionsClose.
publisher publishOnly: 'Test connection restored' onQueueNamed: self queueName
publisher publish: 'Test connection restored' toQueue: self queueName
]
assertingLogRecordsMatchRegexes:
{ '\[INFO\] AMQP connection localhost\:(\d+)->localhost\:5672 closed due to CONNECTION_FORCED - CloseConnectionsTest' .
Expand All @@ -390,7 +422,7 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [
{ #category : 'tests' }
RabbitMQClientTest >> testPublishingMessages [

self resumeWorkerDuring: [ publisher publish: #( 'Hello' 'World' ) onQueueNamed: self queueName ].
self resumeWorkerDuring: [ publisher publishAll: #( 'Hello' 'World' ) toQueue: self queueName ].

self
assert: reversedTexts size equals: 2;
Expand All @@ -401,13 +433,42 @@ RabbitMQClientTest >> testPublishingMessages [
{ #category : 'tests' }
RabbitMQClientTest >> testPublishingOneMessage [

self resumeWorkerDuring: [ publisher publishOnly: 'Hello' onQueueNamed: self queueName ].
self resumeWorkerDuring: [ publisher publish: 'Hello' toQueue: self queueName ].

self
withTheOnlyOneIn: reversedTexts
do: [ :reversedText | self assert: reversedText equals: 'olleH' ]
]

{ #category : 'tests' }
RabbitMQClientTest >> testPublishingToFanoutExchange [

| routingKey firstWorkerMessages secondWorkerMessages |
routingKey := self queueName.
firstWorkerMessages := OrderedCollection new.
secondWorkerMessages := OrderedCollection new.

self
runWorkerNamed: 'Reverser'
consumingFrom: 'reverser-queue'
bindedTo: 'amq.fanout'
routedBy: routingKey
doing: [ :message | firstWorkerMessages add: message reversed ]
during: [
self
runWorkerNamed: 'Appender'
consumingFrom: 'appender-queue'
bindedTo: 'amq.fanout'
routedBy: routingKey
doing: [ :message | secondWorkerMessages add: message ]
during: [ publisher publish: 'Hello' toExchange: 'amq.fanout' usingRoutingKey: routingKey ]
].

self
withTheOnlyOneIn: firstWorkerMessages do: [ :text | self assert: text equals: 'olleH' ];
withTheOnlyOneIn: secondWorkerMessages do: [ :text | self assert: text equals: 'Hello' ]
]

{ #category : 'private - support' }
RabbitMQClientTest >> wait [

Expand Down
7 changes: 7 additions & 0 deletions source/Ansible-RabbitMQ/Amqp091BasicProperties.extension.st
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
Extension { #name : 'Amqp091BasicProperties' }

{ #category : '*Ansible-RabbitMQ' }
Amqp091BasicProperties >> bePersistent [

self deliveryMode: 2
]
57 changes: 40 additions & 17 deletions source/Ansible-RabbitMQ/RabbitMQPublisher.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ RabbitMQPublisher class >> configuredBy: aConfigurationAction [
^ self new initializeConfiguredBy: options
]

{ #category : 'private - accessing' }
RabbitMQPublisher >> directExchange [

^ ''
]

{ #category : 'initialization' }
RabbitMQPublisher >> ensureChannelOpen [

Expand All @@ -42,13 +48,14 @@ RabbitMQPublisher >> initializeConfiguredBy: anOptionsDictionary [
]

{ #category : 'private - logging' }
RabbitMQPublisher >> logDebuggingInfoFor: aMessage publishedTo: aQueueName [
RabbitMQPublisher >> logDebuggingInfoFor: aMessage publishedTo: anExchange using: aRoutingKey [

self shouldLogDebuggingInfo then: [
LogRecord emitStructuredDebuggingInfo: 'RabbitMQ message published' with: [ :data |
data at: #messagePublished put: aMessage.
anExchange = self directExchange ifFalse: [ data at: #exchange put: anExchange ].
data
at: #messagePublished put: aMessage;
at: #routingKey put: aQueueName;
at: #routingKey put: aRoutingKey;
at: #connectionDescription put: connection connectionPairsDescription
]
]
Expand All @@ -61,30 +68,34 @@ RabbitMQPublisher >> onPublicationConfirmationDo: anAckBlock onRejectionDo: aNac
theChannel onPublicationConfirmationDo: anAckBlock onRejectionDo: aNackBlock ]
]

{ #category : 'private - configuring' }
RabbitMQPublisher >> persistentDeliveryMode [

^ connection protocolClass basicPropertiesClass new deliveryMode: 2
]

{ #category : 'publishing' }
RabbitMQPublisher >> publish: aMessageCollection onQueueNamed: aQueueName [
RabbitMQPublisher >> publish: aMessage toExchange: anExchangeName usingRoutingKey: aRoutingKey [

aMessageCollection do: [:message | self publishOnly: message onQueueNamed: aQueueName]
self
publish: aMessage
toExchange: anExchangeName
usingRoutingKey: aRoutingKey
configuredWith: [ :properties | properties bePersistent ]
]

{ #category : 'publishing' }
RabbitMQPublisher >> publishOnly: aMessage onQueueNamed: aQueueName [
RabbitMQPublisher >> publish: aMessage toExchange: anExchangeName usingRoutingKey: aRoutingKey configuredWith: aConfigurationBlock [

| properties tryToPublishMessage |
properties := connection protocolClass basicPropertiesClass new.
aConfigurationBlock value: properties.

| tryToPublishMessage |
tryToPublishMessage := [
self ensureChannelOpen.
channel
basicPublish: aMessage utf8Encoded
exchange: ''
routingKey: aQueueName
properties: self persistentDeliveryMode.
self logDebuggingInfoFor: aMessage publishedTo: aQueueName
exchange: anExchangeName
routingKey: aRoutingKey
properties: properties.
self
logDebuggingInfoFor: aMessage
publishedTo: anExchangeName
using: aRoutingKey
].

self try: tryToPublishMessage onConnectivityErrorDo: [ :attemptNumber :error |
Expand All @@ -93,6 +104,18 @@ RabbitMQPublisher >> publishOnly: aMessage onQueueNamed: aQueueName [
]
]

{ #category : 'publishing' }
RabbitMQPublisher >> publish: aMessage toQueue: aQueueName [

self publish: aMessage toExchange: self directExchange usingRoutingKey: aQueueName
]

{ #category : 'publishing' }
RabbitMQPublisher >> publishAll: aMessageCollection toQueue: aQueueName [

aMessageCollection do: [:message | self publish: message toQueue: aQueueName]
]

{ #category : 'connecting' }
RabbitMQPublisher >> start [

Expand Down
12 changes: 11 additions & 1 deletion source/Ansible-RabbitMQ/RabbitMQWorker.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ RabbitMQWorker class >> configuredBy: aConfigurationAction processingPayloadWith
processingMessagesWith: [ :message | aPayloadProcessor value: message body ]
]

{ #category : 'Configuring' }
RabbitMQWorker >> bindQueueTo: anExchange routedBy: aRoutingKey [

self
ensureChannelOpen;
declareQueueInChannel.

channel queueBind: self queueName exchange: anExchange routingKey: aRoutingKey
]

{ #category : 'private' }
RabbitMQWorker >> declareQueueInChannel [

Expand Down Expand Up @@ -78,7 +88,7 @@ RabbitMQWorker >> makeQueueDurable [
^ options at: #queueDurable ifAbsent: [ true ]
]

{ #category : 'accessing' }
{ #category : 'private' }
RabbitMQWorker >> queueName [

^ options at: #queueName
Expand Down
62 changes: 31 additions & 31 deletions source/Ansible-Tests/AMQPTest.class.st
Original file line number Diff line number Diff line change
Expand Up @@ -58,34 +58,6 @@ AMQPTest >> publish: aMessageCollection onExchangeNamed: anExchangeName of: anEx
of: anExchangeType
]

{ #category : 'tests - support' }
AMQPTest >> publish: aMessageCollection onQueueNamed: aQueueName [

self
withLocalhostConnectionDo: [ :connection |
| channel queue |

channel := connection createChannel.
channel declareQueueApplying: [ :builder | builder name: aQueueName ].

aMessageCollection
do: [ :message |
channel
basicPublish: message utf8Encoded
exchange: ''
routingKey: aQueueName
properties: ( connection protocolClass basicPropertiesClass new deliveryMode: 2 )
].

( Delay forMilliseconds: 100 ) wait.
queue := channel declareQueueApplying: [ :builder | builder name: aQueueName ].

self
assert: queue method messageCount equals: aMessageCollection size;
assert: queue method consumerCount equals: 0
]
]

{ #category : 'tests - support' }
AMQPTest >> publish: aMessageCollection to: aRoute onExchangeNamed: anExchangeName of: anExchangeType [

Expand Down Expand Up @@ -144,6 +116,34 @@ AMQPTest >> publish: aMessageCollection with: aHeadersDictionary onHeadersExchan
withProperties: ( connection protocolClass basicPropertiesClass new headers: aHeadersDictionary )]
]

{ #category : 'tests - support' }
AMQPTest >> publishAll: aMessageCollection toQueue: aQueueName [

self
withLocalhostConnectionDo: [ :connection |
| channel queue |

channel := connection createChannel.
channel declareQueueApplying: [ :builder | builder name: aQueueName ].

aMessageCollection
do: [ :message |
channel
basicPublish: message utf8Encoded
exchange: ''
routingKey: aQueueName
properties: ( connection protocolClass basicPropertiesClass new deliveryMode: 2 )
].

( Delay forMilliseconds: 100 ) wait.
queue := channel declareQueueApplying: [ :builder | builder name: aQueueName ].

self
assert: queue method messageCount equals: aMessageCollection size;
assert: queue method consumerCount equals: 0
]
]

{ #category : 'tests - support' }
AMQPTest >> setUp [

Expand Down Expand Up @@ -311,7 +311,7 @@ AMQPTest >> testBasicConsume [

| channel queue |

self publish: #('Do it!') onQueueNamed: 'tasks'.
self publishAll: #('Do it!') toQueue: 'tasks'.

self
withLocalhostConnectionDo: [ :connection |
Expand Down Expand Up @@ -351,8 +351,8 @@ AMQPTest >> testBasicConsumeWithMultipleWorkers [
secondWorkerMessages := OrderedCollection new.

self
publish: #('Do it!' 'Do it.!' 'Do it..!' 'Do it...!' 'Do it....!' 'Do it.....!')
onQueueNamed: 'tasks'.
publishAll: #('Do it!' 'Do it.!' 'Do it..!' 'Do it...!' 'Do it....!' 'Do it.....!')
toQueue: 'tasks'.

firstWorker := self
spawnWorkerNamed: 'first_worker'
Expand Down

0 comments on commit 9cd7033

Please sign in to comment.