From 9cd70331b7f7f5b747ab76e24ec7dc2813b86496 Mon Sep 17 00:00:00 2001 From: Juan Vanecek Date: Tue, 3 Sep 2024 13:09:50 -0300 Subject: [PATCH 01/10] Add support to publish to exchange Add support for binding queues to exchanges --- .../RabbitMQClientTest.class.st | 83 ++++++++++++++++--- .../Amqp091BasicProperties.extension.st | 7 ++ .../RabbitMQPublisher.class.st | 57 +++++++++---- .../Ansible-RabbitMQ/RabbitMQWorker.class.st | 12 ++- source/Ansible-Tests/AMQPTest.class.st | 62 +++++++------- 5 files changed, 161 insertions(+), 60 deletions(-) create mode 100644 source/Ansible-RabbitMQ/Amqp091BasicProperties.extension.st diff --git a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st index c115d0a7..53940fa9 100644 --- a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st +++ b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st @@ -82,7 +82,7 @@ RabbitMQClientTest >> assertQueueStatusAfterPublishing: messagesToSend on: queue queue := publisher channel declareQueueApplying: [ :builder | builder name: queueName ]. - publisher publishOnly: messagesToSend onQueueNamed: queueName. + publisher publish: messagesToSend toQueue: queueName. self wait. @@ -199,6 +199,38 @@ RabbitMQClientTest >> runMemoryLoggerDuring: aBlock assertingLogRecordsMatchRege assertLogRecordsMatchUsing: expectedLogEntriesWithTimestamp ] +{ #category : 'running' } +RabbitMQClientTest >> runWorkerNamed: aName consumingFrom: aQueueName bindedTo: anExchange routedBy: aRoutingKey doing: aProcessingBlock during: aBlock [ + + | process | + process := [ + | worker | + worker := RabbitMQWorker + configuredBy: [ :options | + options + at: #hostname put: 'localhost'; + at: #queueName put: aQueueName; + at: #extraClientProperties put: ( Dictionary new + at: 'process' put: aName; + yourself ) + ] + processingPayloadWith: aProcessingBlock. + worker bindQueueTo: anExchange routedBy: aRoutingKey. + [ worker start ] ensure: [ worker stop ] + ] newProcess. + process + name: aName; + priority: Processor userBackgroundPriority. + + [ + process resume. + Processor yield. + self wait. + aBlock value. + self wait + ] ensure: [ process terminate ] +] + { #category : 'running' } RabbitMQClientTest >> setUp [ @@ -259,7 +291,7 @@ RabbitMQClientTest >> testDebuggingLogsEnabled [ self runMemoryLoggerDuring: [ anotherPublisher start. - anotherPublisher publishOnly: 'Hello!' onQueueNamed: self queueName. + anotherPublisher publish: 'Hello!' toQueue: self queueName. anotherPublisher stop ] assertingLogRecordsMatchRegexes: @@ -295,7 +327,7 @@ RabbitMQClientTest >> testDebuggingLogsTurnedOff [ self runMemoryLoggerDuring: [ anotherPublisher start. - anotherPublisher publishOnly: 'Hello!' onQueueNamed: self queueName. + anotherPublisher publish: 'Hello!' toQueue: self queueName. anotherPublisher stop ] assertingLogRecordsMatchRegexes: @@ -321,8 +353,8 @@ RabbitMQClientTest >> testPublishingMessageWhenClientUnexpectedlyClosesConnectio self resumeWorkerDuring: [ publisher - publishOnly: 'Hello' onQueueNamed: self queueName; - publishOnly: 'World' onQueueNamed: self queueName. + publish: 'Hello' toQueue: self queueName; + publish: 'World' toQueue: self queueName. self wait. @@ -336,7 +368,7 @@ RabbitMQClientTest >> testPublishingMessageWhenClientUnexpectedlyClosesConnectio self closeAllConnectionsFromTheClientSide; waitUntilAllRabbitMQConnectionsClose. - publisher publishOnly: 'Test connection restored' onQueueNamed: self queueName + publisher publish: 'Test connection restored' toQueue: self queueName ] assertingLogRecordsMatchRegexes: { '\[INFO\] AMQP connection localhost\:(\d+)->localhost\:5672 closed due to (Connection close while waiting for data.|primitive #primSocketSendDone\: in Socket failed|primitive #primSocketReceiveDataAvailable\: in Socket failed)' . @@ -356,8 +388,8 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [ self resumeWorkerDuring: [ publisher - publishOnly: 'Hello' onQueueNamed: self queueName; - publishOnly: 'World' onQueueNamed: self queueName. + publish: 'Hello' toQueue: self queueName; + publish: 'World' toQueue: self queueName. self wait. @@ -371,7 +403,7 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [ self closeAllConnectionsFromTheBrokerSide; waitUntilAllRabbitMQConnectionsClose. - publisher publishOnly: 'Test connection restored' onQueueNamed: self queueName + publisher publish: 'Test connection restored' toQueue: self queueName ] assertingLogRecordsMatchRegexes: { '\[INFO\] AMQP connection localhost\:(\d+)->localhost\:5672 closed due to CONNECTION_FORCED - CloseConnectionsTest' . @@ -390,7 +422,7 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [ { #category : 'tests' } RabbitMQClientTest >> testPublishingMessages [ - self resumeWorkerDuring: [ publisher publish: #( 'Hello' 'World' ) onQueueNamed: self queueName ]. + self resumeWorkerDuring: [ publisher publishAll: #( 'Hello' 'World' ) toQueue: self queueName ]. self assert: reversedTexts size equals: 2; @@ -401,13 +433,42 @@ RabbitMQClientTest >> testPublishingMessages [ { #category : 'tests' } RabbitMQClientTest >> testPublishingOneMessage [ - self resumeWorkerDuring: [ publisher publishOnly: 'Hello' onQueueNamed: self queueName ]. + self resumeWorkerDuring: [ publisher publish: 'Hello' toQueue: self queueName ]. self withTheOnlyOneIn: reversedTexts do: [ :reversedText | self assert: reversedText equals: 'olleH' ] ] +{ #category : 'tests' } +RabbitMQClientTest >> testPublishingToFanoutExchange [ + + | routingKey firstWorkerMessages secondWorkerMessages | + routingKey := self queueName. + firstWorkerMessages := OrderedCollection new. + secondWorkerMessages := OrderedCollection new. + + self + runWorkerNamed: 'Reverser' + consumingFrom: 'reverser-queue' + bindedTo: 'amq.fanout' + routedBy: routingKey + doing: [ :message | firstWorkerMessages add: message reversed ] + during: [ + self + runWorkerNamed: 'Appender' + consumingFrom: 'appender-queue' + bindedTo: 'amq.fanout' + routedBy: routingKey + doing: [ :message | secondWorkerMessages add: message ] + during: [ publisher publish: 'Hello' toExchange: 'amq.fanout' usingRoutingKey: routingKey ] + ]. + + self + withTheOnlyOneIn: firstWorkerMessages do: [ :text | self assert: text equals: 'olleH' ]; + withTheOnlyOneIn: secondWorkerMessages do: [ :text | self assert: text equals: 'Hello' ] +] + { #category : 'private - support' } RabbitMQClientTest >> wait [ diff --git a/source/Ansible-RabbitMQ/Amqp091BasicProperties.extension.st b/source/Ansible-RabbitMQ/Amqp091BasicProperties.extension.st new file mode 100644 index 00000000..8c12e7eb --- /dev/null +++ b/source/Ansible-RabbitMQ/Amqp091BasicProperties.extension.st @@ -0,0 +1,7 @@ +Extension { #name : 'Amqp091BasicProperties' } + +{ #category : '*Ansible-RabbitMQ' } +Amqp091BasicProperties >> bePersistent [ + + self deliveryMode: 2 +] diff --git a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st index 0a88dd35..805e2a8b 100644 --- a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st @@ -26,6 +26,12 @@ RabbitMQPublisher class >> configuredBy: aConfigurationAction [ ^ self new initializeConfiguredBy: options ] +{ #category : 'private - accessing' } +RabbitMQPublisher >> directExchange [ + + ^ '' +] + { #category : 'initialization' } RabbitMQPublisher >> ensureChannelOpen [ @@ -42,13 +48,14 @@ RabbitMQPublisher >> initializeConfiguredBy: anOptionsDictionary [ ] { #category : 'private - logging' } -RabbitMQPublisher >> logDebuggingInfoFor: aMessage publishedTo: aQueueName [ +RabbitMQPublisher >> logDebuggingInfoFor: aMessage publishedTo: anExchange using: aRoutingKey [ self shouldLogDebuggingInfo then: [ LogRecord emitStructuredDebuggingInfo: 'RabbitMQ message published' with: [ :data | + data at: #messagePublished put: aMessage. + anExchange = self directExchange ifFalse: [ data at: #exchange put: anExchange ]. data - at: #messagePublished put: aMessage; - at: #routingKey put: aQueueName; + at: #routingKey put: aRoutingKey; at: #connectionDescription put: connection connectionPairsDescription ] ] @@ -61,30 +68,34 @@ RabbitMQPublisher >> onPublicationConfirmationDo: anAckBlock onRejectionDo: aNac theChannel onPublicationConfirmationDo: anAckBlock onRejectionDo: aNackBlock ] ] -{ #category : 'private - configuring' } -RabbitMQPublisher >> persistentDeliveryMode [ - - ^ connection protocolClass basicPropertiesClass new deliveryMode: 2 -] - { #category : 'publishing' } -RabbitMQPublisher >> publish: aMessageCollection onQueueNamed: aQueueName [ +RabbitMQPublisher >> publish: aMessage toExchange: anExchangeName usingRoutingKey: aRoutingKey [ - aMessageCollection do: [:message | self publishOnly: message onQueueNamed: aQueueName] + self + publish: aMessage + toExchange: anExchangeName + usingRoutingKey: aRoutingKey + configuredWith: [ :properties | properties bePersistent ] ] { #category : 'publishing' } -RabbitMQPublisher >> publishOnly: aMessage onQueueNamed: aQueueName [ +RabbitMQPublisher >> publish: aMessage toExchange: anExchangeName usingRoutingKey: aRoutingKey configuredWith: aConfigurationBlock [ + + | properties tryToPublishMessage | + properties := connection protocolClass basicPropertiesClass new. + aConfigurationBlock value: properties. - | tryToPublishMessage | tryToPublishMessage := [ self ensureChannelOpen. channel basicPublish: aMessage utf8Encoded - exchange: '' - routingKey: aQueueName - properties: self persistentDeliveryMode. - self logDebuggingInfoFor: aMessage publishedTo: aQueueName + exchange: anExchangeName + routingKey: aRoutingKey + properties: properties. + self + logDebuggingInfoFor: aMessage + publishedTo: anExchangeName + using: aRoutingKey ]. self try: tryToPublishMessage onConnectivityErrorDo: [ :attemptNumber :error | @@ -93,6 +104,18 @@ RabbitMQPublisher >> publishOnly: aMessage onQueueNamed: aQueueName [ ] ] +{ #category : 'publishing' } +RabbitMQPublisher >> publish: aMessage toQueue: aQueueName [ + + self publish: aMessage toExchange: self directExchange usingRoutingKey: aQueueName +] + +{ #category : 'publishing' } +RabbitMQPublisher >> publishAll: aMessageCollection toQueue: aQueueName [ + + aMessageCollection do: [:message | self publish: message toQueue: aQueueName] +] + { #category : 'connecting' } RabbitMQPublisher >> start [ diff --git a/source/Ansible-RabbitMQ/RabbitMQWorker.class.st b/source/Ansible-RabbitMQ/RabbitMQWorker.class.st index b08c2985..c52e9560 100644 --- a/source/Ansible-RabbitMQ/RabbitMQWorker.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQWorker.class.st @@ -41,6 +41,16 @@ RabbitMQWorker class >> configuredBy: aConfigurationAction processingPayloadWith processingMessagesWith: [ :message | aPayloadProcessor value: message body ] ] +{ #category : 'Configuring' } +RabbitMQWorker >> bindQueueTo: anExchange routedBy: aRoutingKey [ + + self + ensureChannelOpen; + declareQueueInChannel. + + channel queueBind: self queueName exchange: anExchange routingKey: aRoutingKey +] + { #category : 'private' } RabbitMQWorker >> declareQueueInChannel [ @@ -78,7 +88,7 @@ RabbitMQWorker >> makeQueueDurable [ ^ options at: #queueDurable ifAbsent: [ true ] ] -{ #category : 'accessing' } +{ #category : 'private' } RabbitMQWorker >> queueName [ ^ options at: #queueName diff --git a/source/Ansible-Tests/AMQPTest.class.st b/source/Ansible-Tests/AMQPTest.class.st index 33b12008..7bdacc72 100644 --- a/source/Ansible-Tests/AMQPTest.class.st +++ b/source/Ansible-Tests/AMQPTest.class.st @@ -58,34 +58,6 @@ AMQPTest >> publish: aMessageCollection onExchangeNamed: anExchangeName of: anEx of: anExchangeType ] -{ #category : 'tests - support' } -AMQPTest >> publish: aMessageCollection onQueueNamed: aQueueName [ - - self - withLocalhostConnectionDo: [ :connection | - | channel queue | - - channel := connection createChannel. - channel declareQueueApplying: [ :builder | builder name: aQueueName ]. - - aMessageCollection - do: [ :message | - channel - basicPublish: message utf8Encoded - exchange: '' - routingKey: aQueueName - properties: ( connection protocolClass basicPropertiesClass new deliveryMode: 2 ) - ]. - - ( Delay forMilliseconds: 100 ) wait. - queue := channel declareQueueApplying: [ :builder | builder name: aQueueName ]. - - self - assert: queue method messageCount equals: aMessageCollection size; - assert: queue method consumerCount equals: 0 - ] -] - { #category : 'tests - support' } AMQPTest >> publish: aMessageCollection to: aRoute onExchangeNamed: anExchangeName of: anExchangeType [ @@ -144,6 +116,34 @@ AMQPTest >> publish: aMessageCollection with: aHeadersDictionary onHeadersExchan withProperties: ( connection protocolClass basicPropertiesClass new headers: aHeadersDictionary )] ] +{ #category : 'tests - support' } +AMQPTest >> publishAll: aMessageCollection toQueue: aQueueName [ + + self + withLocalhostConnectionDo: [ :connection | + | channel queue | + + channel := connection createChannel. + channel declareQueueApplying: [ :builder | builder name: aQueueName ]. + + aMessageCollection + do: [ :message | + channel + basicPublish: message utf8Encoded + exchange: '' + routingKey: aQueueName + properties: ( connection protocolClass basicPropertiesClass new deliveryMode: 2 ) + ]. + + ( Delay forMilliseconds: 100 ) wait. + queue := channel declareQueueApplying: [ :builder | builder name: aQueueName ]. + + self + assert: queue method messageCount equals: aMessageCollection size; + assert: queue method consumerCount equals: 0 + ] +] + { #category : 'tests - support' } AMQPTest >> setUp [ @@ -311,7 +311,7 @@ AMQPTest >> testBasicConsume [ | channel queue | - self publish: #('Do it!') onQueueNamed: 'tasks'. + self publishAll: #('Do it!') toQueue: 'tasks'. self withLocalhostConnectionDo: [ :connection | @@ -351,8 +351,8 @@ AMQPTest >> testBasicConsumeWithMultipleWorkers [ secondWorkerMessages := OrderedCollection new. self - publish: #('Do it!' 'Do it.!' 'Do it..!' 'Do it...!' 'Do it....!' 'Do it.....!') - onQueueNamed: 'tasks'. + publishAll: #('Do it!' 'Do it.!' 'Do it..!' 'Do it...!' 'Do it....!' 'Do it.....!') + toQueue: 'tasks'. firstWorker := self spawnWorkerNamed: 'first_worker' From 94a9cbd13404c15cb6bcd971cb8ba7bf48c26951 Mon Sep 17 00:00:00 2001 From: Juan Vanecek Date: Tue, 3 Sep 2024 17:35:29 -0300 Subject: [PATCH 02/10] Decode message that was fanout in the test --- source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st index 53940fa9..57264626 100644 --- a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st +++ b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st @@ -453,14 +453,14 @@ RabbitMQClientTest >> testPublishingToFanoutExchange [ consumingFrom: 'reverser-queue' bindedTo: 'amq.fanout' routedBy: routingKey - doing: [ :message | firstWorkerMessages add: message reversed ] + doing: [ :message | firstWorkerMessages add: message utf8Decoded reversed ] during: [ self runWorkerNamed: 'Appender' consumingFrom: 'appender-queue' bindedTo: 'amq.fanout' routedBy: routingKey - doing: [ :message | secondWorkerMessages add: message ] + doing: [ :message | secondWorkerMessages add: message utf8Decoded ] during: [ publisher publish: 'Hello' toExchange: 'amq.fanout' usingRoutingKey: routingKey ] ]. From 0892135ae5ab556f651b70039d9edec07045a4c5 Mon Sep 17 00:00:00 2001 From: Juan Vanecek Date: Tue, 3 Sep 2024 17:35:55 -0300 Subject: [PATCH 03/10] Unbind queue before worker termination --- source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st | 5 ++++- source/Ansible-RabbitMQ/RabbitMQWorker.class.st | 8 +++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st index 57264626..5337190f 100644 --- a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st +++ b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st @@ -216,7 +216,10 @@ RabbitMQClientTest >> runWorkerNamed: aName consumingFrom: aQueueName bindedTo: ] processingPayloadWith: aProcessingBlock. worker bindQueueTo: anExchange routedBy: aRoutingKey. - [ worker start ] ensure: [ worker stop ] + [ worker start ] ensure: [ + worker unbindQueueTo: anExchange routedBy: aRoutingKey. + worker stop + ] ] newProcess. process name: aName; diff --git a/source/Ansible-RabbitMQ/RabbitMQWorker.class.st b/source/Ansible-RabbitMQ/RabbitMQWorker.class.st index c52e9560..bb5ea313 100644 --- a/source/Ansible-RabbitMQ/RabbitMQWorker.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQWorker.class.st @@ -41,7 +41,7 @@ RabbitMQWorker class >> configuredBy: aConfigurationAction processingPayloadWith processingMessagesWith: [ :message | aPayloadProcessor value: message body ] ] -{ #category : 'Configuring' } +{ #category : 'configuring' } RabbitMQWorker >> bindQueueTo: anExchange routedBy: aRoutingKey [ self @@ -144,3 +144,9 @@ RabbitMQWorker >> startProcessing [ ] ] repeat ] + +{ #category : 'configuring' } +RabbitMQWorker >> unbindQueueTo: anExchange routedBy: aRoutingKey [ + + channel queueUnbind: self queueName exchange: anExchange routingKey: aRoutingKey +] From ff411a1e56864ae5a5dd41cd22e80a1e1a4fed11 Mon Sep 17 00:00:00 2001 From: Juan Vanecek Date: Wed, 4 Sep 2024 16:20:06 -0300 Subject: [PATCH 04/10] Rename publisher protocol --- .../RabbitMQClientTest.class.st | 32 +++++++++---------- .../RabbitMQPublisher.class.st | 24 +++++++------- source/Ansible-Tests/AMQPTest.class.st | 6 ++-- 3 files changed, 31 insertions(+), 31 deletions(-) diff --git a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st index 5337190f..2610fadc 100644 --- a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st +++ b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st @@ -82,7 +82,7 @@ RabbitMQClientTest >> assertQueueStatusAfterPublishing: messagesToSend on: queue queue := publisher channel declareQueueApplying: [ :builder | builder name: queueName ]. - publisher publish: messagesToSend toQueue: queueName. + publisher publish: messagesToSend to: queueName. self wait. @@ -294,7 +294,7 @@ RabbitMQClientTest >> testDebuggingLogsEnabled [ self runMemoryLoggerDuring: [ anotherPublisher start. - anotherPublisher publish: 'Hello!' toQueue: self queueName. + anotherPublisher publish: 'Hello!' to: self queueName. anotherPublisher stop ] assertingLogRecordsMatchRegexes: @@ -330,7 +330,7 @@ RabbitMQClientTest >> testDebuggingLogsTurnedOff [ self runMemoryLoggerDuring: [ anotherPublisher start. - anotherPublisher publish: 'Hello!' toQueue: self queueName. + anotherPublisher publish: 'Hello!' to: self queueName. anotherPublisher stop ] assertingLogRecordsMatchRegexes: @@ -356,8 +356,8 @@ RabbitMQClientTest >> testPublishingMessageWhenClientUnexpectedlyClosesConnectio self resumeWorkerDuring: [ publisher - publish: 'Hello' toQueue: self queueName; - publish: 'World' toQueue: self queueName. + publish: 'Hello' to: self queueName; + publish: 'World' to: self queueName. self wait. @@ -371,7 +371,7 @@ RabbitMQClientTest >> testPublishingMessageWhenClientUnexpectedlyClosesConnectio self closeAllConnectionsFromTheClientSide; waitUntilAllRabbitMQConnectionsClose. - publisher publish: 'Test connection restored' toQueue: self queueName + publisher publish: 'Test connection restored' to: self queueName ] assertingLogRecordsMatchRegexes: { '\[INFO\] AMQP connection localhost\:(\d+)->localhost\:5672 closed due to (Connection close while waiting for data.|primitive #primSocketSendDone\: in Socket failed|primitive #primSocketReceiveDataAvailable\: in Socket failed)' . @@ -391,8 +391,8 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [ self resumeWorkerDuring: [ publisher - publish: 'Hello' toQueue: self queueName; - publish: 'World' toQueue: self queueName. + publish: 'Hello' to: self queueName; + publish: 'World' to: self queueName. self wait. @@ -406,7 +406,7 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [ self closeAllConnectionsFromTheBrokerSide; waitUntilAllRabbitMQConnectionsClose. - publisher publish: 'Test connection restored' toQueue: self queueName + publisher publish: 'Test connection restored' to: self queueName ] assertingLogRecordsMatchRegexes: { '\[INFO\] AMQP connection localhost\:(\d+)->localhost\:5672 closed due to CONNECTION_FORCED - CloseConnectionsTest' . @@ -425,7 +425,7 @@ RabbitMQClientTest >> testPublishingMessageWhenConnectionIsTemporallyLost [ { #category : 'tests' } RabbitMQClientTest >> testPublishingMessages [ - self resumeWorkerDuring: [ publisher publishAll: #( 'Hello' 'World' ) toQueue: self queueName ]. + self resumeWorkerDuring: [ publisher publishAll: #( 'Hello' 'World' ) to: self queueName ]. self assert: reversedTexts size equals: 2; @@ -436,7 +436,7 @@ RabbitMQClientTest >> testPublishingMessages [ { #category : 'tests' } RabbitMQClientTest >> testPublishingOneMessage [ - self resumeWorkerDuring: [ publisher publish: 'Hello' toQueue: self queueName ]. + self resumeWorkerDuring: [ publisher publish: 'Hello' to: self queueName ]. self withTheOnlyOneIn: reversedTexts @@ -446,8 +446,8 @@ RabbitMQClientTest >> testPublishingOneMessage [ { #category : 'tests' } RabbitMQClientTest >> testPublishingToFanoutExchange [ - | routingKey firstWorkerMessages secondWorkerMessages | - routingKey := self queueName. + | route firstWorkerMessages secondWorkerMessages | + route := self queueName. firstWorkerMessages := OrderedCollection new. secondWorkerMessages := OrderedCollection new. @@ -455,16 +455,16 @@ RabbitMQClientTest >> testPublishingToFanoutExchange [ runWorkerNamed: 'Reverser' consumingFrom: 'reverser-queue' bindedTo: 'amq.fanout' - routedBy: routingKey + routedBy: route doing: [ :message | firstWorkerMessages add: message utf8Decoded reversed ] during: [ self runWorkerNamed: 'Appender' consumingFrom: 'appender-queue' bindedTo: 'amq.fanout' - routedBy: routingKey + routedBy: route doing: [ :message | secondWorkerMessages add: message utf8Decoded ] - during: [ publisher publish: 'Hello' toExchange: 'amq.fanout' usingRoutingKey: routingKey ] + during: [ publisher publish: 'Hello' to: route through: 'amq.fanout' ] ]. self diff --git a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st index 805e2a8b..3deed50f 100644 --- a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st @@ -69,17 +69,23 @@ RabbitMQPublisher >> onPublicationConfirmationDo: anAckBlock onRejectionDo: aNac ] { #category : 'publishing' } -RabbitMQPublisher >> publish: aMessage toExchange: anExchangeName usingRoutingKey: aRoutingKey [ +RabbitMQPublisher >> publish: aMessage to: aQueueName [ + + self publish: aMessage to: aQueueName through: self directExchange +] + +{ #category : 'publishing' } +RabbitMQPublisher >> publish: aMessage to: aRoutingKey through: anExchangeName [ self publish: aMessage - toExchange: anExchangeName - usingRoutingKey: aRoutingKey + to: aRoutingKey + through: anExchangeName configuredWith: [ :properties | properties bePersistent ] ] { #category : 'publishing' } -RabbitMQPublisher >> publish: aMessage toExchange: anExchangeName usingRoutingKey: aRoutingKey configuredWith: aConfigurationBlock [ +RabbitMQPublisher >> publish: aMessage to: aRoutingKey through: anExchangeName configuredWith: aConfigurationBlock [ | properties tryToPublishMessage | properties := connection protocolClass basicPropertiesClass new. @@ -105,15 +111,9 @@ RabbitMQPublisher >> publish: aMessage toExchange: anExchangeName usingRoutingKe ] { #category : 'publishing' } -RabbitMQPublisher >> publish: aMessage toQueue: aQueueName [ - - self publish: aMessage toExchange: self directExchange usingRoutingKey: aQueueName -] - -{ #category : 'publishing' } -RabbitMQPublisher >> publishAll: aMessageCollection toQueue: aQueueName [ +RabbitMQPublisher >> publishAll: aMessageCollection to: aQueueName [ - aMessageCollection do: [:message | self publish: message toQueue: aQueueName] + aMessageCollection do: [:message | self publish: message to: aQueueName] ] { #category : 'connecting' } diff --git a/source/Ansible-Tests/AMQPTest.class.st b/source/Ansible-Tests/AMQPTest.class.st index 7bdacc72..4bd01570 100644 --- a/source/Ansible-Tests/AMQPTest.class.st +++ b/source/Ansible-Tests/AMQPTest.class.st @@ -117,7 +117,7 @@ AMQPTest >> publish: aMessageCollection with: aHeadersDictionary onHeadersExchan ] { #category : 'tests - support' } -AMQPTest >> publishAll: aMessageCollection toQueue: aQueueName [ +AMQPTest >> publishAll: aMessageCollection to: aQueueName [ self withLocalhostConnectionDo: [ :connection | @@ -311,7 +311,7 @@ AMQPTest >> testBasicConsume [ | channel queue | - self publishAll: #('Do it!') toQueue: 'tasks'. + self publishAll: #('Do it!') to: 'tasks'. self withLocalhostConnectionDo: [ :connection | @@ -352,7 +352,7 @@ AMQPTest >> testBasicConsumeWithMultipleWorkers [ self publishAll: #('Do it!' 'Do it.!' 'Do it..!' 'Do it...!' 'Do it....!' 'Do it.....!') - toQueue: 'tasks'. + to: 'tasks'. firstWorker := self spawnWorkerNamed: 'first_worker' From 19d022adbb3bbee20a6799fdc56c3e67e20f7e8d Mon Sep 17 00:00:00 2001 From: Juan Vanecek Date: Tue, 10 Sep 2024 12:27:02 -0300 Subject: [PATCH 05/10] Add docs reference about the default exchange --- source/Ansible-RabbitMQ/RabbitMQPublisher.class.st | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st index 3deed50f..138e9bdb 100644 --- a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st @@ -28,7 +28,12 @@ RabbitMQPublisher class >> configuredBy: aConfigurationAction [ { #category : 'private - accessing' } RabbitMQPublisher >> directExchange [ - + + "Ref: https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-default + The default exchange is a direct exchange with no name (empty string) pre-declared by the broker. + It has one special property that makes it very useful for simple applications: + every queue that is created is automatically bound to it with a routing key which is the same as the queue name. " + ^ '' ] From 979a7d51263fcfbd94a9b2ef14561c4910bdf71b Mon Sep 17 00:00:00 2001 From: Juan Vanecek Date: Fri, 20 Sep 2024 16:01:49 -0300 Subject: [PATCH 06/10] Add tests for different types of exchange (default, topic, direct, fanout) Recategorize publisher protocol --- .../RabbitMQClientTest.class.st | 97 ++++++++++++++++++- .../RabbitMQPublisher.class.st | 16 ++- 2 files changed, 104 insertions(+), 9 deletions(-) diff --git a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st index 2610fadc..dc4df0b5 100644 --- a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st +++ b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st @@ -434,7 +434,7 @@ RabbitMQClientTest >> testPublishingMessages [ ] { #category : 'tests' } -RabbitMQClientTest >> testPublishingOneMessage [ +RabbitMQClientTest >> testPublishingToDefaultExchange [ self resumeWorkerDuring: [ publisher publish: 'Hello' to: self queueName ]. @@ -444,7 +444,36 @@ RabbitMQClientTest >> testPublishingOneMessage [ ] { #category : 'tests' } -RabbitMQClientTest >> testPublishingToFanoutExchange [ +RabbitMQClientTest >> testPublishingToDirectExchangeWithTwoQueuesBindedWithDifferentRoutingKey [ + + | firstWorkerMessages secondWorkerMessages | + + firstWorkerMessages := OrderedCollection new. + secondWorkerMessages := OrderedCollection new. + + self + runWorkerNamed: 'Reverser' + consumingFrom: 'reverser-queue' + bindedTo: 'amq.direct' + routedBy: 'the-reverse-key' + doing: [ :message | firstWorkerMessages add: message utf8Decoded reversed ] + during: [ + self + runWorkerNamed: 'Appender' + consumingFrom: 'appender-queue' + bindedTo: 'amq.direct' + routedBy: 'the-appender-key' + doing: [ :message | secondWorkerMessages add: message utf8Decoded ] + during: [ publisher publish: 'Hello' to: 'the-reverse-key' through: 'amq.direct' ] + ]. + + self + withTheOnlyOneIn: firstWorkerMessages do: [ :text | self assert: text equals: 'olleH' ]; + assert: secondWorkerMessages isEmpty +] + +{ #category : 'tests' } +RabbitMQClientTest >> testPublishingToDirectExchangeWithTwoQueuesBindedWithSameRoutingKey [ | route firstWorkerMessages secondWorkerMessages | route := self queueName. @@ -454,17 +483,75 @@ RabbitMQClientTest >> testPublishingToFanoutExchange [ self runWorkerNamed: 'Reverser' consumingFrom: 'reverser-queue' - bindedTo: 'amq.fanout' + bindedTo: 'amq.direct' routedBy: route doing: [ :message | firstWorkerMessages add: message utf8Decoded reversed ] during: [ self runWorkerNamed: 'Appender' consumingFrom: 'appender-queue' - bindedTo: 'amq.fanout' + bindedTo: 'amq.direct' routedBy: route doing: [ :message | secondWorkerMessages add: message utf8Decoded ] - during: [ publisher publish: 'Hello' to: route through: 'amq.fanout' ] + during: [ publisher publish: 'Hello' to: route through: 'amq.direct' ] + ]. + + self + withTheOnlyOneIn: firstWorkerMessages do: [ :text | self assert: text equals: 'olleH' ]; + withTheOnlyOneIn: secondWorkerMessages do: [ :text | self assert: text equals: 'Hello' ] +] + +{ #category : 'tests' } +RabbitMQClientTest >> testPublishingToFanoutExchange [ + + | firstWorkerMessages secondWorkerMessages | + + firstWorkerMessages := OrderedCollection new. + secondWorkerMessages := OrderedCollection new. + + self + runWorkerNamed: 'Reverser' + consumingFrom: 'reverser-queue' + bindedTo: 'amq.fanout' + routedBy: ('queue-<1p>' expandMacrosWith: Random new next) + doing: [ :message | firstWorkerMessages add: message utf8Decoded reversed ] + during: [ + self + runWorkerNamed: 'Appender' + consumingFrom: 'appender-queue' + bindedTo: 'amq.fanout' + routedBy: ('queue-<1p>' expandMacrosWith: Random new next) + doing: [ :message | secondWorkerMessages add: message utf8Decoded ] + during: [ publisher broadcast: 'Hello' toAllQueuesBindedTo: 'amq.fanout' ] + ]. + + self + withTheOnlyOneIn: firstWorkerMessages do: [ :text | self assert: text equals: 'olleH' ]; + withTheOnlyOneIn: secondWorkerMessages do: [ :text | self assert: text equals: 'Hello' ] +] + +{ #category : 'tests' } +RabbitMQClientTest >> testPublishingToTopicExchange [ + + | firstWorkerMessages secondWorkerMessages | + + firstWorkerMessages := OrderedCollection new. + secondWorkerMessages := OrderedCollection new. + + self + runWorkerNamed: 'Reverser' + consumingFrom: 'reverser-queue' + bindedTo: 'amq.topic' + routedBy: '*.reverser' + doing: [ :message | firstWorkerMessages add: message utf8Decoded reversed ] + during: [ + self + runWorkerNamed: 'Appender' + consumingFrom: 'appender-queue' + bindedTo: 'amq.topic' + routedBy: 'textprocessing.*' + doing: [ :message | secondWorkerMessages add: message utf8Decoded ] + during: [ publisher publish: 'Hello' to: 'textprocessing.reverser' through: 'amq.topic' ] ]. self diff --git a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st index 138e9bdb..c6311733 100644 --- a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st @@ -26,6 +26,14 @@ RabbitMQPublisher class >> configuredBy: aConfigurationAction [ ^ self new initializeConfiguredBy: options ] +{ #category : 'publish - exchange fanout' } +RabbitMQPublisher >> broadcast: aMessage toAllQueuesBindedTo: aFanoutExchange [ + + " Ref: https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic. + A fanout exchange routes messages to all of the queues that are bound to it and the routing key is ignored. " + self publish: aMessage to: '' through: aFanoutExchange +] + { #category : 'private - accessing' } RabbitMQPublisher >> directExchange [ @@ -73,13 +81,13 @@ RabbitMQPublisher >> onPublicationConfirmationDo: anAckBlock onRejectionDo: aNac theChannel onPublicationConfirmationDo: anAckBlock onRejectionDo: aNackBlock ] ] -{ #category : 'publishing' } +{ #category : 'publish - exchange default' } RabbitMQPublisher >> publish: aMessage to: aQueueName [ self publish: aMessage to: aQueueName through: self directExchange ] -{ #category : 'publishing' } +{ #category : 'publish - exchange topic/direct' } RabbitMQPublisher >> publish: aMessage to: aRoutingKey through: anExchangeName [ self @@ -89,7 +97,7 @@ RabbitMQPublisher >> publish: aMessage to: aRoutingKey through: anExchangeName [ configuredWith: [ :properties | properties bePersistent ] ] -{ #category : 'publishing' } +{ #category : 'publish - exchange topic/direct' } RabbitMQPublisher >> publish: aMessage to: aRoutingKey through: anExchangeName configuredWith: aConfigurationBlock [ | properties tryToPublishMessage | @@ -115,7 +123,7 @@ RabbitMQPublisher >> publish: aMessage to: aRoutingKey through: anExchangeName c ] ] -{ #category : 'publishing' } +{ #category : 'publish - exchange default' } RabbitMQPublisher >> publishAll: aMessageCollection to: aQueueName [ aMessageCollection do: [:message | self publish: message to: aQueueName] From bba57216432ed64886b71b12dd13f8dad90e9958 Mon Sep 17 00:00:00 2001 From: Juan Vanecek Date: Mon, 21 Oct 2024 14:12:32 -0300 Subject: [PATCH 07/10] Extract exchange names in tests --- .../RabbitMQClientTest.class.st | 64 +++++++++++++------ 1 file changed, 43 insertions(+), 21 deletions(-) diff --git a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st index dc4df0b5..47957189 100644 --- a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st +++ b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st @@ -5,7 +5,8 @@ Class { 'reversedTexts', 'workerProcess', 'loggingAsserter', - 'publisher' + 'publisher', + 'semaphore' ], #category : 'Ansible-RabbitMQ-Tests', #package : 'Ansible-RabbitMQ-Tests' @@ -131,6 +132,18 @@ RabbitMQClientTest >> closeAllConnectionsOf: aRabbitmqContainerId for: aUsername with: aCloseReason ) ] +{ #category : 'private - accessing' } +RabbitMQClientTest >> defaultDirectExchangeName [ + + ^ 'amq.direct' +] + +{ #category : 'private - accessing' } +RabbitMQClientTest >> defaultFanoutExchangeName [ + + ^ 'amq.fanout' +] + { #category : 'private - accessing' } RabbitMQClientTest >> defaultRabbitMQPublisher [ @@ -152,6 +165,12 @@ RabbitMQClientTest >> defaultRabbitMQWorkerUsername [ ^ AmqpConnectionBuilder usingAMQP091Protocol credentials username ] +{ #category : 'private - accessing' } +RabbitMQClientTest >> defaultTopicExchangeName [ + + ^ 'amq.topic' +] + { #category : 'private - accessing' } RabbitMQClientTest >> queueName [ @@ -446,30 +465,30 @@ RabbitMQClientTest >> testPublishingToDefaultExchange [ { #category : 'tests' } RabbitMQClientTest >> testPublishingToDirectExchangeWithTwoQueuesBindedWithDifferentRoutingKey [ - | firstWorkerMessages secondWorkerMessages | - + | firstWorkerMessages secondWorkerMessages | firstWorkerMessages := OrderedCollection new. secondWorkerMessages := OrderedCollection new. self runWorkerNamed: 'Reverser' consumingFrom: 'reverser-queue' - bindedTo: 'amq.direct' + bindedTo: self defaultDirectExchangeName routedBy: 'the-reverse-key' doing: [ :message | firstWorkerMessages add: message utf8Decoded reversed ] during: [ self runWorkerNamed: 'Appender' consumingFrom: 'appender-queue' - bindedTo: 'amq.direct' + bindedTo: self defaultDirectExchangeName routedBy: 'the-appender-key' doing: [ :message | secondWorkerMessages add: message utf8Decoded ] - during: [ publisher publish: 'Hello' to: 'the-reverse-key' through: 'amq.direct' ] + during: [ + publisher publish: 'Hello' to: 'the-reverse-key' through: self defaultDirectExchangeName ] ]. self - withTheOnlyOneIn: firstWorkerMessages do: [ :text | self assert: text equals: 'olleH' ]; - assert: secondWorkerMessages isEmpty + withTheOnlyOneIn: firstWorkerMessages do: [ :text | self assert: text equals: 'olleH' ]; + assert: secondWorkerMessages isEmpty ] { #category : 'tests' } @@ -483,17 +502,17 @@ RabbitMQClientTest >> testPublishingToDirectExchangeWithTwoQueuesBindedWithSameR self runWorkerNamed: 'Reverser' consumingFrom: 'reverser-queue' - bindedTo: 'amq.direct' + bindedTo: self defaultDirectExchangeName routedBy: route doing: [ :message | firstWorkerMessages add: message utf8Decoded reversed ] during: [ self runWorkerNamed: 'Appender' consumingFrom: 'appender-queue' - bindedTo: 'amq.direct' + bindedTo: self defaultDirectExchangeName routedBy: route doing: [ :message | secondWorkerMessages add: message utf8Decoded ] - during: [ publisher publish: 'Hello' to: route through: 'amq.direct' ] + during: [ publisher publish: 'Hello' to: route through: self defaultDirectExchangeName ] ]. self @@ -505,24 +524,24 @@ RabbitMQClientTest >> testPublishingToDirectExchangeWithTwoQueuesBindedWithSameR RabbitMQClientTest >> testPublishingToFanoutExchange [ | firstWorkerMessages secondWorkerMessages | - firstWorkerMessages := OrderedCollection new. secondWorkerMessages := OrderedCollection new. self runWorkerNamed: 'Reverser' consumingFrom: 'reverser-queue' - bindedTo: 'amq.fanout' - routedBy: ('queue-<1p>' expandMacrosWith: Random new next) + bindedTo: self defaultFanoutExchangeName + routedBy: ( 'queue-<1p>' expandMacrosWith: Random new next ) doing: [ :message | firstWorkerMessages add: message utf8Decoded reversed ] during: [ self runWorkerNamed: 'Appender' consumingFrom: 'appender-queue' - bindedTo: 'amq.fanout' - routedBy: ('queue-<1p>' expandMacrosWith: Random new next) + bindedTo: self defaultFanoutExchangeName + routedBy: ( 'queue-<1p>' expandMacrosWith: Random new next ) doing: [ :message | secondWorkerMessages add: message utf8Decoded ] - during: [ publisher broadcast: 'Hello' toAllQueuesBindedTo: 'amq.fanout' ] + during: [ + publisher broadcast: 'Hello' toAllQueuesBindedTo: self defaultFanoutExchangeName ] ]. self @@ -534,24 +553,27 @@ RabbitMQClientTest >> testPublishingToFanoutExchange [ RabbitMQClientTest >> testPublishingToTopicExchange [ | firstWorkerMessages secondWorkerMessages | - firstWorkerMessages := OrderedCollection new. secondWorkerMessages := OrderedCollection new. self runWorkerNamed: 'Reverser' consumingFrom: 'reverser-queue' - bindedTo: 'amq.topic' + bindedTo: self defaultTopicExchangeName routedBy: '*.reverser' doing: [ :message | firstWorkerMessages add: message utf8Decoded reversed ] during: [ self runWorkerNamed: 'Appender' consumingFrom: 'appender-queue' - bindedTo: 'amq.topic' + bindedTo: self defaultTopicExchangeName routedBy: 'textprocessing.*' doing: [ :message | secondWorkerMessages add: message utf8Decoded ] - during: [ publisher publish: 'Hello' to: 'textprocessing.reverser' through: 'amq.topic' ] + during: [ + publisher + publish: 'Hello' + to: 'textprocessing.reverser' + through: self defaultTopicExchangeName ] ]. self From 5d220c73247776e7ff1e782383c978e3b5aa8a9a Mon Sep 17 00:00:00 2001 From: Juan Vanecek Date: Mon, 21 Oct 2024 14:30:21 -0300 Subject: [PATCH 08/10] Add deprecation transformations in RabbitMQPublisher --- .../RabbitMQPublisher.extension.st | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 source/Ansible-Deprecated-v3/RabbitMQPublisher.extension.st diff --git a/source/Ansible-Deprecated-v3/RabbitMQPublisher.extension.st b/source/Ansible-Deprecated-v3/RabbitMQPublisher.extension.st new file mode 100644 index 00000000..cb9b3860 --- /dev/null +++ b/source/Ansible-Deprecated-v3/RabbitMQPublisher.extension.st @@ -0,0 +1,25 @@ +Extension { #name : 'RabbitMQPublisher' } + +{ #category : '*Ansible-Deprecated-v3' } +RabbitMQPublisher >> publish: aMessageCollection onQueueNamed: aQueueName [ + + self + deprecated: 'Use confirmPublicationWith:otherwise:' + transformWith: + '`@receiver publish: `@aMessageCollection onQueueNamed: `@aQueueName' + -> '`@receiver publishAll: `@aMessageCollection to: `@aQueueName'. + + self publishAll: aMessageCollection to: aQueueName +] + +{ #category : '*Ansible-Deprecated-v3' } +RabbitMQPublisher >> publishOnly: aMessage onQueueNamed: aQueueName [ + + self + deprecated: 'Use confirmPublicationWith:otherwise:' + transformWith: + '`@receiver publishOnly: `@aMessage onQueueNamed: `@aQueueName' + -> '`@receiver publish: `@aMessage to: `@aQueueName'. + + self publish: aMessage to: aQueueName +] From 8397371f34395bca3f7793bb440443b53a6dedee Mon Sep 17 00:00:00 2001 From: Juan Vanecek Date: Tue, 22 Oct 2024 14:43:44 -0300 Subject: [PATCH 09/10] Fix typo in message --- source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st | 2 +- source/Ansible-RabbitMQ/RabbitMQPublisher.class.st | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st index 47957189..c8cc42b1 100644 --- a/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st +++ b/source/Ansible-RabbitMQ-Tests/RabbitMQClientTest.class.st @@ -541,7 +541,7 @@ RabbitMQClientTest >> testPublishingToFanoutExchange [ routedBy: ( 'queue-<1p>' expandMacrosWith: Random new next ) doing: [ :message | secondWorkerMessages add: message utf8Decoded ] during: [ - publisher broadcast: 'Hello' toAllQueuesBindedTo: self defaultFanoutExchangeName ] + publisher broadcast: 'Hello' toAllQueuesBoundTo: self defaultFanoutExchangeName ] ]. self diff --git a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st index c6311733..c4423cc9 100644 --- a/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st +++ b/source/Ansible-RabbitMQ/RabbitMQPublisher.class.st @@ -27,7 +27,7 @@ RabbitMQPublisher class >> configuredBy: aConfigurationAction [ ] { #category : 'publish - exchange fanout' } -RabbitMQPublisher >> broadcast: aMessage toAllQueuesBindedTo: aFanoutExchange [ +RabbitMQPublisher >> broadcast: aMessage toAllQueuesBoundTo: aFanoutExchange [ " Ref: https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic. A fanout exchange routes messages to all of the queues that are bound to it and the routing key is ignored. " From 5668c021a1b57e1f839ed1fbcc9e9ba05bc90c41 Mon Sep 17 00:00:00 2001 From: Juan Vanecek Date: Tue, 22 Oct 2024 15:36:32 -0300 Subject: [PATCH 10/10] Update Publisher and Worker docs --- docs/tutorials/RabbitMQPublisher.md | 42 +++++++++++++++++++++++++++++ docs/tutorials/RabbitMQWorker.md | 35 ++++++++++++++++++++++++ 2 files changed, 77 insertions(+) diff --git a/docs/tutorials/RabbitMQPublisher.md b/docs/tutorials/RabbitMQPublisher.md index 5d234226..b5af1261 100644 --- a/docs/tutorials/RabbitMQPublisher.md +++ b/docs/tutorials/RabbitMQPublisher.md @@ -17,3 +17,45 @@ Accepts the following options: | #enableDebuggingLogs | A boolean indicating whether to log debugging events | Optional | false | | #extraClientProperties | A dictionary with keys and values to set the [client properties](https://www.rabbitmq.com/docs/connections#capabilities) |Optional | Empty | | #retry | A block that can configure the internal `Retry` instance | Optional | `[]` | + +## Usage + +You need to instantiate the publisher with the options above and then send `#start` to ensure the [channel](https://www.rabbitmq.com/docs/channels) is open. + +```smalltalk +| publisher | +publisher := RabbitMQPublisher configuredBy: [ :options | + options + at: #username put: 'guest'; + at: #password put: 'guest'; + at: #hostname put: 'localhost' +]. + +publisher start. +``` + +Once the publisher is started, you can use its protocol to send messages. + +* Publish directly to a specific queue using the [default exchange](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-default). + +```smalltalk +publisher publish: 'The message' to: 'the-queue'. +``` + +* Publish to a routing key through a [direct](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-direct) or [topic](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-topic) exchange. + +```smalltalk +publisher publish: 'The message' to: 'a-routing-key' through: 'the-exchange'. +``` + +* Publish to all queues bound to a [fanout exchange](https://www.rabbitmq.com/tutorials/amqp-concepts#exchange-fanout). + +```smalltalk +publisher broadcast: 'The message' toAllQueuesBindedTo: 'a-fanout-exchange'. +``` + +For proper shutdown and connection closure, you need to run: + +```smalltalk +publisher stop. +``` diff --git a/docs/tutorials/RabbitMQWorker.md b/docs/tutorials/RabbitMQWorker.md index 424065dc..c7c9a61d 100644 --- a/docs/tutorials/RabbitMQWorker.md +++ b/docs/tutorials/RabbitMQWorker.md @@ -19,3 +19,38 @@ Accepts the following options: | #retry | A block that can configure the internal `Retry` instance | Optional | `[]` | | #queueName | Queue name where to consume from | Mandatory | | | #queueDurable | When false sets the [queue durability](https://www.rabbitmq.com/docs/queues#durability) to transient, otherwise will be durable | Optional | true | + +## Usage + +You need to instantiate the worker with the options above. + +```smalltalk +| worker | +worker := RabbitMQWorker configuredBy: [ :options | + options + at: #hostname put: 'localhost'; + at: #queueName put: aQueueName; + at: #extraClientProperties put: ( + Dictionary new + at: 'process' put: 'aName'; + yourself ) + ] + processingPayloadWith: [:message | message inspect ]. +``` + +Before sending `#start`, you need to consider [bind the queue](https://www.rabbitmq.com/tutorials/tutorial-four-python#bindings) to an exchange if necessary. + +```smalltalk +worker bindQueueTo: 'an-exchange-name' routedBy: 'a-routing-key'. +``` + +The #start method will block the socket, waiting for any new event, so it's recommended to fork the process and ensure that, before terminating, you unbind the queue and close the connection properly. + +```smalltalk +workerProcess := [ + [ worker start ] ensure: [ + worker unbindQueueTo: 'an-exchange-name' routedBy: 'a-routing-key'. + worker stop + ] +] fork +```