Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ This product includes code from Apache ORC.
* Adapted read-ahead cache:
* include/paimon/utils/read_ahead_cache.h (adapted from Cache.hh)
* src/paimon/common/utils/read_ahead_cache.cpp (adapted from Cache.cc)
* Adapted byte range combiner:
* src/paimon/common/utils/byte_range_combiner.h (adapted from Cache.hh)
* src/paimon/common/utils/byte_range_combiner.cpp (adapted from Cache.cc)

Copyright: 2013 and onwards The Apache Software Foundation.
Home page: https://orc.apache.org/
Expand Down
169 changes: 169 additions & 0 deletions src/paimon/common/global_index/wrap/file_index_reader_wrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once
#include <algorithm>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "paimon/file_index/bitmap_index_result.h"
#include "paimon/file_index/file_index_reader.h"
#include "paimon/global_index/bitmap_global_index_result.h"
#include "paimon/global_index/global_index_reader.h"
#include "paimon/utils/roaring_bitmap64.h"

namespace paimon {
/// Adapts a `FileIndexReader` to the `GlobalIndexReader` interface.
///
/// Each predicate visit is forwarded to the wrapped reader, and the
/// `FileIndexResult` it yields is handed to the `transform` callback supplied at
/// construction; the callback's return value is what the caller receives.
class FileIndexReaderWrapper : public GlobalIndexReader {
 public:
    /// @param reader    file index reader the predicate visits are forwarded to.
    /// @param transform callback turning a `FileIndexResult` into a `GlobalIndexResult`
    ///                  (the static `ToGlobalIndexResult` below has a matching signature).
    FileIndexReaderWrapper(const std::shared_ptr<FileIndexReader>& reader,
                           const std::function<Result<std::shared_ptr<GlobalIndexResult>>(
                               const std::shared_ptr<FileIndexResult>&)>& transform)
        : reader_(reader), transform_(transform) {}

    // ---- Null checks -------------------------------------------------------------------

