Skip to content
3 changes: 2 additions & 1 deletion python/zarrs/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ class Basic:
class CodecPipelineImpl:
def __new__(
cls,
metadata: builtins.str,
array_metadata: builtins.str,
store_config: StoreConfig,
*,
validate_checksums: builtins.bool | None = None,
chunk_concurrent_minimum: builtins.int | None = None,
Expand Down
67 changes: 42 additions & 25 deletions python/zarrs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@

import asyncio
import json
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, TypedDict
from warnings import warn

import numpy as np
from zarr.abc.codec import Codec, CodecPipeline
from zarr.codecs._v2 import V2Codec
from zarr.core import BatchedCodecPipeline
from zarr.core.config import config
from zarr.core.metadata import ArrayMetadata, ArrayV2Metadata, ArrayV3Metadata

if TYPE_CHECKING:
from collections.abc import Generator, Iterable, Iterator
from typing import Any, Self

from zarr.abc.store import ByteGetter, ByteSetter
from zarr.abc.store import ByteGetter, ByteSetter, Store
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer
from zarr.core.chunk_grids import ChunkGrid
Expand All @@ -40,10 +42,14 @@ class UnsupportedMetadataError(Exception):
pass


def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | None:
def get_codec_pipeline_impl(
metadata: ArrayMetadata, store: Store
) -> CodecPipelineImpl | None:
try:
array_metadata_json = json.dumps(metadata.to_dict())
return CodecPipelineImpl(
codec_metadata_json,
array_metadata_json,
store_config=store,
validate_checksums=config.get("codec_pipeline.validate_checksums", None),
chunk_concurrent_minimum=config.get(
"codec_pipeline.chunk_concurrent_minimum", None
Expand All @@ -54,10 +60,11 @@ def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | Non
num_threads=config.get("threading.max_workers", None),
)
except TypeError as e:
if re.match(r"codec (delta|zlib) is not supported", str(e)):
return None
else:
raise e
warn(
f"Array is unsupported by ZarrsCodecPipeline: {e}",
category=UserWarning,
)
return None


def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]:
Expand Down Expand Up @@ -88,37 +95,47 @@ class ZarrsCodecPipelineState(TypedDict):
codecs: tuple[Codec, ...]


def array_metadata_to_codecs(metadata: ArrayMetadata) -> list[Codec]:
    """Return the codecs described by ``metadata`` as a list.

    For ``ArrayV3Metadata`` the metadata's own ``codecs`` are returned. For
    ``ArrayV2Metadata`` the ``filters`` and ``compressor`` are wrapped in a
    single ``V2Codec``.

    Raises:
        TypeError: If ``metadata`` is neither ``ArrayV3Metadata`` nor
            ``ArrayV2Metadata``.
    """
    if isinstance(metadata, ArrayV3Metadata):
        # Copy into a list so the result matches the annotated return type
        # whatever container the metadata object keeps its codecs in.
        return list(metadata.codecs)
    if isinstance(metadata, ArrayV2Metadata):
        v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
        return [v2_codec]
    # Without this, an unrecognised metadata type fell off the end of the
    # function and handed callers None instead of a codec list.
    raise TypeError(f"Unsupported array metadata type: {type(metadata).__name__}")


@dataclass
class ZarrsCodecPipeline(CodecPipeline):
codecs: tuple[Codec, ...]
metadata: ArrayMetadata
store: Store
impl: CodecPipelineImpl | None
codec_metadata_json: str
python_impl: BatchedCodecPipeline

def __getstate__(self) -> ZarrsCodecPipelineState:
return {"codec_metadata_json": self.codec_metadata_json, "codecs": self.codecs}
return {"metadata": self.metadata, "store": self.store}

def __setstate__(self, state: ZarrsCodecPipelineState):
self.codecs = state["codecs"]
self.codec_metadata_json = state["codec_metadata_json"]
self.impl = get_codec_pipeline_impl(self.codec_metadata_json)
self.python_impl = BatchedCodecPipeline.from_codecs(self.codecs)
self.metadata = state["metadata"]
self.store = state["store"]
self.impl = get_codec_pipeline_impl(self.metadata, self.store)
codecs = array_metadata_to_codecs(self.metadata)
self.python_impl = BatchedCodecPipeline.from_codecs(codecs)

def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
raise NotImplementedError("evolve_from_array_spec")
return self

@classmethod
def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
codec_metadata = list(codecs_to_dict(codecs))
codec_metadata_json = json.dumps(codec_metadata)
# TODO: upstream zarr-python has not settled on how to deal with configs yet
# Should they be checked when an array is created, or when an operation is performed?
# https://github.com/zarr-developers/zarr-python/issues/2409
# https://github.com/zarr-developers/zarr-python/pull/2429#issuecomment-2566976567
return BatchedCodecPipeline.from_codecs(codecs)

@classmethod
def from_array_metadata_and_store(
cls, array_metadata: ArrayMetadata, store: Store
) -> Self:
codecs = array_metadata_to_codecs(array_metadata)
return cls(
codec_metadata_json=codec_metadata_json,
codecs=tuple(codecs),
impl=get_codec_pipeline_impl(codec_metadata_json),
metadata=array_metadata,
store=store,
impl=get_codec_pipeline_impl(array_metadata, store),
python_impl=BatchedCodecPipeline.from_codecs(codecs),
)

Expand Down
13 changes: 2 additions & 11 deletions src/chunk_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ use zarrs::{
storage::StoreKey,
};

use crate::{store::StoreConfig, utils::PyErrExt};
use crate::utils::PyErrExt;

pub(crate) trait ChunksItem {
fn store_config(&self) -> StoreConfig;
fn key(&self) -> &StoreKey;
fn representation(&self) -> &ChunkRepresentation;
}
Expand All @@ -26,7 +25,6 @@ pub(crate) trait ChunksItem {
#[gen_stub_pyclass]
#[pyclass]
pub(crate) struct Basic {
store: StoreConfig,
key: StoreKey,
representation: ChunkRepresentation,
}
Expand Down Expand Up @@ -62,7 +60,7 @@ fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult<V
impl Basic {
#[new]
fn new(byte_interface: &Bound<'_, PyAny>, chunk_spec: &Bound<'_, PyAny>) -> PyResult<Self> {
let store: StoreConfig = byte_interface.getattr("store")?.extract()?;
// let store: StoreConfig = byte_interface.getattr("store")?.extract()?;
let path: String = byte_interface.getattr("path")?.extract()?;

let chunk_shape = chunk_spec.getattr("shape")?.extract()?;
Expand All @@ -79,7 +77,6 @@ impl Basic {
let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?;
let fill_value_bytes = fill_value_to_bytes(&dtype, &fill_value)?;
Ok(Self {
store,
key: StoreKey::new(path).map_py_err::<PyValueError>()?,
representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?,
})
Expand Down Expand Up @@ -118,9 +115,6 @@ impl WithSubset {
}

impl ChunksItem for Basic {
fn store_config(&self) -> StoreConfig {
self.store.clone()
}
fn key(&self) -> &StoreKey {
&self.key
}
Expand All @@ -130,9 +124,6 @@ impl ChunksItem for Basic {
}

impl ChunksItem for WithSubset {
fn store_config(&self) -> StoreConfig {
self.item.store.clone()
}
fn key(&self) -> &StoreKey {
&self.item.key
}
Expand Down
52 changes: 34 additions & 18 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ use unsafe_cell_slice::UnsafeCellSlice;
use utils::is_whole_chunk;
use zarrs::array::codec::{
ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder,
StoragePartialDecoder,
};
use zarrs::array::{
copy_fill_value_into, update_array_bytes, ArrayBytes, ArrayBytesFixedDisjointView, ArraySize,
CodecChain, FillValue,
copy_fill_value_into, update_array_bytes, Array, ArrayBytes, ArrayBytesFixedDisjointView,
ArrayMetadata, ArraySize, CodecChain, FillValue,
};
use zarrs::array_subset::ArraySubset;
use zarrs::metadata::v3::MetadataV3;
use zarrs::storage::StoreKey;
use zarrs::storage::store::MemoryStore;
use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey};

mod chunk_item;
mod concurrency;
Expand All @@ -41,14 +42,14 @@ mod utils;
use crate::chunk_item::ChunksItem;
use crate::concurrency::ChunkConcurrentLimitAndCodecOptions;
use crate::metadata_v2::codec_metadata_v2_to_v3;
use crate::store::StoreManager;
use crate::store::StoreConfig;
use crate::utils::{PyErrExt as _, PyUntypedArrayExt as _};

// TODO: Use a OnceLock for store with get_or_try_init when stabilised?
#[gen_stub_pyclass]
#[pyclass]
pub struct CodecPipelineImpl {
pub(crate) stores: StoreManager,
pub(crate) store: ReadableWritableListableStorage,
pub(crate) codec_chain: Arc<CodecChain>,
pub(crate) codec_options: CodecOptions,
pub(crate) chunk_concurrent_minimum: usize,
Expand All @@ -63,7 +64,7 @@ impl CodecPipelineImpl {
codec_chain: &CodecChain,
codec_options: &CodecOptions,
) -> PyResult<ArrayBytes<'a>> {
let value_encoded = self.stores.get(item)?;
let value_encoded = self.store.get(item.key()).map_py_err::<PyRuntimeError>()?;
let value_decoded = if let Some(value_encoded) = value_encoded {
let value_encoded: Vec<u8> = value_encoded.into(); // zero-copy in this case
codec_chain
Expand Down Expand Up @@ -94,15 +95,17 @@ impl CodecPipelineImpl {
.map_py_err::<PyValueError>()?;

if value_decoded.is_fill_value(item.representation().fill_value()) {
self.stores.erase(item)
self.store.erase(item.key()).map_py_err::<PyRuntimeError>()
} else {
let value_encoded = codec_chain
.encode(value_decoded, item.representation(), codec_options)
.map(Cow::into_owned)
.map_py_err::<PyRuntimeError>()?;

// Store the encoded chunk
self.stores.set(item, value_encoded.into())
self.store
.set(item.key(), value_encoded.into())
.map_py_err::<PyRuntimeError>()
}
}

Expand Down Expand Up @@ -204,7 +207,8 @@ impl CodecPipelineImpl {
#[pymethods]
impl CodecPipelineImpl {
#[pyo3(signature = (
metadata,
array_metadata,
store_config,
*,
validate_checksums=None,
chunk_concurrent_minimum=None,
Expand All @@ -213,16 +217,21 @@ impl CodecPipelineImpl {
))]
#[new]
fn new(
metadata: &str,
array_metadata: &str,
store_config: StoreConfig,
validate_checksums: Option<bool>,
chunk_concurrent_minimum: Option<usize>,
chunk_concurrent_maximum: Option<usize>,
num_threads: Option<usize>,
) -> PyResult<Self> {
let metadata: Vec<MetadataV3> =
serde_json::from_str(metadata).map_py_err::<PyTypeError>()?;
let codec_chain =
Arc::new(CodecChain::from_metadata(&metadata).map_py_err::<PyTypeError>()?);
let metadata: ArrayMetadata =
serde_json::from_str(array_metadata).map_py_err::<PyTypeError>()?;

// TODO: Add a direct metadata -> codec chain method to zarrs
let store = Arc::new(MemoryStore::new());
let array = Array::new_with_metadata(store, "/", metadata).map_py_err::<PyTypeError>()?;
let codec_chain = Arc::new(array.codecs().clone());

let mut codec_options = CodecOptionsBuilder::new();
if let Some(validate_checksums) = validate_checksums {
codec_options = codec_options.validate_checksums(validate_checksums);
Expand All @@ -235,8 +244,11 @@ impl CodecPipelineImpl {
chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads());
let num_threads = num_threads.unwrap_or(rayon::current_num_threads());

let store: ReadableWritableListableStorage =
(&store_config).try_into().map_py_err::<PyTypeError>()?;

Ok(Self {
stores: StoreManager::default(),
store,
codec_chain,
codec_options,
chunk_concurrent_minimum,
Expand Down Expand Up @@ -276,7 +288,9 @@ impl CodecPipelineImpl {
partial_chunk_descriptions,
map,
|item| {
let input_handle = self.stores.decoder(item)?;
let storage_handle = Arc::new(StorageHandle::new(self.store.clone()));
let input_handle =
StoragePartialDecoder::new(storage_handle, item.key().clone());
let partial_decoder = self
.codec_chain
.clone()
Expand Down Expand Up @@ -326,7 +340,9 @@ impl CodecPipelineImpl {
&& chunk_subset.shape() == item.representation().shape_u64()
{
// See zarrs::array::Array::retrieve_chunk_into
if let Some(chunk_encoded) = self.stores.get(&item)? {
if let Some(chunk_encoded) =
self.store.get(item.key()).map_py_err::<PyRuntimeError>()?
{
// Decode the encoded data into the output buffer
let chunk_encoded: Vec<u8> = chunk_encoded.into();
self.codec_chain.decode_into(
Expand Down
4 changes: 1 addition & 3 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,11 @@ use crate::{runtime::tokio_block_on, utils::PyErrExt};

mod filesystem;
mod http;
mod manager;

pub use self::filesystem::FilesystemStoreConfig;
pub use self::http::HttpStoreConfig;
pub(crate) use self::manager::StoreManager;

#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone)]
#[gen_stub_pyclass_enum]
pub enum StoreConfig {
Filesystem(FilesystemStoreConfig),
Expand Down
2 changes: 1 addition & 1 deletion src/store/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use zarrs::{filesystem::FilesystemStore, storage::ReadableWritableListableStorag

use crate::utils::PyErrExt;

#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone)]
#[gen_stub_pyclass]
#[pyclass]
pub struct FilesystemStoreConfig {
Expand Down
2 changes: 1 addition & 1 deletion src/store/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use zarrs::storage::ReadableWritableListableStorage;

use super::opendal_builder_to_sync_store;

#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Debug, Clone)]
#[gen_stub_pyclass]
#[pyclass]
pub struct HttpStoreConfig {
Expand Down
Loading
Loading