Draft
Changes from all commits
97 commits
d9121f8
feat: add sharding support for GeoZarr conversion and CLI
emmanuelmathot Sep 26, 2025
8b83e77
update launch configurations for GeoZarr conversion with new data sou…
emmanuelmathot Sep 26, 2025
2693532
Merge branch 'launch' into sharding
emmanuelmathot Sep 26, 2025
367b146
feat: enable sharding in GeoZarr conversion launch configuration
emmanuelmathot Sep 26, 2025
30d6fb0
fix: update sharding codec handling in _create_sharded_encoding function
emmanuelmathot Sep 26, 2025
6f3db70
Merge branch 'sharding' of https://github.com/EOPF-Explorer/data-mode…
emmanuelmathot Sep 26, 2025
4f38d68
refactor: streamline sharding configuration in _create_geozarr_encodi…
emmanuelmathot Sep 26, 2025
37ad2c0
feat: enhance sharding logic in _create_geozarr_encoding and add _cal…
emmanuelmathot Sep 26, 2025
67faa58
feat: improve sharding configuration and validation in _create_geozar…
emmanuelmathot Sep 26, 2025
cccd8fd
fix: refine shard dimension calculation and improve divisor check in …
emmanuelmathot Sep 26, 2025
064917a
Add dataset tree structure and test script for sharding fix
emmanuelmathot Sep 26, 2025
82305a6
feat: enable sharding in Dask cluster setup and enhance chunking logi…
emmanuelmathot Sep 26, 2025
4486f81
Merge branch 'sharding' of https://github.com/EOPF-Explorer/data-mode…
emmanuelmathot Sep 26, 2025
3232a8e
Add Sentinel-2 Optimization Module with CLI Integration and Data Proc…
emmanuelmathot Sep 26, 2025
4fc14a3
feat: enhance S2 data consolidator with comprehensive extraction meth…
emmanuelmathot Sep 26, 2025
3d1ea51
Add comprehensive tests for S2MultiscalePyramid class
emmanuelmathot Sep 26, 2025
1ae2c19
feat: simplify chunk alignment and sharding logic in S2MultiscalePyramid
emmanuelmathot Sep 26, 2025
fca840e
feat: integrate S2 optimization commands into CLI and enhance convert…
emmanuelmathot Sep 26, 2025
72546bc
feat: add S2L2A optimized conversion command to CLI and update launch…
emmanuelmathot Sep 26, 2025
4d07907
feat: enhance S2 converter and multiscale pyramid with optimized enco…
emmanuelmathot Sep 27, 2025
89bc6cf
feat: enhance sharding logic to ensure compatibility with chunk dimen…
emmanuelmathot Sep 27, 2025
3aff8d3
feat: add downsampling for 10m data and adjust dataset creation for l…
emmanuelmathot Sep 27, 2025
1cd4281
feat: add support for Dask cluster in S2 optimization commands and en…
emmanuelmathot Sep 28, 2025
dfca7e1
feat: add compression level option for GeoZarr conversion
emmanuelmathot Sep 28, 2025
68d6715
Merge branch 'new_s2' of https://github.com/EOPF-Explorer/data-model …
emmanuelmathot Sep 28, 2025
d4c7487
feat: implement Dask parallelization for multiscale pyramid creation …
emmanuelmathot Sep 28, 2025
a1539fb
feat: enhance multiscale pyramid creation with streaming Dask paralle…
emmanuelmathot Sep 28, 2025
ded0f61
feat: configure Dask client to use 3 workers with 8GB memory each for…
emmanuelmathot Sep 28, 2025
0a2cc41
fix: update import path for geozarr functions in S2OptimizedConverter
emmanuelmathot Sep 28, 2025
19cabb8
feat: refactor multiscales metadata handling and root consolidation i…
emmanuelmathot Sep 28, 2025
75be5f7
feat: add comprehensive unit tests for S2OptimizedConverter and relat…
emmanuelmathot Sep 28, 2025
c409af1
feat: implement geographic metadata writing in S2MultiscalePyramid an…
emmanuelmathot Sep 28, 2025
7df85bd
feat: skip duplicate variables during downsampling in S2MultiscalePyr…
emmanuelmathot Sep 28, 2025
e7896c4
feat: enhance CRS handling by adding grid mapping variable to dataset…
emmanuelmathot Sep 28, 2025
eec6b27
feat: add grid mapping variable writing for datasets in S2MultiscaleP…
emmanuelmathot Sep 28, 2025
d4a5a95
feat: skip already present variables during downsampling in S2Multisc…
emmanuelmathot Sep 28, 2025
df005ef
feat: reduce memory limit for Dask client to 4GB and add geographic m…
emmanuelmathot Sep 28, 2025
728168d
Merge branch 'main' into new_s2
emmanuelmathot Sep 29, 2025
0a265aa
Refactor test cases and improve code formatting in S2 resampling test…
emmanuelmathot Sep 29, 2025
4849e3f
feat: update memory limit for Dask client to 8GB and adjust spatial c…
emmanuelmathot Sep 29, 2025
e388b26
Merge branch 'new_s2' of https://github.com/EOPF-Explorer/data-model …
emmanuelmathot Sep 29, 2025
3d02eab
feat: add new CLI command for converting to GeoZarr S2L2A optimized f…
emmanuelmathot Sep 29, 2025
48f5dd8
feat: implement batched parallel downsampling for S2 datasets and imp…
emmanuelmathot Sep 29, 2025
94f62f9
Merge branch 'new_s2' of https://github.com/EOPF-Explorer/data-model …
emmanuelmathot Sep 29, 2025
e20c411
fix: update measurement group keys and enhance dataset loading with d…
emmanuelmathot Sep 30, 2025
e33f035
feat: add streaming support for multiscale pyramid creation in S2 con…
emmanuelmathot Sep 30, 2025
0307d0d
feat: add --enable-streaming option for experimental streaming mode i…
emmanuelmathot Sep 30, 2025
1d6a922
fix: avoid passing coordinates in lazy dataset creation to prevent al…
emmanuelmathot Sep 30, 2025
c32aaef
feat: implement Zarr v3 compatible encoding for optimized datasets
emmanuelmathot Sep 30, 2025
2ccb110
fix: enhance measurements group writing by consolidating metadata and…
emmanuelmathot Sep 30, 2025
a4952e7
feat: enhance streaming write with advanced chunking and sharding sup…
emmanuelmathot Sep 30, 2025
9d97eee
feat: enhance encoding for streaming writes with advanced chunking an…
emmanuelmathot Sep 30, 2025
76cde29
fix: improve root-level metadata consolidation with proper Zarr group…
emmanuelmathot Sep 30, 2025
52e516d
feat: add streaming support to S2 optimized converter and update meas…
emmanuelmathot Sep 30, 2025
73bfea2
Merge branch 'new_s2' of https://github.com/EOPF-Explorer/data-model …
emmanuelmathot Sep 30, 2025
5e58251
fix: change root Zarr group creation mode from 'w' to 'a' for appendi…
emmanuelmathot Sep 30, 2025
8250bc7
refactor: streamline Zarr group handling and metadata consolidation i…
emmanuelmathot Sep 30, 2025
deff685
fix: streamline root Zarr group creation by removing existence check …
emmanuelmathot Sep 30, 2025
56596e2
Merge branch 'new_s2' of https://github.com/EOPF-Explorer/data-model …
emmanuelmathot Sep 30, 2025
cb4ada1
fix: correct multiscales attribute assignment and update group prefix…
emmanuelmathot Sep 30, 2025
16c245b
feat: add downsampled coordinates creation for multiscale pyramid levels
emmanuelmathot Sep 30, 2025
42a72fb
fix: update launch configuration for S2A MSIL2A dataset and adjust gr…
emmanuelmathot Oct 1, 2025
e16d9f7
Refactor downsample factor calculation in S2StreamingMultiscalePyramid
emmanuelmathot Oct 1, 2025
94f55bc
Merge branch 'new_s2' of https://github.com/EOPF-Explorer/data-model …
emmanuelmathot Oct 1, 2025
e7d6bf0
Merge branch 'main' into new_s2
emmanuelmathot Oct 25, 2025
fb2b60d
streaming as default
emmanuelmathot Oct 25, 2025
bc692c7
refactor: update S2 optimization process to preserve original data st…
emmanuelmathot Oct 25, 2025
5559a91
refactor: enhance multiscale creation by preserving all original grou…
emmanuelmathot Oct 25, 2025
9bdd615
refactor: enhance group writing by preserving original chunking and e…
emmanuelmathot Oct 25, 2025
1c276a9
refactor: preserve original chunking during dataset writing by rechun…
emmanuelmathot Oct 26, 2025
66c7937
refactor: enhance downsampling process by organizing resolution group…
emmanuelmathot Oct 26, 2025
b499995
refactor: improve error handling and verbosity in downsampling proces…
emmanuelmathot Oct 26, 2025
18de7ab
refactor: update band mapping for Sentinel-2 by adding 'b10' to nativ…
emmanuelmathot Oct 26, 2025
e9cf962
refactor: update tile dimensions calculation and enhance multiscales …
emmanuelmathot Nov 3, 2025
cc8eaa3
refactor: simplify multiscales metadata addition by removing unnecess…
emmanuelmathot Nov 3, 2025
8d1c89d
refactor: simplify variable naming in multiscale pyramid creation and…
emmanuelmathot Nov 3, 2025
70fe228
Merge branch 'new_s2' of https://github.com/EOPF-Explorer/data-model …
emmanuelmathot Nov 3, 2025
935b043
refactor: streamline zarr group creation and multiscales metadata han…
emmanuelmathot Nov 3, 2025
b9bcc31
refactor: change Zarr write mode from 'a' to 'r+' in S2 converter and…
emmanuelmathot Nov 3, 2025
c54b253
refactor: change Zarr write mode from 'r+' to 'a' and simplify DataTr…
emmanuelmathot Nov 3, 2025
efd8fa2
fix: correct parameter name from 'modea' to 'mode' in DataTree zarr w…
emmanuelmathot Nov 3, 2025
f8271a3
feat: add missing parent groups creation in root-level metadata conso…
emmanuelmathot Nov 3, 2025
117795b
feat: enhance root-level group creation by identifying and creating m…
emmanuelmathot Nov 3, 2025
59ccbc0
fix: correct parameter name from 'zqarr_format' to 'zarr_format' in S…
emmanuelmathot Nov 3, 2025
5d9057d
fix: store result of multiscales metadata addition in processed_groups
emmanuelmathot Nov 3, 2025
b3cede6
fix: update NATIVE_BANDS to include 'b10' and adjust pyramid levels c…
emmanuelmathot Nov 3, 2025
870e28f
fix: update coordinate creation in downsampling for consistency and i…
emmanuelmathot Nov 3, 2025
d751282
refactor: remove unused fixture and update tests for pyramid levels a…
emmanuelmathot Nov 3, 2025
5c583f1
Merge branch 'main' into new_s2
emmanuelmathot Nov 3, 2025
d9a3d2d
Remove Sentinel-2 Zarr Conversion Optimization Plan and associated te…
emmanuelmathot Nov 3, 2025
7d6fa51
Implement feature X to enhance user experience and fix bug Y in module Z
emmanuelmathot Nov 3, 2025
2290b8e
delete: remove dataset_tree_simplified.txt as it is no longer needed
emmanuelmathot Nov 3, 2025
a0feee9
Merge branch 'main' into new_s2
emmanuelmathot Nov 24, 2025
81d7f62
Refactor code structure for improved readability and maintainability
emmanuelmathot Nov 24, 2025
e347fcd
Refactor S2 optimization commands and enhance CRS handling
emmanuelmathot Nov 24, 2025
a630631
Add geo metadata writing for /measurements/ groups in S2 multiscale p…
emmanuelmathot Nov 24, 2025
4487090
Add support for additional S3 storage options and update launch confi…
emmanuelmathot Nov 25, 2025
669 changes: 357 additions & 312 deletions .vscode/launch.json

Large diffs are not rendered by default.

10 changes: 0 additions & 10 deletions src/eopf_geozarr/cli.py
@@ -1182,14 +1182,6 @@ def add_s2_optimization_commands(subparsers: Any) -> None:
         choices=range(1, 10),
         help="Compression level 1-9 (default: 3)",
     )
-    s2_parser.add_argument(
-        "--skip-geometry", action="store_true", help="Skip creating geometry group"
-    )
-    s2_parser.add_argument(
-        "--skip-meteorology",
-        action="store_true",
-        help="Skip creating meteorology group",
-    )
     s2_parser.add_argument(
         "--skip-validation", action="store_true", help="Skip output validation"
     )
@@ -1229,8 +1221,6 @@ def convert_s2_optimized_command(args: Any) -> int:
         enable_sharding=args.enable_sharding,
         spatial_chunk=args.spatial_chunk,
         compression_level=args.compression_level,
-        create_geometry_group=not args.skip_geometry,
-        create_meteorology_group=not args.skip_meteorology,
         validate_output=not args.skip_validation,
         verbose=args.verbose,
     )
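The two group toggles are gone now that the converter preserves the original group structure. For orientation, a minimal sketch of driving the same conversion from Python with the options that remain — the converter's exact constructor signature is not shown in this diff, so the keyword names here are assumptions based on what the CLI forwards:

```python
# A minimal sketch, not the exact CLI wiring: open an EOPF Sentinel-2 product
# as a DataTree and run the optimized conversion with the remaining options.
# The input path is a placeholder.
import xarray as xr

from eopf_geozarr.s2_optimization.s2_converter import S2OptimizedConverter

dt_input = xr.open_datatree("S2A_MSIL2A_example.zarr", engine="zarr")

converter = S2OptimizedConverter(
    enable_sharding=True,  # --enable-sharding
    spatial_chunk=256,     # --spatial-chunk
)
result_dt = converter.convert_s2_optimized(
    dt_input,
    "output_optimized.zarr",
    validate_output=True,  # inverted by --skip-validation
    verbose=True,
)
```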
15 changes: 15 additions & 0 deletions src/eopf_geozarr/conversion/fs_utils.py
@@ -116,6 +116,9 @@ def get_s3_storage_options(s3_path: str, **s3_kwargs: Any) -> S3FsOptions:
             "client_kwargs": {
                 "region_name": os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
             },
+            "s3_additional_kwargs": {
+                "StorageClass": "EXPRESS_ONEZONE",
+            },
         }
 
     # Add custom endpoint support (e.g., for OVH Cloud)
@@ -209,6 +212,9 @@ def write_s3_json_metadata(
             "client_kwargs": {
                 "region_name": os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
             },
+            "s3_additional_kwargs": {
+                "StorageClass": "EXPRESS_ONEZONE",
+            },
         }
 
     # Add custom endpoint support (e.g., for OVH Cloud)
@@ -251,6 +257,9 @@ def read_s3_json_metadata(s3_path: str, **s3_kwargs: Any) -> dict[str, Any]:
             "client_kwargs": {
                 "region_name": os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
             },
+            "s3_additional_kwargs": {
+                "StorageClass": "EXPRESS_ONEZONE",
+            },
         }
 
     # Add custom endpoint support (e.g., for OVH Cloud)
@@ -293,6 +302,9 @@ def s3_path_exists(s3_path: str, **s3_kwargs: Any) -> bool:
             "client_kwargs": {
                 "region_name": os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
             },
+            "s3_additional_kwargs": {
+                "StorageClass": "EXPRESS_ONEZONE",
+            },
         }
 
     # Add custom endpoint support (e.g., for OVH Cloud)
@@ -380,6 +392,9 @@ def validate_s3_access(s3_path: str, **s3_kwargs: Any) -> tuple[bool, str | None]:
             "client_kwargs": {
                 "region_name": os.environ.get("AWS_DEFAULT_REGION", "us-east-1")
             },
+            "s3_additional_kwargs": {
+                "StorageClass": "EXPRESS_ONEZONE",
+            },
         }
 
     # Add custom endpoint support (e.g., for OVH Cloud)
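All five helpers now inject the same `s3_additional_kwargs` block, so objects written through the resulting filesystem are created with the S3 Express One Zone storage class. A self-contained sketch of the pattern, assuming bucket and key placeholders (on AWS, `EXPRESS_ONEZONE` only applies to directory buckets; other S3-compatible providers may ignore or reject it):

```python
# Minimal sketch of the storage-options pattern repeated in each helper above:
# pin the region and request the S3 Express One Zone storage class on uploads.
import os

import s3fs

storage_options = {
    "client_kwargs": {
        "region_name": os.environ.get("AWS_DEFAULT_REGION", "us-east-1"),
    },
    # Forwarded by s3fs to the underlying put/upload calls, so every object
    # written through this filesystem carries the requested storage class.
    "s3_additional_kwargs": {
        "StorageClass": "EXPRESS_ONEZONE",
    },
}

fs = s3fs.S3FileSystem(**storage_options)
with fs.open("my-bucket/example/zarr.json", "w") as f:  # hypothetical path
    f.write("{}")
```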
48 changes: 35 additions & 13 deletions src/eopf_geozarr/s2_optimization/s2_converter.py
@@ -6,6 +6,7 @@
 import time
 from typing import Any, Dict
 
+from pyproj import CRS
 import structlog
 import xarray as xr
 
@@ -37,13 +38,12 @@ def __init__(
         # Initialize components - streaming is always enabled
         self.pyramid_creator = S2MultiscalePyramid(enable_sharding, spatial_chunk)
         self.validator = S2OptimizationValidator()
+        self.crs: CRS | None = None
 
     def convert_s2_optimized(
         self,
         dt_input: xr.DataTree,
         output_path: str,
-        create_geometry_group: bool = True,
-        create_meteorology_group: bool = True,
         validate_output: bool = True,
         verbose: bool = False,
     ) -> xr.DataTree:
@@ -75,28 +75,30 @@ def convert_s2_optimized(
             raise ValueError("Input dataset is not a Sentinel-2 product")
 
         # Step 1: Process data while preserving original structure
-        log.info("Step 1: Processing data with original structure preserved")
+        log.info("Step 1: Preparing data (getting CRS, etc.)...")
+        self._init_crs_for_groups(dt_input)
 
         # Step 2: Create multiscale pyramids for each group in the original structure
-        log.info("Step 2: Creating multiscale pyramids (preserving original hierarchy)")
+        log.info(
+            "Step 2: Creating multiscale pyramids (preserving original hierarchy)..."
+        )
         datasets = self.pyramid_creator.create_multiscale_from_datatree(
-            dt_input, output_path, verbose
+            dt_input, output_path, verbose, self.crs
         )
 
-        log.info("Created multiscale pyramids", num_groups=len(datasets))
+        log.info(f"  Created multiscale pyramids for {len(datasets)} groups")
 
         # Step 3: Root-level consolidation
-        log.info("Step 3: Final root-level metadata consolidation")
+        log.info("Step 3: Final root-level metadata consolidation...")
         self._simple_root_consolidation(output_path, datasets)
 
         # Step 4: Validation
         if validate_output:
-            log.info("Step 4: Validating optimized dataset")
+            log.info("Step 4: Validating optimized dataset...")
             validation_results = self.validator.validate_optimized_dataset(output_path)
             if not validation_results["is_valid"]:
-                log.warning(
-                    "Validation issues found", issues=validation_results["issues"]
-                )
+                log.info("  Warning: Validation issues found:")
+                for issue in validation_results["issues"]:
+                    log.info(f"    - {issue}")
 
         # Create result DataTree
         result_dt = self._create_result_datatree(output_path)
@@ -108,6 +110,27 @@
         self._print_optimization_summary(dt_input, result_dt, output_path)
 
         return result_dt
+
+    def _init_crs_for_groups(self, dt: xr.DataTree) -> None:
+        epsg: int | None = None
+
+        # For CPM >= 2.6.0, the EPSG code is stored in attributes
+        epsg_CPM_260 = dt.attrs.get("other_metadata", {}).get("horizontal_CRS_code", None)
+        if epsg_CPM_260 is not None:
+            epsg = int(epsg_CPM_260.split(":")[-1])
+        # For older CPM versions, look for proj:epsg attribute in data variables
+        else:
+            for group in dt.groups.values():
+                for var in group.to_dataset().data_vars.values():
+                    if "proj:epsg" in var.attrs:
+                        epsg = int(var.attrs["proj:epsg"])
+                        break
+                if epsg is not None:
+                    break
+
+        self.crs = CRS.from_epsg(epsg) if epsg is not None else None
+        self.pyramid_creator.crs = self.crs
+
 
     def _is_sentinel2_dataset(self, dt: xr.DataTree) -> bool:
         """Check if dataset is Sentinel-2."""
@@ -147,7 +170,6 @@ def _simple_root_consolidation(
                 parent_path = "/" + "/".join(parts[:i])
                 if parent_path not in datasets:
                     missing_groups.add(parent_path)
-
         for group_path in missing_groups:
             dt_parent = xr.DataTree()
             dt_parent.to_zarr(
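The new `_init_crs_for_groups` resolves the product CRS once, up front, instead of re-deriving it per dataset. The lookup order can be exercised in isolation; `resolve_crs` below is a hypothetical stand-in that mirrors the two code paths from the diff (the CPM >= 2.6.0 root attribute first, then the legacy per-variable `proj:epsg` attribute):

```python
# Standalone sketch of the CRS lookup added above. resolve_crs is a
# hypothetical helper; the attribute layouts are taken from the new
# _init_crs_for_groups method.
from pyproj import CRS


def resolve_crs(root_attrs: dict, variable_attrs: list[dict]) -> CRS | None:
    # CPM >= 2.6.0: a code such as "EPSG:32633" under other_metadata
    code = root_attrs.get("other_metadata", {}).get("horizontal_CRS_code")
    if code is not None:
        return CRS.from_epsg(int(code.split(":")[-1]))
    # Older CPM versions: first proj:epsg attribute found on a data variable
    for attrs in variable_attrs:
        if "proj:epsg" in attrs:
            return CRS.from_epsg(int(attrs["proj:epsg"]))
    return None


crs = resolve_crs({"other_metadata": {"horizontal_CRS_code": "EPSG:32633"}}, [])
print(crs.to_epsg())  # 32633
```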
58 changes: 23 additions & 35 deletions src/eopf_geozarr/s2_optimization/s2_multiscale.py
@@ -46,6 +46,7 @@ def __init__(self, enable_sharding: bool = True, spatial_chunk: int = 256):
         self.enable_sharding = enable_sharding
         self.spatial_chunk = spatial_chunk
         self.resampler = S2ResamplingEngine()
+        self.crs: CRS | None = None
 
         # Define pyramid levels: resolution in meters
         self.pyramid_levels = {
@@ -58,7 +59,7 @@
         }
 
     def create_multiscale_from_datatree(
-        self, dt_input: xr.DataTree, output_path: str, verbose: bool = False
+        self, dt_input: xr.DataTree, output_path: str, verbose: bool = False, crs: CRS | None = None
     ) -> dict[str, dict]:
         """
         Create multiscale versions preserving original structure.
@@ -68,12 +69,10 @@
             dt_input: Input DataTree with original structure
             output_path: Base output path
             verbose: Enable verbose logging
-
         Returns:
             Dictionary of processed groups
         """
         processed_groups = {}
-
         # Step 1: Copy all original groups as-is
         for group_path in dt_input.groups:
             if group_path == ".":
@@ -90,11 +89,11 @@
                 # Skip empty groups
                 if not dataset.data_vars:
                     if verbose:
-                        log.info("  Skipping empty group: {}", group_path=group_path)
+                        log.info("  Skipping empty group", group_path=group_path)
                     continue
 
                 if verbose:
-                    log.info("  Copying original group: {}", group_path=group_path)
+                    log.info("  Copying original group", group_path=group_path)
 
                 output_group_path = f"{output_path}{group_path}"
 
@@ -124,15 +123,13 @@
             # Only process groups under /measurements/reflectance
             if not group_path.startswith(base_path):
                 continue
-
             group_name = group_path.split("/")[-1]
             if group_name in ["r10m", "r20m", "r60m"]:
                 resolution_groups[group_name] = processed_groups[group_path]
 
         # Find the coarsest resolution (r60m > r20m > r10m)
         source_dataset = None
         source_resolution = None
-
         for res in ["r60m", "r20m", "r10m"]:
             if res in resolution_groups:
                 source_dataset = resolution_groups[res]
@@ -160,7 +157,7 @@
             r120m_path = f"{base_path}/r120m"
             factor = 120 // source_resolution
             if verbose:
-                log.info("  Creating r120m with factor {}", factor=factor)
+                log.info("  Creating r120m with factor", factor=factor)
 
             r120m_dataset = self._create_downsampled_resolution_group(
                 source_dataset, factor=factor, verbose=verbose
@@ -169,7 +166,7 @@
             if r120m_dataset and len(r120m_dataset.data_vars) > 0:
                 output_path_120 = f"{output_path}{r120m_path}"
                 if verbose:
-                    log.info("  Writing r120m to {}", output_path_120=output_path_120)
+                    log.info("  Writing r120m", output_path_120=output_path_120)
                 encoding_120 = self._create_measurements_encoding(r120m_dataset)
                 ds_120 = self._stream_write_dataset(
                     r120m_dataset, output_path_120, encoding_120
@@ -191,7 +188,7 @@
                     output_path_360 = f"{output_path}{r360m_path}"
                     if verbose:
                         log.info(
-                            "  Writing r360m to {}",
+                            "  Writing r360m",
                            output_path_360=output_path_360,
                         )
                     encoding_360 = self._create_measurements_encoding(r360m_dataset)
@@ -215,7 +212,7 @@
                         output_path_720 = f"{output_path}{r720m_path}"
                         if verbose:
                             log.info(
-                                "  Writing r720m to {}",
+                                "  Writing r720m",
                                 output_path_720=output_path_720,
                             )
                         encoding_720 = self._create_measurements_encoding(
@@ -240,7 +237,7 @@
                         log.info("  r360m dataset is empty, skipping")
                 except Exception as e:
                     log.warning(
-                        "Could not create r360m for {}: {}", base_path=base_path, e=e
+                        "Could not create r360m", base_path=base_path, e=e
                     )
             # Track r120m for multiscales if created
             if verbose:
@@ -249,7 +246,7 @@
             if verbose:
                 log.info("  r120m dataset is empty, skipping")
         except Exception as e:
-            log.warning("Could not create r120m for {}: {}", base_path=base_path, e=e)
+            log.warning("Could not create r120m", base_path=base_path, e=e)
 
         # Step 3: Add multiscales metadata to parent groups
         if verbose:
@@ -262,7 +259,7 @@
                 processed_groups[base_path] = dt_multiscale
             except Exception as e:
                 log.warning(
-                    "Could not add multiscales metadata to {}: {}", base_path=base_path, e=e
+                    "Could not add multiscales metadata to parent groups", base_path=base_path, e=e
                 )
 
         return processed_groups
@@ -302,7 +299,6 @@ def _create_downsampled_resolution_group(
         """Create a downsampled version of a dataset by given factor."""
         if not source_dataset or len(source_dataset.data_vars) == 0:
             return xr.Dataset()
-
         # Get reference dimensions
         ref_var = next(iter(source_dataset.data_vars.values()))
         if ref_var.ndim < 2:
@@ -319,13 +315,11 @@
         downsampled_coords = self._create_downsampled_coordinates(
             source_dataset, target_height, target_width, factor
         )
-
         # Downsample all variables using existing lazy operations
         lazy_vars = {}
         for var_name, var_data in source_dataset.data_vars.items():
             if var_data.ndim < 2:
                 continue
-
             lazy_downsampled = self._create_lazy_downsample_operation_from_existing(
                 var_data, target_height, target_width
             )
@@ -403,15 +397,16 @@ def _stream_write_dataset(
                 )
                 return existing_ds
 
-        log.info("  Streaming computation and write to {}", dataset_path=dataset_path)
-        log.info("Variables", variables=list(dataset.data_vars.keys()))
+        log.info("  Streaming computation and write", dataset_path=dataset_path)
+        log.info("  Variables", variables=list(dataset.data_vars.keys()))
 
         # Rechunk dataset to align with encoding when sharding is enabled
         if self.enable_sharding:
             dataset = self._rechunk_dataset_for_encoding(dataset, encoding)
 
-        # Add the geo metadata before writing
-        self._write_geo_metadata(dataset)
+        # Add the geo metadata before writing for /measurements/ groups
+        if "/measurements/" in dataset_path:
+            self._write_geo_metadata(dataset)
 
         # Write with streaming computation and progress tracking
         # The to_zarr operation will trigger all lazy computations
@@ -430,14 +425,14 @@
             try:
                 distributed.progress(write_job, notebook=False)
             except Exception as e:
-                log.warning("Could not display progress bar: {}", e=e)
+                log.warning("Could not display progress bar", e=e)
                 write_job.compute()
         else:
             log.info("  Writing zarr file...")
             write_job.compute()
 
         log.info(
-            "  Streaming write complete for dataset {}", dataset_path=dataset_path
+            "  Streaming write complete for dataset", dataset_path=dataset_path
         )
         return dataset
@@ -740,7 +735,6 @@ def _add_multiscales_metadata_to_parent(
             "resampling_method": "average",
             "tile_matrix_limits": tile_matrix_limits,
         }
-
         # Create parent group path
         parent_group_path = f"{output_path}{base_path}"
         dt_multiscale = xr.DataTree()
@@ -766,19 +760,13 @@ def _write_geo_metadata(
     ) -> None:
         """Write geographic metadata to the dataset."""
         # Implementation same as original
-        crs = None
-        for var in dataset.data_vars.values():
-            if hasattr(var, "rio") and var.rio.crs:
-                crs = var.rio.crs
-                break
-            elif "proj:epsg" in var.attrs:
-                epsg = var.attrs["proj:epsg"]
-                crs = CRS.from_epsg(epsg)
-                break
+        if self.crs is None:
+            log.warning("CRS is not set, skipping geo metadata writing")
+            return
 
-        if crs is not None:
+        if self.crs is not None:
             dataset.rio.write_crs(
-                crs, grid_mapping_name=grid_mapping_var_name, inplace=True
+                self.crs, grid_mapping_name=grid_mapping_var_name, inplace=True
             )
             dataset.rio.write_grid_mapping(grid_mapping_var_name, inplace=True)
             dataset.attrs["grid_mapping"] = grid_mapping_var_name
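The overview factors fall out of integer division against the coarsest native resolution group (the `for res in ["r60m", "r20m", "r10m"]` loop prefers r60m). A worked example of the arithmetic; the factor computations for the r360m and r720m levels are truncated in this diff, so they are assumed below to follow the same pattern:

```python
# Factor arithmetic for the derived pyramid levels, assuming r60m is present
# and therefore selected as the source (coarsest-first preference).
source_resolution = 60

factor_120 = 120 // source_resolution
print(factor_120)  # 2: r120m is a 2x downsample of r60m

# Assumed: the coarser levels chain on top of the previous one the same way.
print(360 // 120)  # 3: r360m from r120m
print(720 // 360)  # 2: r720m from r360m
```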
1 change: 0 additions & 1 deletion src/eopf_geozarr/s2_optimization/s2_resampling.py
@@ -102,7 +102,6 @@ def _downsample_classification(
         # Take the center pixel of each block as representative
         center_h = block_h // 2
         center_w = block_w // 2
-
         if data.ndim == 3:
             # Sample every block_h and block_w pixels, starting from center
             downsampled = data.values[:, center_h::block_h, center_w::block_w]
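The touched code is `_downsample_classification`, which picks each block's center pixel rather than averaging, since averaging categorical class codes would produce values that are not valid classes. A small NumPy sketch of that sampling, assuming the array dimensions divide evenly by the block size:

```python
# Sketch of the center-pixel block sampling used for classification bands.
import numpy as np


def downsample_classification(data: np.ndarray, block_h: int, block_w: int) -> np.ndarray:
    # Take the center pixel of each block_h x block_w block as representative.
    center_h = block_h // 2
    center_w = block_w // 2
    if data.ndim == 3:
        return data[:, center_h::block_h, center_w::block_w]
    return data[center_h::block_h, center_w::block_w]


scl = np.arange(36).reshape(6, 6)  # toy 6x6 "scene classification" band
print(downsample_classification(scl, 2, 2))
# [[ 7  9 11]
#  [19 21 23]
#  [31 33 35]]
```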