Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 60 additions & 16 deletions mysql_ch_replicator/converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,23 +278,67 @@ def strip_sql_name(name):
return name


def split_high_level(data, delimiter):
    """
    Split a string by a delimiter, ignoring delimiters inside parentheses or quotes.

    This function performs a context-aware split, respecting nested structures:
    - Delimiters inside parentheses () are ignored
    - Delimiters inside single quotes '' are ignored
    - Handles nested parentheses at any depth
    - A single quote preceded by an odd number of backslashes is treated as
      escaped and does not open or close a quoted section; an even number of
      backslashes (i.e. escaped backslashes) leaves the quote active

    Args:
        data (str): The string to split
        delimiter (str): The character to split on (typically ',' or ';')

    Returns:
        list[str]: List of split segments with whitespace stripped; empty
        segments are dropped

    Examples:
        >>> split_high_level("a,b(c,d),e", ",")
        ['a', 'b(c,d)', 'e']

        >>> split_high_level("name varchar(100) DEFAULT 'a,b',id int", ",")
        ["name varchar(100) DEFAULT 'a,b'", 'id int']
    """
    if not data:
        return []

    segments = []
    current_segment = []
    paren_depth = 0
    in_quotes = False
    # Length of the run of consecutive backslashes immediately before the
    # current character. Looking at only one preceding character misreads an
    # escaped backslash followed by a closing quote as an escaped quote.
    backslash_run = 0

    for char in data:
        preceded_by_escape = backslash_run % 2 == 1
        backslash_run = backslash_run + 1 if char == '\\' else 0

        # Handle quote toggling (ignore escaped quotes)
        if char == "'" and not preceded_by_escape:
            in_quotes = not in_quotes
            current_segment.append(char)
            continue

        # Parentheses and delimiters only matter outside quotes
        if not in_quotes:
            if char == '(':
                paren_depth += 1
            elif char == ')':
                paren_depth -= 1

            # Split only at top level (outside parentheses and quotes)
            if char == delimiter and paren_depth == 0:
                segment_text = ''.join(current_segment).strip()
                if segment_text:  # Only add non-empty segments
                    segments.append(segment_text)
                current_segment = []
                continue

        current_segment.append(char)

    # Add final segment if it exists
    final_segment = ''.join(current_segment).strip()
    if final_segment:
        segments.append(final_segment)

    return segments


def strip_sql_comments(sql_statement):
Expand Down