Description
What is your issue?
Hi,
I'm trying to create sharded Zarr V3 datasets with Xarray. Now that Zarr-Python V3 has been released and Xarray supports it, I've tried making a few; I previously brought this up in the Discussions. However, there seems to be an issue: with the latest versions of Xarray and Zarr, the snippet below fails to write to Zarr with the error `ValueError: unexpected encoding parameters for zarr backend: ['shards']`.
# Minimal reproduction: write a dask-chunked Dataset to a sharded Zarr V3 store.
# Reported result: to_zarr raises
# ValueError: unexpected encoding parameters for zarr backend: ['shards']
import xarray as xr
import numpy as np
# Two 16x16 random variables on integer x/y coordinates.
ds = xr.Dataset(
{
"a": (("x", "y"), np.random.rand(16, 16)),
"b": (("x", "y"), np.random.rand(16, 16)),
},
coords={"x": list(range(16)), "y": list(range(16))},
)
# Dask chunks of 4x4: each (16, 16) shard requested below is covered by 16 dask chunks.
ds = ds.chunk({"x": 4, "y": 4})
print(ds)
# One shard per variable spanning the full 16x16 array; "shards" is the
# encoding key this issue reports xarray rejecting.
encoding = {
v: {"shards": (16, 16)}
for v in ds.data_vars}
print(encoding)
# Write as a Zarr V3 store (zarr_format=3) with the sharding encoding.
ds.to_zarr("test.zarr", mode="w", encoding=encoding, zarr_format=3, compute=True)
# Read the store back and print it to confirm the round trip.
ds = xr.open_zarr("test.zarr")
print(ds)
There does seem to be a fairly straightforward fix, though: adding `shards` to the accepted encoding parameters here:
xarray/xarray/backends/zarr.py
Lines 449 to 457 in 5279bd1
With that change, the snippet above writes the sharded Zarr store successfully, and the store can be read back successfully.
Digging into it a bit more: even with the change in #9948, the snippet below fails with a checksum error when writing with larger shard and chunk sizes.
# Reproduction of the checksum failure: a larger sharded write with Blosc compression.
# Reported result (even with #9948 applied): to_zarr raises
# ValueError: Stored and computed checksum do not match.
import xarray as xr
import numpy as np
import zarr
# Length of the x and y dimensions; also used as the shard extent along x/y.
shard_size = 1000
# Two random variables of shape (time=10, x=shard_size, y=shard_size).
ds = xr.Dataset(
{
"a": (("time", "x", "y"), np.random.rand(10, shard_size, shard_size)),
"b": (("time", "x", "y"), np.random.rand(10, shard_size, shard_size)),
},
coords={"time": list(range(10)), "x": list(range(shard_size)), "y": list(range(shard_size))},
)
# Dask chunks of (1, 100, 100): each (1, shard_size, shard_size) shard spans
# 10 x 10 = 100 dask chunks, so no single dask task writes a whole shard.
# NOTE(review): the traceback shows these partial-shard writes going through
# zarr's _load_full_shard_maybe (read the existing shard, then re-encode it)
# under dask's threaded scheduler; concurrent tasks updating the same shard
# may be what corrupts the shard index and triggers the checksum error - TODO confirm.
ds = ds.chunk({"time": 1, "x": 100, "y": 100})
# Per variable: Blosc/zstd (clevel 9, bitshuffle) compression, and one shard per
# time step covering the full x/y extent.
encoding = {
v: {"compressors": zarr.codecs.BloscCodec(cname='zstd', clevel=9, shuffle=zarr.codecs.BloscShuffle.bitshuffle), "shards": (1, shard_size, shard_size)}
for v in ds.data_vars}
print(ds)
# Fails here with the checksum mismatch shown in the traceback.
ds.to_zarr("test2.zarr", mode="w", encoding=encoding, zarr_format=3, compute=True)
# Only reached if the write succeeds: read the store back and print it.
ds = xr.open_zarr("test2.zarr")
print(ds)
Traceback:
Traceback (most recent call last):
File "/Users/jacob/Development/planetary-datasets/min_shard.py", line 48, in <module>
ds.to_zarr("test2.zarr", mode="w", encoding=encoding, zarr_format=3, compute=True)
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/xarray/core/dataset.py", line 2622, in to_zarr
return to_zarr( # type: ignore[call-overload,misc]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/xarray/backends/api.py", line 2217, in to_zarr
writes = writer.sync(
^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/xarray/backends/common.py", line 358, in sync
delayed_store = chunkmanager.store(
^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/xarray/namedarray/daskmanager.py", line 247, in store
return store(
^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/dask/array/core.py", line 1282, in store
compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/dask/base.py", line 397, in compute_as_if_collection
return schedule(dsk2, keys, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/dask/threaded.py", line 91, in get
results = get_async(
^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/dask/local.py", line 516, in get_async
raise_exception(exc, tb)
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/dask/local.py", line 324, in reraise
raise exc
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/dask/local.py", line 229, in execute_task
result = task(data)
^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/dask/_task_spec.py", line 651, in __call__
return self.func(*new_argspec)
^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/dask/array/core.py", line 4575, in store_chunk
return load_store_chunk(x, out, index, lock, return_stored, False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/dask/array/core.py", line 4557, in load_store_chunk
out[index] = x
~~~^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/array.py", line 2464, in __setitem__
self.set_orthogonal_selection(pure_selection, value, fields=fields)
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/_compat.py", line 43, in inner_f
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/array.py", line 2920, in set_orthogonal_selection
return sync(
^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/sync.py", line 142, in sync
raise return_result
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/sync.py", line 98, in _runner
return await coro
^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/array.py", line 1354, in _set_selection
await self.codec_pipeline.write(
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/codec_pipeline.py", line 468, in write
await concurrent_map(
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/common.py", line 68, in concurrent_map
return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/common.py", line 66, in run
return await func(*item)
^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/codec_pipeline.py", line 344, in write_batch
await self.encode_partial_batch(
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/codec_pipeline.py", line 229, in encode_partial_batch
await self.array_bytes_codec.encode_partial(batch_info)
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/abc/codec.py", line 235, in encode_partial
await concurrent_map(
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/common.py", line 68, in concurrent_map
return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/common.py", line 66, in run
return await func(*item)
^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/codecs/sharding.py", line 582, in _encode_partial_single
await self._load_full_shard_maybe(
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/codecs/sharding.py", line 730, in _load_full_shard_maybe
await _ShardReader.from_bytes(shard_bytes, self, chunks_per_shard)
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/codecs/sharding.py", line 194, in from_bytes
obj.index = await codec._decode_shard_index(shard_index_bytes, chunks_per_shard)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/codecs/sharding.py", line 632, in _decode_shard_index
await get_pipeline_class()
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/codec_pipeline.py", line 435, in decode
output.extend(await self.decode_batch(batch_info))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/codec_pipeline.py", line 172, in decode_batch
chunk_bytes_batch = await bb_codec.decode(
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/abc/codec.py", line 129, in decode
return await _batching_helper(self._decode_single, chunks_and_specs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/abc/codec.py", line 407, in _batching_helper
return await concurrent_map(
^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/common.py", line 68, in concurrent_map
return await asyncio.gather(*[asyncio.ensure_future(run(item)) for item in items])
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/core/common.py", line 66, in run
return await func(*item)
^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/abc/codec.py", line 420, in wrap
return await func(chunk, chunk_spec)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/jacob/.conda/envs/pc/lib/python3.12/site-packages/zarr/codecs/crc32c_.py", line 46, in _decode_single
raise ValueError(
ValueError: Stored and computed checksum do not match. Stored: b'\x7f\x7f(o'. Computed: b'\xb0\xec\xbe6'.