diff --git a/changes/828.feature.md b/changes/828.feature.md new file mode 100644 index 0000000000..cf64b187b1 --- /dev/null +++ b/changes/828.feature.md @@ -0,0 +1 @@ +`ZipStore` now supports deletes (`supports_deletes` is `True`). Because ZIP files cannot delete entries in place, `ZipStore.delete` and `ZipStore.delete_dir` remove keys by rewriting the archive without the deleted members; overwritten (duplicate) members are compacted as a side effect. diff --git a/src/zarr/storage/_zip.py b/src/zarr/storage/_zip.py index 430b0c3e2a..3e695321a8 100644 --- a/src/zarr/storage/_zip.py +++ b/src/zarr/storage/_zip.py @@ -2,6 +2,7 @@ import os import shutil +import tempfile import threading import time import zipfile @@ -18,7 +19,7 @@ from zarr.core.buffer import Buffer, BufferPrototype if TYPE_CHECKING: - from collections.abc import AsyncIterator, Iterable + from collections.abc import AsyncIterator, Callable, Iterable ZipStoreAccessModeLiteral = Literal["r", "w", "a"] @@ -55,7 +56,7 @@ class ZipStore(Store): """ supports_writes: bool = True - supports_deletes: bool = False + supports_deletes: bool = True supports_listing: bool = True path: Path @@ -229,21 +230,65 @@ async def set_if_not_exists(self, key: str, value: Buffer) -> None: if key not in members: self._set(key, value) + def _rewrite_without(self, should_delete: Callable[[str], bool]) -> None: + # Rewrite the archive, dropping every member for which ``should_delete`` + # returns True. ZIP files do not support in-place deletion, so the only + # way to remove an entry is to copy the surviving entries into a fresh + # archive (see issue #828). Duplicate members (created when a chunk is + # overwritten via ``writestr``) are compacted to their most recent value + # as a side effect, since that is what reads already return. + # + # This must be called while holding ``self._lock``. + members: dict[str, zipfile.ZipInfo] = {} + for info in self._zf.infolist(): + members[info.filename] = info # keep the last entry for each name + + to_delete = [name for name in members if should_delete(name)] + if not to_delete: + # nothing matched; leave the archive untouched + return + + fd, tmp_path = tempfile.mkstemp(dir=self.path.parent) + os.close(fd) + try: + with zipfile.ZipFile( + tmp_path, mode="w", compression=self.compression, allowZip64=self.allowZip64 + ) as new_zf: + for name, info in members.items(): + if should_delete(name): + continue + new_zf.writestr(info, self._zf.read(name)) + self._zf.close() + os.replace(tmp_path, self.path) + except BaseException: + if os.path.exists(tmp_path): + os.remove(tmp_path) + raise + + # Reopen in append mode so subsequent writes preserve the archive + # (the original mode may be "w"/"x", which would truncate or fail). + self._zf = zipfile.ZipFile( + self.path, mode="a", compression=self.compression, allowZip64=self.allowZip64 + ) + async def delete_dir(self, prefix: str) -> None: - # only raise NotImplementedError if any keys are found + # docstring inherited self._check_writable() + if not self._is_open: + self._sync_open() if prefix != "" and not prefix.endswith("/"): prefix += "/" - async for _ in self.list_prefix(prefix): - raise NotImplementedError + with self._lock: + self._rewrite_without(lambda name: name.startswith(prefix)) async def delete(self, key: str) -> None: # docstring inherited - # we choose to only raise NotImplementedError here if the key exists - # this allows the array/group APIs to avoid the overhead of existence checks + # deleting a missing key is a no-op, matching the other stores self._check_writable() - if await self.exists(key): - raise NotImplementedError + if not self._is_open: + self._sync_open() + with self._lock: + self._rewrite_without(lambda name: name == key) async def exists(self, key: str) -> bool: # docstring inherited diff --git a/tests/test_codecs/test_sharding.py b/tests/test_codecs/test_sharding.py index e62334f5d3..522ddd8dc8 100644 --- a/tests/test_codecs/test_sharding.py +++ b/tests/test_codecs/test_sharding.py @@ -645,6 +645,9 @@ def test_write_partial_sharded_chunks(store: Store) -> None: assert np.array_equal(a[0:16, 0:16], data) +# ZipStore overwrites shards by appending duplicate archive members (reads return +# the most recent), which zipfile reports via a "Duplicate name" UserWarning. +@pytest.mark.filterwarnings("ignore:Duplicate name:UserWarning") @pytest.mark.parametrize("store", ["local", "memory", "zip"], indirect=["store"]) async def test_delete_empty_shards(store: Store) -> None: if not store.supports_deletes: diff --git a/tests/test_store/test_zip.py b/tests/test_store/test_zip.py index ed69114b51..e015605c4c 100644 --- a/tests/test_store/test_zip.py +++ b/tests/test_store/test_zip.py @@ -84,6 +84,114 @@ def test_store_supports_writes(self, store: ZipStore) -> None: def test_store_supports_listing(self, store: ZipStore) -> None: assert store.supports_listing + def test_store_supports_deletes(self, store: ZipStore) -> None: + assert store.supports_deletes + + async def test_delete_compacts_duplicates(self, store: ZipStore) -> None: + # Overwriting a key leaves a duplicate member in the archive; deleting + # another key rewrites the archive and should compact the duplicates so + # the surviving key has a single, most-recent entry (issue #828). + await store.set("foo", cpu.Buffer.from_bytes(b"v1")) + with pytest.warns(UserWarning, match="Duplicate name: 'foo'"): + await store.set("foo", cpu.Buffer.from_bytes(b"v2")) + await store.set("bar", cpu.Buffer.from_bytes(b"bar")) + + await store.delete("bar") + + assert not await store.exists("bar") + assert store._zf.namelist().count("foo") == 1 + buf = await self.get(store, "foo") + assert buf.to_bytes() == b"v2" + + async def test_delete_then_set(self, store: ZipStore) -> None: + # after a delete (which reopens the archive) writes must still work + await store.set("foo", cpu.Buffer.from_bytes(b"foo")) + await store.delete("foo") + assert not await store.exists("foo") + await store.set("baz", cpu.Buffer.from_bytes(b"baz")) + buf = await self.get(store, "baz") + assert buf.to_bytes() == b"baz" + + async def test_delete_and_delete_dir_auto_open(self, tmp_path: Path) -> None: + # delete() and delete_dir() should auto-open the archive like _get/_set, + # rather than assuming the caller opened it first. + store = ZipStore(tmp_path / "del.zip", mode="w", read_only=False) + assert not store._is_open + await store.delete("missing") # exercises the auto-open branch in delete() + assert store._is_open + + store2 = ZipStore(tmp_path / "deldir.zip", mode="w", read_only=False) + assert not store2._is_open + await store2.delete_dir("missing") # auto-open branch in delete_dir() + assert store2._is_open + + async def test_delete_dir_prefix_already_normalized(self, store: ZipStore) -> None: + # a prefix that already ends in "/" must skip the slash-appending branch + await store.set("foo/zarr.json", cpu.Buffer.from_bytes(b"a")) + await store.set("foo/c/0", cpu.Buffer.from_bytes(b"b")) + await store.set("bar/zarr.json", cpu.Buffer.from_bytes(b"c")) + + await store.delete_dir("foo/") + + assert not await store.exists("foo/zarr.json") + assert not await store.exists("foo/c/0") + assert await store.exists("bar/zarr.json") + + async def test_delete_dir_empty_prefix_removes_all(self, store: ZipStore) -> None: + # an empty prefix also skips normalization and should remove everything + await store.set("a", cpu.Buffer.from_bytes(b"a")) + await store.set("b/c", cpu.Buffer.from_bytes(b"b")) + + await store.delete_dir("") + + assert not await store.exists("a") + assert not await store.exists("b/c") + assert store._zf.namelist() == [] + + async def test_delete_cleans_up_temp_on_failure( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + # if the rewrite fails (e.g. os.replace raises), the temporary archive + # must be removed and the original left untouched. + import zarr.storage._zip as zip_module + + store = ZipStore(tmp_path / "fail.zip", mode="w", read_only=False) + await store.set("foo", cpu.Buffer.from_bytes(b"v")) + + def boom(*args: Any, **kwargs: Any) -> None: + raise OSError("replace failed") + + monkeypatch.setattr(zip_module.os, "replace", boom) + + with pytest.raises(OSError, match="replace failed"): + await store.delete("foo") + + # no leftover temp file: only the original archive remains + assert set(os.listdir(tmp_path)) == {"fail.zip"} + + async def test_delete_failure_when_temp_already_removed( + self, tmp_path: Path, monkeypatch: pytest.MonkeyPatch + ) -> None: + # defensive cleanup branch: if the temp archive is already gone when the + # rewrite fails, cleanup is skipped and the original error still propagates. + import zarr.storage._zip as zip_module + + store = ZipStore(tmp_path / "fail2.zip", mode="w", read_only=False) + await store.set("foo", cpu.Buffer.from_bytes(b"v")) + + real_remove = zip_module.os.remove + + def replace_then_vanish(src: str, dst: str) -> None: + real_remove(src) # temp disappears before the except block runs + raise OSError("replace failed") + + monkeypatch.setattr(zip_module.os, "replace", replace_then_vanish) + + with pytest.raises(OSError, match="replace failed"): + await store.delete("foo") + + assert set(os.listdir(tmp_path)) == {"fail2.zip"} + # TODO: fix this warning @pytest.mark.filterwarnings("ignore:Unclosed client session:ResourceWarning") def test_api_integration(self, store: ZipStore) -> None: @@ -101,17 +209,19 @@ def test_api_integration(self, store: ZipStore) -> None: with pytest.warns(UserWarning, match="Duplicate name: 'foo/c/0/0'"): z[0, 0] = 100 - # TODO: assigning an entire chunk to fill value ends up deleting the chunk which is not supported - # a work around will be needed here. - with pytest.raises(NotImplementedError): - z[0:10, 0:10] = 99 + # assigning an entire chunk to the fill value deletes the chunk; + # ZipStore now supports deletes by rewriting the archive (issue #828) + z[0:10, 0:10] = 99 + expected = data.copy() + expected[0:10, 0:10] = 99 + assert np.array_equal(expected, z[:]) bar = root.create_group("bar", attributes={"hello": "world"}) assert "hello" in dict(bar.attrs) - # keys cannot be deleted - with pytest.raises(NotImplementedError): - del root["bar"] + # keys can now be deleted + del root["bar"] + assert "bar" not in root store.close()