Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions include/paimon/migrate/file_meta_utils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <map>
#include <memory>
#include <string>
#include <vector>

#include "paimon/commit_message.h"
#include "paimon/result.h"
#include "paimon/visibility.h"

namespace paimon {

/// Utility class for handling file metadata operations during data migration.
///
/// This class provides static utility functions for migrating external data files into Paimon
/// tables. It handles the generation of commit messages from source data files, enabling seamless
/// integration of existing data into Paimon's file store architecture.
///
/// @note This utility currently does not support:
/// - object store file systems
/// - primary-key tables
///
/// @warning This utility will move/rename source data files to destination paths during migration.
/// It is recommended to back up your data in advance before using this utility to avoid data loss.
class PAIMON_EXPORT FileMetaUtils {
public:
FileMetaUtils() = delete;
~FileMetaUtils() = delete;

/// Generate a commit message for migrating external data files into a Paimon table.
///
/// This method analyzes the provided source data files and generates a commit message that can
/// be used to incorporate these files into the target Paimon table.
///
/// @param src_data_files Vector of paths to source data files to be migrated.
/// **These files must have the same schema as the target Paimon table**.
/// @param dst_table_path Path to the destination Paimon table directory.
/// @param partition_values Map of partition column names to their values for partitioned
/// tables. Use empty map for non-partitioned tables.
/// @param options Set a configuration options map to set some option entries which are not
/// defined in the table schema or whose values you want to overwrite.
/// @param file_system Specifies the file system for file operations.
/// If `nullptr`, use default file system.
/// @return Result containing a unique pointer to the generated `CommitMessage`,
/// or an error status if the migration cannot be performed.
static Result<std::unique_ptr<CommitMessage>> GenerateCommitMessage(
const std::vector<std::string>& src_data_files, const std::string& dst_table_path,
const std::map<std::string, std::string>& partition_values,
const std::map<std::string, std::string>& options,
const std::shared_ptr<FileSystem>& file_system = nullptr);
};

} // namespace paimon
46 changes: 46 additions & 0 deletions src/paimon/core/global_index/global_index_evaluator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <memory>
#include <optional>

#include "paimon/global_index/global_index_result.h"
#include "paimon/predicate/predicate.h"
#include "paimon/visibility.h"

namespace paimon {
/// Abstract base class for evaluating predicates against a global index.
class PAIMON_EXPORT GlobalIndexEvaluator {
public:
virtual ~GlobalIndexEvaluator() = default;
/// Evaluates a predicate against the global index.
///
/// @param predicate The filter predicate to evaluate.
/// @return A `Result` containing:
/// - `nullptr` if the predicate cannot be evaluated by this index (e.g., field has
/// no index),
/// - A `std::shared_ptr<GlobalIndexResult>` if evaluation succeeds.
/// The `GlobalIndexResult` indicates the matching rows (e.g., via row ID bitmaps).
virtual Result<std::shared_ptr<GlobalIndexResult>> Evaluate(
const std::shared_ptr<Predicate>& predicate) = 0;
};

} // namespace paimon
146 changes: 146 additions & 0 deletions src/paimon/core/global_index/global_index_evaluator_impl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "paimon/core/global_index/global_index_evaluator_impl.h"

#include "fmt/format.h"
#include "paimon/global_index/bitmap_global_index_result.h"
#include "paimon/predicate/leaf_predicate.h"
#include "paimon/predicate/predicate_utils.h"