    Result<std::shared_ptr<GlobalIndexResult>> VisitIsNotNull() override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitIsNotNull());
        return transform_(file_result);
    }

    Result<std::shared_ptr<GlobalIndexResult>> VisitIsNull() override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitIsNull());
        return transform_(file_result);
    }

    // ---- Comparisons against a single literal ------------------------------------------

    Result<std::shared_ptr<GlobalIndexResult>> VisitEqual(const Literal& literal) override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitEqual(literal));
        return transform_(file_result);
    }

    Result<std::shared_ptr<GlobalIndexResult>> VisitNotEqual(const Literal& literal) override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitNotEqual(literal));
        return transform_(file_result);
    }

    Result<std::shared_ptr<GlobalIndexResult>> VisitLessThan(const Literal& literal) override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitLessThan(literal));
        return transform_(file_result);
    }

    Result<std::shared_ptr<GlobalIndexResult>> VisitLessOrEqual(const Literal& literal) override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitLessOrEqual(literal));
        return transform_(file_result);
    }

    Result<std::shared_ptr<GlobalIndexResult>> VisitGreaterThan(const Literal& literal) override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitGreaterThan(literal));
        return transform_(file_result);
    }

    Result<std::shared_ptr<GlobalIndexResult>> VisitGreaterOrEqual(
        const Literal& literal) override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitGreaterOrEqual(literal));
        return transform_(file_result);
    }

    // ---- Set membership ----------------------------------------------------------------

    Result<std::shared_ptr<GlobalIndexResult>> VisitIn(
        const std::vector<Literal>& literals) override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitIn(literals));
        return transform_(file_result);
    }

    Result<std::shared_ptr<GlobalIndexResult>> VisitNotIn(
        const std::vector<Literal>& literals) override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitNotIn(literals));
        return transform_(file_result);
    }

    // ---- String predicates -------------------------------------------------------------

    Result<std::shared_ptr<GlobalIndexResult>> VisitStartsWith(const Literal& prefix) override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitStartsWith(prefix));
        return transform_(file_result);
    }

    Result<std::shared_ptr<GlobalIndexResult>> VisitEndsWith(const Literal& suffix) override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitEndsWith(suffix));
        return transform_(file_result);
    }

    Result<std::shared_ptr<GlobalIndexResult>> VisitContains(const Literal& literal) override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitContains(literal));
        return transform_(file_result);
    }

    Result<std::shared_ptr<GlobalIndexResult>> VisitLike(const Literal& literal) override {
        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<FileIndexResult> file_result,
                               reader_->VisitLike(literal));
        return transform_(file_result);
    }

    // ---- Search queries (not delegated to the wrapped reader) --------------------------

    /// Vector search is not handled by this wrapper; always returns `Status::Invalid`.
    Result<std::shared_ptr<ScoredGlobalIndexResult>> VisitVectorSearch(
        const std::shared_ptr<VectorSearch>& /*vector_search*/) override {
        return Status::Invalid(
            "FileIndexReaderWrapper is not supposed to handle vector search query");
    }

    /// Full-text search never consults the wrapped reader: the query is ignored and
    /// `FileIndexResult::Remain()` is passed straight to the transform callback.
    Result<std::shared_ptr<GlobalIndexResult>> VisitFullTextSearch(
        const std::shared_ptr<FullTextSearch>& /*full_text_search*/) override {
        const std::shared_ptr<FileIndexResult> remain_result = FileIndexResult::Remain();
        return transform_(remain_result);
    }

    /// Converts a `FileIndexResult` into a `GlobalIndexResult`, widening the 32-bit
    /// bitmap of a `BitmapIndexResult` into a 64-bit bitmap.
    ///
    /// The dynamic type of `result` decides the outcome:
    ///  - `Remain`            -> a null `GlobalIndexResult` pointer;
    ///  - `Skip`              -> a `BitmapGlobalIndexResult` over an empty bitmap;
    ///  - `BitmapIndexResult` -> a `BitmapGlobalIndexResult` whose supplier builds a
    ///                           `RoaringBitmap64` from the 32-bit bitmap when invoked;
    ///  - anything else (including a null pointer, for which every cast fails)
    ///                        -> `Status::Invalid`.
    static Result<std::shared_ptr<GlobalIndexResult>> ToGlobalIndexResult(
        const std::shared_ptr<FileIndexResult>& result) {
        if (std::dynamic_pointer_cast<Remain>(result)) {
            return std::shared_ptr<GlobalIndexResult>();
        }
        if (std::dynamic_pointer_cast<Skip>(result)) {
            return std::make_shared<BitmapGlobalIndexResult>(
                []() -> Result<RoaringBitmap64> { return RoaringBitmap64(); });
        }
        if (auto bitmap_holder = std::dynamic_pointer_cast<BitmapIndexResult>(result)) {
            // The supplier captures the shared_ptr by value so the file-level result
            // stays alive for as long as the global result may need it.
            return std::make_shared<BitmapGlobalIndexResult>(
                [bitmap_holder]() -> Result<RoaringBitmap64> {
                    PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap32* bitmap32,
                                           bitmap_holder->GetBitmap());
                    return RoaringBitmap64(*bitmap32);
                });
        }
        return Status::Invalid(
            "invalid FileIndexResult, supposed to be Remain or Skip or BitmapIndexResult");
    }

 private:
    /// Wrapped file index reader.
    std::shared_ptr<FileIndexReader> reader_;
    /// Callback applied to every `FileIndexResult` before it is returned.
    std::function<Result<std::shared_ptr<GlobalIndexResult>>(
        const std::shared_ptr<FileIndexResult>&)>
        transform_;
};

} // namespace paimon
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "paimon/common/global_index/wrap/file_index_reader_wrapper.h"

#include <memory>
#include <vector>

