Commit 5a287c1

Add methods for persisting generated data (#352)
* added use of ABC to mark TextGenerator as abstract
* Lint text generators module
* Add persistence methods
* Add tests and docs; Update PR template
* Update hatch installation for push action
* Refactor
* Update method names and signatures

---------

Co-authored-by: ronanstokes-db <ronan.stokes@databricks.com>
Co-authored-by: Ronan Stokes <42389040+ronanstokes-db@users.noreply.github.com>
1 parent 3aca7f3 commit 5a287c1

File tree: 13 files changed (+404, -79 lines)

.github/workflows/push.yml

Lines changed: 2 additions & 1 deletion
@@ -61,7 +61,8 @@ jobs:
           python-version: '3.10'

       - name: Install Hatch
-        run: pip install hatch
+        # click 8.3+ introduced bug for hatch
+        run: pip install "hatch==1.13.0" "click<8.3"

       - name: Run unit tests
         run: make dev test

PULL_REQUEST_TEMPLATE.md

Lines changed: 11 additions & 30 deletions
@@ -1,34 +1,15 @@
-## Proposed changes
+## Changes
+<!-- Summarize your changes. Add code examples or screenshots when necessary. -->

-Describe the big picture of your changes here to communicate to the maintainers.
-If it fixes a bug or resolves a feature request, please provide a link to that issue.
+### Linked issues
+<!-- DOC: Link issue with a keyword: close, closes, closed, fix, fixes, fixed, resolve, resolves, resolved. See https://docs.github.com/en/issues/tracking-your-work-with-issues/linking-a-pull-request-to-an-issue#linking-a-pull-request-to-an-issue-using-a-keyword -->

-## Types of changes
+Resolves #..

-What types of changes does your code introduce to dbldatagen?
-_Put an `x` in the boxes that apply_
+### Requirements
+<!-- How are your changes documented and tested? Please see the checklist below. -->

-- [ ] Bug fix (non-breaking change which fixes an issue)
-- [ ] New feature (non-breaking change which adds functionality)
-- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
-- [ ] Change to tutorials, tests or examples
-- [ ] Non code change (readme, images or other non-code assets)
-- [ ] Documentation Update (if none of the other choices apply)
-
-## Checklist
-
-_Put an `x` in the boxes that apply. You can also fill these out after creating the PR.
-If you're unsure about any of them, don't hesitate to ask. We're here to help!
-This is simply a reminder of what we are going to look for before merging your code._
-
-- [ ] Lint and unit tests pass locally with my changes
-- [ ] I have added tests that prove my fix is effective or that my feature works
-- [ ] I have added necessary documentation (if appropriate)
-- [ ] Any dependent changes have been merged and published in downstream modules
-- [ ] Submission does not reduce code coverage numbers
-- [ ] Submission does not increase alerts or messages from prospector / lint
-
-## Further comments
-
-If this is a relatively large or complex change, kick off the discussion by explaining why you chose the solution you
-did and what alternatives you considered, etc...
+- [ ] manually tested
+- [ ] updated documentation
+- [ ] updated demos
+- [ ] updated tests

dbldatagen/__init__.py

Lines changed: 2 additions & 1 deletion
@@ -47,11 +47,12 @@
 from .text_generator_plugins import PyfuncText, PyfuncTextFactory, FakerTextFactory, fakerText
 from .html_utils import HtmlUtils
 from .datasets_object import Datasets
+from .config import OutputDataset

 __all__ = ["data_generator", "data_analyzer", "schema_parser", "daterange", "nrange",
            "column_generation_spec", "utils", "function_builder",
            "spark_singleton", "text_generators", "datarange", "datagen_constants",
-           "text_generator_plugins", "html_utils", "datasets_object", "constraints"
+           "text_generator_plugins", "html_utils", "datasets_object", "constraints", "config"
           ]

dbldatagen/config.py

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module implements configuration classes for writing generated data.
+"""
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True, slots=True)
+class OutputDataset:
+    """
+    This class implements an output sink configuration used to write generated data. An output location must be
+    provided. The output mode, format, and options can be provided.
+
+    :param location: Output location for writing data. This could be an absolute path, a relative path to a Databricks
+        Volume, or a full table location using Unity Catalog's 3-level namespace.
+    :param output_mode: Output mode for writing data (default is ``"append"``).
+    :param format: Output data format (default is ``"delta"``).
+    :param options: Optional dictionary of options for writing data (e.g. ``{"mergeSchema": "true"}``)
+    """
+    location: str
+    output_mode: str = "append"
+    format: str = "delta"
+    options: dict[str, str] | None = None
+    trigger: dict[str, str] | None = None
+
+    def __post_init__(self) -> None:
+        if not self.trigger:
+            return
+
+        # Only processingTime is currently supported
+        if "processingTime" not in self.trigger:
+            valid_trigger_format = '{"processingTime": "10 SECONDS"}'
+            raise ValueError(f"Attribute 'trigger' must be a dictionary of the form '{valid_trigger_format}'")
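
For context, here is a minimal usage sketch of the `OutputDataset` dataclass added above. It uses only the fields introduced in this commit (`location`, `output_mode`, `format`, `options`, `trigger`); the table names and checkpoint path are hypothetical. Because the dataclass is frozen, instances are immutable once constructed.

```python
from dbldatagen import OutputDataset

# Batch sink using the defaults: "append" output mode, "delta" format.
# The Unity Catalog table name is a hypothetical example.
batch_sink = OutputDataset(location="main.demo.synthetic_users")

# Streaming sink: per __post_init__, the trigger must contain a "processingTime" entry.
stream_sink = OutputDataset(
    location="main.demo.synthetic_events",
    options={"checkpointLocation": "/Volumes/main/demo/checkpoints/synthetic_events"},
    trigger={"processingTime": "10 SECONDS"},
)

# Any other trigger shape is rejected:
# OutputDataset(location="main.demo.bad", trigger={"once": "true"})  # raises ValueError
```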

dbldatagen/data_generator.py

Lines changed: 41 additions & 12 deletions
@@ -15,11 +15,13 @@
 from typing import Any

 from pyspark.sql import DataFrame, SparkSession
+from pyspark.sql.streaming.query import StreamingQuery
 from pyspark.sql.types import DataType, IntegerType, LongType, StringType, StructField, StructType

 from dbldatagen import datagen_constants
 from dbldatagen._version import _get_spark_version
 from dbldatagen.column_generation_spec import ColumnGenerationSpec
+from dbldatagen.config import OutputDataset
 from dbldatagen.constraints import Constraint, SqlExpr
 from dbldatagen.datarange import DataRange
 from dbldatagen.distributions import DataDistribution
@@ -28,7 +30,14 @@
 from dbldatagen.serialization import SerializableToDict
 from dbldatagen.spark_singleton import SparkSingleton
 from dbldatagen.text_generators import TextGenerator
-from dbldatagen.utils import DataGenError, deprecated, ensure, split_list_matching_condition, topologicalSort
+from dbldatagen.utils import (
+    DataGenError,
+    deprecated,
+    ensure,
+    split_list_matching_condition,
+    topologicalSort,
+    write_data_to_output,
+)
@@ -1204,9 +1213,9 @@ def _generateColumnDefinition(
     ) -> ColumnGenerationSpec:
         """ generate field definition and column spec

-        .. note:: Any time that a new column definition is added,
-                  we'll mark that the build plan needs to be regenerated.
-                  For our purposes, the build plan determines the order of column generation etc.
+        .. note::
+            Any time that a new column definition is added, we'll mark that the build plan needs to be regenerated.
+            For our purposes, the build plan determines the order of column generation etc.

         :returns: Newly added column_spec
         """
@@ -1381,7 +1390,6 @@ def _adjustBuildOrderForSqlDependencies(self, buildOrder: list[list[str]], colum
         :param buildOrder: list of lists of ids - each sublist represents phase of build
         :param columnSpecsByName: dictionary to map column names to column specs
         :returns: Spark SQL dataframe of generated test data
-
         """
         new_build_order = []
@@ -1476,8 +1484,8 @@ def withConstraint(self, constraint: Constraint) -> "DataGenerator":
         :returns: A modified version of the current DataGenerator with the constraint applied

         .. note::
-          Constraints are applied at the end of the data generation. Depending on the type of the constraint, the
-          constraint may also affect other aspects of the data generation.
+            Constraints are applied at the end of the data generation. Depending on the type of the constraint, the
+            constraint may also affect other aspects of the data generation.
         """
         assert constraint is not None, "Constraint cannot be empty"
         assert isinstance(constraint, Constraint), \
@@ -1494,8 +1502,8 @@ def withConstraints(self, constraints: list[Constraint]) -> "DataGenerator":
         :returns: A modified version of the current `DataGenerator` with the constraints applied

         .. note::
-          Constraints are applied at the end of the data generation. Depending on the type of the constraint, the
-          constraint may also affect other aspects of the data generation.
+            Constraints are applied at the end of the data generation. Depending on the type of the constraint, the
+            constraint may also affect other aspects of the data generation.
         """
         assert constraints is not None, "Constraints list cannot be empty"
@@ -1515,9 +1523,9 @@ def withSqlConstraint(self, sqlExpression: str) -> "DataGenerator":
         :returns: A modified version of the current `DataGenerator` with the SQL expression constraint applied

         .. note::
-          Note in the current implementation, this may be equivalent to adding where clauses to the generated dataframe
-          but in future releases, this may be optimized to affect the underlying data generation so that constraints
-          are satisfied more efficiently.
+            Note in the current implementation, this may be equivalent to adding where clauses to the generated dataframe
+            but in future releases, this may be optimized to affect the underlying data generation so that constraints
+            are satisfied more efficiently.
         """
         self.withConstraint(SqlExpr(sqlExpression))
         return self
@@ -1909,6 +1917,27 @@ def scriptMerge(

         return result

+    def saveAsDataset(
+        self,
+        dataset: OutputDataset,
+        with_streaming: bool | None = None,
+        generator_options: dict[str, Any] | None = None
+    ) -> StreamingQuery | None:
+        """
+        Builds a `DataFrame` from the `DataGenerator` and writes the data to an output dataset (e.g. a table or files).
+
+        :param dataset: Output dataset for writing generated data
+        :param with_streaming: Whether to generate data using streaming. If None, auto-detects based on trigger
+        :param generator_options: Options for building the generator (e.g. `{"rowsPerSecond": 100}`)
+        :returns: A Spark `StreamingQuery` if data is written in streaming, otherwise `None`
+        """
+        # Auto-detect streaming mode if not explicitly specified
+        if with_streaming is None:
+            with_streaming = dataset.trigger is not None and len(dataset.trigger) > 0
+
+        df = self.build(withStreaming=with_streaming, options=generator_options)
+        return write_data_to_output(df, output_dataset=dataset)
+
     @staticmethod
     def loadFromJson(options: str) -> "DataGenerator":
         """

dbldatagen/datasets/basic_stock_ticker.py

Lines changed: 1 addition & 2 deletions
@@ -15,7 +15,7 @@
 class BasicStockTickerProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
     """
     Basic Stock Ticker Dataset
-    ========================
+    ==========================

     This is a basic stock ticker dataset with time-series `symbol`, `open`, `close`, `high`, `low`,
     `adj_close`, and `volume` values.
@@ -31,7 +31,6 @@ class BasicStockTickerProvider(DatasetProvider.NoAssociatedDatasetsMixin, Datase

     Note that this dataset does not use any features that would prevent it from being used as a source for a
     streaming dataframe, and so the flag `supportsStreaming` is set to True.
-
     """
     DEFAULT_NUM_SYMBOLS = 100
     DEFAULT_START_DATE = "2024-10-01"

dbldatagen/datasets_object.py

Lines changed: 26 additions & 24 deletions
@@ -20,8 +20,9 @@
 import re

 from dbldatagen.datasets.dataset_provider import DatasetProvider
-from .spark_singleton import SparkSingleton
-from .utils import strip_margins
+from dbldatagen.spark_singleton import SparkSingleton
+from dbldatagen.utils import strip_margins
+from dbldatagen.data_generator import DataGenerator


 class Datasets:
@@ -211,12 +212,6 @@ def get(self, table=None, rows=-1, partitions=-1, **kwargs):
         If the dataset supports multiple tables, the table may be specified in the `table` parameter.
         If none is specified, the primary table is used.

-        :param table: name of table to retrieve
-        :param rows: number of rows to generate. if -1, provider should compute defaults.
-        :param partitions: number of partitions to use.If -1, the number of partitions is computed automatically
-            table size and partitioning.If applied to a dataset with only a single table, this is ignored.
-        :param kwargs: additional keyword arguments to pass to the provider
-
         If `rows` or `partitions` are not specified, default values are supplied by the provider.

@@ -225,41 +220,44 @@ def get(self, table=None, rows=-1, partitions=-1, **kwargs):
         Additionally, for multi-table datasets, the table name must be one of the tables supported by the provider.
         Default number of rows for multi-table datasets may differ - for example a 'customers' table may have a
         100,000 rows while a 'sales' table may have 1,000,000 rows.
+
+        :param table: name of table to retrieve
+        :param rows: number of rows to generate. if -1, provider should compute defaults.
+        :param partitions: number of partitions to use.If -1, the number of partitions is computed automatically
+            table size and partitioning.If applied to a dataset with only a single table, this is ignored.
+        :param kwargs: additional keyword arguments to pass to the provider
+        :returns: table generator
         """

         return self._get(providerName=self._name, tableName=table, rows=rows, partitions=partitions,
                          **kwargs)

     def _getSupportingTable(self, *, providerName, tableName, rows=-1, partitions=-1, **kwargs):
-        providerInstance, providerDefinition = \
-            self._getProviderInstanceAndMetadata(providerName, supportsStreaming=self._streamingRequired)
+        providerInstance, providerDefinition = self._getProviderInstanceAndMetadata(
+            providerName, supportsStreaming=self._streamingRequired
+        )

         assert tableName is not None and len(tableName.strip()) > 0, "Data set name must be provided"

         if tableName not in providerDefinition.associatedDatasets:
             raise ValueError(f"Dataset `{tableName}` not a recognized dataset option")

-        dfSupportingTable = providerInstance.getAssociatedDataset(self._sparkSession, tableName=tableName, rows=rows,
-                                                                  partitions=partitions,
-                                                                  **kwargs)
+        dfSupportingTable = providerInstance.getAssociatedDataset(
+            self._sparkSession, tableName=tableName, rows=rows, partitions=partitions, **kwargs
+        )
         return dfSupportingTable

-    def getAssociatedDataset(self, *, table, rows=-1, partitions=-1, **kwargs):
-        """Get a table generator from the dataset provider
+    def getAssociatedDataset(self, *, table, rows=-1, partitions=-1, **kwargs) -> DataGenerator:
+        """
+        Gets a table generator from the dataset provider.

-        These are DataGenerator instances that can be used to generate the data.
+        Associated datasets are DataGenerator instances that can be used to generate the data.
         The dataset providers also optionally can provide supporting tables which are computed tables based on
         parameters. These are retrieved using the `getAssociatedDataset` method

         If the dataset supports multiple tables, the table may be specified in the `table` parameter.
         If none is specified, the primary table is used.

-        :param table: name of table to retrieve
-        :param rows: number of rows to generate. if -1, provider should compute defaults.
-        :param partitions: number of partitions to use.If -1, the number of partitions is computed automatically
-            table size and partitioning.If applied to a dataset with only a single table, this is ignored.
-        :param kwargs: additional keyword arguments to pass to the provider
-
         If `rows` or `partitions` are not specified, default values are supplied by the provider.

         For multi-table datasets, the table name must be specified. For single table datasets, the table name may
@@ -269,9 +267,13 @@ def getAssociatedDataset(self, *, table, rows=-1, partitions=-1, **kwargs):
         Default number of rows for multi-table datasets may differ - for example a 'customers' table may have a
         100,000 rows while a 'sales' table may have 1,000,000 rows.

-        .. note ::
+        :param table: Name of table to retrieve
+        :param rows: Number of rows to generate. if -1, provider should compute defaults
+        :param partitions: number of partitions to use. If -1, the number of partitions is computed automatically table
+            size and partitioning. If applied to a dataset with only a single table, this is ignored.

-            This method may also be invoked via the aliased names - `getSupportingDataset` and `getCombinedDataset`
+        .. note ::
+            This method may also be invoked via the aliased names - `getSupportingDataset` and `getCombinedDataset`
         """
         return self._getSupportingTable(providerName=self._name, tableName=table, rows=rows, partitions=partitions,
                                         **kwargs)
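
Finally, a brief sketch of the `Datasets` entry points whose docstrings are reorganized above. The provider name "basic/user" is an assumption used for illustration (any registered provider works), and the associated-table call is shown commented out because table names are provider-specific.

```python
import dbldatagen as dg
from dbldatagen import OutputDataset

# get() returns the "table generator" (a DataGenerator); build() produces the DataFrame.
ds = dg.Datasets(spark, "basic/user")   # provider name assumed for illustration
user_spec = ds.get(rows=10_000)
df_users = user_spec.build()

# Because get() yields a DataGenerator, the spec can also be persisted directly
# with the saveAsDataset() method added in this commit (table name hypothetical):
# user_spec.saveAsDataset(OutputDataset(location="main.demo.basic_users"))

# getAssociatedDataset() returns a generator for a provider-defined supporting table;
# per the docstring above, it can also be reached via getSupportingDataset / getCombinedDataset.
# assoc_spec = ds.getAssociatedDataset(table="<associated_table_name>", rows=1_000)
```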
