 from pyspark.sql.types import LongType, IntegerType, StringType, StructType, StructField, DataType
 from .spark_singleton import SparkSingleton
 from .column_generation_spec import ColumnGenerationSpec
-from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, MIN_SPARK_VERSION
+from .datagen_constants import DEFAULT_RANDOM_SEED, RANDOM_SEED_FIXED, RANDOM_SEED_HASH_FIELD_NAME, \
+    DEFAULT_SEED_COLUMN, SPARK_RANGE_COLUMN, MIN_SPARK_VERSION
 from .utils import ensure, topologicalSort, DataGenError, deprecated
 from ._version import _get_spark_version
 
 _OLD_MIN_OPTION = 'min'
 _OLD_MAX_OPTION = 'max'
 
+_STREAMING_TIMESTAMP_COLUMN = "_source_timestamp"
+
 
 class DataGenerator:
     """ Main Class for test data set generation
@@ -35,6 +38,7 @@ class DataGenerator:
     :param verbose: = if `True`, generate verbose output
     :param batchSize: = UDF batch number of rows to pass via Apache Arrow to Pandas UDFs
     :param debug: = if set to True, output debug level of information
+    :param seedColumnName: = if set, this should be the name of the `seed` or logical `id` column. Defaults to `id`
     """
 
     # class vars
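A minimal usage sketch for the new option (assuming the package imports as `dbldatagen` and `spark` is an existing SparkSession; the names used here are illustrative only):

import dbldatagen as dg
from pyspark.sql.types import IntegerType

# spec whose seed column is named `_row_key` instead of the default `id`
spec = (dg.DataGenerator(sparkSession=spark, name="users", rows=100000, partitions=8,
                         seedColumnName="_row_key")
        .withColumn("score", IntegerType(), minValue=0, maxValue=100)
        .withIdOutput())   # keep the seed column in the generated output

df = spec.build()
assert "_row_key" in df.columns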
@@ -52,14 +56,20 @@ class DataGenerator:
 
     def __init__(self, sparkSession=None, name=None, randomSeedMethod=None,
                  rows=1000000, startingId=0, randomSeed=None, partitions=None, verbose=False,
-                 batchSize=None, debug=False, **kwargs):
+                 batchSize=None, debug=False, seedColumnName=DEFAULT_SEED_COLUMN,
+                 **kwargs):
         """ Constructor for data generator object """
 
         # set up logging
         self.verbose = verbose
         self.debug = debug
 
         self._setupLogger()
+        self._seedColumnName = seedColumnName
+        self._outputStreamingFields = False
+
+        if seedColumnName != DEFAULT_SEED_COLUMN:
+            self.logger.info(f"Using '{self._seedColumnName}' for seed column in place of '{DEFAULT_SEED_COLUMN}'")
 
         self.name = name if name is not None else self.generateName()
         self._rowCount = rows
@@ -129,7 +139,9 @@ def __init__(self, sparkSession=None, name=None, randomSeedMethod=None,
         self._buildOrder = []
         self._inferredSchemaFields = []
         self.buildPlanComputed = False
-        self.withColumn(ColumnGenerationSpec.SEED_COLUMN, LongType(), nullable=False, implicit=True, omit=True)
+
+        # let's add the seed column
+        self.withColumn(self._seedColumnName, LongType(), nullable=False, implicit=True, omit=True, noWarn=True)
         self._batchSize = batchSize
 
         # set up spark session
@@ -138,6 +150,11 @@ def __init__(self, sparkSession=None, name=None, randomSeedMethod=None,
         # set up use of pandas udfs
         self._setupPandas(batchSize)
 
+    @property
+    def seedColumnName(self):
+        """ return the name of the data generation seed column"""
+        return self._seedColumnName
+
     @classmethod
     def _checkSparkVersion(cls, sparkVersion, minSparkVersion):
         """
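The new property is read-only; a quick check against the sketch above:

print(spec.seedColumnName)   # -> '_row_key' here; 'id' when the option is not supplied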
@@ -291,9 +308,12 @@ def explain(self, suppressOutput=False):
         if not self.buildPlanComputed:
             self.computeBuildPlan()
 
+        rc = self._rowCount
+        tasks = self.partitions
         output = ["", "Data generation plan", "====================",
-                  f"spec=DateGenerator(name={self.name}, rows={self._rowCount}, startingId={self.starting_id}, partitions={self.partitions})"
-                  , ")", "", f"column build order: {self._buildOrder}", "", "build plan:"]
+                  f"spec=DateGenerator(name={self.name}, rows={rc}, startingId={self.starting_id}, partitions={tasks})"
+                  , ")", "", f"seed column: {self._seedColumnName}", "",
+                  f"column build order: {self._buildOrder}", "", "build plan:"]
 
         for plan_action in self._buildPlan:
             output.append(" ==> " + plan_action)
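With this change `explain()` also reports the seed column. A hedged sketch of the resulting output, with wording taken from the strings above:

spec.explain()
# Data generation plan
# ====================
# spec=DateGenerator(name=users, rows=100000, startingId=0, partitions=8)
# seed column: _row_key
# column build order: ...
# build plan:
#  ==> Build Spark data frame with seed column: '_row_key'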
@@ -345,14 +365,14 @@ def rowCount(self):
         return self._rowCount
 
     def withIdOutput(self):
-        """ output seed column field (defaults to `id`) as a column in the test data set if specified
+        """ output seed column field (defaults to `id`) as a column in the generated data set if specified
 
-        If this is not called, the seed column field is omitted from the final test data set
+        If this is not called, the seed column field is omitted from the final generated data set
 
         :returns: modified in-place instance of test data generator allowing for chaining of calls
                   following Builder pattern
         """
-        self._columnSpecsByName[ColumnGenerationSpec.SEED_COLUMN].omit = False
+        self._columnSpecsByName[self._seedColumnName].omit = False
         self._markForPlanRegen()
 
         return self
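A short sketch of the omit/retain behaviour described above (same assumed imports as earlier):

hidden = (dg.DataGenerator(sparkSession=spark, name="d1", rows=1000)
          .withColumn("v", IntegerType()))
assert "id" not in hidden.build().columns    # seed column omitted by default

shown = (dg.DataGenerator(sparkSession=spark, name="d2", rows=1000)
         .withColumn("v", IntegerType())
         .withIdOutput())
assert "id" in shown.build().columns         # retained after withIdOutput()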
@@ -584,7 +604,7 @@ def withColumnSpecs(self, patterns=None, fields=None, matchTypes=None, **kwargs)
 
         all_fields = self.getInferredColumnNames()
         effective_fields = [x for x in all_fields if
-                            (fields is None or x in fields) and x != ColumnGenerationSpec.SEED_COLUMN]
+                            (fields is None or x in fields) and x != self._seedColumnName]
 
         if patterns is not None:
             effective_fields = [x for x in effective_fields for y in patterns if re.search(y, x) is not None]
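A hedged sketch of pattern matching against inferred fields (assuming a schema was supplied via `withSchema`; the seed column is excluded before patterns are applied, so even a catch-all pattern never rebinds it):

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([StructField("name", StringType()),
                     StructField("nickname", StringType()),
                     StructField("score", IntegerType())])

spec = (dg.DataGenerator(sparkSession=spark, name="patterns", rows=1000)
        .withSchema(schema)
        .withColumnSpecs(patterns=["n.*"], template=r"\w \w"))  # matches name and nickname only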
@@ -605,7 +625,7 @@ def _checkColumnOrColumnList(self, columns, allowId=False):
         :returns: True if test passes
         """
         inferred_columns = self.getInferredColumnNames()
-        if allowId and columns == ColumnGenerationSpec.SEED_COLUMN:
+        if allowId and columns == self._seedColumnName:
             return True
 
         if type(columns) is list:
@@ -677,7 +697,7 @@ def hasColumnSpec(self, colName):
     def withColumn(self, colName, colType=StringType(), minValue=None, maxValue=None, step=1,
                    dataRange=None, prefix=None, random=False, distribution=None,
                    baseColumn=None, nullable=True,
-                   omit=False, implicit=False,
+                   omit=False, implicit=False, noWarn=False,
                    **kwargs):
         """ add a new column for specification
 
@@ -693,6 +713,10 @@ def withColumn(self, colName, colType=StringType(), minValue=None, maxValue=None
         if baseColumn is not None:
             self._checkColumnOrColumnList(baseColumn, allowId=True)
 
+        if not noWarn and colName == self._seedColumnName:
+            self.logger.warning(f"Adding a new column named '{colName}' overrides seed column '{self._seedColumnName}'")
+            self.logger.warning("Use `seedColumnName` option on DataGenerator construction for different seed column")
+
         # handle migration of old `min` and `max` options
         if _OLD_MIN_OPTION in kwargs:
             assert minValue is None, \
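The guard only warns, it does not raise; a sketch of both paths:

from pyspark.sql.types import LongType

spec = dg.DataGenerator(sparkSession=spark, name="collide", rows=1000)
# emits the two warnings above, then defines a user column over the seed column name
spec = spec.withColumn("id", LongType(), minValue=0)
# internal callers (e.g. the constructor) pass noWarn=True to stay silent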
@@ -774,6 +798,7 @@ def _generateColumnDefinition(self, colName, colType=None, baseColumn=None,
                                             nullable=nullable,
                                             verbose=self.verbose,
                                             debug=self.debug,
+                                            seedColumnName=self._seedColumnName,
                                             **new_props)
 
         self._columnSpecsByName[colName] = column_spec
@@ -812,8 +837,9 @@ def _getBaseDataFrame(self, startId=0, streaming=False, options=None):
                                     end=end_id,
                                     numPartitions=id_partitions)
 
-            if ColumnGenerationSpec.SEED_COLUMN != "id":
-                df1 = df1.withColumnRenamed("id", ColumnGenerationSpec.SEED_COLUMN)
+            # spark.range generates a dataframe with the column `id`, so rename it if it's not our seed column
+            if SPARK_RANGE_COLUMN != self._seedColumnName:
+                df1 = df1.withColumnRenamed(SPARK_RANGE_COLUMN, self._seedColumnName)
 
         else:
             status = (
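The rename is needed because `spark.range` always names its single output column `id`; the same transformation in isolation:

df = spark.range(0, 1000, numPartitions=4)
print(df.columns)                            # ['id']
df = df.withColumnRenamed("id", "_row_key")  # align with a custom seed column name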
@@ -831,12 +857,15 @@ def _getBaseDataFrame(self, startId=0, streaming=False, options=None):
 
                 for k, v in options.items():
                     df1 = df1.option(k, v)
-                df1 = df1.load().withColumnRenamed("value", ColumnGenerationSpec.SEED_COLUMN)
+                df1 = (df1.load()
+                       .withColumnRenamed("value", self._seedColumnName)
+                       )
+
             else:
                 df1 = (df1.option("rowsPerSecond", 1)
                        .option("numPartitions", id_partitions)
                        .load()
-                       .withColumnRenamed("value", ColumnGenerationSpec.SEED_COLUMN)
+                       .withColumnRenamed("value", self._seedColumnName)
                        )
 
         return df1
@@ -854,16 +883,16 @@ def _computeColumnBuildOrder(self):
 
         :returns: the build ordering
         """
-        dependency_ordering = [(x.name, set(x.dependencies)) if x.name != ColumnGenerationSpec.SEED_COLUMN else (
-            ColumnGenerationSpec.SEED_COLUMN, set())
+        dependency_ordering = [(x.name, set(x.dependencies)) if x.name != self._seedColumnName else (
+            self._seedColumnName, set())
                                for x in self._allColumnSpecs]
 
         # self.pp_list(dependency_ordering, msg="dependencies")
 
         self.logger.info("dependency list: %s", str(dependency_ordering))
 
         self._buildOrder = list(
-            topologicalSort(dependency_ordering, flatten=False, initial_columns=[ColumnGenerationSpec.SEED_COLUMN]))
+            topologicalSort(dependency_ordering, flatten=False, initial_columns=[self._seedColumnName]))
 
         self.logger.info("columnBuildOrder: %s", str(self._buildOrder))
 
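The build order is a topological sort over (column, dependency-set) pairs, seeded with the seed column. An illustrative stand-in using the standard library rather than the project's own topologicalSort utility:

from graphlib import TopologicalSorter  # Python 3.9+

# node -> set of predecessors, mirroring dependency_ordering above
deps = {"id": set(), "code": {"id"}, "name": {"code"}, "email": {"name", "code"}}
print(list(TopologicalSorter(deps).static_order()))
# -> ['id', 'code', 'name', 'email']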
@@ -876,7 +905,7 @@ def build_order(self):
 
         The build order will be a list of lists - each list specifying columns that can be built at the same time
         """
-        return [x for x in self._buildOrder if x != [ColumnGenerationSpec.SEED_COLUMN]]
+        return [x for x in self._buildOrder if x != [self._seedColumnName]]
 
     def _getColumnDataTypes(self, columns):
         """ Get data types for columns
@@ -897,7 +926,7 @@ def computeBuildPlan(self):
         self._buildPlan = []
         self.executionHistory = []
         self._processOptions()
-        self._buildPlan.append(f"Build Spark data frame with seed column: {ColumnGenerationSpec.SEED_COLUMN}")
+        self._buildPlan.append(f"Build Spark data frame with seed column: '{self._seedColumnName}'")
 
         # add temporary columns
         for cs in self._allColumnSpecs: