Skip to content

Commit a82d047

Browse files
TomAugspurgerd-v-b
andauthored
Make AsyncArray.nchunks_initialized async (#2449)
* Make AsyncArray.nchunks_initialized async This changes the API of AysncArray.nchunks_initialized to change it from a property to an async function. The motivation here comes from 1. general cleanliness (a property access calling async functions doing I/O feels a bit wrong) 2. Work on Array.info, where I hit a strange error, I think from jumping from a - sync Array.info_complete -> - async AsyncArray.info_complete -> - sync AsyncArray.nchunks_initialzed -> - sync collect_aiterator (async list_prefix) With this change, we'll be able to jump from sync to async just once at the boundary. ``` File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/array.py", line 3011, in info_complete return sync(self._async_array.info_complete()) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/sync.py", line 141, in sync raise return_result File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/sync.py", line 100, in _runner return await coro ^^^^^^^^^^ File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/array.py", line 1223, in info_complete "count_chunks_initialized": self.nchunks_initialized, # this should be async? ^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/array.py", line 844, in nchunks_initialized return nchunks_initialized(self) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/array.py", line 3035, in nchunks_initialized return len(chunks_initialized(array)) ^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/array.py", line 3061, in chunks_initialized collect_aiterator(array.store_path.store.list_prefix(prefix=array.store_path.path)) File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/sync.py", line 178, in collect_aiterator return sync(_collect_aiterator(data)) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/Users/tom/gh/zarr-developers/zarr-python/src/zarr/core/sync.py", line 128, in sync raise SyncError("Calling sync() from within a running loop") zarr.core.sync.SyncError: Calling sync() from within a running loop ``` * fixup --------- Co-authored-by: Davis Bennett <[email protected]>
1 parent 0f56ac2 commit a82d047

File tree

2 files changed

+54
-51
lines changed

2 files changed

+54
-51
lines changed

src/zarr/core/array.py

Lines changed: 49 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@
7777
T_ArrayMetadata,
7878
)
7979
from zarr.core.metadata.v3 import parse_node_type_array
80-
from zarr.core.sync import collect_aiterator, sync
80+
from zarr.core.sync import sync
8181
from zarr.errors import MetadataValidationError
8282
from zarr.registry import get_pipeline_class
8383
from zarr.storage import StoreLike, make_store_path
@@ -829,17 +829,31 @@ def nchunks(self) -> int:
829829
"""
830830
return product(self.cdata_shape)
831831

832-
@property
833-
def nchunks_initialized(self) -> int:
832+
async def nchunks_initialized(self) -> int:
834833
"""
835-
The number of chunks that have been persisted in storage.
834+
Calculate the number of chunks that have been initialized, i.e. the number of chunks that have
835+
been persisted to the storage backend.
836836
837837
Returns
838838
-------
839-
int
840-
The number of initialized chunks in the array.
839+
nchunks_initialized : int
840+
The number of chunks that have been initialized.
841+
842+
Notes
843+
-----
844+
On :class:`AsyncArray` this is an asynchronous method, unlike the (synchronous)
845+
property :attr:`Array.nchunks_initialized`.
846+
847+
Examples
848+
--------
849+
>>> arr = await zarr.api.asynchronous.create(shape=(10,), chunks=(2,))
850+
>>> await arr.nchunks_initialized()
851+
0
852+
>>> await arr.setitem(slice(5), 1)
853+
>>> await arr.nchunks_initialized()
854+
3
841855
"""
842-
return nchunks_initialized(self)
856+
return len(await chunks_initialized(self))
843857

844858
def _iter_chunk_coords(
845859
self, *, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
@@ -1492,9 +1506,29 @@ def nbytes(self) -> int:
14921506
@property
14931507
def nchunks_initialized(self) -> int:
14941508
"""
1495-
The number of chunks that have been initialized in the stored representation of this array.
1509+
Calculate the number of chunks that have been initialized, i.e. the number of chunks that have
1510+
been persisted to the storage backend.
1511+
1512+
Returns
1513+
-------
1514+
nchunks_initialized : int
1515+
The number of chunks that have been initialized.
1516+
1517+
Notes
1518+
-----
1519+
On :class:`Array` this is a (synchronous) property, unlike asynchronous function
1520+
:meth:`AsyncArray.nchunks_initialized`.
1521+
1522+
Examples
1523+
--------
1524+
>>> arr = await zarr.create(shape=(10,), chunks=(2,))
1525+
>>> arr.nchunks_initialized
1526+
0
1527+
>>> arr[:5] = 1
1528+
>>> arr.nchunks_initialized
1529+
3
14961530
"""
1497-
return self._async_array.nchunks_initialized
1531+
return sync(self._async_array.nchunks_initialized())
14981532

14991533
def _iter_chunk_keys(
15001534
self, origin: Sequence[int] | None = None, selection_shape: Sequence[int] | None = None
@@ -2905,39 +2939,15 @@ def info(self) -> None:
29052939
)
29062940

29072941

2908-
def nchunks_initialized(
2909-
array: AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata] | Array,
2910-
) -> int:
2911-
"""
2912-
Calculate the number of chunks that have been initialized, i.e. the number of chunks that have
2913-
been persisted to the storage backend.
2914-
2915-
Parameters
2916-
----------
2917-
array : Array
2918-
The array to inspect.
2919-
2920-
Returns
2921-
-------
2922-
nchunks_initialized : int
2923-
The number of chunks that have been initialized.
2924-
2925-
See Also
2926-
--------
2927-
chunks_initialized
2928-
"""
2929-
return len(chunks_initialized(array))
2930-
2931-
2932-
def chunks_initialized(
2933-
array: Array | AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata],
2942+
async def chunks_initialized(
2943+
array: AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata],
29342944
) -> tuple[str, ...]:
29352945
"""
29362946
Return the keys of the chunks that have been persisted to the storage backend.
29372947
29382948
Parameters
29392949
----------
2940-
array : Array
2950+
array : AsyncArray
29412951
The array to inspect.
29422952
29432953
Returns
@@ -2950,10 +2960,9 @@ def chunks_initialized(
29502960
nchunks_initialized
29512961
29522962
"""
2953-
# TODO: make this compose with the underlying async iterator
2954-
store_contents = list(
2955-
collect_aiterator(array.store_path.store.list_prefix(prefix=array.store_path.path))
2956-
)
2963+
store_contents = [
2964+
x async for x in array.store_path.store.list_prefix(prefix=array.store_path.path)
2965+
]
29572966
return tuple(chunk_key for chunk_key in array._iter_chunk_keys() if chunk_key in store_contents)
29582967

29592968

tests/test_array.py

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ def test_nchunks(test_cls: type[Array] | type[AsyncArray[Any]], nchunks: int) ->
323323

324324

325325
@pytest.mark.parametrize("test_cls", [Array, AsyncArray[Any]])
326-
def test_nchunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> None:
326+
async def test_nchunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> None:
327327
"""
328328
Test that nchunks_initialized accurately returns the number of stored chunks.
329329
"""
@@ -337,7 +337,7 @@ def test_nchunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> N
337337
if test_cls == Array:
338338
observed = arr.nchunks_initialized
339339
else:
340-
observed = arr._async_array.nchunks_initialized
340+
observed = await arr._async_array.nchunks_initialized()
341341
assert observed == expected
342342

343343
# delete chunks
@@ -346,13 +346,12 @@ def test_nchunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> N
346346
if test_cls == Array:
347347
observed = arr.nchunks_initialized
348348
else:
349-
observed = arr._async_array.nchunks_initialized
349+
observed = await arr._async_array.nchunks_initialized()
350350
expected = arr.nchunks - idx - 1
351351
assert observed == expected
352352

353353

354-
@pytest.mark.parametrize("test_cls", [Array, AsyncArray[Any]])
355-
def test_chunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> None:
354+
async def test_chunks_initialized() -> None:
356355
"""
357356
Test that chunks_initialized accurately returns the keys of stored chunks.
358357
"""
@@ -364,12 +363,7 @@ def test_chunks_initialized(test_cls: type[Array] | type[AsyncArray[Any]]) -> No
364363
)
365364
for keys, region in zip(chunks_accumulated, arr._iter_chunk_regions(), strict=False):
366365
arr[region] = 1
367-
368-
if test_cls == Array:
369-
observed = sorted(chunks_initialized(arr))
370-
else:
371-
observed = sorted(chunks_initialized(arr._async_array))
372-
366+
observed = sorted(await chunks_initialized(arr._async_array))
373367
expected = sorted(keys)
374368
assert observed == expected
375369

0 commit comments

Comments
 (0)