diff --git a/LICENSE b/LICENSE index 2665c1e..c4c88c9 100644 --- a/LICENSE +++ b/LICENSE @@ -401,6 +401,9 @@ This product includes code from Apache ORC. * Adapted read-ahead cache: * include/paimon/utils/read_ahead_cache.h (adapted from Cache.hh) * src/paimon/common/utils/read_ahead_cache.cpp (adapted from Cache.cc) +* Adapted byte range combiner: + * src/paimon/common/utils/byte_range_combiner.h (adapted from Cache.hh) + * src/paimon/common/utils/byte_range_combiner.cpp (adapted from Cache.cc) Copyright: 2013 and onwards The Apache Software Foundation. Home page: https://orc.apache.org/ diff --git a/src/paimon/common/global_index/wrap/file_index_reader_wrapper.h b/src/paimon/common/global_index/wrap/file_index_reader_wrapper.h new file mode 100644 index 0000000..d47a774 --- /dev/null +++ b/src/paimon/common/global_index/wrap/file_index_reader_wrapper.h @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once +#include +#include +#include +#include +#include + +#include "paimon/file_index/bitmap_index_result.h" +#include "paimon/file_index/file_index_reader.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/global_index/global_index_reader.h" +#include "paimon/utils/roaring_bitmap64.h" + +namespace paimon { +/// A `GlobalIndexReader` wrapper for `FileIndexReader`. +class FileIndexReaderWrapper : public GlobalIndexReader { + public: + FileIndexReaderWrapper(const std::shared_ptr& reader, + const std::function>( + const std::shared_ptr&)>& transform) + : reader_(reader), transform_(transform) {} + + Result> VisitIsNotNull() override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitIsNotNull()); + return transform_(file_index_result); + } + + Result> VisitIsNull() override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitIsNull()); + return transform_(file_index_result); + } + + Result> VisitEqual(const Literal& literal) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitEqual(literal)); + return transform_(file_index_result); + } + + Result> VisitNotEqual(const Literal& literal) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitNotEqual(literal)); + return transform_(file_index_result); + } + + Result> VisitLessThan(const Literal& literal) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitLessThan(literal)); + return transform_(file_index_result); + } + + Result> VisitLessOrEqual(const Literal& literal) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitLessOrEqual(literal)); + return transform_(file_index_result); + } + + Result> VisitGreaterThan(const Literal& literal) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitGreaterThan(literal)); + return transform_(file_index_result); + } + + Result> VisitGreaterOrEqual( + const Literal& literal) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitGreaterOrEqual(literal)); + return transform_(file_index_result); + } + + Result> VisitIn( + const std::vector& literals) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitIn(literals)); + return transform_(file_index_result); + } + + Result> VisitNotIn( + const std::vector& literals) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitNotIn(literals)); + return transform_(file_index_result); + } + + Result> VisitStartsWith(const Literal& prefix) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitStartsWith(prefix)); + return transform_(file_index_result); + } + + Result> VisitEndsWith(const Literal& suffix) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitEndsWith(suffix)); + return transform_(file_index_result); + } + + Result> VisitContains(const Literal& literal) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitContains(literal)); + return transform_(file_index_result); + } + + Result> VisitLike(const Literal& literal) override { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr file_index_result, + reader_->VisitLike(literal)); + return transform_(file_index_result); + } + + Result> VisitVectorSearch( + const std::shared_ptr& vector_search) override { + return Status::Invalid( + "FileIndexReaderWrapper is not supposed to handle vector search query"); + } + + Result> VisitFullTextSearch( + const std::shared_ptr& full_text_search) override { + std::shared_ptr remain = FileIndexResult::Remain(); + return transform_(remain); + } + + /// Converts a `FileIndexResult` to a `GlobalIndexResult` by mapping 32-bit row IDs + /// to 64-bit global row IDs. + static Result> ToGlobalIndexResult( + const std::shared_ptr& result) { + if (auto remain = std::dynamic_pointer_cast(result)) { + return std::shared_ptr(); + } else if (auto skip = std::dynamic_pointer_cast(result)) { + return std::make_shared( + []() -> Result { return RoaringBitmap64(); }); + } else if (auto bitmap_result = std::dynamic_pointer_cast(result)) { + return std::make_shared( + [bitmap_result]() -> Result { + PAIMON_ASSIGN_OR_RAISE(const RoaringBitmap32* bitmap, + bitmap_result->GetBitmap()); + return RoaringBitmap64(*bitmap); + }); + } + return Status::Invalid( + "invalid FileIndexResult, supposed to be Remain or Skip or BitmapIndexResult"); + } + + private: + std::shared_ptr reader_; + std::function>( + const std::shared_ptr&)> + transform_; +}; + +} // namespace paimon diff --git a/src/paimon/common/global_index/wrap/file_index_reader_wrapper_test.cpp b/src/paimon/common/global_index/wrap/file_index_reader_wrapper_test.cpp new file mode 100644 index 0000000..9324605 --- /dev/null +++ b/src/paimon/common/global_index/wrap/file_index_reader_wrapper_test.cpp @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "paimon/common/global_index/wrap/file_index_reader_wrapper.h" + +#include +#include + +#include "gtest/gtest.h" +#include "paimon/file_index/bitmap_index_result.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/global_index/global_index_result.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(FileIndexReaderWrapperTest, TestToGlobalIndexResult) { + auto check_result = [](const std::shared_ptr& result, + const std::vector& expected) { + auto typed_result = std::dynamic_pointer_cast(result); + ASSERT_TRUE(typed_result); + ASSERT_OK_AND_ASSIGN(const RoaringBitmap64* bitmap, typed_result->GetBitmap()); + ASSERT_TRUE(bitmap); + ASSERT_EQ(*(typed_result->GetBitmap().value()), RoaringBitmap64::From(expected)) + << "result=" << (typed_result->GetBitmap().value())->ToString() + << ", expected=" << RoaringBitmap64::From(expected).ToString(); + }; + + { + ASSERT_OK_AND_ASSIGN(auto global_result, FileIndexReaderWrapper::ToGlobalIndexResult( + FileIndexResult::Remain())); + ASSERT_FALSE(global_result); + } + { + ASSERT_OK_AND_ASSIGN(auto global_result, + FileIndexReaderWrapper::ToGlobalIndexResult(FileIndexResult::Skip())); + check_result(global_result, {}); + } + { + auto bitmap_supplier = []() -> Result { + return RoaringBitmap32::From({1, 4, 2147483647}); + }; + auto file_result = std::make_shared(bitmap_supplier); + ASSERT_OK_AND_ASSIGN(auto global_result, + FileIndexReaderWrapper::ToGlobalIndexResult(file_result)); + check_result(global_result, {1l, 4l, 2147483647l}); + } + { + class FakeFileIndexResult : public FileIndexResult { + Result IsRemain() const override { + return true; + } + std::string ToString() const override { + return "fake file index result"; + } + }; + auto file_result = std::make_shared(); + ASSERT_NOK_WITH_MSG( + FileIndexReaderWrapper::ToGlobalIndexResult(file_result), + "invalid FileIndexResult, supposed to be Remain or Skip or BitmapIndexResult"); + } +} + +} // namespace paimon::test diff --git a/src/paimon/common/global_index/wrap/file_index_writer_wrapper.h b/src/paimon/common/global_index/wrap/file_index_writer_wrapper.h new file mode 100644 index 0000000..90a7b84 --- /dev/null +++ b/src/paimon/common/global_index/wrap/file_index_writer_wrapper.h @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/c/abi.h" +#include "arrow/c/helpers.h" +#include "fmt/format.h" +#include "paimon/common/global_index/global_index_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/file_index/file_index_writer.h" +#include "paimon/global_index/global_index_writer.h" +#include "paimon/global_index/io/global_index_file_writer.h" + +namespace paimon { +/// A `GlobalIndexWriter` wrapper for `FileIndexWriter`. +class FileIndexWriterWrapper : public GlobalIndexWriter { + public: + FileIndexWriterWrapper(const std::string& index_type, + const std::shared_ptr& file_manager, + const std::shared_ptr& writer) + : index_type_(index_type), file_manager_(file_manager), writer_(writer) {} + + Status AddBatch(::ArrowArray* c_arrow_array, std::vector&& relative_row_ids) override { + PAIMON_RETURN_NOT_OK( + GlobalIndexUtils::CheckRelativeRowIds(c_arrow_array, relative_row_ids, count_)); + auto length = c_arrow_array->length; + PAIMON_RETURN_NOT_OK(writer_->AddBatch(c_arrow_array)); + count_ += length; + return Status::OK(); + } + + Result> Finish() override { + if (count_ == 0) { + return std::vector(); + } + PAIMON_ASSIGN_OR_RAISE(std::string file_name, file_manager_->NewFileName(index_type_)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr out, + file_manager_->NewOutputStream(file_name)); + PAIMON_ASSIGN_OR_RAISE(PAIMON_UNIQUE_PTR bytes, writer_->SerializedBytes()); + + uint64_t total_write_size = 0; + while (total_write_size < bytes->size()) { + uint64_t current_write_size = + std::min(bytes->size() - total_write_size, max_write_size_); + PAIMON_ASSIGN_OR_RAISE(int32_t actual_size, + out->Write(bytes->data() + total_write_size, + static_cast(current_write_size))); + if (static_cast(actual_size) != current_write_size) { + return Status::IOError( + fmt::format("expect write len {} mismatch actual write len {}", + current_write_size, actual_size)); + } + total_write_size += current_write_size; + } + PAIMON_RETURN_NOT_OK(out->Flush()); + PAIMON_RETURN_NOT_OK(out->Close()); + GlobalIndexIOMeta meta(file_manager_->ToPath(file_name), /*file_size=*/bytes->size(), + /*metadata=*/nullptr); + return std::vector({meta}); + } + + private: + static constexpr uint64_t kMaxWriteSize = std::numeric_limits::max(); + + std::string index_type_; + int64_t count_ = 0; + uint64_t max_write_size_ = kMaxWriteSize; + std::shared_ptr file_manager_; + std::shared_ptr writer_; +}; +} // namespace paimon diff --git a/src/paimon/common/sst/block_meta_test.cpp b/src/paimon/common/sst/block_meta_test.cpp new file mode 100644 index 0000000..b485255 --- /dev/null +++ b/src/paimon/common/sst/block_meta_test.cpp @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/sst/block_handle.h" +#include "paimon/common/sst/block_trailer.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class BlockMetaTest : public ::testing::Test { + protected: + void SetUp() override { + pool_ = GetDefaultPool(); + } + + std::shared_ptr pool_; +}; + +TEST_F(BlockMetaTest, TestBlockHandleSimple) { + int64_t offset = 1024; + int32_t size = 256; + BlockHandle handle(offset, size); + + // Test Offset and Size + ASSERT_EQ(handle.Offset(), offset); + ASSERT_EQ(handle.Size(), size); + + // Test GetFullBlockSize + ASSERT_EQ(handle.GetFullBlockSize(), size + BlockHandle::MAX_ENCODED_LENGTH); + + // Test ToString + ASSERT_EQ(handle.ToString(), "BlockHandle{offset=1024, size=256}"); + + // Test WriteBlockHandle and ReadBlockHandle (round-trip serialization) + ASSERT_OK_AND_ASSIGN(MemorySlice slice, handle.WriteBlockHandle(pool_.get())); + MemorySliceInput input(slice); + ASSERT_OK_AND_ASSIGN(BlockHandle restored, BlockHandle::ReadBlockHandle(&input)); + ASSERT_EQ(restored.Offset(), offset); + ASSERT_EQ(restored.Size(), size); + ASSERT_EQ(restored.GetFullBlockSize(), handle.GetFullBlockSize()); + ASSERT_EQ(restored.ToString(), handle.ToString()); + + // Test with zero values + BlockHandle zero_handle(0, 0); + ASSERT_EQ(zero_handle.Offset(), 0); + ASSERT_EQ(zero_handle.Size(), 0); + ASSERT_EQ(zero_handle.GetFullBlockSize(), BlockHandle::MAX_ENCODED_LENGTH); + ASSERT_EQ(zero_handle.ToString(), "BlockHandle{offset=0, size=0}"); + + // Test with large values + int64_t large_offset = 9999999999ll; + int32_t large_size = 2147483647; + BlockHandle large_handle(large_offset, large_size); + ASSERT_EQ(large_handle.Offset(), large_offset); + ASSERT_EQ(large_handle.Size(), large_size); + ASSERT_EQ(large_handle.ToString(), "BlockHandle{offset=9999999999, size=2147483647}"); + + // Round-trip for large values + ASSERT_OK_AND_ASSIGN(MemorySlice large_slice, large_handle.WriteBlockHandle(pool_.get())); + MemorySliceInput large_input(large_slice); + ASSERT_OK_AND_ASSIGN(BlockHandle large_restored, BlockHandle::ReadBlockHandle(&large_input)); + ASSERT_EQ(large_restored.Offset(), large_offset); + ASSERT_EQ(large_restored.Size(), large_size); +} + +TEST_F(BlockMetaTest, TestBlockTrailerSimple) { + int8_t compression_type = 2; + int32_t crc32c = 0x12345678; + BlockTrailer trailer(compression_type, crc32c); + + // Test CompressionType and Crc32c + ASSERT_EQ(trailer.CompressionType(), compression_type); + ASSERT_EQ(trailer.Crc32c(), crc32c); + + // Test ToString + std::string str = trailer.ToString(); + ASSERT_NE(str.find("compression_type=2"), std::string::npos); + ASSERT_NE(str.find("0x12345678"), std::string::npos); + + // Test ENCODED_LENGTH constant + ASSERT_EQ(BlockTrailer::ENCODED_LENGTH, 5); + + // Test WriteBlockTrailer and ReadBlockTrailer (round-trip serialization) + MemorySlice slice = trailer.WriteBlockTrailer(pool_.get()); + ASSERT_EQ(slice.Length(), BlockTrailer::ENCODED_LENGTH); + MemorySliceInput input(slice); + auto restored = BlockTrailer::ReadBlockTrailer(&input); + ASSERT_NE(restored, nullptr); + ASSERT_EQ(restored->CompressionType(), compression_type); + ASSERT_EQ(restored->Crc32c(), crc32c); + ASSERT_EQ(restored->ToString(), trailer.ToString()); + + // Test with zero values + BlockTrailer zero_trailer(0, 0); + ASSERT_EQ(zero_trailer.CompressionType(), 0); + ASSERT_EQ(zero_trailer.Crc32c(), 0); + + MemorySlice zero_slice = zero_trailer.WriteBlockTrailer(pool_.get()); + MemorySliceInput zero_input(zero_slice); + auto zero_restored = BlockTrailer::ReadBlockTrailer(&zero_input); + ASSERT_EQ(zero_restored->CompressionType(), 0); + ASSERT_EQ(zero_restored->Crc32c(), 0); + + // Test with negative crc32c (valid signed int32) + int32_t negative_crc = -1; + BlockTrailer neg_trailer(/*compression_type=*/1, negative_crc); + ASSERT_EQ(neg_trailer.Crc32c(), negative_crc); + ASSERT_EQ(neg_trailer.CompressionType(), 1); + + MemorySlice neg_slice = neg_trailer.WriteBlockTrailer(pool_.get()); + MemorySliceInput neg_input(neg_slice); + auto neg_restored = BlockTrailer::ReadBlockTrailer(&neg_input); + ASSERT_EQ(neg_restored->CompressionType(), 1); + ASSERT_EQ(neg_restored->Crc32c(), negative_crc); +} + +} // namespace paimon::test diff --git a/src/paimon/common/sst/block_write_read_test.cpp b/src/paimon/common/sst/block_write_read_test.cpp new file mode 100644 index 0000000..8126111 --- /dev/null +++ b/src/paimon/common/sst/block_write_read_test.cpp @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/common/sst/block_iterator.h" +#include "paimon/common/sst/block_reader.h" +#include "paimon/common/sst/block_writer.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class BlockWriteReadTest : public ::testing::Test { + protected: + void SetUp() override { + pool_ = GetDefaultPool(); + comparator_ = [](const MemorySlice& a, const MemorySlice& b) -> Result { + std::string_view va = a.ReadStringView(); + std::string_view vb = b.ReadStringView(); + if (va < vb) { + return -1; + } + if (va > vb) { + return 1; + } + return 0; + }; + } + + std::shared_ptr MakeBytes(const std::string& str) const { + return std::make_shared(str, pool_.get()); + } + + /// Build a block with the given key-value pairs via BlockWriter. + MemorySlice BuildBlock(const std::vector>& pairs, + BlockWriter* writer) const { + for (const auto& [k, v] : pairs) { + auto key = MakeBytes(k); + auto value = MakeBytes(v); + EXPECT_OK(writer->Write(key, value)); + } + EXPECT_OK_AND_ASSIGN(auto result, writer->Finish()); + return result; + } + + std::shared_ptr pool_; + MemorySlice::SliceComparator comparator_; +}; + +TEST_F(BlockWriteReadTest, AlignedWriteAndRead) { + BlockWriter writer(1024, pool_); + ASSERT_EQ(writer.Size(), 0); + + auto key1 = MakeBytes("aaaa"); + auto val1 = MakeBytes("1111"); + ASSERT_OK(writer.Write(key1, val1)); + ASSERT_EQ(writer.Size(), 1); + ASSERT_GT(writer.Memory(), 0); + + auto key2 = MakeBytes("bbbb"); + auto val2 = MakeBytes("2222"); + auto key3 = MakeBytes("cccc"); + auto val3 = MakeBytes("3333"); + ASSERT_OK(writer.Write(key2, val2)); + ASSERT_OK(writer.Write(key3, val3)); + ASSERT_EQ(writer.Size(), 3); + + auto block = BuildBlock({}, &writer); + ASSERT_GT(block.Length(), 0); + // Last byte should be ALIGNED (0) because all kv pairs are same-size + ASSERT_EQ(block.ReadByte(block.Length() - 1), static_cast(BlockAlignedType::ALIGNED)); + + // Read back + ASSERT_OK_AND_ASSIGN(auto reader, BlockReader::Create(block, comparator_)); + ASSERT_EQ(reader->RecordCount(), 3); + ASSERT_NE(reader->Comparator(), nullptr); + + // SeekTo returns byte positions, should be increasing + ASSERT_EQ(reader->SeekTo(0), 0); + ASSERT_GT(reader->SeekTo(1), 0); + ASSERT_GT(reader->SeekTo(2), reader->SeekTo(1)); + + // BlockInput should be readable + ASSERT_TRUE(reader->BlockInput().IsReadable()); + + // Iterate and verify all entries + auto iter = reader->Iterator(); + ASSERT_TRUE(iter->HasNext()); + ASSERT_OK_AND_ASSIGN(auto entry1, iter->Next()); + ASSERT_EQ(entry1.key.ReadStringView(), "aaaa"); + ASSERT_EQ(entry1.value.ReadStringView(), "1111"); + + ASSERT_OK_AND_ASSIGN(auto entry2, iter->Next()); + ASSERT_EQ(entry2.key.ReadStringView(), "bbbb"); + ASSERT_EQ(entry2.value.ReadStringView(), "2222"); + + ASSERT_OK_AND_ASSIGN(auto entry3, iter->Next()); + ASSERT_EQ(entry3.key.ReadStringView(), "cccc"); + ASSERT_EQ(entry3.value.ReadStringView(), "3333"); + + ASSERT_FALSE(iter->HasNext()); +} + +TEST_F(BlockWriteReadTest, UnalignedWriteAndRead) { + BlockWriter writer(1024, pool_, /*aligned=*/false); + auto block = BuildBlock({{"a", "short"}, {"bb", "a_longer_value"}}, &writer); + ASSERT_GT(block.Length(), 0); + // Last byte should be UNALIGNED (1) + ASSERT_EQ(block.ReadByte(block.Length() - 1), static_cast(BlockAlignedType::UNALIGNED)); + + ASSERT_OK_AND_ASSIGN(auto reader, BlockReader::Create(block, comparator_)); + ASSERT_EQ(reader->RecordCount(), 2); + + auto iter = reader->Iterator(); + ASSERT_OK_AND_ASSIGN(auto entry1, iter->Next()); + ASSERT_EQ(entry1.key.ReadStringView(), "a"); + ASSERT_EQ(entry1.value.ReadStringView(), "short"); + + ASSERT_OK_AND_ASSIGN(auto entry2, iter->Next()); + ASSERT_EQ(entry2.key.ReadStringView(), "bb"); + ASSERT_EQ(entry2.value.ReadStringView(), "a_longer_value"); + + ASSERT_FALSE(iter->HasNext()); +} + +TEST_F(BlockWriteReadTest, EmptyBlockWriteAndRead) { + BlockWriter writer(1024, pool_); + + ASSERT_OK_AND_ASSIGN(auto block, writer.Finish()); + ASSERT_GT(block.Length(), 0); + // Empty block must be UNALIGNED (1) + ASSERT_EQ(block.ReadByte(block.Length() - 1), static_cast(BlockAlignedType::UNALIGNED)); + + // Reader should see 0 records + ASSERT_OK_AND_ASSIGN(auto reader, BlockReader::Create(block, comparator_)); + ASSERT_EQ(reader->RecordCount(), 0); + + auto iter = reader->Iterator(); + ASSERT_FALSE(iter->HasNext()); +} + +TEST_F(BlockWriteReadTest, ResetAndRewrite) { + BlockWriter writer(1024, pool_); + + auto key = MakeBytes("old_key"); + auto val = MakeBytes("old_val"); + ASSERT_OK(writer.Write(key, val)); + ASSERT_EQ(writer.Size(), 1); + + writer.Reset(); + ASSERT_EQ(writer.Size(), 0); + + // Write new data and read back + auto block = BuildBlock({{"new_key", "new_val"}}, &writer); + ASSERT_OK_AND_ASSIGN(auto reader, BlockReader::Create(block, comparator_)); + ASSERT_EQ(reader->RecordCount(), 1); + + auto iter = reader->Iterator(); + ASSERT_OK_AND_ASSIGN(auto entry, iter->Next()); + ASSERT_EQ(entry.key.ReadStringView(), "new_key"); + ASSERT_EQ(entry.value.ReadStringView(), "new_val"); +} + +TEST_F(BlockWriteReadTest, MemoryAlignedVsUnaligned) { + BlockWriter aligned_writer(1024, pool_); + auto k1 = MakeBytes("aaaa"); + auto v1 = MakeBytes("1111"); + auto k2 = MakeBytes("bbbb"); + auto v2 = MakeBytes("2222"); + ASSERT_OK(aligned_writer.Write(k1, v1)); + ASSERT_OK(aligned_writer.Write(k2, v2)); + + BlockWriter unaligned_writer(1024, pool_, /*aligned=*/false); + auto k3 = MakeBytes("aaaa"); + auto v3 = MakeBytes("1111"); + auto k4 = MakeBytes("bbbb"); + auto v4 = MakeBytes("2222"); + ASSERT_OK(unaligned_writer.Write(k3, v3)); + ASSERT_OK(unaligned_writer.Write(k4, v4)); + + // Unaligned memory should be larger due to position index overhead + ASSERT_GT(unaligned_writer.Memory(), aligned_writer.Memory()); +} + +TEST_F(BlockWriteReadTest, SkipKeyAndReadValue) { + BlockWriter writer(1024, pool_); + auto block = BuildBlock({{"key1", "val1"}, {"key2", "val2"}}, &writer); + + ASSERT_OK_AND_ASSIGN(auto reader, BlockReader::Create(block, comparator_)); + auto iter = reader->Iterator(); + + ASSERT_TRUE(iter->HasNext()); + ASSERT_OK_AND_ASSIGN(auto value1, iter->SkipKeyAndReadValue()); + ASSERT_EQ(value1.ReadStringView(), "val1"); + + ASSERT_TRUE(iter->HasNext()); + ASSERT_OK_AND_ASSIGN(auto value2, iter->SkipKeyAndReadValue()); + ASSERT_EQ(value2.ReadStringView(), "val2"); + + ASSERT_FALSE(iter->HasNext()); +} + +TEST_F(BlockWriteReadTest, IteratorSeekTo) { + BlockWriter writer(1024, pool_); + auto block = + BuildBlock({{"apple", "1"}, {"banana", "2"}, {"cherry", "3"}, {"date", "4"}}, &writer); + + ASSERT_OK_AND_ASSIGN(auto reader, BlockReader::Create(block, comparator_)); + + // Exact match + { + auto iter = reader->Iterator(); + auto target = MemorySlice::Wrap(MakeBytes("cherry")); + ASSERT_OK_AND_ASSIGN(bool found, iter->SeekTo(target)); + ASSERT_TRUE(found); + ASSERT_TRUE(iter->HasNext()); + ASSERT_OK_AND_ASSIGN(auto entry, iter->Next()); + ASSERT_EQ(entry.key.ReadStringView(), "cherry"); + ASSERT_EQ(entry.value.ReadStringView(), "3"); + } + + // No exact match: "blueberry" should land on "cherry" (first key >= target) + { + auto iter = reader->Iterator(); + auto target = MemorySlice::Wrap(MakeBytes("blueberry")); + ASSERT_OK_AND_ASSIGN(bool found, iter->SeekTo(target)); + ASSERT_FALSE(found); + ASSERT_TRUE(iter->HasNext()); + ASSERT_OK_AND_ASSIGN(auto entry, iter->Next()); + ASSERT_EQ(entry.key.ReadStringView(), "cherry"); + } +} + +} // namespace paimon::test diff --git a/src/paimon/common/utils/byte_range_combiner.cpp b/src/paimon/common/utils/byte_range_combiner.cpp new file mode 100644 index 0000000..4287700 --- /dev/null +++ b/src/paimon/common/utils/byte_range_combiner.cpp @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Adapted from Apache ORC +// https://github.com/apache/orc/blob/main/c%2B%2B/src/io/Cache.cc + +#include "paimon/common/utils/byte_range_combiner.h" + +#include +#include + +#include "fmt/format.h" + +namespace paimon { + +Result> ByteRangeCombiner::CoalesceByteRanges( + std::vector&& ranges, uint64_t hole_size_limit, uint64_t range_size_limit) { + if (range_size_limit <= hole_size_limit) { + return Status::Invalid( + fmt::format("range size limit {} should be larger than hole size limit {}", + range_size_limit, hole_size_limit)); + } + if (ranges.empty()) { + return ranges; + } + + std::vector adjusted_ranges; + for (const auto& range : ranges) { + uint64_t range_start = range.offset; + uint64_t range_end = range.offset + range.length; + + while (range_end - range_start > range_size_limit) { + adjusted_ranges.emplace_back(range_start, range_size_limit); + range_start += range_size_limit; + } + + if (range_end > range_start) { + adjusted_ranges.emplace_back(range_start, range_end - range_start); + } + } + ranges = std::move(adjusted_ranges); + + // Remove zero-sized ranges + auto end = std::remove_if(ranges.begin(), ranges.end(), + [](const ByteRange& range) { return range.length == 0; }); + // Sort in position order + std::sort(ranges.begin(), end, [](const ByteRange& a, const ByteRange& b) { + // Prefer longer ranges at same offset to simplify deduplication + return a.offset != b.offset ? a.offset < b.offset : a.length > b.length; + }); + + // Remove ranges that overlap 100% + std::vector unique_ranges; + unique_ranges.reserve(ranges.size()); + for (auto it = ranges.begin(); it != end; ++it) { + if (unique_ranges.empty() || !unique_ranges.back().Contains(*it)) { + unique_ranges.emplace_back(*it); + } + } + ranges = std::move(unique_ranges); + + // Skip further processing if ranges is empty after removing zero-sized ranges. + if (ranges.empty()) { + return ranges; + } + + for (size_t i = 0; i < ranges.size() - 1; ++i) { + const auto& left = ranges[i]; + const auto& right = ranges[i + 1]; + if (left.offset >= right.offset || left.Contains(right)) { + return Status::Invalid("Byte ranges must be non-overlapping and sorted."); + } + } + + std::vector coalesced; + auto iter = ranges.begin(); + + // Start of the current coalesced range and end (exclusive) of previous range. + // Both are initialized with the start of first range which is a placeholder value. + uint64_t coalesced_start = iter->offset; + uint64_t coalesced_end = coalesced_start + iter->length; + + for (++iter; iter < ranges.end(); ++iter) { + const uint64_t current_range_start = iter->offset; + const uint64_t current_range_end = current_range_start + iter->length; + + assert(coalesced_start < coalesced_end); + assert(current_range_start < current_range_end); + + // At this point, the coalesced range is [coalesced_start, prev_range_end). + // Stop coalescing if: + // - coalesced range is too large, or + // - distance (hole/gap) between consecutive ranges is too large. + if ((current_range_end - coalesced_start > range_size_limit) || + (current_range_start > coalesced_end + hole_size_limit)) { + coalesced.emplace_back(coalesced_start, coalesced_end - coalesced_start); + coalesced_start = current_range_start; + } + + // Update the prev_range_end with the current range. + coalesced_end = current_range_end; + } + coalesced.emplace_back(coalesced_start, coalesced_end - coalesced_start); + assert(coalesced.front().offset == ranges.front().offset); + assert(coalesced.back().offset + coalesced.back().length == + ranges.back().offset + ranges.back().length); + return coalesced; +} + +} // namespace paimon diff --git a/src/paimon/common/utils/byte_range_combiner.h b/src/paimon/common/utils/byte_range_combiner.h new file mode 100644 index 0000000..599ca59 --- /dev/null +++ b/src/paimon/common/utils/byte_range_combiner.h @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Adapted from Apache ORC +// https://github.com/apache/orc/blob/main/c%2B%2B/src/io/Cache.hh + +#pragma once + +#include + +#include "paimon/result.h" +#include "paimon/utils/read_ahead_cache.h" + +namespace paimon { + +struct ByteRangeCombiner { + static Result> CoalesceByteRanges(std::vector&& ranges, + uint64_t hole_size_limit, + uint64_t range_size_limit); +}; + +} // namespace paimon diff --git a/src/paimon/common/utils/byte_range_combiner_test.cpp b/src/paimon/common/utils/byte_range_combiner_test.cpp new file mode 100644 index 0000000..19d739a --- /dev/null +++ b/src/paimon/common/utils/byte_range_combiner_test.cpp @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Adapted from Apache ORC +// https://github.com/apache/orc/blob/main/c%2B%2B/test/TestCache.cc + +#include "paimon/common/utils/byte_range_combiner.h" + +#include "gtest/gtest.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/utils/read_ahead_cache.h" + +namespace paimon::test { + +TEST(ByteRangeCombinerTest, TestBasics) { + auto check = [](std::vector ranges, std::vector expected) -> void { + const uint64_t hole_size_limit = 9; + const uint64_t range_size_limit = 99; + auto actual = ranges; + ASSERT_OK_AND_ASSIGN(auto coalesced, + ByteRangeCombiner::CoalesceByteRanges( + std::move(actual), hole_size_limit, range_size_limit)); + ASSERT_EQ(coalesced, expected); + }; + + check({}, {}); + // Zero sized range that ends up in empty list + check({{110, 0}}, {}); + // Combination on 1 zero sized range and 1 non-zero sized range + check({{110, 10}, {120, 0}}, {{110, 10}}); + // 1 non-zero sized range + check({{110, 10}}, {{110, 10}}); + // No holes + unordered ranges + check({{130, 10}, {110, 10}, {120, 10}}, {{110, 30}}); + // No holes + check({{110, 10}, {120, 10}, {130, 10}}, {{110, 30}}); + // Small holes only + check({{110, 11}, {130, 11}, {150, 11}}, {{110, 51}}); + // Large holes + check({{110, 10}, {130, 10}}, {{110, 10}, {130, 10}}); + check({{110, 11}, {130, 11}, {150, 10}, {170, 11}, {190, 11}}, {{110, 50}, {170, 31}}); + + // With zero-sized ranges + check({{110, 11}, {130, 0}, {130, 11}, {145, 0}, {150, 11}, {200, 0}}, {{110, 51}}); + + // No holes but large ranges + check({{110, 100}, {210, 100}}, {{110, 99}, {209, 1}, {210, 99}, {309, 1}}); + // Small holes and large range in the middle (*) + check({{110, 10}, {120, 11}, {140, 100}, {240, 11}, {260, 11}}, + {{110, 21}, {140, 99}, {239, 32}}); + // Mid-size ranges that would turn large after coalescing + check({{100, 50}, {150, 50}}, {{100, 50}, {150, 50}}); + check({{100, 30}, {130, 30}, {160, 30}, {190, 30}, {220, 30}}, {{100, 90}, {190, 60}}); + + // Same as (*) but unsorted + check({{140, 100}, {120, 11}, {240, 11}, {110, 10}, {260, 11}}, + {{110, 21}, {140, 99}, {239, 32}}); + + // Completely overlapping ranges should be eliminated + check({{20, 5}, {20, 5}, {21, 2}}, {{20, 5}}); +} + +} // namespace paimon::test diff --git a/src/paimon/core/io/multiple_blob_file_writer.cpp b/src/paimon/core/io/multiple_blob_file_writer.cpp new file mode 100644 index 0000000..915c115 --- /dev/null +++ b/src/paimon/core/io/multiple_blob_file_writer.cpp @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/io/multiple_blob_file_writer.h" + +#include +#include +#include + +#include "arrow/array/array_nested.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "arrow/type.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/macros.h" + +namespace paimon { + +MultipleBlobFileWriter::MultipleBlobFileWriter(const std::shared_ptr& blob_schema, + BlobWriterCreator blob_writer_creator) + : blob_schema_(blob_schema), + blob_writer_creator_(std::move(blob_writer_creator)), + logger_(Logger::GetLogger("MultipleBlobFileWriter")) {} + +Status MultipleBlobFileWriter::Write(::ArrowArray* record) { + // Lazily initialize per-field blob writers on first write + if (blob_field_writers_.empty()) { + for (int32_t i = 0; i < blob_schema_->num_fields(); ++i) { + const std::string& field_name = blob_schema_->field(i)->name(); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr writer, + blob_writer_creator_(field_name)); + blob_field_writers_.push_back(BlobFieldWriter{field_name, i, std::move(writer)}); + } + } + + // Import the ArrowArray as a StructArray containing all blob fields + std::shared_ptr data_type = arrow::struct_(blob_schema_->fields()); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_array, + arrow::ImportArray(record, data_type)); + std::shared_ptr struct_array = + std::dynamic_pointer_cast(arrow_array); + if (!struct_array) { + return Status::Invalid("MultipleBlobFileWriter: input is not a StructArray"); + } + + // TODO(xinyu.lxy): support write parallel + // For each blob field, extract the column and write row by row to its dedicated writer + for (BlobFieldWriter& field_writer : blob_field_writers_) { + std::shared_ptr field_array = struct_array->field(field_writer.field_index); + // Create a single-field StructArray for each row and write to the blob writer + for (int64_t row = 0; row < field_array->length(); ++row) { + std::shared_ptr slice = field_array->Slice(row, 1); + // Wrap single field into a StructArray with the same field name + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr single_field_struct, + arrow::StructArray::Make({slice}, {field_writer.field_name})); + ::ArrowArray c_blob_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::ExportArray(*single_field_struct, &c_blob_array)); + ScopeGuard guard([&c_blob_array]() { ArrowArrayRelease(&c_blob_array); }); + PAIMON_RETURN_NOT_OK(field_writer.writer->Write(&c_blob_array)); + } + } + + return Status::OK(); +} + +void MultipleBlobFileWriter::Abort() { + for (auto& field_writer : blob_field_writers_) { + if (field_writer.writer) { + field_writer.writer->Abort(); + } + } + blob_field_writers_.clear(); +} + +Status MultipleBlobFileWriter::Close() { + if (closed_) { + return Status::OK(); + } + for (auto& field_writer : blob_field_writers_) { + if (field_writer.writer) { + PAIMON_RETURN_NOT_OK(field_writer.writer->Close()); + } + } + closed_ = true; + return Status::OK(); +} + +Result>> MultipleBlobFileWriter::GetResult() { + std::vector> all_results; + for (BlobFieldWriter& field_writer : blob_field_writers_) { + if (field_writer.writer) { + PAIMON_ASSIGN_OR_RAISE(std::vector> results, + field_writer.writer->GetResult()); + all_results.insert(all_results.end(), results.begin(), results.end()); + } + } + blob_field_writers_.clear(); + return all_results; +} + +} // namespace paimon diff --git a/src/paimon/core/io/multiple_blob_file_writer.h b/src/paimon/core/io/multiple_blob_file_writer.h new file mode 100644 index 0000000..03ec183 --- /dev/null +++ b/src/paimon/core/io/multiple_blob_file_writer.h @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/rolling_file_writer.h" +#include "paimon/core/io/single_file_writer.h" +#include "paimon/logging.h" +#include "paimon/result.h" +#include "paimon/status.h" + +struct ArrowArray; + +namespace arrow { +class Schema; +class StructArray; +} // namespace arrow + +namespace paimon { + +/// A blob file writer that manages multiple blob fields, each written to separate rolling files. +/// +/// For each blob field in the schema, a dedicated RollingFileWriter is created. When writing a +/// row, the writer projects out each blob field and writes it to the corresponding blob file +/// independently. +/// +/// This design supports multiple blob columns in a single table, where each blob column produces +/// its own set of blob files that are rolled independently based on target file size. +class MultipleBlobFileWriter { + public: + using BlobRollingWriter = RollingFileWriter<::ArrowArray*, std::shared_ptr>; + using BlobWriterCreator = std::function>( + const std::string& blob_field_name)>; + + /// Constructs a MultipleBlobFileWriter. + /// @param blob_schema The schema containing only blob fields. + /// @param blob_writer_creator Factory function to create a RollingFileWriter for each blob + /// field. + MultipleBlobFileWriter(const std::shared_ptr& blob_schema, + BlobWriterCreator blob_writer_creator); + + ~MultipleBlobFileWriter() = default; + + /// Writes a batch of blob data. The input ArrowArray should contain all blob fields as a + /// StructArray. Each blob field is extracted and written to its dedicated rolling file writer + /// row by row. + Status Write(::ArrowArray* record); + + /// Aborts all blob writers and releases resources. + void Abort(); + + /// Closes all blob writers. + Status Close(); + + /// Returns the results (DataFileMeta) from all blob writers. + Result>> GetResult(); + + private: + /// Internal per-field blob writer. + struct BlobFieldWriter { + std::string field_name; + int32_t field_index; + std::unique_ptr writer; + }; + + std::shared_ptr blob_schema_; + BlobWriterCreator blob_writer_creator_; + std::vector blob_field_writers_; + bool closed_ = false; + + std::unique_ptr logger_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/blob_file_context.cpp b/src/paimon/core/operation/blob_file_context.cpp new file mode 100644 index 0000000..de6cd85 --- /dev/null +++ b/src/paimon/core/operation/blob_file_context.cpp @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/operation/blob_file_context.h" + +#include + +#include "arrow/type.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/core/core_options.h" + +namespace paimon { + +BlobFileContext::BlobFileContext(std::set descriptor_fields, + std::set view_fields, + std::set inline_fields, + std::set external_storage_fields, + std::set blob_file_fields, + std::optional external_storage_path) + : descriptor_fields_(std::move(descriptor_fields)), + view_fields_(std::move(view_fields)), + inline_fields_(std::move(inline_fields)), + external_storage_fields_(std::move(external_storage_fields)), + blob_file_fields_(std::move(blob_file_fields)), + external_storage_path_(std::move(external_storage_path)) {} + +std::unique_ptr BlobFileContext::Create( + const std::shared_ptr& schema, const CoreOptions& options) { + // Check if there are any BLOB fields in the schema + bool has_blob = false; + for (int i = 0; i < schema->num_fields(); ++i) { + if (BlobUtils::IsBlobField(schema->field(i))) { + has_blob = true; + break; + } + } + if (!has_blob) { + return nullptr; + } + + // Populate descriptor fields + std::set descriptor_fields; + for (const auto& name : options.GetBlobDescriptorFields()) { + descriptor_fields.insert(name); + } + + // Populate view fields + std::set view_fields; + for (const auto& name : options.GetBlobViewFields()) { + view_fields.insert(name); + } + + // Populate inline fields from options (descriptor ∪ view) + std::set inline_fields; + for (const auto& name : options.GetBlobInlineFields()) { + inline_fields.insert(name); + } + + // Populate external storage fields + std::set external_storage_fields; + for (const auto& name : options.GetBlobExternalStorageFields()) { + external_storage_fields.insert(name); + } + + // Populate external storage path + std::optional external_storage_path = options.GetBlobExternalStoragePath(); + + // Determine blob_file_fields: BLOB fields that are NOT inline + std::set blob_file_fields; + for (int i = 0; i < schema->num_fields(); ++i) { + const auto& field = schema->field(i); + if (BlobUtils::IsBlobField(field) && inline_fields.count(field->name()) == 0) { + blob_file_fields.insert(field->name()); + } + } + + return std::unique_ptr( + new BlobFileContext(std::move(descriptor_fields), std::move(view_fields), + std::move(inline_fields), std::move(external_storage_fields), + std::move(blob_file_fields), std::move(external_storage_path))); +} + +bool BlobFileContext::IsInlineField(const std::string& field_name) const { + return inline_fields_.count(field_name) > 0; +} + +bool BlobFileContext::IsBlobFileField(const std::string& field_name) const { + return blob_file_fields_.count(field_name) > 0; +} + +bool BlobFileContext::IsDescriptorField(const std::string& field_name) const { + return descriptor_fields_.count(field_name) > 0; +} + +bool BlobFileContext::IsViewField(const std::string& field_name) const { + return view_fields_.count(field_name) > 0; +} + +bool BlobFileContext::IsExternalStorageField(const std::string& field_name) const { + return external_storage_fields_.count(field_name) > 0; +} + +bool BlobFileContext::RequireBlobFileWriter() const { + return !blob_file_fields_.empty(); +} + +bool BlobFileContext::RequireExternalStorageWriter() const { + return !external_storage_fields_.empty(); +} + +} // namespace paimon diff --git a/src/paimon/core/operation/blob_file_context.h b/src/paimon/core/operation/blob_file_context.h new file mode 100644 index 0000000..36f524b --- /dev/null +++ b/src/paimon/core/operation/blob_file_context.h @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +namespace arrow { +class Schema; +} // namespace arrow + +namespace paimon { + +class CoreOptions; + +/// Context that classifies BLOB fields into different storage categories. +/// +/// Categories: +/// - descriptor_fields: stored as BlobDescriptor bytes inline in the main data file. +/// - view_fields: stored as BlobViewStruct bytes inline in the main data file. +/// - inline_fields: descriptor_fields ∪ view_fields. These stay in the main data file. +/// - external_storage_fields: subset of descriptor_fields whose raw data is written to an +/// external storage path (the descriptor still goes into the main data file). +/// - blob_file_fields: BLOB fields that are NOT inline. These go into separate .blob files. +class BlobFileContext { + public: + /// Creates a BlobFileContext from schema and options. + /// Returns nullptr if the schema has no BLOB fields at all. + /// Otherwise always returns a valid context (even if all blobs are inline). + static std::unique_ptr Create(const std::shared_ptr& schema, + const CoreOptions& options); + + /// Returns true if the given field should be stored inline in the main data file + /// (either as descriptor bytes or view bytes). + bool IsInlineField(const std::string& field_name) const; + + /// Returns true if the given field should be written to a separate .blob file. + bool IsBlobFileField(const std::string& field_name) const; + + /// Returns true if the given field is a descriptor field. + bool IsDescriptorField(const std::string& field_name) const; + + /// Returns true if the given field is a view field. + bool IsViewField(const std::string& field_name) const; + + /// Returns true if the given field should be written to external storage. + bool IsExternalStorageField(const std::string& field_name) const; + + /// Returns true if there are any BLOB fields that need a .blob file writer. + bool RequireBlobFileWriter() const; + + /// Returns true if there are any external storage fields that need an external writer. + bool RequireExternalStorageWriter() const; + + const std::set& GetDescriptorFields() const { + return descriptor_fields_; + } + + const std::set& GetViewFields() const { + return view_fields_; + } + + const std::set& GetInlineFields() const { + return inline_fields_; + } + + const std::set& GetExternalStorageFields() const { + return external_storage_fields_; + } + + const std::set& GetBlobFileFields() const { + return blob_file_fields_; + } + + const std::optional& GetExternalStoragePath() const { + return external_storage_path_; + } + + private: + BlobFileContext(std::set descriptor_fields, std::set view_fields, + std::set inline_fields, + std::set external_storage_fields, + std::set blob_file_fields, + std::optional external_storage_path); + + std::set descriptor_fields_; + std::set view_fields_; + std::set inline_fields_; + std::set external_storage_fields_; + std::set blob_file_fields_; + std::optional external_storage_path_; +}; + +} // namespace paimon diff --git a/src/paimon/core/operation/blob_file_context_test.cpp b/src/paimon/core/operation/blob_file_context_test.cpp new file mode 100644 index 0000000..a5d9f75 --- /dev/null +++ b/src/paimon/core/operation/blob_file_context_test.cpp @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/operation/blob_file_context.h" + +#include +#include +#include +#include +#include + +#include "arrow/type.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/data/blob.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon { + +class BlobFileContextTest : public ::testing::Test { + protected: + static std::shared_ptr MakeSchema(const std::vector& normal_fields, + const std::vector& blob_fields) { + std::vector> fields; + for (const auto& name : normal_fields) { + fields.push_back(arrow::field(name, arrow::int32())); + } + for (const auto& name : blob_fields) { + fields.push_back(BlobUtils::ToArrowField(name)); + } + return arrow::schema(fields); + } +}; + +TEST_F(BlobFileContextTest, NoBlobFields) { + auto schema = MakeSchema({"id", "name"}, {}); + std::map opts_map; + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map)); + auto context = BlobFileContext::Create(schema, options); + ASSERT_FALSE(context); +} + +TEST_F(BlobFileContextTest, AllInlineNoExternalStorage) { + auto schema = MakeSchema({"id"}, {"image", "video"}); + std::map opts_map = { + {Options::BLOB_DESCRIPTOR_FIELD, "image"}, + {Options::BLOB_VIEW_FIELD, "video"}, + }; + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map)); + auto context = BlobFileContext::Create(schema, options); + // All blobs are inline but context is still valid for callers to query inline fields + ASSERT_TRUE(context); + ASSERT_EQ(context->GetInlineFields(), std::set({"image", "video"})); + ASSERT_TRUE(context->GetBlobFileFields().empty()); + ASSERT_FALSE(context->RequireBlobFileWriter()); + ASSERT_FALSE(context->RequireExternalStorageWriter()); +} + +TEST_F(BlobFileContextTest, MixedInlineAndBlobFile) { + auto schema = MakeSchema({"id"}, {"image", "video", "audio"}); + std::map opts_map = { + {Options::BLOB_DESCRIPTOR_FIELD, "image"}, + // video and audio are not configured as inline -> go to .blob files + }; + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map)); + auto context = BlobFileContext::Create(schema, options); + ASSERT_TRUE(context); + + // descriptor fields + ASSERT_EQ(context->GetDescriptorFields(), std::set({"image"})); + + // view fields (none configured) + ASSERT_TRUE(context->GetViewFields().empty()); + + // inline = descriptor ∪ view + ASSERT_EQ(context->GetInlineFields(), std::set({"image"})); + + // blob file fields = non-inline blob fields + ASSERT_EQ(context->GetBlobFileFields(), std::set({"video", "audio"})); + + // Query methods + ASSERT_TRUE(context->IsInlineField("image")); + ASSERT_TRUE(context->IsDescriptorField("image")); + ASSERT_FALSE(context->IsViewField("image")); + ASSERT_FALSE(context->IsBlobFileField("image")); + + ASSERT_FALSE(context->IsInlineField("video")); + ASSERT_TRUE(context->IsBlobFileField("video")); + + ASSERT_FALSE(context->IsInlineField("audio")); + ASSERT_TRUE(context->IsBlobFileField("audio")); + + // Requires blob file writer for video and audio + ASSERT_TRUE(context->RequireBlobFileWriter()); + ASSERT_FALSE(context->RequireExternalStorageWriter()); +} + +TEST_F(BlobFileContextTest, ExternalStorageFields) { + auto schema = MakeSchema({"id"}, {"image", "video"}); + std::map opts_map = { + {Options::BLOB_DESCRIPTOR_FIELD, "image,video"}, + {Options::BLOB_EXTERNAL_STORAGE_FIELD, "image"}, + {Options::BLOB_EXTERNAL_STORAGE_PATH, "oss://bucket/blob/"}, + }; + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map)); + auto context = BlobFileContext::Create(schema, options); + ASSERT_TRUE(context); + + ASSERT_EQ(context->GetDescriptorFields(), std::set({"image", "video"})); + ASSERT_EQ(context->GetInlineFields(), std::set({"image", "video"})); + ASSERT_EQ(context->GetExternalStorageFields(), std::set({"image"})); + ASSERT_TRUE(context->GetExternalStoragePath()); + ASSERT_EQ(context->GetExternalStoragePath(), "oss://bucket/blob/"); + ASSERT_TRUE(context->GetBlobFileFields().empty()); + + ASSERT_TRUE(context->IsExternalStorageField("image")); + ASSERT_FALSE(context->IsExternalStorageField("video")); + + ASSERT_FALSE(context->RequireBlobFileWriter()); + ASSERT_TRUE(context->RequireExternalStorageWriter()); +} + +TEST_F(BlobFileContextTest, ViewFields) { + auto schema = MakeSchema({"id"}, {"ref_image", "raw_blob"}); + std::map opts_map = { + {Options::BLOB_VIEW_FIELD, "ref_image"}, + // raw_blob not configured -> goes to .blob file + }; + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map)); + auto context = BlobFileContext::Create(schema, options); + ASSERT_TRUE(context); + + ASSERT_TRUE(context->GetDescriptorFields().empty()); + ASSERT_EQ(context->GetViewFields(), std::set({"ref_image"})); + ASSERT_EQ(context->GetInlineFields(), std::set({"ref_image"})); + ASSERT_EQ(context->GetBlobFileFields(), std::set({"raw_blob"})); + + ASSERT_TRUE(context->IsInlineField("ref_image")); + ASSERT_TRUE(context->IsViewField("ref_image")); + ASSERT_FALSE(context->IsDescriptorField("ref_image")); + + ASSERT_TRUE(context->RequireBlobFileWriter()); + ASSERT_FALSE(context->RequireExternalStorageWriter()); +} + +TEST_F(BlobFileContextTest, DescriptorAndViewTogether) { + auto schema = MakeSchema({"id"}, {"desc_blob", "view_blob", "normal_blob"}); + std::map opts_map = { + {Options::BLOB_DESCRIPTOR_FIELD, "desc_blob"}, + {Options::BLOB_VIEW_FIELD, "view_blob"}, + {Options::BLOB_EXTERNAL_STORAGE_FIELD, "desc_blob"}, + {Options::BLOB_EXTERNAL_STORAGE_PATH, "/tmp/ext/"}, + }; + ASSERT_OK_AND_ASSIGN(auto options, CoreOptions::FromMap(opts_map)); + auto context = BlobFileContext::Create(schema, options); + ASSERT_TRUE(context); + + ASSERT_EQ(context->GetDescriptorFields(), std::set({"desc_blob"})); + ASSERT_EQ(context->GetViewFields(), std::set({"view_blob"})); + ASSERT_EQ(context->GetInlineFields(), std::set({"desc_blob", "view_blob"})); + ASSERT_EQ(context->GetExternalStorageFields(), std::set({"desc_blob"})); + ASSERT_TRUE(context->GetExternalStoragePath()); + ASSERT_EQ(context->GetExternalStoragePath(), "/tmp/ext/"); + ASSERT_EQ(context->GetBlobFileFields(), std::set({"normal_blob"})); + + ASSERT_TRUE(context->IsDescriptorField("desc_blob")); + ASSERT_TRUE(context->IsExternalStorageField("desc_blob")); + ASSERT_TRUE(context->IsInlineField("desc_blob")); + ASSERT_FALSE(context->IsBlobFileField("desc_blob")); + + ASSERT_TRUE(context->IsViewField("view_blob")); + ASSERT_TRUE(context->IsInlineField("view_blob")); + ASSERT_FALSE(context->IsDescriptorField("view_blob")); + + ASSERT_FALSE(context->IsInlineField("normal_blob")); + ASSERT_TRUE(context->IsBlobFileField("normal_blob")); + + // Non-existent field + ASSERT_FALSE(context->IsInlineField("not_exist")); + ASSERT_FALSE(context->IsBlobFileField("not_exist")); + + ASSERT_TRUE(context->RequireBlobFileWriter()); + ASSERT_TRUE(context->RequireExternalStorageWriter()); +} + +} // namespace paimon