Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions src/paimon/core/table/source/append_only_split_generator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <algorithm>
#include <memory>
#include <utility>
#include <vector>

#include "paimon/common/utils/bin_packing.h"
#include "paimon/core/append/bucketed_append_compact_manager.h"
#include "paimon/core/table/bucket_mode.h"
#include "paimon/core/table/source/split_generator.h"

namespace paimon {
/// Append only implementation of `SplitGenerator`.
class AppendOnlySplitGenerator : public SplitGenerator {
public:
AppendOnlySplitGenerator(int64_t target_split_size, int64_t open_file_cost,
const BucketMode& bucket_mode)
: target_split_size_(target_split_size),
open_file_cost_(open_file_cost),
bucket_mode_(bucket_mode) {}

Result<std::vector<SplitGroup>> SplitForBatch(
std::vector<std::shared_ptr<DataFileMeta>>&& input) const override {
std::vector<std::shared_ptr<DataFileMeta>> files = std::move(input);
std::stable_sort(files.begin(), files.end(),
BucketedAppendCompactManager::FileComparator(bucket_mode_ ==
BucketMode::BUCKET_UNAWARE));
auto weight_func = [open_file_cost = open_file_cost_](
const std::shared_ptr<DataFileMeta>& meta) -> int64_t {
return std::max(meta->file_size, open_file_cost);
};
auto packed = BinPacking::PackForOrdered<std::shared_ptr<DataFileMeta>>(
std::move(files), weight_func, target_split_size_);
std::vector<SplitGroup> ret;
ret.reserve(packed.size());
for (auto& pack : packed) {
ret.push_back(SplitGroup::RawConvertibleGroup(std::move(pack)));
}
return ret;
}

Result<std::vector<SplitGroup>> SplitForStreaming(
std::vector<std::shared_ptr<DataFileMeta>>&& files) const override {
// When the bucket mode is unaware, we split the files as batch, because unaware-bucket
// table only contains one bucket (bucket 0).
if (bucket_mode_ == BucketMode::BUCKET_UNAWARE) {
return SplitForBatch(std::move(files));
} else {
return std::vector<SplitGroup>({SplitGroup::RawConvertibleGroup(std::move(files))});
}
}

private:
int64_t target_split_size_;
int64_t open_file_cost_;
BucketMode bucket_mode_;
};
} // namespace paimon
61 changes: 61 additions & 0 deletions src/paimon/core/table/source/append_only_table_read.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "paimon/core/table/source/append_only_table_read.h"

#include "paimon/core/core_options.h"
#include "paimon/core/operation/data_evolution_split_read.h"
#include "paimon/core/operation/internal_read_context.h"
#include "paimon/core/operation/raw_file_split_read.h"
#include "paimon/status.h"

namespace paimon {
class DataSplit;
class Executor;
class FileStorePathFactory;
class MemoryPool;

AppendOnlyTableRead::AppendOnlyTableRead(const std::shared_ptr<FileStorePathFactory>& path_factory,
const std::shared_ptr<InternalReadContext>& context,
const std::shared_ptr<MemoryPool>& memory_pool,
const std::shared_ptr<Executor>& executor)
: TableRead(memory_pool) {
const auto& core_options = context->GetCoreOptions();
if (core_options.DataEvolutionEnabled()) {
// add data evolution first
split_reads_.push_back(
std::make_unique<DataEvolutionSplitRead>(path_factory, context, memory_pool, executor));
} else {
split_reads_.push_back(
std::make_unique<RawFileSplitRead>(path_factory, context, memory_pool, executor));
}
}

Result<std::unique_ptr<BatchReader>> AppendOnlyTableRead::CreateReader(
const std::shared_ptr<Split>& split) {
for (const auto& read : split_reads_) {
PAIMON_ASSIGN_OR_RAISE(bool matched, read->Match(split, /*force_keep_delete=*/false));
if (matched) {
return read->CreateReader(split);
}
}
return Status::Invalid("create reader failed, not read match with split.");
}

} // namespace paimon
54 changes: 54 additions & 0 deletions src/paimon/core/table/source/append_only_table_read.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <memory>
#include <vector>

