Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Reconnection Logic #43

Merged
merged 65 commits into from
Sep 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
65 commits
Select commit Hold shift + click to select a range
a4410cb
:sparkles: Re-export with Pharo 12 format
jvanecek Jul 27, 2024
6e224b7
:sparkles: Add RabbitMQPublisher
AgusSalvidio Jul 10, 2024
e15581e
:white_check_mark: Add RabbitMQPublisher tests
AgusSalvidio Jul 10, 2024
e91cdda
:sparkles: Add support for reconnection logic and also enhance logging
AgusSalvidio Jul 10, 2024
7c49c01
:sparkles: Add support for client reconnection
AgusSalvidio Jul 10, 2024
d69234f
:recycle: Refactor RabbitMQ code
AgusSalvidio Jul 10, 2024
9ca32f0
:bulb: Added RabbitMQPublisher class comment
AgusSalvidio Jul 10, 2024
747cfdb
:recycle: Refactor RabbitMQWorker and its tests
AgusSalvidio Jul 11, 2024
069d824
:test_tube: Add rabbit disconnection test
AgusSalvidio Jul 16, 2024
57f9bfe
:recycle: RabbitMQ client, worker , publisher refactors
AgusSalvidio Jul 22, 2024
9fcb756
:recycle: AmqpChannel refactors
AgusSalvidio Jul 22, 2024
6d049f2
:white_check_mark: Add RabbitMQWorker reconnection test
AgusSalvidio Jul 22, 2024
de5781d
upgrade dependencies
jvanecek Jul 22, 2024
37809cd
add pharo 12 to the CI actions
jvanecek Jul 22, 2024
e555b7d
:art: Add support to handle nextFrame
AgusSalvidio Jul 25, 2024
c5081d8
:art: Add waitForReply handle
AgusSalvidio Jul 25, 2024
04516d3
:test_tube: Updated test to allow Pharo retrocompatibility
AgusSalvidio Jul 26, 2024
569b1a8
:memo: Added Pharo 12 support
AgusSalvidio Jul 26, 2024
ba135be
Update references of LaunchpadLogRecord to LogRecord
jvanecek Jul 26, 2024
4838473
Refactor the RabbitMQClient hierarchy to make the worker and publishe…
jvanecek Jul 28, 2024
7895862
handle unexpected errors in heartbeat process
jvanecek Jul 29, 2024
5f2a04a
Refactor worker to be used by composition instead of inheritance
jvanecek Jul 29, 2024
b7cfac8
Include Bell-SUnit as dependency to use LoggingAsserter
jvanecek Jul 29, 2024
b2f9fa5
Refactor RabbitMQPublisherTest to use LoggingAsserter
jvanecek Jul 29, 2024
eb45ea8
Refactor RabbitMQTextReverserText to use LoggingAsserter
jvanecek Jul 29, 2024
a7a98a8
Add messages ack callback in RabbitMQPublisher
jvanecek Jul 29, 2024
208df95
Make queue durability configurable in RabbitMQWorker
jvanecek Jul 29, 2024
dfbec25
Refactor AMQPRabbitTest to user RabbitMQPublisher and Worker
jvanecek Jul 29, 2024
5df104c
Make AmqpConnection printOn more robust to disconnections
jvanecek Jul 29, 2024
591df2b
Upgrade actions to use checkout@v4
jvanecek Jul 29, 2024
7b9aba6
Make RabbitMQPublisherTest more portable to previous versions
jvanecek Jul 29, 2024
e90f1dc
Remove redundant error handled
jvanecek Jul 29, 2024
df1ca68
:wastebasket: Deprecate method #confirmMessagesPublicationWith: andTh…
AgusSalvidio Aug 1, 2024
7c9e7e5
:memo: Add migration guide doc
AgusSalvidio Aug 1, 2024
c087d5e
:recycle: Refactor RabbitMQClientTest
AgusSalvidio Aug 1, 2024
2d512c3
:wrench: Add extra client properties to AmqpConnection
AgusSalvidio Aug 5, 2024
49b615a
:white_check_mark: Add RabbitMQ reconnection test
AgusSalvidio Aug 5, 2024
2f859ef
Extract asserts in testPublisherConfirmationWhenMessageProcessed
jvanecek Aug 9, 2024
38d3ee0
Homogeneize AmqpConnection closing logic
jvanecek Aug 10, 2024
504e440
Increment heartbeat priority to highIO
jvanecek Aug 10, 2024
62b8c7d
Use ZdcSocketStream because it's flush handles better the disconnecti…
jvanecek Aug 10, 2024
eca1148
Create an AmqpSocketStream with a #flush implementation consistent fo…
jvanecek Aug 10, 2024
78dd652
Move the fixed #flush implementation to an extension loaded only on P…
jvanecek Aug 12, 2024
3f33063
Correct the deprecated message
jvanecek Aug 12, 2024
75ddd2f
Rename RabbitMQWorker instantiation
jvanecek Aug 12, 2024
6013e94
Update migration guide and docs with new RabbitMQ Clients
jvanecek Aug 12, 2024
d8d1ea4
Update timeframeBetweenAttempts default value
jvanecek Aug 12, 2024
e9eca3c
Add support of debugging info in RabbitMQWorker
jvanecek Aug 12, 2024
205408a
Fix markdown linter errors
jvanecek Aug 13, 2024
30e5053
Include deliveryTag in the worker debugging logs
jvanecek Aug 13, 2024
2d05146
Refactor AmqpChannelHandler instance variables
jvanecek Aug 27, 2024
ff7ce84
Refactor the SocketConnectionStatus hierarchy and improve some handli…
jvanecek Aug 27, 2024
b7f1c50
Improve the reconnection logic when the failure is in the client's so…
jvanecek Aug 27, 2024
a699fd1
Add SocketError as a possible connectivity error
jvanecek Aug 27, 2024
45b9a64
Add PrimitiveFailed as a possible connectivity error for compatibilit…
jvanecek Aug 27, 2024
be39a5d
Improve AmqpConnection description
jvanecek Aug 27, 2024
0ef360d
Homogeneize the exchange and queue declaration using a builder
jvanecek Aug 27, 2024
6239fec
Create a global to wrap the socket error across different pharo versions
jvanecek Aug 27, 2024
350ae76
Use Smalltalk>>#includesKey: to make it portable to Pharo8
jvanecek Aug 28, 2024
c123dc6
Avoid Heartbeat process termination infinite loop in some Pharo versions
jvanecek Aug 28, 2024
073c3b0
Use ifTrue:ifFalse: in postLoadInitialization
jvanecek Aug 28, 2024
8efb901
Release waiting processes on socket when that one is destroyed
jvanecek Aug 29, 2024
e433939
Add another expected error for compatibility with previous version of…
jvanecek Aug 30, 2024
aca79e7
Stop heartbeat after socket was closed
jvanecek Aug 30, 2024
f87db1e
Rename ensure to assert in messages that only checks something and ra…
jvanecek Aug 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/loading-groups.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
strategy:
fail-fast: false
matrix:
smalltalk: [ Pharo64-11, Pharo64-10, Pharo64-9.0, Pharo64-8.0 ]
smalltalk: [ Pharo64-12, Pharo64-11, Pharo64-10, Pharo64-9.0, Pharo64-8.0 ]
load-spec: [ deployment, tests, development, tools ]
name: ${{ matrix.smalltalk }} + ${{ matrix.load-spec }}
services:
Expand All @@ -18,7 +18,7 @@ jobs:
- 5672:5672
options: --health-cmd "rabbitmqctl node_health_check" --health-interval 10s --health-timeout 5s --health-retries 5
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: hpi-swa/setup-smalltalkCI@v1
with:
smalltalk-image: ${{ matrix.smalltalk }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/markdown-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ jobs:
name: runner / markdownlint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4
- name: markdownlint
uses: reviewdog/action-markdownlint@v0
with:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ jobs:
strategy:
fail-fast: false
matrix:
smalltalk: [ Pharo64-11, Pharo64-10, Pharo64-9.0, Pharo64-8.0 ]
smalltalk: [ Pharo64-12, Pharo64-11, Pharo64-10, Pharo64-9.0, Pharo64-8.0 ]
name: ${{ matrix.smalltalk }}
services:
rabbitmq:
Expand All @@ -18,7 +18,7 @@ jobs:
- 5672:5672
options: --health-cmd "rabbitmqctl node_health_check" --health-interval 10s --health-timeout 5s --health-retries 5
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
- uses: hpi-swa/setup-smalltalkCI@v1
with:
smalltalk-image: ${{ matrix.smalltalk }}
Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ Ansible is a AMQP client library for Smalltalk supporting 0-8 and 0-9-1
[![Pharo 8.0](https://img.shields.io/badge/Pharo-8.0-informational)](https://pharo.org)
[![Pharo 9.0](https://img.shields.io/badge/Pharo-9.0-informational)](https://pharo.org)
[![Pharo 10](https://img.shields.io/badge/Pharo-10-informational)](https://pharo.org)
[![Pharo 11](https://img.shields.io/badge/Pharo-10-informational)](https://pharo.org)
[![Pharo 11](https://img.shields.io/badge/Pharo-11-informational)](https://pharo.org)
[![Pharo 12](https://img.shields.io/badge/Pharo-12-informational)](https://pharo.org)

An [Ansible](https://en.wikipedia.org/wiki/Ansible) is a fictional device
capable of near-instantaneous communication. It can send and receive message
Expand Down
70 changes: 70 additions & 0 deletions docs/MigrationGuide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Migration Guide

## Migration from v2 to v3

* Manually load the package `Ansible-Deprecated-V3` that has transformation of
deprecated messages.

* `RabbitMQWorker` has been refactored to be used by composition instead of
inheritance: The classes that subclassified it implemented the subclass
responsibility `#configureConnection:` and `#process: payload` should be
changed to instantiate `RabbitMQWorker` as to pass that process logic to the
`processingPayloadWith:` collaborator.

For instance,

```smalltalk
Class {
#name : 'RabbitMQTextReverser',
#superclass : 'RabbitMQWorker',
#instVars : [
'testCase',
]
}

{ #category : 'initialization' }
RabbitMQTextReverser >> initializeWorkingWith: aTestCase

testCase := aTestCase

{ #category : 'private' }
RabbitMQTextReverser >> #configureConnection: builder

builder hostname: 'localhost'.
builder portNumber: 5672.
builder username: 'guest'.
builder password: 'guest'.

{ #category : 'private' }
RabbitMQTextReverser >> process: payload

testCase storeText: payload utf8Decoded reversed
```

Should be refactored to

```smalltalk
Class {
#name : 'RabbitMQTextReverser',
#superclass : 'Object',
#instVars : [
'worker'
]
}

RabbitMQTextReverser >> initializeWorkingWith: aTestCase

worker := RabbitMQWorker
configuredBy: [ :options |
options
at: #hostname 'localhost';.
at: #port 5672;.
at: #username 'guest';.
at: #password 'guest'
]
processingMessagesWith: [ :payload |
aTestCase storeText: payload utf8Decoded reversed
].

worker start
```
8 changes: 8 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ They are not a rewrite but rather my interpretation.
reading it to get a complete understanding. They also provide this [great tool](http://tryrabbitmq.com)
to help you explore different messaging patterns.

## Use RabbitMQ clients reifications

We provide two objects to simplify the instantiations of a publisher and a
consumer:

1. [RabbitMQPublisher](tutorials/RabbitMQPublisher.md)
2. [RabbitMQWorker](tutorials/RabbitMQWorker.md)

---

To use the project as a dependency of your project, take a look at:
Expand Down
2 changes: 1 addition & 1 deletion docs/tutorials/PublishSubscribe.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ logger name: 'Transcript logger'.
logger resume
```

## Receiveing notifications
## Receiving notifications

Here's the script to spawn a process that will pop up a toast notification on
every log message received
Expand Down
19 changes: 19 additions & 0 deletions docs/tutorials/RabbitMQPublisher.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# RabbitMQPublisher

This object will connect to an AMQP channel and knows how to publish messages
to the specified queue for further processing.

Accepts the following options:

<!-- markdownlint-disable MD013 -->
| Attribute name | Description | Optional/Mandatory | Default value |
| ---------------|-------------|--------------------|---------------|
| #hostname | Hostname of the rabbitmq broker | Optional | localhost |
| #port | Port numbre of the rabbitmq broker | Optional | 5672 |
| #username | Username of the rabbitmq broker | Optional | guest |
| #password | Username of the rabbitmq broker | Optional | guest |
| #maximumConnectionAttemps | Amount of retries when connecting to the broker fails | Optional | 3 |
| #timeSlotBetweenConnectionRetriesInMs | Time duration between retry attempts determined by using the exponential backoff algorithm | Optional | 3000 |
| #enableDebuggingLogs | A boolean indicating whether to log debugging events | Optional | false |
| #extraClientProperties | A dictionary with keys and values to set the [client properties](https://www.rabbitmq.com/docs/connections#capabilities) |Optional | Empty |
| #retry | A block that can configure the internal `Retry` instance | Optional | `[]` |
21 changes: 21 additions & 0 deletions docs/tutorials/RabbitMQWorker.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# RabbitMQWorker

This object will connect to an AMQP channel and knows how to consume messages
from a specified queue for processing.

Accepts the following options:

<!-- markdownlint-disable MD013 -->
| Attribute name | Description | Optional/Mandatory | Default value |
| ---------------|-------------|--------------------|---------------|
| #hostname | Hostname of the rabbitmq broker | Optional | localhost |
| #port | Port numbre of the rabbitmq broker | Optional | 5672 |
| #username | Username of the rabbitmq broker | Optional | guest |
| #password | Username of the rabbitmq broker | Optional | guest |
| #maximumConnectionAttemps | Amount of retries when connecting to the broker fails | Optional | 3 |
| #timeSlotBetweenConnectionRetriesInMs | Time duration between retry attempts determined by using the exponential backoff algorithm | Optional | 3000 |
| #enableDebuggingLogs | A boolean indicating whether to log debugging events | Optional | false |
| #extraClientProperties | A dictionary with keys and values to set the [client properties](https://www.rabbitmq.com/docs/connections#capabilities)| Optional | Empty |
| #retry | A block that can configure the internal `Retry` instance | Optional | `[]` |
| #queueName | Queue name where to consume from | Mandatory | |
| #queueDurable | When false sets the [queue durability](https://www.rabbitmq.com/docs/queues#durability) to transient, otherwise will be durable | Optional | true |
110 changes: 110 additions & 0 deletions source/Ansible-Deprecated-v3/AmqpChannel.extension.st
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
Extension { #name : 'AmqpChannel' }

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> confirmMessagesPublicationWith: anAckBlock andThoseNotProcessedWith: aNackBlock [

self
deprecated: 'Use confirmPublicationWith:otherwise:'
transformWith:
'`@receiver confirmMessagesPublicationWith: `@anAckBlock andThoseNotProcessedWith: `@aNackBlock'
-> '`@receiver onPublicationConfirmationDo: `@anAckBlock onRejectionDo: `@aNackBlock'.

self onPublicationConfirmationDo: anAckBlock onRejectionDo: aNackBlock
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> exchangeDeclare: exchangeName type: typeString [

self deprecated: 'Use #declareExchangeNamed:of:applying: directly'.

^ self
exchangeDeclare: exchangeName
type: typeString
durable: false
autoDelete: false
passive: false
arguments: nil
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> exchangeDeclare: exchangeName type: typeString durable: durable [

self deprecated: 'Use #declareExchangeNamed:of:applying: directly'.

^ self
exchangeDeclare: exchangeName
type: typeString
durable: durable
autoDelete: false
passive: false
arguments: nil
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> exchangeDeclare: exchangeName type: typeString durable: durable autoDelete: autoDelete passive: passive [

self deprecated: 'Use #declareExchangeNamed:of:applying: directly'.

^ self
exchangeDeclare: exchangeName
type: typeString
durable: durable
autoDelete: autoDelete
passive: passive
arguments: nil
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> exchangeDeclare: exchangeName type: typeString durable: durable autoDelete: autoDelete passive: passive arguments: aDictionary [

self deprecated: 'Use #declareExchangeNamed:of:applying: directly'.

^ self declareExchangeNamed: exchangeName of: typeString applying: [ :builder |
passive then: [ builder bePassive ].
durable then: [ builder beDurable ].
autoDelete then: [ builder autoDelete ].
builder useAsArguments: aDictionary
]
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> queueDeclare: queueName [

self deprecated: 'Use #declareQueueApplying: directly'.
^ self
queueDeclare: queueName
durable: false
exclusive: false
autoDelete: false
passive: false
arguments: nil
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> queueDeclare: queueName durable: durable [

self deprecated: 'Use #declareQueueApplying: directly'.
^ self
queueDeclare: queueName
durable: durable
exclusive: false
autoDelete: false
passive: false
arguments: nil
]

{ #category : '*Ansible-Deprecated-v3' }
AmqpChannel >> queueDeclare: queueName durable: durable exclusive: exclusive autoDelete: autoDelete passive: passive arguments: aDictionary [

self deprecated: 'Use #declareQueueApplying: directly'.

^ self declareQueueApplying: [ :builder |
builder
name: queueName;
useAsArguments: aDictionary.
passive then: [ builder bePassive ].
durable then: [ builder beDurable ].
exclusive then: [ builder beExclusive ].
autoDelete then: [ builder autoDelete ]
]
]
1 change: 1 addition & 0 deletions source/Ansible-Deprecated-v3/package.st
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Package { #name : 'Ansible-Deprecated-v3' }
10 changes: 10 additions & 0 deletions source/Ansible-Pharo-Pending-Patches/Semaphore.extension.st
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Extension { #name : 'Semaphore' }

{ #category : '*Ansible-Pharo-Pending-Patches' }
Semaphore >> waitTimeoutMilliseconds: anInteger [
"Wait on this semaphore for up to the given number of milliseconds, then timeout.
Return true if the deadline expired, false otherwise."
| d |
d := DelayWaitTimeout new setDelay: (anInteger max: 0) forSemaphore: self.
^d wait
]
Loading
Loading