diff --git a/test/inte/scan_inte_test.cpp b/test/inte/scan_inte_test.cpp new file mode 100644 index 0000000..2989cf2 --- /dev/null +++ b/test/inte/scan_inte_test.cpp @@ -0,0 +1,2216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "gtest/gtest.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/data_define.h" +#include "paimon/common/utils/decimal_utils.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/manifest/file_source.h" +#include "paimon/core/snapshot.h" +#include "paimon/core/stats/simple_stats.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/defs.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/bytes.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/predicate/literal.h" +#include "paimon/predicate/predicate_builder.h" +#include "paimon/result.h" +#include "paimon/scan_context.h" +#include "paimon/status.h" +#include "paimon/table/source/plan.h" +#include "paimon/table/source/startup_mode.h" +#include "paimon/table/source/table_scan.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +class ScanInteTest : public testing::Test { + public: + std::vector> CollectDataSplits( + const std::shared_ptr& plan) const { + std::vector> result_data_splits; + for (const auto& result : plan->Splits()) { + auto data_split = std::dynamic_pointer_cast(result); + EXPECT_TRUE(data_split); + result_data_splits.push_back(data_split); + } + return result_data_splits; + } + + void CheckResult(std::vector> expected, + std::vector> result) const { + ASSERT_EQ(result.size(), expected.size()); + for (size_t i = 0; i < result.size(); i++) { + ASSERT_EQ(*result[i], *expected[i]) << result[i]->ToString() << std::endl + << expected[i]->ToString(); + } + } + + void CheckStreamScanResult(TableScan* table_scan, + const std::vector> expected_snapshot_ids, + const std::vector>>& + expected_data_splits) const { + size_t scan_id = 0; + while (true) { + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + if (scan_id == expected_snapshot_ids.size()) { + // no snapshot + ASSERT_EQ(std::nullopt, result_plan->SnapshotId()); + ASSERT_TRUE(result_plan->Splits().empty()); + return; + } + // check snapshot ids + ASSERT_EQ(result_plan->SnapshotId(), expected_snapshot_ids[scan_id]); + + std::vector> result_data_splits; + for (const auto& result : result_plan->Splits()) { + auto data_split = std::dynamic_pointer_cast(result); + ASSERT_TRUE(data_split); + result_data_splits.push_back(data_split); + } + // check data splits + CheckResult(expected_data_splits[scan_id], result_data_splits); + scan_id++; + } + } + + private: + std::shared_ptr pool_ = GetDefaultPool(); + + std::shared_ptr arrow_data_type_ = + arrow::struct_({arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}); + + std::shared_ptr meta_snapshot1_partition10_bucket0_ = + std::make_shared( + "data-d41fd7d1-b3e4-4905-aad9-b20a780e90a2-0.orc", /*file_size=*/543, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 1, 11.1}, + {std::string("Alice"), 10, 1, 11.1}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1721643142435ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + std::shared_ptr meta_snapshot1_partition10_bucket1_ = + std::make_shared( + "data-4e30d6c0-f109-4300-a010-4ba03047dd9d-0.orc", /*file_size=*/575, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0, 12.1}, + {std::string("Tony"), 10, 0, 14.1}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1721643142456ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + std::shared_ptr meta_snapshot1_partition20_bucket0_ = + std::make_shared( + "data-db2b44c0-0d73-449d-82a0-4075bd2cb6e3-0.orc", /*file_size=*/541, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Lucy"), 20, 1, 14.1}, + {std::string("Lucy"), 20, 1, 14.1}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1721643142472ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + std::shared_ptr meta_snapshot2_partition10_bucket1_ = + std::make_shared( + "data-10b9eea8-241d-4e4b-8ab8-2a82d72d79a2-0.orc", /*file_size=*/589, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Alex"), 10, 0, 12.1}, + {std::string("Emily"), 10, 0, 16.1}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/3, /*max_sequence_number=*/5, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1721643267385ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + std::shared_ptr meta_snapshot2_partition20_bucket0_ = + std::make_shared( + "data-b913a160-a4d1-4084-af2a-18333c35668e-0.orc", /*file_size=*/506, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Paul"), 20, 1, NullType()}, + {std::string("Paul"), 20, 1, NullType()}, + {0, 0, 0, 1}, pool_.get()), + /*min_sequence_number=*/1, /*max_sequence_number=*/1, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1721643267404ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + std::shared_ptr meta_snapshot3_partition10_bucket1_ = + std::make_shared( + "data-e2bb59ee-ae25-4e5b-9bcc-257250bc5fdd-0.orc", /*file_size=*/541, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("David"), 10, 0, 17.1}, + {std::string("David"), 10, 0, 17.1}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/6, /*max_sequence_number=*/6, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1721643314161ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + std::shared_ptr meta_snapshot4_partition10_bucket1_ = + std::make_shared( + "data-2d5ea1ea-77c1-47ff-bb87-19a509962a37-0.orc", /*file_size=*/538, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Lily"), 10, 0, 17.1}, + {std::string("Lily"), 10, 0, 17.1}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/7, /*max_sequence_number=*/7, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1721643834400ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + + std::shared_ptr meta_snapshot5_partition10_bucket1_ = + std::make_shared( + "data-b9e7c41f-66e8-4dad-b25a-e6e1963becc4-0.orc", /*file_size=*/640, /*row_count=*/8, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Alex"), 10, 0, 12.1}, + {std::string("Tony"), 10, 0, 17.1}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/7, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1721643834472ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Compact(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); +}; + +TEST_F(ScanInteTest, TestScanAppendWithSnapshot1) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check snapshot id + ASSERT_EQ(result_plan->SnapshotId().value(), 1); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", + {meta_snapshot1_partition10_bucket0_}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder2( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot1_partition10_bucket1_}); + auto expected_data_split2 = + std::dynamic_pointer_cast(builder2.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder3( + BinaryRowGenerator::GenerateRow({20}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", + {meta_snapshot1_partition20_bucket0_}); + auto expected_data_split3 = + std::dynamic_pointer_cast(builder3.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> expected_data_splits = { + expected_data_split1, expected_data_split2, expected_data_split3}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithSnapshot3) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "3"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check snapshot id + ASSERT_EQ(result_plan->SnapshotId().value(), 3); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", + {meta_snapshot1_partition10_bucket0_}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder2( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot1_partition10_bucket1_, meta_snapshot2_partition10_bucket1_, + meta_snapshot3_partition10_bucket1_}); + auto expected_data_split2 = + std::dynamic_pointer_cast(builder2.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder3( + BinaryRowGenerator::GenerateRow({20}, pool_.get()), /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", + {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); + auto expected_data_split3 = + std::dynamic_pointer_cast(builder3.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> expected_data_splits = { + expected_data_split1, expected_data_split2, expected_data_split3}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanInvalidSnapshot) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "100"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_NOK_WITH_MSG( + table_scan->CreatePlan(), + "The specified scan snapshotId 100 is out of available snapshotId range [1, 5]."); +} + +TEST_F(ScanInteTest, TestBatchScanMultipleTimes) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // batch scan multiple + ASSERT_NOK_WITH_MSG(table_scan->CreatePlan(), "end of scan"); +} + +TEST_F(ScanInteTest, TestScanAppendWithSnapshot3WithSplitTargetSize) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "3") + .AddOption(Options::SOURCE_SPLIT_OPEN_FILE_COST, "1024") + .AddOption(Options::SOURCE_SPLIT_TARGET_SIZE, "2048"); + // open cost = 1024, and split target size is 2048, indicates at most 2 files in a split + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check snapshot id + ASSERT_EQ(result_plan->SnapshotId().value(), 3); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", + {meta_snapshot1_partition10_bucket0_}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder2( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot1_partition10_bucket1_, meta_snapshot2_partition10_bucket1_}); + auto expected_data_split2 = + std::dynamic_pointer_cast(builder2.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder3( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot3_partition10_bucket1_}); + auto expected_data_split3 = + std::dynamic_pointer_cast(builder3.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder4( + BinaryRowGenerator::GenerateRow({20}, pool_.get()), /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", + {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); + auto expected_data_split4 = + std::dynamic_pointer_cast(builder4.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> expected_data_splits = { + expected_data_split1, expected_data_split2, expected_data_split3, expected_data_split4}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithSnapshot3WithRowCountLimit) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "3").SetLimit(3); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check snapshot id + ASSERT_EQ(result_plan->SnapshotId().value(), 3); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + // since row limit is set to 3, we only return partition10, bucket0 and partition10, bucket1 in + // plan (without partition20, bucket0) + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", + {meta_snapshot1_partition10_bucket0_}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder3( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot1_partition10_bucket1_, meta_snapshot2_partition10_bucket1_, + meta_snapshot3_partition10_bucket1_}); + auto expected_data_split3 = + std::dynamic_pointer_cast(builder3.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> expected_data_splits = {expected_data_split1, + expected_data_split3}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithSnapshot3WithBucketFilter) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + ScanContextBuilder context_builder(table_path); + context_builder.SetBucketFilter(0).AddOption(Options::SCAN_SNAPSHOT_ID, "3"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check snapshot id + ASSERT_EQ(result_plan->SnapshotId().value(), 3); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", + {meta_snapshot1_partition10_bucket0_}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder3( + BinaryRowGenerator::GenerateRow({20}, pool_.get()), /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", + {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); + auto expected_data_split3 = + std::dynamic_pointer_cast(builder3.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> expected_data_splits = {expected_data_split1, + expected_data_split3}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithStreamWithDefaultMode) { + // from snapshot is specified + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1").WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + + DataSplitImpl::Builder builder1_1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", + {meta_snapshot1_partition10_bucket0_}); + auto expected_data_split1_1 = + std::dynamic_pointer_cast(builder1_1.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder1_2( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot1_partition10_bucket1_}); + auto expected_data_split1_2 = + std::dynamic_pointer_cast(builder1_2.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder1_3( + BinaryRowGenerator::GenerateRow({20}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", + {meta_snapshot1_partition20_bucket0_}); + auto expected_data_split1_3 = + std::dynamic_pointer_cast(builder1_3.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + // second scan + DataSplitImpl::Builder builder2_1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot2_partition10_bucket1_}); + auto expected_data_split2_1 = + std::dynamic_pointer_cast(builder2_1.WithTotalBuckets(2) + .WithSnapshot(2) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder2_2( + BinaryRowGenerator::GenerateRow({20}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", + {meta_snapshot2_partition20_bucket0_}); + auto expected_data_split2_2 = + std::dynamic_pointer_cast(builder2_2.WithTotalBuckets(2) + .WithSnapshot(2) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + // third scan + DataSplitImpl::Builder builder3_1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot3_partition10_bucket1_}); + auto expected_data_split3_1 = + std::dynamic_pointer_cast(builder3_1.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + // fourth scan + DataSplitImpl::Builder builder4_1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot4_partition10_bucket1_}); + auto expected_data_split4_1 = + std::dynamic_pointer_cast(builder4_1.WithTotalBuckets(2) + .WithSnapshot(4) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + std::vector>> expected_data_splits = { + {}, + {expected_data_split1_1, expected_data_split1_2, expected_data_split1_3}, + {expected_data_split2_1, expected_data_split2_2}, + {expected_data_split3_1}, + {expected_data_split4_1}}; + + std::vector> expected_snapshot_ids = {std::nullopt, 1, 2, 3, 4}; + CheckStreamScanResult(table_scan.get(), expected_snapshot_ids, expected_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithStreamOfLatestFullMode) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_MODE, StartupMode::LatestFull().ToString()) + .WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", + {meta_snapshot1_partition10_bucket0_}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(5) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder2( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot5_partition10_bucket1_}); + auto expected_data_split2 = + std::dynamic_pointer_cast(builder2.WithTotalBuckets(2) + .WithSnapshot(5) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder3( + BinaryRowGenerator::GenerateRow({20}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", + {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); + auto expected_data_split3 = + std::dynamic_pointer_cast(builder3.WithTotalBuckets(2) + .WithSnapshot(5) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector>> expected_data_splits = { + {expected_data_split1, expected_data_split2, expected_data_split3}}; + + std::vector> expected_snapshot_ids = {5}; + CheckStreamScanResult(table_scan.get(), expected_snapshot_ids, expected_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithBatchScanOfLatestMode) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_MODE, StartupMode::Latest().ToString()); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check snapshot id + ASSERT_EQ(result_plan->SnapshotId().value(), 5); + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", + {meta_snapshot1_partition10_bucket0_}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(5) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder2( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot5_partition10_bucket1_}); + auto expected_data_split2 = + std::dynamic_pointer_cast(builder2.WithTotalBuckets(2) + .WithSnapshot(5) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder3( + BinaryRowGenerator::GenerateRow({20}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", + {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); + auto expected_data_split3 = + std::dynamic_pointer_cast(builder3.WithTotalBuckets(2) + .WithSnapshot(5) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> expected_data_splits = { + expected_data_split1, expected_data_split2, expected_data_split3}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithStreamOfLatestMode) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_MODE, StartupMode::Latest().ToString()) + .WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + + std::vector> expected_snapshot_ids = {}; + CheckStreamScanResult(table_scan.get(), expected_snapshot_ids, {}); +} + +TEST_F(ScanInteTest, TestScanAppendWithStreamOfFromSnapshotMode) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_MODE, StartupMode::FromSnapshot().ToString()) + .AddOption(Options::SCAN_SNAPSHOT_ID, "2") + .WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + + DataSplitImpl::Builder builder2_1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot2_partition10_bucket1_}); + auto expected_data_split2_1 = + std::dynamic_pointer_cast(builder2_1.WithTotalBuckets(2) + .WithSnapshot(2) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder2_2( + BinaryRowGenerator::GenerateRow({20}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", + {meta_snapshot2_partition20_bucket0_}); + auto expected_data_split2_2 = + std::dynamic_pointer_cast(builder2_2.WithTotalBuckets(2) + .WithSnapshot(2) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder3_1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot3_partition10_bucket1_}); + auto expected_data_split3_1 = + std::dynamic_pointer_cast(builder3_1.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder4_1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot4_partition10_bucket1_}); + auto expected_data_split4_1 = + std::dynamic_pointer_cast(builder4_1.WithTotalBuckets(2) + .WithSnapshot(4) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + std::vector>> expected_data_splits = { + {}, + {expected_data_split2_1, expected_data_split2_2}, + {expected_data_split3_1}, + {expected_data_split4_1}}; + + std::vector> expected_snapshot_ids = {std::nullopt, 2, 3, 4}; + CheckStreamScanResult(table_scan.get(), expected_snapshot_ids, expected_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithStreamOfFromSnapshotFullMode) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_MODE, StartupMode::FromSnapshotFull().ToString()) + .AddOption(Options::SCAN_SNAPSHOT_ID, "2") + .WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_TRUE(scan_context); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + + DataSplitImpl::Builder builder1_1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", + {meta_snapshot1_partition10_bucket0_}); + auto expected_data_split1_1 = + std::dynamic_pointer_cast(builder1_1.WithTotalBuckets(2) + .WithSnapshot(2) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder1_2( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot1_partition10_bucket1_, meta_snapshot2_partition10_bucket1_}); + auto expected_data_split1_2 = + std::dynamic_pointer_cast(builder1_2.WithTotalBuckets(2) + .WithSnapshot(2) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder1_3( + BinaryRowGenerator::GenerateRow({20}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=20/bucket-0", + {meta_snapshot1_partition20_bucket0_, meta_snapshot2_partition20_bucket0_}); + auto expected_data_split1_3 = + std::dynamic_pointer_cast(builder1_3.WithTotalBuckets(2) + .WithSnapshot(2) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder3_1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot3_partition10_bucket1_}); + auto expected_data_split3_1 = + std::dynamic_pointer_cast(builder3_1.WithTotalBuckets(2) + .WithSnapshot(3) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder4_1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot4_partition10_bucket1_}); + auto expected_data_split4_1 = + std::dynamic_pointer_cast(builder4_1.WithTotalBuckets(2) + .WithSnapshot(4) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + std::vector>> expected_data_splits = { + {expected_data_split1_1, expected_data_split1_2, expected_data_split1_3}, + {expected_data_split3_1}, + {expected_data_split4_1}}; + + std::vector> expected_snapshot_ids = {2, 3, 4}; + CheckStreamScanResult(table_scan.get(), expected_snapshot_ids, expected_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithInvalidOptions) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + { + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_MODE, StartupMode::FromSnapshot().ToString()) + .WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_NOK_WITH_MSG( + table_scan->CreatePlan(), + "scan.snapshot-id or scan.tag-name must be set when startup mode is FROM_SNAPSHOT"); + } + { + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::BUCKET, "-2").WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_NOK_WITH_MSG(TableScan::Create(std::move(scan_context)), + "do not support bucket=-2 in scan process"); + } +} + +TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithEqualPredicate) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + + std::string val("Bob"); + auto predicate = + PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, val.data(), val.size())); + + ScanContextBuilder context_builder(table_path); + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check snapshot id + ASSERT_EQ(result_plan->SnapshotId().value(), 1); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + DataSplitImpl::Builder builder( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot1_partition10_bucket1_}); + auto expected_data_split = std::dynamic_pointer_cast(builder.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> expected_data_splits = {expected_data_split}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithStreamWithAndPredicate) { + // from snapshot is specified + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + + auto greater_than_predicate = PredicateBuilder::GreaterThan( + /*field_index=*/3, /*field_name=*/"f3", FieldType::DOUBLE, Literal(13.1)); + std::string val("Paul"); + auto less_than_predicate = PredicateBuilder::LessThan( + /*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, val.data(), val.size())); + val = "David"; + auto not_in_predicate = + PredicateBuilder::NotIn(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + {Literal(FieldType::STRING, val.data(), val.size())}); + auto not_equal = PredicateBuilder::NotEqual(/*field_index=*/1, /*field_name=*/"f1", + FieldType::INT, {Literal(20)}); + ASSERT_OK_AND_ASSIGN(auto predicate, + PredicateBuilder::And({greater_than_predicate, less_than_predicate, + not_in_predicate, not_equal})); + + ScanContextBuilder context_builder(table_path); + context_builder.SetPredicate(predicate) + .AddOption(Options::SCAN_SNAPSHOT_ID, "1") + .WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + + DataSplitImpl::Builder builder1_2( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot1_partition10_bucket1_}); + auto expected_data_split1_2 = + std::dynamic_pointer_cast(builder1_2.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + // second scan + DataSplitImpl::Builder builder2_1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot2_partition10_bucket1_}); + auto expected_data_split2_1 = + std::dynamic_pointer_cast(builder2_1.WithTotalBuckets(2) + .WithSnapshot(2) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + // files in third scan is all filtered + // fourth scan + DataSplitImpl::Builder builder4_1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot4_partition10_bucket1_}); + auto expected_data_split4_1 = + std::dynamic_pointer_cast(builder4_1.WithTotalBuckets(2) + .WithSnapshot(4) + .IsStreaming(true) + .RawConvertible(true) + .Build() + .value()); + + std::vector>> expected_data_splits = { + {}, {expected_data_split1_2}, {expected_data_split2_1}, {}, {expected_data_split4_1}}; + + std::vector> expected_snapshot_ids = {std::nullopt, 1, 2, 3, 4}; + CheckStreamScanResult(table_scan.get(), expected_snapshot_ids, expected_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithPartitionFilter) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + + std::map partition_keys; + partition_keys["f1"] = "10"; + ScanContextBuilder context_builder(table_path); + context_builder.SetPartitionFilter({partition_keys}).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check snapshot id + ASSERT_EQ(result_plan->SnapshotId().value(), 1); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", + {meta_snapshot1_partition10_bucket0_}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder2( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot1_partition10_bucket1_}); + auto expected_data_split2 = + std::dynamic_pointer_cast(builder2.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> expected_data_splits = {expected_data_split1, + expected_data_split2}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithInvalidPartitionFilter) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + + std::map partition_keys; + partition_keys["invalid_partition_key"] = "10"; + ScanContextBuilder context_builder(table_path); + context_builder.SetPartitionFilter({partition_keys}).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_NOK_WITH_MSG(TableScan::Create(std::move(scan_context)), + "field invalid_partition_key does not exist in partition keys"); +} + +TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithPartitionFilterAndPredicateFilter) { + std::string table_path = paimon::test::GetDataDir() + "orc/append_09.db/append_09"; + + // set predicate filter, f1 = 20 + auto predicate = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, + Literal(20)); + // set partition filter, f1 = 10 + std::map partition_keys; + partition_keys["f1"] = "10"; + + ScanContextBuilder context_builder(table_path); + context_builder.SetPredicate(predicate) + .SetPartitionFilter({partition_keys}) + .AddOption(Options::SCAN_SNAPSHOT_ID, "1"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check snapshot id + ASSERT_EQ(result_plan->SnapshotId().value(), 1); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-0", + {meta_snapshot1_partition10_bucket0_}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder2( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + "orc/append_09.db/append_09/f1=10/bucket-1", + {meta_snapshot1_partition10_bucket1_}); + auto expected_data_split2 = + std::dynamic_pointer_cast(builder2.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> expected_data_splits = {expected_data_split1, + expected_data_split2}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithMultiPartitionKeys) { + std::string table_path = paimon::test::GetDataDir() + + "orc/multi_partition_append_table.db/multi_partition_append_table"; + + // set partition filter, f1 = 10, f2 = 0 + std::map partition_keys; + partition_keys["f1"] = "10"; + partition_keys["f2"] = "0"; + + ScanContextBuilder context_builder(table_path); + context_builder.SetPartitionFilter({partition_keys}).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check snapshot id + ASSERT_EQ(result_plan->SnapshotId().value(), 1); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + auto meta = std::make_shared( + "data-01b6a930-6564-409b-b8f4-ed1307790d72-0.orc", /*file_size=*/575, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0, 12.1}, + {std::string("Tony"), 10, 0, 14.1}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1728497439433ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10, 0}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + + "orc/multi_partition_append_table.db/multi_partition_append_table/f1=10/f2=0/bucket-0", + {meta}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(-1) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> expected_data_splits = {expected_data_split1}; + CheckResult(expected_data_splits, result_data_splits); +} + +// test complex type ts & decimal +TEST_F(ScanInteTest, TestScanAppendComplexDataWithSnapshot4WithPredicateFilter) { + std::string table_path = + paimon::test::GetDataDir() + "orc/append_complex_data.db/append_complex_data"; + // set predicate filter + // less than 2024 + auto predicate1 = + PredicateBuilder::LessThan(/*field_index=*/3, /*field_name=*/"f4", FieldType::TIMESTAMP, + Literal(paimon::Timestamp(1735344000, 0))); + auto predicate2 = PredicateBuilder::GreaterThan( + /*field_index=*/4, /*field_name=*/"f5", FieldType::DECIMAL, + Literal(paimon::Decimal(5, 2, 0))); + ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({predicate1, predicate2})); + + ScanContextBuilder context_builder(table_path); + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "4"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + ASSERT_TRUE(result_plan); + // check snapshot id + ASSERT_EQ(result_plan->SnapshotId().value(), 4); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + auto meta = std::make_shared( + "data-14a30421-7650-486c-9876-66a1fa4356ff-0.orc", /*file_size=*/1004, /*row_count=*/6, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats( + {10, 1, 0, TimestampType(Timestamp(-2240521239999ll, 1001), 9), + Decimal(23, 5, DecimalUtils::StrToInt128("-12345000").value()), NullType()}, + {10, 1, 20006, TimestampType(Timestamp(2000000000000ll, 1001), 9), + Decimal(23, 5, DecimalUtils::StrToInt128("12345678998765432145678").value()), + NullType()}, + {0, 0, 1, 0, 1, 1}, pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/5, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1767506722625ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Compact(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + + "orc/append_complex_data.db/append_complex_data/f1=10/bucket-0", + {meta}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(4) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> expected_data_splits = {expected_data_split1}; + CheckResult(expected_data_splits, result_data_splits); +} + +// test complex type date & binary +TEST_F(ScanInteTest, TestScanAppendComplexDataWithSnapshot4WithPredicateFilter2) { + std::string table_path = + paimon::test::GetDataDir() + "orc/append_complex_data.db/append_complex_data"; + // set predicate filter + auto predicate1 = PredicateBuilder::GreaterThan(/*field_index=*/2, /*field_name=*/"f3", + FieldType::DATE, Literal(FieldType::DATE, 0)); + // BINARY does not have stats in manifest, min/max in value stats is null + // if row_count != null_count and min/max is null, file will not be filtered + auto predicate2 = PredicateBuilder::GreaterThan( + /*field_index=*/5, /*field_name=*/"f6", FieldType::BINARY, + Literal(FieldType::BINARY, "zoo", 3)); + ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({predicate1, predicate2})); + + ScanContextBuilder context_builder(table_path); + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "4"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check snapshot id + ASSERT_EQ(result_plan->SnapshotId().value(), 4); + + // check data splits + std::vector> result_data_splits; + for (const auto& result : result_plan->Splits()) { + auto data_split = std::dynamic_pointer_cast(result); + ASSERT_TRUE(data_split); + result_data_splits.push_back(data_split); + } + + auto meta = std::make_shared( + "data-14a30421-7650-486c-9876-66a1fa4356ff-0.orc", /*file_size=*/1004, /*row_count=*/6, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats( + {10, 1, 0, TimestampType(Timestamp(-2240521239999ll, 1001), 9), + Decimal(23, 5, DecimalUtils::StrToInt128("-12345000").value()), NullType()}, + {10, 1, 20006, TimestampType(Timestamp(2000000000000ll, 1001), 9), + Decimal(23, 5, DecimalUtils::StrToInt128("12345678998765432145678").value()), + NullType()}, + {0, 0, 1, 0, 1, 1}, pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/5, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1767506722625ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Compact(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + + "orc/append_complex_data.db/append_complex_data/f1=10/bucket-0", + {meta}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(4) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + std::vector> expected_data_splits = {expected_data_split1}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithEnableStatsDenseStore) { + std::string table_path = paimon::test::GetDataDir() + + "orc/append_10_stats_dense_store.db/append_10_stats_dense_store"; + ScanContextBuilder context_builder(table_path); + auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/3, /*field_name=*/"f3", + FieldType::DOUBLE, Literal(13.0)); + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + auto file_meta1 = std::make_shared( + "data-cdb38c8a-31c1-4824-a024-9abd3fbb466f-0.orc", /*file_size=*/543, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 1}, + {std::string("Alice"), 10, 1}, {0, 0, 0}, pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1731412938869ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::optional>({"f0", "f1", "f2"}), + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + auto file_meta2 = std::make_shared( + "data-c2613568-0412-4cd9-a0c4-1eae8e4ca89b-0.orc", /*file_size=*/575, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0}, {std::string("Tony"), 10, 0}, + {0, 0, 0}, pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1731412938891ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::optional>({"f0", "f1", "f2"}), + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + auto file_meta3 = std::make_shared( + "data-a6d1261a-f798-4fbd-a251-6d6c7d8060dd-0.orc", /*file_size=*/541, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Lucy"), 20, 1}, + {std::string("Lucy"), 20, 1}, {0, 0, 0}, pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1731412938908ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::optional>({"f0", "f1", "f2"}), + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder1( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + + "orc/append_10_stats_dense_store.db/append_10_stats_dense_store/f1=10/bucket-0", + {file_meta1}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + + DataSplitImpl::Builder builder2( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + + "orc/append_10_stats_dense_store.db/append_10_stats_dense_store/f1=10/bucket-1", + {file_meta2}); + auto expected_data_split2 = + std::dynamic_pointer_cast(builder2.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + DataSplitImpl::Builder builder3( + BinaryRowGenerator::GenerateRow({20}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + + "orc/append_10_stats_dense_store.db/append_10_stats_dense_store/f1=20/bucket-0", + {file_meta3}); + auto expected_data_split3 = + std::dynamic_pointer_cast(builder3.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + std::vector> expected_data_splits = { + expected_data_split1, expected_data_split2, expected_data_split3}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithSnapshot1WithEnableStatsDenseStore2) { + std::string table_path = paimon::test::GetDataDir() + + "orc/append_10_stats_dense_store.db/append_10_stats_dense_store"; + ScanContextBuilder context_builder(table_path); + auto greater_than = PredicateBuilder::GreaterThan(/*field_index=*/3, /*field_name=*/"f3", + FieldType::DOUBLE, Literal(13.0)); + auto equal = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, "Emily", 5)); + auto predicate = PredicateBuilder::And({greater_than, equal}).value(); + + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + auto file_meta = std::make_shared( + "data-c2613568-0412-4cd9-a0c4-1eae8e4ca89b-0.orc", /*file_size=*/575, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0}, {std::string("Tony"), 10, 0}, + {0, 0, 0}, pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1731412938891ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::optional>({"f0", "f1", "f2"}), + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder( + BinaryRowGenerator::GenerateRow({10}, pool_.get()), + /*bucket=*/1, /*bucket_path=*/ + paimon::test::GetDataDir() + + "orc/append_10_stats_dense_store.db/append_10_stats_dense_store/f1=10/bucket-1", + {file_meta}); + auto expected_data_split = std::dynamic_pointer_cast(builder.WithTotalBuckets(2) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + std::vector> expected_data_splits = {expected_data_split}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanPKWithSnapshot1WithBucketStats) { + std::string table_path = paimon::test::GetDataDir() + + "orc/pk_table_with_total_buckets.db/pk_table_with_total_buckets"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_SNAPSHOT_ID, "1").SetBucketFilter(2); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + ASSERT_EQ(result_plan->SnapshotId().value(), 1); + ASSERT_TRUE(result_plan->Splits().empty()); +} + +TEST_F(ScanInteTest, TestScanPKWithInvalidOptions) { + std::string table_path = paimon::test::GetDataDir() + + "orc/pk_table_with_total_buckets.db/pk_table_with_total_buckets"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::BUCKET, "-1").WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_NOK_WITH_MSG(TableScan::Create(std::move(scan_context)), + "do not support pk table bucket=-1 in scan process"); +} + +TEST_F(ScanInteTest, TestReadWithNoSnapshot) { + std::string table_path = paimon::test::GetDataDir() + + "orc/append_table_with_nested_type.db/append_table_with_nested_type"; + ScanContextBuilder context_builder(table_path); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + ASSERT_FALSE(result_plan->SnapshotId()); + ASSERT_TRUE(result_plan->Splits().empty()); +} + +TEST_F(ScanInteTest, TestScanAppendWithAlterTableWithCast) { + std::string table_path = + paimon::test::GetDataDir() + + "orc/append_table_alter_table_with_cast.db/append_table_alter_table_with_cast"; + ScanContextBuilder context_builder(table_path); + + auto child1 = + PredicateBuilder::Or( + {PredicateBuilder::IsNotNull(/*field_index=*/0, /*field_name=*/"f4", + FieldType::TIMESTAMP), + PredicateBuilder::IsNotNull(/*field_index=*/1, /*field_name=*/"key0", FieldType::INT), + PredicateBuilder::IsNotNull(/*field_index=*/2, /*field_name=*/"key1", FieldType::INT)}) + .value(); + + auto sub_child1 = + PredicateBuilder::IsNotNull(/*field_index=*/3, /*field_name=*/"f3", FieldType::INT); + auto sub_child2 = + PredicateBuilder::IsNotNull(/*field_index=*/4, /*field_name=*/"f1", FieldType::STRING); + auto sub_child3 = + PredicateBuilder::IsNotNull(/*field_index=*/5, /*field_name=*/"f2", FieldType::DECIMAL); + auto child2 = PredicateBuilder::And({sub_child1, sub_child2, sub_child3}).value(); + + auto child3 = PredicateBuilder::GreaterThan(/*field_index=*/7, /*field_name=*/"f6", + FieldType::INT, Literal(80)); + ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2, child3})); + + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + auto file_meta = std::make_shared( + "data-81a1c016-765b-48c9-b209-0d8e95bf8a00-0.orc", /*file_size=*/1070, /*row_count=*/2, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats( + {TimestampType(Timestamp(1732603136084l, 84), 9), 1, 1, 180, + std::string("2024-11-26 15:29"), Decimal(6, 3, -999420), false, -86}, + {TimestampType(Timestamp(1732603136094l, 94), 9), 1, 1, 190, std::string("I'm strange"), + Decimal(6, 3, 8032), true, 96}, + {0, 0, 0, 0, 0, 0, 0, 0}, pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/1, /*schema_id=*/1, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1732635461460ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder(BinaryRowGenerator::GenerateRow({1, 1}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + + "orc/append_table_alter_table_with_cast.db/" + "append_table_alter_table_with_cast/key0=1/key1=1/bucket-0", + {file_meta}); + auto expected_data_split = std::dynamic_pointer_cast(builder.WithTotalBuckets(-1) + .WithSnapshot(2) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + std::vector> expected_data_splits = {expected_data_split}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithAlterTableWithNoCast) { + std::string table_path = paimon::test::GetDataDir() + + "orc/append_table_with_alter_table.db/append_table_with_alter_table"; + ScanContextBuilder context_builder(table_path); + + auto child1 = PredicateBuilder::GreaterThan(/*field_index=*/2, /*field_name=*/"k", + FieldType::INT, Literal(36)); + auto child2 = PredicateBuilder::LessThan(/*field_index=*/2, /*field_name=*/"k", FieldType::INT, + Literal(96)); + ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2})); + + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + auto file_meta1 = std::make_shared( + "data-492ed5ab-4740-4e93-8a0a-79a6893b1770-0.orc", /*file_size=*/603, /*row_count=*/2, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({1, 1, 42, 43, 44, 45, 46}, {1, 1, 52, 53, 54, 55, 56}, + {0, 0, 0, 0, 0, 0, 0}, pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/1, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1730458825047ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder1(BinaryRowGenerator::GenerateRow({1, 1}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + + "orc/append_table_with_alter_table.db/" + "append_table_with_alter_table/key0=1/key1=1/bucket-0", + {file_meta1}); + auto expected_data_split1 = + std::dynamic_pointer_cast(builder1.WithTotalBuckets(-1) + .WithSnapshot(2) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + auto file_meta2 = std::make_shared( + "data-b34cd128-03e3-4e70-ba9c-5dec2183849c-0.orc", /*file_size=*/680, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({0, 1, 66, 63, 517, 65, 618}, + {0, 1, 86, 83, 537, 85, 638}, {0, 0, 0, 0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/1, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1730459969493ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder2(BinaryRowGenerator::GenerateRow({0, 1}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + + "orc/append_table_with_alter_table.db/" + "append_table_with_alter_table/key0=0/key1=1/bucket-0", + {file_meta2}); + auto expected_data_split2 = + std::dynamic_pointer_cast(builder2.WithTotalBuckets(-1) + .WithSnapshot(2) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + std::vector> expected_data_splits = {expected_data_split1, + expected_data_split2}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithAlterTableWithDenseField) { + std::string table_path = paimon::test::GetDataDir() + + "orc/append_table_with_alter_table_with_dense_field.db/" + "append_table_with_alter_table_with_dense_field"; + ScanContextBuilder context_builder(table_path); + + auto child1 = PredicateBuilder::GreaterThan(/*field_index=*/2, /*field_name=*/"f1", + FieldType::INT, Literal(22)); + auto child3 = PredicateBuilder::LessThan(/*field_index=*/3, /*field_name=*/"f3", + FieldType::DOUBLE, Literal(0.0)); + ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child3})); + + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + auto file_meta = std::make_shared( + "data-e925c7db-58e3-45e3-b21e-b1a7774a5caf-0.orc", /*file_size=*/682, /*row_count=*/2, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({1, std::string("Cathy"), 13, 23}, + {1, std::string("David"), 14, 24}, {0, 0, 0, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/1, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1751647880163ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::optional>({"key0", "f0", "f1", "f2"}), + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + DataSplitImpl::Builder builder( + BinaryRowGenerator::GenerateRow({1}, pool_.get()), + /*bucket=*/0, /*bucket_path=*/ + paimon::test::GetDataDir() + + "orc/append_table_with_alter_table_with_dense_field.db/" + "append_table_with_alter_table_with_dense_field/key0=1/bucket-0", + {file_meta}); + auto expected_data_split = std::dynamic_pointer_cast(builder.WithTotalBuckets(-1) + .WithSnapshot(2) + .IsStreaming(false) + .RawConvertible(true) + .Build() + .value()); + std::vector> expected_data_splits = {expected_data_split}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithBitmapEmbeddedIndex) { + std::string table_path = + paimon::test::GetDataDir() + "orc/append_with_bitmap.db/append_with_bitmap/"; + ScanContextBuilder context_builder(table_path); + + auto child1 = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, "Tony", 4)); + auto child2 = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, + Literal(10)); + ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2})); + + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + std::vector embedded_bytes = { + 0, 5, 78, 78, 208, 26, 53, 174, 0, 0, 0, 1, 0, 0, 0, 96, 0, 0, + 0, 3, 0, 2, 102, 48, 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, + 0, 0, 0, 96, 0, 0, 0, 176, 0, 2, 102, 49, 0, 0, 0, 1, 0, 6, + 98, 105, 116, 109, 97, 112, 0, 0, 1, 16, 0, 0, 0, 102, 0, 2, 102, 50, + 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, 0, 0, 1, 118, 0, 0, + 0, 108, 0, 0, 0, 0, 2, 0, 0, 0, 8, 0, 0, 0, 5, 0, 0, 0, + 0, 1, 0, 0, 0, 5, 65, 108, 105, 99, 101, 0, 0, 0, 0, 0, 0, 0, + 85, 0, 0, 0, 5, 0, 0, 0, 5, 65, 108, 105, 99, 101, 0, 0, 0, 0, + 0, 0, 0, 20, 0, 0, 0, 3, 66, 111, 98, 0, 0, 0, 20, 0, 0, 0, + 20, 0, 0, 0, 5, 69, 109, 105, 108, 121, 255, 255, 255, 253, 255, 255, 255, 255, + 0, 0, 0, 4, 76, 117, 99, 121, 255, 255, 255, 251, 255, 255, 255, 255, 0, 0, + 0, 4, 84, 111, 110, 121, 0, 0, 0, 40, 0, 0, 0, 20, 58, 48, 0, 0, + 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 7, 0, 58, 48, + 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 1, 0, 5, 0, + 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 3, 0, + 6, 0, 2, 0, 0, 0, 8, 0, 0, 0, 2, 0, 0, 0, 0, 1, 0, 0, + 0, 10, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, 0, 10, + 0, 0, 0, 22, 0, 0, 0, 26, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, + 0, 22, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 16, 0, 0, 0, + 4, 0, 6, 0, 7, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 4, 0, + 16, 0, 0, 0, 0, 0, 1, 0, 2, 0, 3, 0, 5, 0, 2, 0, 0, 0, + 8, 0, 0, 0, 2, 1, 255, 255, 255, 248, 0, 0, 0, 18, 0, 0, 0, 1, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 0, 0, 0, 1, 0, 0, 0, 22, + 0, 0, 0, 24, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 16, 0, + 0, 0, 2, 0, 3, 0, 6, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, + 3, 0, 16, 0, 0, 0, 0, 0, 1, 0, 4, 0, 5, 0}; + auto embedded_index = std::make_shared(embedded_bytes.size(), pool_.get()); + memcpy(embedded_index->data(), reinterpret_cast(embedded_bytes.data()), + embedded_bytes.size()); + + auto file_meta = std::make_shared( + "data-62feb610-c83f-4217-9b50-bbad9cd08eb4-0.orc", /*file_size=*/689, + /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(), + /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 0, 11.1}, + {std::string("Tony"), 20, 1, 18.1}, {0, 0, 1, 0}, + pool_.get()), + /*min_sequence_number=*/0, + /*max_sequence_number=*/7, /*schema_id=*/0, + /*level=*/0, + /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1745000702835ll, 0), /*delete_row_count=*/0, + /*embedded_index=*/embedded_index, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0, + /*bucket_path=*/table_path + "bucket-0", {file_meta}); + ASSERT_OK_AND_ASSIGN(auto expected_data_split, builder.WithTotalBuckets(-1) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build()); + std::vector> expected_data_splits = {expected_data_split}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithBitmapEmbeddedIndexWithEmptyResult) { + std::string table_path = + paimon::test::GetDataDir() + "orc/append_with_bitmap.db/append_with_bitmap/"; + ScanContextBuilder context_builder(table_path); + + auto child1 = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, "Lucy", 4)); + auto child2 = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, + Literal(10)); + ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2})); + + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + ASSERT_TRUE(result_plan->Splits().empty()); +} + +TEST_F(ScanInteTest, TestScanAppendWithBitmapNoEmbeddedIndex) { + std::string table_path = + paimon::test::GetDataDir() + + "orc/append_with_bitmap_no_embedding.db/append_with_bitmap_no_embedding/"; + ScanContextBuilder context_builder(table_path); + + auto child1 = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f0", FieldType::STRING, + Literal(FieldType::STRING, "Lucy", 4)); + auto child2 = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f1", FieldType::INT, + Literal(10)); + ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2})); + + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "1"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + auto file_meta = std::make_shared( + "data-414509f5-e40c-4245-b992-bbf486778ac9-0.orc", /*file_size=*/689, + /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(), + /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 0, 11.1}, + {std::string("Tony"), 20, 1, 18.1}, {0, 0, 1, 0}, + pool_.get()), + /*min_sequence_number=*/0, + /*max_sequence_number=*/7, /*schema_id=*/0, + /*level=*/0, + /*extra_files=*/ + std::vector>( + {"data-414509f5-e40c-4245-b992-bbf486778ac9-0.orc.index"}), + /*creation_time=*/Timestamp(1745235371029ll, 0), /*delete_row_count=*/0, + /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0, + /*bucket_path=*/table_path + "bucket-0", {file_meta}); + ASSERT_OK_AND_ASSIGN(auto expected_data_split, builder.WithTotalBuckets(-1) + .WithSnapshot(1) + .IsStreaming(false) + .RawConvertible(true) + .Build()); + std::vector> expected_data_splits = {expected_data_split}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithBitmapAndAlterTable) { + std::string table_path = + paimon::test::GetDataDir() + + "orc/append_with_bitmap_alter_table.db/append_with_bitmap_alter_table/"; + ScanContextBuilder context_builder(table_path); + // file0 will be removed as f5 not exists + auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/3, /*field_name=*/"f5", + FieldType::INT, Literal(100)); + + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + std::vector embedded_bytes = { + 0, 5, 78, 78, 208, 26, 53, 174, 0, 0, 0, 1, 0, 0, 0, 96, 0, 0, + 0, 3, 0, 2, 102, 49, 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, + 0, 0, 0, 96, 0, 0, 0, 102, 0, 2, 102, 52, 0, 0, 0, 1, 0, 6, + 98, 105, 116, 109, 97, 112, 0, 0, 0, 198, 0, 0, 0, 101, 0, 2, 102, 53, + 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, 0, 0, 1, 43, 0, 0, + 0, 82, 0, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0, 0, 3, 0, 0, 0, + 0, 1, 0, 0, 0, 0, 0, 0, 0, 10, 0, 0, 0, 0, 0, 0, 0, 52, + 0, 0, 0, 3, 0, 0, 0, 0, 0, 0, 0, 10, 255, 255, 255, 253, 255, 255, + 255, 255, 0, 0, 0, 0, 0, 0, 0, 20, 255, 255, 255, 254, 255, 255, 255, 255, + 0, 0, 0, 0, 0, 0, 0, 30, 0, 0, 0, 0, 0, 0, 0, 20, 58, 48, + 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 3, 0, + 2, 0, 0, 0, 4, 0, 0, 0, 4, 0, 0, 0, 0, 1, 0, 0, 0, 5, + 65, 108, 105, 99, 101, 0, 0, 0, 0, 0, 0, 0, 70, 0, 0, 0, 4, 0, + 0, 0, 5, 65, 108, 105, 99, 101, 255, 255, 255, 255, 255, 255, 255, 255, 0, 0, + 0, 3, 66, 111, 98, 255, 255, 255, 253, 255, 255, 255, 255, 0, 0, 0, 5, 68, + 97, 118, 105, 100, 255, 255, 255, 252, 255, 255, 255, 255, 0, 0, 0, 5, 69, 109, + 105, 108, 121, 255, 255, 255, 254, 255, 255, 255, 255, 2, 0, 0, 0, 4, 0, 0, + 0, 2, 1, 255, 255, 255, 252, 0, 0, 0, 18, 0, 0, 0, 1, 0, 0, 0, + 100, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, 0, 100, 0, + 0, 0, 0, 0, 0, 0, 20, 0, 0, 0, 101, 255, 255, 255, 254, 255, 255, 255, + 255, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, + 0, 2, 0}; + auto embedded_index = std::make_shared(embedded_bytes.size(), pool_.get()); + memcpy(embedded_index->data(), reinterpret_cast(embedded_bytes.data()), + embedded_bytes.size()); + auto file_meta = std::make_shared( + "data-a29b7235-760d-4838-881c-39cbef585dd2-0.orc", /*file_size=*/666, + /*row_count=*/4, /*min_key=*/BinaryRow::EmptyRow(), + /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({10, std::string("Alice"), 21.1, 100}, + {30, std::string("Emily"), 24.1, 101}, {0, 0, 0, 1}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/3, /*schema_id=*/1, /*level=*/0, + /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1745253323731ll, 0), /*delete_row_count=*/0, + /*embedded_index=*/embedded_index, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0, + /*bucket_path=*/table_path + "bucket-0", {file_meta}); + ASSERT_OK_AND_ASSIGN(auto expected_data_split, builder.WithTotalBuckets(-1) + .WithSnapshot(2) + .IsStreaming(false) + .RawConvertible(true) + .Build()); + std::vector> expected_data_splits = {expected_data_split}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithBitmapAndAlterTable3) { + std::string table_path = + paimon::test::GetDataDir() + + "orc/append_with_bitmap_alter_table.db/append_with_bitmap_alter_table/"; + ScanContextBuilder context_builder(table_path); + auto child1 = PredicateBuilder::IsNull(/*field_index=*/3, /*field_name=*/"f5", FieldType::INT); + auto child2 = PredicateBuilder::LessThan(/*field_index=*/2, /*field_name=*/"f3", + FieldType::DOUBLE, Literal(20.0)); + ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2})); + + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + std::vector embedded_bytes = { + 0, 5, 78, 78, 208, 26, 53, 174, 0, 0, 0, 1, 0, 0, 0, 96, 0, 0, + 0, 3, 0, 2, 102, 48, 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, + 0, 0, 0, 96, 0, 0, 0, 176, 0, 2, 102, 49, 0, 0, 0, 1, 0, 6, + 98, 105, 116, 109, 97, 112, 0, 0, 1, 16, 0, 0, 0, 102, 0, 2, 102, 50, + 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, 0, 0, 1, 118, 0, 0, + 0, 108, 0, 0, 0, 0, 2, 0, 0, 0, 8, 0, 0, 0, 5, 0, 0, 0, + 0, 1, 0, 0, 0, 5, 65, 108, 105, 99, 101, 0, 0, 0, 0, 0, 0, 0, + 85, 0, 0, 0, 5, 0, 0, 0, 5, 65, 108, 105, 99, 101, 0, 0, 0, 0, + 0, 0, 0, 20, 0, 0, 0, 3, 66, 111, 98, 0, 0, 0, 20, 0, 0, 0, + 20, 0, 0, 0, 5, 69, 109, 105, 108, 121, 255, 255, 255, 253, 255, 255, 255, 255, + 0, 0, 0, 4, 76, 117, 99, 121, 255, 255, 255, 251, 255, 255, 255, 255, 0, 0, + 0, 4, 84, 111, 110, 121, 0, 0, 0, 40, 0, 0, 0, 20, 58, 48, 0, 0, + 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 7, 0, 58, 48, + 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 1, 0, 5, 0, + 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 3, 0, + 6, 0, 2, 0, 0, 0, 8, 0, 0, 0, 2, 0, 0, 0, 0, 1, 0, 0, + 0, 10, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, 0, 10, + 0, 0, 0, 22, 0, 0, 0, 26, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, + 0, 22, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 16, 0, 0, 0, + 4, 0, 6, 0, 7, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 4, 0, + 16, 0, 0, 0, 0, 0, 1, 0, 2, 0, 3, 0, 5, 0, 2, 0, 0, 0, + 8, 0, 0, 0, 2, 1, 255, 255, 255, 248, 0, 0, 0, 18, 0, 0, 0, 1, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 0, 0, 0, 1, 0, 0, 0, 22, + 0, 0, 0, 24, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 16, 0, + 0, 0, 2, 0, 3, 0, 6, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, + 3, 0, 16, 0, 0, 0, 0, 0, 1, 0, 4, 0, 5, 0}; + auto embedded_index = std::make_shared(embedded_bytes.size(), pool_.get()); + memcpy(embedded_index->data(), reinterpret_cast(embedded_bytes.data()), + embedded_bytes.size()); + auto file_meta = std::make_shared( + "data-68014988-5451-478f-a18a-a1668214cf3d-0.orc", /*file_size=*/689, + /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(), + /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 0, 11.1}, + {std::string("Tony"), 20, 1, 18.1}, {0, 0, 1, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/7, /*schema_id=*/0, /*level=*/0, + /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1745251357742ll, 0), /*delete_row_count=*/0, + /*embedded_index=*/embedded_index, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0, + /*bucket_path=*/table_path + "bucket-0", {file_meta}); + ASSERT_OK_AND_ASSIGN(auto expected_data_split, builder.WithTotalBuckets(-1) + .WithSnapshot(2) + .IsStreaming(false) + .RawConvertible(true) + .Build()); + std::vector> expected_data_splits = {expected_data_split}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithBitmapAndAlterTable2) { + std::string table_path = + paimon::test::GetDataDir() + + "orc/append_with_bitmap_alter_table.db/append_with_bitmap_alter_table/"; + ScanContextBuilder context_builder(table_path); + + // in stats filter: predicate is trimmed as type for f1 is not consist: int->bigint + // in index filter: predicate is removed as type for f1 is converted bigint->int, which is not + // safe + auto predicate = PredicateBuilder::GreaterThan(/*field_index=*/0, /*field_name=*/"f1", + FieldType::BIGINT, Literal(100l)); + + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + + // check data splits + auto result_data_splits = CollectDataSplits(result_plan); + std::vector embedded_bytes = { + 0, 5, 78, 78, 208, 26, 53, 174, 0, 0, 0, 1, 0, 0, 0, 96, 0, 0, + 0, 3, 0, 2, 102, 48, 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, + 0, 0, 0, 96, 0, 0, 0, 176, 0, 2, 102, 49, 0, 0, 0, 1, 0, 6, + 98, 105, 116, 109, 97, 112, 0, 0, 1, 16, 0, 0, 0, 102, 0, 2, 102, 50, + 0, 0, 0, 1, 0, 6, 98, 105, 116, 109, 97, 112, 0, 0, 1, 118, 0, 0, + 0, 108, 0, 0, 0, 0, 2, 0, 0, 0, 8, 0, 0, 0, 5, 0, 0, 0, + 0, 1, 0, 0, 0, 5, 65, 108, 105, 99, 101, 0, 0, 0, 0, 0, 0, 0, + 85, 0, 0, 0, 5, 0, 0, 0, 5, 65, 108, 105, 99, 101, 0, 0, 0, 0, + 0, 0, 0, 20, 0, 0, 0, 3, 66, 111, 98, 0, 0, 0, 20, 0, 0, 0, + 20, 0, 0, 0, 5, 69, 109, 105, 108, 121, 255, 255, 255, 253, 255, 255, 255, 255, + 0, 0, 0, 4, 76, 117, 99, 121, 255, 255, 255, 251, 255, 255, 255, 255, 0, 0, + 0, 4, 84, 111, 110, 121, 0, 0, 0, 40, 0, 0, 0, 20, 58, 48, 0, 0, + 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 0, 0, 7, 0, 58, 48, + 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 1, 0, 5, 0, + 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 1, 0, 16, 0, 0, 0, 3, 0, + 6, 0, 2, 0, 0, 0, 8, 0, 0, 0, 2, 0, 0, 0, 0, 1, 0, 0, + 0, 10, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, 0, 10, + 0, 0, 0, 22, 0, 0, 0, 26, 0, 0, 0, 20, 0, 0, 0, 0, 0, 0, + 0, 22, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 16, 0, 0, 0, + 4, 0, 6, 0, 7, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 4, 0, + 16, 0, 0, 0, 0, 0, 1, 0, 2, 0, 3, 0, 5, 0, 2, 0, 0, 0, + 8, 0, 0, 0, 2, 1, 255, 255, 255, 248, 0, 0, 0, 18, 0, 0, 0, 1, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 28, 0, 0, 0, 2, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 22, 0, 0, 0, 1, 0, 0, 0, 22, + 0, 0, 0, 24, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, 2, 0, 16, 0, + 0, 0, 2, 0, 3, 0, 6, 0, 58, 48, 0, 0, 1, 0, 0, 0, 0, 0, + 3, 0, 16, 0, 0, 0, 0, 0, 1, 0, 4, 0, 5, 0}; + auto embedded_index = std::make_shared(embedded_bytes.size(), pool_.get()); + memcpy(embedded_index->data(), reinterpret_cast(embedded_bytes.data()), + embedded_bytes.size()); + auto file_meta = std::make_shared( + "data-68014988-5451-478f-a18a-a1668214cf3d-0.orc", /*file_size=*/689, + /*row_count=*/8, /*min_key=*/BinaryRow::EmptyRow(), + /*max_key=*/BinaryRow::EmptyRow(), /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 0, 11.1}, + {std::string("Tony"), 20, 1, 18.1}, {0, 0, 1, 0}, + pool_.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/7, /*schema_id=*/0, /*level=*/0, + /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1745251357742ll, 0), /*delete_row_count=*/0, + /*embedded_index=*/embedded_index, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + + DataSplitImpl::Builder builder(BinaryRow::EmptyRow(), /*bucket=*/0, + /*bucket_path=*/table_path + "bucket-0", {file_meta}); + ASSERT_OK_AND_ASSIGN(auto expected_data_split, builder.WithTotalBuckets(-1) + .WithSnapshot(2) + .IsStreaming(false) + .RawConvertible(true) + .Build()); + std::vector> expected_data_splits = {expected_data_split}; + CheckResult(expected_data_splits, result_data_splits); +} + +TEST_F(ScanInteTest, TestScanAppendWithBitmapAndAlterTableWithEmptyResult) { + std::string table_path = + paimon::test::GetDataDir() + + "orc/append_with_bitmap_alter_table.db/append_with_bitmap_alter_table/"; + ScanContextBuilder context_builder(table_path); + + // child1 will remove file1 for schema1 + auto child1 = PredicateBuilder::Equal(/*field_index=*/0, /*field_name=*/"f1", FieldType::BIGINT, + Literal(100l)); + // child2 will remove file0 for schema 0 + auto child2 = PredicateBuilder::Equal(/*field_index=*/1, /*field_name=*/"f4", FieldType::STRING, + Literal(FieldType::STRING, "David", 5)); + ASSERT_OK_AND_ASSIGN(auto predicate, PredicateBuilder::And({child1, child2})); + + context_builder.SetPredicate(predicate).AddOption(Options::SCAN_SNAPSHOT_ID, "2"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + ASSERT_TRUE(result_plan->Splits().empty()); +} + +TEST_F(ScanInteTest, TestScanAppendWithTag1) { + std::string table_path = + paimon::test::GetDataDir() + "orc/append_table_with_tag.db/append_table_with_tag"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_TAG_NAME, "1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_OK_AND_ASSIGN(auto result_plan, table_scan->CreatePlan()); + // check snapshot id + ASSERT_EQ(1, result_plan->SnapshotId().value()); +} + +TEST_F(ScanInteTest, TestScanInvalidTag) { + std::string table_path = + paimon::test::GetDataDir() + "orc/append_table_with_tag.db/append_table_with_tag"; + ScanContextBuilder context_builder(table_path); + context_builder.AddOption(Options::SCAN_TAG_NAME, "unknown"); + ASSERT_OK_AND_ASSIGN(auto scan_context, context_builder.Finish()); + ASSERT_OK_AND_ASSIGN(auto table_scan, TableScan::Create(std::move(scan_context))); + ASSERT_NOK_WITH_MSG(table_scan->CreatePlan(), "Tag 'unknown' doesn't exist."); +} + +TEST_F(ScanInteTest, TestWithAppendTimestampMillisBatchScan) { + std::string table_path = GetDataDir() + "orc/append_09.db/append_09"; + + auto fs = std::make_shared(); + SnapshotManager mgr(fs, table_path); + ASSERT_OK_AND_ASSIGN(Snapshot snap3, mgr.LoadSnapshot(3)); + + // EarlierOrEqual(snap3.time) → snap-3 + { + ScanContextBuilder builder(table_path); + builder.AddOption(Options::SCAN_TIMESTAMP_MILLIS, std::to_string(snap3.TimeMillis())); + ASSERT_OK_AND_ASSIGN(std::unique_ptr ctx, builder.Finish()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, TableScan::Create(std::move(ctx))); + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, scan->CreatePlan()); + ASSERT_EQ(plan->SnapshotId().value(), 3); + } + // EarlierOrEqual(snap3.time - 1) → snap-2 + { + ScanContextBuilder builder(table_path); + builder.AddOption(Options::SCAN_TIMESTAMP_MILLIS, std::to_string(snap3.TimeMillis() - 1)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr ctx, builder.Finish()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, TableScan::Create(std::move(ctx))); + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, scan->CreatePlan()); + ASSERT_EQ(plan->SnapshotId().value(), 2); + } + // EarlierOrEqual(INT64_MAX) → snap-5 (latest) + { + ScanContextBuilder builder(table_path); + builder.AddOption(Options::SCAN_TIMESTAMP_MILLIS, + std::to_string(std::numeric_limits::max())); + ASSERT_OK_AND_ASSIGN(std::unique_ptr ctx, builder.Finish()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, TableScan::Create(std::move(ctx))); + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan, scan->CreatePlan()); + ASSERT_EQ(plan->SnapshotId().value(), 5); + } + // EarlierOrEqual(0) → no snapshot found, expect error + { + ScanContextBuilder builder(table_path); + builder.AddOption(Options::SCAN_TIMESTAMP_MILLIS, "0"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr ctx, builder.Finish()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, TableScan::Create(std::move(ctx))); + ASSERT_NOK_WITH_MSG(scan->CreatePlan(), + "There is currently no snapshot earlier than or equal to timestamp"); + } +} + +TEST_F(ScanInteTest, TestWithAppendTimestampMillisStreamScan) { + std::string table_path = GetDataDir() + "orc/append_09.db/append_09"; + + auto fs = std::make_shared(); + SnapshotManager mgr(fs, table_path); + ASSERT_OK_AND_ASSIGN(Snapshot snap2, mgr.LoadSnapshot(2)); + ASSERT_OK_AND_ASSIGN(Snapshot snap3, mgr.LoadSnapshot(3)); + + // T=0: no snapshot earlier than T=0, stream starts from snapshot 1. + { + ScanContextBuilder builder(table_path); + builder.AddOption(Options::SCAN_TIMESTAMP_MILLIS, "0").WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(std::unique_ptr ctx, builder.Finish()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, TableScan::Create(std::move(ctx))); + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan0, scan->CreatePlan()); + ASSERT_EQ(plan0->SnapshotId(), std::nullopt); + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan1, scan->CreatePlan()); + ASSERT_EQ(plan1->SnapshotId().value(), 1); + } + // T=snap2.time+1: EarlierThan(snap2.time+1)=snap2, stream starts from snapshot 3. + { + ScanContextBuilder builder(table_path); + builder.AddOption(Options::SCAN_TIMESTAMP_MILLIS, std::to_string(snap2.TimeMillis() + 1)) + .WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(std::unique_ptr ctx, builder.Finish()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, TableScan::Create(std::move(ctx))); + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan0, scan->CreatePlan()); + ASSERT_EQ(plan0->SnapshotId(), std::nullopt); + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan1, scan->CreatePlan()); + ASSERT_EQ(plan1->SnapshotId().value(), 3); + } + // T=snap3.time: EarlierThan(snap3.time)=snap2, stream starts from snapshot 3. + { + ScanContextBuilder builder(table_path); + builder.AddOption(Options::SCAN_TIMESTAMP_MILLIS, std::to_string(snap3.TimeMillis())) + .WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(std::unique_ptr ctx, builder.Finish()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, TableScan::Create(std::move(ctx))); + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan0, scan->CreatePlan()); + ASSERT_EQ(plan0->SnapshotId(), std::nullopt); + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan1, scan->CreatePlan()); + ASSERT_EQ(plan1->SnapshotId().value(), 3); + } + // T=INT64_MAX: EarlierThan(INT64_MAX)=snap5, stream starts from snapshot 6 (beyond latest). + { + ScanContextBuilder builder(table_path); + builder + .AddOption(Options::SCAN_TIMESTAMP_MILLIS, + std::to_string(std::numeric_limits::max())) + .WithStreamingMode(true); + ASSERT_OK_AND_ASSIGN(std::unique_ptr ctx, builder.Finish()); + ASSERT_OK_AND_ASSIGN(std::unique_ptr scan, TableScan::Create(std::move(ctx))); + ASSERT_OK_AND_ASSIGN(std::shared_ptr plan0, scan->CreatePlan()); + ASSERT_EQ(plan0->SnapshotId(), std::nullopt); + ASSERT_TRUE(plan0->Splits().empty()); + } +} + +} // namespace paimon::test