Skip to content

Commit

Permalink
Fix percona migration (#78)
Browse files Browse the repository at this point in the history
* Add unittest for "IF (NOT) EXISTS"

* Support Percona-style migration to add a column

* Review comments
  • Loading branch information
nielsreijers authored Jan 7, 2025
1 parent a4b1576 commit fb8d88c
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 22 deletions.
35 changes: 21 additions & 14 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,24 @@ def __basic_validate_query(self, mysql_query):
if mysql_query.find(';') != -1:
raise Exception('multi-query statement not supported')
return mysql_query

def get_db_and_table_name(self, token, db_name):
    """Resolve a table reference taken from a MySQL statement.

    Args:
        token: Table reference as written in the query, either ``table``
            or ``db.table``.
        db_name: Database to fall back on when ``token`` has no database
            prefix.

    Returns:
        A ``(db_name, table_name, matches_config)`` tuple. Both names have
        been passed through ``strip_sql_name``. When a replicator is
        attached, ``db_name`` is rewritten to the replicator's target
        database if it equals the replicated source database, and
        ``matches_config`` tells whether the *source* database/table pair
        passes the replicator's config filters. Without a replicator,
        ``matches_config`` is always ``True``.

    Raises:
        ValueError: If ``token`` contains more than one ``.`` separator.
    """
    if '.' in token:
        db_name, table_name = token.split('.')
    else:
        table_name = token
    db_name = strip_sql_name(db_name)
    table_name = strip_sql_name(table_name)

    if not self.db_replicator:
        return db_name, table_name, True

    # Evaluate the filters against the name as it exists on the MySQL side,
    # i.e. BEFORE it is rewritten to the target database. Checking the
    # rewritten name would reject every statement whenever the target
    # database is named differently from the source database.
    config = self.db_replicator.config
    matches_config = (
        config.is_database_matches(db_name)
        and config.is_table_matches(table_name)
    )

    if db_name == self.db_replicator.database:
        db_name = self.db_replicator.target_database

    return db_name, table_name, matches_config

def convert_alter_query(self, mysql_query, db_name):
mysql_query = self.__basic_validate_query(mysql_query)
Expand All @@ -365,21 +383,10 @@ def convert_alter_query(self, mysql_query, db_name):
if tokens[1].lower() != 'table':
raise Exception('wrong query')

table_name = tokens[2]
if table_name.find('.') != -1:
db_name, table_name = table_name.split('.')
db_name, table_name, matches_config = self.get_db_and_table_name(tokens[2], db_name)

if self.db_replicator:
if not self.db_replicator.config.is_database_matches(db_name):
return
if not self.db_replicator.config.is_table_matches(table_name):
return

db_name = strip_sql_name(db_name)
if self.db_replicator and db_name == self.db_replicator.database:
db_name = self.db_replicator.target_database

table_name = strip_sql_name(table_name)
if not matches_config:
return

subqueries = ' '.join(tokens[3:])
subqueries = split_high_level(subqueries, ',')
Expand Down
44 changes: 36 additions & 8 deletions mysql_ch_replicator/db_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -477,14 +477,18 @@ def handle_query_event(self, event: LogEvent):
logger.debug(f'processing query event: {event.transaction_id}, query: {event.records}')
query = strip_sql_comments(event.records)
if query.lower().startswith('alter'):
self.upload_records()
self.handle_alter_query(query, event.db_name)
if query.lower().startswith('create table'):
self.handle_create_table_query(query, event.db_name)
if query.lower().startswith('drop table'):
self.upload_records()
self.handle_drop_table_query(query, event.db_name)
if query.lower().startswith('rename table'):
self.upload_records()
self.handle_rename_table_query(query, event.db_name)

def handle_alter_query(self, query, db_name):
self.upload_records()
self.converter.convert_alter_query(query, db_name)

def handle_create_table_query(self, query, db_name):
Expand All @@ -509,17 +513,41 @@ def handle_drop_table_query(self, query, db_name):
if len(tokens) != 3:
raise Exception('wrong token count', query)

table_name = tokens[2]
if '.' in table_name:
db_name, table_name = table_name.split('.')
if db_name == self.database:
db_name = self.target_database
table_name = strip_sql_name(table_name)
db_name = strip_sql_name(db_name)
db_name, table_name, matches_config = self.converter.get_db_and_table_name(tokens[2], db_name)
if not matches_config:
return

if table_name in self.state.tables_structure:
self.state.tables_structure.pop(table_name)
self.clickhouse_api.execute_command(f'DROP TABLE {"IF EXISTS" if if_exists else ""} {db_name}.{table_name}')

def handle_rename_table_query(self, query, db_name):
    """Apply a MySQL ``RENAME TABLE`` statement to ClickHouse.

    The statement may contain several comma-separated ``src TO dest``
    clauses. Every clause is parsed and validated before anything is
    changed, so a statement that is skipped or rejected leaves both the
    cached table structures and ClickHouse untouched.

    Args:
        query: The ``RENAME TABLE ...`` statement text.
        db_name: Database used for table references that carry no
            database prefix.

    Returns:
        None. The whole statement is silently skipped if the source table
        of any clause does not match the replication config.

    Raises:
        Exception: If the statement does not start with ``RENAME TABLE``,
            a clause is not exactly ``<src> TO <dest>``, or a clause
            refers to a database other than ``self.target_database``.
    """
    tokens = query.split()
    if len(tokens) < 2 or tokens[0].lower() != 'rename' or tokens[1].lower() != 'table':
        raise Exception('wrong rename table query', query)

    # Phase 1: parse and validate every clause without side effects.
    renames = []
    for rename_clause in ' '.join(tokens[2:]).split(','):
        clause_tokens = rename_clause.split()

        if len(clause_tokens) != 3:
            raise Exception('wrong token count', query)
        if clause_tokens[1].lower() != 'to':
            raise Exception('"to" keyword expected', query)

        src_db_name, src_table_name, matches_config = self.converter.get_db_and_table_name(clause_tokens[0], db_name)
        dest_db_name, dest_table_name, _ = self.converter.get_db_and_table_name(clause_tokens[2], db_name)
        if not matches_config:
            return

        if src_db_name != self.target_database or dest_db_name != self.target_database:
            raise Exception('cross databases table renames not implemented', clause_tokens)

        renames.append((src_db_name, src_table_name, dest_db_name, dest_table_name))

    # Phase 2: update the cached structures clause by clause, in statement
    # order (a swap such as "a TO a_old, a_new TO a" relies on that order),
    # then issue one RENAME covering all clauses.
    tables_structure = self.state.tables_structure
    ch_clauses = []
    for src_db_name, src_table_name, dest_db_name, dest_table_name in renames:
        if src_table_name in tables_structure:
            tables_structure[dest_table_name] = tables_structure.pop(src_table_name)
        ch_clauses.append(f"{src_db_name}.{src_table_name} TO {dest_db_name}.{dest_table_name}")
    self.clickhouse_api.execute_command(f'RENAME TABLE {", ".join(ch_clauses)}')

def log_stats_if_required(self):
curr_time = time.time()
if curr_time - self.last_dump_stats_time < DbReplicator.STATS_DUMP_INTERVAL:
Expand Down
117 changes: 117 additions & 0 deletions test_mysql_ch_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,122 @@ def test_string_primary_key(monkeypatch):
binlog_replicator_runner.stop()


def test_if_exists_if_not_exists(monkeypatch):
    """Check replication of ``CREATE TABLE IF NOT EXISTS`` / ``DROP TABLE IF EXISTS``.

    Each statement is issued both with and without a database prefix. The
    test passes once ClickHouse contains ``TEST_TABLE_NAME_2`` (created,
    never dropped) and does not contain ``TEST_TABLE_NAME`` (created, then
    dropped).
    """
    monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1)

    cfg = config.Settings()
    cfg.load(CONFIG_FILE)

    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=cfg.mysql,
    )

    ch = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=cfg.clickhouse,
    )

    prepare_env(cfg, mysql, ch)

    binlog_replicator_runner = BinlogReplicatorRunner()
    binlog_replicator_runner.run()
    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
    db_replicator_runner.run()

    # Stop the runners even when an assertion fails, so a failing test does
    # not leave replicators running underneath the tests that follow.
    try:
        assert_wait(lambda: TEST_DB_NAME in ch.get_databases())

        mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_DB_NAME}.{TEST_TABLE_NAME} (id int NOT NULL, PRIMARY KEY(id));")
        mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME} (id int NOT NULL, PRIMARY KEY(id));")
        mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_DB_NAME}.{TEST_TABLE_NAME_2} (id int NOT NULL, PRIMARY KEY(id));")
        mysql.execute(f"CREATE TABLE IF NOT EXISTS {TEST_TABLE_NAME_2} (id int NOT NULL, PRIMARY KEY(id));")
        mysql.execute(f"DROP TABLE IF EXISTS {TEST_DB_NAME}.{TEST_TABLE_NAME};")
        mysql.execute(f"DROP TABLE IF EXISTS {TEST_TABLE_NAME};")

        ch.execute_command(f'USE {TEST_DB_NAME}')

        assert_wait(lambda: TEST_TABLE_NAME_2 in ch.get_tables())
        assert_wait(lambda: TEST_TABLE_NAME not in ch.get_tables())
    finally:
        db_replicator_runner.stop()
        binlog_replicator_runner.stop()