namespace paimon {
Result<std::shared_ptr<GlobalIndexResult>> GlobalIndexEvaluatorImpl::Evaluate(
const std::shared_ptr<Predicate>& predicate) {
std::shared_ptr<GlobalIndexResult> compound_result;
if (predicate) {
PAIMON_ASSIGN_OR_RAISE(compound_result, EvaluatePredicate(predicate));
}
return compound_result;
}

Result<std::vector<std::shared_ptr<GlobalIndexReader>>> GlobalIndexEvaluatorImpl::GetIndexReaders(
const std::string& field_name) {
PAIMON_ASSIGN_OR_RAISE(DataField data_field, table_schema_->GetField(field_name));
int32_t field_id = data_field.Id();
// get or create global index readers for current field
std::vector<std::shared_ptr<GlobalIndexReader>> readers;
auto iter = index_readers_cache_.find(field_id);
if (iter != index_readers_cache_.end()) {
readers = iter->second;
} else {
PAIMON_ASSIGN_OR_RAISE(readers, create_index_readers_(field_id));
index_readers_cache_.insert({field_id, readers});
}
return readers;
}

Result<std::shared_ptr<GlobalIndexResult>> GlobalIndexEvaluatorImpl::EvaluatePredicate(
const std::shared_ptr<Predicate>& predicate) {
if (predicate == nullptr) {
return std::shared_ptr<GlobalIndexResult>(nullptr);
}

if (auto compound_predicate = std::dynamic_pointer_cast<CompoundPredicate>(predicate)) {
return EvaluateCompoundPredicate(compound_predicate);
} else if (auto leaf_predicate = std::dynamic_pointer_cast<LeafPredicate>(predicate)) {
const std::string& field_name = leaf_predicate->FieldName();
PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<GlobalIndexReader>> readers,
GetIndexReaders(field_name));
if (readers.empty()) {
// No usable index for this field within the requested range. Treat as "no
// pushdown available" so the upstream falls back to a full scan instead of
// wrongly producing an empty result.
return std::shared_ptr<GlobalIndexResult>(nullptr);
}
// calculate compound result as field may has multiple indexes
std::shared_ptr<GlobalIndexResult> compound_result;
for (const auto& index_reader : readers) {
PAIMON_ASSIGN_OR_RAISE(
std::shared_ptr<GlobalIndexResult> sub_result,
PredicateUtils::VisitPredicate<std::shared_ptr<GlobalIndexResult>>(leaf_predicate,
index_reader));
if (sub_result) {
if (!compound_result) {
compound_result = sub_result;
} else {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<GlobalIndexResult> and_result,
compound_result->And(sub_result));
compound_result = and_result;
}
}
if (compound_result) {
PAIMON_ASSIGN_OR_RAISE(bool is_empty, compound_result->IsEmpty());
if (is_empty) {
return compound_result;
}
}
}
return compound_result;
}
return Status::Invalid(fmt::format(
"cannot cast predicate {} to CompoundPredicate or LeafPredicate", predicate->ToString()));
}

Result<std::shared_ptr<GlobalIndexResult>> GlobalIndexEvaluatorImpl::EvaluateCompoundPredicate(
const std::shared_ptr<CompoundPredicate>& compound_predicate) {
if (compound_predicate->GetFunction().GetType() == Function::Type::OR) {
std::shared_ptr<GlobalIndexResult> compound_result;
for (const auto& child : compound_predicate->Children()) {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<GlobalIndexResult> sub_result,
EvaluatePredicate(child));
if (!sub_result) {
return std::shared_ptr<GlobalIndexResult>(nullptr);
}
if (!compound_result) {
compound_result = sub_result;
} else {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<GlobalIndexResult> or_result,
compound_result->Or(sub_result));
compound_result = or_result;
}
}
return compound_result;
} else if (compound_predicate->GetFunction().GetType() == Function::Type::AND) {
std::shared_ptr<GlobalIndexResult> compound_result;
for (const auto& child : compound_predicate->Children()) {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<GlobalIndexResult> sub_result,
EvaluatePredicate(child));
if (sub_result) {
if (!compound_result) {
compound_result = sub_result;
} else {
PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<GlobalIndexResult> and_result,
compound_result->And(sub_result));
compound_result = and_result;
}
}

if (compound_result) {
PAIMON_ASSIGN_OR_RAISE(bool is_empty, compound_result->IsEmpty());
if (is_empty) {
return compound_result;
}
}
}
return compound_result;
}
return Status::Invalid("CompoundPredicate only support And/Or function");
}

} // namespace paimon
67 changes: 67 additions & 0 deletions src/paimon/core/global_index/global_index_evaluator_impl.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once
#include <functional>
#include <map>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

#include "paimon/core/global_index/global_index_evaluator.h"
#include "paimon/core/schema/table_schema.h"
#include "paimon/global_index/global_index_reader.h"
#include "paimon/predicate/compound_predicate.h"

namespace paimon {
class GlobalIndexEvaluatorImpl : public GlobalIndexEvaluator {
public:
/// Creates the underlying readers for the given field. Returns an empty vector when the field
/// has no usable index.
using IndexReadersCreator =
std::function<Result<std::vector<std::shared_ptr<GlobalIndexReader>>>(int32_t)>;

GlobalIndexEvaluatorImpl(const std::shared_ptr<TableSchema>& table_schema,
IndexReadersCreator create_index_readers)
: table_schema_(table_schema), create_index_readers_(std::move(create_index_readers)) {}

Result<std::shared_ptr<GlobalIndexResult>> Evaluate(
const std::shared_ptr<Predicate>& predicate) override;

private:
Result<std::shared_ptr<GlobalIndexResult>> EvaluatePredicate(
const std::shared_ptr<Predicate>& predicate);

Result<std::shared_ptr<GlobalIndexResult>> EvaluateCompoundPredicate(
const std::shared_ptr<CompoundPredicate>& compound_predicate);

Result<std::vector<std::shared_ptr<GlobalIndexReader>>> GetIndexReaders(
const std::string& field_name);

private:
std::shared_ptr<TableSchema> table_schema_;
// create_index_readers_(field_id)
IndexReadersCreator create_index_readers_;
// [field_id, vector<reader>]
std::map<int32_t, std::vector<std::shared_ptr<GlobalIndexReader>>> index_readers_cache_;
};

} // namespace paimon
Loading