feat: object-store data cache (memory tier) + cache-affinity scheduling#4828
Open
schenksj wants to merge 5 commits into
…uling

Implements Phase 1 of the object-store data caching design (gap item #5): a block-aligned local data cache behind the object_store API, plus driver-side cache-affinity scheduling so repeat reads of a remote Parquet file route back to the executor that cached it.

Two new storage-API-neutral workspace crates:
- native/block-cache: cache core (SIEVE memory tier, single-flight miss dedup, coalesced fetch, ETag/version invalidation, set_memory_budget, metrics). Depends only on tokio/bytes/futures; a cargo-tree test guards it against object_store / opendal / Comet deps.
- native/object-store-cache: CachingObjectStore wrapping dyn ObjectStore + the core. Routes get_ranges and plain bounded get_opts through the cache; passes everything else through and invalidates on put/delete/copy.

Comet wiring:
- Process-global BlockCache in a OnceLock, initialized from JNI configs on the first createPlan; prepare_object_store_with_configs wraps remote (non-file) stores.
- spark.comet.scan.dataCache.* configs (default off), force-added across JNI.
- CometFileLocalityManager (driver-side sticky file->host assignment, least-loaded balancing, fair-share rebalance on scale-up) wired into CometExecRDD.getPreferredLocations and CometNativeScanExec.

Default off; memory tier only. SSD tier, unified-memory integration, opendal / Iceberg coverage, and readahead are deferred to later phases per the design.

Tests: block-cache (11 integration + 3 unit + neutrality guard + doctest), object-store-cache (4 adapter tests), data_cache config parsing (3), CometFileLocalityManager (8 Scala unit tests).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
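As a rough illustration of what "block-aligned" and "coalesced fetch" could mean here, the sketch below maps a requested byte range onto fixed-size block indices and merges neighbouring missing blocks into single fetch ranges. The function names, the `block_size` parameter, and the clamping to `file_len` are assumptions made for this example; they are not taken from the `native/block-cache` source.

```rust
use std::ops::Range;

/// Indices of the fixed-size blocks that a byte range touches.
/// `block_size` is a hypothetical parameter; the PR does not state the real value.
fn blocks_for_range(range: &Range<u64>, block_size: u64) -> Range<u64> {
    if range.start >= range.end {
        return 0..0; // an empty request touches no blocks
    }
    let first = range.start / block_size;
    let last = (range.end - 1) / block_size; // inclusive index of the last block
    first..last + 1
}

/// Merge runs of consecutive missing block indices into byte ranges, so that
/// adjacent misses cost one fetch instead of one fetch per block.
fn coalesce_misses(missing: &[u64], block_size: u64, file_len: u64) -> Vec<Range<u64>> {
    let mut sorted = missing.to_vec();
    sorted.sort_unstable();
    sorted.dedup();

    let mut out: Vec<Range<u64>> = Vec::new();
    for b in sorted {
        let start = b * block_size;
        if start >= file_len {
            continue; // block lies entirely past the end of the file
        }
        let end = ((b + 1) * block_size).min(file_len); // last block may be short
        if let Some(prev) = out.last_mut() {
            if prev.end == start {
                prev.end = end; // extend the previous fetch range
                continue;
            }
        }
        out.push(start..end);
    }
    out
}
```

With a block size of 4, the byte range `3..9` touches blocks `0..3`, and missing blocks `[2, 0, 1, 5]` in a 22-byte file coalesce into the two fetches `0..12` and `20..22`.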
The Preflight `check-suites.py` gate requires every *Suite.scala to be listed in both pr_build_linux.yml and pr_build_macos.yml. The new CometFileLocalityManagerSuite was missing, failing Preflight (which gates all downstream jobs). Add it to the `exec` bucket in both workflows. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Apply `cargo fmt` to the cache crates so `cargo fmt --all -- --check` (the CI Lint job) passes, and drop an unused `file` binding in a block-cache test. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
The CI rust-test job runs `cargo clippy --all-targets --workspace -- -D warnings` and `cargo machete`:
- cache.rs: pass the fetch range via `slice::from_ref` (single_range_in_vec_init).
- cache_tests.rs: allow single_range_in_vec_init (intentional one-range test slices) and replace `&[r.clone()]` with `slice::from_ref(&r)`.
- Drop the unused `log` dependency from block-cache and object-store-cache (the phase-1 memory tier has no logging; the SSD-tier logging arrives in the phase-2 PR).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
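For readers unfamiliar with the `slice::from_ref` idiom named in this commit, a minimal sketch follows. `total_bytes` and `fetch_one` are hypothetical stand-ins for an API that accepts a slice of byte ranges; they are not functions from the cache crates.

```rust
use std::ops::Range;
use std::slice;

/// Hypothetical stand-in for a fetch API that takes a slice of byte ranges.
fn total_bytes(ranges: &[Range<u64>]) -> u64 {
    ranges.iter().map(|r| r.end - r.start).sum()
}

/// Pass a single range to a slice-taking API.
fn fetch_one(range: &Range<u64>) -> u64 {
    // `slice::from_ref` views one `&T` as a one-element `&[T]`: no clone,
    // and no single-range array literal for clippy to flag.
    total_bytes(slice::from_ref(range))
}
```

Calling `fetch_one(&(10..25))` returns `15` without cloning the range.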
`.groupBy(identity).view.mapValues(_.size).toMap` is a Scala 2.13-only idiom, so the
suite failed to compile under Scala 2.12 (Spark 3.4/3.5 lint-java jobs). Replace both
occurrences with `.groupBy(identity).map { case (h, v) => (h, v.size) }`, which compiles
on 2.12 and 2.13. Verified with a `-Pspark-3.4` (Scala 2.12) test-compile.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Which issue does this PR close?
Part of #4695 (Scan I/O acceleration proposal). This is Phase 1 of that work and does not close the issue — the SSD tier and read-ahead land in follow-ups.
Rationale for this change
Today every Comet scan re-fetches data bytes from S3/HDFS/ABFS: only parsed Parquet metadata is cached, and only per-plan (Comet builds a fresh `RuntimeEnv` per plan, so nothing survives across tasks, let alone across queries). A hot table scanned by ten queries pays ten full rounds of GETs. This is one of the two big I/O wins called out in #4695 (the other being read-ahead).

This PR adds a process-wide, block-aligned memory cache of the raw bytes behind the `object_store` API, plus the driver-side cache-affinity scheduling that makes it effective beyond a single executor: with K executors and Spark's locality-blind scheduling of S3-backed scans, each host's private cache would otherwise see only ~1/K of the re-reads. Default off.

Design document: `docs/design/OBJECT_STORE_CACHE_DESIGN.md` (on a reference branch; the code comments cite it by section, e.g. §2.7 / §2.10).

Why this is split into phases
The full design is landing as two stacked PRs, split at the memory/SSD boundary purely to keep each one reviewable:
- Phase 1 (this PR): … `object_store` wrapper, Comet wiring, and cache-affinity scheduling.
- Phase 2: … `UnifiedMemoryManager`, and a real-S3 benchmark. It stacks on this PR; the memory-tier core already ships the `set_memory_budget` contract on day one so the phase-2 unified-memory work is purely additive. For reviewer reference, the Phase 2 change is staged on the author's fork at schenksj/datafusion-comet#16 (based on this branch, so its diff shows only the Phase 2 delta); it will be opened here once this PR lands.

Keeping the on-disk region format and the `UnifiedMemoryManager` integration out of this PR means Phase 1 stays focused on the core + wiring that everything else builds on.

What changes are included in this PR?
Two new storage-API-neutral workspace crates
- `native/block-cache`: the cache core. SIEVE memory tier, single-flight miss dedup, coalesced miss fetch, ETag/version invalidation (immutable-file workloads), `set_memory_budget` (phase-2 contract), metrics. Depends only on `tokio`/`bytes`/`futures`; a `cargo tree` test guards it against `object_store`/opendal/Comet deps.
- `native/object-store-cache`: `CachingObjectStore` wrapping `dyn ObjectStore` + the core. Routes `get_ranges` and plain bounded `get_opts` through the cache; passes everything else through and invalidates on `put`/`delete`/`copy`. The only crate that touches the `object_store` trait.

Comet wiring
- Process-global `BlockCache` in a `OnceLock`, initialized from JNI configs on the first `createPlan`; `prepare_object_store_with_configs` wraps remote (non-file) stores.
- `spark.comet.scan.dataCache.*` configs (default off), force-added across JNI so defaults reach native code.
- `CometFileLocalityManager`: driver-side sticky `file -> host` assignment, least-loaded balancing, fair-share rebalance on scale-up; wired into `CometExecRDD.getPreferredLocations` and `CometNativeScanExec`.

Deferred to later phases per the design: SSD tier, unified storage-memory integration, opendal/Iceberg coverage, readahead.
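To make the cache-affinity idea concrete, here is a minimal sketch of sticky `file -> host` assignment with least-loaded placement. It is written in Rust for consistency with the native crates, whereas the actual `CometFileLocalityManager` is Scala; `StickyAssigner`, `assign`, and the tie-break by host name are hypothetical, and the fair-share rebalance on scale-up is deliberately left out.

```rust
use std::collections::HashMap;

/// Illustrative sticky file -> host assignment (not the PR's implementation).
#[derive(Default)]
struct StickyAssigner {
    file_to_host: HashMap<String, String>,
}

impl StickyAssigner {
    /// Preferred host for `file` among the currently live `hosts`,
    /// or `None` when no host is available.
    fn assign(&mut self, file: &str, hosts: &[&str]) -> Option<String> {
        // Sticky: keep the previous host while it is still alive, so repeat
        // reads land on the executor that already cached the file.
        if let Some(h) = self.file_to_host.get(file) {
            if hosts.contains(&h.as_str()) {
                return Some(h.clone());
            }
        }
        // Count the files currently assigned to each live host.
        let mut load: HashMap<&str, usize> = hosts.iter().map(|h| (*h, 0)).collect();
        for h in self.file_to_host.values() {
            if let Some(n) = load.get_mut(h.as_str()) {
                *n += 1;
            }
        }
        // Least-loaded host wins; ties are broken by name so the result is
        // deterministic (an assumption of this sketch).
        let chosen = hosts.iter().min_by_key(|h| (load[**h], **h))?.to_string();
        self.file_to_host.insert(file.to_string(), chosen.clone());
        Some(chosen)
    }
}
```

In this sketch a file whose host disappears from `hosts` is simply reassigned to the least-loaded survivor on its next lookup, which is one plausible reading of the "lost host" case listed among the Scala unit tests.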
Scope: Comet-core scans only (Parquet). The cache wraps the `object_store` stack used by Comet's native scan, so this PR, and the Phase 2 follow-up, accelerate Comet-core data sources such as Parquet. Iceberg and Delta native scans are not yet routed through the cache (they read data on their own paths, e.g. via opendal); wiring them in will require a dedicated future follow-up. Until then those scans read uncached, exactly as today.

How are these changes tested?
- `block-cache`: 11 integration + 3 unit tests + a storage-neutrality guard (`cargo tree` fails the build if a forbidden dep creeps in) + doctest.
- `object-store-cache`: 4 adapter tests (zero-refetch, byte-exactness vs. the unwrapped store, conditional passthrough, put invalidation).
- `data_cache` config parsing: 3 tests.
- `CometFileLocalityManager`: 8 Scala unit tests (sticky reassignment, least-loaded, lost host, fair-share rebalance, prune).

Native cache crates: `cargo test -p datafusion-comet-block-cache -p datafusion-comet-object-store-cache`. Scala built with JDK17 + `-Pspark-4.1`.

Benchmark
Cold-vs-warm cache measured with `CometDataCacheBenchmark` against real S3. Each scenario clears the cache for a genuine cold read, then times warm runs (median of 4). `ColdFetch`/`WarmFetch` are bytes read from S3 per run (native counter via `getDataCacheStats`); `WarmFetch ≈ 0` confirms reads are served from the cache, not the network. These runs fit within the memory budget, so it is this PR's memory tier serving the warm reads. (The benchmark harness itself ships with Phase 2, alongside the SSD tier it also exercises.)

Setup: MacBook Air, `local[4]`, S3 over the public internet on commercial broadband. WAN-to-S3 inflates cold times vs. a co-located in-region executor, so the absolute numbers/speedups are illustrative rather than datacenter-representative.

Incompressible data (random doubles, ~0.5 GB on disk):
Compressible data (id-derived; Parquet dictionary/RLE crushes the file to ~8 MB):
Takeaways:
🤖 Generated with Claude Code