diff --git a/include/paimon/commit_message.h b/include/paimon/commit_message.h
new file mode 100644
index 0000000..3d8757a
--- /dev/null
+++ b/include/paimon/commit_message.h
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+class CommitMessageSerializer;
+class MemoryPool;
+
+/// Commit message for a partition and bucket. Supports serialization and deserialization in a
+/// format compatible with the Java version.
+// TODO(yonghao.fyh): to add some statistics of write (e.g., write bytes)
+class PAIMON_EXPORT CommitMessage {
+ public:
+    /// Gets the format version with which the serializer currently serializes
+    /// (CommitMessageSerializer::CURRENT_VERSION).
+    static int32_t CurrentVersion();
+
+    virtual ~CommitMessage();
+
+    /// Serializes a single commit message to a binary string format.
+    /// The serialized format is compatible with the Java version of Paimon.
+    /// @param commit_message The commit message to serialize.
+    /// @param pool Memory pool for memory allocation during serialization.
+    /// @return Result containing the serialized string data, or an error if serialization fails
+    /// (for example when `commit_message` is not the built-in implementation type).
+    static Result<std::string> Serialize(const std::shared_ptr<CommitMessage>& commit_message,
+                                         const std::shared_ptr<MemoryPool>& pool);
+
+    /// Serializes a list of commit messages to a binary string format.
+    /// The element count is written first, followed by every message in order.
+    /// @param commit_messages Vector of commit messages to serialize.
+    /// @param pool Memory pool for memory allocation during serialization.
+    /// @return Result containing the serialized string data, or an error if serialization fails.
+    static Result<std::string> SerializeList(
+        const std::vector<std::shared_ptr<CommitMessage>>& commit_messages,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    /// Deserializes a single commit message from binary data.
+    /// @param version The serialization format version used when the data was serialized.
+    /// @param buffer Pointer to the binary data buffer. Must not be null.
+    /// @param length Length of the binary data in bytes. Must be greater than zero.
+    /// @param pool Memory pool for memory allocation during deserialization.
+    /// @return Result containing the deserialized CommitMessage, or an error if `buffer` is
+    /// null, `length` is not positive, the version is not supported, or deserialization fails.
+    static Result<std::shared_ptr<CommitMessage>> Deserialize(
+        int32_t version, const char* buffer, int32_t length,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    /// Deserializes a list of commit messages from binary data.
+    /// This is the counterpart to SerializeList() for batch processing.
+    /// @param version The serialization format version used when the data was serialized.
+    /// @param buffer Pointer to the binary data buffer. Must not be null.
+    /// @param length Length of the binary data in bytes. Must be greater than zero.
+    /// @param pool Memory pool for memory allocation during deserialization.
+    /// @return Result containing a vector of deserialized CommitMessages, or an error if
+    /// `buffer` is null, `length` is not positive, or deserialization fails.
+    static Result<std::vector<std::shared_ptr<CommitMessage>>> DeserializeList(
+        int32_t version, const char* buffer, int32_t length,
+        const std::shared_ptr<MemoryPool>& pool);
+
+    /// Converts a commit message to a human-readable debug string.
+    /// This is useful for logging, debugging, and troubleshooting purposes.
+    /// @param commit_message The commit message to convert to string.
+    /// @return Result containing the debug string representation, or an error if the message is
+    /// not the built-in implementation type.
+    static Result<std::string> ToDebugString(const std::shared_ptr<CommitMessage>& commit_message);
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/table/sink/commit_message.cpp b/src/paimon/core/table/sink/commit_message.cpp
new file mode 100644
index 0000000..6b3ce6a
--- /dev/null
+++ b/src/paimon/core/table/sink/commit_message.cpp
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/commit_message.h"
+
+#include <string>
+
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/memory/memory_segment_utils.h"
+#include "paimon/core/table/sink/commit_message_impl.h"
+#include "paimon/core/table/sink/commit_message_serializer.h"
+#include "paimon/io/byte_array_input_stream.h"
+#include "paimon/io/data_input_stream.h"
+#include "paimon/memory/bytes.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+namespace {
+
+/// Copies every byte written to `out` so far into an owning std::string.
+std::string CopyStreamToString(MemorySegmentOutputStream* out, MemoryPool* pool) {
+    auto bytes = MemorySegmentUtils::CopyToBytes(out->Segments(), 0, out->CurrentSize(), pool);
+    return std::string(bytes->data(), bytes->size());
+}
+
+/// Validates the raw buffer handed to Deserialize() / DeserializeList().
+Status CheckInputBuffer(const char* buffer, int32_t length) {
+    if (buffer == nullptr) {
+        return Status::Invalid("buffer is null pointer");
+    }
+    if (length <= 0) {
+        return Status::Invalid("length is equal or less than zero");
+    }
+    return Status::OK();
+}
+
+}  // namespace
+
+int32_t CommitMessage::CurrentVersion() {
+    return CommitMessageSerializer::CURRENT_VERSION;
+}
+
+CommitMessage::~CommitMessage() = default;
+
+/// Serializes one message into a segment stream, then copies the bytes out as a string.
+Result<std::string> CommitMessage::Serialize(const std::shared_ptr<CommitMessage>& commit_message,
+                                             const std::shared_ptr<MemoryPool>& pool) {
+    CommitMessageSerializer serializer(pool);
+    MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool);
+    PAIMON_RETURN_NOT_OK(serializer.Serialize(commit_message, &out));
+    return CopyStreamToString(&out, pool.get());
+}
+
+/// Serializes a list of messages into a segment stream, then copies the bytes out as a string.
+Result<std::string> CommitMessage::SerializeList(
+    const std::vector<std::shared_ptr<CommitMessage>>& commit_messages,
+    const std::shared_ptr<MemoryPool>& pool) {
+    CommitMessageSerializer serializer(pool);
+    MemorySegmentOutputStream out(MemorySegmentOutputStream::DEFAULT_SEGMENT_SIZE, pool);
+    PAIMON_RETURN_NOT_OK(serializer.SerializeList(commit_messages, &out));
+    return CopyStreamToString(&out, pool.get());
+}
+
+/// Wraps `buffer` in an input stream and decodes a single message of the given version.
+Result<std::shared_ptr<CommitMessage>> CommitMessage::Deserialize(
+    int32_t version, const char* buffer, int32_t length, const std::shared_ptr<MemoryPool>& pool) {
+    PAIMON_RETURN_NOT_OK(CheckInputBuffer(buffer, length));
+    CommitMessageSerializer serializer(pool);
+    auto input_stream = std::make_shared<ByteArrayInputStream>(buffer, length);
+    DataInputStream in(input_stream);
+    return serializer.Deserialize(version, &in);
+}
+
+/// Wraps `buffer` in an input stream and decodes a list of messages of the given version.
+Result<std::vector<std::shared_ptr<CommitMessage>>> CommitMessage::DeserializeList(
+    int32_t version, const char* buffer, int32_t length, const std::shared_ptr<MemoryPool>& pool) {
+    PAIMON_RETURN_NOT_OK(CheckInputBuffer(buffer, length));
+    CommitMessageSerializer serializer(pool);
+    auto input_stream = std::make_shared<ByteArrayInputStream>(buffer, length);
+    DataInputStream in(input_stream);
+    return serializer.DeserializeList(version, &in);
+}
+
+/// Renders the message via CommitMessageImpl::ToString(); any other dynamic type is rejected.
+Result<std::string> CommitMessage::ToDebugString(
+    const std::shared_ptr<CommitMessage>& commit_message) {
+    // Only the type check is needed here, so a raw-pointer cast avoids copying the shared_ptr.
+    const auto* message = dynamic_cast<const CommitMessageImpl*>(commit_message.get());
+    if (message == nullptr) {
+        return Status::Invalid("failed to cast commit message to commit message impl");
+    }
+    return message->ToString();
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/table/sink/commit_message_impl.cpp b/src/paimon/core/table/sink/commit_message_impl.cpp
new file mode 100644
index 0000000..48dbf8e
--- /dev/null
+++ b/src/paimon/core/table/sink/commit_message_impl.cpp
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/table/sink/commit_message_impl.h"
+
+#include <string>
+
+#include "fmt/format.h"
+
+namespace paimon {
+
+/// Stores copies of the partition row, bucket information and both increments.
+CommitMessageImpl::CommitMessageImpl(const BinaryRow& partition, int32_t bucket,
+                                     const std::optional<int32_t>& total_buckets,
+                                     const DataIncrement& data_increment,
+                                     const CompactIncrement& compact_increment)
+    : partition_(partition),
+      bucket_(bucket),
+      total_buckets_(total_buckets),
+      data_increment_(data_increment),
+      compact_increment_(compact_increment) {}
+
+/// Builds the "FileCommittable {...}" debug representation of this message.
+std::string CommitMessageImpl::ToString() const {
+    // An unset total bucket count is printed as "null".
+    const std::string total_buckets_str =
+        total_buckets_.has_value() ? std::to_string(total_buckets_.value()) : "null";
+    return fmt::format(
+        "FileCommittable {{partition = {}, bucket = {}, totalBuckets = {}, newFilesIncrement = "
+        "{}, compactIncrement = {}}}",
+        partition_.ToString(), bucket_, total_buckets_str, data_increment_.ToString(),
+        compact_increment_.ToString());
+}
+
+/// Member-wise equality; the increments are compared with their own operator==.
+bool CommitMessageImpl::operator==(const CommitMessageImpl& other) const {
+    if (this == &other) {
+        return true;
+    }
+    const bool same_location = partition_ == other.partition_ && bucket_ == other.bucket_ &&
+                               total_buckets_ == other.total_buckets_;
+    return same_location && data_increment_ == other.data_increment_ &&
+           compact_increment_ == other.compact_increment_;
+}
+
+/// Like operator==, but the increments are compared with their TEST_Equal() methods.
+bool CommitMessageImpl::TEST_Equal(const CommitMessageImpl& other) const {
+    if (this == &other) {
+        return true;
+    }
+    const bool same_location = partition_ == other.partition_ && bucket_ == other.bucket_ &&
+                               total_buckets_ == other.total_buckets_;
+    return same_location && data_increment_.TEST_Equal(other.data_increment_) &&
+           compact_increment_.TEST_Equal(other.compact_increment_);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/table/sink/commit_message_impl.h b/src/paimon/core/table/sink/commit_message_impl.h
new file mode 100644
index 0000000..a435516
--- /dev/null
+++ b/src/paimon/core/table/sink/commit_message_impl.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <optional>
+#include <string>
+
+#include "paimon/commit_message.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/core/io/compact_increment.h"
+#include "paimon/core/io/data_increment.h"
+
+namespace paimon {
+/// Concrete CommitMessage holding the partition, bucket, optional total bucket count and the
+/// data / compact increments of one write.
+class CommitMessageImpl : public CommitMessage {
+ public:
+    /// All arguments are copied into the message.
+    CommitMessageImpl(const BinaryRow& partition, int32_t bucket,
+                      const std::optional<int32_t>& total_buckets,
+                      const DataIncrement& data_increment,
+                      const CompactIncrement& compact_increment);
+
+    ~CommitMessageImpl() override = default;
+
+    /// Partition of this commit message.
+    const BinaryRow& Partition() const {
+        return partition_;
+    }
+
+    /// Bucket of this commit message.
+    int32_t Bucket() const {
+        return bucket_;
+    }
+
+    /// Total number of buckets in this partition; empty when it was not recorded.
+    const std::optional<int32_t>& TotalBuckets() const {
+        return total_buckets_;
+    }
+
+    /// Increment describing the newly written files.
+    const DataIncrement& GetNewFilesIncrement() const {
+        return data_increment_;
+    }
+
+    /// Increment describing the compaction changes.
+    const CompactIncrement& GetCompactIncrement() const {
+        return compact_increment_;
+    }
+
+    /// True when both the data increment and the compact increment are empty.
+    bool IsEmpty() const {
+        return data_increment_.IsEmpty() && compact_increment_.IsEmpty();
+    }
+
+    /// Human-readable representation, used by CommitMessage::ToDebugString().
+    std::string ToString() const;
+
+    bool operator==(const CommitMessageImpl& other) const;
+
+    /// Equality variant that compares the increments through their TEST_Equal() methods.
+    bool TEST_Equal(const CommitMessageImpl& other) const;
+
+ private:
+    BinaryRow partition_;
+    int32_t bucket_;
+    std::optional<int32_t> total_buckets_;
+    DataIncrement data_increment_;
+    CompactIncrement compact_increment_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/table/sink/commit_message_impl_test.cpp b/src/paimon/core/table/sink/commit_message_impl_test.cpp
new file mode 100644
index 0000000..fb6daa2
--- /dev/null
+++ b/src/paimon/core/table/sink/commit_message_impl_test.cpp
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/table/sink/commit_message_impl.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "gtest/gtest.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Loads a serialized commit message list (format version 6) from the test data directory and
+// checks the debug string of the single message it contains.
+TEST(CommitMessageImplTest, TestToString) {
+    std::string data_path = paimon::test::GetDataDir() +
+                            "/orc/append_10_external_path.db/append_10_external_path/"
+                            "commit_messages/commit_messages-01";
+    auto file_system = std::make_shared<LocalFileSystem>();
+    ASSERT_OK_AND_ASSIGN(auto file_status, file_system->GetFileStatus(data_path));
+    auto buffer_length = file_status->GetLen();
+    std::vector<uint8_t> buffer(buffer_length, 0);
+
+    // Surface the failure status instead of collapsing it into a null stream.
+    ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path));
+    ASSERT_TRUE(in_stream);
+    ASSERT_OK(in_stream->Read(reinterpret_cast<char*>(buffer.data()), buffer.size()));
+    ASSERT_OK(in_stream->Close());
+
+    auto pool = GetDefaultPool();
+    ASSERT_OK_AND_ASSIGN(
+        auto commit_messages,
+        CommitMessage::DeserializeList(/*version=*/6, reinterpret_cast<const char*>(buffer.data()),
+                                       buffer.size(), pool));
+    ASSERT_EQ(1, commit_messages.size());
+    auto msg_impl = std::dynamic_pointer_cast<CommitMessageImpl>(commit_messages[0]);
+    ASSERT_TRUE(msg_impl);
+    std::string expect =
+        "FileCommittable {partition = BinaryRow@0x9c67b85d, bucket = 0, totalBuckets = null, "
+        "newFilesIncrement = "
+        "DataIncrement {newFiles = data-64d93fc3-eaf2-4253-9cff-a9faa701e207-0.orc, deletedFiles = "
+        ", changelogFiles = , newIndexFiles = , deletedIndexFiles = }, compactIncrement = "
+        "CompactIncrement {compactBefore = , compactAfter = , changelogFiles = , newIndexFiles = "
+        ", deletedIndexFiles = }}";
+    ASSERT_EQ(expect, msg_impl->ToString());
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/table/sink/commit_message_serializer.cpp b/src/paimon/core/table/sink/commit_message_serializer.cpp
new file mode 100644
index 0000000..b2533a7
--- /dev/null
+++ b/src/paimon/core/table/sink/commit_message_serializer.cpp
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/table/sink/commit_message_serializer.h"
+
+#include <limits>
+#include <optional>
+#include <utility>
+#include <vector>
+
+#include "fmt/format.h"
+#include "paimon/common/data/binary_row.h"
+#include "paimon/common/io/memory_segment_output_stream.h"
+#include "paimon/common/utils/serialization_utils.h"
+#include "paimon/core/index/index_file_meta_serializer.h"
+#include "paimon/core/index/index_file_meta_v1_deserializer.h"
+#include "paimon/core/index/index_file_meta_v2_deserializer.h"
+#include "paimon/core/index/index_file_meta_v3_deserializer.h"
+#include "paimon/core/io/compact_increment.h"
+#include "paimon/core/io/data_file_meta_09_serializer.h"
+#include "paimon/core/io/data_file_meta_10_serializer.h"
+#include "paimon/core/io/data_file_meta_12_serializer.h"
+#include "paimon/core/io/data_file_meta_first_row_id_legacy_serializer.h"
+#include "paimon/core/io/data_file_meta_serializer.h"
+#include "paimon/core/io/data_increment.h"
+#include "paimon/core/table/sink/commit_message_impl.h"
+#include "paimon/core/utils/object_serializer.h"
+#include "paimon/io/data_input_stream.h"
+
+namespace paimon {
+class MemoryPool;
+
+const int32_t CommitMessageSerializer::CURRENT_VERSION = 11;
+
+CommitMessageSerializer::CommitMessageSerializer(const std::shared_ptr<MemoryPool>& pool)
+    : memory_pool_(pool),
+      data_file_serializer_(std::make_unique<DataFileMetaSerializer>(pool)),
+      index_entry_serializer_(std::make_unique<IndexFileMetaSerializer>(pool)) {}
+
+CommitMessageSerializer::~CommitMessageSerializer() = default;
+
+/// Writes one message in the current format: partition row, bucket, optional total bucket
+/// count (presence flag first), then the five lists of the data increment followed by the five
+/// lists of the compact increment.
+Status CommitMessageSerializer::Serialize(const std::shared_ptr<CommitMessage>& obj,
+                                          MemorySegmentOutputStream* out) {
+    // Only the concrete type is needed; a raw cast avoids an extra shared_ptr copy.
+    const auto* message = dynamic_cast<const CommitMessageImpl*>(obj.get());
+    if (message == nullptr) {
+        return Status::Invalid("failed to cast commit message to commit message impl");
+    }
+    PAIMON_RETURN_NOT_OK(SerializationUtils::SerializeBinaryRow(message->Partition(), out));
+    out->WriteValue(message->Bucket());
+    const std::optional<int32_t>& total_buckets = message->TotalBuckets();
+    if (total_buckets.has_value()) {
+        out->WriteValue(true);
+        out->WriteValue(total_buckets.value());
+    } else {
+        out->WriteValue(false);
+    }
+    // data increment
+    const DataIncrement& data_increment = message->GetNewFilesIncrement();
+    PAIMON_RETURN_NOT_OK(data_file_serializer_->SerializeList(data_increment.NewFiles(), out));
+    PAIMON_RETURN_NOT_OK(data_file_serializer_->SerializeList(data_increment.DeletedFiles(), out));
+    PAIMON_RETURN_NOT_OK(
+        data_file_serializer_->SerializeList(data_increment.ChangelogFiles(), out));
+    PAIMON_RETURN_NOT_OK(
+        index_entry_serializer_->SerializeList(data_increment.NewIndexFiles(), out));
+    PAIMON_RETURN_NOT_OK(
+        index_entry_serializer_->SerializeList(data_increment.DeletedIndexFiles(), out));
+    // compact increment
+    const CompactIncrement& compact_increment = message->GetCompactIncrement();
+    PAIMON_RETURN_NOT_OK(
+        data_file_serializer_->SerializeList(compact_increment.CompactBefore(), out));
+    PAIMON_RETURN_NOT_OK(
+        data_file_serializer_->SerializeList(compact_increment.CompactAfter(), out));
+    PAIMON_RETURN_NOT_OK(
+        data_file_serializer_->SerializeList(compact_increment.ChangelogFiles(), out));
+    PAIMON_RETURN_NOT_OK(
+        index_entry_serializer_->SerializeList(compact_increment.NewIndexFiles(), out));
+    PAIMON_RETURN_NOT_OK(
+        index_entry_serializer_->SerializeList(compact_increment.DeletedIndexFiles(), out));
+
+    return Status::OK();
+}
+
+/// Writes the element count as an int32, followed by every message.
+Status CommitMessageSerializer::SerializeList(
+    const std::vector<std::shared_ptr<CommitMessage>>& commit_messages,
+    MemorySegmentOutputStream* out) {
+    // DeserializeList() reads the count back as int32_t, so write exactly that width and
+    // refuse lists whose size cannot be represented.
+    if (commit_messages.size() > static_cast<size_t>(std::numeric_limits<int32_t>::max())) {
+        return Status::Invalid(
+            fmt::format("too many commit messages to serialize: {}", commit_messages.size()));
+    }
+    out->WriteValue(static_cast<int32_t>(commit_messages.size()));
+    for (const auto& commit_message : commit_messages) {
+        PAIMON_RETURN_NOT_OK(Serialize(commit_message, out));
+    }
+    return Status::OK();
+}
+
+/// Reads an int32 element count, then that many messages of the given version.
+Result<std::vector<std::shared_ptr<CommitMessage>>> CommitMessageSerializer::DeserializeList(
+    int32_t version, DataInputStream* in) {
+    PAIMON_ASSIGN_OR_RAISE(int32_t length, in->ReadValue<int32_t>());
+    // A negative count can only come from corrupted input; report it instead of silently
+    // returning an empty list.
+    if (length < 0) {
+        return Status::Invalid(fmt::format("invalid commit message count: {}", length));
+    }
+    std::vector<std::shared_ptr<CommitMessage>> commit_messages;
+    for (int32_t i = 0; i < length; i++) {
+        PAIMON_ASSIGN_OR_RAISE(std::shared_ptr<CommitMessage> commit_message,
+                               Deserialize(version, in));
+        commit_messages.push_back(std::move(commit_message));
+    }
+    return commit_messages;
+}
+
+/// Decodes one message with the given element (de)serializers.
+/// Layout by version: the total bucket count is only present from version 7 on; from version
+/// 10 on each increment carries its own index file lists, while older versions store one
+/// shared pair of index lists after both increments.
+Result<std::shared_ptr<CommitMessage>> CommitMessageSerializer::Deserialize(
+    int32_t version,
+    const ObjectSerializer<std::shared_ptr<DataFileMeta>>* data_file_meta_serializer,
+    const ObjectSerializer<std::shared_ptr<IndexFileMeta>>* index_entry_serializer,
+    DataInputStream* in) {
+    PAIMON_ASSIGN_OR_RAISE(BinaryRow partition,
+                           SerializationUtils::DeserializeBinaryRow(in, memory_pool_.get()));
+    PAIMON_ASSIGN_OR_RAISE(int32_t bucket, in->ReadValue<int32_t>());
+
+    std::optional<int32_t> total_buckets;
+    if (version >= 7) {
+        PAIMON_ASSIGN_OR_RAISE(bool total_buckets_exist, in->ReadValue<bool>());
+        if (total_buckets_exist) {
+            PAIMON_ASSIGN_OR_RAISE(total_buckets, in->ReadValue<int32_t>());
+        }
+    }
+    if (version >= 10) {
+        // data increment
+        PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<DataFileMeta>> new_files,
+                               data_file_meta_serializer->DeserializeList(in));
+        PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<DataFileMeta>> deleted_files,
+                               data_file_meta_serializer->DeserializeList(in));
+        PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<DataFileMeta>> changelog_files,
+                               data_file_meta_serializer->DeserializeList(in));
+        PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<IndexFileMeta>> new_data_index,
+                               index_entry_serializer->DeserializeList(in));
+        PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<IndexFileMeta>> deleted_data_index,
+                               index_entry_serializer->DeserializeList(in));
+        // compact increment
+        PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<DataFileMeta>> before,
+                               data_file_meta_serializer->DeserializeList(in));
+        PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<DataFileMeta>> after,
+                               data_file_meta_serializer->DeserializeList(in));
+        PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<DataFileMeta>> changelog,
+                               data_file_meta_serializer->DeserializeList(in));
+        PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<IndexFileMeta>> new_compact_index,
+                               index_entry_serializer->DeserializeList(in));
+        PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<IndexFileMeta>> deleted_compact_index,
+                               index_entry_serializer->DeserializeList(in));
+        return std::make_shared<CommitMessageImpl>(
+            partition, bucket, total_buckets,
+            DataIncrement(std::move(new_files), std::move(deleted_files),
+                          std::move(changelog_files), std::move(new_data_index),
+                          std::move(deleted_data_index)),
+            CompactIncrement(std::move(before), std::move(after), std::move(changelog),
+                             std::move(new_compact_index), std::move(deleted_compact_index)));
+    }
+    // data increment
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<DataFileMeta>> new_files,
+                           data_file_meta_serializer->DeserializeList(in));
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<DataFileMeta>> deleted_files,
+                           data_file_meta_serializer->DeserializeList(in));
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<DataFileMeta>> changelog_files,
+                           data_file_meta_serializer->DeserializeList(in));
+    // compact increment
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<DataFileMeta>> before,
+                           data_file_meta_serializer->DeserializeList(in));
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<DataFileMeta>> after,
+                           data_file_meta_serializer->DeserializeList(in));
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<DataFileMeta>> changelog,
+                           data_file_meta_serializer->DeserializeList(in));
+    // index increment
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<IndexFileMeta>> new_index,
+                           index_entry_serializer->DeserializeList(in));
+    PAIMON_ASSIGN_OR_RAISE(std::vector<std::shared_ptr<IndexFileMeta>> deleted_index,
+                           index_entry_serializer->DeserializeList(in));
+
+    DataIncrement data_increment(std::move(new_files), std::move(deleted_files),
+                                 std::move(changelog_files));
+    CompactIncrement compact_increment(std::move(before), std::move(after), std::move(changelog));
+
+    // Older formats do not say which increment the index files belong to: attach them to the
+    // data increment when nothing was compacted, otherwise to the compact increment.
+    if (compact_increment.IsEmpty()) {
+        data_increment.AddNewIndexFiles(std::move(new_index));
+        data_increment.AddDeletedIndexFiles(std::move(deleted_index));
+    } else {
+        compact_increment.AddNewIndexFiles(std::move(new_index));
+        compact_increment.AddDeletedIndexFiles(std::move(deleted_index));
+    }
+    return std::make_shared<CommitMessageImpl>(partition, bucket, total_buckets, data_increment,
+                                               compact_increment);
+}
+
+/// Picks the data-file and index-file (de)serializers matching `version` and decodes one
+/// message. Versions up to 2 are not implemented; versions above CURRENT_VERSION are invalid.
+Result<std::shared_ptr<CommitMessage>> CommitMessageSerializer::Deserialize(int32_t version,
+                                                                            DataInputStream* in) {
+    if (version == CURRENT_VERSION) {
+        return Deserialize(version, data_file_serializer_.get(), index_entry_serializer_.get(), in);
+    } else if (version == 9 || version == 10) {
+        auto index_entry_v3_deserializer =
+            std::make_unique<IndexFileMetaV3Deserializer>(memory_pool_);
+        return Deserialize(version, data_file_serializer_.get(), index_entry_v3_deserializer.get(),
+                           in);
+    } else if (version == 8) {
+        auto data_file_meta_first_row_id_legacy_serializer =
+            std::make_unique<DataFileMetaFirstRowIdLegacySerializer>(memory_pool_);
+        auto index_entry_v2_deserializer =
+            std::make_unique<IndexFileMetaV2Deserializer>(memory_pool_);
+        return Deserialize(version, data_file_meta_first_row_id_legacy_serializer.get(),
+                           index_entry_v2_deserializer.get(), in);
+    } else if (version == 6 || version == 7) {
+        auto index_entry_v2_deserializer =
+            std::make_unique<IndexFileMetaV2Deserializer>(memory_pool_);
+        auto data_file_meta_12_serializer =
+            std::make_unique<DataFileMeta12Serializer>(memory_pool_);
+        return Deserialize(version, data_file_meta_12_serializer.get(),
+                           index_entry_v2_deserializer.get(), in);
+    } else if (version == 5) {
+        auto index_entry_v2_deserializer =
+            std::make_unique<IndexFileMetaV2Deserializer>(memory_pool_);
+        auto data_file_meta_10_serializer =
+            std::make_unique<DataFileMeta10Serializer>(memory_pool_);
+        return Deserialize(version, data_file_meta_10_serializer.get(),
+                           index_entry_v2_deserializer.get(), in);
+    } else if (version == 4) {
+        auto index_entry_v1_deserializer =
+            std::make_unique<IndexFileMetaV1Deserializer>(memory_pool_);
+        auto data_file_meta_10_serializer =
+            std::make_unique<DataFileMeta10Serializer>(memory_pool_);
+        return Deserialize(version, data_file_meta_10_serializer.get(),
+                           index_entry_v1_deserializer.get(), in);
+    } else if (version == 3) {
+        auto data_file_meta_09_serializer =
+            std::make_unique<DataFileMeta09Serializer>(memory_pool_);
+        auto index_entry_v1_deserializer =
+            std::make_unique<IndexFileMetaV1Deserializer>(memory_pool_);
+        return Deserialize(version, data_file_meta_09_serializer.get(),
+                           index_entry_v1_deserializer.get(), in);
+    } else if (version <= 2) {
+        return Status::NotImplemented("deserialize 08 not implemented");
+    } else {
+        return Status::Invalid(
+            fmt::format("Expecting CommitMessageSerializer version to be smaller or equal than {}, "
+                        "but found {}.",
+                        CURRENT_VERSION, version));
+    }
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/table/sink/commit_message_serializer.h b/src/paimon/core/table/sink/commit_message_serializer.h
new file mode 100644
index 0000000..c038ca0
--- /dev/null
+++ b/src/paimon/core/table/sink/commit_message_serializer.h
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "paimon/commit_message.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/type_fwd.h"
+
+namespace paimon {
+
+class DataFileMetaSerializer;
+class IndexFileMetaSerializer;
+class MemorySegmentOutputStream;
+class DataInputStream;
+struct DataFileMeta;
+class IndexFileMeta;
+template <typename T>
+class ObjectSerializer;
+class MemoryPool;
+
+/// Writes commit messages in the current format and reads them back in the current or an
+/// older format version.
+class CommitMessageSerializer {
+ public:
+    /// Format version produced by Serialize() / SerializeList().
+    static const int32_t CURRENT_VERSION;
+
+    explicit CommitMessageSerializer(const std::shared_ptr<MemoryPool>& pool);
+    ~CommitMessageSerializer();
+
+    /// Appends one message to `out`; fails when `obj` is not a CommitMessageImpl.
+    Status Serialize(const std::shared_ptr<CommitMessage>& obj, MemorySegmentOutputStream* out);
+
+    /// Appends the element count followed by every message to `out`.
+    Status SerializeList(const std::vector<std::shared_ptr<CommitMessage>>& commit_messages,
+                         MemorySegmentOutputStream* out);
+
+    /// Reads an element count and then that many messages written with `version`.
+    Result<std::vector<std::shared_ptr<CommitMessage>>> DeserializeList(int32_t version,
+                                                                       DataInputStream* in);
+
+    /// Reads a single message written with `version`.
+    Result<std::shared_ptr<CommitMessage>> Deserialize(int32_t version, DataInputStream* in);
+
+ private:
+    /// Decodes one message using the element (de)serializers chosen for `version`.
+    Result<std::shared_ptr<CommitMessage>> Deserialize(
+        int32_t version,
+        const ObjectSerializer<std::shared_ptr<DataFileMeta>>* data_file_meta_serializer,
+        const ObjectSerializer<std::shared_ptr<IndexFileMeta>>* index_entry_serializer,
+        DataInputStream* in);
+
+ private:
+    std::shared_ptr<MemoryPool> memory_pool_;
+    // Serializers for the current format; legacy ones are created on demand when reading.
+    std::unique_ptr<DataFileMetaSerializer> data_file_serializer_;
+    std::unique_ptr<IndexFileMetaSerializer> index_entry_serializer_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/table/sink/commit_message_test.cpp b/src/paimon/core/table/sink/commit_message_test.cpp
new file mode 100644
index 0000000..6c00bb9
--- /dev/null
+++ b/src/paimon/core/table/sink/commit_message_test.cpp
@@ -0,0 +1,1219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/commit_message.h" + +#include +#include +#include +#include + +#include "arrow/array/array_base.h" +#include "arrow/array/builder_binary.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "gtest/gtest.h" +#include "paimon/catalog/catalog.h" +#include "paimon/catalog/identifier.h" +#include "paimon/common/data/binary_row.h" +#include "paimon/common/data/data_define.h" +#include "paimon/common/utils/linked_hash_map.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/index/deletion_vector_meta.h" +#include "paimon/core/index/index_file_meta.h" +#include "paimon/core/io/compact_increment.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/io/data_increment.h" +#include "paimon/core/manifest/file_source.h" +#include "paimon/core/stats/simple_stats.h" +#include "paimon/core/table/sink/commit_message_impl.h" +#include "paimon/core/table/sink/commit_message_serializer.h" +#include "paimon/data/decimal.h" +#include "paimon/data/timestamp.h" +#include "paimon/defs.h" +#include "paimon/file_store_write.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/record_batch.h" +#include "paimon/status.h" +#include "paimon/testing/utils/binary_row_generator.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/write_context.h" + +namespace paimon::test { + +TEST(CommitMessageTest, TestCurrentVersion) { + 
ASSERT_EQ(CommitMessageSerializer::CURRENT_VERSION, CommitMessage::CurrentVersion()); +} + +TEST(CommitMessageTest, TestCompatibleWithVersion11) { + // index file meta: add global index meta + int32_t version = 11; + std::string data_path = + paimon::test::GetDataDir() + + "orc/append_with_global_index_with_partition.db/append_with_global_index_with_partition/" + "commit_messages/commit_messages-01"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + + std::vector buffer(buffer_length, 0); + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + ASSERT_EQ(res_msgs.size(), 1); + + std::vector expected_msgs; + auto index_meta = std::make_shared( + "bitmap", "bitmap-global-index-6f974a9b-07bb-4a06-9696-6646020d8139.index", + /*file_size=*/120, /*row_count=*/5, /*dv_ranges=*/std::nullopt, + /*external_path=*/std::nullopt, + GlobalIndexMeta(/*row_range_start=*/0, /*row_range_end=*/4, /*index_field_id=*/0, + /*extra_field_ids=*/std::nullopt, /*index_meta=*/nullptr)); + + expected_msgs.emplace_back( + /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/0, /*total_bucket=*/std::nullopt, DataIncrement({index_meta}), + CompactIncrement({}, {}, {})); + + // check result + ASSERT_EQ(res_msgs, expected_msgs); + ASSERT_OK_AND_ASSIGN(std::string serialized_bytes, CommitMessage::SerializeList(ret, pool)); + ASSERT_EQ(serialized_bytes, std::string(reinterpret_cast(buffer.data()), buffer.size())); +} + +TEST(CommitMessageTest, 
TestCompatibleWithVersion10) { + // test with commit message version 10 + // move IndexFileMeta into DataIncrement & CompactIncrement + int32_t version = 10; + std::string data_path = paimon::test::GetDataDir() + + "orc/pk_dv_index_with_commit_message_version10.db/" + "pk_dv_index_with_commit_message_version10/" + "commit_messages/commit_messages-01"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + + std::vector buffer(buffer_length, 0); + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + ASSERT_EQ(res_msgs.size(), 1); + + std::vector expected_msgs; + auto file_meta = std::make_shared( + "data-a19eec15-e0e3-4a30-85e2-01d23d9945be-1.orc", /*file_size=*/872, /*row_count=*/1, + /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Tony"), 0}, pool.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Tony"), 0}, pool.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Tony"), 0}, {std::string("Tony"), 0}, + {0, 0}, pool.get()), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Tony"), 10, 0, 14.1}, + {std::string("Tony"), 10, 0, 14.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/5, /*max_sequence_number=*/5, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1761242383412ll, 0), + /*delete_row_count=*/1, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, 
/*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + + LinkedHashMap dv_ranges; + dv_ranges.insert_or_assign( + "data-a19eec15-e0e3-4a30-85e2-01d23d9945be-0.orc", + DeletionVectorMeta("data-a19eec15-e0e3-4a30-85e2-01d23d9945be-0.orc", /*offset=*/1, + /*length=*/22, /*cardinality=*/1)); + expected_msgs.emplace_back( + /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/1, /*total_bucket=*/2, DataIncrement({file_meta}, {}, {}, {}, {}), + CompactIncrement({file_meta}, {}, {}, + {std::make_shared( + "DELETION_VECTORS", "index-9c24b2fc-40db-4f58-9a13-55b52ae8880c-1", 31, + 1, dv_ranges, std::nullopt)}, + {})); + + // check result + ASSERT_EQ(res_msgs, expected_msgs); +} + +TEST(CommitMessageTest, TestCompatibleWithVersion9) { + // test with commit message version 9 + // add write_cols in DataFileMeta and external path in IndexFileMeta + int32_t version = 9; + std::string data_path = + paimon::test::GetDataDir() + + "orc/pk_dv_index_not_in_data_no_external.db/pk_dv_index_not_in_data_no_external/" + "commit_messages/commit_messages-01"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + + std::vector buffer(buffer_length, 0); + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + ASSERT_EQ(res_msgs.size(), 1); + + std::vector expected_msgs; + auto file_meta = std::make_shared( + "data-aa87291d-2a90-4846-b106-1bb4c76d74db-1.orc", /*file_size=*/872, /*row_count=*/1, + 
/*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Tony"), 0}, pool.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Tony"), 0}, pool.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Tony"), 0}, {std::string("Tony"), 0}, + {0, 0}, pool.get()), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Tony"), 10, 0, 14.1}, + {std::string("Tony"), 10, 0, 14.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/5, /*max_sequence_number=*/5, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1757349273600ll, 0), + /*delete_row_count=*/1, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + + LinkedHashMap dv_ranges; + dv_ranges.insert_or_assign( + "data-aa87291d-2a90-4846-b106-1bb4c76d74db-0.orc", + DeletionVectorMeta("data-aa87291d-2a90-4846-b106-1bb4c76d74db-0.orc", /*offset=*/1, + /*length=*/22, /*cardinality=*/1)); + expected_msgs.emplace_back( + /*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/1, /*total_bucket=*/2, DataIncrement({file_meta}, {}, {}, {}, {}), + CompactIncrement({file_meta}, {}, {}, + {std::make_shared( + "DELETION_VECTORS", "index-aa60193d-d7cd-434f-bc1a-c1adb210e1f7-1", 31, + 1, dv_ranges, std::nullopt)}, + {})); + + // check result + ASSERT_EQ(res_msgs, expected_msgs); + ASSERT_OK(CommitMessage::SerializeList(ret, pool)); +} + +TEST(CommitMessageTest, TestCompatibleWithVersion9WithExternalPathForIndex) { + // test with commit message version 9 + // add write_cols in DataFileMeta and external path in IndexFileMeta + int32_t version = 9; + std::string data_path = + paimon::test::GetDataDir() + + "orc/pk_dv_index_in_data_with_external.db/pk_dv_index_in_data_with_external/" + "commit_messages/commit_messages-01"; + auto file_system = std::make_shared(); + auto buffer_length = 
file_system->GetFileStatus(data_path).value()->GetLen(); + + std::vector buffer(buffer_length, 0); + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + ASSERT_EQ(res_msgs.size(), 1); + + std::vector expected_msgs; + auto file_meta = std::make_shared( + "data-72b62a5f-d698-4db5-b51a-04c0dc027702-1.orc", /*file_size=*/872, /*row_count=*/1, + /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Tony"), 0}, pool.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Tony"), 0}, pool.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Tony"), 0}, {std::string("Tony"), 0}, + {0, 0}, pool.get()), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Tony"), 10, 0, 14.1}, + {std::string("Tony"), 10, 0, 14.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/5, /*max_sequence_number=*/5, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1757354416117ll, 0), + /*delete_row_count=*/1, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/ + "FILE:/tmp/external/f1=10/bucket-1/data-72b62a5f-d698-4db5-b51a-04c0dc027702-1.orc", + /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + + LinkedHashMap dv_ranges; + dv_ranges.insert_or_assign( + "data-72b62a5f-d698-4db5-b51a-04c0dc027702-0.orc", + DeletionVectorMeta("data-72b62a5f-d698-4db5-b51a-04c0dc027702-0.orc", /*offset=*/1, + /*length=*/22, /*cardinality=*/1)); + expected_msgs.emplace_back( + 
/*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/1, /*total_bucket=*/2, DataIncrement({file_meta}, {}, {}, {}, {}), + CompactIncrement( + {file_meta}, {}, {}, + {std::make_shared( + "DELETION_VECTORS", "index-419e7c6b-9cad-49e8-9cd2-6187471df954-1", 31, 1, + dv_ranges, + "FILE:/tmp/external/f1=10/bucket-1/index-419e7c6b-9cad-49e8-9cd2-6187471df954-1")}, + {})); + + // check result + ASSERT_EQ(res_msgs, expected_msgs); + ASSERT_OK(CommitMessage::SerializeList(ret, pool)); +} + +TEST(CommitMessageTest, TestCompatibleWithVersion8) { + // test with commit message version 8 + // commit msg is generated by java paimon 1.3 + int32_t version = 8; + std::string data_path = paimon::test::GetDataDir() + + "/orc/append_table_with_first_row_id.db/append_table_with_first_row_id/" + "commit_messages/commit_messages-01"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + + std::vector buffer(buffer_length, 0); + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + + std::vector expected_msgs; + auto file_meta = std::make_shared( + "data-16bd83f7-282a-479a-9968-0868436516b0-0.orc", /*file_size=*/567, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), + /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 1, 11.1}, + {std::string("Alice"), 10, 1, 11.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/0, 
/*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1754068646844ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + expected_msgs.emplace_back(/*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/0, /*total_bucket=*/2, DataIncrement({file_meta}, {}, {}), + CompactIncrement({}, {}, {})); + + auto file_meta2 = std::make_shared( + "data-f33d2740-1205-49db-8ca4-d4fc2bddc99f-0.orc", /*file_size=*/620, /*row_count=*/4, + /*min_key=*/BinaryRow::EmptyRow(), + /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Alex"), 10, 0, 12.1}, + {std::string("Tony"), 10, 0, 16.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/3, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1754068646864ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + expected_msgs.emplace_back(/*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/1, /*total_bucket=*/2, + DataIncrement({file_meta2}, {}, {}), CompactIncrement({}, {}, {})); + // check result + ASSERT_EQ(res_msgs, expected_msgs); + ASSERT_OK(CommitMessage::SerializeList(ret, pool)); +} + +TEST(CommitMessageTest, TestCompatibleWithVersion7) { + // test with commit message version 7 + // commit msg is generated by java paimon 1.1 + int32_t version = 7; + std::string data_path = paimon::test::GetDataDir() + + "/orc/pk_table_with_total_buckets.db/pk_table_with_total_buckets/" + "commit_messages/commit_messages-01"; + auto file_system = 
std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + std::vector buffer(buffer_length, 0); + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + + std::vector expected_msgs; + auto file_meta = std::make_shared( + "data-d7725088-6bd4-4e70-9ce6-714ae93b47cc-0.orc", /*file_size=*/863, /*row_count=*/1, + /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Alice"), 1}, pool.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Alice"), 1}, pool.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Alice"), 1}, {std::string("Alice"), 1}, + {0, 0}, pool.get()), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 1, 11.1}, + {std::string("Alice"), 10, 1, 11.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1743525392885ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + expected_msgs.emplace_back(/*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/0, /*total_bucket=*/2, DataIncrement({file_meta}, {}, {}), + CompactIncrement({}, {}, {})); + + auto file_meta2 = std::make_shared( + "data-5858a84b-7081-4618-b828-ae3918c5e1f6-0.orc", /*file_size=*/943, /*row_count=*/4, + 
/*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Alex"), 0}, pool.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Tony"), 0}, pool.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Alex"), 0}, {std::string("Tony"), 0}, + {0, 0}, pool.get()), + /*value_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Alex"), 10, 0, 12.1}, + {std::string("Tony"), 10, 0, 16.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/3, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1743525392921ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + expected_msgs.emplace_back(/*partition=*/BinaryRowGenerator::GenerateRow({10}, pool.get()), + /*bucket=*/1, /*total_bucket=*/2, + DataIncrement({file_meta2}, {}, {}), CompactIncrement({}, {}, {})); + // check result + ASSERT_EQ(res_msgs, expected_msgs); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::string serialize_ret, + CommitMessage::SerializeList(ret, pool)); +} + +TEST(CommitMessageTest, TestCompatibleWithVersion6) { + // test with commit message version 6 + // commit msg is generated by java paimon 1.0 + int32_t version = 6; + std::string data_path = paimon::test::GetDataDir() + + "/orc/append_10_external_path.db/append_10_external_path/" + "commit_messages/commit_messages-01"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + std::vector buffer(buffer_length, 0); + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, 
reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + + std::vector expected_msgs; + auto file_meta = std::make_shared( + "data-64d93fc3-eaf2-4253-9cff-a9faa701e207-0.orc", /*file_size=*/645, /*row_count=*/5, + BinaryRow::EmptyRow(), BinaryRow::EmptyRow(), SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 0, 11.1}, + {std::string("Tony"), 20, 1, 14.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/4, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1737052260143ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + expected_msgs.emplace_back(BinaryRow::EmptyRow(), /*bucket=*/0, /*total_bucket=*/std::nullopt, + DataIncrement({file_meta}, {}, {}), CompactIncrement({}, {}, {})); + // check result + ASSERT_EQ(res_msgs, expected_msgs); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::string serialize_ret, + CommitMessage::SerializeList(ret, pool)); +} + +TEST(CommitMessageTest, TestCompatibleWithVersion5) { + // test with commit message version 5 + // commit msg is generated by java paimon 1.0 + int32_t version = 5; + std::string data_path = paimon::test::GetDataDir() + + "/orc/pk_table_with_dv_cardinality.db/pk_table_with_dv_cardinality/" + "commit_messages/commit_messages-01"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + std::vector buffer(buffer_length, 0); + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); 
+ ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + + std::vector expected_msgs; + auto file_meta1 = std::make_shared( + "data-0d0f29cc-63c6-4fab-a594-71bd7d06fcde-1.orc", /*file_size=*/859, /*row_count=*/1, + BinaryRowGenerator::GenerateRow({std::string("Alice"), 1}, pool.get()), + BinaryRowGenerator::GenerateRow({std::string("Alice"), 1}, pool.get()), + BinaryRowGenerator::GenerateStats({std::string("Alice"), 1}, {std::string("Alice"), 1}, + {0, 0}, pool.get()), + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 1}, + {std::string("Alice"), 10, 1}, {0, 0, 0}, pool.get()), + /*min_sequence_number=*/2, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1734707236040ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::optional>({"f0", "f1", "f2"}), + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + auto file_meta1_after_compact = std::make_shared( + "data-0d0f29cc-63c6-4fab-a594-71bd7d06fcde-1.orc", /*file_size=*/859, /*row_count=*/1, + BinaryRowGenerator::GenerateRow({std::string("Alice"), 1}, pool.get()), + BinaryRowGenerator::GenerateRow({std::string("Alice"), 1}, pool.get()), + BinaryRowGenerator::GenerateStats({std::string("Alice"), 1}, {std::string("Alice"), 1}, + {0, 0}, pool.get()), + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 1}, + {std::string("Alice"), 10, 1}, {0, 0, 0}, pool.get()), + /*min_sequence_number=*/2, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/4, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1734707236040ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, 
FileSource::Append(), + /*value_stats_cols=*/std::optional>({"f0", "f1", "f2"}), + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + + DataIncrement data_increment1({file_meta1}, {}, {}, {}, {}); + LinkedHashMap dv_metas1; + dv_metas1.insert_or_assign( + "data-0d0f29cc-63c6-4fab-a594-71bd7d06fcde-0.orc", + DeletionVectorMeta("data-0d0f29cc-63c6-4fab-a594-71bd7d06fcde-0.orc", /*offset=*/1, + /*length=*/22, /*cardinality=*/1)); + auto index_file_meta1 = std::make_shared( + "DELETION_VECTORS", "index-86356766-3238-46e6-990b-656cd7409eaa-0", /*file_size=*/31, + /*row_count=*/1, dv_metas1, /*external_path=*/std::nullopt); + expected_msgs.emplace_back( + BinaryRowGenerator::GenerateRow({10}, pool.get()), /*bucket=*/0, + /*total_bucket=*/std::nullopt, data_increment1, + CompactIncrement({file_meta1}, {file_meta1_after_compact}, {}, {index_file_meta1}, {})); + + auto file_meta2 = std::make_shared( + "data-2ffe7ae9-2cf7-41e9-944b-2065585cde31-1.orc", /*file_size=*/922, /*row_count=*/2, + BinaryRowGenerator::GenerateRow({std::string("Lily"), 0}, pool.get()), + BinaryRowGenerator::GenerateRow({std::string("Tony"), 0}, pool.get()), + BinaryRowGenerator::GenerateStats({std::string("Lily"), 0}, {std::string("Tony"), 0}, + {0, 0}, pool.get()), + BinaryRowGenerator::GenerateStats({std::string("Lily"), 10, 0}, + {std::string("Tony"), 10, 0}, {0, 0, 0}, pool.get()), + /*min_sequence_number=*/7, /*max_sequence_number=*/8, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1734707236109ll, 0), + /*delete_row_count=*/2, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::optional>({"f0", "f1", "f2"}), + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + DataIncrement data_increment2({file_meta2}, {}, {}, {}, {}); + LinkedHashMap dv_metas2; + dv_metas2.insert_or_assign( + "data-2ffe7ae9-2cf7-41e9-944b-2065585cde31-0.orc", + 
DeletionVectorMeta("data-2ffe7ae9-2cf7-41e9-944b-2065585cde31-0.orc", /*offset=*/1, + /*length=*/24, /*cardinality=*/2)); + auto index_file_meta2 = std::make_shared( + "DELETION_VECTORS", "index-86356766-3238-46e6-990b-656cd7409eaa-1", /*file_size=*/33, + /*row_count=*/1, dv_metas2, /*external_path=*/std::nullopt); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({10}, pool.get()), /*bucket=*/1, + /*total_bucket=*/std::nullopt, data_increment2, + CompactIncrement({file_meta2}, {}, {}, {index_file_meta2}, {})); + + // check result + ASSERT_EQ(res_msgs, expected_msgs); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::string serialize_ret, + CommitMessage::SerializeList(ret, pool)); +} + +TEST(CommitMessageTest, TestCompatibleWithVersion4) { + // test with commit message version 4 + // commit msg is generated by java paimon 1.0 + int32_t version = 4; + std::string data_path = paimon::test::GetDataDir() + + "/orc/append_10.db/append_10/commit_messages/commit_messages-01"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + std::vector buffer(buffer_length, 0); + + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + + std::vector expected_msgs; + auto file_meta1 = std::make_shared( + "data-e54f10e3-60ec-4a2a-be29-32f2b6183884-0.orc", /*file_size=*/543, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 1, 
11.1}, + {std::string("Alice"), 10, 1, 11.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1731404403175ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement data_increment1({file_meta1}, {}, {}); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({10}, pool.get()), /*bucket=*/0, + /*total_bucket=*/std::nullopt, data_increment1, + CompactIncrement({}, {}, {})); + + auto file_meta2 = std::make_shared( + "data-de4e972f-5cc8-49b1-844e-374191534c68-0.orc", /*file_size=*/575, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0, 12.1}, + {std::string("Tony"), 10, 0, 14.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1731404403198ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement data_increment2({file_meta2}, {}, {}); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({10}, pool.get()), /*bucket=*/1, + /*total_bucket=*/std::nullopt, data_increment2, + CompactIncrement({}, {}, {})); + + auto file_meta3 = std::make_shared( + "data-8ab054d3-0480-4268-84f7-bf2759632f76-0.orc", /*file_size=*/541, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Lucy"), 20, 1, 14.1}, 
+ {std::string("Lucy"), 20, 1, 14.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1731404403214ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement data_increment3({file_meta3}, {}, {}); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({20}, pool.get()), /*bucket=*/0, + /*total_bucket=*/std::nullopt, data_increment3, + CompactIncrement({}, {}, {})); + + // check result + ASSERT_EQ(res_msgs, expected_msgs); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::string serialize_ret, + CommitMessage::SerializeList(ret, pool)); +} + +TEST(CommitMessageTest, TestCompatibleWithJavaPaimon10WithStatsDenseStore) { + // test with commit message version 4 + // commit msg is generated by java paimon 1.0 + int32_t version = 4; + std::string data_path = + paimon::test::GetDataDir() + + "/orc/append_10_stats_dense_store.db/append_10_stats_dense_store/commit_messages/" + "commit_messages-01"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + std::vector buffer(buffer_length, 0); + + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + + std::vector expected_msgs; + auto file_meta1 = std::make_shared( + "data-cdb38c8a-31c1-4824-a024-9abd3fbb466f-0.orc", 
/*file_size=*/543, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 1}, + {std::string("Alice"), 10, 1}, {0, 0, 0}, pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1731412938869ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::optional>({"f0", "f1", "f2"}), + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + DataIncrement data_increment1({file_meta1}, {}, {}); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({10}, pool.get()), /*bucket=*/0, + /*total_bucket=*/std::nullopt, data_increment1, + CompactIncrement({}, {}, {})); + + auto file_meta2 = std::make_shared( + "data-c2613568-0412-4cd9-a0c4-1eae8e4ca89b-0.orc", /*file_size=*/575, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0}, {std::string("Tony"), 10, 0}, + {0, 0, 0}, pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1731412938891ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::optional>({"f0", "f1", "f2"}), + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + DataIncrement data_increment2({file_meta2}, {}, {}); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({10}, pool.get()), /*bucket=*/1, + /*total_bucket=*/std::nullopt, data_increment2, + CompactIncrement({}, {}, {})); + + auto file_meta3 = std::make_shared( + "data-a6d1261a-f798-4fbd-a251-6d6c7d8060dd-0.orc", 
/*file_size=*/541, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Lucy"), 20, 1}, + {std::string("Lucy"), 20, 1}, {0, 0, 0}, pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1731412938908ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::optional>({"f0", "f1", "f2"}), + /*external_path=*/std::nullopt, /*first_row_id=*/std::nullopt, /*write_cols=*/std::nullopt); + DataIncrement data_increment3({file_meta3}, {}, {}); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({20}, pool.get()), /*bucket=*/0, + /*total_bucket=*/std::nullopt, data_increment3, + CompactIncrement({}, {}, {})); + + // check result + ASSERT_EQ(res_msgs, expected_msgs); + + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::string serialize_ret, + CommitMessage::SerializeList(ret, pool)); +} + +/* Reads the fixture file commit_messages-01 of the append_09 table, deserializes it as version 3 with CommitMessage::DeserializeList, compares the result with three hand-built expected messages, and finally checks that the deserialized list can be serialized again. */ TEST(CommitMessageTest, TestCompatibleWith09JavaPaimon1) { + // test with commit message version 3 + int32_t version = 3; + std::string data_path = paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/commit_messages/commit_messages-01"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + std::vector buffer(buffer_length, 0); + + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + /* NOTE(review): the cast result is dereferenced on the next statement without a null check; the cast target type is not visible here - confirm the cast cannot fail. */ auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + + 
/* Expected messages: each carries one new data file in its DataIncrement and an empty CompactIncrement. */ std::vector expected_msgs; + auto file_meta1 = std::make_shared( + "data-51a45441-6037-4af3-b67b-5cefd75dc6f2-0.orc", /*file_size=*/543, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Alice"), 10, 1, 11.1}, + {std::string("Alice"), 10, 1, 11.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1724090888706ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement data_increment1({file_meta1}, {}, {}); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({10}, pool.get()), /*bucket=*/0, + /*total_bucket=*/std::nullopt, data_increment1, + CompactIncrement({}, {}, {})); + + auto file_meta2 = std::make_shared( + "data-6828284c-e707-49b5-af6b-69be79af120c-0.orc", /*file_size=*/575, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0, 12.1}, + {std::string("Tony"), 10, 0, 14.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1724090888727ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement data_increment2({file_meta2}, {}, {}); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({10}, pool.get()), /*bucket=*/1, + /*total_bucket=*/std::nullopt, data_increment2, + 
CompactIncrement({}, {}, {})); + + auto file_meta3 = std::make_shared( + "data-8dc7f04c-3c98-48b2-9d56-834d746c4a40-0.orc", /*file_size=*/541, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Lucy"), 20, 1, 14.1}, + {std::string("Lucy"), 20, 1, 14.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/0, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1724090888743ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement data_increment3({file_meta3}, {}, {}); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({20}, pool.get()), /*bucket=*/0, + /*total_bucket=*/std::nullopt, data_increment3, + CompactIncrement({}, {}, {})); + + // check result + ASSERT_EQ(res_msgs, expected_msgs); + + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::string serialize_ret, + CommitMessage::SerializeList(ret, pool)); +} + +/* Reads the fixture file commit_messages-02 of the append_09 table as version 3 and compares the deserialized list with two expected messages; the second expected message has its fourth value-stats column marked null. Ends by re-serializing the deserialized list. */ TEST(CommitMessageTest, TestCompatibleWith09JavaPaimon2) { + // test with commit message version 3 + int32_t version = 3; + std::string data_path = paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/commit_messages/commit_messages-02"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + std::vector buffer(buffer_length, 0); + + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for 
(const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + + std::vector expected_msgs; + auto file_meta1 = std::make_shared( + "data-fd1d2255-43f2-4534-b4cc-08b29e662940-0.orc", /*file_size=*/589, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("Alex"), 10, 0, 12.1}, + {std::string("Emily"), 10, 0, 16.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/3, /*max_sequence_number=*/5, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1724091050427ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement data_increment1({file_meta1}, {}, {}); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({10}, pool.get()), /*bucket=*/1, + /*total_bucket=*/std::nullopt, data_increment1, + CompactIncrement({}, {}, {})); + // handle null value + /* The fourth stats column (index 3) is generated with a placeholder 0 and a null count of 1, then explicitly marked null in both the min and the max row. */ auto simple_stats = BinaryRowGenerator::GenerateStats( + {std::string("Paul"), 20, 1, 0}, {std::string("Paul"), 20, 1, 0}, {0, 0, 0, 1}, pool.get()); + simple_stats.min_values_.SetNullAt(3); + simple_stats.max_values_.SetNullAt(3); + auto file_meta2 = std::make_shared( + "data-7b3f4cc7-116b-4d2f-9c62-5dadc1f11bcb-0.orc", /*file_size=*/506, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), simple_stats, + /*min_sequence_number=*/1, /*max_sequence_number=*/1, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1724091050445ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + 
/*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement data_increment2({file_meta2}, {}, {}); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({20}, pool.get()), /*bucket=*/0, + /*total_bucket=*/std::nullopt, data_increment2, + CompactIncrement({}, {}, {})); + + // check result + ASSERT_EQ(res_msgs, expected_msgs); + + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::string serialize_ret, + CommitMessage::SerializeList(ret, pool)); +} + +/* Reads the fixture file commit_messages-03 of the append_09 table as version 3 and compares the deserialized list with a single expected message (one data file, bucket 1), then re-serializes the deserialized list. */ TEST(CommitMessageTest, TestCompatibleWith09JavaPaimon3) { + // test with commit message version 3 + int32_t version = 3; + std::string data_path = paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/commit_messages/commit_messages-03"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + std::vector buffer(buffer_length, 0); + + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + + std::vector expected_msgs; + auto file_meta1 = std::make_shared( + "data-2e26b69c-b24a-4760-9654-05b315d7b57f-0.orc", /*file_size=*/541, /*row_count=*/1, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats({std::string("David"), 10, 0, 17.1}, + {std::string("David"), 10, 0, 17.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/6, /*max_sequence_number=*/6, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1724091126209ll, 0), + /*delete_row_count=*/0, 
/*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement data_increment1({file_meta1}, {}, {}); + expected_msgs.emplace_back(BinaryRowGenerator::GenerateRow({10}, pool.get()), /*bucket=*/1, + /*total_bucket=*/std::nullopt, data_increment1, + CompactIncrement({}, {}, {})); + + // check result + ASSERT_EQ(res_msgs, expected_msgs); + + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::string serialize_ret, + CommitMessage::SerializeList(ret, pool)); +} + +/* Reads the fixture file commit_messages-01 of the pk_09_with_dv table as version 3 and compares the deserialized list with one expected message whose CompactIncrement holds a file before compaction, a level-4 file after compaction and a DELETION_VECTORS index file. Ends by re-serializing the deserialized list. */ TEST(CommitMessageTest, TestPkTableCompatibleWithJavaPaimon09) { + // test with commit message version 3 + int32_t version = 3; + std::string data_path = + paimon::test::GetDataDir() + + "/orc/pk_09_with_dv.db/pk_09_with_dv/commit_messages/commit_messages-01"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + std::vector buffer(buffer_length, 0); + + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + std::vector expected_msgs; + auto file_meta = std::make_shared( + "data-a7615d0f-aa7f-4523-a3a0-4d9000ceec8c-1.orc", /*file_size=*/833, /*row_count=*/2, + /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Bob"), 0}, pool.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Emily"), 0}, pool.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Bob"), 0}, {std::string("Emily"), 0}, + {0, 0}, pool.get()), + 
BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0, 12.0}, + {std::string("Emily"), 10, 0, 113.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/4, /*max_sequence_number=*/5, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1747670216104ll, 0), + /*delete_row_count=*/1, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + auto file_meta_with_level = std::make_shared( + "data-2eb2a766-97e4-4fe4-88ce-eb606675c101-0.orc", /*file_size=*/789, /*row_count=*/1, + /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Bob"), 0}, pool.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Bob"), 0}, pool.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({std::string("Bob"), 0}, {std::string("Bob"), 0}, {0, 0}, + pool.get()), + BinaryRowGenerator::GenerateStats({std::string("Bob"), 10, 0, 113.1}, + {std::string("Bob"), 10, 0, 113.1}, {0, 0, 0, 0}, + pool.get()), + /*min_sequence_number=*/5, /*max_sequence_number=*/5, /*schema_id=*/0, + /*level=*/4, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1747670216172ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Compact(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement data_increment1({file_meta}, {}, {}, {}, {}); + + /* Expected deletion-vector index: a single entry keyed by data file name, with offset 1, length 24 and no cardinality. */ LinkedHashMap dv_ranges; + dv_ranges.insert_or_assign( + "data-a7615d0f-aa7f-4523-a3a0-4d9000ceec8c-0.orc", + DeletionVectorMeta("data-a7615d0f-aa7f-4523-a3a0-4d9000ceec8c-0.orc", /*offset=*/1, + /*length=*/24, /*cardinality=*/std::nullopt)); + auto index_file_meta = std::make_shared( + "DELETION_VECTORS", "index-e1bac517-5e97-41ed-a719-e7ee11594946-0", 33, 1, dv_ranges, + /*external_path=*/std::nullopt); + expected_msgs.emplace_back( + 
BinaryRowGenerator::GenerateRow({10}, pool.get()), /*bucket=*/0, + /*total_bucket=*/std::nullopt, data_increment1, + CompactIncrement({file_meta}, {file_meta_with_level}, {}, {index_file_meta}, {})); + // check result + ASSERT_EQ(res_msgs, expected_msgs); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::string serialize_ret, + CommitMessage::SerializeList(ret, pool)); +} + +/* Negative test: reads all but the last 200 bytes of the commit_messages-01 fixture and expects DeserializeList on that truncated range to return a non-OK result. NOTE(review): buffer.size() - 200 is computed on the container size; this assumes the fixture is larger than 200 bytes - confirm. */ TEST(CommitMessageTest, TestInvalidMessages) { + int32_t version = 3; + std::string data_path = paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/commit_messages/commit_messages-01"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + std::vector buffer(buffer_length, 0); + + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size() - 200)); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + auto ret = CommitMessage::DeserializeList(version, reinterpret_cast(buffer.data()), + buffer.size() - 200, pool); + ASSERT_FALSE(ret.ok()); +} + +/* Reads the fixture file commit_messages_complex as version 3 and compares the deserialized list with one expected message whose value stats contain null, timestamp, integer and decimal entries; the expected partition is the empty row. Ends by re-serializing the deserialized list. */ TEST(CommitMessageTest, TestCompatibleWithComplexDataType) { + int32_t version = 3; + std::string data_path = paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/commit_messages/commit_messages_complex"; + auto file_system = std::make_shared(); + auto buffer_length = file_system->GetFileStatus(data_path).value()->GetLen(); + std::vector buffer(buffer_length, 0); + + ASSERT_OK_AND_ASSIGN(auto in_stream, file_system->Open(data_path)); + ASSERT_OK(in_stream->Read(reinterpret_cast(buffer.data()), buffer.size())); + ASSERT_OK(in_stream->Close()); + + auto pool = GetDefaultPool(); + ASSERT_OK_AND_ASSIGN(std::vector> ret, + CommitMessage::DeserializeList( + version, reinterpret_cast(buffer.data()), buffer.size(), pool)); + std::vector res_msgs; + for (const auto& msg : ret) { + auto msg_impl = std::dynamic_pointer_cast(msg); + res_msgs.emplace_back(*msg_impl); + } + + std::vector 
expected_msgs; + auto file_meta1 = std::make_shared( + "data-1c67085f-28bd-46ca-9fec-9626feca344c-0.orc", /*file_size=*/1155, /*row_count=*/3, + /*min_key=*/BinaryRow::EmptyRow(), /*max_key=*/BinaryRow::EmptyRow(), + /*key_stats=*/SimpleStats::EmptyStats(), + BinaryRowGenerator::GenerateStats( + {NullType(), NullType(), NullType(), TimestampType(Timestamp(0, 0), 9), 24, + Decimal(2, 2, 12)}, + {NullType(), NullType(), NullType(), TimestampType(Timestamp(123123, 123000), 9), 2456, + Decimal(2, 2, 22)}, + {0, 0, 0, 0, 0, 1}, pool.get()), + /*min_sequence_number=*/0, /*max_sequence_number=*/2, /*schema_id=*/0, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/Timestamp(1734713760605ll, 0), + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement data_increment1({file_meta1}, {}, {}); + expected_msgs.emplace_back(BinaryRow::EmptyRow(), /*bucket=*/0, /*total_bucket=*/std::nullopt, + data_increment1, CompactIncrement({}, {}, {})); + + // check result + ASSERT_EQ(res_msgs, expected_msgs); + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::string serialize_ret, + CommitMessage::SerializeList(ret, pool)); +} + +/* End-to-end round trip: creates table foo.bar (partition keys f0 and f3, mock_format, target file size 1024) in a temporary directory, writes batches through FileStoreWrite, prepares a commit, then checks that the single commit message is unchanged by serialize, deserialize and serialize again. Also counts the files written to the bucket directory. */ TEST(CommitMessageTest, TestSerialize) { + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), + arrow::field("f3", arrow::int16()), arrow::field("non-partition-field", arrow::utf8())}; + + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + std::map options = {{Options::FILE_FORMAT, "mock_format"}, + {Options::MANIFEST_FORMAT, "mock_format"}, + {Options::TARGET_FILE_SIZE, "1024"}}; + ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(dir->Str(), options)); + ASSERT_OK(catalog->CreateDatabase("foo", 
options, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog->CreateTable(Identifier("foo", "bar"), &schema, + /*partition_keys=*/{"f0", "f3"}, + /*primary_keys=*/{}, options, + /*ignore_if_exists=*/false)); + + WriteContextBuilder context_builder(PathUtil::JoinPath(dir->Str(), "foo.db/bar"), + "commit_user1"); + ASSERT_OK_AND_ASSIGN(std::unique_ptr write_context, + context_builder.AddOption(Options::FILE_FORMAT, "mock_format") + .AddOption(Options::MANIFEST_FORMAT, "mock_format") + .AddOption(Options::TARGET_FILE_SIZE, "1024") + .Finish()); + + std::string root_path = write_context->GetRootPath(); + ASSERT_OK_AND_ASSIGN(std::unique_ptr file_store_write, + FileStoreWrite::Create(std::move(write_context))); + + /* Writes 10240 batches of 100 rows each; every batch is assigned to the partition f0=true, f3=1. */ for (size_t i = 0; i < 10240; i++) { + auto f0_array = std::make_shared(); + auto f1_array = std::make_shared(); + auto f3_array = std::make_shared(); + auto non_partition_array = std::make_shared(); + arrow::BooleanBuilder f0_builder; + arrow::Int8Builder f1_builder; + arrow::Int16Builder f3_builder; + arrow::StringBuilder non_partition_builder; + for (size_t j = 0; j < 100; j++) { + ASSERT_TRUE(f0_builder.Append(true).ok()); + ASSERT_TRUE(f1_builder.Append(j).ok()); + ASSERT_TRUE(f3_builder.Append(1).ok()); + ASSERT_TRUE(non_partition_builder.Append(std::to_string(j)).ok()); + } + ASSERT_TRUE(f0_builder.Finish(&f0_array).ok()); + ASSERT_TRUE(f1_builder.Finish(&f1_array).ok()); + ASSERT_TRUE(f3_builder.Finish(&f3_array).ok()); + ASSERT_TRUE(non_partition_builder.Finish(&non_partition_array).ok()); + auto struct_array = + arrow::StructArray::Make({f0_array, f1_array, f3_array, non_partition_array}, fields) + .ValueOrDie(); + ::ArrowArray arrow_array; + ASSERT_TRUE(arrow::ExportArray(*struct_array, &arrow_array).ok()); + RecordBatchBuilder batch_builder(&arrow_array); + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch, + batch_builder.SetPartition({{"f0", "true"}, {"f3", "1"}}).Finish()); + ASSERT_OK(file_store_write->Write(std::move(batch))); + } + + 
ASSERT_OK_AND_ASSIGN(std::vector> results, + file_store_write->PrepareCommit(/*wait_compaction=*/false, 0)); + ASSERT_EQ(results.size(), 1); + std::shared_ptr commit_message = results[0]; + /* Round trip: the debug strings of the original and the deserialized message must match, and re-serializing must reproduce the same bytes. */ ASSERT_OK_AND_ASSIGN(std::string serialized_commit_message, + CommitMessage::Serialize(commit_message, GetDefaultPool())); + /* NOTE(review): the version is passed as the literal 7 rather than taken from CommitMessage::CurrentVersion() - confirm the two agree. */ ASSERT_OK_AND_ASSIGN( + std::shared_ptr deserialize_commit_message, + CommitMessage::Deserialize(7, serialized_commit_message.c_str(), + serialized_commit_message.size(), GetDefaultPool())); + ASSERT_OK_AND_ASSIGN(std::string deserialized_commit_message_str, + CommitMessage::ToDebugString(deserialize_commit_message)); + ASSERT_OK_AND_ASSIGN(std::string commit_message_str, + CommitMessage::ToDebugString(commit_message)); + ASSERT_EQ(deserialized_commit_message_str, commit_message_str); + ASSERT_OK_AND_ASSIGN(std::string reserialized_commit_message, + CommitMessage::Serialize(deserialize_commit_message, GetDefaultPool())); + ASSERT_EQ(serialized_commit_message, reserialized_commit_message); + + /* Counts the non-directory entries under the bucket-0 directory of the written partition and expects exactly 10, then deletes the table root. */ auto fs = std::make_shared(); + std::vector> status_list; + ASSERT_OK(fs->ListDir(root_path + "/f0=true/f3=1/bucket-0/", &status_list)); + int32_t file_nums = 0; + for (const auto& file_status : status_list) { + if (!file_status->IsDir()) { + file_nums++; + } + } + ASSERT_EQ(file_nums, 10); + ASSERT_OK(fs->Delete(root_path)); +} + +} // namespace paimon::test diff --git a/src/paimon/core/table/system/options_system_table.cpp b/src/paimon/core/table/system/options_system_table.cpp new file mode 100644 index 0000000..3d9c685 --- /dev/null +++ b/src/paimon/core/table/system/options_system_table.cpp @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/system/options_system_table.h" + +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/c/bridge.h" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/system/system_table_scan.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/read_context.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/table/source/table_read.h" + +namespace paimon { +namespace { + +/* Arrow schema of the options system table: two non-nullable utf8 columns named "key" and "value". */ std::shared_ptr OptionsSchema() { + return arrow::schema({arrow::field("key", arrow::utf8(), /*nullable=*/false), + arrow::field("value", arrow::utf8(), /*nullable=*/false)}); +} + +/* BatchReader that emits all options as one batch and reports EOF on every later call. */ class OptionsBatchReader : public BatchReader { + public: + OptionsBatchReader(std::map options, + const std::shared_ptr& pool) + : arrow_pool_(GetArrowPool(pool)), options_(std::move(options)) {} + + /* First call: builds a struct array with one row per option and exports it with arrow::ExportArray into newly allocated ::ArrowArray / ::ArrowSchema objects. Later calls (or any call after Close) return BatchReader::MakeEofBatch(). Arrow failures are converted to a Status by the *_FROM_ARROW macros. */ Result NextBatch() override { + if (emitted_) { + return BatchReader::MakeEofBatch(); + } + emitted_ = true; + + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr key_array_builder, + arrow::MakeBuilder(arrow::utf8(), arrow_pool_.get())); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr value_array_builder, + arrow::MakeBuilder(arrow::utf8(), arrow_pool_.get())); + auto* 
key_builder = dynamic_cast(key_array_builder.get()); + auto* value_builder = dynamic_cast(value_array_builder.get()); + if (key_builder == nullptr || value_builder == nullptr) { + return Status::Invalid("cannot create string builders for options system table"); + } + /* Rows are appended in the iteration order of the options_ map. */ for (const auto& [key, value] : options_) { + PAIMON_RETURN_NOT_OK_FROM_ARROW(key_builder->Append(key)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(value_builder->Append(value)); + } + std::shared_ptr key_array; + std::shared_ptr value_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(key_builder->Finish(&key_array)); + PAIMON_RETURN_NOT_OK_FROM_ARROW(value_builder->Finish(&value_array)); + auto struct_array = std::make_shared( + arrow::struct_(OptionsSchema()->fields()), key_array->length(), + std::vector>{key_array, value_array}); + + auto c_array = std::make_unique<::ArrowArray>(); + auto c_schema = std::make_unique<::ArrowSchema>(); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + arrow::ExportArray(*struct_array, c_array.get(), c_schema.get())); + return std::make_pair(std::move(c_array), std::move(c_schema)); + } + + /* Returns a newly constructed metrics object on every call; nothing is recorded in it here. */ std::shared_ptr GetReaderMetrics() const override { + return std::make_shared(); + } + + /* Marks the reader as exhausted so that subsequent NextBatch calls return the EOF batch. */ void Close() override { + emitted_ = true; + } + + private: + std::unique_ptr arrow_pool_; + std::map options_; + bool emitted_ = false; +}; + +/* TableRead for the options system table; keeps its own copy of the options to expose. */ class OptionsTableRead : public TableRead { + public: + OptionsTableRead(std::map options, + const std::shared_ptr& memory_pool) + : TableRead(memory_pool), options_(std::move(options)) {} + + /* Requires exactly one split and rejects a split for which the dynamic cast fails; beyond that validation the split is not used, the reader is built from options_. */ Result> CreateReader( + const std::vector>& splits) override { + if (splits.size() != 1) { + return Status::Invalid("options system table expects a single split"); + } + for (const auto& split : splits) { + if (!std::dynamic_pointer_cast(split)) { + return Status::Invalid("unsupported split for options system table"); + } + } + return std::make_unique(options_, GetMemoryPool()); + } + + /* Single-split overload; wraps the split in a vector and forwards to the overload above. */ Result> CreateReader( + const std::shared_ptr& split) override { + std::vector> splits = {split}; + return 
CreateReader(splits); + } + + private: + std::map options_; +}; + +} // namespace + +/* Stores the base table path and the table schema whose options are exposed. */ OptionsSystemTable::OptionsSystemTable(std::string table_path, + std::shared_ptr table_schema) + : table_path_(std::move(table_path)), table_schema_(std::move(table_schema)) {} + +std::string OptionsSystemTable::Name() const { + return kName; +} + +Result> OptionsSystemTable::ArrowSchema() const { + return OptionsSchema(); +} + +/* The scan context is ignored; the returned scan only carries the table path. */ Result> OptionsSystemTable::NewScan( + const std::shared_ptr& /*context*/) const { + return std::make_unique(table_path_); +} + +/* Builds a read over table_schema_->Options(), using the memory pool taken from the read context. */ Result> OptionsSystemTable::NewRead( + const std::shared_ptr& context) const { + return std::make_unique(table_schema_->Options(), context->GetMemoryPool()); +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/options_system_table.h b/src/paimon/core/table/system/options_system_table.h new file mode 100644 index 0000000..2282721 --- /dev/null +++ b/src/paimon/core/table/system/options_system_table.h @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#pragma once + +#include +#include + +#include "paimon/core/table/system/system_table.h" + +namespace paimon { +class TableSchema; + +/* System table named "options", constructed from a base table path and that table's schema. */ class OptionsSystemTable : public SystemTable { + public: + /* Name compared against the lower-cased system table name by the loader. */ static constexpr const char* kName = "options"; + + OptionsSystemTable(std::string table_path, std::shared_ptr table_schema); + + std::string Name() const override; + Result> ArrowSchema() const override; + Result> NewScan( + const std::shared_ptr& context) const override; + Result> NewRead( + const std::shared_ptr& context) const override; + + private: + std::string table_path_; + std::shared_ptr table_schema_; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table.cpp b/src/paimon/core/table/system/system_table.cpp new file mode 100644 index 0000000..676d2a0 --- /dev/null +++ b/src/paimon/core/table/system/system_table.cpp @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "paimon/core/table/system/system_table.h" + +#include +#include +#include +#include + +#include "paimon/catalog/identifier.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/system/audit_log_system_table.h" +#include "paimon/core/table/system/binlog_system_table.h" +#include "paimon/core/table/system/options_system_table.h" +#include "paimon/core/utils/branch_manager.h" +#include "paimon/status.h" + +namespace paimon { + +/* Lower-cases the given name and compares it with the kName constants of the options, audit-log and binlog system tables. */ bool SystemTableLoader::IsSupported(const std::string& system_table_name) { + std::string normalized_name = StringUtils::ToLowerCase(system_table_name); + return normalized_name == OptionsSystemTable::kName || + normalized_name == AuditLogSystemTable::kName || + normalized_name == BinlogSystemTable::kName; +} + +/* Creates the system table matching the lower-cased name. The options table receives only the path and schema (dynamic_options are not applied to it). Audit-log and binlog tables receive the schema options overlaid with dynamic_options, where a dynamic value replaces a schema value with the same key. Any other name yields Status::NotImplemented. */ Result> SystemTableLoader::Load( + const std::string& system_table_name, const std::shared_ptr& fs, + const std::string& table_path, const std::shared_ptr& table_schema, + const std::map& dynamic_options) { + std::string normalized_name = StringUtils::ToLowerCase(system_table_name); + if (normalized_name == OptionsSystemTable::kName) { + return std::make_shared(table_path, table_schema); + } + auto options = table_schema->Options(); + for (const auto& [key, value] : dynamic_options) { + options[key] = value; + } + if (normalized_name == AuditLogSystemTable::kName) { + return std::make_shared(fs, table_path, table_schema, options); + } + if (normalized_name == BinlogSystemTable::kName) { + return std::make_shared(fs, table_path, table_schema, options); + } + return Status::NotImplemented("unsupported system table: ", system_table_name); +} + +/* Builds an Identifier from PathUtil::GetName(path). Returns an empty optional when the identifier is not a system table; otherwise returns the base table path (parent directory of path joined with the data table name), the optional branch and the system table name. Errors from the Identifier accessors are propagated. */ Result> SystemTableLoader::TryParsePath(const std::string& path) { + std::string table_name = PathUtil::GetName(path); + Identifier identifier(table_name); + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, 
identifier.IsSystemTable()); + if (!is_system_table) { + return std::optional(); + } + PAIMON_ASSIGN_OR_RAISE(std::string data_table_name, identifier.GetDataTableName()); + PAIMON_ASSIGN_OR_RAISE(std::optional branch, identifier.GetBranchName()); + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_name, + identifier.GetSystemTableName()); + std::string parent = PathUtil::GetParentDirPath(path); + SystemTablePath system_table_path; + system_table_path.table_path = PathUtil::JoinPath(parent, data_table_name); + system_table_path.branch = std::move(branch); + /* NOTE(review): value() assumes GetSystemTableName() always holds a value when IsSystemTable() returned true - confirm against Identifier. */ system_table_path.system_table_name = system_table_name.value(); + return std::optional(std::move(system_table_path)); +} + +/* Parses path with TryParsePath (Invalid if it is not a system table path), loads the latest schema of the base table on the parsed branch or on BranchManager::DEFAULT_MAIN_BRANCH when no branch was parsed (NotExist if there is none), and delegates to Load. When a branch was parsed it is added to the dynamic options under Options::BRANCH, replacing any caller-supplied value for that key. */ Result> SystemTableLoader::LoadFromPath( + const std::shared_ptr& fs, const std::string& path, + const std::map& dynamic_options) { + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_path, TryParsePath(path)); + if (!system_table_path) { + return Status::Invalid("path is not a system table path: ", path); + } + const auto& parsed = system_table_path.value(); + SchemaManager schema_manager(fs, parsed.table_path, + parsed.branch.value_or(BranchManager::DEFAULT_MAIN_BRANCH)); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + schema_manager.Latest()); + if (!latest_schema) { + return Status::NotExist("base table schema not found for system table path: ", path); + } + auto options = dynamic_options; + if (parsed.branch) { + options[Options::BRANCH] = parsed.branch.value(); + } + return Load(parsed.system_table_name, fs, parsed.table_path, latest_schema.value(), options); +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table.h b/src/paimon/core/table/system/system_table.h new file mode 100644 index 0000000..16e3878 --- /dev/null +++ b/src/paimon/core/table/system/system_table.h @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/type_fwd.h" + +namespace paimon { +class FileSystem; +class ReadContext; +class ScanContext; +class TableScan; +class TableRead; +class TableSchema; + +/* Parsed form of a system table path: the base table path, an optional branch and the system table name. */ struct SystemTablePath { + std::string table_path; + std::optional branch; + std::string system_table_name; +}; + +/* Abstract interface of a system table: its name, its Arrow schema, and factories for a scan and a read. */ class SystemTable : public std::enable_shared_from_this { + public: + virtual ~SystemTable() = default; + + virtual std::string Name() const = 0; + virtual Result> ArrowSchema() const = 0; + virtual Result> NewScan( + const std::shared_ptr& context) const = 0; + virtual Result> NewRead( + const std::shared_ptr& context) const = 0; +}; + +/* Static helpers for system tables: name check (IsSupported), construction by name (Load), path parsing (TryParsePath) and construction from a path (LoadFromPath). */ class SystemTableLoader { + public: + static bool IsSupported(const std::string& system_table_name); + + static Result> Load( + const std::string& system_table_name, const std::shared_ptr& fs, + const std::string& table_path, const std::shared_ptr& table_schema, + const std::map& dynamic_options); + + static Result> TryParsePath(const std::string& path); + + static Result> LoadFromPath( + const std::shared_ptr& fs, const std::string& path, + const std::map& dynamic_options); +}; + +} // namespace paimon diff --git 
a/src/paimon/core/table/system/system_table_scan.cpp b/src/paimon/core/table/system/system_table_scan.cpp new file mode 100644 index 0000000..1631ac4 --- /dev/null +++ b/src/paimon/core/table/system/system_table_scan.cpp @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "paimon/core/table/system/system_table_scan.h" + +#include +#include + +#include "paimon/core/table/source/plan_impl.h" + +namespace paimon { + +/* Stores a copy of the table path for later use by CreatePlan. */ SystemTableScan::SystemTableScan(const std::string& table_path) : table_path_(table_path) {} + +/* Produces a plan holding exactly one split built from the table path; std::nullopt is passed as the plan's first constructor argument. */ Result> SystemTableScan::CreatePlan() { + std::vector> splits = {std::make_shared(table_path_)}; + return std::make_shared(std::nullopt, splits); +} + +} // namespace paimon diff --git a/src/paimon/core/table/system/system_table_scan.h b/src/paimon/core/table/system/system_table_scan.h new file mode 100644 index 0000000..33201d8 --- /dev/null +++ b/src/paimon/core/table/system/system_table_scan.h @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+
+#include "paimon/table/source/table_scan.h"
+
+namespace paimon {
+class Plan;
+class Split;
+
+/// Split implementation for system tables. It carries a single piece of state: the table
+/// path it was constructed with.
+class SystemTableSplit : public Split {
+ public:
+    /// @param table_path Path stored (by copy) in this split.
+    explicit SystemTableSplit(const std::string& table_path) : table_path_(table_path) {}
+
+    /// @return Reference to the stored table path; valid as long as this split is alive.
+    const std::string& TablePath() const {
+        return table_path_;
+    }
+
+ private:
+    std::string table_path_;
+};
+
+/// TableScan implementation for system tables, parameterised only by a table path.
+class SystemTableScan : public TableScan {
+ public:
+    /// @param table_path Path of the table this scan refers to.
+    explicit SystemTableScan(const std::string& table_path);
+
+    /// Overrides TableScan::CreatePlan().
+    /// @return Result carrying the created plan, or an error.
+    /// NOTE(review): the template arguments of the return type are not visible in this
+    /// chunk; presumably a shared pointer to Plan (confirm).
+    Result> CreatePlan() override;
+
+ private:
+    std::string table_path_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/table/system/system_table_schema.cpp b/src/paimon/core/table/system/system_table_schema.cpp
new file mode 100644
index 0000000..62c823f
--- /dev/null
+++ b/src/paimon/core/table/system/system_table_schema.cpp
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "paimon/core/table/system/system_table_schema.h"
+
+#include
+#include
+#include
+#include
+
+#include "arrow/c/bridge.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/field_type_utils.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+/// Takes over the given schema pointer (moved into `schema_`) and caches its field names.
+/// `schema_` is dereferenced right away, so the caller must pass a non-null pointer.
+SystemTableSchema::SystemTableSchema(std::shared_ptr schema)
+    : schema_(std::move(schema)) {
+    field_names_ = schema_->field_names();
+}
+
+/// Exports the wrapped schema into a freshly allocated C `::ArrowSchema` struct.
+/// @return Result carrying the exported struct, or the error produced by the export.
+Result> SystemTableSchema::GetArrowSchema() const {
+    auto c_schema = std::make_unique<::ArrowSchema>();
+    // Presumably the macro returns early with a converted Status when ExportSchema fails;
+    // verify against status_utils.h.
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*schema_, c_schema.get()));
+    return c_schema;
+}
+
+/// JSON schemas are not available for system tables.
+/// @return Always a NotImplemented status.
+Result SystemTableSchema::GetJsonSchema() const {
+    return Status::NotImplemented("system table JSON schema is not supported");
+}
+
+/// @return The field names cached by the constructor.
+std::vector SystemTableSchema::FieldNames() const {
+    return field_names_;
+}
+
+/// Looks up a field by name and converts its Arrow type id to a field type.
+/// @param field_name Name of the field to look up in the wrapped schema.
+/// @return Result of FieldTypeUtils::ConvertToFieldType() for the field's type id, or a
+/// NotExist status when the schema has no field with that name.
+Result SystemTableSchema::GetFieldType(const std::string& field_name) const {
+    auto field = schema_->GetFieldByName(field_name);
+    // GetFieldByName yields a null pointer when the lookup fails.
+    if (!field) {
+        return Status::NotExist("field ", field_name, " not exist in system table schema");
+    }
+    return FieldTypeUtils::ConvertToFieldType(field->type()->id());
+}
+
+/// System table schemas carry no comment.
+/// @return Always std::nullopt.
+std::optional SystemTableSchema::Comment() const {
+    return std::nullopt;
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/table/system/system_table_schema.h b/src/paimon/core/table/system/system_table_schema.h
new file mode 100644
index 0000000..e735d0a
--- /dev/null
+++ b/src/paimon/core/table/system/system_table_schema.h
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation
(ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+
+#include "arrow/api.h"
+#include "paimon/result.h"
+#include "paimon/schema/schema.h"
+
+namespace paimon {
+
+/// Schema of a system table, backed by a shared schema object and a cached list of its
+/// field names. Implements the SystemSchema interface (every public method below is an
+/// override).
+/// NOTE(review): the template arguments of the shared_ptr/Result/vector/optional types are
+/// not visible in this chunk; the includes suggest the wrapped object is an Arrow schema
+/// (confirm against the original header).
+class SystemTableSchema : public SystemSchema {
+ public:
+    /// @param schema Schema object to wrap; stored in `schema_`.
+    explicit SystemTableSchema(std::shared_ptr schema);
+
+    /// @return Result carrying the schema in Arrow form, or an error.
+    Result> GetArrowSchema() const override;
+    /// @return Result carrying the schema in JSON form, or an error.
+    Result GetJsonSchema() const override;
+    /// @return The names of the fields of this schema.
+    std::vector FieldNames() const override;
+    /// @param field_name Name of the field to query.
+    /// @return Result carrying the type of the named field, or an error.
+    Result GetFieldType(const std::string& field_name) const override;
+    /// @return The optional comment attached to this schema.
+    std::optional Comment() const override;
+
+ private:
+    std::shared_ptr schema_;
+    std::vector field_names_;
+};
+
+}  // namespace paimon