#include "paimon/core/operation/internal_read_context.h"
#include "paimon/core/operation/split_read.h"
#include "paimon/core/utils/file_store_path_factory.h"
#include "paimon/reader/batch_reader.h"
#include "paimon/result.h"
#include "paimon/table/source/table_read.h"

namespace paimon {

class SplitRead;
class Executor;
class FileStorePathFactory;
class InternalReadContext;
class MemoryPool;

class AppendOnlyTableRead : public TableRead {
public:
AppendOnlyTableRead(const std::shared_ptr<FileStorePathFactory>& path_factory,
const std::shared_ptr<InternalReadContext>& context,
const std::shared_ptr<MemoryPool>& memory_pool,
const std::shared_ptr<Executor>& executor);

Result<std::unique_ptr<BatchReader>> CreateReader(
const std::shared_ptr<Split>& data_split) override;

private:
std::vector<std::unique_ptr<SplitRead>> split_reads_;
};

} // namespace paimon
145 changes: 145 additions & 0 deletions src/paimon/core/table/source/data_evolution_batch_scan.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "paimon/core/table/source/data_evolution_batch_scan.h"

#include "paimon/core/global_index/global_index_scan_impl.h"
#include "paimon/core/global_index/indexed_split_impl.h"
#include "paimon/core/table/source/data_split_impl.h"
#include "paimon/global_index/bitmap_global_index_result.h"
#include "paimon/global_index/global_index_scan.h"

namespace paimon {
DataEvolutionBatchScan::DataEvolutionBatchScan(
const std::string& table_path, const std::shared_ptr<SnapshotReader>& snapshot_reader,
std::unique_ptr<DataTableBatchScan>&& batch_scan,
const std::shared_ptr<GlobalIndexResult>& global_index_result, const CoreOptions& core_options,
const std::shared_ptr<MemoryPool>& pool, const std::shared_ptr<Executor>& executor)
: AbstractTableScan(core_options, snapshot_reader),
pool_(pool),
table_path_(table_path),
batch_scan_(std::move(batch_scan)),
global_index_result_(global_index_result),
executor_(executor) {}

Result<std::shared_ptr<Plan>> DataEvolutionBatchScan::CreatePlan() {
std::optional<std::vector<Range>> row_ranges;
std::shared_ptr<GlobalIndexResult> final_global_index_result = global_index_result_;
if (!final_global_index_result) {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<GlobalIndexResult> index_result, EvalGlobalIndex());
if (index_result) {
final_global_index_result = index_result;
PAIMON_ASSIGN_OR_RAISE(row_ranges, index_result->ToRanges());
}
} else {
PAIMON_ASSIGN_OR_RAISE(row_ranges, final_global_index_result->ToRanges());
}
if (!row_ranges) {
return batch_scan_->CreatePlan();
}
if (row_ranges.value().empty()) {
return PlanImpl::EmptyPlan();
}
PAIMON_ASSIGN_OR_RAISE(RowRangeIndex row_range_index,
RowRangeIndex::Create(row_ranges.value()));
batch_scan_->WithRowRangeIndex(row_range_index);
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<Plan> data_plan, batch_scan_->CreatePlan());
std::map<int64_t, float> id_to_score;
if (auto scored_result =
std::dynamic_pointer_cast<ScoredGlobalIndexResult>(final_global_index_result)) {
PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<ScoredGlobalIndexResult::ScoredIterator> scored_iter,
scored_result->CreateScoredIterator());
while (scored_iter->HasNext()) {
auto [id, score] = scored_iter->NextWithScore();
id_to_score[id] = score;
}
}
return WrapToIndexedSplits(data_plan, row_range_index, id_to_score);
}

