2 changes: 1 addition & 1 deletion pyproject.toml
@@ -20,7 +20,7 @@ classifiers = [
dynamic = ["version"]
dependencies = [
"numpy>=1.24",
"zarr>=3.0.3,<3.1",
"zarr @ git+https://github.com/zarr-developers/zarr-python",
]

[dependency-groups]
3 changes: 2 additions & 1 deletion python/zarrs/_internal.pyi
@@ -13,7 +13,8 @@ class Basic:
class CodecPipelineImpl:
def __new__(
cls,
metadata: builtins.str,
array_metadata: builtins.str,
store_config: StoreConfig,
*,
validate_checksums: builtins.bool | None = None,
store_empty_chunks: builtins.bool | None = None,
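
Per the updated stub, the Rust pipeline is now constructed from the array's full metadata document plus a store config, rather than from a bare codec list. A minimal construction sketch — the metadata dict is an illustrative zarr v3 document and the `LocalStore` path is an assumption; real callers serialize `ArrayMetadata.to_dict()`, as `pipeline.py` does below:

```python
import json

import zarr
from zarrs._internal import CodecPipelineImpl

# Illustrative zarr v3 array metadata; in practice this comes from
# json.dumps(array_metadata.to_dict()).
array_metadata = json.dumps({
    "zarr_format": 3,
    "node_type": "array",
    "shape": [100, 100],
    "data_type": "float32",
    "chunk_grid": {"name": "regular", "configuration": {"chunk_shape": [10, 10]}},
    "chunk_key_encoding": {"name": "default"},
    "fill_value": 0.0,
    "codecs": [{"name": "bytes", "configuration": {"endian": "little"}}],
})

impl = CodecPipelineImpl(
    array_metadata,
    store_config=zarr.storage.LocalStore("data.zarr"),  # converted to a Rust StoreConfig
    validate_checksums=None,
    store_empty_chunks=None,
)
```
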
72 changes: 45 additions & 27 deletions python/zarrs/pipeline.py
@@ -2,25 +2,28 @@

import asyncio
import json
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING, TypedDict
from warnings import warn

import numpy as np
from zarr.abc.codec import Codec, CodecPipeline
from zarr.codecs._v2 import V2Codec
from zarr.core import BatchedCodecPipeline
from zarr.core.config import config
from zarr.core.metadata import ArrayMetadata, ArrayV2Metadata, ArrayV3Metadata

if TYPE_CHECKING:
from collections.abc import Generator, Iterable, Iterator
from typing import Any, Self

from zarr.abc.store import ByteGetter, ByteSetter
from zarr.abc.store import ByteGetter, ByteSetter, Store
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer, NDArrayLike, NDBuffer
from zarr.core.chunk_grids import ChunkGrid
from zarr.core.common import ChunkCoords
from zarr.core.indexing import SelectorTuple
from zarr.dtype import ZDType

from ._internal import CodecPipelineImpl, codec_metadata_v2_to_v3
from .utils import (
@@ -39,10 +42,14 @@ class UnsupportedMetadataError(Exception):
pass


def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | None:
def get_codec_pipeline_impl(
metadata: ArrayMetadata, store: Store
) -> CodecPipelineImpl | None:
try:
array_metadata_json = json.dumps(metadata.to_dict())
return CodecPipelineImpl(
codec_metadata_json,
array_metadata_json,
store_config=store,
validate_checksums=config.get("codec_pipeline.validate_checksums", None),
store_empty_chunks=config.get("array.write_empty_chunks", None),
chunk_concurrent_minimum=config.get(
@@ -54,10 +61,11 @@ def get_codec_pipeline_impl(codec_metadata_json: str) -> CodecPipelineImpl | Non
num_threads=config.get("threading.max_workers", None),
)
except TypeError as e:
if re.match(r"codec (delta|zlib) is not supported", str(e)):
return None
else:
raise e
warn(
f"Array is unsupported by ZarrsCodecPipeline: {e}",
category=UserWarning,
)
return None


def codecs_to_dict(codecs: Iterable[Codec]) -> Generator[dict[str, Any], None, None]:
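
With this hunk, construction failures no longer pattern-match specific codec errors: any `TypeError` from the Rust constructor is downgraded to a `UserWarning` and the function returns `None`, leaving chunk I/O to the pure-Python `BatchedCodecPipeline`. A sketch of the observable behavior — `metadata` and `store` are hypothetical inputs the Rust side rejects:

```python
import warnings

from zarrs.pipeline import get_codec_pipeline_impl

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    impl = get_codec_pipeline_impl(metadata, store)  # hypothetical rejected inputs

assert impl is None
assert any("unsupported by ZarrsCodecPipeline" in str(w.message) for w in caught)
# Chunk I/O then routes through the fallback BatchedCodecPipeline.
```
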
@@ -88,37 +96,47 @@ class ZarrsCodecPipelineState(TypedDict):
codecs: tuple[Codec, ...]


def array_metadata_to_codecs(metadata: ArrayMetadata) -> list[Codec]:
if isinstance(metadata, ArrayV3Metadata):
return metadata.codecs
elif isinstance(metadata, ArrayV2Metadata):
v2_codec = V2Codec(filters=metadata.filters, compressor=metadata.compressor)
return [v2_codec]


@dataclass
class ZarrsCodecPipeline(CodecPipeline):
codecs: tuple[Codec, ...]
metadata: ArrayMetadata
store: Store
impl: CodecPipelineImpl | None
codec_metadata_json: str
python_impl: BatchedCodecPipeline

def __getstate__(self) -> ZarrsCodecPipelineState:
return {"codec_metadata_json": self.codec_metadata_json, "codecs": self.codecs}
return {"metadata": self.metadata, "store": self.store}

def __setstate__(self, state: ZarrsCodecPipelineState):
self.codecs = state["codecs"]
self.codec_metadata_json = state["codec_metadata_json"]
self.impl = get_codec_pipeline_impl(self.codec_metadata_json)
self.python_impl = BatchedCodecPipeline.from_codecs(self.codecs)
self.metadata = state["metadata"]
self.store = state["store"]
self.impl = get_codec_pipeline_impl(self.metadata, self.store)
codecs = array_metadata_to_codecs(self.metadata)
self.python_impl = BatchedCodecPipeline.from_codecs(codecs)

def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
raise NotImplementedError("evolve_from_array_spec")
return self

@classmethod
def from_codecs(cls, codecs: Iterable[Codec]) -> Self:
codec_metadata = list(codecs_to_dict(codecs))
codec_metadata_json = json.dumps(codec_metadata)
# TODO: upstream zarr-python has not settled on how to deal with configs yet
# Should they be checked when an array is created, or when an operation is performed?
# https://github.com/zarr-developers/zarr-python/issues/2409
# https://github.com/zarr-developers/zarr-python/pull/2429#issuecomment-2566976567
return BatchedCodecPipeline.from_codecs(codecs)

@classmethod
def from_array_metadata_and_store(
cls, array_metadata: ArrayMetadata, store: Store
) -> Self:
codecs = array_metadata_to_codecs(array_metadata)
return cls(
codec_metadata_json=codec_metadata_json,
codecs=tuple(codecs),
impl=get_codec_pipeline_impl(codec_metadata_json),
metadata=array_metadata,
store=store,
impl=get_codec_pipeline_impl(array_metadata, store),
python_impl=BatchedCodecPipeline.from_codecs(codecs),
)

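`from_array_metadata_and_store` is the classmethod zarr-python calls for a registered pipeline (per the upstream git dependency this PR pins), and because `__getstate__`/`__setstate__` now carry the metadata and store, the resulting object pickles cleanly. A minimal sketch, assuming `arr` is a `zarr.Array` opened elsewhere:

```python
import pickle

from zarrs import ZarrsCodecPipeline

# `arr` is a hypothetical zarr.Array opened elsewhere.
pipeline = ZarrsCodecPipeline.from_array_metadata_and_store(arr.metadata, arr.store)

# The Rust impl is rebuilt from (metadata, store) on unpickle.
restored = pickle.loads(pickle.dumps(pipeline))
```
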
@@ -134,7 +152,7 @@ def __iter__(self) -> Iterator[Codec]:
yield from self.codecs

def validate(
self, *, shape: ChunkCoords, dtype: np.dtype[Any], chunk_grid: ChunkGrid
self, *, shape: ChunkCoords, dtype: ZDType, chunk_grid: ChunkGrid
) -> None:
raise NotImplementedError("validate")

@@ -236,7 +254,7 @@ def _raise_error_on_unsupported_batch_dtype(
# https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v2_to_v3.rs#L289-L293 for VSUMm
# Further, our pipeline does not support variable-length objects due to limitations on decode_into, so object/np.dtypes.StringDType is also out
if any(
info.dtype.kind in {"V", "S", "U", "M", "m", "O", "T"}
info.dtype.to_native_dtype().kind in {"V", "S", "U", "M", "m", "O", "T"}
for (_, info, _, _, _) in batch_info
):
raise UnsupportedDataTypeError()
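
For reference, the kinds this guard screens out, now read via `ZDType.to_native_dtype()` rather than from a raw numpy dtype (a quick numpy illustration; `"T"` is the kind code of numpy 2's variable-width `StringDType`):

```python
import numpy as np

UNSUPPORTED_KINDS = {"V", "S", "U", "M", "m", "O", "T"}

assert np.dtype("datetime64[ns]").kind == "M"  # rejected: datetime
assert np.dtype("S8").kind == "S"              # rejected: fixed-width bytes
assert np.dtype("float32").kind == "f"         # supported
assert np.dtype("int64").kind == "i"           # supported
```
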
6 changes: 3 additions & 3 deletions python/zarrs/utils.py
@@ -8,7 +8,6 @@
import numpy as np
from zarr.core.array_spec import ArraySpec
from zarr.core.indexing import SelectorTuple, is_integer
from zarr.core.metadata.v2 import _default_fill_value

from zarrs._internal import Basic, WithSubset

@@ -17,6 +16,7 @@
from types import EllipsisType

from zarr.abc.store import ByteGetter, ByteSetter
from zarr.dtype import ZDType


# adapted from https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor
@@ -139,9 +139,9 @@ def get_shape_for_selector(
return resulting_shape_from_index(shape, selector_tuple, drop_axes, pad=pad)


def get_implicit_fill_value(dtype: np.dtype, fill_value: Any) -> Any:
def get_implicit_fill_value(dtype: ZDType, fill_value: Any) -> Any:
if fill_value is None:
fill_value = _default_fill_value(dtype)
fill_value = dtype.default_scalar()
return fill_value


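
`_default_fill_value` was a private zarr v2 helper; `ZDType.default_scalar()` is its public replacement. A self-contained sketch of the new fallback, using a stand-in class rather than a real `ZDType` (only the one assumed method is stubbed):

```python
import numpy as np

from zarrs.utils import get_implicit_fill_value


class StubZDType:
    """Stand-in for a zarr ZDType; only default_scalar() is assumed."""

    def default_scalar(self):
        return np.float32(0.0)


assert get_implicit_fill_value(StubZDType(), None) == np.float32(0.0)  # falls back
assert get_implicit_fill_value(StubZDType(), 7.0) == 7.0               # passes through
```
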
14 changes: 3 additions & 11 deletions src/chunk_item.rs
@@ -14,10 +14,9 @@ use zarrs::{
storage::StoreKey,
};

use crate::{store::StoreConfig, utils::PyErrExt};
use crate::utils::PyErrExt;

pub(crate) trait ChunksItem {
fn store_config(&self) -> StoreConfig;
fn key(&self) -> &StoreKey;
fn representation(&self) -> &ChunkRepresentation;
}
@@ -26,7 +25,6 @@ pub(crate) trait ChunksItem {
#[gen_stub_pyclass]
#[pyclass]
pub(crate) struct Basic {
store: StoreConfig,
key: StoreKey,
representation: ChunkRepresentation,
}
@@ -62,12 +60,13 @@ fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult<V
impl Basic {
#[new]
fn new(byte_interface: &Bound<'_, PyAny>, chunk_spec: &Bound<'_, PyAny>) -> PyResult<Self> {
let store: StoreConfig = byte_interface.getattr("store")?.extract()?;
// let store: StoreConfig = byte_interface.getattr("store")?.extract()?;
let path: String = byte_interface.getattr("path")?.extract()?;

let chunk_shape = chunk_spec.getattr("shape")?.extract()?;
let mut dtype: String = chunk_spec
.getattr("dtype")?
.call_method0("to_native_dtype")?
.call_method0("__str__")?
.extract()?;
if dtype == "object" {
@@ -78,7 +77,6 @@ impl Basic {
let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?;
let fill_value_bytes = fill_value_to_bytes(&dtype, &fill_value)?;
Ok(Self {
store,
key: StoreKey::new(path).map_py_err::<PyValueError>()?,
representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?,
})
@@ -117,9 +115,6 @@ impl WithSubset {
}

impl ChunksItem for Basic {
fn store_config(&self) -> StoreConfig {
self.store.clone()
}
fn key(&self) -> &StoreKey {
&self.key
}
@@ -129,9 +124,6 @@ impl ChunksItem for Basic {
}

impl ChunksItem for WithSubset {
fn store_config(&self) -> StoreConfig {
self.item.store.clone()
}
fn key(&self) -> &StoreKey {
&self.item.key
}
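
On the Python side, `chunk_spec.dtype` is now a `ZDType` rather than a numpy dtype, so `Basic::new` converts before stringifying. The equivalent of what the Rust constructor now evaluates, sketched in Python with a stand-in:

```python
import numpy as np


class StubZDType:
    """Stand-in for a zarr ZDType; only to_native_dtype() is assumed."""

    def to_native_dtype(self) -> np.dtype:
        return np.dtype("int32")


# Mirrors chunk_spec.getattr("dtype").to_native_dtype().__str__() in Rust:
dtype_str = str(StubZDType().to_native_dtype())
assert dtype_str == "int32"
```
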
52 changes: 34 additions & 18 deletions src/lib.rs
@@ -20,14 +20,15 @@ use unsafe_cell_slice::UnsafeCellSlice;
use utils::is_whole_chunk;
use zarrs::array::codec::{
ArrayPartialDecoderTraits, ArrayToBytesCodecTraits, CodecOptions, CodecOptionsBuilder,
StoragePartialDecoder,
};
use zarrs::array::{
copy_fill_value_into, update_array_bytes, ArrayBytes, ArrayBytesFixedDisjointView, ArraySize,
CodecChain, FillValue,
copy_fill_value_into, update_array_bytes, Array, ArrayBytes, ArrayBytesFixedDisjointView,
ArrayMetadata, ArraySize, CodecChain, FillValue,
};
use zarrs::array_subset::ArraySubset;
use zarrs::metadata::v3::MetadataV3;
use zarrs::storage::StoreKey;
use zarrs::storage::store::MemoryStore;
use zarrs::storage::{ReadableWritableListableStorage, StorageHandle, StoreKey};

mod chunk_item;
mod concurrency;
@@ -41,14 +42,14 @@ mod utils;
use crate::chunk_item::ChunksItem;
use crate::concurrency::ChunkConcurrentLimitAndCodecOptions;
use crate::metadata_v2::codec_metadata_v2_to_v3;
use crate::store::StoreManager;
use crate::store::StoreConfig;
use crate::utils::{PyErrExt as _, PyUntypedArrayExt as _};

// TODO: Use a OnceLock for store with get_or_try_init when stabilised?
#[gen_stub_pyclass]
#[pyclass]
pub struct CodecPipelineImpl {
pub(crate) stores: StoreManager,
pub(crate) store: ReadableWritableListableStorage,
pub(crate) codec_chain: Arc<CodecChain>,
pub(crate) codec_options: CodecOptions,
pub(crate) chunk_concurrent_minimum: usize,
@@ -63,7 +64,7 @@ impl CodecPipelineImpl {
codec_chain: &CodecChain,
codec_options: &CodecOptions,
) -> PyResult<ArrayBytes<'a>> {
let value_encoded = self.stores.get(item)?;
let value_encoded = self.store.get(item.key()).map_py_err::<PyRuntimeError>()?;
let value_decoded = if let Some(value_encoded) = value_encoded {
let value_encoded: Vec<u8> = value_encoded.into(); // zero-copy in this case
codec_chain
@@ -94,15 +95,17 @@ impl CodecPipelineImpl {
.map_py_err::<PyValueError>()?;

if value_decoded.is_fill_value(item.representation().fill_value()) {
self.stores.erase(item)
self.store.erase(item.key()).map_py_err::<PyRuntimeError>()
} else {
let value_encoded = codec_chain
.encode(value_decoded, item.representation(), codec_options)
.map(Cow::into_owned)
.map_py_err::<PyRuntimeError>()?;

// Store the encoded chunk
self.stores.set(item, value_encoded.into())
self.store
.set(item.key(), value_encoded.into())
.map_py_err::<PyRuntimeError>()
}
}

@@ -204,7 +207,8 @@ impl CodecPipelineImpl {
#[pymethods]
impl CodecPipelineImpl {
#[pyo3(signature = (
metadata,
array_metadata,
store_config,
*,
validate_checksums=None,
store_empty_chunks=None,
Expand All @@ -214,17 +218,22 @@ impl CodecPipelineImpl {
))]
#[new]
fn new(
metadata: &str,
array_metadata: &str,
store_config: StoreConfig,
validate_checksums: Option<bool>,
store_empty_chunks: Option<bool>,
chunk_concurrent_minimum: Option<usize>,
chunk_concurrent_maximum: Option<usize>,
num_threads: Option<usize>,
) -> PyResult<Self> {
let metadata: Vec<MetadataV3> =
serde_json::from_str(metadata).map_py_err::<PyTypeError>()?;
let codec_chain =
Arc::new(CodecChain::from_metadata(&metadata).map_py_err::<PyTypeError>()?);
let metadata: ArrayMetadata =
serde_json::from_str(array_metadata).map_py_err::<PyTypeError>()?;

// TODO: Add a direct metadata -> codec chain method to zarrs
let store = Arc::new(MemoryStore::new());
let array = Array::new_with_metadata(store, "/", metadata).map_py_err::<PyTypeError>()?;
let codec_chain = Arc::new(array.codecs().clone());

let mut codec_options = CodecOptionsBuilder::new();
if let Some(validate_checksums) = validate_checksums {
codec_options = codec_options.validate_checksums(validate_checksums);
@@ -240,8 +249,11 @@
chunk_concurrent_maximum.unwrap_or(rayon::current_num_threads());
let num_threads = num_threads.unwrap_or(rayon::current_num_threads());

let store: ReadableWritableListableStorage =
(&store_config).try_into().map_py_err::<PyTypeError>()?;

Ok(Self {
stores: StoreManager::default(),
store,
codec_chain,
codec_options,
chunk_concurrent_minimum,
@@ -281,7 +293,9 @@ impl CodecPipelineImpl {
partial_chunk_descriptions,
map,
|item| {
let input_handle = self.stores.decoder(item)?;
let storage_handle = Arc::new(StorageHandle::new(self.store.clone()));
let input_handle =
StoragePartialDecoder::new(storage_handle, item.key().clone());
let partial_decoder = self
.codec_chain
.clone()
@@ -331,7 +345,9 @@ impl CodecPipelineImpl {
&& chunk_subset.shape() == item.representation().shape_u64()
{
// See zarrs::array::Array::retrieve_chunk_into
if let Some(chunk_encoded) = self.stores.get(&item)? {
if let Some(chunk_encoded) =
self.store.get(item.key()).map_py_err::<PyRuntimeError>()?
{
// Decode the encoded data into the output buffer
let chunk_encoded: Vec<u8> = chunk_encoded.into();
self.codec_chain.decode_into(
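
Taken together, the new flow is: zarr-python hands the pipeline the full array metadata and the store; the Rust side parses the metadata, derives the codec chain via a throwaway in-memory `Array`, and then reads and writes chunks against the store directly instead of going through per-chunk `StoreConfig`s. A hedged end-to-end smoke test — array parameters and the temporary path are illustrative, and `zarr.create_array` is assumed per the zarr-python 3 API:

```python
import tempfile

import numpy as np
import zarr

# Route all chunk codec work through the Rust pipeline.
zarr.config.set({"codec_pipeline.path": "zarrs.ZarrsCodecPipeline"})

store = zarr.storage.LocalStore(tempfile.mkdtemp())
z = zarr.create_array(store, shape=(100, 100), chunks=(10, 10), dtype="float32")
z[...] = np.arange(10_000, dtype="float32").reshape(100, 100)

assert z[0, 1] == 1.0  # round-trips through the ZarrsCodecPipeline
```
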