From dd9dfaaa990b7adbaf971d3b13d4a96887316ea0 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Sun, 23 Feb 2025 23:54:39 +0800 Subject: [PATCH 1/4] #46 Improve Readability of TableRead Implementation --- pypaimon/api/table_read.py | 8 +++--- pypaimon/py4j/java_implementation.py | 37 +++++++++++----------------- 2 files changed, 18 insertions(+), 27 deletions(-) diff --git a/pypaimon/api/table_read.py b/pypaimon/api/table_read.py index 60b31e7..54fced6 100644 --- a/pypaimon/api/table_read.py +++ b/pypaimon/api/table_read.py @@ -31,14 +31,14 @@ class TableRead(ABC): """To read data from data splits.""" - @abstractmethod - def to_arrow(self, splits: List[Split]) -> pa.Table: - """Read data from splits and converted to pyarrow.Table format.""" - @abstractmethod def to_arrow_batch_reader(self, splits: List[Split]) -> pa.RecordBatchReader: """Read data from splits and converted to pyarrow.RecordBatchReader format.""" + @abstractmethod + def to_arrow(self, splits: List[Split]) -> pa.Table: + """Read data from splits and converted to pyarrow.Table format.""" + @abstractmethod def to_pandas(self, splits: List[Split]) -> pd.DataFrame: """Read data from splits and converted to pandas.DataFrame format.""" diff --git a/pypaimon/py4j/java_implementation.py b/pypaimon/py4j/java_implementation.py index 9f378b7..94348a2 100644 --- a/pypaimon/py4j/java_implementation.py +++ b/pypaimon/py4j/java_implementation.py @@ -177,23 +177,20 @@ def file_paths(self) -> List[str]: class TableRead(table_read.TableRead): def __init__(self, j_table_read, j_read_type, catalog_options): - self._j_table_read = j_table_read - self._j_read_type = j_read_type - self._catalog_options = catalog_options - self._j_bytes_reader = None self._arrow_schema = java_utils.to_arrow_schema(j_read_type) + self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader( + j_table_read, j_read_type, TableRead._get_max_workers(catalog_options)) - def 
to_arrow(self, splits): - record_batch_reader = self.to_arrow_batch_reader(splits) - return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema) - - def to_arrow_batch_reader(self, splits): - self._init() + def to_arrow_batch_reader(self, splits) -> pa.RecordBatchReader: j_splits = list(map(lambda s: s.to_j_split(), splits)) self._j_bytes_reader.setSplits(j_splits) batch_iterator = self._batch_generator() return pa.RecordBatchReader.from_batches(self._arrow_schema, batch_iterator) + def to_arrow(self, splits) -> pa.Table: + record_batch_reader = self.to_arrow_batch_reader(splits) + return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema) + def to_pandas(self, splits: List[Split]) -> pd.DataFrame: return self.to_arrow(splits).to_pandas() @@ -213,19 +210,13 @@ def to_ray(self, splits: List[Split]) -> "ray.data.dataset.Dataset": return ray.data.from_arrow(self.to_arrow(splits)) - def _init(self): - if self._j_bytes_reader is None: - # get thread num - max_workers = self._catalog_options.get(constants.MAX_WORKERS) - if max_workers is None: - # default is sequential - max_workers = 1 - else: - max_workers = int(max_workers) - if max_workers <= 0: - raise ValueError("max_workers must be greater than 0") - self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader( - self._j_table_read, self._j_read_type, max_workers) + @staticmethod + def _get_max_workers(catalog_options): + # default is sequential + max_workers = int(catalog_options.get(constants.MAX_WORKERS, 1)) + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + return max_workers def _batch_generator(self) -> Iterator[pa.RecordBatch]: while True: From 1d00d0e05d8c7adb79548642555bceec5b52a34c Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Thu, 20 Mar 2025 14:28:00 +0800 Subject: [PATCH 2/4] #46 Improve Readability of TableRead Implementation --- pypaimon/api/table_read.py | 8 
++++---- pypaimon/py4j/java_implementation.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/pypaimon/api/table_read.py b/pypaimon/api/table_read.py index 54fced6..60b31e7 100644 --- a/pypaimon/api/table_read.py +++ b/pypaimon/api/table_read.py @@ -31,14 +31,14 @@ class TableRead(ABC): """To read data from data splits.""" - @abstractmethod - def to_arrow_batch_reader(self, splits: List[Split]) -> pa.RecordBatchReader: - """Read data from splits and converted to pyarrow.RecordBatchReader format.""" - @abstractmethod def to_arrow(self, splits: List[Split]) -> pa.Table: """Read data from splits and converted to pyarrow.Table format.""" + @abstractmethod + def to_arrow_batch_reader(self, splits: List[Split]) -> pa.RecordBatchReader: + """Read data from splits and converted to pyarrow.RecordBatchReader format.""" + @abstractmethod def to_pandas(self, splits: List[Split]) -> pd.DataFrame: """Read data from splits and converted to pandas.DataFrame format.""" diff --git a/pypaimon/py4j/java_implementation.py b/pypaimon/py4j/java_implementation.py index 94348a2..56d5080 100644 --- a/pypaimon/py4j/java_implementation.py +++ b/pypaimon/py4j/java_implementation.py @@ -181,16 +181,16 @@ def __init__(self, j_table_read, j_read_type, catalog_options): self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader( j_table_read, j_read_type, TableRead._get_max_workers(catalog_options)) + def to_arrow(self, splits) -> pa.Table: + record_batch_reader = self.to_arrow_batch_reader(splits) + return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema) + def to_arrow_batch_reader(self, splits) -> pa.RecordBatchReader: j_splits = list(map(lambda s: s.to_j_split(), splits)) self._j_bytes_reader.setSplits(j_splits) batch_iterator = self._batch_generator() return pa.RecordBatchReader.from_batches(self._arrow_schema, batch_iterator) - def to_arrow(self, splits) -> pa.Table: - record_batch_reader = 
self.to_arrow_batch_reader(splits) - return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema) - def to_pandas(self, splits: List[Split]) -> pd.DataFrame: return self.to_arrow(splits).to_pandas() From 0992aa9adcaf780096bbbdc20987bd124b421b1a Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Thu, 20 Mar 2025 14:28:36 +0800 Subject: [PATCH 3/4] #46 Improve Readability of TableRead Implementation --- pypaimon/py4j/java_implementation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pypaimon/py4j/java_implementation.py b/pypaimon/py4j/java_implementation.py index 56d5080..cfc034b 100644 --- a/pypaimon/py4j/java_implementation.py +++ b/pypaimon/py4j/java_implementation.py @@ -181,7 +181,7 @@ def __init__(self, j_table_read, j_read_type, catalog_options): self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader( j_table_read, j_read_type, TableRead._get_max_workers(catalog_options)) - def to_arrow(self, splits) -> pa.Table: + def to_arrow(self, splits): record_batch_reader = self.to_arrow_batch_reader(splits) return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema) From ab2d26c88085d2df0d9c27615a4542c1ec5db315 Mon Sep 17 00:00:00 2001 From: ChengHui Chen <27797326+chenghuichen@users.noreply.github.com> Date: Thu, 20 Mar 2025 14:29:08 +0800 Subject: [PATCH 4/4] #46 Improve Readability of TableRead Implementation --- pypaimon/py4j/java_implementation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pypaimon/py4j/java_implementation.py b/pypaimon/py4j/java_implementation.py index cfc034b..07721f0 100644 --- a/pypaimon/py4j/java_implementation.py +++ b/pypaimon/py4j/java_implementation.py @@ -185,7 +185,7 @@ def to_arrow(self, splits): record_batch_reader = self.to_arrow_batch_reader(splits) return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema) - def to_arrow_batch_reader(self, splits) -> 
pa.RecordBatchReader: + def to_arrow_batch_reader(self, splits): j_splits = list(map(lambda s: s.to_j_split(), splits)) self._j_bytes_reader.setSplits(j_splits) batch_iterator = self._batch_generator()