Result<std::shared_ptr<Plan>> DataEvolutionBatchScan::WrapToIndexedSplits(
const std::shared_ptr<Plan>& data_plan, const RowRangeIndex& row_range_index,
const std::map<int64_t, float>& id_to_score) const {
// TODO(lisizhuo.lsz): add executor here
auto data_splits = data_plan->Splits();
std::vector<std::shared_ptr<Split>> indexed_splits;
indexed_splits.reserve(data_splits.size());
for (const auto& split : data_splits) {
auto data_split = std::dynamic_pointer_cast<DataSplitImpl>(split);
if (!data_split) {
return Status::Invalid("Cannot cast split to DataSplit when create IndexedSplit");
}
const auto& files = data_split->DataFiles();
if (files.empty()) {
return Status::Invalid("Empty data files in WrapToIndexedSplits");
}
PAIMON_ASSIGN_OR_RAISE(int64_t min, files[0]->NonNullFirstRowId());
PAIMON_ASSIGN_OR_RAISE(int64_t max, files[files.size() - 1]->NonNullFirstRowId());
max += files[files.size() - 1]->row_count - 1;

std::vector<Range> expected = row_range_index.IntersectedRanges(min, max);
if (expected.empty()) {
return Status::Invalid(
fmt::format("There should be intersected ranges for split with min row id {} and "
"max row id {}.",
min, max));
}

std::vector<float> scores;
if (!id_to_score.empty()) {
for (const auto& range : expected) {
for (int64_t i = range.from; i <= range.to; i++) {
auto iter = id_to_score.find(i);
if (iter != id_to_score.end()) {
scores.push_back(iter->second);
} else {
return Status::Invalid(fmt::format("cannot find score for row {}", i));
}
}
}
}
indexed_splits.push_back(std::make_shared<IndexedSplitImpl>(data_split, expected, scores));
}
return std::make_shared<PlanImpl>(data_plan->SnapshotId(), indexed_splits);
}

Result<std::shared_ptr<GlobalIndexResult>> DataEvolutionBatchScan::EvalGlobalIndex() const {
auto predicate = batch_scan_->GetNonPartitionPredicate();
if (!predicate) {
return std::shared_ptr<GlobalIndexResult>(nullptr);
}
if (!core_options_.GlobalIndexEnabled()) {
return std::shared_ptr<GlobalIndexResult>(nullptr);
}
auto partition_filter = batch_scan_->GetPartitionPredicate();
// TODO(lisizhuo.lsz): support time travel
PAIMON_ASSIGN_OR_RAISE(
std::unique_ptr<GlobalIndexScan> index_scan,
GlobalIndexScan::Create(table_path_, core_options_.GetScanSnapshotId(), partition_filter,
core_options_.ToMap(), core_options_.GetFileSystem(), executor_,
pool_));
auto index_scan_impl = dynamic_cast<GlobalIndexScanImpl*>(index_scan.get());
if (!index_scan_impl) {
return Status::Invalid("invalid GlobalIndexScan, cannot cast to GlobalIndexScanImpl");
}

return index_scan_impl->Scan(predicate);
}

} // namespace paimon
57 changes: 57 additions & 0 deletions src/paimon/core/table/source/data_evolution_batch_scan.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

#include "paimon/core/table/source/abstract_table_scan.h"
#include "paimon/core/table/source/data_table_batch_scan.h"
#include "paimon/result.h"
#include "paimon/utils/range.h"

namespace paimon {
class DataEvolutionBatchScan : public AbstractTableScan {
public:
DataEvolutionBatchScan(const std::string& table_path,
const std::shared_ptr<SnapshotReader>& snapshot_reader,
std::unique_ptr<DataTableBatchScan>&& batch_scan,
const std::shared_ptr<GlobalIndexResult>& global_index_result,
const CoreOptions& core_options, const std::shared_ptr<MemoryPool>& pool,
const std::shared_ptr<Executor>& executor);

Result<std::shared_ptr<Plan>> CreatePlan() override;

private:
Result<std::shared_ptr<Plan>> WrapToIndexedSplits(
const std::shared_ptr<Plan>& data_plan, const RowRangeIndex& row_range_index,
const std::map<int64_t, float>& id_to_score) const;
Result<std::shared_ptr<GlobalIndexResult>> EvalGlobalIndex() const;

private:
std::shared_ptr<MemoryPool> pool_;
std::string table_path_;
std::unique_ptr<DataTableBatchScan> batch_scan_;
std::shared_ptr<GlobalIndexResult> global_index_result_;
std::shared_ptr<Executor> executor_;
};

} // namespace paimon
Loading