From d12b431d9f3fb62ad979ce7d1475a4e50377b0d8 Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Thu, 25 Jun 2026 11:06:45 +0800 Subject: [PATCH] feat(append): add append-only writer, compact task and coordinator --- .../append/append_compact_coordinator.h | 65 ++ .../append/append_compact_coordinator.cpp | 360 +++++++++ src/paimon/append/append_compact_task.cpp | 97 +++ src/paimon/append/append_compact_task.h | 70 ++ src/paimon/append/append_only_writer.cpp | 286 +++++++ src/paimon/append/append_only_writer.h | 137 ++++ src/paimon/append/append_only_writer_test.cpp | 710 ++++++++++++++++++ 7 files changed, 1725 insertions(+) create mode 100644 include/paimon/append/append_compact_coordinator.h create mode 100644 src/paimon/append/append_compact_coordinator.cpp create mode 100644 src/paimon/append/append_compact_task.cpp create mode 100644 src/paimon/append/append_compact_task.h create mode 100644 src/paimon/append/append_only_writer.cpp create mode 100644 src/paimon/append/append_only_writer.h create mode 100644 src/paimon/append/append_only_writer_test.cpp diff --git a/include/paimon/append/append_compact_coordinator.h b/include/paimon/append/append_compact_coordinator.h new file mode 100644 index 0000000..838a413 --- /dev/null +++ b/include/paimon/append/append_compact_coordinator.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/result.h" + +namespace paimon { + +class CommitMessage; +class FileSystem; +class MemoryPool; + +/// Compact coordinator for append-only unaware-bucket tables. +/// +/// This coordinator scans the latest snapshot for small files, groups them by partition, +/// and generates compaction tasks using a bin-packing algorithm. It then synchronously +/// executes all tasks and returns the resulting commit messages. +/// +/// @note This implementation does not support deletion vectors or streaming mode. +/// It only scans the current latest snapshot (batch mode). +class PAIMON_EXPORT AppendCompactCoordinator { + public: + AppendCompactCoordinator() = delete; + ~AppendCompactCoordinator() = delete; + /// Run the compaction coordinator. + /// + /// Scans the latest snapshot for small files across the specified partitions, + /// generates compact tasks via bin-packing, executes them synchronously, + /// and returns the resulting commit messages. + /// + /// @param table_path The root path of the table. + /// @param options User-defined options (will be merged with schema options). + /// @param partitions Partition filters; each element is a partition spec as key-value pairs. + /// Empty vector means all partitions. + /// @param file_system The file system to use. If nullptr, will be created from options. + /// @param pool The memory pool to use. If nullptr, will use default pool. + /// @return Result containing a vector of commit messages from compaction tasks. + static Result>> Run( + const std::string& table_path, const std::map& options, + const std::vector>& partitions, + const std::shared_ptr& file_system, const std::shared_ptr& pool); +}; + +} // namespace paimon diff --git a/src/paimon/append/append_compact_coordinator.cpp b/src/paimon/append/append_compact_coordinator.cpp new file mode 100644 index 0000000..9ad641f --- /dev/null +++ b/src/paimon/append/append_compact_coordinator.cpp @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/append/append_compact_coordinator.h" + +#include +#include +#include +#include +#include +#include + +#include "paimon/common/data/binary_row.h" +#include "paimon/common/types/data_field.h" +#include "paimon/common/utils/linked_hash_map.h" +#include "paimon/core/append/append_compact_task.h" +#include "paimon/core/core_options.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/manifest/file_kind.h" +#include "paimon/core/manifest/manifest_entry.h" +#include "paimon/core/manifest/manifest_file.h" +#include "paimon/core/manifest/manifest_list.h" +#include "paimon/core/operation/append_only_file_store_scan.h" +#include "paimon/core/operation/append_only_file_store_write.h" +#include "paimon/core/operation/file_store_scan.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/snapshot.h" +#include "paimon/core/utils/field_mapping.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/executor.h" +#include "paimon/memory/memory_pool.h" +namespace paimon { + +namespace { + +/// A bin for packing small files into compaction groups. +class FileBin { + public: + FileBin(int64_t target_file_size, int64_t open_file_cost, int32_t min_file_num) + : target_file_size_(target_file_size), + open_file_cost_(open_file_cost), + min_file_num_(min_file_num) {} + + void AddFile(const std::shared_ptr& file) { + total_file_size_ += file->file_size + open_file_cost_; + bin_.push_back(file); + } + + bool EnoughContent() const { + return bin_.size() > 1 && total_file_size_ >= target_file_size_ * 2; + } + + bool EnoughInputFiles() const { + return static_cast(bin_.size()) >= min_file_num_; + } + + std::vector> Drain() { + std::vector> result = std::move(bin_); + bin_.clear(); + total_file_size_ = 0; + return result; + } + + bool IsEmpty() const { + return bin_.empty(); + } + + private: + int64_t target_file_size_; + int64_t open_file_cost_; + int32_t min_file_num_; + std::vector> bin_; + int64_t total_file_size_ = 0; +}; + +/// Pack small files into compaction groups using a bin-packing algorithm. +/// Files are sorted by size ascending, then greedily packed into bins. +/// A bin is flushed when its total size >= targetFileSize * 2 (and has > 1 file), +/// or when it has >= minFileNum files. +std::vector>> PackFiles( + const std::vector>& files, int64_t target_file_size, + int64_t open_file_cost, int32_t min_file_num) { + // Sort by file size ascending for better packing + std::vector> sorted_files(files.begin(), files.end()); + std::sort( + sorted_files.begin(), sorted_files.end(), + [](const std::shared_ptr& left, const std::shared_ptr& right) { + return left->file_size < right->file_size; + }); + + std::vector>> result; + FileBin file_bin(target_file_size, open_file_cost, min_file_num); + + for (const auto& file_meta : sorted_files) { + file_bin.AddFile(file_meta); + if (file_bin.EnoughContent()) { + result.push_back(file_bin.Drain()); + } + } + + if (file_bin.EnoughInputFiles()) { + result.push_back(file_bin.Drain()); + } + // else: skip small files that are too few to compact + + return result; +} + +/// Create a FileStoreScan for scanning manifest entries. +/// Mirrors the logic in `AppendOnlyFileStoreWrite::CreateFileStoreScan`. +Result> CreateFileStoreScan( + const std::shared_ptr& snapshot_manager, + const std::shared_ptr& schema_manager, + const std::shared_ptr& table_schema, + const std::shared_ptr& arrow_schema, + const std::shared_ptr& partition_schema, const CoreOptions& core_options, + const std::shared_ptr& path_factory, + const std::shared_ptr& scan_filter, const std::shared_ptr& executor, + const std::shared_ptr& pool) { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr manifest_list, + ManifestList::Create(core_options.GetFileSystem(), core_options.GetManifestFormat(), + core_options.GetManifestCompression(), path_factory, pool)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr manifest_file, + ManifestFile::Create(core_options.GetFileSystem(), core_options.GetManifestFormat(), + core_options.GetManifestCompression(), path_factory, + core_options.GetManifestTargetFileSize(), pool, core_options, + partition_schema)); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr scan, + AppendOnlyFileStoreScan::Create(snapshot_manager, schema_manager, manifest_list, + manifest_file, table_schema, arrow_schema, scan_filter, + core_options, executor, pool)); + return std::unique_ptr(std::move(scan)); +} + +/// Create an AppendOnlyFileStoreWrite for executing compaction rewrites. +std::unique_ptr CreateFileStoreWrite( + const std::shared_ptr& path_factory, + const std::shared_ptr& snapshot_manager, + const std::shared_ptr& schema_manager, const std::string& table_path, + const std::shared_ptr& table_schema, + const std::shared_ptr& arrow_schema, + const std::shared_ptr& partition_schema, const CoreOptions& core_options, + const std::shared_ptr& executor, const std::shared_ptr& pool) { + return std::make_unique( + path_factory, snapshot_manager, schema_manager, + /*commit_user=*/"compact-coordinator", + /*root_path=*/table_path, table_schema, arrow_schema, + /*write_schema=*/arrow_schema, partition_schema, + /*dv_maintainer_factory=*/nullptr, + /*io_manager=*/nullptr, core_options, + /*ignore_previous_files=*/true, + /*is_streaming_mode=*/false, + /*ignore_num_bucket_check=*/false, executor, pool); +} + +/// Load schema from table path and merge user options with schema options. +Result, CoreOptions>> LoadSchemaAndOptions( + const std::string& table_path, const std::map& options, + const std::shared_ptr& file_system) { + PAIMON_ASSIGN_OR_RAISE(CoreOptions tmp_options, CoreOptions::FromMap(options, file_system)); + SchemaManager schema_manager(tmp_options.GetFileSystem(), table_path); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_table_schema, + schema_manager.Latest()); + if (latest_table_schema == std::nullopt) { + return Status::Invalid("not found latest schema"); + } + const auto& table_schema = latest_table_schema.value(); + + auto final_options = table_schema->Options(); + for (const auto& [key, value] : options) { + final_options[key] = value; + } + PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, + CoreOptions::FromMap(final_options, file_system)); + return std::make_pair(table_schema, std::move(core_options)); +} + +/// Validate that the table is an append-only unaware-bucket table without DV. +Status ValidateTable(const std::shared_ptr& table_schema, + const CoreOptions& core_options) { + if (!table_schema->PrimaryKeys().empty() || core_options.GetBucket() != -1) { + return Status::Invalid( + "AppendCompactCoordinator only supports append-only tables " + "with UNAWARE_BUCKET mode"); + } + if (core_options.DeletionVectorsEnabled()) { + return Status::NotImplemented( + "AppendCompactCoordinator not support for dv in UNAWARE_BUCKET mode"); + } + return Status::OK(); +} + +/// Build FileStorePathFactory from core options and table schema. +Result> BuildPathFactory( + const std::string& table_path, const std::shared_ptr& table_schema, + const std::shared_ptr& arrow_schema, const CoreOptions& core_options, + const std::shared_ptr& pool) { + PAIMON_ASSIGN_OR_RAISE(std::vector external_paths, + core_options.CreateExternalPaths()); + PAIMON_ASSIGN_OR_RAISE(std::optional global_index_external_path, + core_options.CreateGlobalIndexExternalPath()); + return FileStorePathFactory::Create( + table_path, arrow_schema, table_schema->PartitionKeys(), + core_options.GetPartitionDefaultName(), core_options.GetFileFormat()->Identifier(), + core_options.DataFilePrefix(), core_options.LegacyPartitionNameEnabled(), external_paths, + global_index_external_path, core_options.IndexFileInDataFileDir(), pool); +} + +/// Scan the latest snapshot and collect small files grouped by partition. +Result>>> ScanSmallFiles( + const std::shared_ptr& snapshot_manager, + const std::shared_ptr& schema_manager, + const std::shared_ptr& table_schema, + const std::shared_ptr& arrow_schema, + const std::shared_ptr& partition_schema, const CoreOptions& core_options, + const std::shared_ptr& path_factory, + const std::vector>& partitions, + const std::shared_ptr& executor, const std::shared_ptr& pool) { + auto scan_filter = std::make_shared( + /*predicate=*/nullptr, partitions, /*bucket_filter=*/std::nullopt); + + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr scan, + CreateFileStoreScan(snapshot_manager, schema_manager, table_schema, + arrow_schema, partition_schema, core_options, + path_factory, scan_filter, executor, pool)); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr plan, scan->CreatePlan()); + std::vector add_entries = plan->Files(FileKind::Add()); + + int64_t compaction_file_size = core_options.GetCompactionFileSize(/*has_primary_key=*/false); + LinkedHashMap>> partition_files; + + for (const auto& entry : add_entries) { + const auto& file = entry.File(); + if (file->file_size < compaction_file_size) { + partition_files[entry.Partition()].push_back(file); + } + } + return partition_files; +} + +/// Generate compact tasks from partitioned small files via bin-packing. +std::vector GenerateCompactTasks( + const LinkedHashMap>>& partition_files, + const CoreOptions& core_options) { + int64_t target_file_size = core_options.GetTargetFileSize(/*has_primary_key=*/false); + int64_t open_file_cost = core_options.GetSourceSplitOpenFileCost(); + int32_t min_file_num = core_options.GetCompactionMinFileNum(); + + std::vector tasks; + for (const auto& [partition, files] : partition_files) { + auto packed_groups = PackFiles(files, target_file_size, open_file_cost, min_file_num); + for (const auto& group : packed_groups) { + tasks.emplace_back(partition, group); + } + } + return tasks; +} + +/// Execute compact tasks synchronously and collect commit messages. +Result>> ExecuteCompactTasks( + std::vector&& tasks, + const std::shared_ptr& path_factory, + const std::shared_ptr& snapshot_manager, + const std::shared_ptr& schema_manager, const std::string& table_path, + const std::shared_ptr& table_schema, + const std::shared_ptr& arrow_schema, + const std::shared_ptr& partition_schema, const CoreOptions& core_options, + const std::shared_ptr& executor, const std::shared_ptr& pool) { + auto write = CreateFileStoreWrite(path_factory, snapshot_manager, schema_manager, table_path, + table_schema, arrow_schema, partition_schema, core_options, + executor, pool); + + std::vector> commit_messages; + commit_messages.reserve(tasks.size()); + + for (auto& task : tasks) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr message, + task.DoCompact(core_options, write.get())); + commit_messages.push_back(std::move(message)); + } + return commit_messages; +} + +} // namespace + +Result>> AppendCompactCoordinator::Run( + const std::string& table_path, const std::map& options, + const std::vector>& partitions, + const std::shared_ptr& file_system, const std::shared_ptr& input_pool) { + auto pool = input_pool ? input_pool : GetDefaultPool(); + std::shared_ptr executor = CreateDefaultExecutor(); + + // Load schema and merge options + std::pair, CoreOptions> schema_and_options; + PAIMON_ASSIGN_OR_RAISE(schema_and_options, + LoadSchemaAndOptions(table_path, options, file_system)); + const auto& [table_schema, core_options] = schema_and_options; + + // Validate table type + PAIMON_RETURN_NOT_OK(ValidateTable(table_schema, core_options)); + + // Build shared objects + auto arrow_schema = DataField::ConvertDataFieldsToArrowSchema(table_schema->Fields()); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr partition_schema, + FieldMapping::GetPartitionSchema(arrow_schema, table_schema->PartitionKeys())); + + auto snapshot_manager = + std::make_shared(core_options.GetFileSystem(), table_path); + auto schema_manager = std::make_shared(core_options.GetFileSystem(), table_path); + + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr path_factory, + BuildPathFactory(table_path, table_schema, arrow_schema, core_options, pool)); + + // Scan small files from latest snapshot + LinkedHashMap>> partition_files; + PAIMON_ASSIGN_OR_RAISE( + partition_files, + ScanSmallFiles(snapshot_manager, schema_manager, table_schema, arrow_schema, + partition_schema, core_options, path_factory, partitions, executor, pool)); + + if (partition_files.empty()) { + return std::vector>{}; + } + + // Generate compact tasks via bin-packing + std::vector tasks = GenerateCompactTasks(partition_files, core_options); + if (tasks.empty()) { + return std::vector>{}; + } + + // Execute compact tasks synchronously + return ExecuteCompactTasks(std::move(tasks), path_factory, snapshot_manager, schema_manager, + table_path, table_schema, arrow_schema, partition_schema, + core_options, executor, pool); +} + +} // namespace paimon diff --git a/src/paimon/append/append_compact_task.cpp b/src/paimon/append/append_compact_task.cpp new file mode 100644 index 0000000..d7e0e83 --- /dev/null +++ b/src/paimon/append/append_compact_task.cpp @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/append/append_compact_task.h" + +#include +#include +#include + +#include "fmt/format.h" +#include "fmt/ranges.h" +#include "paimon/common/utils/object_utils.h" +#include "paimon/core/compact/cancellation_controller.h" +#include "paimon/core/io/compact_increment.h" +#include "paimon/core/io/data_increment.h" +#include "paimon/core/operation/append_only_file_store_write.h" +#include "paimon/core/table/bucket_mode.h" +#include "paimon/core/table/sink/commit_message_impl.h" + +namespace paimon { + +AppendCompactTask::AppendCompactTask(const BinaryRow& partition, + const std::vector>& files) + : partition_(partition), compact_before_(files) {} + +Result> AppendCompactTask::DoCompact( + const CoreOptions& options, AppendOnlyFileStoreWrite* write) { + if (!options.DeletionVectorsEnabled() && compact_before_.size() <= 1) { + return Status::Invalid("AppendCompactTask needs more than one file input."); + } + if (options.DeletionVectorsEnabled()) { + // TODO(xinyu.lxy): support dv + return Status::NotImplemented("not support for dv in UNAWARE_BUCKET mode"); + } + // Non-DV mode: rewrite all compact_before files into new files. + auto cancellation_controller = std::make_shared(); + PAIMON_ASSIGN_OR_RAISE( + std::vector> rewritten, + write->CompactRewrite(partition_, BucketModeDefine::UNAWARE_BUCKET, + /*dv_factory=*/nullptr, compact_before_, cancellation_controller)); + compact_after_ = std::move(rewritten); + + // Build CompactIncrement with before/after file lists and empty changelog/index files. + auto compact_before_copy = compact_before_; + auto compact_after_copy = compact_after_; + CompactIncrement compact_increment(std::move(compact_before_copy), + std::move(compact_after_copy), + /*changelog_files=*/{}, + /*new_index_files=*/{}, + /*deleted_index_files=*/{}); + + // Build an empty DataIncrement (no new data files from compaction). + DataIncrement data_increment(/*new_files=*/{}, /*deleted_files=*/{}, /*changelog_files=*/{}); + + // Bucket 0 is the bucket for unaware-bucket table, for compatibility with the old design. + auto commit_message = std::make_shared(partition_, + /*bucket=*/0, + /*total_buckets=*/options.GetBucket(), + data_increment, compact_increment); + + return std::static_pointer_cast(commit_message); +} + +std::string AppendCompactTask::ToString() const { + std::vector before_names; + before_names.reserve(compact_before_.size()); + for (const auto& file : compact_before_) { + before_names.emplace_back(file->file_name); + } + + std::vector after_names; + after_names.reserve(compact_after_.size()); + for (const auto& file : compact_after_) { + after_names.emplace_back(file->file_name); + } + + return fmt::format( + "CompactionTask {{partition = {}, compactBefore = [{}], compactAfter = [{}]}}", + partition_.ToString(), fmt::join(before_names, ", "), fmt::join(after_names, ", ")); +} + +} // namespace paimon diff --git a/src/paimon/append/append_compact_task.h b/src/paimon/append/append_compact_task.h new file mode 100644 index 0000000..973a98b --- /dev/null +++ b/src/paimon/append/append_compact_task.h @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/common/data/binary_row.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/result.h" + +namespace paimon { + +class AppendOnlyFileStoreWrite; +class CommitMessage; +class CoreOptions; + +/// Compaction task for append-only unaware-bucket tables. +/// +/// This task holds the partition and the list of files to compact (compact_before), +/// performs the actual compaction via `AppendOnlyFileStoreWrite::CompactRewrite`, +/// and produces a `CommitMessage` containing the `CompactIncrement`. +class AppendCompactTask { + public: + AppendCompactTask(const BinaryRow& partition, + const std::vector>& files); + + ~AppendCompactTask() = default; + + const BinaryRow& Partition() const { + return partition_; + } + + const std::vector>& CompactBefore() const { + return compact_before_; + } + + const std::vector>& CompactAfter() const { + return compact_after_; + } + + Result> DoCompact(const CoreOptions& options, + AppendOnlyFileStoreWrite* write); + + std::string ToString() const; + + private: + BinaryRow partition_; + std::vector> compact_before_; + std::vector> compact_after_; +}; + +} // namespace paimon diff --git a/src/paimon/append/append_only_writer.cpp b/src/paimon/append/append_only_writer.cpp new file mode 100644 index 0000000..3edc12a --- /dev/null +++ b/src/paimon/append/append_only_writer.cpp @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/append/append_only_writer.h" + +#include +#include +#include + +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "arrow/type.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/types/row_kind.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/long_counter.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/io/compact_increment.h" +#include "paimon/core/io/data_file_path_factory.h" +#include "paimon/core/io/data_file_writer.h" +#include "paimon/core/io/data_increment.h" +#include "paimon/core/io/multiple_blob_file_writer.h" +#include "paimon/core/io/rolling_blob_file_writer.h" +#include "paimon/core/io/rolling_file_writer.h" +#include "paimon/core/io/single_file_writer.h" +#include "paimon/core/manifest/file_source.h" +#include "paimon/core/utils/commit_increment.h" +#include "paimon/format/file_format.h" +#include "paimon/format/file_format_factory.h" +#include "paimon/format/writer_builder.h" +#include "paimon/macros.h" +#include "paimon/metrics.h" +#include "paimon/record_batch.h" + +namespace paimon { + +class MemoryPool; +class FormatStatsExtractor; + +AppendOnlyWriter::AppendOnlyWriter(const CoreOptions& options, int64_t schema_id, + const std::shared_ptr& write_schema, + const std::optional>& write_cols, + int64_t max_sequence_number, + const std::shared_ptr& path_factory, + const std::shared_ptr& compact_manager, + const std::shared_ptr& memory_pool) + : options_(options), + schema_id_(schema_id), + write_schema_(write_schema), + write_cols_(write_cols), + seq_num_counter_(std::make_shared(max_sequence_number + 1)), + path_factory_(path_factory), + compact_manager_(compact_manager), + memory_pool_(memory_pool), + metrics_(std::make_shared()) {} + +AppendOnlyWriter::~AppendOnlyWriter() = default; + +Status AppendOnlyWriter::Write(std::unique_ptr&& batch) { + for (const auto& row_kind : batch->GetRowKind()) { + if (PAIMON_UNLIKELY(row_kind != RecordBatch::RowKind::INSERT)) { + PAIMON_ASSIGN_OR_RAISE(const RowKind* kind, + RowKind::FromByteValue(static_cast(row_kind))); + return Status::Invalid("Append only writer can not accept record batch with RowKind ", + kind->Name()); + } + } + if (writer_ == nullptr) { + PAIMON_ASSIGN_OR_RAISE(writer_, CreateRollingRowWriter()); + } + return writer_->Write(batch->GetData()); +} + +Result AppendOnlyWriter::PrepareCommit(bool wait_compaction) { + PAIMON_RETURN_NOT_OK( + Flush(/*wait_for_latest_compaction=*/false, /*forced_full_compaction=*/false)); + PAIMON_RETURN_NOT_OK(TrySyncLatestCompaction(wait_compaction || options_.CommitForceCompact())); + return DrainIncrement(); +} + +Result AppendOnlyWriter::DrainIncrement() { + DataIncrement data_increment(std::move(new_files_), std::move(deleted_files_), {}); + CompactIncrement compact_increment(std::move(compact_before_), std::move(compact_after_), {}); + auto drain_deletion_file = compact_deletion_file_; + + new_files_.clear(); + deleted_files_.clear(); + compact_before_.clear(); + compact_after_.clear(); + compact_deletion_file_ = nullptr; + + return CommitIncrement(data_increment, compact_increment, drain_deletion_file); +} + +Status AppendOnlyWriter::TrySyncLatestCompaction(bool blocking) { + PAIMON_ASSIGN_OR_RAISE(std::optional> result, + compact_manager_->GetCompactionResult(blocking)); + if (result.has_value()) { + const auto& compaction_result = result.value(); + const auto& before = compaction_result->Before(); + compact_before_.insert(compact_before_.end(), before.begin(), before.end()); + const auto& after = compaction_result->After(); + compact_after_.insert(compact_after_.end(), after.begin(), after.end()); + PAIMON_RETURN_NOT_OK(UpdateCompactDeletionFile(compaction_result->DeletionFile())); + } + return Status::OK(); +} + +Status AppendOnlyWriter::UpdateCompactDeletionFile( + const std::shared_ptr& new_deletion_file) { + if (new_deletion_file) { + if (compact_deletion_file_ == nullptr) { + compact_deletion_file_ = new_deletion_file; + } else { + PAIMON_ASSIGN_OR_RAISE(compact_deletion_file_, + new_deletion_file->MergeOldFile(compact_deletion_file_)); + } + } + return Status::OK(); +} + +Status AppendOnlyWriter::Flush(bool wait_for_latest_compaction, bool forced_full_compaction) { + std::vector> flushed_files; + if (writer_) { + PAIMON_RETURN_NOT_OK(writer_->Close()); + PAIMON_ASSIGN_OR_RAISE(flushed_files, writer_->GetResult()); + } + // add new generated files + for (const auto& flushed_file : flushed_files) { + PAIMON_RETURN_NOT_OK(compact_manager_->AddNewFile(flushed_file)); + } + PAIMON_RETURN_NOT_OK(TrySyncLatestCompaction(wait_for_latest_compaction)); + PAIMON_RETURN_NOT_OK(compact_manager_->TriggerCompaction(forced_full_compaction)); + new_files_.insert(new_files_.end(), flushed_files.begin(), flushed_files.end()); + if (writer_) { + metrics_->Merge(writer_->GetMetrics()); + writer_.reset(); + } + return Status::OK(); +} + +AppendOnlyWriter::RollingFileWriterResult AppendOnlyWriter::CreateRollingRowWriter() const { + auto schemas = BlobUtils::SeparateBlobSchema(write_schema_); + if (schemas.blob_schema && schemas.blob_schema->num_fields() > 0) { + return CreateRollingBlobWriter(schemas); + } else { + return std::make_unique>>( + options_.GetTargetFileSize(/*has_primary_key=*/false), + GetDataFileWriterCreator(write_schema_, write_cols_)); + } +} + +AppendOnlyWriter::SingleFileWriterCreator AppendOnlyWriter::GetDataFileWriterCreator( + const std::shared_ptr& schema, + const std::optional>& write_cols) const { + return + [this, schema, write_cols]() + -> Result< + std::unique_ptr>>> { + ::ArrowSchema arrow_schema; + ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); }); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, &arrow_schema)); + auto format = options_.GetFileFormat(); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr writer_builder, + format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize())); + writer_builder->WithMemoryPool(memory_pool_); + + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema, &arrow_schema)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr stats_extractor, + format->CreateStatsExtractor(&arrow_schema)); + auto writer = std::make_unique( + options_.GetFileCompression(), std::function(), + schema_id_, seq_num_counter_, FileSource::Append(), stats_extractor, + path_factory_->IsExternalPath(), write_cols, memory_pool_); + PAIMON_RETURN_NOT_OK( + writer->Init(options_.GetFileSystem(), path_factory_->NewPath(), writer_builder)); + return writer; + }; +} + +AppendOnlyWriter::SingleFileWriterCreator AppendOnlyWriter::GetBlobFileWriterCreator( + const std::shared_ptr& writer_builder, + const std::shared_ptr& stats_extractor, + const std::optional>& write_cols) const { + return + [this, writer_builder, stats_extractor, write_cols]() + -> Result< + std::unique_ptr>>> { + auto writer = std::make_unique( + /*compression=*/"none", std::function(), + schema_id_, seq_num_counter_, FileSource::Append(), stats_extractor, + path_factory_->IsExternalPath(), write_cols, memory_pool_); + PAIMON_RETURN_NOT_OK(writer->Init(options_.GetFileSystem(), + path_factory_->NewBlobPath(), writer_builder)); + return writer; + }; +} + +AppendOnlyWriter::RollingFileWriterResult AppendOnlyWriter::CreateRollingBlobWriter( + const BlobUtils::SeparatedSchemas& schemas) const { + // Multiple blob fields are supported. Each blob field gets its own rolling file writer + // via MultipleBlobFileWriter. + auto blob_schema = schemas.blob_schema; + auto blob_writer_creator = [this, blob_schema](const std::string& blob_field_name) + -> Result< + std::unique_ptr>>> { + // Create a single-field schema for this blob field + auto field = blob_schema->GetFieldByName(blob_field_name); + if (!field) { + return Status::Invalid( + fmt::format("Blob field '{}' not found in blob schema", blob_field_name)); + } + auto single_field_schema = arrow::schema({field}); + ::ArrowSchema arrow_schema; + ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); }); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*single_field_schema, &arrow_schema)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr format, + FileFormatFactory::Get("blob", options_.ToMap())); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr writer_builder, + format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize())); + writer_builder->WithMemoryPool(memory_pool_); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*single_field_schema, &arrow_schema)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr stats_extractor, + format->CreateStatsExtractor(&arrow_schema)); + + std::vector write_cols = {blob_field_name}; + auto single_blob_file_writer_creator = + GetBlobFileWriterCreator(writer_builder, stats_extractor, write_cols); + return std::make_unique>>( + options_.GetBlobTargetFileSize(), single_blob_file_writer_creator); + }; + + return std::make_unique( + options_.GetTargetFileSize(/*has_primary_key=*/false), + GetDataFileWriterCreator(schemas.main_schema, schemas.main_schema->field_names()), + blob_schema, blob_writer_creator, arrow::struct_(write_schema_->fields())); +} + +Status AppendOnlyWriter::Sync() { + return TrySyncLatestCompaction(/*blocking=*/true); +} + +Status AppendOnlyWriter::Close() { + // Request cancellation and wait for running compaction to exit. + // This avoids reusing cancellation state while an old task is still running. + compact_manager_->CancelAndWaitCompaction(); + PAIMON_RETURN_NOT_OK(Sync()); + + PAIMON_RETURN_NOT_OK(compact_manager_->Close()); + auto fs = options_.GetFileSystem(); + for (const auto& file : compact_after_) { + // AppendOnlyCompactManager will rewrite the file and no file upgrade will occur, so we + // can directly delete the file in compact_after_. + [[maybe_unused]] auto s = fs->Delete(path_factory_->ToPath(file)); + } + + if (writer_) { + writer_->Abort(); + writer_.reset(); + } + + if (compact_deletion_file_ != nullptr) { + compact_deletion_file_->Clean(); + } + return Status::OK(); +} + +} // namespace paimon diff --git a/src/paimon/append/append_only_writer.h b/src/paimon/append/append_only_writer.h new file mode 100644 index 0000000..25febf7 --- /dev/null +++ b/src/paimon/append/append_only_writer.h @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/common/data/blob_utils.h" +#include "paimon/core/compact/compact_manager.h" +#include "paimon/core/core_options.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/single_file_writer.h" +#include "paimon/core/utils/batch_writer.h" +#include "paimon/result.h" +#include "paimon/status.h" + +struct ArrowSchema; +struct ArrowArray; + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { + +class CommitIncrement; +class RecordBatch; +template +class RollingFileWriter; +class LongCounter; +class DataFilePathFactory; +class MemoryPool; +class Metrics; +class FormatStatsExtractor; +class WriterBuilder; + +class AppendOnlyWriter : public BatchWriter { + public: + AppendOnlyWriter(const CoreOptions& options, int64_t schema_id, + const std::shared_ptr& write_schema, + const std::optional>& write_cols, + int64_t max_sequence_number, + const std::shared_ptr& path_factory, + const std::shared_ptr& compact_manager, + const std::shared_ptr& memory_pool); + + ~AppendOnlyWriter() override; + + Status Write(std::unique_ptr&& batch) override; + Status Compact(bool full_compaction) override { + return Flush(/*wait_for_latest_compaction=*/true, full_compaction); + } + + uint64_t GetMemoryUsage() const override { + // used for spill, AppendOnlyWriter do not support spill, so return 0 to avoid triggering + // spill + return 0; + } + Status FlushMemory() override { + return Flush(/*wait_for_latest_compaction=*/false, /*forced_full_compaction=*/false); + } + Result PrepareCommit(bool wait_compaction) override; + Result CompactNotCompleted() override { + PAIMON_RETURN_NOT_OK(compact_manager_->TriggerCompaction(/*full_compaction=*/false)); + return compact_manager_->CompactNotCompleted(); + } + Status Sync() override; + Status Close() override; + std::shared_ptr GetMetrics() const override { + return metrics_; + } + + private: + using SingleFileWriterCreator = std::function< + Result>>>()>; + using RollingFileWriterResult = + Result>>>; + + RollingFileWriterResult CreateRollingRowWriter() const; + RollingFileWriterResult CreateRollingBlobWriter( + const BlobUtils::SeparatedSchemas& schemas) const; + + Result DrainIncrement(); + Status Flush(bool wait_for_latest_compaction, bool forced_full_compaction); + + SingleFileWriterCreator GetDataFileWriterCreator( + const std::shared_ptr& schema, + const std::optional>& write_cols) const; + + SingleFileWriterCreator GetBlobFileWriterCreator( + const std::shared_ptr& writer_builder, + const std::shared_ptr& stats_extractor, + const std::optional>& write_cols) const; + + Status TrySyncLatestCompaction(bool blocking); + Status UpdateCompactDeletionFile(const std::shared_ptr& new_deletion_file); + + CoreOptions options_; + int64_t schema_id_; + std::shared_ptr write_schema_; + std::optional> write_cols_; + std::shared_ptr seq_num_counter_; + std::shared_ptr path_factory_; + std::shared_ptr compact_manager_; + std::shared_ptr memory_pool_; + std::shared_ptr metrics_; + + std::vector> new_files_; + std::vector> deleted_files_; + std::vector> compact_before_; + std::vector> compact_after_; + + std::shared_ptr compact_deletion_file_; + std::unique_ptr>> writer_; +}; + +} // namespace paimon diff --git a/src/paimon/append/append_only_writer_test.cpp b/src/paimon/append/append_only_writer_test.cpp new file mode 100644 index 0000000..db1adb8 --- /dev/null +++ b/src/paimon/append/append_only_writer_test.cpp @@ -0,0 +1,710 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/append/append_only_writer.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_nested.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "arrow/type.h" +#include "gtest/gtest.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/common/fs/external_path_provider.h" +#include "paimon/core/compact/compact_deletion_file.h" +#include "paimon/core/compact/compact_result.h" +#include "paimon/core/compact/noop_compact_manager.h" +#include "paimon/core/core_options.h" +#include "paimon/core/io/compact_increment.h" +#include "paimon/core/io/data_file_path_factory.h" +#include "paimon/core/io/data_increment.h" +#include "paimon/core/manifest/file_source.h" +#include "paimon/core/stats/simple_stats.h" +#include "paimon/core/utils/commit_increment.h" +#include "paimon/defs.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/record_batch.h" +#include "paimon/testing/utils/testharness.h" + +namespace arrow { +class Array; +} // namespace arrow + +namespace paimon::test { + +namespace { + +class FakeCompactDeletionFile : public CompactDeletionFile, + public std::enable_shared_from_this { + public: + explicit FakeCompactDeletionFile(std::string id) : id_(std::move(id)) {} + + Result>> GetOrCompute() override { + return std::optional>(); + } + + Result> MergeOldFile( + const std::shared_ptr& old) override { + merged_old_ = old; + return shared_from_this(); + } + + void Clean() override { + cleaned_ = true; + } + + const std::string& Id() const { + return id_; + } + + bool Cleaned() const { + return cleaned_; + } + + std::shared_ptr MergedOld() const { + return merged_old_; + } + + private: + std::string id_; + bool cleaned_ = false; + std::shared_ptr merged_old_; +}; + +class FakeCompactManager : public CompactManager { + public: + Status AddNewFile(const std::shared_ptr& file) override { + added_files.push_back(file); + return Status::OK(); + } + + std::vector> AllFiles() const override { + return all_files; + } + + Status TriggerCompaction(bool full_compaction) override { + trigger_calls.push_back(full_compaction); + return Status::OK(); + } + + Result>> GetCompactionResult( + bool blocking) override { + get_result_blocking_calls.push_back(blocking); + if (queued_results.empty()) { + return std::optional>(); + } + auto result = queued_results.front(); + queued_results.pop_front(); + return result; + } + + void RequestCancelCompaction() override { + request_cancel_called = true; + } + + void WaitForCompactionToExit() override { + wait_called = true; + } + + bool CompactNotCompleted() const override { + return compact_not_completed; + } + + bool ShouldWaitForLatestCompaction() const override { + return should_wait_latest; + } + + bool ShouldWaitForPreparingCheckpoint() const override { + return should_wait_prepare; + } + + Status Close() override { + close_called = true; + return Status::OK(); + } + + std::vector> added_files; + std::vector> all_files; + std::vector trigger_calls; + std::vector get_result_blocking_calls; + std::deque>>> queued_results; + bool compact_not_completed = false; + bool should_wait_latest = false; + bool should_wait_prepare = false; + bool request_cancel_called = false; + bool wait_called = false; + bool close_called = false; +}; + +} // namespace + +class AppendOnlyWriterTest : public testing::Test { + public: + void SetUp() override { + memory_pool_ = GetDefaultPool(); + compact_manager_ = std::make_shared(); + } + + CoreOptions CreateOptions(const std::map& overrides = {}) const { + std::map raw_options = { + {Options::FILE_SYSTEM, "local"}, + {Options::FILE_FORMAT, "mock_format"}, + {Options::MANIFEST_FORMAT, "mock_format"}, + }; + for (const auto& [key, value] : overrides) { + raw_options[key] = value; + } + return CoreOptions::FromMap(raw_options).value(); + } + + std::shared_ptr CreatePathFactory(const std::string& dir, + const std::string& format, + const CoreOptions& options) const { + auto path_factory = std::make_shared(); + EXPECT_TRUE(path_factory->Init(dir, format, options.DataFilePrefix(), nullptr).ok()); + return path_factory; + } + + std::shared_ptr NewAppendFile(const std::string& file_name, int64_t row_count, + int64_t min_sequence_number, + int64_t max_sequence_number) const { + return DataFileMeta::ForAppend(file_name, /*file_size=*/row_count, row_count, + SimpleStats::EmptyStats(), min_sequence_number, + max_sequence_number, /*schema_id=*/0, FileSource::Append(), + std::nullopt, std::nullopt, std::nullopt, std::nullopt) + .value(); + } + + std::unique_ptr CreateSingleStringBatch( + const std::vector& values, + const std::optional>& row_kinds = std::nullopt) const { + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8())}; + auto struct_type = arrow::struct_(fields); + arrow::StructBuilder struct_builder(struct_type, arrow::default_memory_pool(), + {std::make_shared()}); + auto string_builder = static_cast(struct_builder.field_builder(0)); + for (const auto& value : values) { + EXPECT_TRUE(struct_builder.Append().ok()); + EXPECT_TRUE(string_builder->Append(value).ok()); + } + std::shared_ptr array; + EXPECT_TRUE(struct_builder.Finish(&array).ok()); + + ::ArrowArray arrow_array; + EXPECT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); + RecordBatchBuilder batch_builder(&arrow_array); + if (row_kinds.has_value()) { + batch_builder.SetRowKinds(row_kinds.value()); + } + return batch_builder.Finish().value(); + } + + std::unique_ptr CreateStructBatch( + const std::shared_ptr& schema, + const std::vector>& columns) const { + auto raw_struct_array = arrow::StructArray::Make(columns, schema->fields()).ValueOrDie(); + ::ArrowArray arrow_array; + EXPECT_TRUE(arrow::ExportArray(*raw_struct_array, &arrow_array).ok()); + RecordBatchBuilder batch_builder(&arrow_array); + return batch_builder.Finish().value(); + } + + private: + std::shared_ptr memory_pool_; + std::shared_ptr compact_manager_; +}; + +TEST_F(AppendOnlyWriterTest, TestEmptyCommits) { + std::map raw_options; + raw_options[Options::FILE_FORMAT] = "mock_format"; + raw_options[Options::FILE_SYSTEM] = "local"; + raw_options[Options::MANIFEST_FORMAT] = "mock_format"; + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap(raw_options)); + + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::uint8()), + arrow::field("f10", arrow::float64()), arrow::field("f11", arrow::utf8()), + arrow::field("f12", arrow::binary()), arrow::field("non-partition-field", arrow::int32())}; + + auto schema = arrow::schema(fields); + + auto path_factory = std::make_shared(); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ASSERT_OK(path_factory->Init(dir->Str(), "mock_format", options.DataFilePrefix(), nullptr)); + + AppendOnlyWriter writer(options, /*schema_id=*/0, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager_, + memory_pool_); + for (int32_t i = 0; i < 3; i++) { + ASSERT_OK_AND_ASSIGN(CommitIncrement inc, writer.PrepareCommit(true)); + ASSERT_TRUE(inc.GetNewFilesIncrement().IsEmpty()); + ASSERT_TRUE(inc.GetCompactIncrement().IsEmpty()); + } +} + +TEST_F(AppendOnlyWriterTest, TestWriteAndPrepareCommit) { + std::map raw_options; + raw_options[Options::FILE_FORMAT] = "mock_format"; + raw_options[Options::FILE_SYSTEM] = "local"; + raw_options[Options::MANIFEST_FORMAT] = "mock_format"; + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap(raw_options)); + + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::uint8()), + arrow::field("f10", arrow::float64()), arrow::field("f11", arrow::utf8()), + arrow::field("f12", arrow::binary()), arrow::field("non-partition-field", arrow::int32())}; + + auto schema = arrow::schema(fields); + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), "mock_format", options.DataFilePrefix(), nullptr)); + AppendOnlyWriter writer(options, /*schema_id=*/2, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager_, + memory_pool_); + arrow::StringBuilder builder; + for (size_t j = 0; j < 100; j++) { + ASSERT_TRUE(builder.Append(std::to_string(j)).ok()); + } + std::shared_ptr array = builder.Finish().ValueOrDie(); + ::ArrowArray arrow_array; + ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); + RecordBatchBuilder batch_builder(&arrow_array); + ASSERT_OK_AND_ASSIGN(auto record_batch, batch_builder.Finish()); + ASSERT_OK(writer.Write(std::move(record_batch))); + ASSERT_TRUE(ArrowArrayIsReleased(&arrow_array)); + ASSERT_OK_AND_ASSIGN(CommitIncrement inc, writer.PrepareCommit(true)); + ASSERT_FALSE(inc.GetNewFilesIncrement().IsEmpty()); + const auto& data_increment = inc.GetNewFilesIncrement(); + const auto& data_file_metas = data_increment.NewFiles(); + ASSERT_EQ(1, data_file_metas.size()); + ASSERT_EQ(2, data_file_metas[0]->schema_id); + ASSERT_TRUE(inc.GetCompactIncrement().IsEmpty()); + std::string path = path_factory->ToPath(inc.GetNewFilesIncrement().NewFiles()[0]->file_name); + ASSERT_OK_AND_ASSIGN(bool exist, options.GetFileSystem()->Exists(path)); + ASSERT_TRUE(exist); + ASSERT_OK(writer.Close()); +} + +TEST_F(AppendOnlyWriterTest, TestWriteAndClose) { + std::map raw_options; + raw_options[Options::FILE_FORMAT] = "orc"; + raw_options[Options::FILE_SYSTEM] = "local"; + raw_options[Options::MANIFEST_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap(raw_options)); + + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8())}; + auto schema = arrow::schema(fields); + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), "orc", options.DataFilePrefix(), nullptr)); + AppendOnlyWriter writer(options, /*schema_id=*/1, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager_, + memory_pool_); + auto struct_type = arrow::struct_(fields); + arrow::StructBuilder struct_builder(struct_type, arrow::default_memory_pool(), + {std::make_shared()}); + auto string_builder = static_cast(struct_builder.field_builder(0)); + for (size_t j = 0; j < 100; j++) { + ASSERT_TRUE(struct_builder.Append().ok()); + ASSERT_TRUE(string_builder->Append(std::to_string(j)).ok()); + } + std::shared_ptr array; + ASSERT_TRUE(struct_builder.Finish(&array).ok()); + ASSERT_TRUE(array); + ::ArrowArray arrow_array; + ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); + + RecordBatchBuilder batch_builder(&arrow_array); + ASSERT_OK_AND_ASSIGN(auto record_batch, batch_builder.Finish()); + ASSERT_OK(writer.Write(std::move(record_batch))); + ASSERT_TRUE(ArrowArrayIsReleased(&arrow_array)); + ASSERT_OK(writer.Close()); + + auto file_system = std::make_shared(); + std::vector> file_status_list; + ASSERT_OK(file_system->ListDir(dir->Str(), &file_status_list)); + ASSERT_TRUE(file_status_list.empty()); +} + +TEST_F(AppendOnlyWriterTest, TestInvalidRowKind) { + std::map raw_options; + raw_options[Options::FILE_FORMAT] = "orc"; + raw_options[Options::FILE_SYSTEM] = "local"; + raw_options[Options::MANIFEST_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap(raw_options)); + + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8())}; + auto schema = arrow::schema(fields); + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), "orc", options.DataFilePrefix(), nullptr)); + AppendOnlyWriter writer(options, /*schema_id=*/1, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager_, + memory_pool_); + auto struct_type = arrow::struct_(fields); + arrow::StructBuilder struct_builder(struct_type, arrow::default_memory_pool(), + {std::make_shared()}); + auto string_builder = static_cast(struct_builder.field_builder(0)); + ASSERT_TRUE(struct_builder.Append().ok()); + ASSERT_TRUE(string_builder->Append("row0").ok()); + std::shared_ptr array; + ASSERT_TRUE(struct_builder.Finish(&array).ok()); + ASSERT_TRUE(array); + ::ArrowArray arrow_array; + ASSERT_TRUE(arrow::ExportArray(*array, &arrow_array).ok()); + + RecordBatchBuilder batch_builder(&arrow_array); + ASSERT_OK_AND_ASSIGN(auto record_batch, + batch_builder.SetRowKinds({RecordBatch::RowKind::DELETE}).Finish()); + ASSERT_NOK_WITH_MSG(writer.Write(std::move(record_batch)), + "Append only writer can not accept record batch with RowKind DELETE"); + ASSERT_TRUE(ArrowArrayIsReleased(&arrow_array)); + ASSERT_OK(writer.Close()); + + auto file_system = std::make_shared(); + std::vector> file_status_list; + ASSERT_OK(file_system->ListDir(dir->Str(), &file_status_list)); + ASSERT_TRUE(file_status_list.empty()); +} + +TEST_F(AppendOnlyWriterTest, TestPrepareCommitWaitCompactionUsesBlockingGetResult) { + auto options = CreateOptions(); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = CreatePathFactory(dir->Str(), "mock_format", options); + auto compact_manager = std::make_shared(); + + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8())}; + auto schema = arrow::schema(fields); + AppendOnlyWriter writer(options, /*schema_id=*/0, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager, + memory_pool_); + + ASSERT_OK(writer.Write(CreateSingleStringBatch({"a", "b"}))); + ASSERT_OK(writer.PrepareCommit(/*wait_compaction=*/true).status()); + + ASSERT_EQ(compact_manager->get_result_blocking_calls.size(), 2); + ASSERT_FALSE(compact_manager->get_result_blocking_calls[0]); + ASSERT_TRUE(compact_manager->get_result_blocking_calls[1]); + ASSERT_OK(writer.Close()); +} + +TEST_F(AppendOnlyWriterTest, TestPrepareCommitForceCompactUsesBlockingGetResult) { + auto options = CreateOptions({{Options::COMMIT_FORCE_COMPACT, "true"}}); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = CreatePathFactory(dir->Str(), "mock_format", options); + auto compact_manager = std::make_shared(); + + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8())}; + auto schema = arrow::schema(fields); + AppendOnlyWriter writer(options, /*schema_id=*/0, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager, + memory_pool_); + + ASSERT_OK(writer.Write(CreateSingleStringBatch({"a"}))); + ASSERT_OK(writer.PrepareCommit(/*wait_compaction=*/false).status()); + + ASSERT_EQ(compact_manager->get_result_blocking_calls.size(), 2); + ASSERT_FALSE(compact_manager->get_result_blocking_calls[0]); + ASSERT_TRUE(compact_manager->get_result_blocking_calls[1]); + ASSERT_OK(writer.Close()); +} + +TEST_F(AppendOnlyWriterTest, + TestSyncAndPrepareCommitConsumeCompactionResultsAndMergeDeletionFiles) { + auto options = CreateOptions(); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = CreatePathFactory(dir->Str(), "mock_format", options); + auto compact_manager = std::make_shared(); + + auto before1 = NewAppendFile("before-1", 10, 0, 9); + auto after1 = NewAppendFile("after-1", 10, 10, 19); + auto before2 = NewAppendFile("before-2", 10, 20, 29); + auto after2 = NewAppendFile("after-2", 10, 30, 39); + auto deletion_file1 = std::make_shared("d1"); + auto deletion_file2 = std::make_shared("d2"); + + auto result1 = + std::make_shared(std::vector>{before1}, + std::vector>{after1}); + result1->SetDeletionFile(deletion_file1); + auto result2 = + std::make_shared(std::vector>{before2}, + std::vector>{after2}); + result2->SetDeletionFile(deletion_file2); + compact_manager->queued_results.push_back( + std::optional>(result1)); + compact_manager->queued_results.push_back( + std::optional>(result2)); + + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8())}; + auto schema = arrow::schema(fields); + AppendOnlyWriter writer(options, /*schema_id=*/0, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager, + memory_pool_); + + ASSERT_OK(writer.Sync()); + ASSERT_OK(writer.Sync()); + ASSERT_OK_AND_ASSIGN(CommitIncrement inc, writer.PrepareCommit(/*wait_compaction=*/false)); + + ASSERT_EQ(inc.GetCompactIncrement().CompactBefore().size(), 2); + ASSERT_EQ(inc.GetCompactIncrement().CompactAfter().size(), 2); + ASSERT_EQ(*inc.GetCompactIncrement().CompactBefore()[0], *before1); + ASSERT_EQ(*inc.GetCompactIncrement().CompactBefore()[1], *before2); + ASSERT_EQ(*inc.GetCompactIncrement().CompactAfter()[0], *after1); + ASSERT_EQ(*inc.GetCompactIncrement().CompactAfter()[1], *after2); + + auto merged = std::dynamic_pointer_cast(inc.GetCompactDeletionFile()); + ASSERT_TRUE(merged); + ASSERT_EQ(merged->Id(), "d2"); + ASSERT_EQ(merged->MergedOld(), deletion_file1); + ASSERT_OK(writer.Close()); +} + +TEST_F(AppendOnlyWriterTest, TestCloseDeletesCompactAfterFiles) { + auto options = + CreateOptions({{Options::FILE_FORMAT, "orc"}, {Options::MANIFEST_FORMAT, "orc"}}); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = CreatePathFactory(dir->Str(), "orc", options); + auto compact_manager = std::make_shared(); + + auto compact_after = NewAppendFile("compact-after.orc", 1, 0, 0); + auto compact_after_path = path_factory->ToPath(compact_after->file_name); + ASSERT_OK_AND_ASSIGN(auto output, options.GetFileSystem()->Create(compact_after_path, true)); + ASSERT_OK(output->Close()); + + auto result = + std::make_shared(std::vector>{}, + std::vector>{compact_after}); + compact_manager->queued_results.push_back( + std::optional>(result)); + + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8())}; + auto schema = arrow::schema(fields); + AppendOnlyWriter writer(options, /*schema_id=*/0, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager, + memory_pool_); + + ASSERT_OK(writer.Sync()); + ASSERT_TRUE(options.GetFileSystem()->Exists(compact_after_path).value()); + ASSERT_OK(writer.Close()); + ASSERT_FALSE(options.GetFileSystem()->Exists(compact_after_path).value()); + ASSERT_TRUE(compact_manager->request_cancel_called); + ASSERT_TRUE(compact_manager->wait_called); + ASSERT_TRUE(compact_manager->close_called); +} + +TEST_F(AppendOnlyWriterTest, TestCloseCleansDeletionFile) { + auto options = CreateOptions(); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = CreatePathFactory(dir->Str(), "mock_format", options); + auto compact_manager = std::make_shared(); + + auto deletion_file = std::make_shared("del-close"); + auto before = NewAppendFile("before-close", 5, 0, 4); + auto after = NewAppendFile("after-close", 5, 5, 9); + auto result = + std::make_shared(std::vector>{before}, + std::vector>{after}); + result->SetDeletionFile(deletion_file); + compact_manager->queued_results.push_back( + std::optional>(result)); + + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8())}; + auto schema = arrow::schema(fields); + AppendOnlyWriter writer(options, /*schema_id=*/0, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager, + memory_pool_); + + // Sync to consume the compaction result and populate compact_deletion_file_. + ASSERT_OK(writer.Sync()); + ASSERT_FALSE(deletion_file->Cleaned()); + + ASSERT_OK(writer.Close()); + ASSERT_TRUE(deletion_file->Cleaned()); +} + +TEST_F(AppendOnlyWriterTest, TestCompactNotCompletedTriggersCompaction) { + auto options = CreateOptions(); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = CreatePathFactory(dir->Str(), "mock_format", options); + auto compact_manager = std::make_shared(); + compact_manager->compact_not_completed = true; + + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8())}; + auto schema = arrow::schema(fields); + AppendOnlyWriter writer(options, /*schema_id=*/0, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager, + memory_pool_); + + ASSERT_OK_AND_ASSIGN(bool not_completed, writer.CompactNotCompleted()); + ASSERT_TRUE(not_completed); + ASSERT_EQ(compact_manager->trigger_calls, std::vector({false})); + ASSERT_OK(writer.Close()); +} + +TEST_F(AppendOnlyWriterTest, TestCompactPassesFullCompactionFlag) { + auto options = CreateOptions(); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = CreatePathFactory(dir->Str(), "mock_format", options); + auto compact_manager = std::make_shared(); + + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8())}; + auto schema = arrow::schema(fields); + AppendOnlyWriter writer(options, /*schema_id=*/0, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager, + memory_pool_); + + ASSERT_OK(writer.Compact(/*full_compaction=*/true)); + ASSERT_OK(writer.Compact(/*full_compaction=*/false)); + ASSERT_EQ(compact_manager->trigger_calls, std::vector({true, false})); + ASSERT_OK(writer.Close()); +} + +TEST_F(AppendOnlyWriterTest, TestWriteWithSingleBlobField) { + auto options = + CreateOptions({{Options::FILE_FORMAT, "orc"}, {Options::MANIFEST_FORMAT, "orc"}}); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = CreatePathFactory(dir->Str(), "orc", options); + + auto int_field = arrow::field("id", arrow::int32()); + auto blob_field = BlobUtils::ToArrowField("blob", false); + auto schema = arrow::schema({int_field, blob_field}); + + AppendOnlyWriter writer(options, /*schema_id=*/0, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager_, + memory_pool_); + + arrow::Int32Builder int_builder; + ASSERT_TRUE(int_builder.AppendValues({1, 2}).ok()); + auto int_array = int_builder.Finish().ValueOrDie(); + arrow::LargeBinaryBuilder blob_builder; + ASSERT_TRUE(blob_builder.Append("a", 1).ok()); + ASSERT_TRUE(blob_builder.Append("bb", 2).ok()); + auto blob_array = blob_builder.Finish().ValueOrDie(); + + ASSERT_OK(writer.Write(CreateStructBatch(schema, {int_array, blob_array}))); + ASSERT_OK_AND_ASSIGN(CommitIncrement inc, writer.PrepareCommit(/*wait_compaction=*/true)); + + ASSERT_EQ(inc.GetNewFilesIncrement().NewFiles().size(), 2); + const auto& main_file = inc.GetNewFilesIncrement().NewFiles()[0]; + const auto& blob_file = inc.GetNewFilesIncrement().NewFiles()[1]; + ASSERT_TRUE( + options.GetFileSystem()->Exists(path_factory->ToPath(main_file->file_name)).value()); + ASSERT_TRUE( + options.GetFileSystem()->Exists(path_factory->ToPath(blob_file->file_name)).value()); + ASSERT_OK(writer.Close()); +} + +TEST_F(AppendOnlyWriterTest, TestWriteWithMultipleBlobFields) { + auto options = + CreateOptions({{Options::FILE_FORMAT, "orc"}, {Options::MANIFEST_FORMAT, "orc"}}); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = CreatePathFactory(dir->Str(), "orc", options); + + auto schema = + arrow::schema({arrow::field("id", arrow::int32()), BlobUtils::ToArrowField("blob1", false), + BlobUtils::ToArrowField("blob2", false)}); + AppendOnlyWriter writer(options, /*schema_id=*/0, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager_, + memory_pool_); + + arrow::Int32Builder int_builder; + ASSERT_TRUE(int_builder.AppendValues({1}).ok()); + auto int_array = int_builder.Finish().ValueOrDie(); + arrow::LargeBinaryBuilder blob_builder1; + ASSERT_TRUE(blob_builder1.Append("a", 1).ok()); + auto blob_array1 = blob_builder1.Finish().ValueOrDie(); + arrow::LargeBinaryBuilder blob_builder2; + ASSERT_TRUE(blob_builder2.Append("b", 1).ok()); + auto blob_array2 = blob_builder2.Finish().ValueOrDie(); + + ASSERT_OK(writer.Write(CreateStructBatch(schema, {int_array, blob_array1, blob_array2}))); + ASSERT_OK_AND_ASSIGN(CommitIncrement inc, writer.PrepareCommit(/*wait_compaction=*/true)); + + ASSERT_EQ(inc.GetNewFilesIncrement().NewFiles().size(), 3); + const auto& main_file = inc.GetNewFilesIncrement().NewFiles()[0]; + const auto& blob_file1 = inc.GetNewFilesIncrement().NewFiles()[1]; + const auto& blob_file2 = inc.GetNewFilesIncrement().NewFiles()[2]; + ASSERT_TRUE( + options.GetFileSystem()->Exists(path_factory->ToPath(main_file->file_name)).value()); + ASSERT_TRUE( + options.GetFileSystem()->Exists(path_factory->ToPath(blob_file1->file_name)).value()); + ASSERT_TRUE( + options.GetFileSystem()->Exists(path_factory->ToPath(blob_file2->file_name)).value()); + ASSERT_OK(writer.Close()); +} + +TEST_F(AppendOnlyWriterTest, TestMultiplePrepareCommitSequenceContinuity) { + auto options = CreateOptions(); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = CreatePathFactory(dir->Str(), "mock_format", options); + + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8())}; + auto schema = arrow::schema(fields); + AppendOnlyWriter writer(options, /*schema_id=*/0, schema, /*write_cols=*/std::nullopt, + /*max_sequence_number=*/-1, path_factory, compact_manager_, + memory_pool_); + + ASSERT_OK(writer.Write(CreateSingleStringBatch({"a", "b", "c"}))); + ASSERT_OK_AND_ASSIGN(CommitIncrement first, writer.PrepareCommit(/*wait_compaction=*/false)); + ASSERT_OK(writer.Write(CreateSingleStringBatch({"d", "e"}))); + ASSERT_OK_AND_ASSIGN(CommitIncrement second, writer.PrepareCommit(/*wait_compaction=*/false)); + + ASSERT_EQ(first.GetNewFilesIncrement().NewFiles().size(), 1); + ASSERT_EQ(second.GetNewFilesIncrement().NewFiles().size(), 1); + ASSERT_EQ(first.GetNewFilesIncrement().NewFiles()[0]->min_sequence_number, 0); + ASSERT_EQ(first.GetNewFilesIncrement().NewFiles()[0]->max_sequence_number, 2); + ASSERT_EQ(second.GetNewFilesIncrement().NewFiles()[0]->min_sequence_number, 3); + ASSERT_EQ(second.GetNewFilesIncrement().NewFiles()[0]->max_sequence_number, 4); + ASSERT_OK(writer.Close()); +} + +} // namespace paimon::test