Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: contribution of new py-amqp/asyncio based sensor, bump to v1.2.0 #22

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Change Log
# 1.1.2
ip-sf marked this conversation as resolved.
Show resolved Hide resolved
- Added new py-amqp based sensor using asyncio (3.8) for concurrency. Details on why: https://github.com/StackStorm/st2/discussions/5743
- Added pip dependency for `amqp==5.0.6` (py-amqp)

# 1.1.1
- Updated pip dependency to pika `1.3.x` to support python >= 3.7
Expand Down
49 changes: 46 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
Pack which allows integration with [RabbitMQ](http://www.rabbitmq.com/).

## Configuration

Configuration is required to use the RabbitMQ sensor. Copy the example configuration
### rabbitmq.RabbitMQQueueSensor Sensor
Configuration is required to use the `pika` based RabbitMQ sensor. Copy the example configuration
in [rabbitmq.yaml.example](./rabbitmq.yaml.example) to `/opt/stackstorm/configs/rabbitmq.yaml`
and edit as required.

Expand All @@ -23,6 +23,7 @@ You can also use dynamic values from the datastore. See the
You can specify multiple queues using this syntax:

```yaml
---
sensor_config:
rabbitmq_queue_sensor:
queues:
Expand All @@ -33,6 +34,40 @@ sensor_config:
- queue2
```

### rabbitmq.QueueWatcherAMQP Sensor
You must configure this sensor under the top level configuration key `amqp_watcher_sensor_config`. This is to ensure backwards compatibility with other sensors.

The below config will declare a simple 'Classic' queue, with a 'direct' exchange with routing key to route messages to that queue (for publishers).

```yaml
---
amqp_watcher_sensor_config:
host: "rabbitmq.domain.com"
port: 5672
username: "guest"
password: "guest"
queues:
- queue: "temp_queue"
type: "classic"
exchanges:
- exchange: "temp_exchange"
type: "direct"
bindings:
- routing_key: "temp.messages"
queue: "temp_queue"
```
The sensor then monitors any declared queues for new messages, and dispatches the trigger `rabbitmq.amqp_msg_rx` with the data:
```json
{"queue": "queue_name", "body": "message body"}
```

If the message body is a serialized string of JSON, it will be deserialized and loaded before being dispatched.

#### Queues / Exchanges Config Parameters
The sensor uses passthrough via `**kwargs` for declares and binds on `queues` and `exchanges` list items, so follow the [documentation of `py-yaml`](https://docs.celeryq.dev/projects/amqp/en/latest/reference/amqp.channel.html) for the `exchange_declare()`, `queue_declare()`, and `queue_bind()` methods.

**Note:** The `exchange` param is passed into `queue_bind()` explicitly via the sensor inferring it from config structure, and is not required.

## Actions

* ``list_exchanges`` - List available exchanges.
Expand All @@ -50,11 +85,19 @@ The following action will publish a message to a remote RabbitMQ server with a s
$ st2 run rabbitmq.publish_message host=localhost port=5673 virtual_host=sensu exchange=metrics exchange_type=topic username=sensu password=password message="foo.bar.baz 1 1436802746"
```


## Sensors

* ``new_message`` - Sensor that triggers a rabbitmq.new_message with a payload containing the queue and the body

Configured in the pack config under the key `sensor_config`

This sensor uses Python's `threading` for concurrency via `pika`'s included methods.

This sensor should only be used with ``fanout`` and ``topic`` exchanges, this way it doesn't affect the behavior of the app since messages will still be delivered to other consumers / subscribers.
If it's used with ``direct`` or ``headers`` exchanges, those messages won't be delivered to other consumers so it will affect app behavior and potentially break it.

* `amqp_msg_rx` - Sensor that triggers rabbitmq.amqp_msg_rx with a payload containing the queue and the body

Configured in the pack config under the key `amqp_watcher_sensor_config`

This sensor uses `asyncio` for concurrency and `py-amqp` and to handle the declaration of configured queues and exchanges on the remote RabbitMQ instance, and the subsequent consumption of messages.
116 changes: 115 additions & 1 deletion config.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
sensor_config:
description: "RabbitMQ Sensor settings"
type: "object"
required: true
required: false
additionalProperties: true
properties:
host:
Expand Down Expand Up @@ -41,3 +41,117 @@ sensor_config:
- "json"
- "pickle"
required: false
amqp_watcher_sensor_config:
description: "The config for a py-amqp sensor that creates and monitors queues in RabbitMQ for messages"
required: false
additionalProperties: false
type: "object"
properties:
host:
description: "The RabbitMQ host to connect to"
type: "string"
required: true
port:
description: "Connection port for RabbitMQ (Default: 5672)"
type: "number"
username:
description: "Username for authenticating to RabbitMQ"
type: "string"
required: true
password:
description: "Password for authenticating to RabbitMQ"
type: "string"
required: true
secret: true
queues:
description: "A list of queues to be declared (created if missing)"
type: "array"
required: false
additionalProperties: false
items:
type: "object"
additionalProperties: false
required: true
properties:
queue:
description: "The name of the queue"
type: "string"
required: true
type:
description: "The type of the queue"
type: "string"
required: false
enum:
- "classic"
- "quorum"
- "stream"
passive:
description: "Configure this queue as passive?"
type: "boolean"
required: false
durable:
description: "Configure this queue as durable?"
type: "boolean"
required: false
exclusive:
description: "Configure this queue as exclusive?"
type: "boolean"
required: false
auto_delete:
description: "Configure this queue to auto delete?"
type: "boolean"
required: false
arguments:
description: "Additional arguments to be pased during Queue declare"
type: "object"
required: false
additionalProperties: true
exchanges:
description: "A list of exchanges to be declared (created if missing)"
type: "array"
required: false
additionalProperties: false
items:
type: "object"
additionalProperties: false
required: true
properties:
exchange:
description: "The name of the exchange"
type: "string"
required: true
type:
description: "The type of the exchange"
type: "string"
required: false
enum:
- "direct"
- "fanout"
- "headers"
- "topic"
passive:
description: "Configure this exchange as passive?"
type: "boolean"
required: false
durable:
description: "Configure this exchange as durable?"
type: "boolean"
required: false
auto_delete:
description: "Configure this exchange to auto delete?"
type: "boolean"
required: false
arguments:
description: "Additional arguments to be pased during Exchange declare"
type: "object"
required: false
additionalProperties: true
bindings:
description: "A list of bindings to be declared for this exchange"
type: "array"
required: false
additionalProperties: false
items:
type: "object"
required: true
additionalProperties: true
2 changes: 1 addition & 1 deletion pack.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ keywords:
- aqmp
- stomp
- message broker
version: 1.1.1
version: 1.1.2
python_versions:
- "3"
author: StackStorm, Inc.
Expand Down
14 changes: 14 additions & 0 deletions rabbitmq.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,17 @@ sensor_config:
queues:
- "queue1"
deserialization_method: "json"
amqp_watcher_sensor_config:
host: "rabbitmq.domain.com"
port: 5672
username: "guest"
password: "guest"
queues:
- queue: "temp_queue"
type: "classic"
exchanges:
- exchange: "temp_exchange"
type: "direct"
bindings:
- routing_key: "temp.messages"
queue: "temp_queue"
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pika~=1.3.1
amqp==5.0.6
Loading