#include "gtest/gtest.h"
#include "paimon/file_index/bitmap_index_result.h"
#include "paimon/global_index/bitmap_global_index_result.h"
#include "paimon/global_index/global_index_result.h"
#include "paimon/testing/utils/testharness.h"

namespace paimon::test {

TEST(FileIndexReaderWrapperTest, TestToGlobalIndexResult) {
    // Asserts that `actual` is a BitmapGlobalIndexResult whose bitmap equals `expected_ids`.
    auto expect_bitmap = [](const std::shared_ptr<GlobalIndexResult>& actual,
                            const std::vector<int64_t>& expected_ids) {
        auto bitmap_result = std::dynamic_pointer_cast<BitmapGlobalIndexResult>(actual);
        ASSERT_TRUE(bitmap_result);
        ASSERT_OK_AND_ASSIGN(const RoaringBitmap64* bitmap, bitmap_result->GetBitmap());
        ASSERT_TRUE(bitmap);
        ASSERT_EQ(*(bitmap_result->GetBitmap().value()), RoaringBitmap64::From(expected_ids))
            << "result=" << (bitmap_result->GetBitmap().value())->ToString()
            << ", expected=" << RoaringBitmap64::From(expected_ids).ToString();
    };

    // Remain -> null GlobalIndexResult pointer.
    {
        ASSERT_OK_AND_ASSIGN(auto converted, FileIndexReaderWrapper::ToGlobalIndexResult(
                                                 FileIndexResult::Remain()));
        ASSERT_FALSE(converted);
    }
    // Skip -> bitmap result with no row ids.
    {
        ASSERT_OK_AND_ASSIGN(auto converted,
                             FileIndexReaderWrapper::ToGlobalIndexResult(FileIndexResult::Skip()));
        expect_bitmap(converted, {});
    }
    // BitmapIndexResult -> same row ids in a 64-bit bitmap (including INT32_MAX).
    {
        auto supplier32 = []() -> Result<RoaringBitmap32> {
            return RoaringBitmap32::From({1, 4, 2147483647});
        };
        auto bitmap_input = std::make_shared<BitmapIndexResult>(supplier32);
        ASSERT_OK_AND_ASSIGN(auto converted,
                             FileIndexReaderWrapper::ToGlobalIndexResult(bitmap_input));
        expect_bitmap(converted, {1L, 4L, 2147483647L});
    }
    // A FileIndexResult subclass that is none of Remain / Skip / BitmapIndexResult is
    // rejected, even though it reports IsRemain() == true.
    {
        class FakeFileIndexResult : public FileIndexResult {
            Result<bool> IsRemain() const override {
                return true;
            }
            std::string ToString() const override {
                return "fake file index result";
            }
        };
        auto unsupported_input = std::make_shared<FakeFileIndexResult>();
        ASSERT_NOK_WITH_MSG(
            FileIndexReaderWrapper::ToGlobalIndexResult(unsupported_input),
            "invalid FileIndexResult, supposed to be Remain or Skip or BitmapIndexResult");
    }
}

} // namespace paimon::test
96 changes: 96 additions & 0 deletions src/paimon/common/global_index/wrap/file_index_writer_wrapper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#pragma once

#include <algorithm>
#include <cassert>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/c/abi.h"
#include "arrow/c/helpers.h"
#include "fmt/format.h"
#include "paimon/common/global_index/global_index_utils.h"
#include "paimon/common/utils/arrow/status_utils.h"
#include "paimon/file_index/file_index_writer.h"
#include "paimon/global_index/global_index_writer.h"
#include "paimon/global_index/io/global_index_file_writer.h"

