Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1034,4 +1034,58 @@ mod tests {
// msg2 missing "name" field, should get default empty string from ensure_size
assert_eq!(name_col.value(1), "");
}

/// Pin the omitted-vs-null distinction for a Boolean field that this PR
/// introduced by switching the shared `ensure_output_array_builders_size`
/// boolean default from `append_null()` to `append_value(false)`.
///
/// After the change, the PB path correctly emits `false` for a proto3 field
/// absent from a message, but the shared default also reaches this JSON
/// path: an *omitted* boolean now yields a non-null `false`, while an
/// *explicit* JSON `null` still yields a null (the JSON handler appends
/// null for explicit nulls). This test locks that behavior so a future
/// change to either side is a conscious decision, not an accident.
#[test]
fn test_parse_json_boolean_omitted_vs_explicit_null() {
let schema = Arc::new(Schema::new(vec![
Field::new("serialized_kafka_records_partition", DataType::Int32, false),
Field::new("serialized_kafka_records_offset", DataType::Int64, false),
Field::new("serialized_kafka_records_timestamp", DataType::Int64, false),
Field::new("active", DataType::Boolean, true),
]));

let nested_mapping = HashMap::new();
let mut deserializer = JsonDeserializer::new(schema.clone(), &nested_mapping)
.expect("Failed to create JsonDeserializer");

// row0: explicit true; row1: explicit null; row2: field omitted entirely.
let msg0 = br#"{"active": true}"#;
let msg1 = br#"{"active": null}"#;
let msg2 = br#"{}"#;

let messages = create_binary_array(vec![msg0.as_ref(), msg1.as_ref(), msg2.as_ref()]);
let partitions = create_partition_array(vec![0, 0, 0]);
let offsets = create_offset_array(vec![100, 101, 102]);
let timestamps = create_timestamp_array(vec![1000, 1001, 1002]);

let batch = deserializer
.parse_messages_with_kafka_meta(&messages, &partitions, &offsets, &timestamps)
.expect("Failed to parse messages");

assert_eq!(batch.num_rows(), 3);
let active_col = batch
.column(3)
.as_any()
.downcast_ref::<BooleanArray>()
.expect("active column");

// row0: explicit true.
assert!(active_col.value(0));
assert!(!active_col.is_null(0));
// row1: explicit null → null (JSON handler's own behavior).
assert!(active_col.is_null(1));
// row2: omitted → non-null false (shared ensure_size default).
assert!(!active_col.is_null(2));
assert!(!active_col.value(2));
}
}
Loading
Loading