diff --git a/src/paimon/core/table/source/append_only_split_generator.h b/src/paimon/core/table/source/append_only_split_generator.h new file mode 100644 index 0000000..f8881ef --- /dev/null +++ b/src/paimon/core/table/source/append_only_split_generator.h @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/common/utils/bin_packing.h" +#include "paimon/core/append/bucketed_append_compact_manager.h" +#include "paimon/core/table/bucket_mode.h" +#include "paimon/core/table/source/split_generator.h" + +namespace paimon { +/// Append only implementation of `SplitGenerator`. +class AppendOnlySplitGenerator : public SplitGenerator { + public: + AppendOnlySplitGenerator(int64_t target_split_size, int64_t open_file_cost, + const BucketMode& bucket_mode) + : target_split_size_(target_split_size), + open_file_cost_(open_file_cost), + bucket_mode_(bucket_mode) {} + + Result> SplitForBatch( + std::vector>&& input) const override { + std::vector> files = std::move(input); + std::stable_sort(files.begin(), files.end(), + BucketedAppendCompactManager::FileComparator(bucket_mode_ == + BucketMode::BUCKET_UNAWARE)); + auto weight_func = [open_file_cost = open_file_cost_]( + const std::shared_ptr& meta) -> int64_t { + return std::max(meta->file_size, open_file_cost); + }; + auto packed = BinPacking::PackForOrdered>( + std::move(files), weight_func, target_split_size_); + std::vector ret; + ret.reserve(packed.size()); + for (auto& pack : packed) { + ret.push_back(SplitGroup::RawConvertibleGroup(std::move(pack))); + } + return ret; + } + + Result> SplitForStreaming( + std::vector>&& files) const override { + // When the bucket mode is unaware, we split the files as batch, because unaware-bucket + // table only contains one bucket (bucket 0). + if (bucket_mode_ == BucketMode::BUCKET_UNAWARE) { + return SplitForBatch(std::move(files)); + } else { + return std::vector({SplitGroup::RawConvertibleGroup(std::move(files))}); + } + } + + private: + int64_t target_split_size_; + int64_t open_file_cost_; + BucketMode bucket_mode_; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/append_only_table_read.cpp b/src/paimon/core/table/source/append_only_table_read.cpp new file mode 100644 index 0000000..b51dc83 --- /dev/null +++ b/src/paimon/core/table/source/append_only_table_read.cpp @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/append_only_table_read.h" + +#include "paimon/core/core_options.h" +#include "paimon/core/operation/data_evolution_split_read.h" +#include "paimon/core/operation/internal_read_context.h" +#include "paimon/core/operation/raw_file_split_read.h" +#include "paimon/status.h" + +namespace paimon { +class DataSplit; +class Executor; +class FileStorePathFactory; +class MemoryPool; + +AppendOnlyTableRead::AppendOnlyTableRead(const std::shared_ptr& path_factory, + const std::shared_ptr& context, + const std::shared_ptr& memory_pool, + const std::shared_ptr& executor) + : TableRead(memory_pool) { + const auto& core_options = context->GetCoreOptions(); + if (core_options.DataEvolutionEnabled()) { + // add data evolution first + split_reads_.push_back( + std::make_unique(path_factory, context, memory_pool, executor)); + } else { + split_reads_.push_back( + std::make_unique(path_factory, context, memory_pool, executor)); + } +} + +Result> AppendOnlyTableRead::CreateReader( + const std::shared_ptr& split) { + for (const auto& read : split_reads_) { + PAIMON_ASSIGN_OR_RAISE(bool matched, read->Match(split, /*force_keep_delete=*/false)); + if (matched) { + return read->CreateReader(split); + } + } + return Status::Invalid("create reader failed, not read match with split."); +} + +} // namespace paimon diff --git a/src/paimon/core/table/source/append_only_table_read.h b/src/paimon/core/table/source/append_only_table_read.h new file mode 100644 index 0000000..b911dec --- /dev/null +++ b/src/paimon/core/table/source/append_only_table_read.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/operation/internal_read_context.h" +#include "paimon/core/operation/split_read.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/result.h" +#include "paimon/table/source/table_read.h" + +namespace paimon { + +class SplitRead; +class Executor; +class FileStorePathFactory; +class InternalReadContext; +class MemoryPool; + +class AppendOnlyTableRead : public TableRead { + public: + AppendOnlyTableRead(const std::shared_ptr& path_factory, + const std::shared_ptr& context, + const std::shared_ptr& memory_pool, + const std::shared_ptr& executor); + + Result> CreateReader( + const std::shared_ptr& data_split) override; + + private: + std::vector> split_reads_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/source/data_evolution_batch_scan.cpp b/src/paimon/core/table/source/data_evolution_batch_scan.cpp new file mode 100644 index 0000000..d43addf --- /dev/null +++ b/src/paimon/core/table/source/data_evolution_batch_scan.cpp @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/data_evolution_batch_scan.h" + +#include "paimon/core/global_index/global_index_scan_impl.h" +#include "paimon/core/global_index/indexed_split_impl.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/global_index/global_index_scan.h" + +namespace paimon { +DataEvolutionBatchScan::DataEvolutionBatchScan( + const std::string& table_path, const std::shared_ptr& snapshot_reader, + std::unique_ptr&& batch_scan, + const std::shared_ptr& global_index_result, const CoreOptions& core_options, + const std::shared_ptr& pool, const std::shared_ptr& executor) + : AbstractTableScan(core_options, snapshot_reader), + pool_(pool), + table_path_(table_path), + batch_scan_(std::move(batch_scan)), + global_index_result_(global_index_result), + executor_(executor) {} + +Result> DataEvolutionBatchScan::CreatePlan() { + std::optional> row_ranges; + std::shared_ptr final_global_index_result = global_index_result_; + if (!final_global_index_result) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr index_result, EvalGlobalIndex()); + if (index_result) { + final_global_index_result = index_result; + PAIMON_ASSIGN_OR_RAISE(row_ranges, index_result->ToRanges()); + } + } else { + PAIMON_ASSIGN_OR_RAISE(row_ranges, final_global_index_result->ToRanges()); + } + if (!row_ranges) { + return batch_scan_->CreatePlan(); + } + if (row_ranges.value().empty()) { + return PlanImpl::EmptyPlan(); + } + PAIMON_ASSIGN_OR_RAISE(RowRangeIndex row_range_index, + RowRangeIndex::Create(row_ranges.value())); + batch_scan_->WithRowRangeIndex(row_range_index); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_plan, batch_scan_->CreatePlan()); + std::map id_to_score; + if (auto scored_result = + std::dynamic_pointer_cast(final_global_index_result)) { + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr scored_iter, + scored_result->CreateScoredIterator()); + while (scored_iter->HasNext()) { + auto [id, score] = scored_iter->NextWithScore(); + id_to_score[id] = score; + } + } + return WrapToIndexedSplits(data_plan, row_range_index, id_to_score); +} + +Result> DataEvolutionBatchScan::WrapToIndexedSplits( + const std::shared_ptr& data_plan, const RowRangeIndex& row_range_index, + const std::map& id_to_score) const { + // TODO(lisizhuo.lsz): add executor here + auto data_splits = data_plan->Splits(); + std::vector> indexed_splits; + indexed_splits.reserve(data_splits.size()); + for (const auto& split : data_splits) { + auto data_split = std::dynamic_pointer_cast(split); + if (!data_split) { + return Status::Invalid("Cannot cast split to DataSplit when create IndexedSplit"); + } + const auto& files = data_split->DataFiles(); + if (files.empty()) { + return Status::Invalid("Empty data files in WrapToIndexedSplits"); + } + PAIMON_ASSIGN_OR_RAISE(int64_t min, files[0]->NonNullFirstRowId()); + PAIMON_ASSIGN_OR_RAISE(int64_t max, files[files.size() - 1]->NonNullFirstRowId()); + max += files[files.size() - 1]->row_count - 1; + + std::vector expected = row_range_index.IntersectedRanges(min, max); + if (expected.empty()) { + return Status::Invalid( + fmt::format("There should be intersected ranges for split with min row id {} and " + "max row id {}.", + min, max)); + } + + std::vector scores; + if (!id_to_score.empty()) { + for (const auto& range : expected) { + for (int64_t i = range.from; i <= range.to; i++) { + auto iter = id_to_score.find(i); + if (iter != id_to_score.end()) { + scores.push_back(iter->second); + } else { + return Status::Invalid(fmt::format("cannot find score for row {}", i)); + } + } + } + } + indexed_splits.push_back(std::make_shared(data_split, expected, scores)); + } + return std::make_shared(data_plan->SnapshotId(), indexed_splits); +} + +Result> DataEvolutionBatchScan::EvalGlobalIndex() const { + auto predicate = batch_scan_->GetNonPartitionPredicate(); + if (!predicate) { + return std::shared_ptr(nullptr); + } + if (!core_options_.GlobalIndexEnabled()) { + return std::shared_ptr(nullptr); + } + auto partition_filter = batch_scan_->GetPartitionPredicate(); + // TODO(lisizhuo.lsz): support time travel + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr index_scan, + GlobalIndexScan::Create(table_path_, core_options_.GetScanSnapshotId(), partition_filter, + core_options_.ToMap(), core_options_.GetFileSystem(), executor_, + pool_)); + auto index_scan_impl = dynamic_cast(index_scan.get()); + if (!index_scan_impl) { + return Status::Invalid("invalid GlobalIndexScan, cannot cast to GlobalIndexScanImpl"); + } + + return index_scan_impl->Scan(predicate); +} + +} // namespace paimon diff --git a/src/paimon/core/table/source/data_evolution_batch_scan.h b/src/paimon/core/table/source/data_evolution_batch_scan.h new file mode 100644 index 0000000..1dc2f29 --- /dev/null +++ b/src/paimon/core/table/source/data_evolution_batch_scan.h @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once +#include +#include +#include +#include + +#include "paimon/core/table/source/abstract_table_scan.h" +#include "paimon/core/table/source/data_table_batch_scan.h" +#include "paimon/result.h" +#include "paimon/utils/range.h" + +namespace paimon { +class DataEvolutionBatchScan : public AbstractTableScan { + public: + DataEvolutionBatchScan(const std::string& table_path, + const std::shared_ptr& snapshot_reader, + std::unique_ptr&& batch_scan, + const std::shared_ptr& global_index_result, + const CoreOptions& core_options, const std::shared_ptr& pool, + const std::shared_ptr& executor); + + Result> CreatePlan() override; + + private: + Result> WrapToIndexedSplits( + const std::shared_ptr& data_plan, const RowRangeIndex& row_range_index, + const std::map& id_to_score) const; + Result> EvalGlobalIndex() const; + + private: + std::shared_ptr pool_; + std::string table_path_; + std::unique_ptr batch_scan_; + std::shared_ptr global_index_result_; + std::shared_ptr executor_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/source/data_evolution_split_generator.cpp b/src/paimon/core/table/source/data_evolution_split_generator.cpp new file mode 100644 index 0000000..66b705b --- /dev/null +++ b/src/paimon/core/table/source/data_evolution_split_generator.cpp @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/data_evolution_split_generator.h" + +#include +#include +#include +#include + +#include "paimon/common/utils/bin_packing.h" +#include "paimon/common/utils/range_helper.h" +#include "paimon/core/io/data_file_meta.h" + +namespace paimon { +Result> DataEvolutionSplitGenerator::SplitForBatch( + std::vector>&& input) const { + RangeHelper> range_helper( + [](const std::shared_ptr& meta) -> Result { + return meta->NonNullFirstRowId(); + }, + [](const std::shared_ptr& meta) -> Result { + PAIMON_ASSIGN_OR_RAISE(int64_t first_row_id, meta->NonNullFirstRowId()); + return first_row_id + meta->row_count - 1; + }); + + PAIMON_ASSIGN_OR_RAISE(std::vector>> ranges, + range_helper.MergeOverlappingRanges(std::move(input))); + + auto weight_func = [open_file_cost = open_file_cost_]( + const std::vector>& metas) -> int64_t { + int64_t file_size_sum = 0; + for (const auto& meta : metas) { + file_size_sum += meta->file_size; + } + return std::max(file_size_sum, open_file_cost); + }; + + auto packed = BinPacking::PackForOrdered>>( + std::move(ranges), weight_func, target_split_size_); + + std::vector ret; + ret.reserve(packed.size()); + for (auto& pack : packed) { + bool raw_convertible = true; + std::vector> flat_meta; + for (auto& with_same_row_id : pack) { + if (with_same_row_id.size() != 1) { + raw_convertible = false; + } + flat_meta.insert(flat_meta.end(), std::make_move_iterator(with_same_row_id.begin()), + std::make_move_iterator(with_same_row_id.end())); + } + if (raw_convertible) { + ret.push_back(SplitGenerator::SplitGroup::RawConvertibleGroup(std::move(flat_meta))); + } else { + ret.push_back(SplitGenerator::SplitGroup::NonRawConvertibleGroup(std::move(flat_meta))); + } + } + return ret; +} +} // namespace paimon diff --git a/src/paimon/core/table/source/data_evolution_split_generator.h b/src/paimon/core/table/source/data_evolution_split_generator.h new file mode 100644 index 0000000..7e9d75c --- /dev/null +++ b/src/paimon/core/table/source/data_evolution_split_generator.h @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once +#include +#include +#include +#include +#include + +#include "paimon/core/table/source/split_generator.h" +#include "paimon/result.h" + +namespace paimon { +struct DataFileMeta; + +/// Append data evolution table split generator, which implementation of `SplitGenerator`. +class DataEvolutionSplitGenerator : public SplitGenerator { + public: + DataEvolutionSplitGenerator(int64_t target_split_size, int64_t open_file_cost) + : target_split_size_(target_split_size), open_file_cost_(open_file_cost) {} + + Result> SplitForBatch( + std::vector>&& input) const override; + + Result> SplitForStreaming( + std::vector>&& files) const override { + return SplitForBatch(std::move(files)); + } + + private: + int64_t target_split_size_; + int64_t open_file_cost_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/source/data_table_batch_scan.cpp b/src/paimon/core/table/source/data_table_batch_scan.cpp new file mode 100644 index 0000000..9cb816f --- /dev/null +++ b/src/paimon/core/table/source/data_table_batch_scan.cpp @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/data_table_batch_scan.h" + +#include +#include +#include + +#include "paimon/core/core_options.h" +#include "paimon/core/options/merge_engine.h" +#include "paimon/core/table/bucket_mode.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/core/table/source/plan_impl.h" +#include "paimon/core/table/source/snapshot/snapshot_reader.h" +#include "paimon/status.h" + +namespace paimon { +class DataSplit; + +DataTableBatchScan::DataTableBatchScan(bool pk_table, const CoreOptions& core_options, + const std::shared_ptr& snapshot_reader, + std::optional push_down_limit) + : AbstractTableScan(core_options, snapshot_reader), push_down_limit_(push_down_limit) { + if (pk_table && (core_options.DeletionVectorsEnabled() || + core_options.GetMergeEngine() == MergeEngine::FIRST_ROW)) { + auto level_filter = [](int32_t level) -> bool { return level > 0; }; + snapshot_reader_->WithLevelFilter(level_filter); + snapshot_reader_->EnableValueFilter(); + } + if (core_options.GetBucket() == BucketModeDefine::POSTPONE_BUCKET) { + snapshot_reader_->OnlyReadRealBuckets(); + } +} + +Result> DataTableBatchScan::CreatePlan() { + if (starting_scanner_ == nullptr) { + PAIMON_ASSIGN_OR_RAISE(starting_scanner_, CreateStartingScanner(/*is_streaming=*/false)); + } + if (has_next_) { + has_next_ = false; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr scan_result, + starting_scanner_->Scan(snapshot_reader_)); + return ApplyPushDownLimit(scan_result); + } + return Status::Invalid("end of scan"); +} + +Result> DataTableBatchScan::ApplyPushDownLimit( + const std::shared_ptr& scan_result) const { + auto current_scan_result = + std::dynamic_pointer_cast(scan_result); + if (!current_scan_result) { + // NoSnapshot + return PlanImpl::EmptyPlan(); + } + if (push_down_limit_ == std::nullopt) { + return current_scan_result->GetPlan(); + } + std::vector> splits = current_scan_result->Splits(); + std::vector> limited_data_splits; + limited_data_splits.reserve(splits.size()); + int64_t scanned_row_count = 0; + for (const auto& split : splits) { + auto data_split = std::dynamic_pointer_cast(split); + if (!data_split) { + return Status::Invalid("DataSplit cannot cast to DataSplitImpl"); + } + if (data_split->RawConvertible()) { + int64_t partial_merged_row_count = data_split->PartialMergedRowCount(); + limited_data_splits.emplace_back(data_split); + scanned_row_count += partial_merged_row_count; + if (scanned_row_count >= push_down_limit_.value()) { + PAIMON_ASSIGN_OR_RAISE(int64_t snapshot_id, current_scan_result->SnapshotId()); + return std::make_shared(snapshot_id, limited_data_splits); + } + } + } + return current_scan_result->GetPlan(); +} + +} // namespace paimon diff --git a/src/paimon/core/table/source/data_table_batch_scan.h b/src/paimon/core/table/source/data_table_batch_scan.h new file mode 100644 index 0000000..7f6dbb1 --- /dev/null +++ b/src/paimon/core/table/source/data_table_batch_scan.h @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once +#include +#include +#include + +#include "paimon/core/table/source/abstract_table_scan.h" +#include "paimon/core/table/source/snapshot/starting_scanner.h" +#include "paimon/result.h" +#include "paimon/table/source/plan.h" + +namespace paimon { +class CoreOptions; +class SnapshotReader; + +/// `TableScan` implementation for batch planning. +class DataTableBatchScan : public AbstractTableScan { + public: + DataTableBatchScan(bool pk_table, const CoreOptions& core_options, + const std::shared_ptr& snapshot_reader, + std::optional push_down_limit); + + Result> CreatePlan() override; + + std::shared_ptr GetNonPartitionPredicate() const { + return snapshot_reader_->GetNonPartitionPredicate(); + } + std::shared_ptr GetPartitionPredicate() const { + return snapshot_reader_->GetPartitionPredicate(); + } + + DataTableBatchScan* WithRowRangeIndex(const RowRangeIndex& row_range_index) { + snapshot_reader_->WithRowRangeIndex(row_range_index); + return this; + } + + private: + Result> ApplyPushDownLimit( + const std::shared_ptr& scan_result) const; + + private: + std::shared_ptr starting_scanner_; + bool has_next_ = true; + std::optional push_down_limit_; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/data_table_stream_scan.cpp b/src/paimon/core/table/source/data_table_stream_scan.cpp new file mode 100644 index 0000000..4c6d4df --- /dev/null +++ b/src/paimon/core/table/source/data_table_stream_scan.cpp @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/data_table_stream_scan.h" + +#include + +#include "fmt/format.h" +#include "paimon/core/core_options.h" +#include "paimon/core/options/changelog_producer.h" +#include "paimon/core/table/bucket_mode.h" +#include "paimon/core/table/source/plan_impl.h" +#include "paimon/core/table/source/snapshot/delta_follow_up_scanner.h" +#include "paimon/core/table/source/snapshot/follow_up_scanner.h" +#include "paimon/core/table/source/snapshot/snapshot_reader.h" +#include "paimon/core/table/source/snapshot/starting_scanner.h" +#include "paimon/core/utils/snapshot_manager.h" + +namespace paimon { +DataTableStreamScan::DataTableStreamScan(const CoreOptions& core_options, + const std::shared_ptr& snapshot_reader) + : AbstractTableScan(core_options, snapshot_reader) { + if (core_options.GetBucket() == BucketModeDefine::POSTPONE_BUCKET && + core_options.GetChangelogProducer() != ChangelogProducer::NONE) { + snapshot_reader_->OnlyReadRealBuckets(); + } +} + +Result> DataTableStreamScan::CreatePlan() { + if (!starting_scanner_) { + PAIMON_RETURN_NOT_OK(InitScanner()); + } + if (next_snapshot_id_ == std::nullopt) { + return TryFirstPlan(); + } else { + return NextPlan(); + } +} + +Result> DataTableStreamScan::TryFirstPlan() { + std::shared_ptr scan_result; + if (core_options_.GetChangelogProducer() == ChangelogProducer::LOOKUP) { + return Status::NotImplemented("do not support lookup changelog producer"); + } else if (core_options_.GetChangelogProducer() == ChangelogProducer::FULL_COMPACTION) { + return Status::NotImplemented("do not support full compaction changelog producer"); + } else { + PAIMON_ASSIGN_OR_RAISE(scan_result, starting_scanner_->Scan(snapshot_reader_)); + } + if (auto current_snapshot = + std::dynamic_pointer_cast(scan_result)) { + PAIMON_ASSIGN_OR_RAISE(int64_t current_snapshot_id, current_snapshot->SnapshotId()); + next_snapshot_id_ = current_snapshot_id + 1; + return current_snapshot->GetPlan(); + } else if (auto next_snapshot = + std::dynamic_pointer_cast(scan_result)) { + next_snapshot_id_ = next_snapshot->NextSnapshotId(); + } + return PlanImpl::EmptyPlan(); +} + +Result> DataTableStreamScan::NextPlan() { + while (true) { + PAIMON_ASSIGN_OR_RAISE(std::optional snapshot, + GetNextSnapshot(next_snapshot_id_.value())); + if (snapshot == std::nullopt) { + return PlanImpl::EmptyPlan(); + } + if (follow_up_scanner_->NeedScanSnapshot(snapshot.value())) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr plan, + follow_up_scanner_->Scan(snapshot.value(), snapshot_reader_)); + next_snapshot_id_.value()++; + return plan; + } else { + next_snapshot_id_.value()++; + } + } +} + +Result> DataTableStreamScan::GetNextSnapshot( + int64_t next_snapshot_id) const { + auto snapshot_manager = snapshot_reader_->GetSnapshotManager(); + PAIMON_ASSIGN_OR_RAISE(bool exists, snapshot_manager->SnapshotExists(next_snapshot_id)); + if (exists) { + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, snapshot_manager->LoadSnapshot(next_snapshot_id)); + return std::optional(snapshot); + } + PAIMON_ASSIGN_OR_RAISE(std::optional earliest, snapshot_manager->EarliestSnapshotId()); + PAIMON_ASSIGN_OR_RAISE(std::optional latest, snapshot_manager->LatestSnapshotId()); + // No snapshot now + if (earliest == std::nullopt || earliest.value() <= next_snapshot_id) { + if ((earliest == std::nullopt && next_snapshot_id > 1) || + (latest != std::nullopt && next_snapshot_id > latest.value() + 1)) { + return Status::Invalid(fmt::format( + "The next expected snapshot is too big! Most possible cause might be the table had " + "been recreated. The next snapshot id is {}, while the latest snapshot id is {}", + next_snapshot_id, latest.value())); + } + return std::optional(); + } + return Status::Invalid( + fmt::format("The snapshot with id {} has expired. You can: 1. increase the snapshot or " + "changelog expiration time. 2. use consumer-id to ensure that unconsumed " + "snapshots will not be expired.", + next_snapshot_id)); +} + +Status DataTableStreamScan::InitScanner() { + PAIMON_ASSIGN_OR_RAISE(starting_scanner_, CreateStartingScanner(/*is_streaming=*/true)); + follow_up_scanner_ = std::make_shared(); + return Status::OK(); +} + +} // namespace paimon diff --git a/src/paimon/core/table/source/data_table_stream_scan.h b/src/paimon/core/table/source/data_table_stream_scan.h new file mode 100644 index 0000000..9d887db --- /dev/null +++ b/src/paimon/core/table/source/data_table_stream_scan.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once +#include +#include +#include + +#include "paimon/core/snapshot.h" +#include "paimon/core/table/source/abstract_table_scan.h" +#include "paimon/core/table/source/plan_impl.h" +#include "paimon/core/table/source/snapshot/follow_up_scanner.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/table/source/plan.h" + +namespace paimon { +class CoreOptions; +class FollowUpScanner; +class SnapshotReader; +class StartingScanner; + +/// `StreamTableScan` implementation for streaming planning. +class DataTableStreamScan : public AbstractTableScan { + public: + DataTableStreamScan(const CoreOptions& core_options, + const std::shared_ptr& snapshot_reader); + + Result> CreatePlan() override; + + private: + Status InitScanner(); + + Result> TryFirstPlan(); + + Result> NextPlan(); + + Result> GetNextSnapshot(int64_t next_snapshot_id) const; + + private: + std::shared_ptr starting_scanner_; + std::shared_ptr follow_up_scanner_; + std::optional next_snapshot_id_; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/fallback_table_read.cpp b/src/paimon/core/table/source/fallback_table_read.cpp new file mode 100644 index 0000000..6ca4421 --- /dev/null +++ b/src/paimon/core/table/source/fallback_table_read.cpp @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/fallback_table_read.h" + +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/core/table/source/fallback_data_split.h" +#include "paimon/status.h" +#include "paimon/table/source/data_split.h" + +namespace paimon { +Result> FallbackTableRead::CreateReader( + const std::shared_ptr& split) { + auto fallback_data_split = std::dynamic_pointer_cast(split); + if (fallback_data_split) { + if (fallback_data_split->IsFallback()) { + return fallback_table_->CreateReader(fallback_data_split->GetSplit()); + } else { + return main_table_->CreateReader(fallback_data_split->GetSplit()); + } + } + auto data_split = std::dynamic_pointer_cast(split); + if (!data_split) { + return Status::Invalid("cannot cast split to data split"); + } + return main_table_->CreateReader(data_split); +} + +} // namespace paimon diff --git a/src/paimon/core/table/source/fallback_table_read.h b/src/paimon/core/table/source/fallback_table_read.h new file mode 100644 index 0000000..91af53a --- /dev/null +++ b/src/paimon/core/table/source/fallback_table_read.h @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/reader/batch_reader.h" +#include "paimon/result.h" +#include "paimon/table/source/table_read.h" + +namespace paimon { +class DataSplit; +class MemoryPool; + +class FallbackTableRead : public TableRead { + public: + FallbackTableRead(std::unique_ptr main_table, + std::unique_ptr fallback_table, + const std::shared_ptr& memory_pool) + : TableRead(memory_pool), + main_table_(std::move(main_table)), + fallback_table_(std::move(fallback_table)) {} + + Result> CreateReader(const std::shared_ptr& split) override; + + private: + std::unique_ptr main_table_; + std::unique_ptr fallback_table_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/source/key_value_table_read.cpp b/src/paimon/core/table/source/key_value_table_read.cpp new file mode 100644 index 0000000..0d70909 --- /dev/null +++ b/src/paimon/core/table/source/key_value_table_read.cpp @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/key_value_table_read.h" + +#include + +#include "paimon/core/operation/merge_file_split_read.h" +#include "paimon/core/operation/raw_file_split_read.h" +#include "paimon/status.h" + +namespace paimon { +class DataSplit; +class Executor; +class FileStorePathFactory; +class InternalReadContext; +class MemoryPool; + +KeyValueTableRead::KeyValueTableRead(std::vector>&& split_reads, + const std::shared_ptr& memory_pool) + : TableRead(memory_pool), split_reads_(std::move(split_reads)) {} + +Result> KeyValueTableRead::Create( + const std::shared_ptr& path_factory, + const std::shared_ptr& context, + const std::shared_ptr& memory_pool, const std::shared_ptr& executor) { + auto raw_file_split_read = + std::make_unique(path_factory, context, memory_pool, executor); + std::vector> split_reads; + split_reads.emplace_back(std::move(raw_file_split_read)); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr merge_file_split_read, + MergeFileSplitRead::Create(path_factory, context, memory_pool, executor)); + split_reads.emplace_back(std::move(merge_file_split_read)); + + return std::unique_ptr(new KeyValueTableRead(std::move(split_reads), memory_pool)); +} + +void KeyValueTableRead::ForceKeepDelete(bool force_keep_delete) { + force_keep_delete_ = force_keep_delete; + for (const auto& read : split_reads_) { + auto* merge_read = dynamic_cast(read.get()); + if (merge_read != nullptr) { + merge_read->ForceKeepDelete(force_keep_delete); + } + } +} + +Result> KeyValueTableRead::CreateReader( + const std::shared_ptr& split) { + auto data_split = std::dynamic_pointer_cast(split); + if (!data_split) { + return Status::Invalid("split cannot be casted to DataSplit"); + } + for (const auto& read : split_reads_) { + PAIMON_ASSIGN_OR_RAISE(bool matched, read->Match(data_split, force_keep_delete_)); + if (matched) { + return read->CreateReader(data_split); + } + } + return Status::Invalid("create reader failed, not read match with data split."); +} + +} // namespace paimon diff --git a/src/paimon/core/table/source/key_value_table_read.h b/src/paimon/core/table/source/key_value_table_read.h new file mode 100644 index 0000000..7751e59 --- /dev/null +++ b/src/paimon/core/table/source/key_value_table_read.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/operation/internal_read_context.h" +#include "paimon/core/operation/split_read.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/reader/batch_reader.h" +#include "paimon/result.h" +#include "paimon/table/source/table_read.h" + +namespace paimon { +class Split; +class Executor; +class FileStorePathFactory; +class InternalReadContext; +class MemoryPool; + +class KeyValueTableRead : public TableRead { + public: + static Result> Create( + const std::shared_ptr& path_factory, + const std::shared_ptr& context, + const std::shared_ptr& memory_pool, const std::shared_ptr& executor); + + Result> CreateReader(const std::shared_ptr& split) override; + + void ForceKeepDelete(bool force_keep_delete); + + private: + KeyValueTableRead(std::vector>&& split_reads, + const std::shared_ptr& memory_pool); + + std::vector> split_reads_; + bool force_keep_delete_ = false; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/source/merge_tree_split_generator.cpp b/src/paimon/core/table/source/merge_tree_split_generator.cpp new file mode 100644 index 0000000..925ce22 --- /dev/null +++ b/src/paimon/core/table/source/merge_tree_split_generator.cpp @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/merge_tree_split_generator.h" + +#include +#include +#include +#include + +#include "paimon/common/utils/bin_packing.h" +#include "paimon/core/mergetree/compact/interval_partition.h" +#include "paimon/core/mergetree/sorted_run.h" +#include "paimon/core/options/merge_engine.h" + +namespace paimon { +class FieldsComparator; + +MergeTreeSplitGenerator::MergeTreeSplitGenerator( + int64_t target_split_size, int64_t open_file_cost, bool deletion_vectors_enabled, + const MergeEngine& merge_engine, const std::shared_ptr& key_comparator) + : target_split_size_(target_split_size), + open_file_cost_(open_file_cost), + deletion_vectors_enabled_(deletion_vectors_enabled), + merge_engine_(merge_engine), + key_comparator_(key_comparator) {} + +Result> MergeTreeSplitGenerator::SplitForBatch( + std::vector>&& input) const { + bool raw_convertible = true; + std::set all_levels; + for (const auto& meta : input) { + if (meta->level == 0 || !WithoutDeleteRow(meta)) { + raw_convertible = false; + } + all_levels.insert(meta->level); + } + bool one_level = (all_levels.size() == 1); + + if (raw_convertible && + (deletion_vectors_enabled_ || merge_engine_ == MergeEngine::FIRST_ROW || one_level)) { + auto weight_func = [open_file_cost = open_file_cost_]( + const std::shared_ptr& meta) -> int64_t { + return std::max(meta->file_size, open_file_cost); + }; + auto packed = BinPacking::PackForOrdered>( + std::move(input), weight_func, target_split_size_); + std::vector ret; + ret.reserve(packed.size()); + for (auto& pack : packed) { + ret.push_back(SplitGenerator::SplitGroup::RawConvertibleGroup(std::move(pack))); + } + return ret; + } + + /* + * The generator aims to parallel the scan execution by slicing the files of each bucket + * into multiple splits. The generation has one constraint: files with intersected key + * ranges (within one section) must go to the same split. Therefore, the files are first to + * go through the interval partition algorithm to generate sections and then through the + * OrderedPack algorithm. Note that the item to be packed here is each section, the capacity + * is denoted as the targetSplitSize, and the final number of the bins is the number of + * splits generated. + * + * For instance, there are files: [1, 2] [3, 4] [5, 180] [5, 190] [200, 600] [210, 700] + * with targetSplitSize 128M. After interval partition, there are four sections: + * - section1: [1, 2] + * - section2: [3, 4] + * - section3: [5, 180], [5, 190] + * - section4: [200, 600], [210, 700] + * + * After OrderedPack, section1 and section2 will be put into one bin (split), so the final + * result will be: + * - split1: [1, 2] [3, 4] + * - split2: [5, 180] [5,190] + * - split3: [200, 600] [210, 700] + */ + std::vector> sorted_runs_vec = + IntervalPartition(std::move(input), key_comparator_).Partition(); + std::vector>> sections; + sections.reserve(sorted_runs_vec.size()); + for (auto& sorted_runs : sorted_runs_vec) { + auto files = FlatRun(std::move(sorted_runs)); + sections.push_back(std::move(files)); + } + + std::vector>> metas_vec = + PackSplits(std::move(sections)); + std::vector split_groups; + split_groups.reserve(metas_vec.size()); + for (auto& metas : metas_vec) { + if (metas.size() == 1 && WithoutDeleteRow(metas[0])) { + split_groups.push_back( + SplitGenerator::SplitGroup::RawConvertibleGroup(std::move(metas))); + } else { + split_groups.push_back( + SplitGenerator::SplitGroup::NonRawConvertibleGroup(std::move(metas))); + } + } + return split_groups; +} + +std::vector>> MergeTreeSplitGenerator::PackSplits( + std::vector>>&& sections) const { + auto total_size = [](const std::vector>& section) -> int64_t { + int64_t ret = 0; + for (const auto& meta : section) { + ret += meta->file_size; + } + return ret; + }; + auto weight_func = [open_file_cost = open_file_cost_, total_size]( + const std::vector>& metas) -> int64_t { + return std::max(total_size(metas), open_file_cost); + }; + auto packed = BinPacking::PackForOrdered>>( + std::move(sections), weight_func, target_split_size_); + std::vector>> result; + result.reserve(packed.size()); + for (auto& pa : packed) { + auto flat_files = MergeTreeSplitGenerator::FlatFiles(std::move(pa)); + result.push_back(std::move(flat_files)); + } + return result; +} + +std::vector> MergeTreeSplitGenerator::FlatFiles( + std::vector>>&& section) { + std::vector> result; + for (auto& sec : section) { + for (auto& file : sec) { + result.push_back(std::move(file)); + } + } + return result; +} + +std::vector> MergeTreeSplitGenerator::FlatRun( + std::vector&& section) { + std::vector> result; + for (auto& run : section) { + for (auto& file : run.Files()) { + result.push_back(std::move(file)); + } + } + return result; +} + +bool MergeTreeSplitGenerator::WithoutDeleteRow(const std::shared_ptr& meta) { + // null to true to be compatible with old version + if (meta->delete_row_count == std::nullopt) { + return true; + } + return meta->delete_row_count.value() == 0l; +} + +} // namespace paimon diff --git a/src/paimon/core/table/source/merge_tree_split_generator.h b/src/paimon/core/table/source/merge_tree_split_generator.h new file mode 100644 index 0000000..0579a0f --- /dev/null +++ b/src/paimon/core/table/source/merge_tree_split_generator.h @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once +#include +#include +#include +#include + +#include "paimon/common/utils/bin_packing.h" +#include "paimon/common/utils/fields_comparator.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/mergetree/sorted_run.h" +#include "paimon/core/options/merge_engine.h" +#include "paimon/core/table/source/split_generator.h" +#include "paimon/result.h" + +namespace paimon { +class FieldsComparator; +class SortedRun; +enum class MergeEngine; + +/// Merge tree implementation of `SplitGenerator`. +class MergeTreeSplitGenerator : public SplitGenerator { + public: + MergeTreeSplitGenerator(int64_t target_split_size, int64_t open_file_cost, + bool deletion_vectors_enabled, const MergeEngine& merge_engine, + const std::shared_ptr& key_comparator); + + Result> SplitForBatch( + std::vector>&& input) const override; + + Result> SplitForStreaming( + std::vector>&& files) const override { + // We don't split streaming scan files + return std::vector({SplitGroup::RawConvertibleGroup(std::move(files))}); + } + + private: + std::vector>> PackSplits( + std::vector>>&& sections) const; + + static std::vector> FlatFiles( + std::vector>>&& section); + + static std::vector> FlatRun(std::vector&& section); + + static bool WithoutDeleteRow(const std::shared_ptr& meta); + + private: + int64_t target_split_size_; + int64_t open_file_cost_; + bool deletion_vectors_enabled_; + MergeEngine merge_engine_; + std::shared_ptr key_comparator_; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/snapshot/continuous_from_snapshot_full_starting_scanner.h b/src/paimon/core/table/source/snapshot/continuous_from_snapshot_full_starting_scanner.h new file mode 100644 index 0000000..08abcb5 --- /dev/null +++ b/src/paimon/core/table/source/snapshot/continuous_from_snapshot_full_starting_scanner.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/table/source/snapshot/starting_scanner.h" + +namespace paimon { +/// `StartingScanner` for the `StartupMode::FromSnapshotFull()` startup mode +/// of a batch read. +class ContinuousFromSnapshotFullStartingScanner : public StartingScanner { + public: + ContinuousFromSnapshotFullStartingScanner( + const std::shared_ptr& snapshot_manager, int64_t starting_snapshot_id) + : StartingScanner(snapshot_manager) { + starting_snapshot_id_ = starting_snapshot_id; + } + + Result> Scan( + const std::shared_ptr& snapshot_reader) override { + PAIMON_ASSIGN_OR_RAISE(std::optional earliest_id, + snapshot_manager_->EarliestSnapshotId()); + if (earliest_id == std::nullopt) { + return std::make_shared(); + } + int64_t ceiled_snapshot_id = std::max(earliest_id.value(), starting_snapshot_id_.value()); + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, + snapshot_manager_->LoadSnapshot(ceiled_snapshot_id)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr plan, + snapshot_reader->WithMode(ScanMode::ALL)->WithSnapshot(snapshot)->Read()); + return std::make_shared(plan); + } +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/snapshot/continuous_from_snapshot_starting_scanner.h b/src/paimon/core/table/source/snapshot/continuous_from_snapshot_starting_scanner.h new file mode 100644 index 0000000..0338949 --- /dev/null +++ b/src/paimon/core/table/source/snapshot/continuous_from_snapshot_starting_scanner.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/table/source/snapshot/starting_scanner.h" + +namespace paimon { +/// `StartingScanner` for the `StartupMode::FromSnapshot()` startup mode of a +/// streaming read. +class ContinuousFromSnapshotStartingScanner : public StartingScanner { + public: + ContinuousFromSnapshotStartingScanner(const std::shared_ptr& snapshot_manager, + int64_t starting_snapshot_id) + : StartingScanner(snapshot_manager) { + starting_snapshot_id_ = starting_snapshot_id; + } + + Result> Scan( + const std::shared_ptr& snapshot_reader) override { + PAIMON_ASSIGN_OR_RAISE(std::optional earliest_id, + snapshot_manager_->EarliestSnapshotId()); + if (earliest_id == std::nullopt) { + return std::make_shared(); + } + // We should return the specified snapshot as next snapshot to indicate to scan delta data + // from it. If the snapshotId < earliestSnapshotId, start from the earliest. + return std::make_shared( + std::max(starting_snapshot_id_.value(), earliest_id.value())); + } +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/snapshot/continuous_latest_starting_scanner.h b/src/paimon/core/table/source/snapshot/continuous_latest_starting_scanner.h new file mode 100644 index 0000000..868bf5f --- /dev/null +++ b/src/paimon/core/table/source/snapshot/continuous_latest_starting_scanner.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "paimon/core/table/source/snapshot/starting_scanner.h" + +namespace paimon { +/// `StartingScanner` for the `StartupMode::Latest()` startup mode of a +/// streaming read. +class ContinuousLatestStartingScanner : public StartingScanner { + public: + static Result> Create( + const std::shared_ptr& snapshot_manager) { + PAIMON_ASSIGN_OR_RAISE(std::optional starting_snapshot_id, + snapshot_manager->LatestSnapshotId()); + return std::unique_ptr( + new ContinuousLatestStartingScanner(snapshot_manager, starting_snapshot_id)); + } + + Result> Scan( + const std::shared_ptr& snapshot_reader) override { + PAIMON_ASSIGN_OR_RAISE(std::optional latest_snapshot_id, + snapshot_manager_->LatestSnapshotId()); + if (latest_snapshot_id == std::nullopt) { + return std::make_shared(); + } + // If there's no snapshot before the reading job starts, + // then the first snapshot should be considered as an incremental snapshot + int64_t next_snapshot_id = starting_snapshot_id_ == std::nullopt + ? Snapshot::FIRST_SNAPSHOT_ID + : latest_snapshot_id.value() + 1; + return std::make_shared(next_snapshot_id); + } + + private: + ContinuousLatestStartingScanner(const std::shared_ptr& snapshot_manager, + const std::optional& starting_snapshot_id) + : StartingScanner(snapshot_manager) { + starting_snapshot_id_ = starting_snapshot_id; + } +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/snapshot/delta_follow_up_scanner.h b/src/paimon/core/table/source/snapshot/delta_follow_up_scanner.h new file mode 100644 index 0000000..238ec52 --- /dev/null +++ b/src/paimon/core/table/source/snapshot/delta_follow_up_scanner.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "paimon/core/table/source/snapshot/follow_up_scanner.h" +#include "paimon/logging.h" + +namespace paimon { +class DeltaFollowUpScanner : public FollowUpScanner { + public: + DeltaFollowUpScanner() : logger_(Logger::GetLogger("DeltaFollowUpScanner")) {} + + bool NeedScanSnapshot(const Snapshot& snapshot) const override { + if (snapshot.GetCommitKind() == Snapshot::CommitKind::Append()) { + return true; + } + PAIMON_LOG_DEBUG( + logger_, "Ignore snapshot #%ld with commit kind %s in delta follow-up scanner.", + snapshot.Id(), Snapshot::CommitKind::ToString(snapshot.GetCommitKind()).c_str()); + return false; + } + Result> Scan( + const Snapshot& snapshot, + const std::shared_ptr& snapshot_reader) const override { + return snapshot_reader->WithMode(ScanMode::DELTA)->WithSnapshot(snapshot)->Read(); + } + + private: + std::unique_ptr logger_; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/snapshot/follow_up_scanner.h b/src/paimon/core/table/source/snapshot/follow_up_scanner.h new file mode 100644 index 0000000..39e0b82 --- /dev/null +++ b/src/paimon/core/table/source/snapshot/follow_up_scanner.h @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "paimon/core/table/source/snapshot/snapshot_reader.h" +namespace paimon { +/// Helper class for the follow-up planning of `StreamTableScan`. +class FollowUpScanner { + public: + virtual ~FollowUpScanner() = default; + virtual bool NeedScanSnapshot(const Snapshot& snapshot) const = 0; + virtual Result> Scan( + const Snapshot& snapshot, const std::shared_ptr& snapshot_reader) const = 0; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/snapshot/full_starting_scanner.h b/src/paimon/core/table/source/snapshot/full_starting_scanner.h new file mode 100644 index 0000000..28a4872 --- /dev/null +++ b/src/paimon/core/table/source/snapshot/full_starting_scanner.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "paimon/core/table/source/snapshot/starting_scanner.h" + +namespace paimon { +/// `StartingScanner` for the `StartupMode::LatestFull()` startup mode. +class FullStartingScanner : public StartingScanner { + public: + explicit FullStartingScanner(const std::shared_ptr& snapshot_manager) + : StartingScanner(snapshot_manager) {} + + Result> Scan( + const std::shared_ptr& snapshot_reader) override { + if (starting_snapshot_id_ == std::nullopt) { + // try to get first snapshot + PAIMON_ASSIGN_OR_RAISE(starting_snapshot_id_, snapshot_manager_->LatestSnapshotId()); + } + if (starting_snapshot_id_ == std::nullopt) { + return std::make_shared(); + } + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, + snapshot_manager_->LoadSnapshot(starting_snapshot_id_.value())); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr plan, + snapshot_reader->WithMode(ScanMode::ALL)->WithSnapshot(snapshot)->Read()); + return std::make_shared(plan); + } +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/snapshot/snapshot_reader.cpp b/src/paimon/core/table/source/snapshot/snapshot_reader.cpp new file mode 100644 index 0000000..c6b0477 --- /dev/null +++ b/src/paimon/core/table/source/snapshot/snapshot_reader.cpp @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/source/snapshot/snapshot_reader.h" + +#include +#include +#include +#include + +#include "paimon/common/data/binary_row.h" +#include "paimon/common/utils/linked_hash_map.h" +#include "paimon/core/core_options.h" +#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" +#include "paimon/core/index/deletion_vector_meta.h" +#include "paimon/core/index/index_file_meta.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/manifest/file_kind.h" +#include "paimon/core/manifest/manifest_entry.h" +#include "paimon/core/snapshot.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/core/table/source/plan_impl.h" +#include "paimon/core/utils/file_store_path_factory.h" + +namespace paimon { +Result> SnapshotReader::Read() const { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr raw_plan, scan_->CreatePlan()); + const std::optional& snapshot = raw_plan->GetSnapshot(); + FileStoreScan::RawPlan::GroupFiles files = + FileStoreScan::RawPlan::GroupByPartFiles(raw_plan->Files(FileKind::Add())); + PAIMON_ASSIGN_OR_RAISE( + std::vector> data_splits, + GenerateSplits(snapshot, scan_mode_ != ScanMode::ALL, split_generator_, std::move(files))); + return std::make_shared(raw_plan->SnapshotId(), data_splits); +} + +Result>> SnapshotReader::GenerateSplits( + const std::optional& snapshot, bool is_streaming, + const std::unique_ptr& split_generator, + FileStoreScan::RawPlan::GroupFiles&& grouped_manifest_entries) const { + std::vector> splits; + // Read deletion indexes at once to reduce file IO + std::unordered_map, std::vector>> + deletion_index_files_map; + bool deletion_file_enabled = scan_->GetCoreOptions().DeletionVectorsEnabled(); + if (!is_streaming) { + if (deletion_file_enabled && snapshot != std::nullopt) { + PAIMON_ASSIGN_OR_RAISE( + deletion_index_files_map, + index_file_handler_->Scan( + snapshot.value(), std::string(DeletionVectorsIndexFile::DELETION_VECTORS_INDEX), + grouped_manifest_entries.key_set())); + } + } + for (auto& [partition, bucket_map] : grouped_manifest_entries) { + for (auto& [bucket, manifest_entries] : bucket_map) { + // collect data file metas + assert(!manifest_entries.empty()); + auto total_buckets = manifest_entries[0].TotalBuckets(); + std::vector> files; + files.reserve(manifest_entries.size()); + for (auto& entry : manifest_entries) { + files.emplace_back(std::move(entry.File())); + } + std::vector split_groups; + if (is_streaming) { + PAIMON_ASSIGN_OR_RAISE(split_groups, + split_generator->SplitForStreaming(std::move(files))); + } else { + PAIMON_ASSIGN_OR_RAISE(split_groups, + split_generator->SplitForBatch(std::move(files))); + } + for (auto& split_group : split_groups) { + std::vector>& data_files = split_group.files; + PAIMON_ASSIGN_OR_RAISE(std::string bucket_path, + path_factory_->BucketPath(partition, bucket)); + DataSplitImpl::Builder builder(partition, bucket, bucket_path, + std::move(data_files)); + builder.WithTotalBuckets(total_buckets) + .WithSnapshot(snapshot == std::nullopt ? Snapshot::FIRST_SNAPSHOT_ID - 1 + : snapshot.value().Id()) + .IsStreaming(is_streaming) + .RawConvertible(split_group.raw_convertible); + if (deletion_file_enabled && !deletion_index_files_map.empty()) { + PAIMON_ASSIGN_OR_RAISE( + std::vector> deletion_files, + GetDeletionFiles( + partition, bucket, builder.DataFiles(), + deletion_index_files_map[std::make_pair(partition, bucket)])); + builder.WithDataDeletionFiles(deletion_files); + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data_split, builder.Build()); + splits.emplace_back(data_split); + } + } + } + return splits; +} + +Result>> SnapshotReader::GetDeletionFiles( + const BinaryRow& partition, int32_t bucket, + const std::vector>& data_files, + const std::vector>& index_file_metas) const { + std::unordered_map> data_file_to_index_file_meta; + for (const auto& index_file_meta : index_file_metas) { + const auto& dv_metas = index_file_meta->DvRanges(); + if (dv_metas != std::nullopt) { + for (const auto& dv_meta_iter : dv_metas.value()) { + const auto& dv_meta = dv_meta_iter.second; + data_file_to_index_file_meta.insert( + std::make_pair(dv_meta.GetDataFileName(), index_file_meta)); + } + } + } + + std::vector> deletion_files; + deletion_files.reserve(data_files.size()); + for (const auto& file : data_files) { + auto index_file_meta_iter = data_file_to_index_file_meta.find(file->file_name); + if (index_file_meta_iter != data_file_to_index_file_meta.end()) { + const auto& optional_dv_metas = index_file_meta_iter->second->DvRanges(); + assert(optional_dv_metas != std::nullopt); + const auto& dv_metas = optional_dv_metas.value(); + auto dv_meta_iter = dv_metas.find(file->file_name); + if (dv_meta_iter != dv_metas.end()) { + PAIMON_ASSIGN_OR_RAISE( + std::string index_file_path, + index_file_handler_->FilePath(partition, bucket, index_file_meta_iter->second)); + deletion_files.emplace_back(DeletionFile( + index_file_path, dv_meta_iter->second.GetOffset(), + dv_meta_iter->second.GetLength(), dv_meta_iter->second.GetCardinality())); + continue; + } + } + deletion_files.emplace_back(std::nullopt); + } + return deletion_files; +} + +} // namespace paimon diff --git a/src/paimon/core/table/source/snapshot/snapshot_reader.h b/src/paimon/core/table/source/snapshot/snapshot_reader.h new file mode 100644 index 0000000..c19299b --- /dev/null +++ b/src/paimon/core/table/source/snapshot/snapshot_reader.h @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/core/index/index_file_handler.h" +#include "paimon/core/operation/file_store_scan.h" +#include "paimon/core/table/source/data_split_impl.h" +#include "paimon/core/table/source/deletion_file.h" +#include "paimon/core/table/source/plan_impl.h" +#include "paimon/core/table/source/scan_mode.h" +#include "paimon/core/table/source/split_generator.h" +#include "paimon/core/utils/file_store_path_factory.h" +#include "paimon/result.h" +#include "paimon/table/source/data_split.h" +#include "paimon/table/source/plan.h" + +namespace paimon { +class BinaryRow; +class FileStorePathFactory; +class IndexFileMeta; +class Snapshot; +class SnapshotManager; +struct DataFileMeta; + +class SnapshotReader { + public: + SnapshotReader(const std::shared_ptr& scan, + const std::shared_ptr& path_factory, + std::unique_ptr&& split_generator, + std::unique_ptr&& index_file_handler) + : scan_(scan), + path_factory_(path_factory), + split_generator_(std::move(split_generator)), + index_file_handler_(std::move(index_file_handler)) {} + + SnapshotReader* WithMode(const ScanMode& scan_mode) { + scan_mode_ = scan_mode; + scan_->WithKind(scan_mode); + return this; + } + + SnapshotReader* WithSnapshot(const Snapshot& snapshot) { + scan_->WithSnapshot(snapshot); + return this; + } + + SnapshotReader* WithLevelFilter(const std::function& level_filter) { + scan_->WithLevelFilter(level_filter); + return this; + } + + SnapshotReader* EnableValueFilter() { + scan_->EnableValueFilter(); + return this; + } + + SnapshotReader* OnlyReadRealBuckets() { + scan_->OnlyReadRealBuckets(); + return this; + } + + SnapshotReader* WithRowRangeIndex(const RowRangeIndex& row_range_index) { + scan_->WithRowRangeIndex(row_range_index); + return this; + } + + const std::shared_ptr& GetSnapshotManager() const { + return scan_->GetSnapshotManager(); + } + + std::shared_ptr GetNonPartitionPredicate() const { + return scan_->GetNonPartitionPredicate(); + } + + std::shared_ptr GetPartitionPredicate() const { + return scan_->GetPartitionPredicate(); + } + + /// Get splits from `FileKind::ADD` files. + Result> Read() const; + + private: + Result>> GenerateSplits( + const std::optional& snapshot, bool is_streaming, + const std::unique_ptr& split_generator, + FileStoreScan::RawPlan::GroupFiles&& grouped_data_files) const; + + Result>> GetDeletionFiles( + const BinaryRow& partition, int32_t bucket, + const std::vector>& data_files, + const std::vector>& index_file_metas) const; + + private: + std::shared_ptr scan_; + std::shared_ptr path_factory_; + std::unique_ptr split_generator_; + std::unique_ptr index_file_handler_; + ScanMode scan_mode_ = ScanMode::ALL; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/snapshot/starting_scanner.h b/src/paimon/core/table/source/snapshot/starting_scanner.h new file mode 100644 index 0000000..2119196 --- /dev/null +++ b/src/paimon/core/table/source/snapshot/starting_scanner.h @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/table/source/snapshot/snapshot_reader.h" +namespace paimon { + +/// Helper class for the first planning of `TableScan`. +class StartingScanner { + public: + /// Scan result of `Scan()`. + class ScanResult { + public: + virtual ~ScanResult() = default; + }; + + /// Currently, there is no snapshot, need to wait for the snapshot to be generated. + class NoSnapshot : public ScanResult {}; + + /// ScanResult with scanned snapshot. Next snapshot should be the current snapshot plus 1. + class CurrentSnapshot : public ScanResult { + public: + explicit CurrentSnapshot(const std::shared_ptr& plan) : plan_(plan) {} + + Result SnapshotId() const { + if (plan_->SnapshotId() == std::nullopt) { + return Status::Invalid("CurrentSnapshot must have a snapshot id"); + } + return plan_->SnapshotId().value(); + } + + const std::vector>& Splits() const { + return plan_->Splits(); + } + + const std::shared_ptr& GetPlan() const { + return plan_; + } + + private: + std::shared_ptr plan_; + }; + + /// Return the next snapshot for followup scanning. The current snapshot is not scanned (even + /// doesn't exist), so there are no splits. + class NextSnapshot : public ScanResult { + public: + explicit NextSnapshot(int64_t next_snapshot_id) : next_snapshot_id_(next_snapshot_id) {} + + int64_t NextSnapshotId() const { + return next_snapshot_id_; + } + + private: + int64_t next_snapshot_id_; + }; + + explicit StartingScanner(const std::shared_ptr& snapshot_manager) + : snapshot_manager_(snapshot_manager) {} + + virtual ~StartingScanner() = default; + + virtual Result> Scan( + const std::shared_ptr& snapshot_reader) = 0; + + protected: + std::shared_ptr snapshot_manager_; + std::optional starting_snapshot_id_; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/snapshot/static_from_snapshot_starting_scanner.h b/src/paimon/core/table/source/snapshot/static_from_snapshot_starting_scanner.h new file mode 100644 index 0000000..79ebb79 --- /dev/null +++ b/src/paimon/core/table/source/snapshot/static_from_snapshot_starting_scanner.h @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "paimon/core/table/source/snapshot/starting_scanner.h" +#include "paimon/logging.h" + +namespace paimon { +/// `StartingScanner` for the `StartupMode::FromSnapshot()` or `StartupMode::FromSnapshotFull()` +/// startup mode of a batch read. +class StaticFromSnapshotStartingScanner : public StartingScanner { + public: + StaticFromSnapshotStartingScanner(const std::shared_ptr& snapshot_manager, + int64_t snapshot_id) + : StartingScanner(snapshot_manager), + logger_(Logger::GetLogger("StaticFromSnapshotStartingScanner")) { + starting_snapshot_id_ = snapshot_id; + } + + Result> Scan( + const std::shared_ptr& snapshot_reader) override { + PAIMON_ASSIGN_OR_RAISE(std::optional earliest, + snapshot_manager_->EarliestSnapshotId()); + PAIMON_ASSIGN_OR_RAISE(std::optional latest, + snapshot_manager_->LatestSnapshotId()); + if (earliest == std::nullopt || latest == std::nullopt) { + PAIMON_LOG_INFO( + logger_, "There is currently no snapshot. Waiting for snapshot generation.%s", ""); + return std::make_shared(); + } + if (starting_snapshot_id_.value() < earliest.value() || + starting_snapshot_id_.value() > latest.value()) { + return Status::Invalid( + fmt::format("The specified scan snapshotId {} is out of " + "available snapshotId range [{}, {}].", + starting_snapshot_id_.value(), earliest.value(), latest.value())); + } + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, + snapshot_manager_->LoadSnapshot(starting_snapshot_id_.value())); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr plan, + snapshot_reader->WithMode(ScanMode::ALL)->WithSnapshot(snapshot)->Read()); + return std::make_shared(plan); + } + + private: + std::unique_ptr logger_; +}; +} // namespace paimon diff --git a/src/paimon/core/table/source/snapshot/static_from_tag_starting_scanner.h b/src/paimon/core/table/source/snapshot/static_from_tag_starting_scanner.h new file mode 100644 index 0000000..a10b11b --- /dev/null +++ b/src/paimon/core/table/source/snapshot/static_from_tag_starting_scanner.h @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include + +#include "paimon/core/table/source/snapshot/starting_scanner.h" +#include "paimon/core/utils/tag_manager.h" + +namespace paimon { +/// `StartingScanner` for the `CoreOptions::GetScanTagName()` of a batch read. +class StaticFromTagStartingScanner : public StartingScanner { + public: + StaticFromTagStartingScanner(const std::shared_ptr& snapshot_manager, + const std::string& tag_name) + : StartingScanner(snapshot_manager) { + tag_name_ = tag_name; + } + + Result> Scan( + const std::shared_ptr& snapshot_reader) override { + const TagManager tag_manager(snapshot_manager_->Fs(), snapshot_manager_->RootPath()); + PAIMON_ASSIGN_OR_RAISE(const Tag tag, tag_manager.GetOrThrow(tag_name_)); + PAIMON_ASSIGN_OR_RAISE(const Snapshot snapshot, tag.TrimToSnapshot()); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr plan, + snapshot_reader->WithMode(ScanMode::ALL)->WithSnapshot(snapshot)->Read()); + return std::make_shared(plan); + } + + private: + std::string tag_name_; +}; +} // namespace paimon