Integrate datafusion-distributed with Python #1611

Draft
gabotechs wants to merge 1 commit into main from gabotechs/datafusion-distributed-integration

gabotechs commented Jun 26, 2026

Quick Try

Run two workers in separate terminals:

python examples/distributed-localhost-worker.py 50051
python examples/distributed-localhost-worker.py 50052

Download the test dataset:

curl -LO https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet

Then run the query:

WORKERS=50051,50052 python examples/distributed-run.py yellow_tripdata_2021-01.parquet

Or print the distributed plan:

WORKERS=50051,50052 python examples/distributed-run.py --plan yellow_tripdata_2021-01.parquet
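
For reference, the WORKERS value in the commands above is a comma-separated list of ports. Below is a minimal sketch of how a script could turn that value into worker URLs. The helper name `worker_urls_from_env`, the `localhost` host, and the `http://` scheme are illustrative assumptions, not taken from the example scripts in this PR.

```python
import os


def worker_urls_from_env(var: str = "WORKERS", host: str = "localhost") -> list[str]:
    """Hypothetical helper: parse a comma-separated port list into worker URLs."""
    raw = os.environ.get(var, "")
    urls = []
    for part in raw.split(","):
        part = part.strip()
        if not part:
            continue  # tolerate empty entries such as a trailing comma
        port = int(part)  # raises ValueError on a non-numeric port
        if not 0 < port < 65536:
            raise ValueError(f"port out of range: {port}")
        # Assumed URL shape; the real scripts may build addresses differently.
        urls.append(f"http://{host}:{port}")
    return urls
```

An unset or empty variable yields an empty list, so the caller can decide whether to fall back to a non-distributed run.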

Which issue does this PR close?

N/A.

Rationale for this change

This lets Python users wire datafusion-distributed into datafusion-python: discover workers from Python, spawn worker servers, and inspect distributed plans.

One integration wrinkle: upstream examples usually build from SessionStateBuilder, while this package exposes SessionConfig/SessionContext as the main public path. This PR installs the distributed planner when a SessionContext is built from a distributed config.

What changes are included in this PR?

  • Adds Python-facing WorkerResolver, Worker, and WorkerQueryContext support.
  • Adds distributed planner wiring and distributed worker server bindings.
  • Adds ExecutionPlan.display_distributed(metrics_format=...) using upstream ASCII plan rendering.
  • Adds localhost worker/run examples, including --plan.
  • Adds focused tests for the new API surface.

Are there any user-facing changes?

Yes. New distributed APIs are exposed from Python, plus new examples. No intended breaking changes.

gabotechs force-pushed the gabotechs/datafusion-distributed-integration branch 2 times, most recently from 1931203 to 0d40367 on June 26, 2026 13:20
gabotechs force-pushed the gabotechs/datafusion-distributed-integration branch from 0d40367 to 53a685d on June 26, 2026 13:47
Comment on lines +413 to +417
.with_analyzer_rule(Arc::new(crate::analyzer::ResolveLambdaVariables::new()));

if distributed {
    builder = builder.with_distributed_planner();
}

Author

Rather than letting external systems inject their own QueryPlanners, this just plumbs the datafusion-distributed query planner in from the Rust side, which is actually pretty easy to do.

Comment thread crates/core/Cargo.toml
"rt-multi-thread",
"sync",
] }
tonic = { workspace = true }

Author

This is the kind of thing that could easily be hidden behind a flag.

@classmethod
def from_session_builder(
    cls,
    session_builder: Callable[[WorkerQueryContext], SessionContext],

Author

In datafusion-distributed, this callback configures DataFusion at the SessionStateBuilder level, but this project has no analogous structure; the closest is a SessionContext. So users are expected to build a full SessionContext here out of another SessionContext, even though in datafusion-distributed the contract is SessionStateBuilder in, SessionState out.
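
To make the shape of that callback concrete, here is a minimal sketch that uses stand-in classes instead of the real bindings. `FakeSessionContext`, `FakeWorkerQueryContext`, the `session` attribute, and the option keys are all hypothetical; they only illustrate a `Callable[[WorkerQueryContext], SessionContext]` that receives a per-query context and returns a fully built session derived from an existing one.

```python
from dataclasses import dataclass, field, replace
from typing import Callable


@dataclass(frozen=True)
class FakeSessionContext:
    """Stand-in for SessionContext; it only tracks config options."""
    options: dict = field(default_factory=dict)


@dataclass(frozen=True)
class FakeWorkerQueryContext:
    """Stand-in for WorkerQueryContext; assumed here to carry a base session."""
    session: FakeSessionContext


# Same shape as the annotation in the diff, with stand-in types.
SessionBuilder = Callable[[FakeWorkerQueryContext], FakeSessionContext]


def my_session_builder(ctx: FakeWorkerQueryContext) -> FakeSessionContext:
    # Build a new, complete session from the one handed in,
    # leaving the original session untouched.
    options = {**ctx.session.options, "target_partitions": 4}
    return replace(ctx.session, options=options)
```

Under this reading, the builder never mutates the session it receives; it returns a new one, which mirrors the "SessionContext in, SessionContext out" contract described in the comment.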
