-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add RabbitMQBoundQueueSensor #21
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work, you're going to want to update this section as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd be great if you could create some tests to go along with it.
Thanks for the feedback @mamercad I'll work on updating that section, and look into tests. |
Appreciate the contribution! |
In case you have not read this already, here is the Pack testing documentation. |
My main thinking about this PR: It's no problem to bump a major version of the pack with breaking changes while trying to unify things. |
@armab The motivation for this was because we were having consumers declare their own queues and bindings. On the other hand, if we want to allow the consumers to declare queues and bindings themselves, then I can work on a unified sensor which likely has breaking changes. I feel like it can be done without making the existing sensor too complex. |
Yeah, I'm definitely 👍 for the unified approach trying to keep some middle ground between the two. How a simple config example stackstorm-rabbitmq/rabbitmq.yaml.example Lines 3 to 9 in 492311e
might look like with the potential |
Hi, First off: Thank you for contributing this sensor! I have been fighting with the existing sensor without much luck, detailed in this discussion at @armab 's suggestion i gave this sensor a try using I started troubleshooting by adding some I've tried with and without the specified exchange/queue existing prior to the sensor starting (it looks like the sensor would create resources if not present). What is the current environment that you have this working in? (Docker, standalone, etc?) Any insight or ideas you can offer would be very appreciated. DetailsDocker Sensor Log2023-01-11 17:46:57 2023-01-12 01:46:57,778 INFO [-] Using Python: 3.8.10 (/opt/stackstorm/st2/bin/python)
2023-01-11 17:46:57 2023-01-12 01:46:57,778 INFO [-] Using fs encoding: utf-8, default encoding: utf-8, locale: en_US.UTF-8, LANG env variable: en_US.UTF-8, PYTHONIOENCODING env variable: notset
2023-01-11 17:46:57 2023-01-12 01:46:57,778 INFO [-] Using config files: /etc/st2/st2.conf,/etc/st2/st2.docker.conf,/etc/st2/st2.user.conf
2023-01-11 17:46:57 2023-01-12 01:46:57,778 INFO [-] Using logging config: /etc/st2/logging.sensorcontainer.conf
2023-01-11 17:46:57 2023-01-12 01:46:57,779 INFO [-] Using coordination driver: redis
2023-01-11 17:46:57 2023-01-12 01:46:57,779 INFO [-] Using metrics driver: noop
2023-01-11 17:46:57 2023-01-12 01:46:57,787 INFO [-] Connecting to database "st2" @ "mongo:27017" as user "None".
2023-01-11 17:46:57 2023-01-12 01:46:57,796 INFO [-] Successfully connected to database "st2" @ "mongo:27017" as user "None".
2023-01-11 17:46:57 2023-01-12 01:46:57,797 DEBUG [-] Ensuring database indexes...
2023-01-11 17:46:57 2023-01-12 01:46:57,888 DEBUG [-] Skipping index cleanup for blacklisted model "PermissionGrantDB"...
2023-01-11 17:46:57 2023-01-12 01:46:57,933 DEBUG [-] Indexes are ensured for models: ActionAliasDB, ActionAliasDB, ActionDB, ActionExecutionDB, ActionExecutionDB, ActionExecutionOutputDB, ActionExecutionSchedulingQueueItemDB, ActionExecutionStateDB, ActionExecutionStateDB, ApiKeyDB, ConfigDB, ConfigSchemaDB, GroupToRoleMappingDB, KeyValuePairDB, LiveActionDB, LiveActionDB, PackDB, PermissionGrantDB, PolicyDB, PolicyTypeDB, RoleDB, RuleDB, RuleEnforcementDB, RunnerTypeDB, RunnerTypeDB, SensorTypeDB, TaskExecutionDB, TokenDB, TraceDB, TriggerDB, TriggerInstanceDB, TriggerTypeDB, UserDB, UserRoleAssignmentDB, WorkflowExecutionDB
2023-01-11 17:46:57 2023-01-12 01:46:57,934 DEBUG [-] Registering exchanges...
2023-01-11 17:46:57 2023-01-12 01:46:57,934 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2023-01-11 17:46:57 2023-01-12 01:46:57,943 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@a83a92b37497', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 25.1.2', 'product': 'RabbitMQ', 'version': '3.10.12'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
2023-01-11 17:46:57 2023-01-12 01:46:57,944 DEBUG [-] using channel_id: 1
2023-01-11 17:46:57 2023-01-12 01:46:57,945 DEBUG [-] Channel open
2023-01-11 17:46:57 2023-01-12 01:46:57,945 DEBUG [-] Registered exchange st2.actionexecutionstate ({'exchange': 'st2.actionexecutionstate', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2023-01-11 17:46:57 2023-01-12 01:46:57,946 DEBUG [-] Registered exchange st2.announcement ({'exchange': 'st2.announcement', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2023-01-11 17:46:57 2023-01-12 01:46:57,947 DEBUG [-] Registered exchange st2.execution ({'exchange': 'st2.execution', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2023-01-11 17:46:57 2023-01-12 01:46:57,947 DEBUG [-] Registered exchange st2.liveaction ({'exchange': 'st2.liveaction', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2023-01-11 17:46:57 2023-01-12 01:46:57,947 DEBUG [-] Registered exchange st2.liveaction.status ({'exchange': 'st2.liveaction.status', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2023-01-11 17:46:57 2023-01-12 01:46:57,948 DEBUG [-] Registered exchange st2.trigger ({'exchange': 'st2.trigger', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2023-01-11 17:46:57 2023-01-12 01:46:57,948 DEBUG [-] Registered exchange st2.trigger_instances_dispatch ({'exchange': 'st2.trigger_instances_dispatch', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2023-01-11 17:46:57 2023-01-12 01:46:57,949 DEBUG [-] Registered exchange st2.sensor ({'exchange': 'st2.sensor', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2023-01-11 17:46:57 2023-01-12 01:46:57,949 DEBUG [-] Registered exchange st2.workflow ({'exchange': 'st2.workflow', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2023-01-11 17:46:57 2023-01-12 01:46:57,950 DEBUG [-] Registered exchange st2.workflow.status ({'exchange': 'st2.workflow.status', 'type': 'topic', 'durable': True, 'auto_delete': False, 'arguments': None, 'nowait': False, 'passive': False}).
2023-01-11 17:46:57 2023-01-12 01:46:57,950 DEBUG [-] Closed channel #1
2023-01-11 17:46:57 2023-01-12 01:46:57,950 DEBUG [-] using channel_id: 1
2023-01-11 17:46:57 2023-01-12 01:46:57,951 DEBUG [-] Channel open
2023-01-11 17:46:57 2023-01-12 01:46:57,951 DEBUG [-] Predeclaring queue for exchange "st2.liveaction.status"
2023-01-11 17:46:57 2023-01-12 01:46:57,952 DEBUG [-] Predeclared queue for exchange "st2.liveaction.status"
2023-01-11 17:46:57 2023-01-12 01:46:57,952 DEBUG [-] Predeclaring queue for exchange "st2.liveaction.status"
2023-01-11 17:46:57 2023-01-12 01:46:57,954 DEBUG [-] Predeclared queue for exchange "st2.liveaction.status"
2023-01-11 17:46:57 2023-01-12 01:46:57,954 DEBUG [-] Predeclaring queue for exchange "st2.liveaction.status"
2023-01-11 17:46:57 2023-01-12 01:46:57,955 DEBUG [-] Predeclared queue for exchange "st2.liveaction.status"
2023-01-11 17:46:57 2023-01-12 01:46:57,955 DEBUG [-] Predeclaring queue for exchange "st2.execution"
2023-01-11 17:46:57 2023-01-12 01:46:57,956 DEBUG [-] Predeclared queue for exchange "st2.execution"
2023-01-11 17:46:57 2023-01-12 01:46:57,956 DEBUG [-] Predeclaring queue for exchange "st2.actionexecutionstate"
2023-01-11 17:46:57 2023-01-12 01:46:57,958 DEBUG [-] Predeclared queue for exchange "st2.actionexecutionstate"
2023-01-11 17:46:57 2023-01-12 01:46:57,958 DEBUG [-] Predeclaring queue for exchange "st2.trigger_instances_dispatch"
2023-01-11 17:46:57 2023-01-12 01:46:57,959 DEBUG [-] Predeclared queue for exchange "st2.trigger_instances_dispatch"
2023-01-11 17:46:57 2023-01-12 01:46:57,959 DEBUG [-] Predeclaring queue for exchange "st2.announcement"
2023-01-11 17:46:57 2023-01-12 01:46:57,962 DEBUG [-] Predeclared queue for exchange "st2.announcement"
2023-01-11 17:46:57 2023-01-12 01:46:57,962 DEBUG [-] Predeclaring queue for exchange "st2.execution"
2023-01-11 17:46:57 2023-01-12 01:46:57,965 DEBUG [-] Predeclared queue for exchange "st2.execution"
2023-01-11 17:46:57 2023-01-12 01:46:57,965 DEBUG [-] Predeclaring queue for exchange "st2.liveaction"
2023-01-11 17:46:57 2023-01-12 01:46:57,968 DEBUG [-] Predeclared queue for exchange "st2.liveaction"
2023-01-11 17:46:57 2023-01-12 01:46:57,968 DEBUG [-] Predeclaring queue for exchange "st2.execution.output"
2023-01-11 17:46:57 2023-01-12 01:46:57,970 DEBUG [-] Predeclared queue for exchange "st2.execution.output"
2023-01-11 17:46:57 2023-01-12 01:46:57,971 DEBUG [-] Predeclaring queue for exchange "st2.workflow.status"
2023-01-11 17:46:57 2023-01-12 01:46:57,972 DEBUG [-] Predeclared queue for exchange "st2.workflow.status"
2023-01-11 17:46:57 2023-01-12 01:46:57,972 DEBUG [-] Predeclaring queue for exchange "st2.workflow.status"
2023-01-11 17:46:57 2023-01-12 01:46:57,973 DEBUG [-] Predeclared queue for exchange "st2.workflow.status"
2023-01-11 17:46:57 2023-01-12 01:46:57,973 DEBUG [-] Predeclaring queue for exchange "st2.trigger"
2023-01-11 17:46:57 2023-01-12 01:46:57,974 DEBUG [-] Predeclared queue for exchange "st2.trigger"
2023-01-11 17:46:57 2023-01-12 01:46:57,974 DEBUG [-] Predeclaring queue for exchange "st2.sensor"
2023-01-11 17:46:57 2023-01-12 01:46:57,975 DEBUG [-] Predeclared queue for exchange "st2.sensor"
2023-01-11 17:46:57 2023-01-12 01:46:57,976 DEBUG [-] Closed channel #1
2023-01-11 17:46:57 2023-01-12 01:46:57,980 DEBUG [-] Inserting system roles (['system_admin', 'admin', 'observer'])
2023-01-11 17:46:57 2023-01-12 01:46:57,992 DEBUG [-] found extension EntryPoint.parse('echo = st2common.metrics.drivers.echo_driver:EchoDriver')
2023-01-11 17:46:57 2023-01-12 01:46:57,992 DEBUG [-] found extension EntryPoint.parse('noop = st2common.metrics.drivers.noop_driver:NoopDriver')
2023-01-11 17:46:57 2023-01-12 01:46:57,993 DEBUG [-] found extension EntryPoint.parse('statsd = st2common.metrics.drivers.statsd_driver:StatsdDriver')
2023-01-11 17:46:57 2023-01-12 01:46:57,993 INFO [-] Running in single sensor mode, using a single sensor partitioner...
2023-01-11 17:46:57 2023-01-12 01:46:57,995 INFO [-] Setting up container to run 1 sensors.
2023-01-11 17:46:57 2023-01-12 01:46:57,995 INFO [-] Sensors list - ['rabbitmq.RabbitMQBoundQueueSensor'].
2023-01-11 17:46:57 2023-01-12 01:46:57,995 INFO [-] (PID:1) SensorContainer started.
2023-01-11 17:46:57 2023-01-12 01:46:57,995 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2023-01-11 17:46:57 2023-01-12 01:46:57,995 DEBUG [-] Starting sensor CUD watcher...
2023-01-11 17:46:57 2023-01-12 01:46:57,996 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2023-01-11 17:46:57 2023-01-12 01:46:57,996 INFO [-] Running sensor rabbitmq.RabbitMQBoundQueueSensor
2023-01-11 17:46:57 2023-01-12 01:46:57,996 DEBUG [-] Creating temporary auth token for sensor RabbitMQBoundQueueSensor
2023-01-11 17:46:58 2023-01-12 01:46:58,003 AUDIT [-] Registered new user "sensors_container". (username='sensors_container',user={'id': '63bf66922e1c2d747da9cecf', 'is_service': False, 'name': 'sensors_container', 'nicknames': {}})
2023-01-11 17:46:58 2023-01-12 01:46:58,004 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@a83a92b37497', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 25.1.2', 'product': 'RabbitMQ', 'version': '3.10.12'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
2023-01-11 17:46:58 2023-01-12 01:46:58,005 AUDIT [-] Access granted to "sensors_container" with the token set to expire at "2023-01-13T01:46:58.003441Z". (username='sensors_container',token_expiration='2023-01-13T01:46:58.003441Z')
2023-01-11 17:46:58 2023-01-12 01:46:58,005 DEBUG [-] Running sensor subprocess (cmd="/opt/stackstorm/virtualenvs/rabbitmq/bin/python /opt/stackstorm/st2/lib/python3.8/site-packages/st2reactor/container/sensor_wrapper.py --pack=rabbitmq --file-path=/opt/stackstorm/packs/rabbitmq/sensors/bound_queues_sensor.py --class-name=RabbitMQBoundQueueSensor --trigger-type-refs=rabbitmq.routed_message --parent-args=["--config-file=/etc/st2/st2.conf", "--config-file=/etc/st2/st2.docker.conf", "--config-file=/etc/st2/st2.user.conf", "--single-sensor-mode", "--sensor-ref=rabbitmq.RabbitMQBoundQueueSensor", "--debug"]")
2023-01-11 17:46:58 2023-01-12 01:46:58,011 DEBUG [-] Dispatching trigger (trigger=core.st2.sensor.process_spawn,payload={'trigger': 'core.st2.sensor.process_spawn', 'payload': {'id': 'RabbitMQBoundQueueSensor', 'timestamp': 1673488018, 'pid': 8, 'cmd': '/opt/stackstorm/virtualenvs/rabbitmq/bin/python /opt/stackstorm/st2/lib/python3.8/site-packages/st2reactor/container/sensor_wrapper.py --pack=rabbitmq --file-path=/opt/stackstorm/packs/rabbitmq/sensors/bound_queues_sensor.py --class-name=RabbitMQBoundQueueSensor --trigger-type-refs=rabbitmq.routed_message --parent-args=["--config-file=/etc/st2/st2.conf", "--config-file=/etc/st2/st2.docker.conf", "--config-file=/etc/st2/st2.user.conf", "--single-sensor-mode", "--sensor-ref=rabbitmq.RabbitMQBoundQueueSensor", "--debug"]'}, 'trace_context': None})
2023-01-11 17:46:58 2023-01-12 01:46:58,016 INFO [-] Connected to amqp://guest:**@rabbitmq:5672//
2023-01-11 17:46:58 2023-01-12 01:46:58,016 DEBUG [-] using channel_id: 1
2023-01-11 17:46:58 2023-01-12 01:46:58,017 DEBUG [-] Start from server, version: 0.9, properties: {'capabilities': {'publisher_confirms': True, 'exchange_exchange_bindings': True, 'basic.nack': True, 'consumer_cancel_notify': True, 'connection.blocked': True, 'consumer_priorities': True, 'authentication_failure_close': True, 'per_consumer_qos': True, 'direct_reply_to': True}, 'cluster_name': 'rabbit@a83a92b37497', 'copyright': 'Copyright (c) 2007-2022 VMware, Inc. or its affiliates.', 'information': 'Licensed under the MPL 2.0. Website: https://rabbitmq.com', 'platform': 'Erlang/OTP 25.1.2', 'product': 'RabbitMQ', 'version': '3.10.12'}, mechanisms: [b'AMQPLAIN', b'PLAIN'], locales: ['en_US']
2023-01-11 17:46:58 2023-01-12 01:46:58,018 DEBUG [-] Channel open
2023-01-11 17:46:58 2023-01-12 01:46:58,019 DEBUG [-] using channel_id: 1
2023-01-11 17:46:58 2023-01-12 01:46:58,019 DEBUG [-] Channel open
2023-01-11 17:46:58 2023-01-12 01:46:58,020 DEBUG [-] Closed channel #1
2023-01-11 17:46:58 2023-01-12 01:46:58,020 INFO [-] Sensor rabbitmq.RabbitMQBoundQueueSensor started
2023-01-11 17:46:58 2023-01-12 01:46:58,020 DEBUG [-] 1 active sensor(s)
2023-01-11 17:46:58 2023-01-12 01:46:58,687 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2023-01-11 17:46:58 2023-01-12 01:46:58,694 INFO [-] Found config for sensor "RabbitMQBoundQueueSensor"
2023-01-11 17:46:58 2023-01-12 01:46:58,694 INFO [-] USERDEBUG ReconnectingConsumer.init complete
2023-01-11 17:46:58 2023-01-12 01:46:58,695 INFO [-] USERDEBUG main init complete
2023-01-11 17:46:58 2023-01-12 01:46:58,695 DEBUG [-] Using SSL context for RabbitMQ connection: {}
2023-01-11 17:46:58 2023-01-12 01:46:58,695 INFO [-] Watcher started
2023-01-11 17:46:58 2023-01-12 01:46:58,695 INFO [-] Running sensor initialization code
2023-01-11 17:46:58 2023-01-12 01:46:58,695 INFO [-] USERDEBUG ReconnectingConsumer.connect
2023-01-11 17:46:58 2023-01-12 01:46:58,695 INFO [-] USERDEBUG ReconnectingConsumer._open_connection
2023-01-11 17:46:58 2023-01-12 01:46:58,695 DEBUG [-] Using PollPoller
2023-01-11 17:46:58 2023-01-12 01:46:58,701 DEBUG [-] New Connection state: CLOSED (prev=CLOSED)
2023-01-11 17:46:58 2023-01-12 01:46:58,701 DEBUG [-] Added: {'callback': <bound method Connection._on_connection_start of <SelectConnection CLOSED transport=None params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
2023-01-11 17:46:58 2023-01-12 01:46:58,701 DEBUG [-] Added: {'callback': <bound method Connection._on_connection_close_from_broker of <SelectConnection CLOSED transport=None params=<ConnectionParameters host=rabbitmq port=5672 virtual_host=/ ssl=False>>>, 'one_shot': True, 'only': None, 'arguments': None, 'calls': 1}
2023-01-11 17:46:58 2023-01-12 01:46:58,701 DEBUG [-] Added: {'callback': <bound method ReconnectingConsumer._on_connection_open_error of <bound_queues_sensor.ReconnectingConsumer object at 0x7f02b9710130>>, 'one_shot': False, 'only': None, 'arguments': None}
2023-01-11 17:46:58 2023-01-12 01:46:58,701 DEBUG [-] Added: {'callback': <bound method ReconnectingConsumer._on_connection_open of <bound_queues_sensor.ReconnectingConsumer object at 0x7f02b9710130>>, 'one_shot': False, 'only': None, 'arguments': None}
2023-01-11 17:46:58 2023-01-12 01:46:58,702 DEBUG [-] Added: {'callback': <bound method ReconnectingConsumer._on_connection_close of <bound_queues_sensor.ReconnectingConsumer object at 0x7f02b9710130>>, 'one_shot': False, 'only': None, 'arguments': None}
2023-01-11 17:46:58 2023-01-12 01:46:58,702 DEBUG [-] New Connection state: INIT (prev=CLOSED)
2023-01-11 17:46:58 2023-01-12 01:46:58,702 DEBUG [-] Starting AMQP Connection workflow asynchronously.
2023-01-11 17:46:58 2023-01-12 01:46:58,702 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7f02b96ae5b0> with deadline=44723.170437885 and callback=functools.partial(<bound method AMQPConnectionWorkflow._start_new_cycle_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7f02b96ae610>>, first=True); now=44723.170437885; delay=0
2023-01-11 17:46:58 2023-01-12 01:46:58,702 INFO [-] Running sensor in passive mode
2023-01-11 17:46:58 2023-01-12 01:46:58,702 INFO [-] USERDEBUG main run
2023-01-11 17:46:58 2023-01-12 01:46:58,702 INFO [-] USERDEBUG ReconnectingConsumer.run begin
2023-01-11 17:46:58 2023-01-12 01:46:58,702 DEBUG [-] Entering IOLoop
2023-01-11 17:46:58 2023-01-12 01:46:58,702 DEBUG [-] Beginning a new AMQP connection workflow cycle; attempts remaining after this: 0
2023-01-11 17:46:58 2023-01-12 01:46:58,702 DEBUG [-] call_later: added timeout <pika.adapters.select_connection._Timeout object at 0x7f02b97108e0> with deadline=44723.171033903 and callback=<bound method AMQPConnectionWorkflow._try_next_config_async of <pika.adapters.utils.connection_workflow.AMQPConnectionWorkflow object at 0x7f02b96ae610>>; now=44723.171033903; delay=0
2023-01-11 17:46:58 2023-01-12 01:46:58,702 DEBUG [-] _try_next_config_async: 'rabbitmq':5672
2023-01-11 17:46:58 2023-01-12 01:46:58,705 DEBUG [-] Found existing trigger: TriggerDB(description=None, id=63bf66713d7e33060876d4d4, metadata_file=None, name="routed_message", pack="rabbitmq", parameters={}, ref="rabbitmq.routed_message", ref_count=0, type="rabbitmq.routed_message", uid="trigger:rabbitmq:routed_message:99914b932bd37a50b983c5e7c90ae93b") in db.
2023-01-11 17:46:58 2023-01-12 01:46:58,705 DEBUG [-] Calling sensor "add_trigger" method (trigger.type=rabbitmq.routed_message)
2023-01-11 17:46:59 2023-01-12 01:46:59,022 DEBUG [-] 1 active sensor(s)
2023-01-11 17:47:00 2023-01-12 01:47:00,023 DEBUG [-] 1 active sensor(s)
2023-01-11 17:47:01 2023-01-12 01:47:01,024 DEBUG [-] 1 active sensor(s)
[... repeating]
2023-01-11 17:54:42 2023-01-12 01:54:42,430 DEBUG [-] 1 active sensor(s)
2023-01-11 17:54:43 2023-01-12 01:54:43,431 DEBUG [-] 1 active sensor(s)
2023-01-11 17:54:44 2023-01-12 01:54:44,432 DEBUG [-] 1 active sensor(s) RabbitMQ Pack Config---
sensor_config:
host: "rabbitmq"
username: "guest"
password: "guest"
rabbitmq_queue_sensor:
queues:
- "fakequeue"
deserialization_method: "json"
sensor_binding_config:
host: "rabbitmq"
username: "guest"
password: "guest"
exchanges:
- name: "platform"
exchange_type: "direct"
queues:
- name: "platform_jobs"
bindings:
- routing_key: "platform.orchestration.jobs" Docker Compose config for single SensorI also commented out the stock sensor container, so this is literally the only sensor running and enabled. st2sensorcontainer-rabbitmqbound:
image: ${ST2_IMAGE_REPO:-stackstorm/}st2sensorcontainer:${ST2_VERSION:-latest}
restart: on-failure
depends_on:
- st2api
networks:
- private
command:
- "/opt/stackstorm/st2/bin/st2sensorcontainer"
- "--config-file=/etc/st2/st2.conf"
- "--config-file=/etc/st2/st2.docker.conf"
- "--config-file=/etc/st2/st2.user.conf"
- "--single-sensor-mode"
- "--sensor-ref=rabbitmq.RabbitMQBoundQueueSensor"
- "--debug"
volumes:
- ./files/st2.docker.conf:/etc/st2/st2.docker.conf:ro
- ./files/st2.user.conf:/etc/st2/st2.user.conf:ro
- stackstorm-virtualenvs:/opt/stackstorm/virtualenvs:ro
- stackstorm-packs:/opt/stackstorm/packs:ro
- stackstorm-packs-configs:/opt/stackstorm/configs:ro
- ${ST2_PACKS_DEV:-./packs.dev}:/opt/stackstorm/packs.dev:ro |
To follow up, per the linked discussion the issue is with pika and threading/concurrency and not specifically with your sensor's code. I used your sensor as a test bed and it worked amazingly outside of exactly the case of " |
This PR adds a new sensor
RabbitMQBoundQueueSensor
.The purpose for this is to support multiple exchanges, exchange types, and queue routing features. The existing
RabbitMQQueueSensor
lacks the ability to declare these options. Additionally, the existing sensor does not reconnect when connection to RabbitMQ is lost.Implementing a separate sensor allows this to not cause breaking changes.
The new config schema is more complex, I tried to strike a balance between complexity and ease of use inside the sensor. The updated example config shows off many of the new features.