forked from thepaul/cassandra-dtest
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathglobal_row_key_cache_test.py
145 lines (112 loc) · 6.32 KB
/
global_row_key_cache_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
import time
from cassandra.concurrent import execute_concurrent_with_args
from dtest import Tester, debug, create_ks
class TestGlobalRowKeyCache(Tester):
    """Check that reads stay correct with the global key and row caches.

    Every combination of a 0 MB or 10 MB key cache and a 0 MB or 10 MB row
    cache is exercised against regular and counter tables, each with and
    without a clustering column.
    """

    # number of rows initially written to every table
    _NUM_ROWS = 100

    def functional_test(self):
        """Write, update and re-read data under each cache size combination.

        For each combination the cluster is stopped, reconfigured and
        started, a fresh keyspace is populated and flushed, several update
        rounds are applied and validated, and the values are validated once
        more after the cluster has been restarted.
        """
        cluster = self.cluster
        cluster.populate(3)
        node1 = cluster.nodelist()[0]

        for keycache_size in (0, 10):
            for rowcache_size in (0, 10):
                cluster.stop()
                debug("Testing with keycache size of %d MB, rowcache size of %d MB " %
                      (keycache_size, rowcache_size))
                keyspace_name = 'ks_%d_%d' % (keycache_size, rowcache_size)

                # make the caches save every five seconds
                cluster.set_configuration_options(values={
                    'key_cache_size_in_mb': keycache_size,
                    'row_cache_size_in_mb': rowcache_size,
                    'row_cache_save_period': 5,
                    'key_cache_save_period': 5,
                })

                cluster.start()
                session = self.patient_cql_connection(node1)
                create_ks(session, keyspace_name, rf=3)
                session.set_keyspace(keyspace_name)

                self._create_tables(session)
                self._insert_initial_rows(session)

                # flush everything to get it into sstables
                for node in cluster.nodelist():
                    node.flush()

                # update the first 10 rows in every table
                # on non-counter tables, delete the first (remaining) row each round
                num_updates = 10
                num_rounds = 3
                for validation_round in range(num_rounds):
                    self._apply_update_round(session, num_updates, validation_round)
                    self._validate_values(session, num_updates, validation_round)

                session.shutdown()

                # let the data be written to the row/key caches.
                debug("Letting caches be saved to disk")
                time.sleep(10)
                debug("Stopping cluster")
                cluster.stop()
                time.sleep(1)
                debug("Starting cluster")
                cluster.start()
                time.sleep(5)  # read the data back from row and key caches

                session = self.patient_cql_connection(node1)
                session.set_keyspace(keyspace_name)

                # check all values again; the expected state is the one left
                # behind by the last update round
                self._validate_values(session, num_updates,
                                      validation_round=num_rounds - 1)

                # the next iteration stops the cluster, so release this
                # session the same way the pre-restart one was released
                session.shutdown()

    def _create_tables(self, session):
        """Create the four tables used by the test in the current keyspace."""
        session.execute("CREATE TABLE test (k int PRIMARY KEY, v1 int, v2 int)")
        session.execute("CREATE TABLE test_clustering (k int, v1 int, v2 int, PRIMARY KEY (k, v1))")
        session.execute("CREATE TABLE test_counter (k int PRIMARY KEY, v1 counter)")
        session.execute("CREATE TABLE test_counter_clustering (k int, v1 int, v2 counter, PRIMARY KEY (k, v1))")

    def _insert_initial_rows(self, session):
        """Write the initial rows, where row ``i`` holds ``i`` in every column."""
        keys = range(self._NUM_ROWS)

        for cf in ('test', 'test_clustering'):
            execute_concurrent_with_args(
                session, session.prepare("INSERT INTO %s (k, v1, v2) VALUES (?, ?, ?)" % (cf,)),
                [(i, i, i) for i in keys])

        # counters cannot be inserted, so counter ``i`` is incremented by ``i``
        execute_concurrent_with_args(
            session, session.prepare("UPDATE test_counter SET v1 = v1 + ? WHERE k = ?"),
            [(i, i) for i in keys],
            concurrency=2)

        execute_concurrent_with_args(
            session, session.prepare("UPDATE test_counter_clustering SET v2 = v2 + ? WHERE k = ? AND v1 = ?"),
            [(i, i, i) for i in keys],
            concurrency=2)

    def _apply_update_round(self, session, num_updates, validation_round):
        """Delete one row and update the first rows of every table.

        In the non-counter tables the row keyed by ``validation_round`` is
        deleted and the rows with keys above it and below ``num_updates``
        get ``v2`` set to ``validation_round``.  In the counter tables the
        first ``num_updates`` counters are incremented by one.
        """
        # keys below num_updates that have not been deleted yet
        surviving_keys = range(validation_round + 1, num_updates)

        session.execute("DELETE FROM test WHERE k = %s", (validation_round,))
        execute_concurrent_with_args(
            session, session.prepare("UPDATE test SET v1 = ?, v2 = ? WHERE k = ?"),
            [(i, validation_round, i) for i in surviving_keys])

        session.execute("DELETE FROM test_clustering WHERE k = %s AND v1 = %s", (validation_round, validation_round))
        execute_concurrent_with_args(
            session, session.prepare("UPDATE test_clustering SET v2 = ? WHERE k = ? AND v1 = ?"),
            [(validation_round, i, i) for i in surviving_keys])

        execute_concurrent_with_args(
            session, session.prepare("UPDATE test_counter SET v1 = v1 + ? WHERE k = ?"),
            [(1, i) for i in range(num_updates)],
            concurrency=2)

        execute_concurrent_with_args(
            session, session.prepare("UPDATE test_counter_clustering SET v2 = v2 + ? WHERE k = ? AND v1 = ?"),
            [(1, i, i) for i in range(num_updates)],
            concurrency=2)

    def _validate_values(self, session, num_updates, validation_round):
        """Assert that every table holds the values expected after a round.

        ``num_updates`` is the number of leading rows touched by each update
        round and ``validation_round`` is the zero-based index of the last
        round that was applied.
        """
        # one row gets deleted from the non-counter tables each round
        deleted_rows = validation_round + 1

        # check values of non-counter tables
        for cf in ('test', 'test_clustering'):
            rows = list(session.execute("SELECT * FROM %s" % (cf,)))
            self.assertEqual(self._NUM_ROWS - deleted_rows, len(rows))

            # adjust enumeration start to account for row deletions
            for i, row in enumerate(sorted(rows), start=deleted_rows):
                self.assertEqual(i, row.k)
                self.assertEqual(i, row.v1)

                # updated rows will have different values
                expected_value = validation_round if i < num_updates else i
                self.assertEqual(expected_value, row.v2)

        # check values of counter tables
        rows = list(session.execute("SELECT * FROM test_counter"))
        self.assertEqual(self._NUM_ROWS, len(rows))
        for i, row in enumerate(sorted(rows)):
            self.assertEqual(i, row.k)

            # updated rows will get incremented once each round
            expected_value = i
            if i < num_updates:
                expected_value += validation_round + 1
            self.assertEqual(expected_value, row.v1)

        rows = list(session.execute("SELECT * FROM test_counter_clustering"))
        self.assertEqual(self._NUM_ROWS, len(rows))
        for i, row in enumerate(sorted(rows)):
            self.assertEqual(i, row.k)
            self.assertEqual(i, row.v1)

            expected_value = i
            if i < num_updates:
                expected_value += validation_round + 1
            self.assertEqual(expected_value, row.v2)