def test_percona_migration(monkeypatch):
    """Check that a pt-online-schema-change style migration is replicated.

    The test creates a table with one row, waits for it to appear in
    ClickHouse, then replays the statements such a migration issues to add
    a column: create a shadow table, alter it, copy the rows, swap the
    tables with a two-clause ``RENAME TABLE``, and drop the old table. A row
    inserted afterwards using the new column must reach ClickHouse.
    """
    monkeypatch.setattr(DbReplicator, 'INITIAL_REPLICATION_BATCH_SIZE', 1)

    cfg = config.Settings()
    cfg.load(CONFIG_FILE)

    mysql = mysql_api.MySQLApi(
        database=None,
        mysql_settings=cfg.mysql,
    )

    ch = clickhouse_api.ClickhouseApi(
        database=TEST_DB_NAME,
        clickhouse_settings=cfg.clickhouse,
    )

    prepare_env(cfg, mysql, ch)

    mysql.execute(f'''
    CREATE TABLE {TEST_TABLE_NAME} (
    `id` int NOT NULL,
    PRIMARY KEY (`id`));
    ''')

    mysql.execute(
        f"INSERT INTO {TEST_TABLE_NAME} (id) VALUES (42)",
        commit=True,
    )

    binlog_replicator_runner = BinlogReplicatorRunner()
    binlog_replicator_runner.run()
    db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME)
    db_replicator_runner.run()

    # Stop the runners even when an assertion fails, so a failing test does
    # not leave replicators running underneath the tests that follow.
    try:
        assert_wait(lambda: TEST_DB_NAME in ch.get_databases())

        ch.execute_command(f'USE {TEST_DB_NAME}')

        assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

        # Perform 'pt-online-schema-change' style migration to add a column
        # This is a subset of what happens when the following command is run:
        # pt-online-schema-change --alter "ADD COLUMN c1 INT" D=$TEST_DB_NAME,t=$TEST_TABLE_NAME,h=0.0.0.0,P=3306,u=root,p=admin --execute
        mysql.execute(f'''
        CREATE TABLE `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` (
        `id` int NOT NULL,
        PRIMARY KEY (`id`)
        )''')

        mysql.execute(
            f"ALTER TABLE `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` ADD COLUMN c1 INT;")

        mysql.execute(
            f"INSERT LOW_PRIORITY IGNORE INTO `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` (`id`) SELECT `id` FROM `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` LOCK IN SHARE MODE;",
            commit=True,
        )

        mysql.execute(
            f"RENAME TABLE `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}` TO `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_old`, `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_new` TO `{TEST_DB_NAME}`.`{TEST_TABLE_NAME}`;")

        mysql.execute(
            f"DROP TABLE IF EXISTS `{TEST_DB_NAME}`.`_{TEST_TABLE_NAME}_old`;")

        mysql.execute(
            f"INSERT INTO {TEST_TABLE_NAME} (id, c1) VALUES (43, 1)",
            commit=True,
        )

        assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)
    finally:
        db_replicator_runner.stop()
        binlog_replicator_runner.stop()


def test_parse_mysql_table_structure():
query = "CREATE TABLE IF NOT EXISTS user_preferences_portal (\n\t\t\tid char(36) NOT NULL,\n\t\t\tcategory varchar(50) DEFAULT NULL,\n\t\t\tdeleted tinyint(1) DEFAULT 0,\n\t\t\tdate_entered datetime DEFAULT NULL,\n\t\t\tdate_modified datetime DEFAULT NULL,\n\t\t\tassigned_user_id char(36) DEFAULT NULL,\n\t\t\tcontents longtext DEFAULT NULL\n\t\t ) ENGINE=InnoDB DEFAULT CHARSET=utf8"

Expand All @@ -1065,3 +1181,4 @@ def test_parse_mysql_table_structure():
structure = converter.parse_mysql_table_structure(query)

assert structure.table_name == 'user_preferences_portal'

0 comments on commit fb8d88c

Please sign in to comment.