Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ serde_json = { workspace = true }
uuid = { workspace = true, features = ["v4"] }
mimalloc = { workspace = true, optional = true, features = [
"local_dynamic_tls",
# Pin to mimalloc v2 until apache/datafusion-python#1607 resolves.
"v2",
] }
async-trait = { workspace = true }
futures = { workspace = true }
Expand Down
42 changes: 34 additions & 8 deletions skills/datafusion_python/SKILL.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ df = ctx.from_pylist([{"a": 1, "b": "x"}, {"a": 2, "b": "y"}])
df = ctx.from_pandas(pandas_df)
df = ctx.from_polars(polars_df)
df = ctx.from_arrow(arrow_table)
df = ctx.read_batch(record_batch) # one pa.RecordBatch, no named table
df = ctx.read_batches([batch1, batch2]) # several pa.RecordBatch

# From SQL
df = ctx.sql("SELECT a, b FROM my_table WHERE a > 1")
Expand Down Expand Up @@ -434,11 +436,19 @@ logical operators.
```python
col("a").is_null()
col("a").is_not_null()
col("a").fill_null(lit(0)) # replace NULL with a value
col("a").fill_null(lit(0)) # replace NULL with a value (single expression)
F.coalesce(col("a"), col("b")) # first non-null value
F.nullif(col("a"), lit(0)) # return NULL if a == 0
```

To fill nulls across the whole DataFrame (optionally limited to a subset of
columns), use the DataFrame-level method:

```python
df.fill_null(0) # every column
df.fill_null(0, subset=["a", "b"]) # only these columns
```

### CASE / WHEN

```python
Expand Down Expand Up @@ -466,6 +476,15 @@ import pyarrow as pa
col("a").cast(pa.float64())
col("a").cast(pa.utf8())
col("a").cast(pa.date32())

col("a").try_cast(pa.int32()) # like cast(), but yields NULL on failure instead of erroring
```

To cast several columns at once at the DataFrame level, pass a mapping to
`df.cast(...)`:

```python
df.cast({"a": pa.float64(), "b": pa.int32()})
```

### Aliasing
Expand All @@ -477,7 +496,7 @@ col("a").cast(pa.date32())
### BETWEEN and IN

```python
col("a").between(lit(1), lit(10)) # 1 <= a <= 10
col("a").between(1, 10) # 1 <= a <= 10 (bounds auto-wrap)
F.in_list(col("a"), [lit(1), lit(2), lit(3)]) # a IN (1, 2, 3)
F.in_list(col("a"), [lit(1), lit(2)], negated=True) # a NOT IN (1, 2)
```
Expand Down Expand Up @@ -534,7 +553,7 @@ F.array_transform(col("a"), F.lambda_(["v"], F.lambda_var("v") * lit(2)))
| `CASE x WHEN 1 THEN 'a' END` | `F.case(col("x")).when(lit(1), lit("a")).end()` |
| `CASE WHEN x > 1 THEN 'a' END` | `F.when(col("x") > 1, lit("a")).end()` |
| `x IN (1, 2, 3)` | `F.in_list(col("x"), [lit(1), lit(2), lit(3)])` |
| `x BETWEEN 1 AND 10` | `col("x").between(lit(1), lit(10))` |
| `x BETWEEN 1 AND 10` | `col("x").between(1, 10)` |
| `CAST(x AS DOUBLE)` | `col("x").cast(pa.float64())` |
| `ROW_NUMBER() OVER (...)` | `F.row_number(partition_by=[...], order_by=[...])` |
| `SUM(x) OVER (...)` | `F.sum(col("x")).over(window)` |
Expand All @@ -556,8 +575,9 @@ F.array_transform(col("a"), F.lambda_(["v"], F.lambda_var("v") * lit(2)))
- arithmetic between two literals with no column involved:
`lit(1) - col("discount")` is fine, but `lit(1) - lit(2)` needs both
- values that must carry a specific Arrow type, via `lit(pa.scalar(...))`
- `.when(...)`, `.otherwise(...)`, `F.nullif(...)`, `.between(...)`,
`F.in_list(...)` and similar method/function arguments
- `.when(...)`, `.otherwise(...)`, `F.nullif(...)`, `F.in_list(...)`
and similar method/function arguments (note: `.between(...)`
auto-wraps its bounds, so `col("a").between(1, 10)` needs no `lit()`)

3. **Column name quoting**: Column names are normalized to lowercase by default
in both `select("...")` and `col("...")`. To reference a column with
Expand All @@ -576,7 +596,8 @@ F.array_transform(col("a"), F.lambda_(["v"], F.lambda_var("v") * lit(2)))
partition frame, set `window_frame=WindowFrame("rows", None, None)`.

6. **Arithmetic on aggregates belongs in a later `select`, not inside
`aggregate`**: Each item in the aggregate list must be a single aggregate
`aggregate`** *(applies to datafusion-python 53 and earlier; fixed in 54)*:
Each item in the aggregate list must be a single aggregate
call (optionally aliased). Combining aggregates with arithmetic inside
`aggregate(...)` fails with `Internal error: Invalid aggregate expression`.
Alias the aggregates, then compute the combination downstream:
Expand Down Expand Up @@ -609,6 +630,12 @@ F.array_transform(col("a"), F.lambda_(["v"], F.lambda_var("v") * lit(2)))
# (note: join_on keeps both key columns in the output, unlike on="key")
li.join_on(failed, col("l_orderkey") == col("o_orderkey"))
```
When the same column name exists on both sides, `DataFrame.col(name)`
(and `DataFrame.column(name)`) returns a column reference qualified to
that DataFrame, which disambiguates the predicate explicitly:
```python
li.join_on(failed, li.col("l_orderkey") == failed.col("o_orderkey"))
```

## Idiomatic Patterns

Expand Down Expand Up @@ -746,7 +773,7 @@ F.left(col("c_phone"), lit(2)) # prefix shortcut

**Array/List**: `array`, `make_array`, `array_agg`, `array_length`,
`array_element`, `array_slice`, `array_append`, `array_prepend`,
`array_concat`, `array_has`, `array_has_all`, `array_has_any`, `array_position`,
`array_concat`, `array_contains`, `array_has`, `array_has_all`, `array_has_any`, `array_position`,
`array_remove`, `array_distinct`, `array_sort`, `array_reverse`, `flatten`,
`array_to_string`, `array_intersect`, `array_union`, `array_except`,
`generate_series`
Expand Down Expand Up @@ -808,7 +835,6 @@ both `functions` and `functions.spark` may behave differently:
| `concat` | NULL inputs treated as empty | NULL inputs propagate to NULL |
| `round` | HALF_EVEN (banker's) | HALF_UP |
| `trunc` | Numeric truncation | Date truncation |
| `substring` | 1-indexed | 1-indexed (parity) |

Pick the namespace whose semantics match your intent — both stay imported
side by side; `enable_spark_functions()` only affects SQL.
Expand Down
Loading