namespace paimon {
/// Adapts a `FileIndexWriter` to the `GlobalIndexWriter` interface.
///
/// Batches are forwarded to the wrapped writer; on `Finish()` the writer's
/// serialized bytes are written to a single new file obtained from the
/// `GlobalIndexFileWriter`.
class FileIndexWriterWrapper : public GlobalIndexWriter {
 public:
    /// @param index_type   index type string, passed to `NewFileName` when the file is created.
    /// @param file_manager provides the file name, output stream and final path.
    /// @param writer       file index writer that accumulates the batches.
    FileIndexWriterWrapper(const std::string& index_type,
                           const std::shared_ptr<GlobalIndexFileWriter>& file_manager,
                           const std::shared_ptr<FileIndexWriter>& writer)
        : index_type_(index_type), file_manager_(file_manager), writer_(writer) {}

    /// Validates `relative_row_ids` via `GlobalIndexUtils::CheckRelativeRowIds` against the
    /// number of rows added so far, forwards the batch to the wrapped writer, and then
    /// advances the row counter by the batch length.
    Status AddBatch(::ArrowArray* c_arrow_array, std::vector<int64_t>&& relative_row_ids) override {
        PAIMON_RETURN_NOT_OK(
            GlobalIndexUtils::CheckRelativeRowIds(c_arrow_array, relative_row_ids, count_));
        // Captured before forwarding the array to the wrapped writer.
        // NOTE(review): presumably the writer may consume/release the array — confirm.
        const auto batch_length = c_arrow_array->length;
        PAIMON_RETURN_NOT_OK(writer_->AddBatch(c_arrow_array));
        count_ += batch_length;
        return Status::OK();
    }

    /// Writes the serialized index to a new file and returns its metadata.
    ///
    /// @return an empty vector when no rows were added; otherwise a single
    ///         `GlobalIndexIOMeta` holding the file path and the serialized size.
    ///         `Status::IOError` is returned when a write reports a length different
    ///         from the one requested.
    Result<std::vector<GlobalIndexIOMeta>> Finish() override {
        if (count_ == 0) {
            return std::vector<GlobalIndexIOMeta>();
        }

        PAIMON_ASSIGN_OR_RAISE(std::string file_name, file_manager_->NewFileName(index_type_));
        PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<OutputStream> out,
                               file_manager_->NewOutputStream(file_name));
        PAIMON_ASSIGN_OR_RAISE(PAIMON_UNIQUE_PTR<Bytes> bytes, writer_->SerializedBytes());

        // The length handed to Write() is cast to uint32_t and the written length comes
        // back as int32_t, so the payload is emitted in chunks of at most max_write_size_.
        uint64_t written = 0;
        while (written < bytes->size()) {
            const uint64_t remaining = bytes->size() - written;
            const uint64_t chunk_size = std::min(remaining, max_write_size_);
            PAIMON_ASSIGN_OR_RAISE(
                int32_t reported_size,
                out->Write(bytes->data() + written, static_cast<uint32_t>(chunk_size)));
            if (static_cast<uint64_t>(reported_size) != chunk_size) {
                return Status::IOError(
                    fmt::format("expect write len {} mismatch actual write len {}",
                                chunk_size, reported_size));
            }
            written += chunk_size;
        }

        PAIMON_RETURN_NOT_OK(out->Flush());
        PAIMON_RETURN_NOT_OK(out->Close());

        GlobalIndexIOMeta meta(file_manager_->ToPath(file_name), /*file_size=*/bytes->size(),
                               /*metadata=*/nullptr);
        return std::vector<GlobalIndexIOMeta>({meta});
    }

 private:
    /// Upper bound for a single `Write()` call (INT32_MAX bytes).
    static constexpr uint64_t kMaxWriteSize = std::numeric_limits<int32_t>::max();

    /// Index type passed to `NewFileName`.
    std::string index_type_;
    /// Number of rows accepted by `AddBatch` so far.
    int64_t count_ = 0;
    /// Chunk size limit used by `Finish()`.
    uint64_t max_write_size_ = kMaxWriteSize;
    std::shared_ptr<GlobalIndexFileWriter> file_manager_;
    std::shared_ptr<FileIndexWriter> writer_;
};
} // namespace paimon
Loading