From 4daa7691de66d1cc8f111408841c9876ee149e95 Mon Sep 17 00:00:00 2001 From: lxy264173 Date: Thu, 25 Jun 2026 15:10:42 +0800 Subject: [PATCH] feat: Migrate global index scan and file meta utilities --- include/paimon/migrate/file_meta_utils.h | 73 +++ .../global_index/global_index_evaluator.h | 46 ++ .../global_index_evaluator_impl.cpp | 146 +++++ .../global_index_evaluator_impl.h | 67 +++ .../global_index/global_index_file_manager.h | 79 +++ .../core/global_index/global_index_scan.cpp | 121 ++++ .../global_index/global_index_scan_impl.cpp | 212 +++++++ .../global_index/global_index_scan_impl.h | 89 +++ .../global_index/global_index_write_task.cpp | 231 ++++++++ .../core/global_index/indexed_split_impl.h | 118 ++++ .../core/global_index/indexed_split_test.cpp | 199 +++++++ src/paimon/core/migrate/file_meta_utils.cpp | 187 ++++++ .../core/migrate/file_meta_utils_test.cpp | 542 ++++++++++++++++++ 13 files changed, 2110 insertions(+) create mode 100644 include/paimon/migrate/file_meta_utils.h create mode 100644 src/paimon/core/global_index/global_index_evaluator.h create mode 100644 src/paimon/core/global_index/global_index_evaluator_impl.cpp create mode 100644 src/paimon/core/global_index/global_index_evaluator_impl.h create mode 100644 src/paimon/core/global_index/global_index_file_manager.h create mode 100644 src/paimon/core/global_index/global_index_scan.cpp create mode 100644 src/paimon/core/global_index/global_index_scan_impl.cpp create mode 100644 src/paimon/core/global_index/global_index_scan_impl.h create mode 100644 src/paimon/core/global_index/global_index_write_task.cpp create mode 100644 src/paimon/core/global_index/indexed_split_impl.h create mode 100644 src/paimon/core/global_index/indexed_split_test.cpp create mode 100644 src/paimon/core/migrate/file_meta_utils.cpp create mode 100644 src/paimon/core/migrate/file_meta_utils_test.cpp diff --git a/include/paimon/migrate/file_meta_utils.h b/include/paimon/migrate/file_meta_utils.h new file mode 100644 index 0000000..a61e2a4 --- /dev/null +++ b/include/paimon/migrate/file_meta_utils.h @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/commit_message.h" +#include "paimon/result.h" +#include "paimon/visibility.h" + +namespace paimon { + +/// Utility class for handling file metadata operations during data migration. +/// +/// This class provides static utility functions for migrating external data files into Paimon +/// tables. It handles the generation of commit messages from source data files, enabling seamless +/// integration of existing data into Paimon's file store architecture. +/// +/// @note This utility currently does not support: +/// - object store file systems +/// - primary-key tables +/// +/// @warning This utility will move/rename source data files to destination paths during migration. +/// It is recommended to back up your data in advance before using this utility to avoid data loss. +class PAIMON_EXPORT FileMetaUtils { + public: + FileMetaUtils() = delete; + ~FileMetaUtils() = delete; + + /// Generate a commit message for migrating external data files into a Paimon table. + /// + /// This method analyzes the provided source data files and generates a commit message that can + /// be used to incorporate these files into the target Paimon table. + /// + /// @param src_data_files Vector of paths to source data files to be migrated. + /// **These files must have the same schema as the target Paimon table**. + /// @param dst_table_path Path to the destination Paimon table directory. + /// @param partition_values Map of partition column names to their values for partitioned + /// tables. Use empty map for non-partitioned tables. + /// @param options Set a configuration options map to set some option entries which are not + /// defined in the table schema or whose values you want to overwrite. + /// @param file_system Specifies the file system for file operations. + /// If `nullptr`, use default file system. + /// @return Result containing a unique pointer to the generated `CommitMessage`, + /// or an error status if the migration cannot be performed. + static Result> GenerateCommitMessage( + const std::vector& src_data_files, const std::string& dst_table_path, + const std::map& partition_values, + const std::map& options, + const std::shared_ptr& file_system = nullptr); +}; + +} // namespace paimon diff --git a/src/paimon/core/global_index/global_index_evaluator.h b/src/paimon/core/global_index/global_index_evaluator.h new file mode 100644 index 0000000..b1d2d07 --- /dev/null +++ b/src/paimon/core/global_index/global_index_evaluator.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/global_index/global_index_result.h" +#include "paimon/predicate/predicate.h" +#include "paimon/visibility.h" + +namespace paimon { +/// Abstract base class for evaluating predicates against a global index. +class PAIMON_EXPORT GlobalIndexEvaluator { + public: + virtual ~GlobalIndexEvaluator() = default; + /// Evaluates a predicate against the global index. + /// + /// @param predicate The filter predicate to evaluate. + /// @return A `Result` containing: + /// - `nullptr` if the predicate cannot be evaluated by this index (e.g., field has + /// no index), + /// - A `std::shared_ptr` if evaluation succeeds. + /// The `GlobalIndexResult` indicates the matching rows (e.g., via row ID bitmaps). + virtual Result> Evaluate( + const std::shared_ptr& predicate) = 0; +}; + +} // namespace paimon diff --git a/src/paimon/core/global_index/global_index_evaluator_impl.cpp b/src/paimon/core/global_index/global_index_evaluator_impl.cpp new file mode 100644 index 0000000..5c19381 --- /dev/null +++ b/src/paimon/core/global_index/global_index_evaluator_impl.cpp @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/global_index/global_index_evaluator_impl.h" + +#include "fmt/format.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/predicate/leaf_predicate.h" +#include "paimon/predicate/predicate_utils.h" + +namespace paimon { +Result> GlobalIndexEvaluatorImpl::Evaluate( + const std::shared_ptr& predicate) { + std::shared_ptr compound_result; + if (predicate) { + PAIMON_ASSIGN_OR_RAISE(compound_result, EvaluatePredicate(predicate)); + } + return compound_result; +} + +Result>> GlobalIndexEvaluatorImpl::GetIndexReaders( + const std::string& field_name) { + PAIMON_ASSIGN_OR_RAISE(DataField data_field, table_schema_->GetField(field_name)); + int32_t field_id = data_field.Id(); + // get or create global index readers for current field + std::vector> readers; + auto iter = index_readers_cache_.find(field_id); + if (iter != index_readers_cache_.end()) { + readers = iter->second; + } else { + PAIMON_ASSIGN_OR_RAISE(readers, create_index_readers_(field_id)); + index_readers_cache_.insert({field_id, readers}); + } + return readers; +} + +Result> GlobalIndexEvaluatorImpl::EvaluatePredicate( + const std::shared_ptr& predicate) { + if (predicate == nullptr) { + return std::shared_ptr(nullptr); + } + + if (auto compound_predicate = std::dynamic_pointer_cast(predicate)) { + return EvaluateCompoundPredicate(compound_predicate); + } else if (auto leaf_predicate = std::dynamic_pointer_cast(predicate)) { + const std::string& field_name = leaf_predicate->FieldName(); + PAIMON_ASSIGN_OR_RAISE(std::vector> readers, + GetIndexReaders(field_name)); + if (readers.empty()) { + // No usable index for this field within the requested range. Treat as "no + // pushdown available" so the upstream falls back to a full scan instead of + // wrongly producing an empty result. + return std::shared_ptr(nullptr); + } + // calculate compound result as field may has multiple indexes + std::shared_ptr compound_result; + for (const auto& index_reader : readers) { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr sub_result, + PredicateUtils::VisitPredicate>(leaf_predicate, + index_reader)); + if (sub_result) { + if (!compound_result) { + compound_result = sub_result; + } else { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr and_result, + compound_result->And(sub_result)); + compound_result = and_result; + } + } + if (compound_result) { + PAIMON_ASSIGN_OR_RAISE(bool is_empty, compound_result->IsEmpty()); + if (is_empty) { + return compound_result; + } + } + } + return compound_result; + } + return Status::Invalid(fmt::format( + "cannot cast predicate {} to CompoundPredicate or LeafPredicate", predicate->ToString())); +} + +Result> GlobalIndexEvaluatorImpl::EvaluateCompoundPredicate( + const std::shared_ptr& compound_predicate) { + if (compound_predicate->GetFunction().GetType() == Function::Type::OR) { + std::shared_ptr compound_result; + for (const auto& child : compound_predicate->Children()) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr sub_result, + EvaluatePredicate(child)); + if (!sub_result) { + return std::shared_ptr(nullptr); + } + if (!compound_result) { + compound_result = sub_result; + } else { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr or_result, + compound_result->Or(sub_result)); + compound_result = or_result; + } + } + return compound_result; + } else if (compound_predicate->GetFunction().GetType() == Function::Type::AND) { + std::shared_ptr compound_result; + for (const auto& child : compound_predicate->Children()) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr sub_result, + EvaluatePredicate(child)); + if (sub_result) { + if (!compound_result) { + compound_result = sub_result; + } else { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr and_result, + compound_result->And(sub_result)); + compound_result = and_result; + } + } + + if (compound_result) { + PAIMON_ASSIGN_OR_RAISE(bool is_empty, compound_result->IsEmpty()); + if (is_empty) { + return compound_result; + } + } + } + return compound_result; + } + return Status::Invalid("CompoundPredicate only support And/Or function"); +} + +} // namespace paimon diff --git a/src/paimon/core/global_index/global_index_evaluator_impl.h b/src/paimon/core/global_index/global_index_evaluator_impl.h new file mode 100644 index 0000000..7555a71 --- /dev/null +++ b/src/paimon/core/global_index/global_index_evaluator_impl.h @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once +#include +#include +#include +#include +#include +#include +#include + +#include "paimon/core/global_index/global_index_evaluator.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/global_index/global_index_reader.h" +#include "paimon/predicate/compound_predicate.h" + +namespace paimon { +class GlobalIndexEvaluatorImpl : public GlobalIndexEvaluator { + public: + /// Creates the underlying readers for the given field. Returns an empty vector when the field + /// has no usable index. + using IndexReadersCreator = + std::function>>(int32_t)>; + + GlobalIndexEvaluatorImpl(const std::shared_ptr& table_schema, + IndexReadersCreator create_index_readers) + : table_schema_(table_schema), create_index_readers_(std::move(create_index_readers)) {} + + Result> Evaluate( + const std::shared_ptr& predicate) override; + + private: + Result> EvaluatePredicate( + const std::shared_ptr& predicate); + + Result> EvaluateCompoundPredicate( + const std::shared_ptr& compound_predicate); + + Result>> GetIndexReaders( + const std::string& field_name); + + private: + std::shared_ptr table_schema_; + // create_index_readers_(field_id) + IndexReadersCreator create_index_readers_; + // [field_id, vector] + std::map>> index_readers_cache_; +}; + +} // namespace paimon diff --git a/src/paimon/core/global_index/global_index_file_manager.h b/src/paimon/core/global_index/global_index_file_manager.h new file mode 100644 index 0000000..584be05 --- /dev/null +++ b/src/paimon/core/global_index/global_index_file_manager.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/common/utils/uuid.h" +#include "paimon/core/index/index_path_factory.h" +#include "paimon/fs/file_system.h" +#include "paimon/global_index/io/global_index_file_reader.h" +#include "paimon/global_index/io/global_index_file_writer.h" + +namespace paimon { +/// Helper class for managing global index files. +class GlobalIndexFileManager : public GlobalIndexFileReader, public GlobalIndexFileWriter { + public: + GlobalIndexFileManager(const std::shared_ptr& fs, + const std::shared_ptr& path_factory) + : fs_(fs), path_factory_(path_factory) {} + + Result> GetInputStream( + const std::string& file_path) const override { + return fs_->Open(file_path); + } + + Result NewFileName(const std::string& prefix) const override { + std::string uuid; + if (PAIMON_UNLIKELY(!UUID::Generate(&uuid))) { + return Status::Invalid("fail to generate uuid for global index file manager"); + } + return prefix + "-" + "global-index-" + uuid + ".index"; + } + + std::string ToPath(const std::string& file_name) const override { + return path_factory_->ToPath(file_name); + } + + std::string ToPath(const std::shared_ptr& file) const { + return path_factory_->ToPath(file); + } + + Result> NewOutputStream( + const std::string& file_name) const override { + return fs_->Create(ToPath(file_name), /*overwrite=*/false); + } + + Result GetFileSize(const std::string& file_name) const override { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_status, + fs_->GetFileStatus(ToPath(file_name))); + return file_status->GetLen(); + } + + bool IsExternalPath() const { + return path_factory_->IsExternalPath(); + } + + private: + std::shared_ptr fs_; + std::shared_ptr path_factory_; +}; +} // namespace paimon diff --git a/src/paimon/core/global_index/global_index_scan.cpp b/src/paimon/core/global_index/global_index_scan.cpp new file mode 100644 index 0000000..457c3a1 --- /dev/null +++ b/src/paimon/core/global_index/global_index_scan.cpp @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/global_index/global_index_scan.h" + +#include "paimon/core/core_options.h" +#include "paimon/core/global_index/global_index_scan_impl.h" +#include "paimon/core/operation/file_store_scan.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/snapshot_manager.h" + +namespace paimon { +namespace { +Result> LoadSchema(const std::string& root_path, + const std::map& options, + const std::shared_ptr& file_system) { + PAIMON_ASSIGN_OR_RAISE(CoreOptions tmp_options, CoreOptions::FromMap(options, file_system)); + SchemaManager schema_manager(tmp_options.GetFileSystem(), root_path); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_table_schema, + schema_manager.Latest()); + if (latest_table_schema == std::nullopt) { + return Status::Invalid("not found latest schema"); + } + return latest_table_schema.value(); +} + +Result MergeOptions(const std::shared_ptr& table_schema, + const std::map& options, + const std::shared_ptr& file_system) { + auto final_options = table_schema->Options(); + for (const auto& [key, value] : options) { + final_options[key] = value; + } + return CoreOptions::FromMap(final_options, file_system); +} + +Result LoadSnapshot(const std::string& root_path, + const std::optional& snapshot_id, + const CoreOptions& core_options) { + std::optional snapshot; + SnapshotManager snapshot_manager(core_options.GetFileSystem(), root_path); + if (snapshot_id) { + PAIMON_ASSIGN_OR_RAISE(snapshot, snapshot_manager.LoadSnapshot(snapshot_id.value())); + } else { + PAIMON_ASSIGN_OR_RAISE(snapshot, snapshot_manager.LatestSnapshot()); + } + if (!snapshot) { + return Status::Invalid("not found latest snapshot"); + } + return snapshot.value(); +} +} // namespace + +Result> GlobalIndexScan::Create( + const std::string& root_path, const std::optional& snapshot_id, + const std::optional>>& partitions, + const std::map& options, + const std::shared_ptr& file_system, const std::shared_ptr& executor, + const std::shared_ptr& memory_pool) { + if (partitions && partitions.value().empty()) { + return Status::Invalid( + "invalid input partition, supposed to be null or at least one partition"); + } + std::shared_ptr pool = memory_pool ? memory_pool : GetDefaultPool(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr table_schema, + LoadSchema(root_path, options, file_system)); + PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, + MergeOptions(table_schema, options, file_system)); + std::shared_ptr partition_filters; + if (partitions) { + auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + PAIMON_ASSIGN_OR_RAISE(partition_filters, FileStoreScan::CreatePartitionPredicate( + table_schema->PartitionKeys(), + core_options.GetPartitionDefaultName(), + arrow_schema, partitions.value())); + } + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, LoadSnapshot(root_path, snapshot_id, core_options)); + return GlobalIndexScanImpl::Create(root_path, table_schema, snapshot, partition_filters, + core_options, executor, pool); +} + +Result> GlobalIndexScan::Create( + const std::string& root_path, const std::optional& snapshot_id, + const std::shared_ptr& partitions, const std::map& options, + const std::shared_ptr& file_system, const std::shared_ptr& executor, + const std::shared_ptr& memory_pool) { + std::shared_ptr partition_filters; + if (partitions) { + partition_filters = std::dynamic_pointer_cast(partitions); + if (!partition_filters) { + return Status::Invalid("partition filters cannot cast to PredicateFilter"); + } + } + std::shared_ptr pool = memory_pool ? memory_pool : GetDefaultPool(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr table_schema, + LoadSchema(root_path, options, file_system)); + PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, + MergeOptions(table_schema, options, file_system)); + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, LoadSnapshot(root_path, snapshot_id, core_options)); + return GlobalIndexScanImpl::Create(root_path, table_schema, snapshot, partition_filters, + core_options, executor, pool); +} + +} // namespace paimon diff --git a/src/paimon/core/global_index/global_index_scan_impl.cpp b/src/paimon/core/global_index/global_index_scan_impl.cpp new file mode 100644 index 0000000..40820a2 --- /dev/null +++ b/src/paimon/core/global_index/global_index_scan_impl.cpp @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "paimon/core/global_index/global_index_scan_impl.h" + +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "paimon/common/global_index/offset_global_index_reader.h" +#include "paimon/common/global_index/union_global_index_reader.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/global_index/global_index_evaluator_impl.h" +#include "paimon/core/index/index_file_handler.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/global_index/global_indexer.h" +#include "paimon/global_index/global_indexer_factory.h" + +namespace paimon { +GlobalIndexScanImpl::GlobalIndexScanImpl(const std::shared_ptr& table_schema, + const CoreOptions& options, + const std::shared_ptr& path_factory, + IndexMetaMap&& index_metas, + const std::shared_ptr& executor, + const std::shared_ptr& pool) + : pool_(pool), + table_schema_(table_schema), + options_(options), + index_file_manager_( + std::make_shared(options.GetFileSystem(), path_factory)), + index_metas_(std::move(index_metas)), + executor_(executor) {} + +Result> GlobalIndexScanImpl::Create( + const std::string& root_path, const std::shared_ptr& table_schema, + const Snapshot& snapshot, const std::shared_ptr& partitions, + const CoreOptions& options, const std::shared_ptr& executor, + const std::shared_ptr& pool) { + auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + PAIMON_ASSIGN_OR_RAISE(std::vector external_paths, options.CreateExternalPaths()); + PAIMON_ASSIGN_OR_RAISE(std::optional global_index_external_path, + options.CreateGlobalIndexExternalPath()); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr file_store_path_factory, + FileStorePathFactory::Create( + root_path, arrow_schema, table_schema->PartitionKeys(), + options.GetPartitionDefaultName(), options.GetFileFormat()->Identifier(), + options.DataFilePrefix(), options.LegacyPartitionNameEnabled(), external_paths, + global_index_external_path, options.IndexFileInDataFileDir(), pool)); + std::shared_ptr path_factory = + file_store_path_factory->CreateGlobalIndexFileFactory(); + + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr index_manifest_file, + IndexManifestFile::Create(options.GetFileSystem(), options.GetManifestFormat(), + options.GetManifestCompression(), file_store_path_factory, + options.GetBucket(), pool, options)); + auto index_file_handler = std::make_unique( + options.GetFileSystem(), std::move(index_manifest_file), + std::make_shared(file_store_path_factory), + options.DeletionVectorsBitmap64(), pool); + + PAIMON_ASSIGN_OR_RAISE(std::vector partition_fields, + table_schema->GetFields(table_schema->PartitionKeys())); + auto partition_schema = DataField::ConvertDataFieldsToArrowSchema(partition_fields); + std::function(const IndexManifestEntry&)> filter = + [&](const IndexManifestEntry& entry) -> Result { + if (partitions) { + PAIMON_ASSIGN_OR_RAISE(bool saved, partitions->Test(partition_schema, entry.partition)); + if (!saved) { + return false; + } + } + if (!entry.index_file->GetGlobalIndexMeta()) { + return false; + } + return true; + }; + PAIMON_ASSIGN_OR_RAISE(std::vector entries, + index_file_handler->Scan(snapshot, filter)); + IndexMetaMap index_metas; + for (const auto& entry : entries) { + auto index_file_meta = entry.index_file; + const auto& index_meta = index_file_meta->GetGlobalIndexMeta(); + assert(index_meta); + Range range(index_meta->row_range_start, index_meta->row_range_end); + index_metas[index_meta->index_field_id][index_file_meta->IndexType()][range].push_back( + index_file_meta); + } + auto final_executor = executor; + if (!final_executor) { + std::optional thread_num = options.GetGlobalIndexThreadNum(); + if (!thread_num) { + uint32_t cpu_count = std::thread::hardware_concurrency(); + thread_num = cpu_count > 0 ? static_cast(cpu_count) : 1; + } + final_executor = CreateDefaultExecutor(static_cast(thread_num.value())); + } + return std::unique_ptr(new GlobalIndexScanImpl( + table_schema, options, path_factory, std::move(index_metas), final_executor, pool)); +} + +Result> GlobalIndexScanImpl::GetOrCreateIndexEvaluator() { + if (evaluator_) { + return evaluator_; + } + GlobalIndexEvaluatorImpl::IndexReadersCreator create_index_readers = + [this](int32_t field_id) -> Result>> { + return CreateReaders(field_id, /*row_range_index=*/std::nullopt); + }; + evaluator_ = std::make_shared(table_schema_, create_index_readers); + return evaluator_; +} + +Result>> GlobalIndexScanImpl::CreateReaders( + int32_t field_id, const std::optional& row_range_index) const { + PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema_->GetField(field_id)); + return CreateReaders(field, row_range_index); +} + +Result>> GlobalIndexScanImpl::CreateReaders( + const std::string& field_name, const std::optional& row_range_index) const { + PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema_->GetField(field_name)); + return CreateReaders(field, row_range_index); +} + +Result>> GlobalIndexScanImpl::CreateReaders( + const DataField& field, const std::optional& row_range_index) const { + auto field_iter = index_metas_.find(field.Id()); + if (field_iter == index_metas_.end()) { + return std::vector>(); + } + const auto& index_type_to_metas = field_iter->second; + std::vector> readers; + readers.reserve(index_type_to_metas.size()); + for (const auto& [index_type, range_to_metas] : index_type_to_metas) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr indexer, + GlobalIndexerFactory::Get(index_type, options_.ToMap())); + if (!indexer) { + continue; + } + std::vector> union_readers; + union_readers.reserve(range_to_metas.size()); + for (const auto& [range, metas] : range_to_metas) { + if (row_range_index && !row_range_index->Intersects(range.from, range.to)) { + continue; + } + // TODO(xinyu.lxy): c_arrow_schema may contains additional associated fields. + auto arrow_field = DataField::ConvertDataFieldToArrowField(field); + auto arrow_schema = arrow::schema({arrow_field}); + + ArrowSchema c_arrow_schema; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*arrow_schema, &c_arrow_schema)); + auto index_io_metas = ToGlobalIndexIOMetas(metas); + ScopeGuard guard([&]() { ArrowSchemaRelease(&c_arrow_schema); }); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr index_reader, + indexer->CreateReader(&c_arrow_schema, index_file_manager_, index_io_metas, pool_)); + union_readers.push_back( + std::make_shared(std::move(index_reader), range.from)); + } + if (union_readers.empty()) { + continue; + } + readers.push_back( + std::make_shared(std::move(union_readers), executor_)); + } + return readers; +} + +std::vector GlobalIndexScanImpl::ToGlobalIndexIOMetas( + const std::vector>& metas) const { + std::vector index_io_metas; + index_io_metas.reserve(metas.size()); + for (const auto& meta : metas) { + index_io_metas.push_back(ToGlobalIndexIOMeta(meta)); + } + return index_io_metas; +} + +GlobalIndexIOMeta GlobalIndexScanImpl::ToGlobalIndexIOMeta( + const std::shared_ptr& index_meta) const { + assert(index_meta->GetGlobalIndexMeta()); + const auto& global_index_meta = index_meta->GetGlobalIndexMeta().value(); + return {index_file_manager_->ToPath(index_meta), index_meta->FileSize(), + global_index_meta.index_meta}; +} + +Result> GlobalIndexScanImpl::Scan( + const std::shared_ptr& predicate) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr evaluator, + GetOrCreateIndexEvaluator()); + return evaluator->Evaluate(predicate); +} + +} // namespace paimon diff --git a/src/paimon/core/global_index/global_index_scan_impl.h b/src/paimon/core/global_index/global_index_scan_impl.h new file mode 100644 index 0000000..d5f10d0 --- /dev/null +++ b/src/paimon/core/global_index/global_index_scan_impl.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/common/predicate/predicate_filter.h" +#include "paimon/core/core_options.h" +#include "paimon/core/global_index/global_index_evaluator.h" +#include "paimon/core/global_index/global_index_file_manager.h" +#include "paimon/core/index/index_file_meta.h" +#include "paimon/core/index/index_path_factory.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/snapshot.h" +#include "paimon/global_index/global_index_io_meta.h" +#include "paimon/global_index/global_index_scan.h" + +namespace paimon { +class GlobalIndexScanImpl : public GlobalIndexScan { + public: + static Result> Create( + const std::string& root_path, const std::shared_ptr& table_schema, + const Snapshot& snapshot, const std::shared_ptr& partitions, + const CoreOptions& options, const std::shared_ptr& executor, + const std::shared_ptr& pool); + + Result> Scan(const std::shared_ptr& predicate); + + Result>> CreateReaders( + const std::string& field_name, + const std::optional& row_range_index) const override; + + Result>> CreateReaders( + int32_t field_id, const std::optional& row_range_index) const override; + + private: + /// (id->index_type->row_range) -> index meta list + using IndexMetaMap = + std::map>>>>; + + GlobalIndexScanImpl(const std::shared_ptr& table_schema, + const CoreOptions& options, + const std::shared_ptr& path_factory, + IndexMetaMap&& index_metas, const std::shared_ptr& executor, + const std::shared_ptr& pool); + + Result> GetOrCreateIndexEvaluator(); + + Result>> CreateReaders( + const DataField& field, const std::optional& row_range_index) const; + + std::vector ToGlobalIndexIOMetas( + const std::vector>& metas) const; + + GlobalIndexIOMeta ToGlobalIndexIOMeta(const std::shared_ptr& index_meta) const; + + private: + std::shared_ptr pool_; + std::string root_path_; + std::shared_ptr table_schema_; + CoreOptions options_; + std::shared_ptr index_file_manager_; + IndexMetaMap index_metas_; + std::shared_ptr executor_; + std::shared_ptr evaluator_; +}; + +} // namespace paimon diff --git a/src/paimon/core/global_index/global_index_write_task.cpp b/src/paimon/core/global_index/global_index_write_task.cpp new file mode 100644 index 0000000..1d2932f --- /dev/null +++ b/src/paimon/core/global_index/global_index_write_task.cpp @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/global_index/global_index_write_task.h" + +#include "arrow/c/bridge.h" +#include "paimon/common/table/special_fields.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/core/global_index/global_index_file_manager.h" +#include "paimon/core/io/data_increment.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/sink/commit_message_impl.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/global_index/global_indexer.h" +#include "paimon/global_index/global_indexer_factory.h" +#include "paimon/read_context.h" +#include "paimon/table/source/table_read.h" +namespace paimon { +namespace { +Result> CreateGlobalIndexFileManager( + const std::string& table_path, const std::shared_ptr& table_schema, + const CoreOptions& core_options, const std::shared_ptr& pool) { + auto all_arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + PAIMON_ASSIGN_OR_RAISE(std::vector external_paths, + core_options.CreateExternalPaths()); + PAIMON_ASSIGN_OR_RAISE(std::optional global_index_external_path, + core_options.CreateGlobalIndexExternalPath()); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr path_factory, + FileStorePathFactory::Create( + table_path, all_arrow_schema, table_schema->PartitionKeys(), + core_options.GetPartitionDefaultName(), core_options.GetFileFormat()->Identifier(), + core_options.DataFilePrefix(), core_options.LegacyPartitionNameEnabled(), + external_paths, global_index_external_path, core_options.IndexFileInDataFileDir(), + pool)); + std::shared_ptr index_path_factory = + path_factory->CreateGlobalIndexFileFactory(); + return std::make_shared(core_options.GetFileSystem(), + index_path_factory); +} + +Result> CreateGlobalIndexWriter( + const std::string& index_type, const DataField& field, + const std::shared_ptr& index_file_manager, + const CoreOptions& core_options, const std::shared_ptr& pool) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr indexer, + GlobalIndexerFactory::Get(index_type, core_options.ToMap())); + if (!indexer) { + return Status::Invalid( + fmt::format("Unknown index type {}, may not registered", index_type)); + } + // TODO(xinyu.lxy): may add additional fields to read for index write + auto arrow_field = DataField::ConvertDataFieldToArrowField(field); + auto arrow_schema = arrow::schema({arrow_field}); + ArrowSchema c_arrow_schema; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*arrow_schema, &c_arrow_schema)); + return indexer->CreateWriter(field.Name(), &c_arrow_schema, index_file_manager, pool); +} + +Result> CreateBatchReader( + const std::string& table_path, const std::string& field_name, + const std::shared_ptr& indexed_split, const CoreOptions& core_options, + const std::shared_ptr& pool) { + ReadContextBuilder read_context_builder(table_path); + read_context_builder.SetOptions(core_options.ToMap()) + .WithFileSystem(core_options.GetFileSystem()) + .EnablePrefetch(true) + .WithMemoryPool(pool) + .SetReadSchema({field_name, SpecialFields::RowId().Name()}); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr read_context, + read_context_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr table_read, + TableRead::Create(std::move(read_context))); + return table_read->CreateReader(indexed_split); +} + +Result> BuildIndex(const std::string& field_name, const Range& range, + BatchReader* batch_reader, + GlobalIndexWriter* global_index_writer) { + while (true) { + PAIMON_ASSIGN_OR_RAISE(BatchReader::ReadBatch read_batch, batch_reader->NextBatch()); + if (BatchReader::IsEofBatch(read_batch)) { + break; + } + auto& [c_array, c_schema] = read_batch; + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr array, + arrow::ImportArray(c_array.get(), c_schema.get())); + auto struct_array = std::dynamic_pointer_cast(array); + if (!struct_array) { + return Status::Invalid( + "array read from batch reader is not a struct array in GlobalIndexWriteTask"); + } + auto indexed_array = struct_array->GetFieldByName(field_name); + if (!indexed_array) { + return Status::Invalid(fmt::format( + "read array does not contain {} field in GlobalIndexWriteTask", field_name)); + } + auto row_id_array = struct_array->GetFieldByName(SpecialFields::RowId().Name()); + auto typed_row_id_array = std::dynamic_pointer_cast(row_id_array); + if (!typed_row_id_array) { + return Status::Invalid( + fmt::format("read array does not contain {} field, or it cannot be casted to " + "Int64Array in GlobalIndexWriteTask", + SpecialFields::RowId().Name())); + } + std::vector relative_row_ids; + relative_row_ids.reserve(typed_row_id_array->length()); + for (int64_t i = 0; i < typed_row_id_array->length(); i++) { + int64_t row_id = typed_row_id_array->Value(i); + if (row_id < range.from || row_id > range.to) { + return Status::Invalid("invalid row id {}, out of range [{}, {}]", row_id, + range.from, range.to); + } + relative_row_ids.push_back(row_id - range.from); + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr new_array, + arrow::StructArray::Make({indexed_array}, {field_name})); + ::ArrowArray c_new_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*new_array, &c_new_array)); + PAIMON_RETURN_NOT_OK( + global_index_writer->AddBatch(&c_new_array, std::move(relative_row_ids))); + } + return global_index_writer->Finish(); +} + +Result> ToCommitMessage( + const std::string& index_type, int32_t field_id, const Range& range, + const std::vector& global_index_io_metas, const BinaryRow& partition, + int32_t bucket, const std::shared_ptr& file_manager) { + std::vector> index_file_metas; + index_file_metas.reserve(global_index_io_metas.size()); + bool is_external_path = file_manager->IsExternalPath(); + for (const auto& io_meta : global_index_io_metas) { + std::optional external_path; + if (is_external_path) { + PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(io_meta.file_path)); + external_path = path.ToString(); + } + index_file_metas.push_back(std::make_shared( + index_type, PathUtil::GetName(io_meta.file_path), io_meta.file_size, range.Count(), + /*dv_ranges=*/std::nullopt, external_path, + GlobalIndexMeta(range.from, range.to, field_id, + /*extra_field_ids=*/std::nullopt, io_meta.metadata))); + } + DataIncrement data_increment(std::move(index_file_metas)); + return std::make_shared(partition, bucket, + /*total_buckets=*/std::nullopt, data_increment, + CompactIncrement({}, {}, {})); +} +} // namespace +Result> GlobalIndexWriteTask::WriteIndex( + const std::string& table_path, const std::string& field_name, const std::string& index_type, + const std::shared_ptr& indexed_split, + const std::map& options, + const std::shared_ptr& memory_pool, + const std::shared_ptr& file_system) { + auto data_split = std::dynamic_pointer_cast(indexed_split->GetDataSplit()); + if (!data_split) { + return Status::Invalid("split cannot be casted to data split"); + } + const auto& ranges = indexed_split->RowRanges(); + if (ranges.size() != 1) { + return Status::Invalid("GlobalIndexWriteTask only supports a single contiguous range."); + } + const auto& range = ranges[0]; + std::shared_ptr pool = memory_pool ? memory_pool : GetDefaultPool(); + + // load schema + PAIMON_ASSIGN_OR_RAISE(CoreOptions tmp_options, CoreOptions::FromMap(options, file_system)); + SchemaManager schema_manager(tmp_options.GetFileSystem(), table_path); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_table_schema, + schema_manager.Latest()); + if (latest_table_schema == std::nullopt) { + return Status::Invalid("not found latest schema"); + } + // merge options + const auto& table_schema = latest_table_schema.value(); + auto final_options = table_schema->Options(); + for (const auto& [key, value] : options) { + final_options[key] = value; + } + PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, + CoreOptions::FromMap(final_options, file_system)); + + // create index file manager + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr index_file_manager, + CreateGlobalIndexFileManager(table_path, table_schema, core_options, pool)); + + // create global index writer + PAIMON_ASSIGN_OR_RAISE(DataField field, table_schema->GetField(field_name)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr global_index_writer, + CreateGlobalIndexWriter(index_type, field, index_file_manager, core_options, pool)); + + // create batch reader + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr batch_reader, + CreateBatchReader(table_path, field_name, indexed_split, core_options, pool)); + + // read from data split and write to index writer + PAIMON_ASSIGN_OR_RAISE( + std::vector global_index_io_metas, + BuildIndex(field_name, range, batch_reader.get(), global_index_writer.get())); + + // generate commit message + return ToCommitMessage(index_type, field.Id(), range, global_index_io_metas, + data_split->Partition(), data_split->Bucket(), index_file_manager); +} + +} // namespace paimon diff --git a/src/paimon/core/global_index/indexed_split_impl.h b/src/paimon/core/global_index/indexed_split_impl.h new file mode 100644 index 0000000..e4844a7 --- /dev/null +++ b/src/paimon/core/global_index/indexed_split_impl.h @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "fmt/format.h" +#include "fmt/ranges.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/global_index/indexed_split.h" + +namespace paimon { +class IndexedSplitImpl : public IndexedSplit { + public: + static constexpr int64_t MAGIC = -938472394838495695L; + static constexpr int32_t VERSION = 1; + + IndexedSplitImpl(const std::shared_ptr& data_split, + const std::vector& row_ranges, const std::vector& scores) + : data_split_(data_split), row_ranges_(row_ranges), scores_(scores) {} + IndexedSplitImpl(const std::shared_ptr& data_split, + const std::vector& row_ranges) + : IndexedSplitImpl(data_split, row_ranges, {}) {} + + std::shared_ptr GetDataSplit() const override { + return data_split_; + } + const std::vector& RowRanges() const override { + return row_ranges_; + } + const std::vector& Scores() const override { + return scores_; + } + + bool operator==(const IndexedSplitImpl& other) const { + if (this == &other) { + return true; + } + bool score_equal = + (scores_.size() == other.scores_.size()) && + std::equal(scores_.begin(), scores_.end(), other.scores_.begin(), + [](float left, float right) { return std::abs(left - right) <= kEpsilon; }); + return score_equal && *data_split_ == *(other.data_split_) && + row_ranges_ == other.row_ranges_; + } + + bool TEST_Equal(const IndexedSplitImpl& other) const { + if (this == &other) { + return true; + } + bool score_equal = + (scores_.size() == other.scores_.size()) && + std::equal(scores_.begin(), scores_.end(), other.scores_.begin(), + [](float left, float right) { return std::abs(left - right) <= kEpsilon; }); + + return score_equal && data_split_->TEST_Equal(*other.data_split_) && + row_ranges_ == other.row_ranges_; + } + + std::string ToString() const { + std::vector row_ranges_str_vec; + row_ranges_str_vec.reserve(row_ranges_.size()); + for (const auto& range : row_ranges_) { + row_ranges_str_vec.push_back(range.ToString()); + } + std::string row_ranges_str = fmt::format("[{}]", fmt::join(row_ranges_str_vec, ",")); + std::string scores_str = fmt::format("[{}]", fmt::join(scores_, ",")); + return fmt::format("IndexedSplit{{split={}, rowRanges={}, scores={}}}", + data_split_->ToString(), row_ranges_str, scores_str); + } + + Status Validate() const { + if ((row_ranges_.empty() && !data_split_->DataFiles().empty()) || + (!row_ranges_.empty() && data_split_->DataFiles().empty())) { + return Status::Invalid("Invalid IndexedSplit: row ranges mismatch data files."); + } + if (!scores_.empty()) { + size_t row_count = 0; + for (const auto& range : row_ranges_) { + row_count += range.Count(); + } + if (row_count != scores_.size()) { + return Status::Invalid("Scores length does not match row ranges in indexed split."); + } + } + return Status::OK(); + } + + private: + static constexpr float kEpsilon = 1e-5f; + + std::shared_ptr data_split_; + std::vector row_ranges_; + std::vector scores_; +}; +} // namespace paimon diff --git a/src/paimon/core/global_index/indexed_split_test.cpp b/src/paimon/core/global_index/indexed_split_test.cpp new file mode 100644 index 0000000..857f50b --- /dev/null +++ b/src/paimon/core/global_index/indexed_split_test.cpp @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/data_define.h" +#include "paimon/core/global_index/indexed_split_impl.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/result.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +TEST(IndexedSplitTest, TestSimple) { + std::string file_name = paimon::test::GetDataDir() + "/global_index/indexed_split-01"; + auto file_system = std::make_unique(); + + ASSERT_OK_AND_ASSIGN(auto input_stream, file_system->Open(file_name)); + std::vector split_bytes(input_stream->Length().value_or(0), 0); + + ASSERT_OK(input_stream->Read(split_bytes.data(), split_bytes.size())); + ASSERT_OK(input_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr result, + Split::Deserialize(reinterpret_cast(split_bytes.data()), split_bytes.size(), pool)); + + auto result_indexed_split = std::dynamic_pointer_cast(result); + + auto meta1 = std::make_shared( + "file1.orc", 100l, 200l, BinaryRow::EmptyRow(), BinaryRow::EmptyRow(), + SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), 50l, 249l, 0, 0, + std::vector>(), Timestamp(1765535214349l, 0), 0, nullptr, + FileSource::Append(), std::nullopt, std::nullopt, 50l, std::nullopt); + auto meta2 = std::make_shared( + "file2.orc", 101l, 100l, BinaryRow::EmptyRow(), BinaryRow::EmptyRow(), + SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), 250l, 349l, 0, 0, + std::vector>(), Timestamp(1765535214349l, 0), 0, nullptr, + FileSource::Append(), std::nullopt, std::nullopt, 250l, std::nullopt); + auto meta3 = std::make_shared( + "file3.orc", 102l, 200l, BinaryRow::EmptyRow(), BinaryRow::EmptyRow(), + SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), 1000l, 1199l, 0, 0, + std::vector>(), Timestamp(1765535214349l, 0), 0, nullptr, + FileSource::Append(), std::nullopt, std::nullopt, 1000l, std::nullopt); + + DataSplitImpl::Builder builder( + /*partition=*/BinaryRow::EmptyRow(), + /*bucket=*/0, /*bucket_path=*/ + "data/test_table/bucket-0", + std::vector>({meta1, meta2, meta3})); + + auto expected_data_split = std::dynamic_pointer_cast( + builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build().value()); + + std::vector ranges = {Range(55, 56), Range(270, 270), Range(1001, 1002)}; + auto expected_indexed_split = std::make_shared(expected_data_split, ranges); + + ASSERT_EQ(*result_indexed_split, *expected_indexed_split) << result_indexed_split->ToString(); + ASSERT_OK_AND_ASSIGN(std::string serialize_bytes, Split::Serialize(result_indexed_split, pool)); + ASSERT_EQ(serialize_bytes, + std::string(reinterpret_cast(split_bytes.data()), split_bytes.size())); +} + +TEST(IndexedSplitTest, TestIndexedSplitWithScore) { + std::string file_name = paimon::test::GetDataDir() + "/global_index/indexed_split-02"; + auto file_system = std::make_unique(); + + ASSERT_OK_AND_ASSIGN(auto input_stream, file_system->Open(file_name)); + std::vector split_bytes(input_stream->Length().value_or(0), 0); + + ASSERT_OK(input_stream->Read(split_bytes.data(), split_bytes.size())); + ASSERT_OK(input_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr result, + Split::Deserialize(reinterpret_cast(split_bytes.data()), split_bytes.size(), pool)); + + auto result_indexed_split = std::dynamic_pointer_cast(result); + + auto meta1 = std::make_shared( + "file1.orc", 100l, 200l, BinaryRow::EmptyRow(), BinaryRow::EmptyRow(), + SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), 50l, 249l, 0, 0, + std::vector>(), Timestamp(1765549435648l, 0), 0, nullptr, + FileSource::Append(), std::nullopt, std::nullopt, 50l, std::nullopt); + auto meta2 = std::make_shared( + "file2.orc", 101l, 100l, BinaryRow::EmptyRow(), BinaryRow::EmptyRow(), + SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), 250l, 349l, 0, 0, + std::vector>(), Timestamp(1765549435649l, 0), 0, nullptr, + FileSource::Append(), std::nullopt, std::nullopt, 250l, std::nullopt); + auto meta3 = std::make_shared( + "file3.orc", 102l, 200l, BinaryRow::EmptyRow(), BinaryRow::EmptyRow(), + SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), 1000l, 1199l, 0, 0, + std::vector>(), Timestamp(1765549435649l, 0), 0, nullptr, + FileSource::Append(), std::nullopt, std::nullopt, 1000l, std::nullopt); + + DataSplitImpl::Builder builder( + /*partition=*/BinaryRow::EmptyRow(), + /*bucket=*/0, /*bucket_path=*/ + "data/test_table/bucket-0", + std::vector>({meta1, meta2, meta3})); + + auto expected_data_split = std::dynamic_pointer_cast( + builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build().value()); + + std::vector ranges = {Range(55, 56), Range(270, 270), Range(1001, 1002)}; + std::vector scores = {1.01f, 2.10f, -1.32f, 4.23f, 50.74f}; + auto expected_indexed_split = + std::make_shared(expected_data_split, ranges, scores); + + ASSERT_EQ(*result_indexed_split, *expected_indexed_split) << result_indexed_split->ToString(); + ASSERT_TRUE( + result_indexed_split->ToString().find( + "rowRanges=[[55, 56],[270, 270],[1001, 1002]], scores=[1.01,2.1,-1.32,4.23,50.74]") != + std::string::npos); + ASSERT_OK_AND_ASSIGN(std::string serialize_bytes, Split::Serialize(result_indexed_split, pool)); + ASSERT_EQ(serialize_bytes, + std::string(reinterpret_cast(split_bytes.data()), split_bytes.size())); +} + +TEST(IndexedSplitTest, TestValidate) { + auto meta = std::make_shared( + "file.orc", 1l, 200l, BinaryRow::EmptyRow(), BinaryRow::EmptyRow(), + SimpleStats::EmptyStats(), SimpleStats::EmptyStats(), 1000l, 1199l, 0, 0, + std::vector>(), Timestamp(0l, 0), 0, nullptr, + FileSource::Append(), std::nullopt, std::nullopt, 1000l, std::nullopt); + + DataSplitImpl::Builder builder( + /*partition=*/BinaryRow::EmptyRow(), + /*bucket=*/0, /*bucket_path=*/ + "data/test_table/bucket-0", std::vector>({meta})); + + auto data_split = std::dynamic_pointer_cast( + builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build().value()); + + { + std::vector row_ranges = {Range(10, 20), Range(30, 40)}; + IndexedSplitImpl split(data_split, row_ranges); + ASSERT_OK(split.Validate()); + } + { + std::vector row_ranges = {Range(10, 12), Range(30, 31)}; + std::vector scores = {10.01f, 10.11f, 10.21f, -30.01f, -30.11f}; + IndexedSplitImpl split(data_split, row_ranges, scores); + ASSERT_OK(split.Validate()); + } + { + std::vector row_ranges = {Range(10, 12), Range(30, 31)}; + std::vector scores = {10.01f, 10.11f, 10.21f, -30.01f}; + IndexedSplitImpl split(data_split, row_ranges, scores); + ASSERT_NOK_WITH_MSG(split.Validate(), + "Scores length does not match row ranges in indexed split."); + } + { + std::vector row_ranges = {}; + IndexedSplitImpl split(data_split, row_ranges); + ASSERT_NOK_WITH_MSG(split.Validate(), + "Invalid IndexedSplit: row ranges mismatch data files."); + } + { + std::vector row_ranges = {Range(10, 12)}; + DataSplitImpl::Builder empty_builder( + /*partition=*/BinaryRow::EmptyRow(), + /*bucket=*/0, /*bucket_path=*/ + "data/test_table/bucket-0", std::vector>({})); + auto empty_data_split = std::dynamic_pointer_cast( + empty_builder.WithSnapshot(1).IsStreaming(false).RawConvertible(true).Build().value()); + + IndexedSplitImpl split(empty_data_split, row_ranges); + ASSERT_NOK_WITH_MSG(split.Validate(), + "Invalid IndexedSplit: row ranges mismatch data files."); + } +} +} // namespace paimon::test diff --git a/src/paimon/core/migrate/file_meta_utils.cpp b/src/paimon/core/migrate/file_meta_utils.cpp new file mode 100644 index 0000000..af634a0 --- /dev/null +++ b/src/paimon/core/migrate/file_meta_utils.cpp @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/migrate/file_meta_utils.h" + +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/binary_row_partition_computer.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/core/io/compact_increment.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/data_increment.h" +#include "paimon/core/manifest/file_source.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/stats/simple_stats.h" +#include "paimon/core/stats/simple_stats_converter.h" +#include "paimon/core/table/sink/commit_message_impl.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/format/file_format.h" +#include "paimon/format/format_stats_extractor.h" +#include "paimon/fs/file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/status.h" + +namespace paimon { +namespace { +Result> LoadTableSchema(const std::shared_ptr& fs, + const std::string& table_path) { + SchemaManager schema_manager(fs, table_path); + PAIMON_ASSIGN_OR_RAISE(std::optional> table_schema, + schema_manager.Latest()); + if (table_schema == std::nullopt) { + return Status::Invalid(fmt::format("load schema failed, no schema in {}", table_path)); + } + if (table_schema.value()->Id() != TableSchema::FIRST_SCHEMA_ID) { + return Status::NotImplemented("do not support schema evolution in migrate process"); + } + return table_schema.value(); +} + +Result> ConstructFileMeta( + const std::string& src_file_path, const std::string& format_identifier, + const std::string& bucket_path, int64_t schema_id, + const std::shared_ptr& stats_extractor, + const std::shared_ptr& fs, const std::shared_ptr& memory_pool) { + std::string file_name = PathUtil::GetName(src_file_path); + // rename + std::string new_file_name = StringUtils::EndsWith(file_name, "." + format_identifier) + ? file_name + : (file_name + "." + format_identifier); + std::string dst_file_path = PathUtil::JoinPath(bucket_path, new_file_name); + PAIMON_ASSIGN_OR_RAISE(bool dst_exist, fs->Exists(dst_file_path)); + if (!dst_exist) { + PAIMON_RETURN_NOT_OK(fs->Rename(/*src=*/src_file_path, /*dst=*/dst_file_path)); + } + // extract stats + PAIMON_ASSIGN_OR_RAISE(auto stats, + stats_extractor->ExtractWithFileInfo(fs, dst_file_path, memory_pool)); + PAIMON_ASSIGN_OR_RAISE(SimpleStats simple_stats, + SimpleStatsConverter::ToBinary(stats.first, memory_pool.get())); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_status, + fs->GetFileStatus(dst_file_path)); + assert(file_status); + return DataFileMeta::ForAppend( + new_file_name, file_status->GetLen(), stats.second.GetRowCount(), simple_stats, + /*min_sequence_number=*/0, /*max_sequence_number=*/0, schema_id, /*extra_files=*/{}, + /*embedded_index=*/nullptr, FileSource::Append(), /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); +} + +Status ValidateNonObjectPath(const std::vector& files) { + for (const auto& file : files) { + PAIMON_ASSIGN_OR_RAISE(bool is_object_store, FileSystem::IsObjectStore(file)); + if (is_object_store) { + return Status::NotImplemented( + "FileMetaUtils does not support object store file system for now"); + } + } + return Status::OK(); +} +} // namespace + +Result> FileMetaUtils::GenerateCommitMessage( + const std::vector& src_data_files, const std::string& dst_table_path, + const std::map& partition_values, + const std::map& options, + const std::shared_ptr& file_system) { + auto memory_pool = GetDefaultPool(); + // load table schema + PAIMON_ASSIGN_OR_RAISE(CoreOptions tmp_options, CoreOptions::FromMap(options, file_system)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr table_schema, + LoadTableSchema(tmp_options.GetFileSystem(), dst_table_path)); + if (!table_schema->PrimaryKeys().empty() || table_schema->NumBuckets() != -1) { + return Status::Invalid("migrate only support append table with unaware-bucket"); + } + // merge options + auto table_options = table_schema->Options(); + for (const auto& [key, value] : options) { + table_options[key] = value; + } + PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, + CoreOptions::FromMap(table_options, file_system)); + + std::shared_ptr fs = core_options.GetFileSystem(); + std::shared_ptr format = core_options.GetFileFormat(); + assert(fs); + assert(format); + PAIMON_RETURN_NOT_OK(ValidateNonObjectPath(src_data_files)); + PAIMON_RETURN_NOT_OK(ValidateNonObjectPath({dst_table_path})); + PAIMON_ASSIGN_OR_RAISE(std::vector external_paths, + core_options.CreateExternalPaths()); + if (!external_paths.empty() || core_options.IndexFileInDataFileDir()) { + return Status::Invalid( + "migrate only support schema without external paths and index not in data file dir"); + } + + // generate partition + auto schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr partition_computer, + BinaryRowPartitionComputer::Create(table_schema->PartitionKeys(), schema, + core_options.GetPartitionDefaultName(), + core_options.LegacyPartitionNameEnabled(), memory_pool)); + PAIMON_ASSIGN_OR_RAISE(BinaryRow partition_row, + partition_computer->ToBinaryRow(partition_values)); + + // generate bucket path + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr file_store_path_factory, + FileStorePathFactory::Create(dst_table_path, schema, table_schema->PartitionKeys(), + core_options.GetPartitionDefaultName(), format->Identifier(), + core_options.DataFilePrefix(), + core_options.LegacyPartitionNameEnabled(), + /*external_paths=*/std::vector(), + /*global_index_external_path=*/std::nullopt, + /*index_file_in_data_file_dir=*/false, memory_pool)); + PAIMON_ASSIGN_OR_RAISE(std::string bucket_path, + file_store_path_factory->BucketPath(partition_row, /*bucket=*/0)); + + // prepare stats extractor + ::ArrowSchema arrow_schema; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, &arrow_schema)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr stats_extractor, + format->CreateStatsExtractor(&arrow_schema)); + + // prepare data file meta + std::vector> data_file_metas; + data_file_metas.reserve(src_data_files.size()); + for (const auto& file : src_data_files) { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr meta, + ConstructFileMeta(file, format->Identifier(), bucket_path, table_schema->Id(), + stats_extractor, fs, memory_pool)); + data_file_metas.push_back(meta); + } + + return std::make_unique( + partition_row, /*bucket=*/0, /*total_buckets=*/core_options.GetBucket(), + DataIncrement(std::move(data_file_metas), /*deleted_files=*/{}, /*changelog_files=*/{}), + CompactIncrement(/*compact_before=*/{}, /*compact_after=*/{}, /*changelog_files=*/{})); +} +} // namespace paimon diff --git a/src/paimon/core/migrate/file_meta_utils_test.cpp b/src/paimon/core/migrate/file_meta_utils_test.cpp new file mode 100644 index 0000000..ee30680 --- /dev/null +++ b/src/paimon/core/migrate/file_meta_utils_test.cpp @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/migrate/file_meta_utils.h" + +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/data_define.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/io/compact_increment.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/data_increment.h" +#include "paimon/core/manifest/file_source.h" +#include "paimon/core/stats/simple_stats.h" +#include "paimon/core/table/sink/commit_message_impl.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/defs.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/status.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +class FileMetaUtilsTest : public ::testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + fs_ = std::make_shared(); + tmp_dir_ = UniqueTestDirectory::Create(); + dst_table_dir_ = UniqueTestDirectory::Create(); + } + void TearDown() override { + tmp_dir_.reset(); + dst_table_dir_.reset(); + fs_.reset(); + pool_.reset(); + } + void CopyFile(const std::string& src_file, const std::string& dst_file) const { + std::string content; + ASSERT_OK(fs_->ReadFile(src_file, &content)); + ASSERT_OK(fs_->WriteFile(dst_file, content, /*overwrite=*/false)); + } + void CreateTable(const std::string& table_name, const std::string& schema_path) const { + std::string table_path = dst_table_dir_->Str() + "/" + table_name; + ASSERT_OK(fs_->Mkdirs(table_path)); + std::string schema_dir = table_path + "/schema"; + ASSERT_OK(fs_->Mkdirs(schema_dir)); + CopyFile(schema_path, schema_dir + "/schema-0"); + } + + std::vector CopyDataFilesToTempDir( + const std::vector& src_data_files) const { + std::vector tmp_data_files; + for (const auto& name : src_data_files) { + std::string tmp_data_file = tmp_dir_->Str() + "/" + PathUtil::GetName(name); + CopyFile(name, tmp_data_file); + tmp_data_files.push_back(tmp_data_file); + } + return tmp_data_files; + } + + private: + std::shared_ptr pool_; + std::shared_ptr fs_; + std::unique_ptr tmp_dir_; + std::unique_ptr dst_table_dir_; +}; + +TEST_F(FileMetaUtilsTest, TestSimple) { + // copy a db from test/data to tmp path + std::string src_schema_path = + paimon::test::GetDataDir() + + "orc/append_10_external_path.db/append_10_external_path/schema/schema-0"; + std::string dst_table_name = "test_table"; + CreateTable(dst_table_name, src_schema_path); + std::vector src_data_files = { + paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/f1=20/bucket-0/" + "data-b913a160-a4d1-4084-af2a-18333c35668e-0.orc", + paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/f1=20/bucket-0/" + "data-db2b44c0-0d73-449d-82a0-4075bd2cb6e3-0.orc"}; + + std::vector tmp_data_files = CopyDataFilesToTempDir(src_data_files); + + // generate commit message and check + std::string dst_table_path = dst_table_dir_->Str() + "/" + dst_table_name; + ASSERT_OK_AND_ASSIGN( + std::unique_ptr msg, + FileMetaUtils::GenerateCommitMessage(tmp_data_files, dst_table_path, {}, + /*options=*/{{Options::FILE_FORMAT, "orc"}})); + auto msg_impl = dynamic_cast(msg.get()); + ASSERT_TRUE(msg_impl); + ASSERT_EQ(2, msg_impl->GetNewFilesIncrement().NewFiles().size()); + + // check commitMsg + std::vector expected_msgs; + auto file_meta1 = std::make_shared( + "data-b913a160-a4d1-4084-af2a-18333c35668e-0.orc", /*file_size=*/506, + /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Paul"), 20, 1, NullType()}, + {std::string("Paul"), 20, 1, NullType()}, {0, 0, 0, 1}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/msg_impl->GetNewFilesIncrement().NewFiles()[0]->creation_time, + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + auto file_meta2 = std::make_shared( + "data-db2b44c0-0d73-449d-82a0-4075bd2cb6e3-0.orc", /*file_size=*/541, + /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Lucy"), 20, 1, 14.1}, + {std::string("Lucy"), 20, 1, 14.1}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/msg_impl->GetNewFilesIncrement().NewFiles()[1]->creation_time, + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + CommitMessageImpl expected(BinaryRow::EmptyRow(), /*bucket=*/0, /*total_buckets=*/-1, + DataIncrement({file_meta1, file_meta2}, {}, {}), + CompactIncrement({}, {}, {})); + ASSERT_EQ(expected, *msg_impl) << expected.ToString() << std::endl << msg_impl->ToString(); + + // check data files move to dst table + ASSERT_OK_AND_ASSIGN( + bool exist, + fs_->Exists(dst_table_path + "/bucket-0/data-b913a160-a4d1-4084-af2a-18333c35668e-0.orc")); + ASSERT_TRUE(exist); + ASSERT_OK_AND_ASSIGN( + exist, + fs_->Exists(dst_table_path + "/bucket-0/data-db2b44c0-0d73-449d-82a0-4075bd2cb6e3-0.orc")); + ASSERT_TRUE(exist); +} + +TEST_F(FileMetaUtilsTest, TestFailover) { + // total 2 files, rename the first file to target, and then fail over + // only need to rename the second file next time + std::string src_schema_path = + paimon::test::GetDataDir() + + "/orc/append_10_external_path.db/append_10_external_path/schema/schema-0"; + std::string dst_table_name = "test_table"; + CreateTable(dst_table_name, src_schema_path); + std::vector src_data_files = { + paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/f1=20/bucket-0/" + "data-b913a160-a4d1-4084-af2a-18333c35668e-0.orc", + paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/f1=20/bucket-0/" + "data-db2b44c0-0d73-449d-82a0-4075bd2cb6e3-0.orc"}; + + std::string dst_table_path = dst_table_dir_->Str() + "/" + dst_table_name; + std::vector tmp_data_files = CopyDataFilesToTempDir(src_data_files); + // simulate renaming the first file to target, and then fail ove + CopyFile(src_data_files[0], + dst_table_path + "/bucket-0/data-b913a160-a4d1-4084-af2a-18333c35668e-0.orc"); + // generate commit message and check + ASSERT_OK_AND_ASSIGN( + std::unique_ptr msg, + FileMetaUtils::GenerateCommitMessage(tmp_data_files, dst_table_path, {}, + /*options=*/{{Options::FILE_FORMAT, "orc"}})); + auto msg_impl = dynamic_cast(msg.get()); + ASSERT_TRUE(msg_impl); + ASSERT_EQ(2, msg_impl->GetNewFilesIncrement().NewFiles().size()); + + // check commitMsg + std::vector expected_msgs; + auto file_meta1 = std::make_shared( + "data-b913a160-a4d1-4084-af2a-18333c35668e-0.orc", /*file_size=*/506, + /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Paul"), 20, 1, NullType()}, + {std::string("Paul"), 20, 1, NullType()}, {0, 0, 0, 1}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/msg_impl->GetNewFilesIncrement().NewFiles()[0]->creation_time, + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + auto file_meta2 = std::make_shared( + "data-db2b44c0-0d73-449d-82a0-4075bd2cb6e3-0.orc", /*file_size=*/541, + /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Lucy"), 20, 1, 14.1}, + {std::string("Lucy"), 20, 1, 14.1}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/msg_impl->GetNewFilesIncrement().NewFiles()[1]->creation_time, + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + CommitMessageImpl expected(BinaryRow::EmptyRow(), /*bucket=*/0, /*total_buckets=*/-1, + DataIncrement({file_meta1, file_meta2}, {}, {}), + CompactIncrement({}, {}, {})); + ASSERT_EQ(expected, *msg_impl) << expected.ToString() << std::endl << msg_impl->ToString(); + + // check data files move to dst table + ASSERT_OK_AND_ASSIGN( + bool exist, + fs_->Exists(dst_table_path + "/bucket-0/data-b913a160-a4d1-4084-af2a-18333c35668e-0.orc")); + ASSERT_TRUE(exist); + ASSERT_OK_AND_ASSIGN( + exist, + fs_->Exists(dst_table_path + "/bucket-0/data-db2b44c0-0d73-449d-82a0-4075bd2cb6e3-0.orc")); + ASSERT_TRUE(exist); +} + +TEST_F(FileMetaUtilsTest, TestWithPartition) { + // copy a db from test/data to tmp path + std::string src_schema_path = + paimon::test::GetDataDir() + + "/orc/multi_partition_append_table.db/multi_partition_append_table/schema/" + "schema-0"; + std::string dst_table_name = "test_table"; + CreateTable(dst_table_name, src_schema_path); + std::vector src_data_files = { + paimon::test::GetDataDir() + + "/orc/multi_partition_append_table.db/multi_partition_append_table/f1=10/f2=0/" + "bucket-0/" + "data-01b6a930-6564-409b-b8f4-ed1307790d72-0.orc", + paimon::test::GetDataDir() + + "/orc/multi_partition_append_table.db/multi_partition_append_table/f1=10/f2=0/" + "bucket-0/" + "data-1c547e5f-48b2-4917-a996-71d306377661-0.orc"}; + + std::vector tmp_data_files = CopyDataFilesToTempDir(src_data_files); + + // generate commit message and check + std::string dst_table_path = dst_table_dir_->Str() + "/" + dst_table_name; + std::map partition = {{"f1", "10"}, {"f2", "0"}}; + ASSERT_OK_AND_ASSIGN( + std::unique_ptr msg, + FileMetaUtils::GenerateCommitMessage(tmp_data_files, dst_table_path, partition, + /*options=*/{{Options::FILE_FORMAT, "orc"}})); + auto msg_impl = dynamic_cast(msg.get()); + ASSERT_TRUE(msg_impl); + ASSERT_EQ(2, msg_impl->GetNewFilesIncrement().NewFiles().size()); + + // check commitMsg + std::vector expected_msgs; + auto file_meta1 = std::make_shared( + "data-01b6a930-6564-409b-b8f4-ed1307790d72-0.orc", /*file_size=*/575, + /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0, 12.1}, + {std::string("Tony"), 10, 0, 14.1}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/msg_impl->GetNewFilesIncrement().NewFiles()[0]->creation_time, + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + auto file_meta2 = std::make_shared( + "data-1c547e5f-48b2-4917-a996-71d306377661-0.orc", /*file_size=*/589, + /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Alex"), 10, 0, 12.1}, + {std::string("Emily"), 10, 0, 16.1}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/msg_impl->GetNewFilesIncrement().NewFiles()[1]->creation_time, + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + CommitMessageImpl expected(BinaryRowGenerator::GenerateRow({10, 0}, pool_.get()), /*bucket=*/0, + /*total_buckets=*/-1, + DataIncrement({file_meta1, file_meta2}, {}, {}), + CompactIncrement({}, {}, {})); + ASSERT_EQ(expected, *msg_impl) << expected.ToString() << std::endl << msg_impl->ToString(); + + // check data files move to dst table + ASSERT_OK_AND_ASSIGN( + bool exist, + fs_->Exists(dst_table_path + + "/f1=10/f2=0/bucket-0/data-1c547e5f-48b2-4917-a996-71d306377661-0.orc")); + ASSERT_TRUE(exist); + ASSERT_OK_AND_ASSIGN( + exist, fs_->Exists(dst_table_path + + "/f1=10/f2=0/bucket-0/data-01b6a930-6564-409b-b8f4-ed1307790d72-0.orc")); + ASSERT_TRUE(exist); +} + +TEST_F(FileMetaUtilsTest, TestWithNestedType) { + // copy a db from test/data to tmp path + std::string src_schema_path = + paimon::test::GetDataDir() + + "/orc/append_complex_build_in_fieldid.db/append_complex_build_in_fieldid/schema/" + "schema-0"; + std::string dst_table_name = "test_table"; + CreateTable(dst_table_name, src_schema_path); + std::vector src_data_files = { + paimon::test::GetDataDir() + + "/orc/append_complex_build_in_fieldid.db/append_complex_build_in_fieldid/" + "bucket-0/data-6dac9052-36d8-4950-8f74-b2bbc082e489-0.orc"}; + + std::vector tmp_data_files = CopyDataFilesToTempDir(src_data_files); + + // generate commit message and check + std::string dst_table_path = dst_table_dir_->Str() + "/" + dst_table_name; + ASSERT_OK_AND_ASSIGN(std::unique_ptr msg, + FileMetaUtils::GenerateCommitMessage( + tmp_data_files, dst_table_path, /*partition_values=*/{}, + /*options=*/{{Options::FILE_FORMAT, "orc"}})); + auto msg_impl = dynamic_cast(msg.get()); + ASSERT_TRUE(msg_impl); + ASSERT_EQ(1, msg_impl->GetNewFilesIncrement().NewFiles().size()); + + // check commitMsg + std::vector expected_msgs; + auto file_meta1 = std::make_shared( + "data-6dac9052-36d8-4950-8f74-b2bbc082e489-0.orc", /*file_size=*/1222, + /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats( + {NullType(), NullType(), NullType(), TimestampType(Timestamp(0, 0), 9), 24, + Decimal(2, 2, 12)}, + {NullType(), NullType(), NullType(), TimestampType(Timestamp(123123, 123000), 9), 2456, + Decimal(2, 2, 22)}, + {0, 0, 0, 0, 0, 1}, pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/msg_impl->GetNewFilesIncrement().NewFiles()[0]->creation_time, + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + CommitMessageImpl expected(BinaryRow::EmptyRow(), /*bucket=*/0, /*total_buckets=*/-1, + DataIncrement({file_meta1}, {}, {}), CompactIncrement({}, {}, {})); + ASSERT_EQ(expected, *msg_impl) << expected.ToString() << std::endl << msg_impl->ToString(); + + // check data files move to dst table + ASSERT_OK_AND_ASSIGN( + bool exist, + fs_->Exists(dst_table_path + "/bucket-0/data-6dac9052-36d8-4950-8f74-b2bbc082e489-0.orc")); + ASSERT_TRUE(exist); +} + +TEST_F(FileMetaUtilsTest, TestNonExistTablePath) { + // test non-exist table path + ASSERT_NOK_WITH_MSG( + FileMetaUtils::GenerateCommitMessage(/*src_data_files=*/{}, "invalid_table_path", + /*partition_values=*/{}, + /*options=*/{{Options::FILE_FORMAT, "orc"}}), + "load schema failed, no schema in invalid_table_path"); +} + +TEST_F(FileMetaUtilsTest, TestInvalidBucketMode) { + // test invalid bucket count + std::string src_schema_path = + paimon::test::GetDataDir() + "/orc/append_09.db/append_09/schema/schema-0"; + std::string dst_table_name = "test_table"; + CreateTable(dst_table_name, src_schema_path); + std::string dst_table_path = dst_table_dir_->Str() + "/" + dst_table_name; + ASSERT_NOK_WITH_MSG( + FileMetaUtils::GenerateCommitMessage(/*src_data_files=*/{}, dst_table_path, + /*partition_values=*/{{"f1", "10"}}, + /*options=*/{{Options::FILE_FORMAT, "orc"}}), + "migrate only support append table with unaware-bucket"); +} + +TEST_F(FileMetaUtilsTest, TestInvalidPKTable) { + // test unsupported pk table + std::string src_schema_path = + paimon::test::GetDataDir() + "/orc/pk_09.db/pk_09/schema/schema-0"; + std::string dst_table_name = "test_table"; + CreateTable(dst_table_name, src_schema_path); + std::string dst_table_path = dst_table_dir_->Str() + "/" + dst_table_name; + ASSERT_NOK_WITH_MSG( + FileMetaUtils::GenerateCommitMessage(/*src_data_files=*/{}, dst_table_path, + /*partition_values=*/{{"f1", "10"}}, + /*options=*/{{Options::FILE_FORMAT, "orc"}}), + "migrate only support append table with unaware-bucket"); +} + +TEST_F(FileMetaUtilsTest, TestInvalidExternalPath) { + // test unsupported external path + std::string table_schema_str = R"({ + "version" : 3, + "id" : 0, + "fields" : [ { + "id" : 0, + "name" : "f0", + "type" : "STRING" + }, { + "id" : 1, + "name" : "f1", + "type" : "INT" + }, { + "id" : 2, + "name" : "f2", + "type" : "INT" + }, { + "id" : 3, + "name" : "f3", + "type" : "DOUBLE" + } ], + "highestFieldId" : 3, + "partitionKeys" : [], + "primaryKeys" : [], + "options" : { + "bucket" : "-1", + "manifest.format" : "orc", + "file.format" : "orc", + "data-file.external-paths" : "FILE:///tmp/external", + "data-file.external-paths.strategy" : "round-robin" + }, + "timeMillis" : 1721614341162 + })"; + + auto schema_tmp_dir = UniqueTestDirectory::Create(); + auto schema_tmp_path = schema_tmp_dir->Str() + "/schema-0"; + ASSERT_OK(fs_->AtomicStore(schema_tmp_path, table_schema_str)); + + std::string dst_table_name = "test_table"; + CreateTable(dst_table_name, schema_tmp_path); + std::string dst_table_path = dst_table_dir_->Str() + "/" + dst_table_name; + ASSERT_NOK_WITH_MSG( + FileMetaUtils::GenerateCommitMessage(/*src_data_files=*/{}, dst_table_path, + /*partition_values=*/{}, + /*options=*/{{Options::FILE_FORMAT, "orc"}}), + "migrate only support schema without external paths and index not in data file dir"); +} + +TEST_F(FileMetaUtilsTest, TestWithInvalidPartition) { + std::string src_schema_path = + paimon::test::GetDataDir() + + "/orc/multi_partition_append_table.db/multi_partition_append_table/schema/" + "schema-0"; + std::string dst_table_name = "test_table"; + CreateTable(dst_table_name, src_schema_path); + + std::string dst_table_path = dst_table_dir_->Str() + "/" + dst_table_name; + // invalid partition + std::map partition = {{"f2", "0"}}; + ASSERT_NOK_WITH_MSG( + FileMetaUtils::GenerateCommitMessage(/*src data files*/ {}, dst_table_path, partition, + /*options=*/{{Options::FILE_FORMAT, "orc"}}), + "can not find partition key 'f1' in input partition"); +} + +TEST_F(FileMetaUtilsTest, TestEmptyInput) { + // copy a db from test/data to tmp path + std::string src_schema_path = + paimon::test::GetDataDir() + + "/orc/multi_partition_append_table.db/multi_partition_append_table/schema/" + "schema-0"; + std::string dst_table_name = "test_table"; + CreateTable(dst_table_name, src_schema_path); + + // generate commit message and check + std::string dst_table_path = dst_table_dir_->Str() + "/" + dst_table_name; + std::map partition = {{"f1", "10"}, {"f2", "0"}}; + ASSERT_OK_AND_ASSIGN( + std::unique_ptr msg, + FileMetaUtils::GenerateCommitMessage(/*src_data_files=*/{}, dst_table_path, partition, + /*options=*/{{Options::FILE_FORMAT, "orc"}})); + auto msg_impl = dynamic_cast(msg.get()); + ASSERT_TRUE(msg_impl); + ASSERT_EQ(0, msg_impl->GetNewFilesIncrement().NewFiles().size()); + + CommitMessageImpl expected(BinaryRowGenerator::GenerateRow({10, 0}, pool_.get()), /*bucket=*/0, + /*total_buckets=*/-1, DataIncrement({}, {}, {}), + CompactIncrement({}, {}, {})); + ASSERT_EQ(expected, *msg_impl) << expected.ToString() << std::endl << msg_impl->ToString(); + + // check data files move to dst table + ASSERT_OK_AND_ASSIGN(bool exist, fs_->Exists(dst_table_path + "/f1=10/f2=0/bucket-0/")); + ASSERT_FALSE(exist); +} + +TEST_F(FileMetaUtilsTest, TestInvalidFile) { + // copy a db from test/data to tmp path + std::string src_schema_path = + paimon::test::GetDataDir() + + "/orc/append_10_external_path.db/append_10_external_path/schema/schema-0"; + std::string dst_table_name = "test_table"; + CreateTable(dst_table_name, src_schema_path); + std::vector src_data_files = { + paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/f1=20/bucket-0/" + "data-b913a160-a4d1-4084-af2a-18333c35668e-0.orc", + paimon::test::GetDataDir() + "/orc/append_09.db/append_09/snapshot/snapshot-1"}; + + std::vector tmp_data_files = CopyDataFilesToTempDir(src_data_files); + + // generate commit message and check + std::string dst_table_path = dst_table_dir_->Str() + "/" + dst_table_name; + ASSERT_NOK_WITH_MSG( + FileMetaUtils::GenerateCommitMessage(tmp_data_files, dst_table_path, {}, + /*options=*/{{Options::FILE_FORMAT, "orc"}}), + "extract file info failed"); +} + +} // namespace paimon::test