forked from AtherEnergy/rumqtt
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathTODO.txt
199 lines (175 loc) · 7.54 KB
/
TODO.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
*
#[cfg(test)]
mod test {
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use mqtt::packet::*;
use mqtt::QualityOfService as QoS;
use mqtt::topic_name::TopicName;
use time;
use clientoptions::MqttOptions;
use super::MqttClient;
use message::Message;
const BROKER_ADDRESS: &'static str = "localhost:1883";
/// Seeds `client.outgoing_pub` with 100 QoS 1 messages so that a test can start
/// the client with publishes already queued.
///
/// Packet identifiers run from 1001 through 1100. Every message uses topic
/// `a/b/c`, a fixed dummy payload, and is paired with the current time in
/// seconds at the moment it is queued.
fn fill_qos1_publish_buffer(client: &mut MqttClient) {
    for pkid in 1001..1101 {
        let topic_name = TopicName::new(String::from("a/b/c")).unwrap();
        let body = Arc::new(String::from("dummy data").into_bytes());
        let queued = Box::new(Message {
            topic: topic_name,
            qos: QoSWithPacketIdentifier::Level1(pkid),
            retain: false,
            payload: body,
            userdata: None,
        });
        // Timestamp is taken per message, right before it is queued.
        let queued_at = time::get_time().sec;
        client.outgoing_pub.push_back((queued_at, queued));
    }
}
/// Seeds `client.outgoing_rec` with 100 QoS 2 messages so that a test can start
/// the client with publishes already queued.
///
/// Packet identifiers run from 2001 through 2100. Every message uses topic
/// `a/b/c`, a fixed dummy payload, and is paired with the current time in
/// seconds at the moment it is queued.
fn fill_qos2_publish_buffer(client: &mut MqttClient) {
    for pkid in 2001..2101 {
        let topic_name = TopicName::new(String::from("a/b/c")).unwrap();
        let body = Arc::new(String::from("dummy data").into_bytes());
        let queued = Box::new(Message {
            topic: topic_name,
            qos: QoSWithPacketIdentifier::Level2(pkid),
            retain: false,
            payload: body,
            userdata: None,
        });
        // Timestamp is taken per message, right before it is queued.
        let queued_at = time::get_time().sec;
        client.outgoing_rec.push_back((queued_at, queued));
    }
}
/// Publishes that were already queued before the client started must all have
/// left the QoS 1 and QoS 2 queues 20 seconds after `start()`.
///
/// The queue timeout is set to 5 (presumably seconds, like the sleep below —
/// confirm against `MqttOptions::set_q_timeout`). Needs a broker reachable at
/// `BROKER_ADDRESS`.
#[test]
fn retransmission_after_timeout() {
    let options = MqttOptions::new()
        .set_keep_alive(5)
        .set_q_timeout(5)
        .set_client_id("test-retransmission-client")
        .broker(BROKER_ADDRESS);

    // Pre-load both queues before the client is started.
    let mut client = MqttClient::new(options);
    fill_qos1_publish_buffer(&mut client);
    fill_qos2_publish_buffer(&mut client);

    let request = client.start().expect("Coudn't start");

    // Give the client time to drain both queues.
    thread::sleep(Duration::from_secs(20));

    let qos1_left = request.qos1_q_len().expect("Stats Request Error");
    let qos2_left = request.qos2_q_len().expect("Stats Request Error");
    assert_eq!(0, qos1_left);
    assert_eq!(0, qos2_left);
}
/// Publish queues should be retransmitted immediately after a reconnection,
/// without waiting for the queue timeout to expire.
///
/// The test pre-loads both queues, starts the client, forces a disconnect one
/// second later and then checks the QoS 1 queue after a further 10 seconds.
/// Needs a broker reachable at `BROKER_ADDRESS`.
///
/// NOTE(review): the original intent is "`publish_q_timeout` > thread sleep
/// time", but this test never calls `set_q_timeout`, so it relies on the
/// `MqttOptions` default being larger than the 10 second wait — confirm.
#[test]
fn force_retransmission_after_reconnect() {
    let client_options = MqttOptions::new()
        .set_keep_alive(5)
        .set_client_id("test-forceretransmission-client")
        .broker(BROKER_ADDRESS);

    let mut mq_client = MqttClient::new(client_options);
    fill_qos1_publish_buffer(&mut mq_client);
    fill_qos2_publish_buffer(&mut mq_client);

    let request = mq_client.start().expect("Coudn't start");
    thread::sleep(Duration::from_secs(1));

    // Best effort: the result of the disconnect request is deliberately ignored.
    let _ = request.disconnect();
    thread::sleep(Duration::from_secs(10));

    let final_qos1_length = request.qos1_q_len().expect("Stats Request Error");
    // The QoS 2 stats request is still issued (and must succeed), but its value
    // is not asserted; the underscore prefix avoids an unused-variable warning.
    // TODO: assert that the QoS 2 queue is empty as well once that check is
    // known to hold after a reconnect.
    let _final_qos2_length = request.qos2_q_len().expect("Stats Request Error");

    assert_eq!(0, final_qos1_length);
}
/// Publishes 100 QoS 1 and 100 QoS 2 messages on top of pre-loaded queues and
/// expects both queues to be empty 20 seconds later.
///
/// Judging by the test name, the publish calls are expected to block while the
/// queues are full and resume once the queued messages are retransmitted —
/// confirm against the client implementation. Needs a broker reachable at
/// `BROKER_ADDRESS`.
#[test]
fn channel_block_and_unblock_after_retransmit_timeout() {
    let options = MqttOptions::new()
        .set_keep_alive(5)
        .set_q_timeout(5)
        .set_client_id("test-blockunblock-retransmission-client")
        .broker(BROKER_ADDRESS);

    let mut client = MqttClient::new(options);
    fill_qos1_publish_buffer(&mut client);
    fill_qos2_publish_buffer(&mut client);

    // Start the client; `request` is the handle used below to publish and to
    // query the queue lengths.
    let request = client.start().expect("Coudn't start");

    for n in 0..100 {
        let bytes = format!("{}. hello rust", n).into_bytes();
        request.publish("test/qos1/blockretransmit", QoS::Level1, bytes).unwrap();
    }

    for n in 0..100 {
        let bytes = format!("{}. hello rust", n).into_bytes();
        request.publish("test/qos2/blockretransmit", QoS::Level2, bytes).unwrap();
    }

    thread::sleep(Duration::from_secs(20));

    let qos1_left = request.qos1_q_len().expect("Stats Request Error");
    let qos2_left = request.qos2_q_len().expect("Stats Request Error");
    println!("qos1_length = {}, qos2_length = {}", qos1_left, qos2_left);
    assert_eq!(0, qos1_left);
    assert_eq!(0, qos2_left);
}
/// Same as the retransmit-timeout variant, but a disconnect is requested just
/// before the 11th QoS 1 publish. Both queues must still be empty 20 seconds
/// after all publishes were issued.
///
/// Needs a broker reachable at `BROKER_ADDRESS`.
#[test]
fn channel_block_and_unblock_after_reconnection() {
    let options = MqttOptions::new()
        .set_keep_alive(5)
        .set_q_timeout(5)
        .set_client_id("test-blockunblock-reconnect-client")
        .broker(BROKER_ADDRESS);

    let mut client = MqttClient::new(options);
    fill_qos1_publish_buffer(&mut client);
    fill_qos2_publish_buffer(&mut client);

    let request = client.start().expect("Coudn't start");

    for n in 0..100 {
        let bytes = format!("{}. hello rust", n).into_bytes();
        // Drop the connection part-way through the QoS 1 burst; the result of
        // the disconnect request is deliberately ignored.
        if n == 10 {
            let _ = request.disconnect();
        }
        request.publish("test/qos1/blockretransmit", QoS::Level1, bytes).unwrap();
    }

    for n in 0..100 {
        let bytes = format!("{}. hello rust", n).into_bytes();
        request.publish("test/qos2/blockretransmit", QoS::Level2, bytes).unwrap();
    }

    thread::sleep(Duration::from_secs(20));

    let qos1_left = request.qos1_q_len().expect("Stats Request Error");
    let qos2_left = request.qos2_q_len().expect("Stats Request Error");
    assert_eq!(0, qos1_left);
    assert_eq!(0, qos2_left);
}
/// The QoS 1 and QoS 2 queue lengths must never exceed the `pub_q_len` value
/// configured in `MqttOptions`.
///
/// The limit is read from the options before they are handed to the client
/// (this test does not override it). After each of 1000 publishes per QoS
/// level, the matching queue length is queried and compared against the limit.
/// Needs a broker reachable at `BROKER_ADDRESS`.
#[test]
fn queue_length_threshold() {
    let options = MqttOptions::new()
        .set_keep_alive(5)
        .set_q_timeout(5)
        .set_client_id("test-qlen-threshold-client")
        .broker(BROKER_ADDRESS);

    let q_len = options.pub_q_len as usize;
    let request = MqttClient::new(options).start().expect("Coudn't start");

    for n in 0..1000 {
        let bytes = format!("{}. hello rust", n).into_bytes();
        request.publish("test/qos1/qlenthreshold", QoS::Level1, bytes).unwrap();
        let depth = request.qos1_q_len().expect("Stats Request Error");
        assert!(depth <= q_len);
    }

    for n in 0..1000 {
        let bytes = format!("{}. hello rust", n).into_bytes();
        request.publish("test/qos2/qlenthreshold", QoS::Level2, bytes).unwrap();
        let depth = request.qos2_q_len().expect("Stats Request Error");
        assert!(depth <= q_len);
    }
}
}
// Why RuMqtt:
// GOALS
// -----
// 1. Synchronous MQTT connects: no callback is needed to check whether the
//    connection succeeded. You'll know of errors (if any) synchronously.
// 2. Synchronous subscribes (TODO): Same as above
// 3. Queued publishes: publishes won't throw errors by default. Publishes are
//    buffered in a queue (with user-defined length) while the network is down.
//    If the network stays down long enough for the queue to become full,
//    further publishes are blocked.
// 4. No locks. Fast and efficient because of Rust and Mio
// 5. Callbacks only for subscribed incoming messages. Callbacks are executed
//    using a thread pool.
//