Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 172 additions & 18 deletions dbldatagen/column_generation_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@

import copy
import logging
import random
from datetime import date, datetime, timedelta

import numpy as np
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.functions import lit, concat, rand, round as sql_round, array, expr, when, udf, \
format_string
Expand All @@ -27,7 +30,7 @@
from .nrange import NRange
from .serialization import SerializableToDict
from .text_generators import TemplateGenerator
from .utils import ensure, coalesce_values
from .utils import ensure, coalesce_values, parse_time_interval
from .schema_parser import SchemaParser

HASH_COMPUTE_METHOD = "hash"
Expand Down Expand Up @@ -518,6 +521,130 @@ def _setupTemporaryColumns(self):
'description': desc}))
self._weightedBaseColumn = temp_name

def _list_random_unique_numeric_values(
self,
unique_count: int,
min_val: int | float | date | datetime | str,
max_val: int | float | date | datetime | str,
step_val: int | float | timedelta | str
) -> None:
"""
Builds a list of random unique numeric values when ``uniqueValues`` is specified and ``random=True``.

This creates an internal omitted column with a list of randomly selected unique values from the specified range,
then sets up the main column to select from this list using a random index.

:param unique_count: Number of unique values to generate
:param min_val: Minimum value of the range
:param max_val: Maximum value of the range
:param step_val: Step value for the range
"""
if self._randomSeed is not None and self._randomSeed != -1:
self._set_random_seed()

selected_values = set()
while len(selected_values) < unique_count:
if self.distribution and isinstance(self.distribution, DataDistribution):
raw_value = np.clip(self.distribution.generateNormalizedDistributionSample(), 0, 1)
else:
raw_value = random.random()

range_size = (max_val - min_val) / step_val
if not isinstance(min_val, float) and not isinstance(max_val, float) and not isinstance(step_val, float):
range_size = range_size + 1

scaled_index = int(raw_value * range_size)
value = np.clip(min_val + scaled_index * step_val, min_val, max_val)
selected_values.add(value)

selected_values = list(selected_values)
if len(selected_values) < unique_count:
self.logger.warning(
f"Could not generate {unique_count} unique values for column {self.name}; "
f"Generated {len(selected_values)} unique values"
)

self.values = selected_values
self.logger.info(
f"Set up random unique values for column {self.name}: {len(selected_values)} values using "
f"{'distribution' if self.distribution else 'uniform'} sampling"
)

def _list_random_unique_datetime_values(
self,
unique_count: int,
begin_val: date | datetime | str,
end_val: date | datetime | str,
interval_val: timedelta | str,
col_type: DataType | str
) -> None:
"""
Builds a list of random unique date/timestamp values when ``uniqueValues`` is specified and ``random=True``.

:param unique_count: Number of unique values to generate
:param begin_val: Beginning date/timestamp
:param end_val: End date/timestamp
:param interval_val: Date/time interval
:param col_type: Type of column to generate (e.g. ``DateType`` or ``TimestampType``)
"""
if isinstance(interval_val, str):
interval_val = parse_time_interval(interval_val)
if isinstance(begin_val, str):
if isinstance(col_type, TimestampType):
begin_val = datetime.strptime(begin_val, DateRange.DEFAULT_UTC_TS_FORMAT)
else:
begin_val = datetime.strptime(begin_val, DateRange.DEFAULT_DATE_FORMAT)
if isinstance(end_val, str):
if isinstance(col_type, TimestampType):
end_val = datetime.strptime(end_val, DateRange.DEFAULT_UTC_TS_FORMAT)
else:
end_val = datetime.strptime(end_val, DateRange.DEFAULT_DATE_FORMAT)
if isinstance(col_type, DateType):
begin_val = begin_val.date()
end_val = end_val.date()

total_span = end_val - begin_val
if isinstance(total_span, timedelta):
total_seconds = total_span.total_seconds()
interval_seconds = interval_val.total_seconds()
num_possible_values = int(total_seconds / interval_seconds) + 1
else:
total_days = total_span.days
interval_days = interval_val.days
num_possible_values = int(total_days / interval_days) + 1

unique_count = min(unique_count, num_possible_values)

if self._randomSeed is not None and self._randomSeed != -1:
self._set_random_seed()

selected_values = set()
while len(selected_values) < unique_count:
if self.distribution and isinstance(self.distribution, DataDistribution):
raw_value = np.clip(self.distribution.generateNormalizedDistributionSample(), 0, 1)
else:
raw_value = random.random()

scaled_index = int(raw_value * (num_possible_values - 1))
value = begin_val + interval_val * scaled_index

if value > end_val:
value = end_val
selected_values.add(value)

selected_values = list(selected_values)
if len(selected_values) < unique_count:
self.logger.warning(
f"Could not generate {unique_count} unique values for column {self.name}; "
f"Generated {len(selected_values)} unique values"
)

