-
Notifications
You must be signed in to change notification settings - Fork 134
/
Copy pathhintedhandoff_test.py
392 lines (313 loc) · 15.3 KB
/
hintedhandoff_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
import os
import time
import pytest
import logging
import re
from cassandra import ConsistencyLevel
from dtest import Tester, create_ks
from tools.data import create_c1c2_table, insert_c1c2, query_c1c2
from tools.assertions import assert_stderr_clean
from tools.jmxutils import (JolokiaAgent, make_mbean)
# Module logger, keyed on this module's dotted name.
logger = logging.getLogger(name=__name__)

# Short aliases for the custom pytest markers applied to the test classes below.
since, ported_to_in_jvm = pytest.mark.since, pytest.mark.ported_to_in_jvm
@since('3.0')
class TestHintedHandoffConfig(Tester):
    """
    Tests the hinted handoff configuration options introduced in
    CASSANDRA-9035.

    @jira_ticket CASSANDRA-9035
    """

    def _start_two_node_cluster(self, config_options=None):
        """
        Start a two-node cluster and return its nodes.

        @param config_options optional dict of configuration options applied
               (when non-empty) before the cluster is populated and started
        @return the cluster's node list, i.e. [node1, node2]
        """
        cluster = self.cluster
        if config_options:
            cluster.set_configuration_options(values=config_options)
        cluster.populate([2]).start()
        return cluster.nodelist()

    def _launch_nodetool_cmd(self, node, cmd):
        """
        Run a nodetool command on node, check its stderr with
        assert_stderr_clean, and return its stdout.
        """
        out, err, _ = node.nodetool(cmd)
        assert_stderr_clean(err)
        return out

    def _do_hinted_handoff(self, node1, node2, enabled, keyspace='ks'):
        """
        Stop node2, write 100 rows through node1 at CL.ONE, restart node2 and
        check whether those rows made it to node2.

        When enabled is True, wait (up to 120s) for node1 to log
        "Finished hinted" and then expect every row to be readable from node2.
        When enabled is False, expect every row to be missing on node2.
        node1 is left stopped when this returns.

        @param enabled whether hint delivery to node2 is expected
        @param keyspace name of the keyspace created for the test data
        """
        session = self.patient_exclusive_cql_connection(node1)
        create_ks(session, keyspace, 2)
        create_c1c2_table(self, session)
        node2.stop(wait_other_notice=True)
        insert_c1c2(session, n=100, consistency=ConsistencyLevel.ONE)
        # Mark the log before node2 returns so we only match deliveries from this run.
        log_mark = node1.mark_log()
        node2.start()
        if enabled:
            node1.watch_log_for(["Finished hinted"], from_mark=log_mark, timeout=120)
        # Stop node1 so the reads below can only be served by node2.
        node1.stop(wait_other_notice=True)

        # Check node2 for all the keys that should have been delivered via HH if enabled or not if not enabled
        session = self.patient_exclusive_cql_connection(node2, keyspace=keyspace)
        for n in range(0, 100):
            if enabled:
                query_c1c2(session, n, ConsistencyLevel.ONE)
            else:
                query_c1c2(session, n, ConsistencyLevel.ONE, tolerate_missing=True, must_be_missing=True)

    @ported_to_in_jvm('4.0')
    def test_nodetool(self):
        """
        Test various nodetool commands.

        On each node, toggle handoff with disablehandoff/enablehandoff and
        toggle dc1 with disablehintsfordc/enablehintsfordc. After each toggle,
        check the status reported by statushandoff.
        """
        node1, node2 = self._start_two_node_cluster({'hinted_handoff_enabled': True})

        for node in node1, node2:
            res = self._launch_nodetool_cmd(node, 'statushandoff')
            assert 'Hinted handoff is running' == res.rstrip()

            self._launch_nodetool_cmd(node, 'disablehandoff')
            res = self._launch_nodetool_cmd(node, 'statushandoff')
            assert 'Hinted handoff is not running' == res.rstrip()

            self._launch_nodetool_cmd(node, 'enablehandoff')
            res = self._launch_nodetool_cmd(node, 'statushandoff')
            assert 'Hinted handoff is running' == res.rstrip()

            self._launch_nodetool_cmd(node, 'disablehintsfordc dc1')
            res = self._launch_nodetool_cmd(node, 'statushandoff')
            assert 'Hinted handoff is running{}Data center dc1 is disabled'.format(os.linesep) == res.rstrip()

            self._launch_nodetool_cmd(node, 'enablehintsfordc dc1')
            res = self._launch_nodetool_cmd(node, 'statushandoff')
            assert 'Hinted handoff is running' == res.rstrip()

    def test_hintedhandoff_disabled(self):
        """
        Test global hinted handoff disabled
        """
        node1, node2 = self._start_two_node_cluster({'hinted_handoff_enabled': False})

        for node in node1, node2:
            res = self._launch_nodetool_cmd(node, 'statushandoff')
            assert 'Hinted handoff is not running' == res.rstrip()

        self._do_hinted_handoff(node1, node2, False)

    def test_hintedhandoff_enabled(self):
        """
        Test global hinted handoff enabled
        """
        node1, node2 = self._start_two_node_cluster({'hinted_handoff_enabled': True})

        for node in node1, node2:
            res = self._launch_nodetool_cmd(node, 'statushandoff')
            assert 'Hinted handoff is running' == res.rstrip()

        self._do_hinted_handoff(node1, node2, True)

    @since('4.0')
    def test_hintedhandoff_setmaxwindow(self):
        """
        Test global hinted handoff against max_hint_window_in_ms update via nodetool.

        With a 300000 ms window, hints are delivered. After setmaxhintwindow
        drops the window to 1 ms on both nodes, hints are no longer delivered.
        """
        node1, node2 = self._start_two_node_cluster({'hinted_handoff_enabled': True, "max_hint_window_in_ms": 300000})

        for node in node1, node2:
            res = self._launch_nodetool_cmd(node, 'statushandoff')
            assert 'Hinted handoff is running' == res.rstrip()
            res = self._launch_nodetool_cmd(node, 'getmaxhintwindow')
            assert 'Current max hint window: 300000 ms' == res.rstrip()

        self._do_hinted_handoff(node1, node2, True)
        # _do_hinted_handoff leaves node1 stopped; bring it back for the second round.
        node1.start()
        for node in node1, node2:
            # Make sure HH is effective on both nodes despite node startup races CASSANDRA-15865
            self._launch_nodetool_cmd(node, 'setmaxhintwindow 1')
            res = self._launch_nodetool_cmd(node, 'getmaxhintwindow')
            assert 'Current max hint window: 1 ms' == res.rstrip()
        # Use a fresh keyspace so rows from the first round do not affect the check.
        self._do_hinted_handoff(node1, node2, False, keyspace='ks2')

    def test_hintedhandoff_dc_disabled(self):
        """
        Test global hinted handoff enabled with the dc disabled
        """
        node1, node2 = self._start_two_node_cluster({'hinted_handoff_enabled': True,
                                                     'hinted_handoff_disabled_datacenters': ['dc1']})

        for node in node1, node2:
            res = self._launch_nodetool_cmd(node, 'statushandoff')
            assert 'Hinted handoff is running{}Data center dc1 is disabled'.format(os.linesep) == res.rstrip()

        self._do_hinted_handoff(node1, node2, False)

    def test_hintedhandoff_dc_reenabled(self):
        """
        Test global hinted handoff enabled with the dc disabled first and then re-enabled
        """
        node1, node2 = self._start_two_node_cluster({'hinted_handoff_enabled': True,
                                                     'hinted_handoff_disabled_datacenters': ['dc1']})

        for node in node1, node2:
            res = self._launch_nodetool_cmd(node, 'statushandoff')
            assert 'Hinted handoff is running{}Data center dc1 is disabled'.format(os.linesep) == res.rstrip()

        for node in node1, node2:
            self._launch_nodetool_cmd(node, 'enablehintsfordc dc1')
            res = self._launch_nodetool_cmd(node, 'statushandoff')
            assert 'Hinted handoff is running' == res.rstrip()

        self._do_hinted_handoff(node1, node2, True)
