Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions mysql_ch_replicator/db_replicator_realtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,11 +266,16 @@ def handle_rename_table_query(self, query, db_name):
def handle_truncate_query(self, query, db_name):
"""Handle TRUNCATE TABLE operations by clearing data in ClickHouse"""
tokens = query.strip().split()
if len(tokens) < 3 or tokens[0].lower() != 'truncate' or tokens[1].lower() != 'table':
if len(tokens) < 2 or tokens[0].lower() != 'truncate':
raise Exception('Invalid TRUNCATE query format', query)

# Get table name from the third token (after TRUNCATE TABLE)
table_token = tokens[2]
# MySQL supports both "TRUNCATE TABLE table_name" and "TRUNCATE table_name"
if tokens[1].lower() == 'table':
if len(tokens) < 3:
raise Exception('Invalid TRUNCATE query format', query)
table_token = tokens[2]
else:
table_token = tokens[1]

# Parse database and table name from the token
db_name, table_name, matches_config = self.replicator.converter.get_db_and_table_name(table_token, db_name)
Expand Down
155 changes: 155 additions & 0 deletions tests/test_truncate_bug.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
from common import *
from mysql_ch_replicator import config
from mysql_ch_replicator import mysql_api
from mysql_ch_replicator import clickhouse_api


def test_truncate_without_table_keyword_non_replicated_table():
"""Test TRUNCATE statement without TABLE keyword on non-replicated table.

Reproduces bug #216 where TRUNCATE `table_name` (without TABLE keyword)
breaks replication even when the table is not being replicated.
"""
cfg = config.Settings()
cfg.load('tests/tests_config_truncate_bug.yaml')

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

mysql.execute(f'''
CREATE TABLE `{TEST_TABLE_NAME}` (
`id` int NOT NULL,
`name` varchar(255),
PRIMARY KEY (`id`)
);
''')

mysql.execute(f'''
CREATE TABLE `telescope_entries` (
`id` int NOT NULL,
`data` text,
PRIMARY KEY (`id`)
);
''')

mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (id, name) VALUES (1, 'test')",
commit=True,
)

mysql.execute(
f"INSERT INTO `telescope_entries` (id, data) VALUES (1, 'entry1')",
commit=True,
)

binlog_replicator_runner = BinlogReplicatorRunner(cfg_file='tests/tests_config_truncate_bug.yaml')
binlog_replicator_runner.run()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file='tests/tests_config_truncate_bug.yaml')
db_replicator_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

mysql.execute(f"TRUNCATE `telescope_entries`", commit=True)

binlog_pid = get_binlog_replicator_pid(cfg)
db_pid = get_db_replicator_pid(cfg, TEST_DB_NAME)

assert binlog_pid is not None, "Binlog replicator process died"
assert db_pid is not None, "DB replicator process died after TRUNCATE"

mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (id, name) VALUES (2, 'test2')",
commit=True,
)

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)

db_replicator_runner.stop()
binlog_replicator_runner.stop()


def test_truncate_with_table_keyword_non_replicated_table():
"""Test TRUNCATE TABLE statement on non-replicated table.

This should work correctly as the TABLE keyword is properly handled.
"""
cfg = config.Settings()
cfg.load('tests/tests_config_truncate_bug.yaml')

mysql = mysql_api.MySQLApi(
database=None,
mysql_settings=cfg.mysql,
)

ch = clickhouse_api.ClickhouseApi(
database=TEST_DB_NAME,
clickhouse_settings=cfg.clickhouse,
)

prepare_env(cfg, mysql, ch)

mysql.execute(f'''
CREATE TABLE `{TEST_TABLE_NAME}` (
`id` int NOT NULL,
`name` varchar(255),
PRIMARY KEY (`id`)
);
''')

mysql.execute(f'''
CREATE TABLE `telescope_entries` (
`id` int NOT NULL,
`data` text,
PRIMARY KEY (`id`)
);
''')

mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (id, name) VALUES (1, 'test')",
commit=True,
)

mysql.execute(
f"INSERT INTO `telescope_entries` (id, data) VALUES (1, 'entry1')",
commit=True,
)

binlog_replicator_runner = BinlogReplicatorRunner(cfg_file='tests/tests_config_truncate_bug.yaml')
binlog_replicator_runner.run()
db_replicator_runner = DbReplicatorRunner(TEST_DB_NAME, cfg_file='tests/tests_config_truncate_bug.yaml')
db_replicator_runner.run()

assert_wait(lambda: TEST_DB_NAME in ch.get_databases())
ch.execute_command(f'USE `{TEST_DB_NAME}`')
assert_wait(lambda: TEST_TABLE_NAME in ch.get_tables())
assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 1)

mysql.execute(f"TRUNCATE TABLE `telescope_entries`", commit=True)

binlog_pid = get_binlog_replicator_pid(cfg)
db_pid = get_db_replicator_pid(cfg, TEST_DB_NAME)

assert binlog_pid is not None, "Binlog replicator process died"
assert db_pid is not None, "DB replicator process died after TRUNCATE TABLE"

mysql.execute(
f"INSERT INTO `{TEST_TABLE_NAME}` (id, name) VALUES (2, 'test2')",
commit=True,
)

assert_wait(lambda: len(ch.select(TEST_TABLE_NAME)) == 2)

db_replicator_runner.stop()
binlog_replicator_runner.stop()
21 changes: 21 additions & 0 deletions tests/tests_config_truncate_bug.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
mysql:
host: 'localhost'
port: 9306
user: 'root'
password: 'admin'

clickhouse:
host: 'localhost'
port: 9123
user: 'default'
password: 'admin'

binlog_replicator:
data_dir: '/app/binlog/'
records_per_file: 100000

databases: '*test*'
tables: ['test_table*']

log_level: 'info'