-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdemo.py
executable file
·79 lines (62 loc) · 1.87 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#!/usr/bin/env python2.5
import sys
import time
from random import random
import asyncore
import pika
N = 15
Q = "forw_test_1"
def usage():
print 'Usage: %s --step1|--step2' % sys.argv[0]
if len(sys.argv) != 2:
usage()
sys.exit(1)
creds = pika.PlainCredentials('guest', 'guest')
jerry_conn = pika.AsyncoreConnection(pika.ConnectionParameters(
'localhost', port=65001, credentials=creds))
tom_conn = pika.AsyncoreConnection(pika.ConnectionParameters(
'localhost', port=65002, credentials=creds))
# will publish to jerry, expect to consume from tom
publisher_ch = jerry_conn.channel()
consumer_ch = tom_conn.channel()
# create queues on both brokers first
if sys.argv[-1] == '--step1':
for ch in (publisher_ch, consumer_ch):
print ch.queue_declare(queue=Q, durable=True,
exclusive=False, auto_delete=False)
print 'ok. step 1 done. run remsh code now. then re-run this with --step2'
sys.exit(0)
if sys.argv[-1] != '--step2':
usage()
sys.exit(1)
sent = { }
def handle_delivery(ch, meth, header, body):
global sent
print time.asctime(), 'received %s' % body
print sent
try: del(sent[body])
except KeyError: pass
ch.basic_ack(delivery_tag=meth.delivery_tag)
for i in range(N):
body = str(random())
publisher_ch.basic_publish(exchange='', routing_key=Q, body=body)
sent[body] = 1
attempts = N
while attempts > 0:
attempts -= 1
print attempts
asyncore.poll2(timeout=1.0)
if len(sent) == N and len(consumer_ch.callbacks) == 0:
print 'Starting consumer'
consumer_ch.basic_consume(handle_delivery, queue=Q)
elif len(sent) == 0:
print 'All consumed!'
break
if attempts == 0:
print 'failed. did you run step1? did you run remsh stuff?'
else:
print 'ok'
# exiting
jerry_conn.close()
tom_conn.close()
asyncore.loop()