diff --git a/src/paimon/testing/utils/binary_row_generator.h b/src/paimon/testing/utils/binary_row_generator.h new file mode 100644 index 0000000..e24c2c5 --- /dev/null +++ b/src/paimon/testing/utils/binary_row_generator.h @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/data/binary_array_writer.h" +#include "paimon/common/data/binary_row_writer.h" +#include "paimon/core/stats/simple_stats.h" + +namespace paimon::test { + +struct TimestampType { + TimestampType(const Timestamp& _timestamp, int32_t _precision) + : timestamp(_timestamp), precision(_precision) {} + + Timestamp timestamp = Timestamp(0, 0); + int32_t precision = Timestamp::DEFAULT_PRECISION; +}; + +class BinaryRowGenerator { + public: + BinaryRowGenerator() = delete; + ~BinaryRowGenerator() = delete; + + // monostate indicates null + using ValueType = std::vector< + std::variant, TimestampType, Decimal, NullType>>; + + static std::unique_ptr GenerateRowPtr(const RowKind* kind, const ValueType& values, + MemoryPool* pool) { + auto row = std::make_unique(values.size()); + BinaryRowWriter writer(row.get(), 0, pool); + for (size_t i = 0; i < values.size(); i++) { + std::visit( + [&](auto&& arg) { + using T = std::decay_t; + if constexpr (std::is_same_v) { + // @note for timestamp and decimal, if precision is non-compact, the + // hash of returned row maybe diffenent with java + writer.SetNullAt(i); + } else if constexpr (std::is_same_v) { + writer.WriteByte(i, arg); + } else if constexpr (std::is_same_v) { + writer.WriteShort(i, arg); + } else if constexpr (std::is_same_v) { + writer.WriteInt(i, arg); + } else if constexpr (std::is_same_v) { + writer.WriteLong(i, arg); + } else if constexpr (std::is_same_v) { + writer.WriteFloat(i, arg); + } else if constexpr (std::is_same_v) { + writer.WriteDouble(i, arg); + } else if constexpr (std::is_same_v) { + writer.WriteBoolean(i, arg); + } else if constexpr (std::is_same_v) { + writer.WriteString(i, BinaryString::FromString(arg, pool)); + } else if constexpr (std::is_same_v>) { + writer.WriteBinary(i, *arg); + } else if constexpr (std::is_same_v) { + writer.WriteTimestamp(i, arg.timestamp, arg.precision); + } else if constexpr (std::is_same_v) { + writer.WriteDecimal(i, arg, arg.Precision()); + } else { + EXPECT_FALSE(true); + } + }, + values[i]); + } + writer.Complete(); + row->SetRowKind(kind); + return row; + } + + static std::unique_ptr GenerateRowPtr(const ValueType& values, MemoryPool* pool) { + return GenerateRowPtr(RowKind::Insert(), values, pool); + } + + static BinaryRow GenerateRow(const RowKind* kind, const ValueType& values, MemoryPool* pool) { + auto row = GenerateRowPtr(kind, values, pool); + auto binary_row = dynamic_cast(row.get()); + assert(binary_row); + return *binary_row; + } + + static BinaryRow GenerateRow(const ValueType& values, MemoryPool* pool) { + return GenerateRow(RowKind::Insert(), values, pool); + } + + static SimpleStats GenerateStats(const ValueType& min, const ValueType& max, + const std::vector& null, MemoryPool* pool) { + auto min_row = GenerateRowPtr(min, pool); + auto binary_min_row = dynamic_cast(min_row.get()); + assert(binary_min_row); + auto max_row = GenerateRowPtr(max, pool); + auto binary_max_row = dynamic_cast(max_row.get()); + assert(binary_max_row); + return SimpleStats(*binary_min_row, *binary_max_row, FromLongArrayWithNull(null, pool)); + } + + static BinaryArray FromLongArrayWithNull(const std::vector& arr, MemoryPool* pool) { + BinaryArray array; + BinaryArrayWriter writer = BinaryArrayWriter(&array, arr.size(), sizeof(int64_t), pool); + for (size_t i = 0; i < arr.size(); i++) { + int64_t v = arr[i]; + if (v == -1) { + writer.SetNullValue(i); + } else { + writer.WriteLong(i, v); + } + } + writer.Complete(); + return array; + } +}; +} // namespace paimon::test diff --git a/src/paimon/testing/utils/data_generator.cpp b/src/paimon/testing/utils/data_generator.cpp new file mode 100644 index 0000000..2ef5c9d --- /dev/null +++ b/src/paimon/testing/utils/data_generator.cpp @@ -0,0 +1,314 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/testing/utils/data_generator.h" + +#include +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_nested.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "fmt/format.h" +#include "paimon/common/data/binary_row_writer.h" +#include "paimon/common/data/binary_string.h" +#include "paimon/common/types/row_kind.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/utils/file_store_path_factory.h" + +namespace arrow { +class Array; +class ArrayBuilder; +} // namespace arrow +namespace paimon { +class MemoryPool; +} // namespace paimon + +namespace paimon::test { + +DataGenerator::DataGenerator(const std::shared_ptr& table_schema, + const std::shared_ptr& memory_pool) + : table_schema_(table_schema), memory_pool_(memory_pool) { + assert(table_schema->Id() == 0); +} + +Status DataGenerator::WriteBinaryRow(const BinaryRow& src_row, int32_t src_field_id, + const std::shared_ptr& src_type, + int32_t target_field_id, BinaryRowWriter* target_row_writer) { + arrow::Type::type type_id = src_type->id(); + switch (type_id) { + case arrow::Type::type::BOOL: { + target_row_writer->WriteBoolean(target_field_id, src_row.GetBoolean(src_field_id)); + break; + } + case arrow::Type::type::INT8: { + target_row_writer->WriteByte(target_field_id, src_row.GetByte(src_field_id)); + break; + } + case arrow::Type::type::INT16: { + target_row_writer->WriteShort(target_field_id, src_row.GetShort(src_field_id)); + break; + } + case arrow::Type::type::INT32: { + target_row_writer->WriteInt(target_field_id, src_row.GetInt(src_field_id)); + break; + } + case arrow::Type::type::INT64: { + target_row_writer->WriteLong(target_field_id, src_row.GetLong(src_field_id)); + break; + } + case arrow::Type::type::FLOAT: { + target_row_writer->WriteFloat(target_field_id, src_row.GetFloat(src_field_id)); + break; + } + case arrow::Type::type::DOUBLE: { + target_row_writer->WriteDouble(target_field_id, src_row.GetDouble(src_field_id)); + break; + } + case arrow::Type::type::STRING: { + target_row_writer->WriteString(target_field_id, src_row.GetString(src_field_id)); + break; + } + default: + return Status::Invalid( + fmt::format("type {} not support in write partial row", src_type->ToString())); + } + return Status::OK(); +} + +Result DataGenerator::ExtractPartialRow(const BinaryRow& binary_row, + const std::vector& partition_fields) { + BinaryRow partial_row(static_cast(partition_fields.size())); + BinaryRowWriter writer(&partial_row, /*initial_size=*/0, memory_pool_.get()); + for (size_t field_idx = 0; field_idx < partition_fields.size(); field_idx++) { + int32_t id = partition_fields[field_idx].Id(); + auto type = partition_fields[field_idx].Type(); + PAIMON_RETURN_NOT_OK(WriteBinaryRow(binary_row, id, type, field_idx, &writer)); + } + writer.Complete(); + return partial_row; +} + +Status DataGenerator::AppendValue(const BinaryRow& row, int32_t field_id, + const std::shared_ptr& type, + arrow::StructBuilder* struct_builder) { + arrow::Type::type type_id = type->id(); + switch (type_id) { + case arrow::Type::type::BOOL: { + auto builder = + static_cast(struct_builder->field_builder(field_id)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetBoolean(field_id))); + break; + } + case arrow::Type::type::INT8: { + auto builder = + static_cast(struct_builder->field_builder(field_id)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetByte(field_id))); + break; + } + case arrow::Type::type::INT16: { + auto builder = + static_cast(struct_builder->field_builder(field_id)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetShort(field_id))); + break; + } + case arrow::Type::type::INT32: { + auto builder = + static_cast(struct_builder->field_builder(field_id)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetInt(field_id))); + break; + } + case arrow::Type::type::INT64: { + auto builder = + static_cast(struct_builder->field_builder(field_id)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetLong(field_id))); + break; + } + case arrow::Type::type::FLOAT: { + auto builder = + static_cast(struct_builder->field_builder(field_id)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetFloat(field_id))); + break; + } + case arrow::Type::type::DOUBLE: { + auto builder = + static_cast(struct_builder->field_builder(field_id)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetDouble(field_id))); + break; + } + case arrow::Type::type::STRING: { + auto builder = + static_cast(struct_builder->field_builder(field_id)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Append(row.GetString(field_id).ToString())); + break; + } + default: + return Status::Invalid( + fmt::format("type {} not support in append value", type->ToString())); + } + return Status::OK(); +} + +Result> DataGenerator::MakeStructBuilder( + const std::vector& fields) { + auto data_type = DataField::ConvertDataFieldsToArrowStructType(fields); + std::vector> field_builders; + for (const auto& field : fields) { + auto type_id = field.Type()->id(); + switch (type_id) { + case arrow::Type::type::BOOL: { + field_builders.push_back(std::make_shared()); + break; + } + case arrow::Type::type::INT8: { + field_builders.push_back(std::make_shared()); + break; + } + case arrow::Type::type::INT16: { + field_builders.push_back(std::make_shared()); + break; + } + case arrow::Type::type::INT32: { + field_builders.push_back(std::make_shared()); + break; + } + case arrow::Type::type::INT64: { + field_builders.push_back(std::make_shared()); + break; + } + case arrow::Type::type::FLOAT: { + field_builders.push_back(std::make_shared()); + break; + } + case arrow::Type::type::DOUBLE: { + field_builders.push_back(std::make_shared()); + break; + } + case arrow::Type::type::STRING: { + field_builders.push_back(std::make_shared()); + break; + } + default: + return Status::Invalid(fmt::format("type {} not support in make struct builder", + field.Type()->ToString())); + } + } + return std::make_shared(data_type, arrow::default_memory_pool(), + field_builders); +} + +Result>> DataGenerator::SplitArrayByPartitionAndBucket( + const std::vector& binary_rows) { + auto fields = table_schema_->Fields(); + auto partition_keys = table_schema_->PartitionKeys(); + PAIMON_ASSIGN_OR_RAISE(auto partition_fields, table_schema_->GetFields(partition_keys)); + PAIMON_ASSIGN_OR_RAISE(auto bucket_fields, + table_schema_->GetFields(table_schema_->BucketKeys())); + int32_t num_buckets = table_schema_->NumBuckets(); + // map: {partition_map, bucket_id} -> arrow::StructBuilder + std::map, int32_t>, + std::shared_ptr> + struct_builder_holder; + std::map, int32_t>, + std::vector> + row_kinds_holder; + auto schema = DataField::ConvertDataFieldsToArrowSchema(fields); + PAIMON_ASSIGN_OR_RAISE( + auto path_factory, + FileStorePathFactory::Create( + /*root=*/"/tmp", schema, partition_keys, + /*default_part_value=*/"__DEFAULT_PARTITION__", + /*identifier=*/"orc", /*data_file_prefix=*/"data-", + /*legacy_partition_name_enabled=*/true, /*external_paths=*/std::vector(), + /*global_index_external_path=*/std::nullopt, + /*index_file_in_data_file_dir=*/false, memory_pool_)); + + for (const auto& binary_row : binary_rows) { + PAIMON_ASSIGN_OR_RAISE(BinaryRow partition_row, + ExtractPartialRow(binary_row, partition_fields)); + + std::vector> part_values; + PAIMON_ASSIGN_OR_RAISE(part_values, path_factory->GeneratePartitionVector(partition_row)); + std::map partition_map; + for (const auto& part_value : part_values) { + partition_map[part_value.first] = part_value.second; + } + PAIMON_ASSIGN_OR_RAISE(BinaryRow bucket_row, ExtractPartialRow(binary_row, bucket_fields)); + int32_t bucket_id = std::abs(bucket_row.HashCode() % num_buckets); + auto struct_builder_iter = struct_builder_holder.find({partition_map, bucket_id}); + if (struct_builder_iter == struct_builder_holder.end()) { + PAIMON_ASSIGN_OR_RAISE(auto struct_builder, MakeStructBuilder(fields)); + auto insert_result = struct_builder_holder.emplace( + std::make_pair(partition_map, bucket_id), struct_builder); + if (!insert_result.second) { + return Status::Invalid("insert element to struct_builder_holder failed"); + } + struct_builder_iter = insert_result.first; + } + auto struct_builder = struct_builder_iter->second; + PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder->Append()); + for (size_t i = 0; i < fields.size(); i++) { + if (binary_row.IsNullAt(i)) { + PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder->field_builder(i)->AppendNull()); + } else { + PAIMON_RETURN_NOT_OK( + AppendValue(binary_row, i, fields[i].Type(), struct_builder.get())); + } + } + + auto row_kinds_iter = row_kinds_holder.find({partition_map, bucket_id}); + if (row_kinds_iter == row_kinds_holder.end()) { + auto insert_result = row_kinds_holder.emplace(std::make_pair(partition_map, bucket_id), + std::vector()); + if (!insert_result.second) { + return Status::Invalid("insert element to row_kinds_holder failed"); + } + row_kinds_iter = insert_result.first; + } + auto& row_kinds = row_kinds_iter->second; + PAIMON_ASSIGN_OR_RAISE(auto row_kind, binary_row.GetRowKind()); + row_kinds.push_back(static_cast(row_kind->ToByteValue())); + } + + std::vector> batches; + for (const auto& [identifier, struct_builder] : struct_builder_holder) { + std::shared_ptr array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(struct_builder->Finish(&array)); + auto& row_kinds = row_kinds_holder[identifier]; + + ArrowArray arrow_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &arrow_array)); + RecordBatchBuilder batch_builder(&arrow_array); + PAIMON_ASSIGN_OR_RAISE(auto record_batch, batch_builder.SetPartition(identifier.first) + .SetBucket(identifier.second) + .SetRowKinds(row_kinds) + .Finish()); + batches.push_back(std::move(record_batch)); + } + return batches; +} + +} // namespace paimon::test diff --git a/src/paimon/testing/utils/data_generator.h b/src/paimon/testing/utils/data_generator.h new file mode 100644 index 0000000..cccb2ff --- /dev/null +++ b/src/paimon/testing/utils/data_generator.h @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/builder_nested.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/binary_row_writer.h" +#include "paimon/common/types/data_field.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/record_batch.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class DataType; +} // namespace arrow +namespace paimon { +class BinaryRowWriter; +class MemoryPool; +class TableSchema; +} // namespace paimon + +namespace paimon::test { + +class DataGenerator { + public: + DataGenerator(const std::shared_ptr& table_schema, + const std::shared_ptr& memory_pool); + + Result>> SplitArrayByPartitionAndBucket( + const std::vector& binary_rows); + + private: + Result ExtractPartialRow(const BinaryRow& binary_row, + const std::vector& partition_fields); + + static Status WriteBinaryRow(const BinaryRow& src_row, int32_t src_field_id, + const std::shared_ptr& src_type, + int32_t target_field_id, BinaryRowWriter* target_row_writer); + + static Result> MakeStructBuilder( + const std::vector& fields); + + static Status AppendValue(const BinaryRow& row, int32_t field_id, + const std::shared_ptr& type, + arrow::StructBuilder* struct_builder); + + private: + std::shared_ptr table_schema_; + std::shared_ptr memory_pool_; +}; + +} // namespace paimon::test diff --git a/src/paimon/testing/utils/data_generator_test.cpp b/src/paimon/testing/utils/data_generator_test.cpp new file mode 100644 index 0000000..1f56fb6 --- /dev/null +++ b/src/paimon/testing/utils/data_generator_test.cpp @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/testing/utils/data_generator.h" + +#include +#include + +#include "arrow/api.h" +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/types/row_kind.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/defs.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class DataGeneratorTest : public testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + } + + BinaryRow Make(const RowKind* kind, const std::string& f0, const std::string& f1, + const std::string& f2) { + return BinaryRowGenerator::GenerateRow(kind, {f0, f1, f2}, pool_.get()); + } + + private: + std::shared_ptr pool_; +}; + +TEST_F(DataGeneratorTest, TestSimple) { + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8()), + arrow::field("f1", arrow::utf8()), + arrow::field("f2", arrow::utf8())}; + auto schema = arrow::schema(fields); + + std::vector rows; + rows.push_back(Make(RowKind::Insert(), "Alex", "20250326", "18")); + rows.push_back(Make(RowKind::Insert(), "Bob", "20250326", "19")); + rows.push_back(Make(RowKind::Insert(), "Cathy", "20250325", "20")); + rows.push_back(Make(RowKind::Insert(), "David", "20250325", "21")); + rows.push_back(Make(RowKind::Insert(), "Evan", "20250326", "22")); + rows.push_back(Make(RowKind::Delete(), "Alex", "20250326", "18")); + rows.push_back(Make(RowKind::Delete(), "Bob", "20250326", "19")); + + { + std::vector primary_keys = {"f0"}; + std::vector partition_keys = {"f1"}; + std::map options; + options[Options::BUCKET_KEY] = "f0"; + options[Options::BUCKET] = "2"; + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + DataGenerator gen(table_schema, GetDefaultPool()); + + ASSERT_OK_AND_ASSIGN(auto batches, gen.SplitArrayByPartitionAndBucket(rows)); + ASSERT_EQ(3, batches.size()); + } + + { + std::vector primary_keys; + std::vector partition_keys; + std::map options; + ASSERT_OK_AND_ASSIGN( + std::shared_ptr table_schema, + TableSchema::Create(/*schema_id=*/0, schema, partition_keys, primary_keys, options)); + DataGenerator gen(table_schema, GetDefaultPool()); + + ASSERT_OK_AND_ASSIGN(auto batches, gen.SplitArrayByPartitionAndBucket(rows)); + ASSERT_EQ(1, batches.size()); + } +} + +} // namespace paimon::test diff --git a/src/paimon/testing/utils/dict_array_converter.h b/src/paimon/testing/utils/dict_array_converter.h new file mode 100644 index 0000000..e24b70e --- /dev/null +++ b/src/paimon/testing/utils/dict_array_converter.h @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "arrow/api.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/result.h" + +namespace paimon::test { +class DictArrayConverter { + public: + DictArrayConverter() = delete; + ~DictArrayConverter() = delete; + + // deep copy dictionary array to string array/binary array + static Result> ConvertDictArray( + const std::shared_ptr& array, arrow::MemoryPool* pool) { + arrow::Type::type kind = array->type_id(); + switch (kind) { + case arrow::Type::type::STRUCT: { + // convert array + auto struct_array = + arrow::internal::checked_pointer_cast(array); + arrow::ArrayVector new_children; + std::size_t size = struct_array->fields().size(); + for (size_t i = 0; i < size; i++) { + std::shared_ptr child = struct_array->field(static_cast(i)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr new_child, + ConvertDictArray(child, pool)); + new_children.push_back(new_child); + } + + // convert type + arrow::FieldVector fields; + fields.reserve(new_children.size()); + for (size_t i = 0; i < new_children.size(); i++) { + // Note: For test consistency, intentionally left nullable unspecified, as ORC + // discard nullable information, making it impossible to align. + // Moreover, this detail is currently not important for users. + fields.push_back(arrow::field(struct_array->type()->field(i)->name(), + new_children[i]->type())); + } + + return std::make_shared(arrow::struct_(fields), + struct_array->length(), new_children, + struct_array->null_bitmap()); + } + case arrow::Type::type::LIST: { + auto list_array = arrow::internal::checked_pointer_cast(array); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr value_array, + ConvertDictArray(list_array->values(), pool)); + return std::make_shared( + arrow::list(value_array->type()), list_array->length(), + list_array->value_offsets(), value_array, list_array->null_bitmap(), + list_array->null_count(), list_array->offset()); + } + case arrow::Type::type::MAP: { + auto map_array = arrow::internal::checked_pointer_cast(array); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr key_array, + ConvertDictArray(map_array->keys(), pool)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr item_array, + ConvertDictArray(map_array->items(), pool)); + auto map_type = + arrow::internal::checked_pointer_cast(map_array->type()); + auto new_map_type = std::make_shared( + key_array->type(), item_array->type(), map_type->keys_sorted()); + return std::make_shared( + new_map_type, map_array->length(), map_array->value_offsets(), key_array, + item_array, map_array->null_bitmap(), map_array->null_count(), + map_array->offset()); + } + case arrow::Type::type::DICTIONARY: { + auto dict_array = + arrow::internal::checked_pointer_cast(array); + auto dict_type = arrow::internal::checked_pointer_cast( + dict_array->type()); + auto value_type_id = dict_type->value_type()->id(); + auto index_type_id = dict_type->index_type()->id(); + if (value_type_id == arrow::Type::type::STRING && + index_type_id == arrow::Type::type::INT32) { + return ConvertDictionaryArrayToBinaryArray< + arrow::StringArray, arrow::Int32Array, arrow::StringBuilder>(dict_array, + pool); + } else if (value_type_id == arrow::Type::type::LARGE_STRING && + index_type_id == arrow::Type::type::INT64) { + return ConvertDictionaryArrayToBinaryArray< + arrow::LargeStringArray, arrow::Int64Array, arrow::StringBuilder>( + dict_array, pool); + } else { + return Status::Invalid( + "only support [STRING, INT32] or [LARGE_STRING, INT64] for " + "DictionaryArray"); + } + } + default: { + return array; + } + } + } + + private: + template + static Result> ConvertDictionaryArrayToBinaryArray( + const std::shared_ptr& dict_array, arrow::MemoryPool* pool) { + auto dictionary = std::dynamic_pointer_cast(dict_array->dictionary()); + auto indices = std::dynamic_pointer_cast(dict_array->indices()); + auto string_builder = std::make_shared(pool); + for (int64_t i = 0; i < dict_array->length(); ++i) { + if (dict_array->IsNull(i)) { + PAIMON_RETURN_NOT_OK_FROM_ARROW(string_builder->AppendNull()); + } else { + int64_t dict_index = indices->Value(i); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + string_builder->Append(dictionary->GetString(dict_index))); + } + } + std::shared_ptr string_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(string_builder->Finish(&string_array)); + return string_array; + } +}; +} // namespace paimon::test diff --git a/src/paimon/testing/utils/io_exception_helper.h b/src/paimon/testing/utils/io_exception_helper.h new file mode 100644 index 0000000..bcb1150 --- /dev/null +++ b/src/paimon/testing/utils/io_exception_helper.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include + +#include "fmt/format.h" + +namespace paimon::test { + +// use for io exception cases: +// bool run_complete = false; +// auto io_hook = IOHook::GetInstance(); +// for (size_t i = 0; i < turn_count; i++) { +// ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); +// io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); +// // execute some funcs with io exception triggered +// // check the error msg to make sure error status caused by io exception +// CHECK_HOOK_STATUS(Func1(), i); +// CHECK_HOOK_STATUS(Func2(), i); +// CHECK_HOOK_STATUS(Func3(), i); +// run_complete = true; +// // when all func finished, check the result +// io_hook->Clear(); +// Check(); +// break; +// } +// // make sure all funcs run complete +// ASSERT_TRUE(run_complete); + +#define CHECK_HOOK_STATUS(status, io_count) \ + { \ + auto __s = (status); \ + if (!__s.ok()) { \ + if (__s.ToString().find(fmt::format("io hook triggered io error at position {}", \ + io_count)) != std::string::npos) { \ + continue; \ + } else { \ + FAIL() << __s.ToString(); \ + } \ + } \ + } + +#define CHECK_HOOK_STATUS_WITHOUT_MESSAGE_CHECK(status) \ + { \ + auto __s = (status); \ + if (!__s.ok()) { \ + continue; \ + } \ + } +} // namespace paimon::test diff --git a/src/paimon/testing/utils/key_value_checker.h b/src/paimon/testing/utils/key_value_checker.h new file mode 100644 index 0000000..8cb7b1a --- /dev/null +++ b/src/paimon/testing/utils/key_value_checker.h @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/core/key_value.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::test { +class KeyValueChecker : public testing::Test { + public: + // only support INT field + static void CheckResult(const std::vector& expected, + const std::vector& result, int32_t key_arity, + int32_t value_arity) { + ASSERT_EQ(expected.size(), result.size()); + for (size_t i = 0; i < expected.size(); i++) { + CheckResult(expected[i], result[i], key_arity, value_arity); + } + } + + static void CheckResult(const KeyValue& expected, const KeyValue& result, int32_t key_arity, + int32_t value_arity) { + ASSERT_EQ(*(expected.value_kind), *(result.value_kind)); + ASSERT_EQ(expected.sequence_number, result.sequence_number); + ASSERT_EQ(expected.level, result.level); + ASSERT_EQ(key_arity, result.key->GetFieldCount()); + ASSERT_EQ(value_arity, result.value->GetFieldCount()); + for (int32_t k = 0; k < key_arity; k++) { + ASSERT_EQ(expected.key->GetInt(k), result.key->GetInt(k)); + } + for (int32_t v = 0; v < value_arity; v++) { + if (expected.value->IsNullAt(v)) { + ASSERT_TRUE(result.value->IsNullAt(v)); + } else { + ASSERT_EQ(expected.value->GetInt(v), result.value->GetInt(v)); + } + } + } + + // support non-nested field + static void CheckResult(const std::vector& expected_vec, + const std::vector& result_vec, + const std::vector& key_fields, + const std::vector& value_fields) { + ASSERT_EQ(expected_vec.size(), result_vec.size()); + ASSERT_OK_AND_ASSIGN(auto key_comparator, + FieldsComparator::Create(key_fields, /*is_ascending_order=*/true)); + ASSERT_OK_AND_ASSIGN(auto value_comparator, + FieldsComparator::Create(value_fields, /*is_ascending_order=*/true)); + for (size_t i = 0; i < expected_vec.size(); i++) { + const auto& expected = expected_vec[i]; + const auto& result = result_vec[i]; + ASSERT_EQ(*(expected.value_kind), *(result.value_kind)); + ASSERT_EQ(expected.sequence_number, result.sequence_number); + ASSERT_EQ(expected.level, result.level); + ASSERT_EQ(key_fields.size(), result.key->GetFieldCount()); + ASSERT_EQ(value_fields.size(), result.value->GetFieldCount()); + ASSERT_EQ(0, key_comparator->CompareTo(*expected.key, *result.key)); + ASSERT_EQ(0, value_comparator->CompareTo(*expected.value, *result.value)); + } + } + + static std::vector GenerateKeyValues( + const std::vector& seq_vec, + const std::vector& key_vec, + const std::vector& value_vec, + const std::shared_ptr& pool) { + // default row kind is insert, level is 0 + std::vector row_kind_vec(key_vec.size(), const_cast(RowKind::Insert())); + std::vector level_vec(key_vec.size(), 0); + return GenerateKeyValues(row_kind_vec, seq_vec, level_vec, key_vec, value_vec, pool); + } + + static std::vector GenerateKeyValues( + const std::vector& row_kind_vec, const std::vector& seq_vec, + const std::vector& level_vec, + const std::vector& key_vec, + const std::vector& value_vec, + const std::shared_ptr& pool) { + EXPECT_EQ(row_kind_vec.size(), key_vec.size()); + EXPECT_EQ(seq_vec.size(), key_vec.size()); + EXPECT_EQ(level_vec.size(), key_vec.size()); + EXPECT_EQ(value_vec.size(), key_vec.size()); + std::vector results; + for (size_t i = 0; i < seq_vec.size(); i++) { + results.emplace_back( + row_kind_vec[i], /*sequence_number=*/seq_vec[i], /*level=*/level_vec[i], + /*key=*/BinaryRowGenerator::GenerateRowPtr(key_vec[i], pool.get()), + /*value=*/BinaryRowGenerator::GenerateRowPtr(value_vec[i], pool.get())); + } + return results; + } +}; +} // namespace paimon::test diff --git a/src/paimon/testing/utils/read_result_collector.h b/src/paimon/testing/utils/read_result_collector.h new file mode 100644 index 0000000..bbfc21a --- /dev/null +++ b/src/paimon/testing/utils/read_result_collector.h @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "arrow/compute/api.h" +#include "paimon/common/reader/reader_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/core/io/key_value_data_file_record_reader.h" +#include "paimon/core/key_value.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/testing/utils/dict_array_converter.h" +namespace paimon::test { +class ReadResultCollector { + public: + ReadResultCollector() = delete; + ~ReadResultCollector() = delete; + + template + static Result> CollectKeyValueResult(ReaderType* reader) { + std::vector results; + while (true) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr iterator, reader->NextBatch()); + if (iterator == nullptr) { + break; + } + while (true) { + if constexpr (std::is_same_v) { + PAIMON_ASSIGN_OR_RAISE(bool has_next, iterator->HasNext()); + if (!has_next) { + break; + } + PAIMON_ASSIGN_OR_RAISE(KeyValue kv, iterator->Next()); + results.emplace_back(std::move(kv)); + } else { + PAIMON_ASSIGN_OR_RAISE(bool has_next, iterator->HasNext()); + if (!has_next) { + break; + } + KeyValue kv = iterator->Next(); + results.emplace_back(std::move(kv)); + } + } + } + return results; + } + + static Result> CollectResult(BatchReader* batch_reader) { + return CollectResult(batch_reader, /*max simulated data processing time*/ 0); + } + + // will convert dictionary array to string array for comparing results + static Result> CollectResult( + BatchReader* batch_reader, int64_t max_data_processing_time_in_us) { + arrow::ArrayVector result_array_vector; + int64_t seed = DateTimeUtils::GetCurrentUTCTimeUs(); + std::srand(seed); + while (true) { + // Prioritize calling NextBatch. If it fails (paimon inner reader e.g., + // PrefetchBatchReader, ApplyBitmapIndexBatchReader...), call NextBatchWithBitmap. + auto batch_result = batch_reader->NextBatch(); + BatchReader::ReadBatch batch; + if (!batch_result.ok()) { + if (batch_result.status().ToString().find("should use NextBatchWithBitmap") != + std::string::npos) { + PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatchWithBitmap batch_with_bitmap, + batch_reader->NextBatchWithBitmap()); + if (BatchReader::IsEofBatch(batch_with_bitmap)) { + break; + } + assert(!batch_with_bitmap.second.IsEmpty()); + PAIMON_ASSIGN_OR_RAISE( + batch, ReaderUtils::ApplyBitmapToReadBatch(std::move(batch_with_bitmap), + arrow::default_memory_pool())); + } else { + return batch_result.status(); + } + } else { + batch = std::move(batch_result).value(); + if (BatchReader::IsEofBatch(batch)) { + break; + } + } + auto& [c_array, c_schema] = batch; + assert(c_array->length > 0); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto result_array, + arrow::ImportArray(c_array.get(), c_schema.get())); + result_array_vector.push_back(result_array); + if (max_data_processing_time_in_us > 0) { + usleep(std::rand() % max_data_processing_time_in_us); + } + } + if (result_array_vector.empty()) { + return std::shared_ptr(); + } + // accumulate all the batch array and convert dictionary to string array together to avoid + // the problem (multiple batches in multiple stripes overlap dictionary data) being + // difficult to expose + arrow::ArrayVector converted_array_vector; + for (const auto& array : result_array_vector) { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr converted_array, + DictArrayConverter::ConvertDictArray(array, arrow::default_memory_pool())); + converted_array_vector.push_back(converted_array); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto chunk_array, + arrow::ChunkedArray::Make(converted_array_vector)); + return chunk_array; + } + + static Result> GetArray(BatchReader::ReadBatch&& batch) { + if (BatchReader::IsEofBatch(batch)) { + return std::shared_ptr(); + } + auto& [c_array, c_schema] = batch; + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto array, + arrow::ImportArray(c_array.get(), c_schema.get())); + return DictArrayConverter::ConvertDictArray(array, arrow::default_memory_pool()); + } + + static Result GetReadBatch(const std::shared_ptr& array) { + auto c_array = std::make_unique(); + auto c_schema = std::make_unique(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get())); + return std::make_pair(std::move(c_array), std::move(c_schema)); + } + + // Noted that, sort chunked array by multiple key for timestamp type may cause + // coredump in arrow, refer to https://github.com/apache/arrow/issues/47252 + static Result> SortArray( + const std::shared_ptr& array, + const std::shared_ptr& schema) { + std::vector sort_keys; + for (const auto& name : schema->field_names()) { + sort_keys.emplace_back(name, arrow::compute::SortOrder::Ascending); + } + auto sort_options = arrow::compute::SortOptions(sort_keys); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + auto sorted_indices, arrow::compute::SortIndices(arrow::Datum(array), sort_options)); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + auto sorted_batch, + arrow::compute::Take(arrow::Datum(array), arrow::Datum(sorted_indices))); + return sorted_batch.chunked_array(); + } +}; +} // namespace paimon::test diff --git a/src/paimon/testing/utils/test_helper.h b/src/paimon/testing/utils/test_helper.h new file mode 100644 index 0000000..c6a0509 --- /dev/null +++ b/src/paimon/testing/utils/test_helper.h @@ -0,0 +1,443 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "arrow/ipc/api.h" +#include "paimon/api.h" +#include "paimon/catalog/catalog.h" +#include "paimon/commit_context.h" +#include "paimon/common/data/blob_descriptor.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/operation/append_only_file_store_write.h" +#include "paimon/core/operation/file_store_commit_impl.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/table/sink/commit_message_impl.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/data/blob.h" +#include "paimon/file_store_commit.h" +#include "paimon/file_store_write.h" +#include "paimon/fs/file_system_factory.h" +#include "paimon/record_batch.h" +#include "paimon/result.h" +#include "paimon/table/source/startup_mode.h" +#include "paimon/table/source/table_scan.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/write_context.h" +namespace paimon::test { + +class TestHelper { + public: + static Result> Create( + const std::string& root_path, const std::shared_ptr& schema, + const std::vector& partition_keys, + const std::vector& primary_keys, + const std::map& options, bool is_streaming_mode, + bool ignore_if_exists = false) { + // only for test && only check the key + auto new_options = options; + new_options["enable-object-store-catalog-in-inte-test"] = ""; + PAIMON_ASSIGN_OR_RAISE(auto catalog, Catalog::Create(root_path, new_options)); + PAIMON_RETURN_NOT_OK(catalog->CreateDatabase("foo", new_options, ignore_if_exists)); + ::ArrowSchema c_schema; + ScopeGuard guard([schema = &c_schema]() { ArrowSchemaRelease(schema); }); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, &c_schema)); + PAIMON_RETURN_NOT_OK(catalog->CreateTable(Identifier("foo", "bar"), &c_schema, + partition_keys, primary_keys, new_options, + ignore_if_exists)); + std::string table_path = PathUtil::JoinPath(root_path, "foo.db/bar"); + return Create(table_path, new_options, is_streaming_mode); + } + + static Result> Create( + const std::string& table_path, const std::map& options, + bool is_streaming_mode) { + std::string file_system_identifier = "local"; + auto fs_iter = options.find(Options::FILE_SYSTEM); + if (fs_iter != options.end()) { + file_system_identifier = StringUtils::ToLowerCase(fs_iter->second); + } + PAIMON_ASSIGN_OR_RAISE(auto file_system, + FileSystemFactory::Get(file_system_identifier, table_path, options)); + std::string commit_user = "commit_user"; + WriteContextBuilder context_builder(table_path, commit_user); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr write_context, + context_builder.SetOptions(options) + .WithStreamingMode(is_streaming_mode) + .WithWriteId(12345) + .Finish()); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr write, + FileStoreWrite::Create(std::move(write_context))); + std::map new_options = options; + // only for test && only check the key + new_options["enable-pk-commit-in-inte-test"] = ""; + new_options["enable-object-store-commit-in-inte-test"] = ""; + CommitContextBuilder commit_context_builder(table_path, commit_user); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr commit_context, + commit_context_builder.SetOptions(new_options).IgnoreEmptyCommit(false).Finish()); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr commit, + FileStoreCommit::Create(std::move(commit_context))); + return std::unique_ptr(new TestHelper(std::move(file_system), std::move(write), + std::move(commit), commit_user, + table_path, options)); + } + + static Result> MakeRecordBatch( + const std::shared_ptr& data_type, const std::string& data_str, + const std::map& partition_map, int32_t bucket, + const std::vector& row_kinds) { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + auto array, arrow::ipc::internal::json::ArrayFromJSON(data_type, data_str)); + ::ArrowArray arrow_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &arrow_array)); + RecordBatchBuilder batch_builder(&arrow_array); + return batch_builder.SetPartition(partition_map) + .SetBucket(bucket) + .SetRowKinds(row_kinds) + .Finish(); + } + + Result>> WriteAndCommit( + std::unique_ptr&& record_batch, int64_t commit_identifier, + const std::optional>>& + expected_commit_messages) { + std::vector> batches; + batches.emplace_back(std::move(record_batch)); + return WriteAndCommit(std::move(batches), commit_identifier, expected_commit_messages); + } + + Result>> WriteAndCommit( + std::vector>&& record_batches, int64_t commit_identifier, + const std::optional>>& + expected_commit_messages) { + for (auto& record_batch : record_batches) { + PAIMON_RETURN_NOT_OK(write_->Write(std::move(record_batch))); + } + PAIMON_ASSIGN_OR_RAISE(std::vector> commit_messages, + write_->PrepareCommit(/*wait_compaction=*/false, commit_identifier)); + if (expected_commit_messages) { + CheckCommitMessages(expected_commit_messages.value(), commit_messages); + CheckExternalPath(commit_messages); + } + PAIMON_RETURN_NOT_OK(commit_->Commit(commit_messages, commit_identifier)); + return commit_messages; + } + + Result>> NewScan(StartupMode startup_mode, + std::optional snapshot_id, + bool is_streaming = true) { + ScanContextBuilder scan_context_builder(table_path_); + scan_context_builder.WithStreamingMode(is_streaming) + .SetOptions(options_) + .AddOption(Options::SCAN_MODE, startup_mode.ToString()); + if (snapshot_id) { + scan_context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, + std::to_string(snapshot_id.value())); + } + PAIMON_ASSIGN_OR_RAISE(auto scan_context, scan_context_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(scan_, TableScan::Create(std::move(scan_context))); + return Scan(); + } + + Result>> Scan() { + if (scan_ == nullptr) { + return Status::Invalid("need call NewScan first"); + } + PAIMON_ASSIGN_OR_RAISE(auto result_plan, scan_->CreatePlan()); + return result_plan->Splits(); + } + + static Result CheckBlobsEqual(const std::vector>& result_blobs, + const std::vector>& expected_blobs, + const std::shared_ptr& fs) { + if (result_blobs.size() != expected_blobs.size()) { + std::cout << "[result_blobs.size]: " << result_blobs.size() << std::endl; + std::cout << "[expected_blobs.size]: " << expected_blobs.size() << std::endl; + return false; + } + for (uint32_t i = 0; i < result_blobs.size(); ++i) { + PAIMON_ASSIGN_OR_RAISE(auto result_stream, result_blobs[i]->NewInputStream(fs)); + PAIMON_ASSIGN_OR_RAISE(auto expected_stream, expected_blobs[i]->NewInputStream(fs)); + + PAIMON_ASSIGN_OR_RAISE(uint64_t result_length, result_stream->Length()); + PAIMON_ASSIGN_OR_RAISE(uint64_t expected_length, expected_stream->Length()); + if (result_length != expected_length) { + auto result_descriptor_bytes = result_blobs[i]->ToDescriptor(GetDefaultPool()); + auto expected_descriptor_bytes = expected_blobs[i]->ToDescriptor(GetDefaultPool()); + PAIMON_ASSIGN_OR_RAISE( + auto result_descriptor, + BlobDescriptor::Deserialize(result_descriptor_bytes->data(), + result_descriptor_bytes->size())); + PAIMON_ASSIGN_OR_RAISE( + auto expected_descriptor, + BlobDescriptor::Deserialize(expected_descriptor_bytes->data(), + expected_descriptor_bytes->size())); + std::cout << "blobs[" << i << "]: " << std::endl; + std::cout << "[result_length(" << result_length << ") != expected_length(" + << expected_length << ")]" << std::endl; + std::cout << "[result_descriptor]: " << result_descriptor->ToString() << std::endl; + std::cout << "[expected_descriptor]: " << expected_descriptor->ToString() + << std::endl; + return false; + } + + std::vector result_bytes(result_length, 0); + std::vector expected_bytes(expected_length, 0); + PAIMON_RETURN_NOT_OK(result_stream->Read(result_bytes.data(), result_length)); + PAIMON_RETURN_NOT_OK(expected_stream->Read(expected_bytes.data(), expected_length)); + if (result_bytes != expected_bytes) { + std::cout << "blobs[" << i << "]: " << std::endl; + std::cout << "[result_bytes != expected_bytes]" << std::endl; + return false; + } + } + return true; + } + + static Result>> ToBlobs( + const std::shared_ptr& blob_struct_array) { + std::vector> result_blobs; + auto child_array = blob_struct_array->field(0); + assert(blob_struct_array->num_fields() == 1); + assert(blob_struct_array->null_count() == 0); + assert(child_array->null_count() == 0); + assert(child_array->type_id() == arrow::Type::type::LARGE_BINARY); + + const auto& blob_array = + arrow::internal::checked_cast(*child_array); + for (int64_t i = 0; i < blob_array.length(); ++i) { + std::string_view descriptor = blob_array.GetView(i); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr blob, + Blob::FromDescriptor(descriptor.data(), descriptor.size())); + result_blobs.emplace_back(blob); + } + return result_blobs; + } + + // need to reconstruct the blob array, because the array in read result do not have blob meta + Result> ReconstructBlobArray( + const std::shared_ptr& array, const std::shared_ptr& schema) { + ::ArrowArray c_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &c_array)); + ::ArrowSchema new_c_schema; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, &new_c_schema)); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto new_array, + arrow::ImportArray(&c_array, &new_c_schema)); + return new_array; + } + + Result ReadAndCheckResultForBlobTable( + const std::shared_ptr& all_columns_schema, + const std::vector>& splits, const std::string& main_expected_json, + const std::vector>& expected_blob_descriptors) { + ReadContextBuilder read_context_builder(table_path_); + read_context_builder.SetOptions(options_); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr read_context, + read_context_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(auto table_read, TableRead::Create(std::move(read_context))); + PAIMON_ASSIGN_OR_RAISE(auto batch_reader, table_read->CreateReader(splits)); + PAIMON_ASSIGN_OR_RAISE(auto read_result, + ReadResultCollector::CollectResult(batch_reader.get())); + + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(auto concat_array, + arrow::Concatenate(read_result->chunks())); + PAIMON_ASSIGN_OR_RAISE(auto reconstruct_array, + ReconstructBlobArray(concat_array, all_columns_schema)); + PAIMON_ASSIGN_OR_RAISE( + auto separated_array, + BlobUtils::SeparateBlobArray( + std::dynamic_pointer_cast(reconstruct_array))); + + arrow::EqualOptions equal_options = arrow::EqualOptions::Defaults(); + + // check main columns + auto separated_schema = BlobUtils::SeparateBlobSchema(all_columns_schema); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + auto main_expected_array, + arrow::ipc::internal::json::ArrayFromJSON( + arrow::struct_(separated_schema.main_schema->fields()), main_expected_json)); + auto main_expected_chunk_array = std::make_shared(main_expected_array); + bool main_equal = main_expected_chunk_array->Equals( + arrow::ChunkedArray(separated_array.main_array), equal_options.diff_sink(&std::cout)); + if (!main_equal) { + std::cout << "[expected_data_type]" << main_expected_chunk_array->type()->ToString() + << std::endl; + std::cout << "[actual_data_type]" << separated_array.main_array->type()->ToString() + << std::endl; + std::cout << "[expected]:" << main_expected_chunk_array->ToString() << std::endl; + std::cout << "[actual]: " << separated_array.main_array->ToString() << std::endl; + } + + // check blob column + std::vector> expected_blobs; + for (const auto& descriptor : expected_blob_descriptors) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr blob, + Blob::FromDescriptor(descriptor->data(), descriptor->size())); + expected_blobs.emplace_back(blob); + } + PAIMON_ASSIGN_OR_RAISE(auto result_blobs, ToBlobs(separated_array.blob_array)); + PAIMON_ASSIGN_OR_RAISE(bool blob_equal, CheckBlobsEqual(result_blobs, expected_blobs, fs_)); + + table_read.reset(); + return main_equal && blob_equal; + } + + Result ReadAndCheckResult(const std::shared_ptr& data_type, + const std::vector>& splits, + const std::string& expected_result) { + ReadContextBuilder read_context_builder(table_path_); + read_context_builder.SetOptions(options_); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr read_context, + read_context_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(auto table_read, TableRead::Create(std::move(read_context))); + PAIMON_ASSIGN_OR_RAISE(auto batch_reader, table_read->CreateReader(splits)); + PAIMON_ASSIGN_OR_RAISE(auto read_result, + ReadResultCollector::CollectResult(batch_reader.get())); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + auto expected_array, + arrow::ipc::internal::json::ArrayFromJSON(data_type, expected_result)); + auto expected_chunk_array = std::make_shared(expected_array); + + arrow::EqualOptions equal_options = arrow::EqualOptions::Defaults(); + bool is_equal = + expected_chunk_array->Equals(read_result, equal_options.diff_sink(&std::cout)); + + if (!is_equal) { + std::cout << "[expected_data_type]" << expected_chunk_array->type()->ToString() + << std::endl; + std::cout << "[actual_data_type]" << read_result->type()->ToString() << std::endl; + std::cout << "[expected]:" << expected_chunk_array->ToString() << std::endl; + std::cout << "[actual]: " << read_result->ToString() << std::endl; + } + table_read.reset(); + return is_equal; + } + + Result> LatestSnapshot() const { + auto commit_impl = dynamic_cast(commit_.get()); + return commit_impl->snapshot_manager_->LatestSnapshotOfUser(commit_user_); + } + + Result>> LatestSchema() const { + auto commit_impl = dynamic_cast(commit_.get()); + return commit_impl->schema_manager_->Latest(); + } + + Result PartitionStr(const BinaryRow& partition) const { + auto abstract_write = dynamic_cast(write_.get()); + return abstract_write->file_store_path_factory_->GetPartitionString(partition); + } + + static void CheckCommitMessages(std::vector> expected, + std::vector> actual) { + ASSERT_EQ(expected.size(), actual.size()); + auto commit_message_sort_function = [](const std::shared_ptr& lhs, + const std::shared_ptr& rhs) -> bool { + auto lhs_impl = std::dynamic_pointer_cast(lhs); + auto rhs_impl = std::dynamic_pointer_cast(rhs); + if (lhs_impl->Partition() == rhs_impl->Partition()) { + return lhs_impl->Bucket() < rhs_impl->Bucket(); + } + return lhs_impl->Partition().HashCode() < rhs_impl->Partition().HashCode(); + }; + std::stable_sort(expected.begin(), expected.end(), commit_message_sort_function); + std::stable_sort(actual.begin(), actual.end(), commit_message_sort_function); + for (size_t i = 0; i < actual.size(); i++) { + auto result_impl = std::dynamic_pointer_cast(actual[i]); + auto expect_impl = std::dynamic_pointer_cast(expected[i]); + bool is_equal = result_impl->TEST_Equal(*expect_impl); + if (!is_equal) { + std::cout << "actual_msg:" << result_impl->ToString() << std::endl; + std::cout << "expect_msg:" << expect_impl->ToString() << std::endl; + } + ASSERT_TRUE(is_equal); + } + } + + static int64_t CountChannelFiles(const std::shared_ptr& file_system, + const std::string& tmp_path) { + std::vector> dir_statuses; + EXPECT_OK(file_system->ListDir(tmp_path, &dir_statuses)); + + int64_t channel_file_count = 0; + for (const auto& dir_status : dir_statuses) { + const std::string dir_path = dir_status->GetPath(); + if (dir_status->IsDir() && dir_path.find("paimon-io-") != std::string::npos) { + std::vector> file_statuses; + EXPECT_OK(file_system->ListDir(dir_path, &file_statuses)); + + for (const auto& file_status : file_statuses) { + if (StringUtils::EndsWith(file_status->GetPath(), ".channel")) { + ++channel_file_count; + } + } + } + } + return channel_file_count; + } + + private: + void CheckExternalPath(const std::vector>& actuals) { + for (const auto& actual : actuals) { + auto msg = std::dynamic_pointer_cast(actual); + auto files = msg->GetNewFilesIncrement().NewFiles(); + for (auto& file : files) { + if (file->external_path) { + ASSERT_OK_AND_ASSIGN(bool file_exist, fs_->Exists(file->external_path.value())); + ASSERT_TRUE(file_exist); + } + } + } + } + + TestHelper(std::unique_ptr fs, std::unique_ptr write, + std::unique_ptr commit, const std::string& commit_user, + const std::string& table_path, const std::map& options) + : fs_(std::move(fs)), + write_(std::move(write)), + commit_(std::move(commit)), + commit_user_(commit_user), + table_path_(table_path), + options_(options) {} + + const std::string& CommitUser() const { + return commit_user_; + } + + std::shared_ptr fs_; + std::unique_ptr write_; + std::unique_ptr commit_; + std::unique_ptr scan_; + std::string commit_user_; + std::string table_path_; + std::map options_; +}; + +} // namespace paimon::test diff --git a/src/paimon/testing/utils/timezone_guard.h b/src/paimon/testing/utils/timezone_guard.h index 2b88532..408580b 100644 --- a/src/paimon/testing/utils/timezone_guard.h +++ b/src/paimon/testing/utils/timezone_guard.h @@ -7,14 +7,13 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ #pragma once