From 9cd70331b7f7f5b747ab76e24ec7dc2813b86496 Mon Sep 17 00:00:00 2001 From: Juan Vanecek Date: Tue, 3 Sep 2024 13:09:50 -0300 Subject: [PATCH] Add support to publish to exchange Add support for binding queues to exchanges --- .../RabbitMQClientTest.class.st | 83 ++++++++++++++++--- .../Amqp091BasicProperties.extension.st | 7 ++ .../RabbitMQPublisher.class.st | 57 +++++++++---- .../Ansible-RabbitMQ/RabbitMQWorker.class.st | 12 ++- source/Ansible-Tests/AMQPTest.class.st | 62 +++++++------- 5 files changed, 161 insertions(+), 60 deletions(-) create mode 100644 source/Ansible-RabbitMQ/Amqp091BasicProperties.extension.st diff --git a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st index c115d0a7..53940fa9 100644 --- a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st +++ b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st @@ -82,7 +82,7 @@ RabbitMQClientTest >> assertQueueStatusAfterPublishing: messagesToSend on: queue queue := publisher channel declareQueueApplying: [ :builder | builder name: queueName ]. - publisher publishOnly: messagesToSend onQueueNamed: queueName. + publisher publish: messagesToSend toQueue: queueName. self wait. @@ -199,6 +199,38 @@ RabbitMQClientTest >> runMemoryLoggerDuring: aBlock assertingLogRecordsMatchRege assertLogRecordsMatchUsing: expectedLogEntriesWithTimestamp ] +{ #category : 'running' } +RabbitMQClientTest >> runWorkerNamed: aName consumingFrom: aQueueName bindedTo: anExchange routedBy: aRoutingKey doing: aProcessingBlock during: aBlock [ + + | process | + process := [ + | worker | + worker := RabbitMQWorker + configuredBy: [ :options | + options + at: #hostname put: 'localhost'; + at: #queueName put: aQueueName; + at: #extraClientProperties put: ( Dictionary new + at: 'process' put: aName; + yourself ) + ] + processingPayloadWith: aProcessingBlock. + worker bindQueueTo: anExchange routedBy: aRoutingKey. + [ worker start ] ensure: [ worker stop ] + ] newProcess. + process + name: aName; + priority: Processor userBackgroundPriority. + + [ + process resume. + Processor yield. + self wait. + aBlock value. + self wait + ] ensure: [ process terminate ] +] + { #category : 'running' } RabbitMQClientTest >> setUp [ @@ -259,7 +291,7 @@ RabbitMQClientTest >> testDebuggingLogsEnabled [ self runMemoryLoggerDuring: [ anotherPublisher start. - anotherPublisher publishOnly: 'Hello!' onQueueNamed: self queueName. + anotherPublisher publish: 'Hello!' toQueue: self queueName. anotherPublisher stop ] assertingLogRecordsMatchRegexes: @@ -295,7 +327,7 @@ RabbitMQClientTest >> testDebuggingLogsTurnedOff [ self runMemoryLoggerDuring: [ anotherPublisher start. - anotherPublisher publishOnly: 'Hello!' onQueueNamed: self queueName. + anotherPublisher publish: 'Hello!' toQueue: self queueName. anotherPublisher stop ] assertingLogRecordsMatchRegexes: @@ -321,8 +353,8 @@ RabbitMQClientTest >> testPublishingMessageWhenClientUnexpectedlyClosesConnectio self resumeWorkerDuring: [ publisher - publishOnly: 'Hello' onQueueNamed: self queueName; - publishOnly: 'World' onQueueNamed: self queueName. + publish: 'Hello' toQueue: self queueName; + publish: 'World' toQueue: self queueName. self wait. @@ -336,7 +368,7 @@ RabbitMQClientTest >> testPublishingMessageWhenClientUnexpectedlyClosesConnectio self closeAllConnectionsFromTheClientSide; waitUntilAllRabbitMQConnectionsClose. - publisher publishOnly: 'Test connection restored' onQueueNamed: self queueName + publisher publish: 'Test connection restored' toQueue: self queueName ] assertingLogRecordsMatchRegexes: { '\[INFO\] AMQP connection localhost\:(\d+)->localhost\:5672 closed due to (Connection close while waiting for data.|primitive #primSocketSendDone\: in Socket failed|primitive #primSocketReceiveDataAvailable\: in Socket failed)' . @@ -356,8 +388,8 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [ self resumeWorkerDuring: [ publisher - publishOnly: 'Hello' onQueueNamed: self queueName; - publishOnly: 'World' onQueueNamed: self queueName. + publish: 'Hello' toQueue: self queueName; + publish: 'World' toQueue: self queueName. self wait. @@ -371,7 +403,7 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [ self closeAllConnectionsFromTheBrokerSide; waitUntilAllRabbitMQConnectionsClose. - publisher publishOnly: 'Test connection restored' onQueueNamed: self queueName + publisher publish: 'Test connection restored' toQueue: self queueName ] assertingLogRecordsMatchRegexes: { '\[INFO\] AMQP connection localhost\:(\d+)->localhost\:5672 closed due to CONNECTION_FORCED - CloseConnectionsTest' . @@ -390,7 +422,7 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [ { #category : 'tests' } RabbitMQClientTest >> testPublishingMessages [ - self resumeWorkerDuring: [ publisher publish: #( 'Hello' 'World' ) onQueueNamed: self queueName ]. + self resumeWorkerDuring: [ publisher publishAll: #( 'Hello' 'World' ) toQueue: self queueName ]. self assert: reversedTexts size equals: 2; @@ -401,13 +433,42 @@ RabbitMQClientTest >> testPublishingMessages [ { #category : 'tests' } RabbitMQClientTest >> testPublishingOneMessage [ - self resumeWorkerDuring: [ publisher publishOnly: 'Hello' onQueueNamed: self queueName ]. + self resumeWorkerDuring: [ publisher publish: 'Hello' toQueue: self queueName ]. self withTheOnlyOneIn: reversedTexts do: [ :reversedText | self assert: reversedText equals: 'olleH' ] ] +{ #category : 'tests' } +RabbitMQClientTest >> testPublishingToFanoutExchange [ + + | routingKey firstWorkerMessages secondWorkerMessages | + routingKey := self queueName. + firstWorkerMessages := OrderedCollection new. + secondWorkerMessages := OrderedCollection new. + + self + runWorkerNamed: 'Reverser' + consumingFrom: 'reverser-queue' + bindedTo: 'amq.fanout' + routedBy: routingKey + doing: [ :message | firstWorkerMessages add: message reversed ] + during: [ + self + runWorkerNamed: 'Appender' + consumingFrom: 'appender-queue' + bindedTo: 'amq.fanout' + routedBy: routingKey + doing: [ :message | secondWorkerMessages add: message ] + during: [ publisher publish: 'Hello' toExchange: 'amq.fanout' usingRoutingKey: routingKey ] + ]. + + self + withTheOnlyOneIn: firstWorkerMessages do: [ :text | self assert: text equals: 'olleH' ]; + withTheOnlyOneIn: secondWorkerMessages do: [ :text | self assert: text equals: 'Hello' ] +] + { #category : 'private - support' } RabbitMQClientTest >> wait [ diff --git a/source/Ansible-RabbitMQ/Amqp091BasicProperties.extension.st b/source/Ansible-RabbitMQ/Amqp091BasicProperties.extension.st new file mode 100644 index 00000000..8c12e7eb --- /dev/null +++ b/source/Ansible-RabbitMQ/Amqp091BasicProperties.extension.st @@ -0,0 +1,7 @@ +Extension { #name : 'Amqp091BasicProperties' } + +{ #category : '*Ansible-RabbitMQ' } +Amqp091BasicProperties >> bePersistent [ + + self deliveryMode: 2 +] diff --git a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st index 0a88dd35..805e2a8b 100644 --- a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st @@ -26,6 +26,12 @@ RabbitMQPublisher class >> configuredBy: aConfigurationAction [ ^ self new initializeConfiguredBy: options ] +{ #category : 'private - accessing' } +RabbitMQPublisher >> directExchange [ + + ^ '' +] + { #category : 'initialization' } RabbitMQPublisher >> ensureChannelOpen [ @@ -42,13 +48,14 @@ RabbitMQPublisher >> initializeConfiguredBy: anOptionsDictionary [ ] { #category : 'private - logging' } -RabbitMQPublisher >> logDebuggingInfoFor: aMessage publishedTo: aQueueName [ +RabbitMQPublisher >> logDebuggingInfoFor: aMessage publishedTo: anExchange using: aRoutingKey [ self shouldLogDebuggingInfo then: [ LogRecord emitStructuredDebuggingInfo: 'RabbitMQ message published' with: [ :data | + data at: #messagePublished put: aMessage. + anExchange = self directExchange ifFalse: [ data at: #exchange put: anExchange ]. data - at: #messagePublished put: aMessage; - at: #routingKey put: aQueueName; + at: #routingKey put: aRoutingKey; at: #connectionDescription put: connection connectionPairsDescription ] ] @@ -61,30 +68,34 @@ RabbitMQPublisher >> onPublicationConfirmationDo: anAckBlock onRejectionDo: aNac theChannel onPublicationConfirmationDo: anAckBlock onRejectionDo: aNackBlock ] ] -{ #category : 'private - configuring' } -RabbitMQPublisher >> persistentDeliveryMode [ - - ^ connection protocolClass basicPropertiesClass new deliveryMode: 2 -] - { #category : 'publishing' } -RabbitMQPublisher >> publish: aMessageCollection onQueueNamed: aQueueName [ +RabbitMQPublisher >> publish: aMessage toExchange: anExchangeName usingRoutingKey: aRoutingKey [ - aMessageCollection do: [:message | self publishOnly: message onQueueNamed: aQueueName] + self + publish: aMessage + toExchange: anExchangeName + usingRoutingKey: aRoutingKey + configuredWith: [ :properties | properties bePersistent ] ] { #category : 'publishing' } -RabbitMQPublisher >> publishOnly: aMessage onQueueNamed: aQueueName [ +RabbitMQPublisher >> publish: aMessage toExchange: anExchangeName usingRoutingKey: aRoutingKey configuredWith: aConfigurationBlock [ + + | properties tryToPublishMessage | + properties := connection protocolClass basicPropertiesClass new. + aConfigurationBlock value: properties. - | tryToPublishMessage | tryToPublishMessage := [ self ensureChannelOpen. channel basicPublish: aMessage utf8Encoded - exchange: '' - routingKey: aQueueName - properties: self persistentDeliveryMode. - self logDebuggingInfoFor: aMessage publishedTo: aQueueName + exchange: anExchangeName + routingKey: aRoutingKey + properties: properties. + self + logDebuggingInfoFor: aMessage + publishedTo: anExchangeName + using: aRoutingKey ]. self try: tryToPublishMessage onConnectivityErrorDo: [ :attemptNumber :error | @@ -93,6 +104,18 @@ RabbitMQPublisher >> publishOnly: aMessage onQueueNamed: aQueueName [ ] ] +{ #category : 'publishing' } +RabbitMQPublisher >> publish: aMessage toQueue: aQueueName [ + + self publish: aMessage toExchange: self directExchange usingRoutingKey: aQueueName +] + +{ #category : 'publishing' } +RabbitMQPublisher >> publishAll: aMessageCollection toQueue: aQueueName [ + + aMessageCollection do: [:message | self publish: message toQueue: aQueueName] +] + { #category : 'connecting' } RabbitMQPublisher >> start [ diff --git a/source/Ansible-RabbitMQ/RabbitMQWorker.class.st b/source/Ansible-RabbitMQ/RabbitMQWorker.class.st index b08c2985..c52e9560 100644 --- a/source/Ansible-RabbitMQ/RabbitMQWorker.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQWorker.class.st @@ -41,6 +41,16 @@ RabbitMQWorker class >> configuredBy: aConfigurationAction processingPayloadWith processingMessagesWith: [ :message | aPayloadProcessor value: message body ] ] +{ #category : 'Configuring' } +RabbitMQWorker >> bindQueueTo: anExchange routedBy: aRoutingKey [ + + self + ensureChannelOpen; + declareQueueInChannel. + + channel queueBind: self queueName exchange: anExchange routingKey: aRoutingKey +] + { #category : 'private' } RabbitMQWorker >> declareQueueInChannel [ @@ -78,7 +88,7 @@ RabbitMQWorker >> makeQueueDurable [ ^ options at: #queueDurable ifAbsent: [ true ] ] -{ #category : 'accessing' } +{ #category : 'private' } RabbitMQWorker >> queueName [ ^ options at: #queueName diff --git a/source/Ansible-Tests/AMQPTest.class.st b/source/Ansible-Tests/AMQPTest.class.st index 33b12008..7bdacc72 100644 --- a/source/Ansible-Tests/AMQPTest.class.st +++ b/source/Ansible-Tests/AMQPTest.class.st @@ -58,34 +58,6 @@ AMQPTest >> publish: aMessageCollection onExchangeNamed: anExchangeName of: anEx of: anExchangeType ] -{ #category : 'tests - support' } -AMQPTest >> publish: aMessageCollection onQueueNamed: aQueueName [ - - self - withLocalhostConnectionDo: [ :connection | - | channel queue | - - channel := connection createChannel. - channel declareQueueApplying: [ :builder | builder name: aQueueName ]. - - aMessageCollection - do: [ :message | - channel - basicPublish: message utf8Encoded - exchange: '' - routingKey: aQueueName - properties: ( connection protocolClass basicPropertiesClass new deliveryMode: 2 ) - ]. - - ( Delay forMilliseconds: 100 ) wait. - queue := channel declareQueueApplying: [ :builder | builder name: aQueueName ]. - - self - assert: queue method messageCount equals: aMessageCollection size; - assert: queue method consumerCount equals: 0 - ] -] - { #category : 'tests - support' } AMQPTest >> publish: aMessageCollection to: aRoute onExchangeNamed: anExchangeName of: anExchangeType [ @@ -144,6 +116,34 @@ AMQPTest >> publish: aMessageCollection with: aHeadersDictionary onHeadersExchan withProperties: ( connection protocolClass basicPropertiesClass new headers: aHeadersDictionary )] ] +{ #category : 'tests - support' } +AMQPTest >> publishAll: aMessageCollection toQueue: aQueueName [ + + self + withLocalhostConnectionDo: [ :connection | + | channel queue | + + channel := connection createChannel. + channel declareQueueApplying: [ :builder | builder name: aQueueName ]. + + aMessageCollection + do: [ :message | + channel + basicPublish: message utf8Encoded + exchange: '' + routingKey: aQueueName + properties: ( connection protocolClass basicPropertiesClass new deliveryMode: 2 ) + ]. + + ( Delay forMilliseconds: 100 ) wait. + queue := channel declareQueueApplying: [ :builder | builder name: aQueueName ]. + + self + assert: queue method messageCount equals: aMessageCollection size; + assert: queue method consumerCount equals: 0 + ] +] + { #category : 'tests - support' } AMQPTest >> setUp [ @@ -311,7 +311,7 @@ AMQPTest >> testBasicConsume [ | channel queue | - self publish: #('Do it!') onQueueNamed: 'tasks'. + self publishAll: #('Do it!') toQueue: 'tasks'. self withLocalhostConnectionDo: [ :connection | @@ -351,8 +351,8 @@ AMQPTest >> testBasicConsumeWithMultipleWorkers [ secondWorkerMessages := OrderedCollection new. self - publish: #('Do it!' 'Do it.!' 'Do it..!' 'Do it...!' 'Do it....!' 'Do it.....!') - onQueueNamed: 'tasks'. + publishAll: #('Do it!' 'Do it.!' 'Do it..!' 'Do it...!' 'Do it....!' 'Do it.....!') + toQueue: 'tasks'. firstWorker := self spawnWorkerNamed: 'first_worker'