forked from compose/transporter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathadaptor_test.go
94 lines (83 loc) · 2.12 KB
/
adaptor_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
package rabbitmq
import (
"fmt"
"os"
"testing"
"time"
"github.com/compose/transporter/log"
"github.com/streadway/amqp"
)
const (
testExchange = "transporter-tests"
)
var (
defaultTestClient = &Client{
uri: DefaultURI,
}
defaultSession *Session
queuesToTest = []TestData{
readerTestData, readerBadDataTest,
writerTestData, writerTestData2, writerTestData3,
}
)
type TestData struct {
Queue string
RoutingKey string
InsertCount int
}
func setup() {
log.Infoln("setting up tests")
s, err := defaultTestClient.Connect()
if err != nil {
log.Errorf("unable to initialize connection to rabbitmq, %s", err)
os.Exit(1)
}
defaultSession = s.(*Session)
if err := defaultSession.channel.ExchangeDeclare(
testExchange,
"direct",
true,
false,
false,
false,
nil); err != nil {
log.Errorf("unable to declare exhange, %s", err)
os.Exit(1)
}
for _, testData := range queuesToTest {
setupData(testData, defaultSession.channel)
}
}
func setupData(data TestData, ch *amqp.Channel) {
if _, err := ch.QueueDeclare(data.Queue, false, false, false, false, nil); err != nil {
log.Errorf("failed to declare queue (%s), may affect tests!, %s", data.Queue, err)
}
if _, err := ch.QueuePurge(data.Queue, true); err != nil {
log.Errorf("failed to purge queue (%s), may affect tests!, %s", data.Queue, err)
}
if err := ch.QueueBind(data.Queue, data.RoutingKey, testExchange, false, nil); err != nil {
log.Errorf("failed to bind queue (%s), may affect tests!, %s", data.Queue, err)
}
for i := 0; i < data.InsertCount; i++ {
msg := amqp.Publishing{
DeliveryMode: amqp.Transient,
Timestamp: time.Now(),
ContentType: "application/json",
Body: []byte(fmt.Sprintf(`{"id": %d, "message": "hello"}`, i)),
}
if err := ch.Publish(testExchange, data.RoutingKey, false, false, msg); err != nil {
log.Errorf("failed to publish to queue (%s), may affect tests!, %s", data.Queue, err)
}
}
}
func TestMain(m *testing.M) {
setup()
code := m.Run()
shutdown()
os.Exit(code)
}
func shutdown() {
log.Infoln("shutting down tests")
defaultTestClient.Close()
log.Infoln("tests shutdown complete")
}