diff --git a/src/paimon/core/append/append_compact_coordinator_test.cpp b/src/paimon/core/append/append_compact_coordinator_test.cpp new file mode 100644 index 0000000..10ef255 --- /dev/null +++ b/src/paimon/core/append/append_compact_coordinator_test.cpp @@ -0,0 +1,787 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "paimon/append/append_compact_coordinator.h" + +#include +#include +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "arrow/ipc/json_simple.h" +#include "arrow/type.h" +#include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/commit_context.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/defs.h" +#include "paimon/file_store_commit.h" +#include "paimon/file_store_write.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/record_batch.h" +#include "paimon/result.h" +#include "paimon/testing/utils/test_helper.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/write_context.h" + +namespace paimon::test { + +class AppendCompactCoordinatorTest : public ::testing::Test { + public: + void SetUp() override { + pool_ = GetDefaultPool(); + dir_ = UniqueTestDirectory::Create("local"); + } + + void TearDown() override { + dir_.reset(); + } + + void CreateTable(const arrow::FieldVector& fields, + const std::vector& partition_keys, + const std::map& options) { + fields_ = fields; + auto schema = arrow::schema(fields_); + ::ArrowSchema c_schema; + ASSERT_TRUE(arrow::ExportSchema(*schema, &c_schema).ok()); + + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir_->Str(), options)); + ASSERT_OK(catalog->CreateDatabase("foo", {}, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &c_schema, partition_keys, + /*primary_keys=*/{}, options, + /*ignore_if_exists=*/false)); + } + + std::string TablePath() const { + return PathUtil::JoinPath(dir_->Str(), "foo.db/bar"); + } + + Status WriteAndCommit(const std::string& table_path, + const std::map& partition, int32_t bucket, + const std::shared_ptr& write_array, + int64_t commit_identifier) { + WriteContextBuilder write_builder(table_path, "commit_user_1"); + write_builder.WithStreamingMode(false); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr write_context, write_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(auto file_store_write, + FileStoreWrite::Create(std::move(write_context))); + ArrowArray c_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*write_array, &c_array)); + auto record_batch = std::make_unique( + partition, bucket, std::vector{}, &c_array); + PAIMON_RETURN_NOT_OK(file_store_write->Write(std::move(record_batch))); + PAIMON_ASSIGN_OR_RAISE(auto commit_msgs, file_store_write->PrepareCommit( + /*wait_compaction=*/false, commit_identifier)); + PAIMON_RETURN_NOT_OK(file_store_write->Close()); + return Commit(table_path, commit_msgs); + } + + Status Commit(const std::string& table_path, + const std::vector>& commit_msgs) const { + CommitContextBuilder commit_builder(table_path, "commit_user_1"); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr commit_context, + commit_builder.Finish()); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr file_store_commit, + FileStoreCommit::Create(std::move(commit_context))); + return file_store_commit->Commit(commit_msgs); + } + + void ScanAndVerify(const std::string& table_path, const arrow::FieldVector& fields, + const std::map, std::string>& + expected_data_per_partition_bucket) { + std::map options = {{Options::FILE_SYSTEM, "local"}}; + ASSERT_OK_AND_ASSIGN(auto helper, + TestHelper::Create(table_path, options, /*is_streaming_mode=*/false)); + ASSERT_OK_AND_ASSIGN( + std::vector> data_splits, + helper->NewScan(StartupMode::LatestFull(), /*snapshot_id=*/std::nullopt)); + + arrow::FieldVector fields_with_row_kind = fields; + fields_with_row_kind.insert(fields_with_row_kind.begin(), + arrow::field("_VALUE_KIND", arrow::int8())); + auto data_type = arrow::struct_(fields_with_row_kind); + + std::map, std::vector>> + splits_by_partition_bucket; + for (const auto& split : data_splits) { + auto split_impl = dynamic_cast(split.get()); + ASSERT_OK_AND_ASSIGN(std::string partition_str, + helper->PartitionStr(split_impl->Partition())); + splits_by_partition_bucket[std::make_pair(partition_str, split_impl->Bucket())] + .push_back(split); + } + + ASSERT_EQ(splits_by_partition_bucket.size(), expected_data_per_partition_bucket.size()); + for (const auto& [key, splits] : splits_by_partition_bucket) { + auto iter = expected_data_per_partition_bucket.find(key); + ASSERT_TRUE(iter != expected_data_per_partition_bucket.end()) + << "Unexpected partition=" << key.first << " bucket=" << key.second; + ASSERT_OK_AND_ASSIGN(bool success, + helper->ReadAndCheckResult(data_type, splits, iter->second)); + ASSERT_TRUE(success); + } + } + + void CheckCommitMessage(const std::shared_ptr& msg, size_t expected_before_files, + int64_t expected_total_rows) { + auto impl = dynamic_cast(msg.get()); + ASSERT_TRUE(impl); + ASSERT_EQ(impl->Bucket(), 0); + + const auto& compact_before = impl->GetCompactIncrement().CompactBefore(); + const auto& compact_after = impl->GetCompactIncrement().CompactAfter(); + ASSERT_EQ(compact_before.size(), expected_before_files); + ASSERT_EQ(compact_after.size(), 1); + + int64_t total_before_rows = 0; + for (const auto& file : compact_before) { + total_before_rows += file->row_count; + } + ASSERT_EQ(total_before_rows, expected_total_rows); + ASSERT_EQ(compact_after[0]->row_count, expected_total_rows); + } + + private: + std::shared_ptr pool_; + std::unique_ptr dir_; + arrow::FieldVector fields_; +}; + +/// Test that AppendCompactCoordinator::Run compacts all partitions' small files. +/// +/// Steps: +/// 1. Create an unaware-bucket append-only table (bucket=-1) with partition key "f1". +/// 2. Write 4 batches across 2 partitions (f1=10, f1=20), producing multiple small files. +/// 3. Call AppendCompactCoordinator::Run with empty partitions (compact all). +/// 4. Commit the compact results. +/// 5. Verify that the data is correct after compaction. +TEST_F(AppendCompactCoordinatorTest, TestRunCompactsAllPartitions) { + std::map options = { + {Options::FILE_FORMAT, "parquet"}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, "local"}, + {Options::COMPACTION_MIN_FILE_NUM, "2"}, + }; + + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + CreateTable(fields, /*partition_keys=*/{"f1"}, options); + + auto table_path = TablePath(); + auto data_type = arrow::struct_(fields); + + // Write batch 1: partition f1=10 + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Alice", 10, 1, 11.1], + ["Bob", 10, 0, 12.1], + ["Emily", 10, 0, 13.1], + ["Tony", 10, 0, 14.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {{"f1", "10"}}, /*bucket=*/0, array, + /*commit_identifier=*/0)); + } + + // Write batch 2: partition f1=10 + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Emily", 10, 0, 15.1], + ["Bob", 10, 0, 12.1], + ["Alex", 10, 0, 16.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {{"f1", "10"}}, /*bucket=*/0, array, + /*commit_identifier=*/1)); + } + + // Write batch 3: partition f1=20 + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Lucy", 20, 1, 14.1], + ["Paul", 20, 1, null] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {{"f1", "20"}}, /*bucket=*/0, array, + /*commit_identifier=*/2)); + } + + // Write batch 4: partition f1=20 (another small file) + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["David", 20, 0, 17.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {{"f1", "20"}}, /*bucket=*/0, array, + /*commit_identifier=*/3)); + } + + // Run AppendCompactCoordinator to compact all partitions + ASSERT_OK_AND_ASSIGN( + auto compact_messages, + AppendCompactCoordinator::Run(table_path, options, + /*partitions=*/{}, /*file_system=*/nullptr, pool_)); + + // Verify commit messages: 2 messages (one per partition) + ASSERT_EQ(compact_messages.size(), 2); + // f1=10: 2 files compacted into 1, total 7 rows + CheckCommitMessage(compact_messages[0], + /*expected_before_files=*/2, + /*expected_total_rows=*/7); + // f1=20: 2 files compacted into 1, total 3 rows + CheckCommitMessage(compact_messages[1], + /*expected_before_files=*/2, + /*expected_total_rows=*/3); + + // Commit compact results + ASSERT_OK(Commit(table_path, compact_messages)); + + // Verify data after compaction + std::map, std::string> expected_datas; + // Files are sorted by file_size ascending in PackFiles, so the smaller file + // (batch 2: 3 rows) comes before the larger file (batch 1: 4 rows). + expected_datas[std::make_pair("f1=10/", 0)] = R"([ +[0, "Emily", 10, 0, 15.1], +[0, "Bob", 10, 0, 12.1], +[0, "Alex", 10, 0, 16.1], +[0, "Alice", 10, 1, 11.1], +[0, "Bob", 10, 0, 12.1], +[0, "Emily", 10, 0, 13.1], +[0, "Tony", 10, 0, 14.1] +])"; + + expected_datas[std::make_pair("f1=20/", 0)] = R"([ +[0, "David", 20, 0, 17.1], +[0, "Lucy", 20, 1, 14.1], +[0, "Paul", 20, 1, null] +])"; + + ScanAndVerify(table_path, fields, expected_datas); +} + +/// Test that AppendCompactCoordinator::Run only compacts the specified partition. +/// +/// Steps: +/// 1. Create an unaware-bucket append-only table with partition key "f1". +/// 2. Write data to both f1=10 and f1=20 partitions. +/// 3. Call Run with partitions=[{f1=10}], so only f1=10 is compacted. +/// 4. Verify only 1 commit message for f1=10, and data is correct for both partitions. +TEST_F(AppendCompactCoordinatorTest, TestRunCompactsSinglePartition) { + std::map options = { + {Options::FILE_FORMAT, "parquet"}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, "local"}, + {Options::COMPACTION_MIN_FILE_NUM, "2"}, + }; + + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + CreateTable(fields, /*partition_keys=*/{"f1"}, options); + + auto table_path = TablePath(); + auto data_type = arrow::struct_(fields); + + // Write 2 batches to f1=10 + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Alice", 10, 1, 11.1], + ["Bob", 10, 0, 12.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {{"f1", "10"}}, 0, array, 0)); + } + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Emily", 10, 0, 13.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {{"f1", "10"}}, 0, array, 1)); + } + + // Write 2 batches to f1=20 + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Lucy", 20, 1, 14.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {{"f1", "20"}}, 0, array, 2)); + } + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Paul", 20, 0, 15.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {{"f1", "20"}}, 0, array, 3)); + } + + // Only compact partition f1=10 + ASSERT_OK_AND_ASSIGN(auto compact_messages, + AppendCompactCoordinator::Run(table_path, options, + /*partitions=*/{{{"f1", "10"}}}, + /*file_system=*/nullptr, pool_)); + + // Should only have 1 commit message for f1=10 + ASSERT_EQ(compact_messages.size(), 1); + CheckCommitMessage(compact_messages[0], /*expected_before_files=*/2, + /*expected_total_rows=*/3); + + ASSERT_OK(Commit(table_path, compact_messages)); + + // Verify: f1=10 is compacted (1 file), f1=20 remains uncompacted (2 splits) + std::map, std::string> expected_datas; + expected_datas[std::make_pair("f1=10/", 0)] = R"([ +[0, "Emily", 10, 0, 13.1], +[0, "Alice", 10, 1, 11.1], +[0, "Bob", 10, 0, 12.1] +])"; + + expected_datas[std::make_pair("f1=20/", 0)] = R"([ +[0, "Lucy", 20, 1, 14.1], +[0, "Paul", 20, 0, 15.1] +])"; + + ScanAndVerify(table_path, fields, expected_datas); +} + +/// Test that no compaction happens when files don't meet compact conditions. +/// With COMPACTION_MIN_FILE_NUM=3, having only 2 file won't trigger compaction. +TEST_F(AppendCompactCoordinatorTest, TestNoCompactWhenConditionsNotMet) { + std::map options = { + {Options::FILE_FORMAT, "parquet"}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, "local"}, + {Options::COMPACTION_MIN_FILE_NUM, "3"}, + }; + + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + CreateTable(fields, /*partition_keys=*/{}, options); + + auto table_path = TablePath(); + auto data_type = arrow::struct_(fields); + + // Write only 2 files (less than min_file_num=3) + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Alice", 10, 1, 11.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, 0)); + } + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Bob", 10, 0, 12.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, 1)); + } + + // Run compact: should return empty since 2 files < min_file_num=3 + ASSERT_OK_AND_ASSIGN( + auto compact_messages, + AppendCompactCoordinator::Run(table_path, options, + /*partitions=*/{}, /*file_system=*/nullptr, pool_)); + ASSERT_TRUE(compact_messages.empty()); + + // Data should remain unchanged + std::map, std::string> expected_datas; + expected_datas[std::make_pair("", 0)] = R"([ +[0, "Alice", 10, 1, 11.1], +[0, "Bob", 10, 0, 12.1] +])"; + ScanAndVerify(table_path, fields, expected_datas); +} + +/// Test compacting a partition that has no data (e.g. f1=30). +/// Should return empty compact messages without error. +TEST_F(AppendCompactCoordinatorTest, TestCompactNonExistentPartition) { + std::map options = { + {Options::FILE_FORMAT, "parquet"}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, "local"}, + {Options::COMPACTION_MIN_FILE_NUM, "2"}, + }; + + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + CreateTable(fields, /*partition_keys=*/{"f1"}, options); + + auto table_path = TablePath(); + auto data_type = arrow::struct_(fields); + + // Write data only to f1=10 + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Alice", 10, 1, 11.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {{"f1", "10"}}, 0, array, 0)); + } + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Bob", 10, 0, 12.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {{"f1", "10"}}, 0, array, 1)); + } + + // Compact only f1=30 which has no data + ASSERT_OK_AND_ASSIGN(auto compact_messages, + AppendCompactCoordinator::Run(table_path, options, + /*partitions=*/{{{"f1", "30"}}}, + /*file_system=*/nullptr, pool_)); + ASSERT_TRUE(compact_messages.empty()); + + // Original data should remain unchanged + std::map, std::string> expected_datas; + expected_datas[std::make_pair("f1=10/", 0)] = R"([ +[0, "Alice", 10, 1, 11.1], +[0, "Bob", 10, 0, 12.1] +])"; + ScanAndVerify(table_path, fields, expected_datas); +} + +/// Test that Run fails on a bucketed table (bucket != -1). +TEST_F(AppendCompactCoordinatorTest, TestValidateFailsOnBucketedTable) { + std::map options = { + {Options::FILE_FORMAT, "parquet"}, + {Options::BUCKET, "2"}, + {Options::BUCKET_KEY, "f0"}, + {Options::FILE_SYSTEM, "local"}, + }; + + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + CreateTable(fields, /*partition_keys=*/{"f1"}, options); + + ASSERT_NOK_WITH_MSG( + AppendCompactCoordinator::Run(TablePath(), options, /*partitions=*/{}, + /*file_system=*/nullptr, pool_), + "AppendCompactCoordinator only supports append-only tables with UNAWARE_BUCKET mode"); +} + +/// Test that Run fails on a dv table +TEST_F(AppendCompactCoordinatorTest, TestValidateFailsOnDvTable) { + std::map options = {{Options::FILE_FORMAT, "parquet"}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, "local"}, + {Options::DELETION_VECTORS_ENABLED, "true"}}; + + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + CreateTable(fields, /*partition_keys=*/{"f1"}, options); + + ASSERT_NOK_WITH_MSG(AppendCompactCoordinator::Run(TablePath(), options, /*partitions=*/{}, + /*file_system=*/nullptr, pool_), + "not support for dv in UNAWARE_BUCKET mode"); +} + +/// Test that compact output files are written to external path when configured. +TEST_F(AppendCompactCoordinatorTest, TestCompactWithExternalPath) { + auto external_dir = UniqueTestDirectory::Create("local"); + ASSERT_TRUE(external_dir); + std::string external_path = external_dir->Str(); + + std::map options = { + {Options::FILE_FORMAT, "parquet"}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, "local"}, + {Options::COMPACTION_MIN_FILE_NUM, "2"}, + {Options::DATA_FILE_EXTERNAL_PATHS, "FILE://" + external_path}, + {Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "round-robin"}, + }; + + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + CreateTable(fields, /*partition_keys=*/{}, options); + + auto table_path = TablePath(); + auto data_type = arrow::struct_(fields); + + // Write 2 batches + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Alice", 10, 1, 11.1], + ["Bob", 10, 0, 12.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, 0)); + } + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Emily", 10, 0, 13.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, 1)); + } + + // Run compact + ASSERT_OK_AND_ASSIGN(auto compact_messages, + AppendCompactCoordinator::Run(table_path, options, /*partitions=*/{}, + /*file_system=*/nullptr, pool_)); + ASSERT_EQ(compact_messages.size(), 1); + CheckCommitMessage(compact_messages[0], /*expected_before_files=*/2, + /*expected_total_rows=*/3); + + // Verify compact output files have external_path set + { + bool found_compact_after_with_external_path = false; + for (const auto& msg : compact_messages) { + auto impl = dynamic_cast(msg.get()); + ASSERT_TRUE(impl); + for (const auto& file_meta : impl->GetCompactIncrement().CompactAfter()) { + ASSERT_TRUE(file_meta->external_path.has_value()) + << "Compact output file " << file_meta->file_name + << " should have external_path set"; + found_compact_after_with_external_path = true; + } + } + ASSERT_TRUE(found_compact_after_with_external_path) + << "Should have at least one compact output file"; + } + + // Verify files physically exist in external path directory + { + auto filesystem = external_dir->GetFileSystem(); + auto bucket_dir = external_path + "/bucket-0/"; + std::vector> file_statuses; + ASSERT_OK(filesystem->ListDir(bucket_dir, &file_statuses)); + ASSERT_EQ(file_statuses.size(), 3) + << "External path directory should contain compact output files"; + } + + ASSERT_OK(Commit(table_path, compact_messages)); + + // Verify data is correct after compaction + std::map, std::string> expected_datas; + expected_datas[std::make_pair("", 0)] = R"([ +[0, "Emily", 10, 0, 13.1], +[0, "Alice", 10, 1, 11.1], +[0, "Bob", 10, 0, 12.1] +])"; + ScanAndVerify(table_path, fields, expected_datas); +} + +/// Test that no compaction happens when file sizes exceed compaction_file_size threshold. +/// With a very small target-file-size (100 bytes), compaction_file_size = 100/10*7 = 70 bytes. +/// Parquet files are always larger than 70 bytes, so ScanSmallFiles collects nothing. +TEST_F(AppendCompactCoordinatorTest, TestNoCompactWhenFileSizeExceedsThreshold) { + std::map options = { + {Options::FILE_FORMAT, "parquet"}, {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, "local"}, {Options::COMPACTION_MIN_FILE_NUM, "2"}, + {Options::TARGET_FILE_SIZE, "100"}, + }; + + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + CreateTable(fields, /*partition_keys=*/{}, options); + + auto table_path = TablePath(); + auto data_type = arrow::struct_(fields); + + // Write 2 files + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Alice", 10, 1, 11.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, 0)); + } + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Bob", 10, 0, 12.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, 1)); + } + + // Run compact: files are too large relative to compaction_file_size (70 bytes) + ASSERT_OK_AND_ASSIGN( + auto compact_messages, + AppendCompactCoordinator::Run(table_path, options, + /*partitions=*/{}, /*file_system=*/nullptr, pool_)); + ASSERT_TRUE(compact_messages.empty()); + + // Data should remain unchanged + std::map, std::string> expected_datas; + expected_datas[std::make_pair("", 0)] = R"([ +[0, "Alice", 10, 1, 11.1], +[0, "Bob", 10, 0, 12.1] +])"; + ScanAndVerify(table_path, fields, expected_datas); +} + +/// Test that compaction triggers via EnoughContent condition. +/// With min_file_num=100 (so EnoughInputFiles won't trigger) and a large open-file-cost, +/// the total_file_size (file_size + open_file_cost per file) exceeds target_file_size * 2, +/// satisfying EnoughContent and triggering compaction. +TEST_F(AppendCompactCoordinatorTest, TestCompactViaEnoughContent) { + std::map options = { + {Options::FILE_FORMAT, "parquet"}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, "local"}, + {Options::COMPACTION_MIN_FILE_NUM, "100"}, + {Options::SOURCE_SPLIT_OPEN_FILE_COST, "256mb"}, + }; + + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + CreateTable(fields, /*partition_keys=*/{}, options); + + auto table_path = TablePath(); + auto data_type = arrow::struct_(fields); + + // Write 2 files + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Alice", 10, 1, 11.1], + ["Bob", 10, 0, 12.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, 0)); + } + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type, R"([ + ["Emily", 10, 0, 13.1] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, 1)); + } + + // Run compact: 2 files < min_file_num=100, but EnoughContent triggers because + // total_file_size = 2 * (file_size + 256MB) >= 256MB * 2 = 512MB + ASSERT_OK_AND_ASSIGN( + auto compact_messages, + AppendCompactCoordinator::Run(table_path, options, + /*partitions=*/{}, /*file_system=*/nullptr, pool_)); + ASSERT_EQ(compact_messages.size(), 1); + CheckCommitMessage(compact_messages[0], /*expected_before_files=*/2, + /*expected_total_rows=*/3); + + ASSERT_OK(Commit(table_path, compact_messages)); + + // Verify data after compaction + std::map, std::string> expected_datas; + expected_datas[std::make_pair("", 0)] = R"([ +[0, "Emily", 10, 0, 13.1], +[0, "Alice", 10, 1, 11.1], +[0, "Bob", 10, 0, 12.1] +])"; + ScanAndVerify(table_path, fields, expected_datas); +} + +/// Test compact with schema evolution (alter table): +/// - Add a new column f4(int32) +/// - Drop column f3(float64) +/// - Change f2 type from int32 to utf8 +/// Write data with old schema, alter table, write data with new schema, then compact. +TEST_F(AppendCompactCoordinatorTest, TestCompactWithSchemaEvolution) { + std::map options = { + {Options::FILE_FORMAT, "parquet"}, + {Options::BUCKET, "-1"}, + {Options::FILE_SYSTEM, "local"}, + {Options::COMPACTION_MIN_FILE_NUM, "2"}, + }; + + // Schema-0: f0(utf8, id=0), f1(int32, id=1), f2(int32, id=2), f3(float64, id=3) + arrow::FieldVector fields_v0 = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + CreateTable(fields_v0, /*partition_keys=*/{}, options); + + auto table_path = TablePath(); + auto data_type_v0 = arrow::struct_(fields_v0); + + // Write batch with schema-0 + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type_v0, R"([ + ["Alice", 10, 1, 11.1], + ["Bob", 20, 2, 22.2] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, 0)); + } + + // Alter table: create schema-1 + // - f2: int32 -> utf8 (id=2) + // - f3: dropped + // - f0: utf8 (id=0, unchanged) + // - f1: int32 (id=1, unchanged) + // - f4: new int32 column (id=4) + auto fs = dir_->GetFileSystem(); + auto schema_manager = std::make_shared(fs, table_path); + ASSERT_OK_AND_ASSIGN(auto table_schema0, schema_manager->ReadSchema(0)); + auto table_schema1 = std::make_shared(*table_schema0); + std::vector new_fields = {DataField(1, arrow::field("f1", arrow::int32())), + DataField(2, arrow::field("f2", arrow::utf8())), + DataField(4, arrow::field("f4", arrow::int32())), + DataField(0, arrow::field("f0", arrow::utf8()))}; + table_schema1->id_ = 1; + table_schema1->fields_ = new_fields; + table_schema1->highest_field_id_ = 4; + ASSERT_OK_AND_ASSIGN(std::string schema_content, table_schema1->ToJsonString()); + ASSERT_OK(fs->AtomicStore(schema_manager->ToSchemaPath(1), schema_content)); + + auto data_type_v1 = DataField::ConvertDataFieldsToArrowStructType(new_fields); + + // Write batch with schema-1 + { + auto array = arrow::ipc::internal::json::ArrayFromJSON(data_type_v1, R"([ + [30, "three", 300, "Carol"] + ])") + .ValueOrDie(); + ASSERT_OK(WriteAndCommit(table_path, {}, 0, array, 1)); + } + + // Run compact: should compact 2 files (one from schema-0, one from schema-1) + ASSERT_OK_AND_ASSIGN(auto compact_messages, + AppendCompactCoordinator::Run(table_path, options, /*partitions=*/{}, + /*file_system=*/nullptr, pool_)); + ASSERT_EQ(compact_messages.size(), 1); + CheckCommitMessage(compact_messages[0], /*expected_before_files=*/2, + /*expected_total_rows=*/3); + + ASSERT_OK(Commit(table_path, compact_messages)); + + // Verify data after compaction using the latest schema (schema-1) + arrow::FieldVector fields_v1 = { + arrow::field("f1", arrow::int32()), arrow::field("f2", arrow::utf8()), + arrow::field("f4", arrow::int32()), arrow::field("f0", arrow::utf8())}; + std::map, std::string> expected_datas; + expected_datas[std::make_pair("", 0)] = R"([ +[0, 30, "three", 300, "Carol"], +[0, 10, "1", null, "Alice"], +[0, 20, "2", null, "Bob"] +])"; + ScanAndVerify(table_path, fields_v1, expected_datas); +} + +} // namespace paimon::test diff --git a/src/paimon/core/append/bucketed_append_compact_manager.cpp b/src/paimon/core/append/bucketed_append_compact_manager.cpp new file mode 100644 index 0000000..fc45e38 --- /dev/null +++ b/src/paimon/core/append/bucketed_append_compact_manager.cpp @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "paimon/core/append/bucketed_append_compact_manager.h" + +#include + +#include "paimon/common/executor/future.h" + +namespace paimon { + +BucketedAppendCompactManager::BucketedAppendCompactManager( + const std::shared_ptr& executor, + const std::vector>& restored, + const std::shared_ptr& dv_maintainer, int32_t min_file_num, + int64_t target_file_size, int64_t compaction_file_size, bool force_rewrite_all_files, + CompactRewriter rewriter, const std::shared_ptr& reporter, + const std::shared_ptr& cancellation_controller) + : executor_(executor), + dv_maintainer_(dv_maintainer), + min_file_num_(min_file_num), + target_file_size_(target_file_size), + compaction_file_size_(compaction_file_size), + force_rewrite_all_files_(force_rewrite_all_files), + rewriter_(rewriter), + reporter_(reporter), + to_compact_( + [](const std::shared_ptr& lhs, const std::shared_ptr& rhs) { + return lhs->min_sequence_number > rhs->min_sequence_number; + }), + cancellation_controller_(cancellation_controller) { + assert(cancellation_controller_ != nullptr); + for (const auto& file : restored) { + to_compact_.push(file); + } +} + +void BucketedAppendCompactManager::RequestCancelCompaction() { + cancellation_controller_->Cancel(); +} + +Status BucketedAppendCompactManager::TriggerCompaction(bool full_compaction) { + if (full_compaction) { + PAIMON_RETURN_NOT_OK(TriggerFullCompaction()); + } else { + TriggerCompactionWithBestEffort(); + } + return Status::OK(); +} + +Status BucketedAppendCompactManager::TriggerFullCompaction() { + if (task_future_.valid()) { + return Status::Invalid( + "A compaction task is still running while the user forces a new compaction. This " + "is unexpected."); + } + // if all files are force picked or deletion vector enables, always trigger compaction. + if (!force_rewrite_all_files_ && + (to_compact_.empty() || + (dv_maintainer_ == nullptr && to_compact_.size() < FULL_COMPACT_MIN_FILE))) { + return Status::OK(); + } + + std::vector> compacting; + while (!to_compact_.empty()) { + compacting.push_back(to_compact_.top()); + to_compact_.pop(); + } + cancellation_controller_->Reset(); + auto compact_task = std::make_shared(reporter_, dv_maintainer_, compacting, + compaction_file_size_, + force_rewrite_all_files_, rewriter_); + task_future_ = Via(executor_.get(), [compact_task]() -> Result> { + return compact_task->Execute(); + }); + compacting_ = compacting; + return Status::OK(); +} + +void BucketedAppendCompactManager::TriggerCompactionWithBestEffort() { + if (task_future_.valid()) { + return; + } + std::optional>> picked = PickCompactBefore(); + if (picked) { + cancellation_controller_->Reset(); + compacting_ = picked.value(); + auto compact_task = std::make_shared(reporter_, dv_maintainer_, + compacting_.value(), rewriter_); + task_future_ = + Via(executor_.get(), [compact_task]() -> Result> { + return compact_task->Execute(); + }); + } +} + +std::optional>> +BucketedAppendCompactManager::PickCompactBefore() { + if (to_compact_.empty()) { + return std::nullopt; + } + int64_t total_file_size = 0; + int32_t file_num = 0; + std::deque> candidates; + + while (!to_compact_.empty()) { + std::shared_ptr file = to_compact_.top(); + to_compact_.pop(); + candidates.push_back(file); + total_file_size += file->file_size; + file_num++; + if (file_num >= min_file_num_) { + return std::vector>(candidates.begin(), candidates.end()); + } else if (total_file_size >= target_file_size_ * 2) { + // Shift the compaction window right and drop the oldest file so picked files stay + // contiguous, preserving append order during compaction. + std::shared_ptr removed = candidates.front(); + candidates.pop_front(); + total_file_size -= removed->file_size; + file_num--; + } + } + for (const auto& candidate : candidates) { + to_compact_.push(candidate); + } + return std::nullopt; +} + +std::vector> BucketedAppendCompactManager::AllFiles() const { + std::vector> all_files; + if (compacting_ != std::nullopt) { + all_files.insert(all_files.end(), compacting_.value().begin(), compacting_.value().end()); + } + auto to_compact = to_compact_; + while (!to_compact.empty()) { + all_files.push_back(to_compact.top()); + to_compact.pop(); + } + return all_files; +} + +Result>> +BucketedAppendCompactManager::GetCompactionResult(bool blocking) { + PAIMON_ASSIGN_OR_RAISE(std::optional> result, + InnerGetCompactionResult(blocking)); + if (result) { + std::shared_ptr compact_result = result.value(); + if (!compact_result->After().empty()) { + // if the last compacted file is still small, + // add it back to the head + std::shared_ptr last_file = compact_result->After().back(); + if (last_file->file_size < compaction_file_size_) { + to_compact_.push(last_file); + } + } + compacting_ = std::nullopt; + } + return result; +} + +Result> BucketedAppendCompactManager::Compact( + const std::shared_ptr& dv_maintainer, + const std::vector>& to_compact, CompactRewriter rewriter) { + PAIMON_ASSIGN_OR_RAISE(std::vector> rewrite, + rewriter(to_compact)); + + auto result = std::make_shared(to_compact, rewrite); + if (dv_maintainer != nullptr) { + for (const auto& file : to_compact) { + dv_maintainer->RemoveDeletionVectorOf(file->file_name); + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr deletion_file, + CompactDeletionFile::GenerateFiles(dv_maintainer)); + result->SetDeletionFile(deletion_file); + } + return result; +} + +Result> BucketedAppendCompactManager::FullCompactTask::DoCompact() { + // remove large files + while (!force_rewrite_all_files_ && !to_compact_.empty()) { + const auto& file = to_compact_.front(); + // the data file with deletion file always need to be compacted. + if (file->file_size >= compaction_file_size_ && !HasDeletionFile(file)) { + to_compact_.pop_front(); + continue; + } + break; + } + + // do compaction + if (dv_maintainer_ != nullptr) { + // if deletion vector enables, always trigger compaction. + return Compact( + dv_maintainer_, + std::vector>(to_compact_.begin(), to_compact_.end()), + rewriter_); + } else { + // compute small files + int32_t big = 0; + int32_t small = 0; + for (const auto& file : to_compact_) { + if (file->file_size >= compaction_file_size_) { + big++; + } else { + small++; + } + } + if (force_rewrite_all_files_ || + (small > big && to_compact_.size() >= FULL_COMPACT_MIN_FILE)) { + return Compact( + /*dv_maintainer=*/nullptr, + std::vector>(to_compact_.begin(), to_compact_.end()), + rewriter_); + } else { + return std::make_shared(); + } + } +} + +} // namespace paimon diff --git a/src/paimon/core/append/bucketed_append_compact_manager.h b/src/paimon/core/append/bucketed_append_compact_manager.h new file mode 100644 index 0000000..63a61ac --- /dev/null +++ b/src/paimon/core/append/bucketed_append_compact_manager.h @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/common/executor/future.h" +#include "paimon/core/compact/cancellation_controller.h" +#include "paimon/core/compact/compact_deletion_file.h" +#include "paimon/core/compact/compact_future_manager.h" +#include "paimon/core/compact/compact_task.h" +#include "paimon/core/deletionvectors/bucketed_dv_maintainer.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/operation/metrics/compaction_metrics.h" +#include "paimon/executor.h" +#include "paimon/logging.h" +#include "paimon/result.h" + +namespace paimon { + +/// Compact manager for `AppendOnlyFileStore`. +class BucketedAppendCompactManager : public CompactFutureManager { + public: + using DataFileMetaPriorityQueue = + std::priority_queue, + std::vector>, + std::function&, + const std::shared_ptr&)>>; + + using CompactRewriter = std::function>>( + const std::vector>&)>; + + /// New files may be created during the compaction process, then the results of the compaction + /// may be put after the new files, and this order will be disrupted. We need to ensure this + /// order, so we force the order by sequence. + static std::function&, + const std::shared_ptr&)> + FileComparator(bool ignore_overlap) { + return [ignore_overlap](const std::shared_ptr& lhs, + const std::shared_ptr& rhs) -> bool { + if (*lhs == *rhs) { + return false; + } + if (!ignore_overlap && IsOverlap(lhs, rhs)) { + static auto logger = Logger::GetLogger("BucketedAppendCompactManager"); + PAIMON_LOG_WARN(logger, + "Overlap detected between files %s and %s, " + "sequence range [%ld, %ld] and [%ld, %ld].", + lhs->file_name.c_str(), rhs->file_name.c_str(), + lhs->min_sequence_number, lhs->max_sequence_number, + rhs->min_sequence_number, rhs->max_sequence_number); + } + return lhs->min_sequence_number < rhs->min_sequence_number; + }; + } + + BucketedAppendCompactManager( + const std::shared_ptr& executor, + const std::vector>& restored, + const std::shared_ptr& dv_maintainer, int32_t min_file_num, + int64_t target_file_size, int64_t compaction_file_size, bool force_rewrite_all_files, + CompactRewriter rewriter, const std::shared_ptr& reporter, + const std::shared_ptr& cancellation_controller); + ~BucketedAppendCompactManager() override = default; + + void RequestCancelCompaction() override; + + Status TriggerCompaction(bool full_compaction) override; + + bool ShouldWaitForLatestCompaction() const override { + return false; + } + bool ShouldWaitForPreparingCheckpoint() const override { + return false; + } + + Status AddNewFile(const std::shared_ptr& file) override { + to_compact_.push(file); + return Status::OK(); + } + + std::vector> AllFiles() const override; + + /// Finish current task, and update result files to to_compact_ + Result>> GetCompactionResult( + bool blocking) override; + + Status Close() override { + if (reporter_) { + reporter_->Unregister(); + reporter_.reset(); + } + return Status::OK(); + } + + private: + static constexpr int32_t FULL_COMPACT_MIN_FILE = 3; + + static bool IsOverlap(const std::shared_ptr& o1, + const std::shared_ptr& o2) { + return o2->min_sequence_number <= o1->max_sequence_number && + o2->max_sequence_number >= o1->min_sequence_number; + } + + static Result> Compact( + const std::shared_ptr& dv_maintainer, + const std::vector>& to_compact, CompactRewriter rewriter); + + /// A `CompactTask` impl for full compaction of append-only table. + class FullCompactTask : public CompactTask { + public: + FullCompactTask(const std::shared_ptr& reporter, + const std::shared_ptr& dv_maintainer, + const std::vector>& inputs, + int64_t compaction_file_size, bool force_rewrite_all_files, + CompactRewriter rewriter) + : CompactTask(reporter), + dv_maintainer_(dv_maintainer), + to_compact_(inputs.begin(), inputs.end()), + compaction_file_size_(compaction_file_size), + force_rewrite_all_files_(force_rewrite_all_files), + rewriter_(rewriter) {} + + protected: + Result> DoCompact() override; + + private: + bool HasDeletionFile(const std::shared_ptr& file) const { + if (dv_maintainer_) { + return dv_maintainer_->DeletionVectorOf(file->file_name) != std::nullopt; + } + return false; + } + + static constexpr int32_t FULL_COMPACT_MIN_FILE = 3; + + std::shared_ptr dv_maintainer_; + std::deque> to_compact_; + int64_t compaction_file_size_; + bool force_rewrite_all_files_; + CompactRewriter rewriter_; + }; + + /// A `CompactTask` impl for append-only table auto-compaction. + /// + /// This task accepts an already-picked candidate to perform one-time rewrite. And for the + /// rest of input files, it is the duty of `AppendOnlyWriter` to invoke the next time + /// compaction. + class AutoCompactTask : public CompactTask { + public: + AutoCompactTask(const std::shared_ptr& reporter, + const std::shared_ptr& dv_maintainer, + const std::vector>& to_compact, + CompactRewriter rewriter) + : CompactTask(reporter), + dv_maintainer_(dv_maintainer), + to_compact_(to_compact), + rewriter_(rewriter) {} + + protected: + Result> DoCompact() override { + return Compact(dv_maintainer_, to_compact_, rewriter_); + } + + private: + std::shared_ptr dv_maintainer_; + std::vector> to_compact_; + CompactRewriter rewriter_; + }; + + DataFileMetaPriorityQueue GetToCompact() const { + return to_compact_; + } + + std::optional>> PickCompactBefore(); + Status TriggerFullCompaction(); + void TriggerCompactionWithBestEffort(); + + std::shared_ptr executor_; + std::shared_ptr dv_maintainer_; + int32_t min_file_num_; + int64_t target_file_size_; + int64_t compaction_file_size_; + bool force_rewrite_all_files_; + CompactRewriter rewriter_; + std::shared_ptr reporter_; + std::optional>> compacting_; + DataFileMetaPriorityQueue to_compact_; + std::shared_ptr cancellation_controller_; +}; + +} // namespace paimon diff --git a/src/paimon/core/append/bucketed_append_compact_manager_test.cpp b/src/paimon/core/append/bucketed_append_compact_manager_test.cpp new file mode 100644 index 0000000..a6376bf --- /dev/null +++ b/src/paimon/core/append/bucketed_append_compact_manager_test.cpp @@ -0,0 +1,588 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include "paimon/core/append/bucketed_append_compact_manager.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/core/deletionvectors/bitmap_deletion_vector.h" +#include "paimon/core/deletionvectors/bucketed_dv_maintainer.h" +#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/manifest/file_source.h" +#include "paimon/core/stats/simple_stats.h" +#include "paimon/executor.h" +#include "paimon/fs/file_system_factory.h" +#include "paimon/result.h" +#include "paimon/testing/mock/mock_index_path_factory.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class BucketedAppendCompactManagerTest : public testing::Test { + public: + void SetUp() override { + executor_ = CreateDefaultExecutor(); + } + + std::vector> GenerateDataFileMeta() { + std::vector> metas; + metas.push_back(DataFileMeta::ForAppend("file1", 100, 100, SimpleStats::EmptyStats(), + /*min_sequence_number=*/1, + /*max_sequence_number=*/10, 0, FileSource::Append(), + std::nullopt, std::nullopt, std::nullopt, + std::nullopt) + .value()); + metas.push_back(DataFileMeta::ForAppend("file2", 200, 200, SimpleStats::EmptyStats(), + /*min_sequence_number=*/5, + /*max_sequence_number=*/15, 0, FileSource::Append(), + std::nullopt, std::nullopt, std::nullopt, + std::nullopt) + .value()); + metas.push_back(DataFileMeta::ForAppend("file3", 200, 200, SimpleStats::EmptyStats(), + /*min_sequence_number=*/20, + /*max_sequence_number=*/30, 0, FileSource::Append(), + std::nullopt, std::nullopt, std::nullopt, + std::nullopt) + .value()); + return metas; + } + + private: + void InnerTest(const std::vector>& to_compact_before_pick, + bool expected_present, + const std::vector>& expected_compact_before, + const std::vector>& to_compact_after_pick) { + int32_t min_file_num = 4; + int64_t target_file_size = 1024; + int64_t threshold = target_file_size / 10 * 7; + BucketedAppendCompactManager manager( + executor_, to_compact_before_pick, + /*dv_maintainer=*/nullptr, min_file_num, target_file_size, threshold, + /*force_rewrite_all_files=*/false, /*rewriter=*/nullptr, /*reporter=*/nullptr, + /*cancellation_controller=*/std::make_shared()); + auto actual = manager.PickCompactBefore(); + if (expected_present) { + ASSERT_TRUE(actual.has_value()); + ExpectVectorsEqual(actual.value(), expected_compact_before); + } else { + ASSERT_FALSE(actual.has_value()); + } + auto pq = manager.GetToCompact(); + std::vector> to_compact; + while (!pq.empty()) { + to_compact.push_back(pq.top()); + pq.pop(); + } + ExpectVectorsEqual(to_compact, to_compact_after_pick); + } + + std::shared_ptr NewFile(int64_t min_sequence_number, + int64_t max_sequence_number) { + return std::make_shared( + /*file_name=*/"", /*file_size=*/max_sequence_number - min_sequence_number + 1, + /*row_count=*/0, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/SimpleStats::EmptyStats(), min_sequence_number, max_sequence_number, + /*schema_id=*/0, + /*level=*/DataFileMeta::DUMMY_LEVEL, + /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1724090888706ll, 0), + /*delete_row_count=*/max_sequence_number - min_sequence_number + 1, + /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + } + + std::shared_ptr NewNamedFile(const std::string& file_name, int64_t file_size, + int64_t min_sequence_number, + int64_t max_sequence_number) { + return std::make_shared( + file_name, file_size, + /*row_count=*/0, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/SimpleStats::EmptyStats(), min_sequence_number, max_sequence_number, + /*schema_id=*/0, + /*level=*/DataFileMeta::DUMMY_LEVEL, + /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1724090888706ll, 0), + /*delete_row_count=*/0, + /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + } + + std::shared_ptr CreateTestDvMaintainer( + const std::string& root_path, + const std::map>& deletion_vectors) { + auto pool = GetDefaultPool(); + EXPECT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", root_path, {})); + auto path_factory = std::make_shared(root_path); + auto dv_index_file = + std::make_shared(fs, path_factory, /*bitmap64=*/false, pool); + return std::make_shared(dv_index_file, deletion_vectors); + } + + std::shared_ptr CreateSimpleDeletionVector(int32_t deleted_position) { + RoaringBitmap32 bitmap; + bitmap.Add(deleted_position); + return std::make_shared(bitmap); + } + + void ExpectVectorsEqual(const std::vector>& actual, + const std::vector>& expected) { + ASSERT_EQ(actual.size(), expected.size()); + for (size_t i = 0; i < actual.size(); ++i) { + ASSERT_EQ(*actual[i], *expected[i]); + } + } + + std::shared_ptr executor_; +}; + +TEST_F(BucketedAppendCompactManagerTest, TestFileComparatorWithoutOverlap) { + auto files = GenerateDataFileMeta(); + auto& file1 = files[0]; + auto& file2 = files[1]; + auto& file3 = files[2]; + + auto comparator = BucketedAppendCompactManager::FileComparator(false); + ASSERT_TRUE(comparator(file1, file2)); + ASSERT_FALSE(comparator(file2, file1)); + ASSERT_TRUE(comparator(file1, file3)); + ASSERT_FALSE(comparator(file3, file1)); +} + +TEST_F(BucketedAppendCompactManagerTest, TestFileComparatorWithOverlap) { + auto files = GenerateDataFileMeta(); + auto& file1 = files[0]; + auto& file2 = files[1]; + auto& file3 = files[2]; + + auto comparator = BucketedAppendCompactManager::FileComparator(true); + ASSERT_TRUE(comparator(file1, file2)); + ASSERT_FALSE(comparator(file2, file1)); + ASSERT_TRUE(comparator(file1, file3)); + ASSERT_FALSE(comparator(file3, file1)); +} + +TEST_F(BucketedAppendCompactManagerTest, TestIsOverlap) { + auto files = GenerateDataFileMeta(); + auto& file1 = files[0]; + auto& file2 = files[1]; + auto& file3 = files[2]; + + ASSERT_TRUE(BucketedAppendCompactManager::IsOverlap(file1, file2)); + ASSERT_FALSE(BucketedAppendCompactManager::IsOverlap(file1, file3)); + ASSERT_FALSE(BucketedAppendCompactManager::IsOverlap(file2, file3)); +} + +TEST_F(BucketedAppendCompactManagerTest, TestPickEmptyAndNotRelease) { + // 1~50 is small enough, so hold it + std::vector> to_compact = {NewFile(1, 50)}; + InnerTest(to_compact, /*expected_present=*/false, /*expected_compact_before=*/{}, to_compact); +} + +TEST_F(BucketedAppendCompactManagerTest, TestPickPresentWhenEnoughSmallFiles) { + // All four files are small and should be picked once min_file_num is reached. + std::vector> to_compact_before_pick = { + NewFile(1, 100), NewFile(101, 200), NewFile(201, 300), NewFile(301, 400)}; + InnerTest(to_compact_before_pick, + /*expected_present=*/true, + /*expected_compact_before=*/to_compact_before_pick, + /*to_compact_after_pick=*/{}); +} + +TEST_F(BucketedAppendCompactManagerTest, TestPickEmptyAndRelease) { + // large file, release + InnerTest(/*to_compact_before_pick=*/{NewFile(1, 2048)}, /*expected_present=*/false, + /*expected_compact_before=*/{}, /*to_compact_after_pick=*/{}); + + // small file at last, release previous + InnerTest(/*to_compact_before_pick=*/{NewFile(1, 2048), NewFile(2049, 2100)}, + /*expected_present=*/false, + /*expected_compact_before=*/{}, /*to_compact_after_pick=*/{NewFile(2049, 2100)}); + InnerTest( + /*to_compact_before_pick=*/{NewFile(1, 2048), NewFile(2049, 2100), NewFile(2101, 2110)}, + /*expected_present=*/false, + /*expected_compact_before=*/{}, + /*to_compact_after_pick=*/{NewFile(2049, 2100), NewFile(2101, 2110)}); + InnerTest( + /*to_compact_before_pick=*/{NewFile(1, 2048), NewFile(2049, 4096), NewFile(4097, 5000)}, + /*expected_present=*/false, + /*expected_compact_before=*/{}, /*to_compact_after_pick=*/{NewFile(4097, 5000)}); + InnerTest( + /*to_compact_before_pick=*/{NewFile(1, 1024), NewFile(1025, 2049), NewFile(2050, 2500), + NewFile(2501, 4096), NewFile(4097, 6000), NewFile(6001, 7000), + NewFile(7001, 7600)}, + /*expected_present=*/false, + /*expected_compact_before=*/{}, + /*to_compact_after_pick=*/{NewFile(6001, 7000), NewFile(7001, 7600)}); + + // ignore single small file (in the middle) + InnerTest( + /*to_compact_before_pick=*/{NewFile(1, 2048), NewFile(2049, 4096), NewFile(4097, 4100), + NewFile(4101, 6150)}, + /*expected_present=*/false, + /*expected_compact_before=*/{}, + /*to_compact_after_pick=*/{NewFile(4101, 6150)}); + InnerTest( + /*to_compact_before_pick=*/{NewFile(1, 2048), NewFile(2049, 4096), NewFile(4097, 5000), + NewFile(5001, 6144), NewFile(6145, 7048)}, + /*expected_present=*/false, + /*expected_compact_before=*/{}, /*to_compact_after_pick=*/{NewFile(6145, 7048)}); + + // wait for more file + InnerTest(/*to_compact_before_pick=*/{NewFile(1, 500), NewFile(501, 1000)}, + /*expected_present=*/false, + /*expected_compact_before=*/{}, + /*to_compact_after_pick=*/{NewFile(1, 500), NewFile(501, 1000)}); + + InnerTest(/*to_compact_before_pick=*/{NewFile(1, 500), NewFile(501, 1000), NewFile(1001, 2048)}, + /*expected_present=*/false, + /*expected_compact_before=*/{}, + /*to_compact_after_pick=*/{NewFile(501, 1000), NewFile(1001, 2048)}); + InnerTest( + /*to_compact_before_pick=*/{NewFile(1, 2050), NewFile(2051, 2100), NewFile(2101, 2110)}, + /*expected_present=*/false, + /*expected_compact_before=*/{}, + /*to_compact_after_pick=*/{NewFile(2051, 2100), NewFile(2101, 2110)}); +} + +TEST_F(BucketedAppendCompactManagerTest, TestPick) { + // fileNum is 13 (which > 4) and totalFileSize is 130 (which < 1024) + InnerTest({NewFile(1, 10), NewFile(11, 20), NewFile(21, 30), NewFile(31, 40), NewFile(41, 50), + NewFile(51, 60), NewFile(61, 70), NewFile(71, 80), NewFile(81, 90), NewFile(91, 100), + NewFile(101, 110), NewFile(111, 120), NewFile(121, 130)}, + /*expected_present=*/true, /*expected_compact_before=*/ + {NewFile(1, 10), NewFile(11, 20), NewFile(21, 30), NewFile(31, 40)}, + /*to_compact_after_pick=*/ + {NewFile(41, 50), NewFile(51, 60), NewFile(61, 70), NewFile(71, 80), NewFile(81, 90), + NewFile(91, 100), NewFile(101, 110), NewFile(111, 120), NewFile(121, 130)}); + + // fileNum is 4 (which > 3) and totalFileSize is 1026 (which > 1024) + InnerTest({NewFile(1, 2), NewFile(3, 500), NewFile(501, 1000), NewFile(1001, 1025), + NewFile(1026, 1050)}, + /*expected_present=*/true, /*expected_compact_before=*/ + {NewFile(1, 2), NewFile(3, 500), NewFile(501, 1000), NewFile(1001, 1025)}, + /*to_compact_after_pick=*/{NewFile(1026, 1050)}); + + // The window shifts right after large files are dropped, then picks contiguous files. + InnerTest({NewFile(1, 1022), NewFile(1023, 1024), NewFile(1025, 2050), + // 2051~2510, ..., 2611~2620 + NewFile(2051, 2510), NewFile(2511, 2520), NewFile(2521, 2530), NewFile(2531, 2540), + NewFile(2541, 2550), NewFile(2551, 2560), NewFile(2561, 2570), NewFile(2571, 2580), + NewFile(2581, 2590), NewFile(2591, 2600), NewFile(2601, 2610), NewFile(2611, 2620), + NewFile(2621, 2630)}, + /*expected_present=*/true, + /*expected_compact_before=*/ + {NewFile(1023, 1024), NewFile(1025, 2050), NewFile(2051, 2510), NewFile(2511, 2520)}, + /*to_compact_after_pick=*/ + {NewFile(2521, 2530), NewFile(2531, 2540), NewFile(2541, 2550), NewFile(2551, 2560), + NewFile(2561, 2570), NewFile(2571, 2580), NewFile(2581, 2590), NewFile(2591, 2600), + NewFile(2601, 2610), NewFile(2611, 2620), NewFile(2621, 2630)}); +} + +TEST_F(BucketedAppendCompactManagerTest, TestCancelCompactionPropagatesToRewriteLoop) { + auto cancellation_controller = std::make_shared(); + auto exit_signal = std::make_shared>(); + auto exit_future = exit_signal->get_future(); + + auto rewriter = [cancellation_controller, + exit_signal](const std::vector>& to_compact) + -> Result>> { + while (!cancellation_controller->IsCancelled()) { + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + exit_signal->set_value(); + return Status::Invalid("compaction cancelled in rewrite loop"); + }; + + BucketedAppendCompactManager manager( + executor_, {NewFile(1, 100), NewFile(101, 200), NewFile(201, 300), NewFile(301, 400)}, + /*dv_maintainer=*/nullptr, + /*min_file_num=*/4, + /*target_file_size=*/1024, + /*compaction_file_size=*/700, + /*force_rewrite_all_files=*/false, rewriter, + /*reporter=*/nullptr, cancellation_controller); + + ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true)); + manager.CancelAndWaitCompaction(); + + ASSERT_EQ(exit_future.wait_for(std::chrono::seconds(1)), std::future_status::ready); +} + +TEST_F(BucketedAppendCompactManagerTest, TestTriggerCompactionResetsCancelFlag) { + auto cancellation_controller = std::make_shared(); + cancellation_controller->Cancel(); + auto rewriter = [](const std::vector>& to_compact) + -> Result>> { return to_compact; }; + + BucketedAppendCompactManager manager( + executor_, {NewFile(1, 100), NewFile(101, 200), NewFile(201, 300), NewFile(301, 400)}, + /*dv_maintainer=*/nullptr, + /*min_file_num=*/4, + /*target_file_size=*/1024, + /*compaction_file_size=*/700, + /*force_rewrite_all_files=*/false, rewriter, + /*reporter=*/nullptr, cancellation_controller); + + ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true)); + ASSERT_FALSE(cancellation_controller->IsCancelled()); +} + +TEST_F(BucketedAppendCompactManagerTest, TestHasDeletionFileLargeFileWithDvRetained) { + // A large file with a deletion vector should NOT be skipped during full compaction. + // compaction_file_size = 500, so "big_file" (size=2000) is a large file. + // Because it has a deletion vector, HasDeletionFile returns true and keeps it in compaction. + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + std::map> deletion_vectors; + deletion_vectors["big_file"] = CreateSimpleDeletionVector(0); + auto dv_maintainer = CreateTestDvMaintainer(dir->Str(), deletion_vectors); + + auto rewriter = [](const std::vector>& to_compact) + -> Result>> { return to_compact; }; + + // big_file: size=2000 → exceeds compaction_file_size=500 + // small_file1, small_file2: size=100 each → below threshold + auto big_file = NewNamedFile("big_file", 2000, 1, 2000); + auto small_file1 = NewNamedFile("small_file1", 100, 2001, 2100); + auto small_file2 = NewNamedFile("small_file2", 100, 2101, 2200); + + BucketedAppendCompactManager manager( + executor_, {big_file, small_file1, small_file2}, dv_maintainer, + /*min_file_num=*/4, + /*target_file_size=*/1024, + /*compaction_file_size=*/500, + /*force_rewrite_all_files=*/false, rewriter, + /*reporter=*/nullptr, + /*cancellation_controller=*/std::make_shared()); + + ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true)); + ASSERT_OK_AND_ASSIGN(auto result, manager.GetCompactionResult(/*blocking=*/true)); + ASSERT_TRUE(result.has_value()); + + // big_file has a deletion vector, so HasDeletionFile returns true and it is retained. + // All 3 files should appear in Before(). + const auto& before = result.value()->Before(); + ASSERT_EQ(before.size(), 3); + ASSERT_EQ(before[0]->file_name, "big_file"); + ASSERT_EQ(before[1]->file_name, "small_file1"); + ASSERT_EQ(before[2]->file_name, "small_file2"); +} + +TEST_F(BucketedAppendCompactManagerTest, TestHasDeletionFileLargeFileWithoutDvSkipped) { + // A large file WITHOUT a deletion vector should be skipped during full compaction. + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + // dv_maintainer exists but has no deletion vector for "big_file" + std::map> deletion_vectors; + deletion_vectors["other_file"] = CreateSimpleDeletionVector(0); + auto dv_maintainer = CreateTestDvMaintainer(dir->Str(), deletion_vectors); + + auto rewriter = [](const std::vector>& to_compact) + -> Result>> { return to_compact; }; + + // big_file: size=2000, exceeds compaction_file_size=500, but has no DV → skipped + auto big_file = NewNamedFile("big_file", 2000, 1, 2000); + auto small_file1 = NewNamedFile("small_file1", 100, 2001, 2100); + auto small_file2 = NewNamedFile("small_file2", 100, 2101, 2200); + + BucketedAppendCompactManager manager( + executor_, {big_file, small_file1, small_file2}, dv_maintainer, + /*min_file_num=*/4, + /*target_file_size=*/1024, + /*compaction_file_size=*/500, + /*force_rewrite_all_files=*/false, rewriter, + /*reporter=*/nullptr, + /*cancellation_controller=*/std::make_shared()); + + ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true)); + ASSERT_OK_AND_ASSIGN(auto result, manager.GetCompactionResult(/*blocking=*/true)); + ASSERT_TRUE(result.has_value()); + + // big_file has no deletion vector, so HasDeletionFile returns false and it is skipped. + // Only small files remain in Before(). + const auto& before = result.value()->Before(); + ASSERT_EQ(before.size(), 2); + ASSERT_EQ(before[0]->file_name, "small_file1"); + ASSERT_EQ(before[1]->file_name, "small_file2"); +} + +TEST_F(BucketedAppendCompactManagerTest, TestDoCompactWithNullDvMaintainerWithLessBigFile) { + // When dv_maintainer is nullptr, so large files should be skipped. + auto rewriter = [](const std::vector>& to_compact) + -> Result>> { return to_compact; }; + + auto big_file = NewNamedFile("big_file", 2000, 1, 2000); + auto small_file1 = NewNamedFile("small1", 100, 2001, 2100); + auto small_file2 = NewNamedFile("small2", 100, 2101, 2200); + auto small_file3 = NewNamedFile("small3", 100, 2201, 2300); + + BucketedAppendCompactManager manager( + executor_, {big_file, small_file1, small_file2, small_file3}, + /*dv_maintainer=*/nullptr, + /*min_file_num=*/4, + /*target_file_size=*/1024, + /*compaction_file_size=*/500, + /*force_rewrite_all_files=*/false, rewriter, + /*reporter=*/nullptr, + /*cancellation_controller=*/std::make_shared()); + + ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true)); + ASSERT_OK_AND_ASSIGN(auto result, manager.GetCompactionResult(/*blocking=*/true)); + ASSERT_TRUE(result.has_value()); + + // No dv_maintainer → HasDeletionFile returns false → big_file is skipped. + // Only the 3 small files appear in Before(). + const auto& before = result.value()->Before(); + ASSERT_EQ(before.size(), 3); + ASSERT_EQ(before[0]->file_name, "small1"); + ASSERT_EQ(before[1]->file_name, "small2"); + ASSERT_EQ(before[2]->file_name, "small3"); +} + +TEST_F(BucketedAppendCompactManagerTest, TestDoNoCompact) { + auto rewriter = [](const std::vector>& to_compact) + -> Result>> { return to_compact; }; + + auto small_file = NewNamedFile("small_file", 100, 1, 100); + auto big_file1 = NewNamedFile("big_file1", 2000, 101, 2100); + auto big_file2 = NewNamedFile("big_file2", 2000, 2101, 4100); + + BucketedAppendCompactManager manager( + executor_, {small_file, big_file1, big_file2}, + /*dv_maintainer=*/nullptr, + /*min_file_num=*/4, + /*target_file_size=*/1024, + /*compaction_file_size=*/500, + /*force_rewrite_all_files=*/false, rewriter, + /*reporter=*/nullptr, + /*cancellation_controller=*/std::make_shared()); + + ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true)); + ASSERT_OK_AND_ASSIGN(auto result, manager.GetCompactionResult(/*blocking=*/true)); + ASSERT_TRUE(result.has_value()); + ASSERT_TRUE(result.value()->Before().empty()); + ASSERT_TRUE(result.value()->After().empty()); +} + +TEST_F(BucketedAppendCompactManagerTest, TestAllFilesWithoutCompacting) { + // When no compaction is running, AllFiles returns only the to_compact_ files. + auto rewriter = [](const std::vector>& to_compact) + -> Result>> { return to_compact; }; + + auto file1 = NewNamedFile("file1", 100, 1, 100); + auto file2 = NewNamedFile("file2", 200, 101, 300); + + BucketedAppendCompactManager manager( + executor_, {file1, file2}, + /*dv_maintainer=*/nullptr, + /*min_file_num=*/4, + /*target_file_size=*/1024, + /*compaction_file_size=*/500, + /*force_rewrite_all_files=*/false, rewriter, + /*reporter=*/nullptr, + /*cancellation_controller=*/std::make_shared()); + + auto all_files = manager.AllFiles(); + // to_compact_ is a min-heap sorted by min_sequence_number, so file1 comes first. + ASSERT_EQ(all_files.size(), 2); + ASSERT_EQ(all_files[0]->file_name, "file1"); + ASSERT_EQ(all_files[1]->file_name, "file2"); +} + +TEST_F(BucketedAppendCompactManagerTest, TestAllFilesWithCompacting) { + // When compaction is running, AllFiles returns compacting_ + to_compact_ files. + auto exit_signal = std::make_shared>(); + auto proceed_signal = std::make_shared>(); + auto proceed_future = std::make_shared>(proceed_signal->get_future()); + + auto rewriter = [exit_signal, + proceed_future](const std::vector>& to_compact) + -> Result>> { + // Signal that rewriter has started + exit_signal->set_value(); + // Wait for the test to check AllFiles before completing + proceed_future->wait(); + return to_compact; + }; + + auto file1 = NewNamedFile("file1", 100, 1, 100); + auto file2 = NewNamedFile("file2", 100, 101, 200); + auto file3 = NewNamedFile("file3", 100, 201, 300); + auto file4 = NewNamedFile("file4", 100, 301, 400); + + BucketedAppendCompactManager manager( + executor_, {file1, file2, file3, file4}, + /*dv_maintainer=*/nullptr, + /*min_file_num=*/4, + /*target_file_size=*/1024, + /*compaction_file_size=*/500, + /*force_rewrite_all_files=*/false, rewriter, + /*reporter=*/nullptr, + /*cancellation_controller=*/std::make_shared()); + + // Trigger compaction — all 4 small files will be picked + ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/true)); + + // Wait until the rewriter is running (compaction in progress) + auto started_future = exit_signal->get_future(); + ASSERT_EQ(started_future.wait_for(std::chrono::seconds(5)), std::future_status::ready); + + // While compaction is in progress, AllFiles should return all files (compacting_ + to_compact_) + auto all_files = manager.AllFiles(); + ASSERT_EQ(all_files.size(), 4); + ASSERT_EQ(all_files[0]->file_name, "file1"); + ASSERT_EQ(all_files[1]->file_name, "file2"); + ASSERT_EQ(all_files[2]->file_name, "file3"); + ASSERT_EQ(all_files[3]->file_name, "file4"); + + // Let compaction finish + proceed_signal->set_value(); + + // After getting the result, compacting_ is cleared + ASSERT_OK_AND_ASSIGN(auto result, manager.GetCompactionResult(/*blocking=*/true)); + ASSERT_TRUE(result.has_value()); + + // Now AllFiles should be empty (no to_compact_, compacting_ cleared) + // But the last file might be put back if it's small enough + auto all_files_after = manager.AllFiles(); + // file4 (size=100 < compaction_file_size=500) should be put back to to_compact_ + ASSERT_EQ(all_files_after.size(), 1); + ASSERT_EQ(all_files_after[0]->file_name, "file4"); +} + +} // namespace paimon::test