forked from mgckind/desaccess
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmonitor.py
31 lines (25 loc) · 990 Bytes
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from celery import Celery
def my_monitor(app):
state = app.events.State()
def on_event(event):
print("Event for {0} : {1}".format(event['uuid'],event))
def announce_failed_tasks(event):
state.event(event)
# task name is sent only with -received event, and state
# will keep track of this for us.
task = state.tasks.get(event['uuid'])
print('TASK FAILED: %s[%s] %s' % (
task.name, task.uuid, task.info(),))
with app.connection() as connection:
recv = app.events.Receiver(connection, handlers={
'task-failed': announce_failed_tasks,
'task-succeeded' : on_event,
'task-received' : on_event,
'task-revoked' : on_event,
'*': state.event,
})
recv.capture(limit=None, timeout=None, wakeup=True)
if __name__ == '__main__':
app = Celery()
app.config_from_object('config.celeryconfig')
my_monitor(app)