class TestHintedHandoff(Tester):

    @ported_to_in_jvm('4.0')
    @pytest.mark.no_vnodes
    def test_hintedhandoff_decom(self):
        """
        Check that rows written while node4 was down survive decommissions.

        Stop node4 and write 100 rows at CL.ONE. Then decommission node1,
        restart node4, and decommission node2 and node3, leaving node4 as
        the only node. All 100 rows must still be readable afterwards.
        """
        self.fixture_dtest_setup.ignore_log_patterns = [
            'Could not update repaired ranges.*Giving up'
        ]
        self.cluster.populate(4).start()
        [node1, node2, node3, node4] = self.cluster.nodelist()

        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 2)
        create_c1c2_table(self, session)

        node4.stop(wait_other_notice=True)
        insert_c1c2(session, n=100, consistency=ConsistencyLevel.ONE)
        node1.decommission()
        node4.start(wait_for_binary_proto=True)

        # NOTE(review): force is only passed on 3.12+, presumably because newer
        # versions refuse to decommission below the replication factor without it.
        force = self.cluster.version() >= '3.12'
        node2.decommission(force=force)
        node3.decommission(force=force)

        # NOTE(review): fixed settle delay before reading; reason not visible here.
        time.sleep(5)
        for x in range(0, 100):
            query_c1c2(session, x, ConsistencyLevel.ONE)

    def test_hintedhandoff_disagreement(self):
        """
        Test that hints will be transferred during a schema disagreement

        @jira_ticket CASSANDRA-20188
        """
        def hint_size(node):
            """Return the total size in bytes of the files under node's hints directory."""
            total_size = 0
            datadir = node.data_directories()[0]
            # The hints directory is a sibling of the first data directory.
            hints = os.path.join(datadir, '..', 'hints')
            assert os.path.isdir(hints)
            for dirpath, _, filenames in os.walk(hints):
                for f in filenames:
                    fp = os.path.join(dirpath, f)
                    if os.path.isfile(fp):
                        total_size += os.path.getsize(fp)
            return total_size

        def wait_no_hints(node, secs=30):
            """Poll about once per second until node has no hint bytes; raise after secs attempts."""
            attempts = 0
            while hint_size(node) > 0:
                attempts += 1
                if attempts >= secs:
                    raise Exception("hints directory not empty in {} seconds".format(secs))
                time.sleep(1)

        # we will force a flush later, but need defense in depth against circle weirdness
        self.cluster.set_configuration_options({'hints_flush_period_in_ms': 100})
        self.cluster.populate(2).start()
        [node1, node2] = self.cluster.nodelist()

        session = self.patient_exclusive_cql_connection(node1)
        create_ks(session, 'ks', 2)
        create_c1c2_table(self, session)

        node2.stop(wait_other_notice=True)
        # sometimes the driver's control connection dies here, this one is sacrificial
        session2 = self.patient_exclusive_cql_connection(node1)
        session2.execute("USE ks")
        try:
            # Change the schema while node2 is down so the nodes disagree once it returns.
            session2.execute("ALTER TABLE ks.cf with comment = 'disagree now'")
        except Exception as e:
            logger.info("Received %s while trying to alter table, continuing", e)

        insert_c1c2(session, n=1000, consistency=ConsistencyLevel.ONE)
        node1.flush()
        assert hint_size(node1) > 0

        # make the disagreement unsolvable
        node2.start(jvm_args=["-Dcassandra.IGNORED_SCHEMA_CHECK_ENDPOINTS={}".format("127.0.0.1")])
        response = node2.nodetool('describecluster').stdout
        schemas = response.split('Schema versions:')[1].strip()
        num_schemas = len(re.findall(r'\[.*?\]', schemas))
        assert num_schemas > 1

        # Despite the disagreement, node1 must drain all of its hints.
        wait_no_hints(node1)

    @since('4.1')
    def test_hintedhandoff_window(self):
        """
        Test that we only store at a maximum the hint window worth of hints.
        Prior to CASSANDRA-14309 we would store another window worth of hints
        if the down node was brought up and then taken back down immediately.
        We would also store another window of hints on a live node if the live
        node was restarted.
        @jira_ticket CASSANDRA-14309
        """
        def wait_for_downtime(node_to_query, node, downtime):
            """
            Block until node_to_query's Gossiper reports a downtime for endpoint
            node that meets a threshold. The downtime is read over JMX via
            getEndpointDowntime.

            If downtime == 0, wait until the reported downtime is exactly 0.
            Otherwise, wait until it is strictly greater than downtime
            (presumably milliseconds, matching max_hint_window_in_ms).

            Any exception while querying is logged and retried after a pause.
            """
            def endpoint_downtime(node_to_query, node):
                mbean = make_mbean('net', type='Gossiper')
                with JolokiaAgent(node_to_query) as jmx:
                    return jmx.execute_method(mbean, 'getEndpointDowntime(java.lang.String)', [node])

            run = True
            while run:
                try:
                    if downtime == 0:
                        while endpoint_downtime(node_to_query, node) != downtime:
                            time.sleep(1)
                    else:
                        while endpoint_downtime(node_to_query, node) <= downtime:
                            time.sleep(1)
                    run = False
                except Exception as e:
                    # Back off instead of retrying in a tight loop (e.g. while JMX is unreachable).
                    logger.debug("Could not read endpoint downtime (%s), retrying", e)
                    time.sleep(1)

        # hint_window_persistent_enabled is set to true by default
        self.cluster.set_configuration_options({'max_hint_window_in_ms': 10000,
                                                'hinted_handoff_enabled': True,
                                                'max_hints_delivery_threads': 1,
                                                'hints_flush_period_in_ms': 100, })
        self.cluster.populate(2).start()
        node1, node2 = self.cluster.nodelist()
        session = self.patient_cql_connection(node1)
        create_ks(session, 'ks', 2)
        create_c1c2_table(self, session)

        # Stop handoff until very end and take node2 down for first round of hints
        node1.nodetool('pausehandoff')

        node2.nodetool('disablebinary')
        node2.watch_log_for(["Stop listening for CQL clients"], timeout=120)

        node2.nodetool('disablegossip')
        node2.watch_log_for(["Announcing shutdown", "state jump to shutdown"], timeout=120)
        node1.watch_log_for(["state jump to shutdown"], timeout=120)

        log_mark_node_1 = node1.mark_log()
        log_mark_node_2 = node2.mark_log()

        wait_for_downtime(node1, "127.0.0.2", 1000)

        # First round of hints. We expect these to be replayed and the only
        # hints within the window
        insert_c1c2(session, n=(0, 100), consistency=ConsistencyLevel.ONE)
        node1.nodetool('flush')

        # let the hint window pass
        wait_for_downtime(node1, "127.0.0.2", 10000)

        # Re-enable and disable the node. Prior to CASSANDRA-14215 this should make the hint window on node1 reset.
        # NOTE(review): the docstring cites CASSANDRA-14309; confirm which ticket is meant.
        node2.nodetool('enablegossip')
        node2.watch_log_for(["state jump to NORMAL"], timeout=120, from_mark=log_mark_node_2)
        node1.watch_log_for(["state jump to NORMAL"], timeout=120, from_mark=log_mark_node_1)

        # no downtime for node for which we enabled gossip
        wait_for_downtime(node1, "127.0.0.2", 0)

        log_mark_node_1 = node1.mark_log()
        log_mark_node_2 = node2.mark_log()

        # disable gossip again
        node2.nodetool('disablegossip')
        wait_for_downtime(node1, "127.0.0.2", 1000)
        node2.watch_log_for(["Announcing shutdown", "state jump to shutdown"], timeout=120, from_mark=log_mark_node_2)
        node1.watch_log_for(["state jump to shutdown"], timeout=120, from_mark=log_mark_node_1)

        # Only node2's mark is consulted from here on.
        log_mark_node_2 = node2.mark_log()

        wait_for_downtime(node1, "127.0.0.2", 5000)

        # Second round of inserts. We do not expect hints to be stored.
        insert_c1c2(session, n=(100, 200), consistency=ConsistencyLevel.ONE)

        # Restart node1. Prior to CASSANDRA-14215 this would reset node1's hint window.
        node1.stop()
        node1.start(wait_for_binary_proto=True, wait_other_notice=False)
        session = self.patient_exclusive_cql_connection(node1)
        session.execute('USE ks')

        # Third round of inserts. We do not expect hints to be stored.
        insert_c1c2(session, n=(200, 300), consistency=ConsistencyLevel.ONE)

        # Enable node2 and wait for hints to be replayed
        node2.nodetool('enablegossip')
        node2.watch_log_for(["state jump to NORMAL"], timeout=120, from_mark=log_mark_node_2)
        node2.nodetool('enablebinary')
        node2.watch_log_for(["Starting listening for CQL clients"], timeout=120, from_mark=log_mark_node_2)
        node1.nodetool('resumehandoff')
        node1.watch_log_for('Finished hinted handoff')

        # Stop node1 so that we only query node2
        node1.stop()
        session = self.patient_exclusive_cql_connection(node2)
        session.execute('USE ks')

        # Ensure first dataset is present (through replayed hints)
        for x in range(0, 100):
            query_c1c2(session, x, ConsistencyLevel.ONE)

        exception = None
        # Ensure second and third datasets are not present
        for x in range(100, 300):
            try:
                query_c1c2(session, x, ConsistencyLevel.ONE, tolerate_missing=True, must_be_missing=True)
            except AssertionError as ex:
                # Log every offending key before failing, then re-raise the last error.
                logger.info("failed for %d", x)
                exception = ex
        if exception is not None:
            raise exception