self.values = selected_values
self.logger.info(
f"Set up random unique values for column {self.name}: {len(selected_values)} values using "
f"{'distribution' if self.distribution else 'uniform'} sampling"
)

def _setup_logger(self):
"""Set up logging

Expand Down Expand Up @@ -553,12 +680,12 @@ def _computeAdjustedNumericRangeForColumn(self, colType, c_min, c_max, c_step, *
- if a datarange is specified , use that range
- if begin and end are specified or minValue and maxValue are specified, use that
- if unique values is specified, compute minValue and maxValue depending on type

- if unique values and random=True are both specified, generate random unique values from full range
"""
if c_unique is not None:
assert type(c_unique) is int, "unique_values must be integer"
assert c_unique >= 1, "if supplied, unique values must be > 0"
# TODO: set maxValue to unique_values + minValue & add unit test

effective_min, effective_max, effective_step = None, None, None
if c_range is not None and type(c_range) is NRange:
effective_min = c_range.minValue
Expand All @@ -568,19 +695,27 @@ def _computeAdjustedNumericRangeForColumn(self, colType, c_min, c_max, c_step, *
effective_step = coalesce_values(effective_step, c_step, 1)
effective_max = coalesce_values(effective_max, c_max)

# due to floating point errors in some Python floating point calculations, we need to apply rounding
# if any of the components are float
if type(effective_min) is float or type(effective_step) is float:
unique_max = round(c_unique * effective_step + effective_min - effective_step, 9)
# Check if both uniqueValues and random=True are specified
if self.random and effective_max is not None:
# Generate random unique values from the full range and store them
self._list_random_unique_numeric_values(c_unique, effective_min, effective_max, effective_step)
# Create a range that maps to indices of the unique values (0 to unique_count-1)
result = NRange(0, c_unique - 1, 1)
else:
unique_max = c_unique * effective_step + effective_min - effective_step
result = NRange(effective_min, unique_max, effective_step)

if result.maxValue is not None and effective_max is not None and result.maxValue > effective_max:
self.logger.warning("Computed maxValue for column [%s] of %s is greater than specified maxValue %s",
self.name,
result.maxValue,
effective_max)
# Original behavior: create sequential range
# due to floating point errors in some Python floating point calculations, we need to apply rounding
# if any of the components are float
if type(effective_min) is float or type(effective_step) is float:
unique_max = round(c_unique * effective_step + effective_min - effective_step, 9)
else:
unique_max = c_unique * effective_step + effective_min - effective_step
result = NRange(effective_min, unique_max, effective_step)

if result.maxValue is not None and effective_max is not None and result.maxValue > effective_max:
self.logger.warning("Computed maxValue for column [%s] of %s is greater than specified maxValue %s",
self.name,
result.maxValue,
effective_max)
elif c_range is not None:
result = c_range
elif c_range is None:
Expand All @@ -607,10 +742,21 @@ def _computeAdjustedDateTimeRangeForColumn(self, colType, c_begin, c_end, c_inte
effective_end = coalesce_values(effective_end, c_end)
effective_begin = coalesce_values(effective_begin, c_begin)

if type(colType) is DateType:
result = DateRange.computeDateRange(effective_begin, effective_end, effective_interval, c_unique)
# Check if both uniqueValues and random=True are specified for date/timestamp
if c_unique is not None and self.random and effective_end is not None:
# Generate random unique date/timestamp values from the full range
self._list_random_unique_datetime_values(c_unique, effective_begin, effective_end, effective_interval, colType)
# Return a minimal range - the actual values will come from discrete values
if type(colType) is DateType:
result = DateRange.computeDateRange(effective_begin, effective_begin, effective_interval, 1)
else:
result = DateRange.computeTimestampRange(effective_begin, effective_begin, effective_interval, 1)
else:
result = DateRange.computeTimestampRange(effective_begin, effective_end, effective_interval, c_unique)
# Original behavior
if type(colType) is DateType:
result = DateRange.computeDateRange(effective_begin, effective_end, effective_interval, c_unique)
else:
result = DateRange.computeTimestampRange(effective_begin, effective_end, effective_interval, c_unique)

self.logger.debug("Computing adjusted range for column: %s - %s", self.name, result)
return result
Expand Down Expand Up @@ -1322,3 +1468,11 @@ def makeGenerationExpressions(self):
retval = F.slice(retval, F.lit(1), F.expr(expr_str))

return retval

def _set_random_seed(self) -> None:
"""
Sets the random seed value for computing random values from a range.
"""
seed_value = abs(self._randomSeed) % (2**32) # Numpy accepts values in the range from 0 - 2^32-1.
random.seed(seed_value)
np.random.seed(seed_value)
Loading
Loading