feat: object-store data cache (memory tier) + cache-affinity scheduling #4828

Open
schenksj wants to merge 5 commits into apache:main from schenksj:feat/object-store-cache

@schenksj schenksj commented Jul 5, 2026

Contributor

Which issue does this PR close?

Part of #4695 (Scan I/O acceleration proposal). This is Phase 1 of that work and does not close the issue — the SSD tier and read-ahead land in follow-ups.

Rationale for this change

Today every Comet scan re-fetches data bytes from S3/HDFS/ABFS — only parsed Parquet metadata is cached, and only per-plan (Comet builds a fresh RuntimeEnv per plan, so nothing survives across tasks, let alone across queries). A hot table scanned by ten queries pays ten full rounds of GETs. This is one of the two big I/O wins called out in #4695 (the other being read-ahead).

This PR adds a process-wide, block-aligned memory cache of the raw bytes behind the object_store API, plus the driver-side cache-affinity scheduling that makes it effective beyond a single executor. Without that scheduling, Spark places tasks for S3-backed scans without regard to locality, so with K executors each host's private cache would see only ~1/K of the re-reads. The feature is off by default.
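To make "block-aligned" concrete, here is a minimal sketch of how a requested byte range can be mapped onto fixed-size cache blocks. The block size and the function names (`BLOCK_SIZE`, `covering_blocks`, `block_span`) are assumptions for illustration; the PR text does not state the real block size or the crate's internal API.

```rust
use std::ops::Range;

/// Hypothetical block size; the PR does not state the value it uses.
const BLOCK_SIZE: u64 = 1024 * 1024;

/// Indices of the fixed-size blocks that cover `range` (end-exclusive).
fn covering_blocks(range: &Range<u64>, block_size: u64) -> Range<u64> {
    if range.start >= range.end {
        return 0..0; // an empty request touches no blocks
    }
    let first = range.start / block_size;
    let last = (range.end - 1) / block_size;
    first..last + 1
}

/// Byte span of block `idx`, clamped to the object length.
fn block_span(idx: u64, block_size: u64, object_len: u64) -> Range<u64> {
    let start = idx * block_size;
    start..(start + block_size).min(object_len)
}

fn main() {
    let req = 1_500_000..2_200_000u64;
    let blocks = covering_blocks(&req, BLOCK_SIZE);
    println!("request {:?} touches blocks {:?}", req, blocks);
    for idx in blocks {
        println!("  block {idx} -> bytes {:?}", block_span(idx, BLOCK_SIZE, 3_000_000));
    }
}
```

Because every request is rounded out to whole blocks, two overlapping reads of the same file resolve to the same cache keys, which is what lets a later scan reuse bytes fetched by an earlier one.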

Design document: docs/design/OBJECT_STORE_CACHE_DESIGN.md (on a reference branch; the code comments cite it by section, e.g. §2.7 / §2.10).

Why this is split into phases

The full design is landing as two stacked PRs, split at the memory/SSD boundary purely to keep each one reviewable:

  • Phase 1 — this PR: the storage-neutral cache core (memory tier), the object_store wrapper, Comet wiring, and cache-affinity scheduling.
  • Phase 2 — follow-up (not yet opened here): the on-disk SSD tier, optional unified off-heap storage-memory accounting via Spark's UnifiedMemoryManager, and a real-S3 benchmark. It stacks on this PR; the memory-tier core already ships the set_memory_budget contract on day one so the phase-2 unified-memory work is purely additive. For reviewer reference, the Phase 2 change is staged on the author's fork at schenksj/datafusion-comet#16 (based on this branch, so its diff shows only the Phase 2 delta); it will be opened here once this PR lands.

Keeping the on-disk region format and the UnifiedMemoryManager integration out of this PR means Phase 1 stays focused on the core + wiring that everything else builds on.

What changes are included in this PR?

Two new storage-API-neutral workspace crates

  • native/block-cache — the cache core: SIEVE memory tier, single-flight miss dedup, coalesced miss fetch, ETag/version invalidation (immutable-file workloads), set_memory_budget (phase-2 contract), metrics. Depends only on tokio/bytes/futures; a cargo tree test guards it against object_store/opendal/Comet deps.
  • native/object-store-cache: CachingObjectStore wrapping dyn ObjectStore + the core. Routes get_ranges and plain bounded get_opts through the cache; passes everything else through and invalidates on put/delete/copy. The only crate that touches the object_store trait.
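For readers unfamiliar with SIEVE, the eviction policy named for the memory tier, the following is a minimal sketch of the algorithm itself, not the crate's implementation. It assumes `u64` block keys and a capacity counted in entries; the real tier is budgeted in bytes via set_memory_budget, and the `Sieve` type and its methods are hypothetical names.

```rust
use std::collections::{HashMap, VecDeque};

/// Minimal SIEVE cache: FIFO order, one "visited" bit per entry, and a hand
/// that sweeps from oldest to newest looking for an unvisited victim.
struct Sieve {
    capacity: usize,
    order: VecDeque<u64>,                   // front = oldest, back = newest
    entries: HashMap<u64, (Vec<u8>, bool)>, // key -> (bytes, visited)
    hand: usize,                            // index into `order`
}

impl Sieve {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0);
        Sieve { capacity, order: VecDeque::new(), entries: HashMap::new(), hand: 0 }
    }

    /// A hit only sets the visited bit; the queue order is never touched.
    fn get(&mut self, key: u64) -> Option<&[u8]> {
        let (bytes, visited) = self.entries.get_mut(&key)?;
        *visited = true;
        Some(bytes.as_slice())
    }

    fn insert(&mut self, key: u64, bytes: Vec<u8>) {
        if let Some(slot) = self.entries.get_mut(&key) {
            slot.0 = bytes; // refresh in place, keep the queue position
            return;
        }
        if self.order.len() == self.capacity {
            self.evict_one();
        }
        self.order.push_back(key);
        self.entries.insert(key, (bytes, false));
    }

    fn evict_one(&mut self) {
        loop {
            if self.hand >= self.order.len() {
                self.hand = 0; // wrap from the newest entry back to the oldest
            }
            let key = self.order[self.hand];
            let visited = &mut self.entries.get_mut(&key).expect("queue and map in sync").1;
            if *visited {
                *visited = false; // second chance: clear the bit, keep the entry
                self.hand += 1;
            } else {
                // Removing shifts newer entries left, so the hand now points
                // at the next newer entry, as SIEVE requires.
                self.order.remove(self.hand);
                self.entries.remove(&key);
                return;
            }
        }
    }
}

fn main() {
    let mut cache = Sieve::new(3);
    for key in [1, 2, 3] {
        cache.insert(key, vec![key as u8; 4]);
    }
    let _ = cache.get(1); // mark block 1 as visited
    cache.insert(4, vec![4; 4]); // full: the hand skips visited 1 and evicts 2
    println!("1 cached: {}", cache.get(1).is_some());
    println!("2 cached: {}", cache.get(2).is_some());
}
```

The appeal for a scan cache is that hits do no list manipulation, so the hot path stays cheap; a production version would replace the O(n) `VecDeque::remove` with a linked structure.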

Comet wiring

  • Process-global BlockCache in a OnceLock, initialized from JNI configs on the first createPlan; prepare_object_store_with_configs wraps remote (non-file) stores.
  • spark.comet.scan.dataCache.* configs (default off), force-added across JNI so defaults reach native code.
  • CometFileLocalityManager — driver-side sticky file -> host assignment, least-loaded balancing, fair-share rebalance on scale-up — wired into CometExecRDD.getPreferredLocations and CometNativeScanExec.
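The sticky, least-loaded assignment described for CometFileLocalityManager can be sketched as follows. The real class is Scala and lives on the driver; this Rust version is only an illustration, its type and method names are hypothetical, and it omits the fair-share rebalance on scale-up.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical stand-in for the driver-side file -> host map.
#[derive(Default)]
struct LocalityManager {
    assignment: HashMap<String, String>, // file path -> host
}

impl LocalityManager {
    /// Preferred host for `file`, given the hosts that are currently alive.
    fn preferred_host(&mut self, file: &str, live_hosts: &[&str]) -> Option<String> {
        // Sticky: keep the previous host for as long as it is still alive.
        if let Some(host) = self.assignment.get(file) {
            if live_hosts.contains(&host.as_str()) {
                return Some(host.clone());
            }
        }
        // Otherwise count the files each live host owns and pick the least loaded.
        let mut load: BTreeMap<&str, usize> = live_hosts.iter().map(|h| (*h, 0)).collect();
        for host in self.assignment.values() {
            if let Some(n) = load.get_mut(host.as_str()) {
                *n += 1;
            }
        }
        // BTreeMap iterates in key order, so ties break by host name.
        let chosen = load.iter().min_by_key(|(_, n)| **n).map(|(h, _)| h.to_string())?;
        self.assignment.insert(file.to_string(), chosen.clone());
        Some(chosen)
    }
}

fn main() {
    let mut mgr = LocalityManager::default();
    let hosts = ["exec-a", "exec-b"];
    // Hypothetical file names, used only for the demonstration.
    for file in ["part-0.parquet", "part-1.parquet", "part-0.parquet"] {
        println!("{file} -> {:?}", mgr.preferred_host(file, &hosts));
    }
    // exec-a is lost and exec-c joins: part-0 moves to the emptiest live host.
    let hosts = ["exec-b", "exec-c"];
    println!("part-0.parquet -> {:?}", mgr.preferred_host("part-0.parquet", &hosts));
}
```

Returning the same host for the same file is what turns K private caches into something closer to one partitioned cache: a repeat read is steered to the executor that already holds the blocks.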

Deferred to later phases per the design: SSD tier, unified storage-memory integration, opendal/Iceberg coverage, read-ahead.

Scope — Comet-core scans only (Parquet). The cache wraps the object_store stack used by Comet's native scan, so this PR — and the Phase 2 follow-up — accelerate Comet-core data sources such as Parquet. Iceberg and Delta native scans are not yet routed through the cache (they read data on their own paths, e.g. via opendal); wiring them in will require a dedicated future follow-up. Until then those scans read uncached, exactly as today.

How are these changes tested?

  • block-cache: 11 integration + 3 unit tests + a storage-neutrality guard (cargo tree fails the build if a forbidden dep creeps in) + doctest.
  • object-store-cache: 4 adapter tests (zero-refetch, byte-exactness vs. the unwrapped store, conditional passthrough, put invalidation).
  • native data_cache config parsing: 3 tests.
  • CometFileLocalityManager: 8 Scala unit tests (sticky reassignment, least-loaded, lost host, fair-share rebalance, prune).
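The "zero-refetch" and invalidation properties listed above can be illustrated with a counting backend. This is a sketch under stated assumptions, not the PR's test code: `CountingStore` and `ReadThrough` are hypothetical types, the ETag is assumed to be available for free, and stale versions are simply left unused instead of being evicted.

```rust
use std::collections::HashMap;
use std::ops::Range;

/// Stand-in for a remote store that counts the bytes it serves.
struct CountingStore {
    data: Vec<u8>,
    etag: String,
    bytes_served: usize,
}

impl CountingStore {
    fn get_block(&mut self, block: usize, block_size: usize) -> Vec<u8> {
        let start = (block * block_size).min(self.data.len());
        let end = (start + block_size).min(self.data.len());
        self.bytes_served += end - start;
        self.data[start..end].to_vec()
    }
}

/// Read-through cache keyed by (path, etag, block index), so a new ETag
/// never matches blocks cached under the old one.
struct ReadThrough {
    block_size: usize,
    blocks: HashMap<(String, String, usize), Vec<u8>>,
}

impl ReadThrough {
    fn new(block_size: usize) -> Self {
        ReadThrough { block_size, blocks: HashMap::new() }
    }

    fn read(&mut self, store: &mut CountingStore, path: &str, range: Range<usize>) -> Vec<u8> {
        let mut out = Vec::with_capacity(range.len());
        if range.is_empty() {
            return out;
        }
        let block_size = self.block_size;
        let first = range.start / block_size;
        let last = (range.end - 1) / block_size;
        for block in first..=last {
            let key = (path.to_string(), store.etag.clone(), block);
            // Fetch from the store only when the block is missing.
            let bytes = self.blocks.entry(key).or_insert_with(|| store.get_block(block, block_size));
            // Copy just the part of this block that overlaps the request.
            let base = block * block_size;
            let hi = range.end.min(base + bytes.len()) - base;
            let lo = (range.start.max(base) - base).min(hi);
            out.extend_from_slice(&bytes[lo..hi]);
        }
        out
    }
}

fn main() {
    let data: Vec<u8> = (0..1000).map(|i| (i % 256) as u8).collect();
    let mut store = CountingStore { data, etag: "v1".to_string(), bytes_served: 0 };
    let mut cache = ReadThrough::new(256);

    cache.read(&mut store, "t/part-0.parquet", 100..600);
    println!("after cold read: {} bytes fetched", store.bytes_served);
    cache.read(&mut store, "t/part-0.parquet", 100..600);
    println!("after warm read: {} bytes fetched", store.bytes_served);

    store.etag = "v2".to_string(); // the object was rewritten
    cache.read(&mut store, "t/part-0.parquet", 100..600);
    println!("after etag change: {} bytes fetched", store.bytes_served);
}
```

The counter plays the same role as the ColdFetch/WarmFetch columns in the benchmark: an unchanged count after the second read shows that it was served entirely from memory.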

Native cache crates: cargo test -p datafusion-comet-block-cache -p datafusion-comet-object-store-cache. Scala built with JDK17 + -Pspark-4.1.

Benchmark

Cold-vs-warm cache measured with CometDataCacheBenchmark against real S3. Each scenario clears the cache for a genuine cold read, then times warm runs (median of 4). ColdFetch/WarmFetch are bytes read from S3 per run (native counter via getDataCacheStats); WarmFetch ≈ 0 confirms reads are served from the cache, not the network. These runs fit within the memory budget, so it is this PR's memory tier serving the warm reads. (The benchmark harness itself ships with Phase 2, alongside the SSD tier it also exercises.)

Setup: MacBook Air, local[4], S3 over the public internet on commercial broadband. WAN-to-S3 inflates cold times vs. a co-located in-region executor, so the absolute numbers/speedups are illustrative rather than datacenter-representative.

Incompressible data (random doubles, ~0.5 GB on disk):

Category Scenario                  Cold(ms)  Warm(ms)  Speedup  ColdFetch  WarmFetch
------------------------------------------------------------------------------------
scan     full scan (16 cols)          30519       503    60.7x    514.8MB      0.0MB
scan     projection (1 col)            3594       147    24.4x     66.8MB      0.0MB
scan     projection (4 cols)           9647       115    83.9x    146.8MB      0.0MB
scan     filter (v2 < 0.25)            5410       119    45.5x     82.8MB      0.0MB
agg      group-by aggregation          5472       158    34.6x     98.8MB      0.0MB
agg      count distinct key            2885        77    37.5x     50.8MB      0.0MB
join     join fact-dim                 5150       149    34.6x     82.8MB      0.0MB
join     join + group-by               6319       176    35.9x     82.8MB      0.0MB
------------------------------------------------------------------------------------
mean warm speedup: 44.6x across 8 scenarios

Compressible data (id-derived; Parquet dictionary/RLE crushes the file to ~8 MB):

Category Scenario                  Cold(ms)  Warm(ms)  Speedup  ColdFetch  WarmFetch
------------------------------------------------------------------------------------
scan     full scan (16 cols)           1991        81    24.6x      7.8MB      0.0MB
scan     projection (1 col)             677        38    17.8x      7.8MB      0.0MB
scan     projection (4 cols)           1264        48    26.3x      7.8MB      0.0MB
scan     filter (v2 < 0.25)             730        38    19.2x      7.8MB      0.0MB
agg      group-by aggregation           852       124     6.9x      7.8MB      0.0MB
agg      count distinct key             599        68     8.8x      7.8MB      0.0MB
join     join fact-dim                  736        66    11.2x      7.8MB      0.0MB
join     join + group-by                771       101     7.6x      7.8MB      0.0MB
------------------------------------------------------------------------------------
mean warm speedup: 15.3x across 8 scenarios
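The Speedup column and the mean line can be rechecked from the Cold(ms) and Warm(ms) columns. The sketch below does this for the incompressible-data table, assuming the speedup is cold time divided by warm time and the mean is the arithmetic mean of the per-scenario speedups; the function names are illustrative.

```rust
/// (scenario, cold ms, warm ms), copied from the incompressible-data table.
const INCOMPRESSIBLE: [(&str, f64, f64); 8] = [
    ("full scan (16 cols)", 30519.0, 503.0),
    ("projection (1 col)", 3594.0, 147.0),
    ("projection (4 cols)", 9647.0, 115.0),
    ("filter (v2 < 0.25)", 5410.0, 119.0),
    ("group-by aggregation", 5472.0, 158.0),
    ("count distinct key", 2885.0, 77.0),
    ("join fact-dim", 5150.0, 149.0),
    ("join + group-by", 6319.0, 176.0),
];

/// Warm speedup: how many times faster the warm run is than the cold run.
fn speedup(cold_ms: f64, warm_ms: f64) -> f64 {
    cold_ms / warm_ms
}

/// Arithmetic mean of a non-empty slice.
fn mean(xs: &[f64]) -> f64 {
    xs.iter().sum::<f64>() / xs.len() as f64
}

fn main() {
    let speedups: Vec<f64> = INCOMPRESSIBLE.iter().map(|&(_, c, w)| speedup(c, w)).collect();
    for ((name, _, _), s) in INCOMPRESSIBLE.iter().zip(&speedups) {
        println!("{name:<24} {s:>5.1}x");
    }
    println!("mean warm speedup: {:.1}x across {} scenarios", mean(&speedups), speedups.len());
}
```

Under those assumptions the computed values agree with the table, including the 44.6x mean.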

Takeaways:

  • Serves real data volume: the full 16-column scan fetches ~515 MB cold and ~0 warm.
  • Block-granularity selective reads work: a 1-column projection fetches ~67 MB vs. ~515 MB for the full scan (visible only on the large incompressible file).
  • The win is largely driven by S3 round-trip latency, so it holds even for the ~8 MB compressed file (15.3x mean); that is, it also helps with the well-compressed data that real tables typically contain.
  • Aggregation/join gains are lower than pure scans because shuffle/hash compute is a fixed cost the cache doesn't remove — a useful signal about where scan-side caching pays off most.

🤖 Generated with Claude Code

schenksj and others added 5 commits July 4, 2026 12:43
…uling

Implements Phase 1 of the object-store data caching design (gap item #5):
a block-aligned local data cache behind the object_store API, plus driver-side
cache-affinity scheduling so repeat reads of a remote Parquet file route back to
the executor that cached it.

Two new storage-API-neutral workspace crates:
- native/block-cache: cache core (SIEVE memory tier, single-flight miss dedup,
  coalesced fetch, ETag/version invalidation, set_memory_budget, metrics). Depends
  only on tokio/bytes/futures; a cargo-tree test guards it against object_store /
  opendal / Comet deps.
- native/object-store-cache: CachingObjectStore wrapping dyn ObjectStore + the
  core. Routes get_ranges and plain bounded get_opts through the cache; passes
  everything else through and invalidates on put/delete/copy.

Comet wiring:
- Process-global BlockCache in a OnceLock, initialized from JNI configs on the
  first createPlan; prepare_object_store_with_configs wraps remote (non-file)
  stores.
- spark.comet.scan.dataCache.* configs (default off), force-added across JNI.
- CometFileLocalityManager (driver-side sticky file->host assignment, least-loaded
  balancing, fair-share rebalance on scale-up) wired into
  CometExecRDD.getPreferredLocations and CometNativeScanExec.

Default off; memory tier only. SSD tier, unified-memory integration, opendal /
Iceberg coverage, and readahead are deferred to later phases per the design.

Tests: block-cache (11 integration + 3 unit + neutrality guard + doctest),
object-store-cache (4 adapter tests), data_cache config parsing (3),
CometFileLocalityManager (8 Scala unit tests).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

The Preflight `check-suites.py` gate requires every *Suite.scala to be listed in both
pr_build_linux.yml and pr_build_macos.yml. The new CometFileLocalityManagerSuite was
missing, failing Preflight (which gates all downstream jobs). Add it to the `exec` bucket
in both workflows.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

Apply `cargo fmt` to the cache crates so `cargo fmt --all -- --check` (the CI Lint job)
passes, and drop an unused `file` binding in a block-cache test.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

The CI rust-test job runs `cargo clippy --all-targets --workspace -- -D warnings` and
`cargo machete`:
- cache.rs: pass the fetch range via `slice::from_ref` (single_range_in_vec_init).
- cache_tests.rs: allow single_range_in_vec_init (intentional one-range test slices) and
  replace `&[r.clone()]` with `slice::from_ref(&r)`.
- Drop the unused `log` dependency from block-cache and object-store-cache (the phase-1
  memory tier has no logging; the SSD-tier logging arrives in the phase-2 PR).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

`.groupBy(identity).view.mapValues(_.size).toMap` is a Scala 2.13-only idiom, so the
suite failed to compile under Scala 2.12 (Spark 3.4/3.5 lint-java jobs). Replace both
occurrences with `.groupBy(identity).map { case (h, v) => (h, v.size) }`, which compiles
on 2.12 and 2.13. Verified with a `-Pspark-3.4` (Scala 2.12) test-compile.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>