// src/paimon/core/compact/cancellation_controller.h
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <atomic>

namespace paimon {

/// A resettable cancellation flag backed by a single atomic boolean.
///
/// All accesses use `std::memory_order_relaxed`: the flag itself is read and written
/// atomically, but no ordering is imposed on any other memory. Callers must not rely on
/// this flag to publish additional data between threads.
class CancellationController {
 public:
    /// Set the flag. Calling this on an already-cancelled controller has no further effect.
    void Cancel() noexcept {
        cancelled_.store(true, std::memory_order_relaxed);
    }

    /// Clear the flag so the controller can be reused.
    void Reset() noexcept {
        cancelled_.store(false, std::memory_order_relaxed);
    }

    /// @return true if `Cancel()` was the most recent state change, false otherwise
    ///         (including for a freshly constructed controller).
    bool IsCancelled() const noexcept {
        return cancelled_.load(std::memory_order_relaxed);
    }

 private:
    std::atomic_bool cancelled_{false};
};

}  // namespace paimon
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "paimon/core/deletionvectors/bucketed_dv_maintainer.h" +#include "paimon/core/deletionvectors/deletion_vectors_index_file.h" +#include "paimon/core/index/index_file_meta.h" +#include "paimon/result.h" + +namespace paimon { + +/// Deletion File from compaction. +class CompactDeletionFile { + public: + virtual ~CompactDeletionFile() = default; + + /// Used by async compaction, when compaction task is completed, deletions file will be + /// generated immediately, so when updateCompactResult, we need to merge old deletion files + /// (just delete them). + static Result> GenerateFiles( + const std::shared_ptr& maintainer); + + /// For sync compaction, only create deletion files when prepare commit. + static Result> LazyGeneration( + const std::shared_ptr& maintainer); + + virtual Result>> GetOrCompute() = 0; + + virtual Result> MergeOldFile( + const std::shared_ptr& old) = 0; + + virtual void Clean() = 0; +}; + +/// A generated files implementation of `CompactDeletionFile`. +class GeneratedDeletionFile : public CompactDeletionFile, + public std::enable_shared_from_this { + public: + GeneratedDeletionFile(const std::shared_ptr& deletion_file, + const std::shared_ptr& dv_index_file) + : deletion_file_(deletion_file), dv_index_file_(dv_index_file) {} + + Result>> GetOrCompute() override { + get_invoked_ = true; + return deletion_file_ ? 
std::optional>(deletion_file_) + : std::nullopt; + } + + Result> MergeOldFile( + const std::shared_ptr& old) override { + auto derived = dynamic_cast(old.get()); + if (derived == nullptr) { + return Status::Invalid("old should be a GeneratedDeletionFile, but it is not"); + } + if (derived->get_invoked_) { + return Status::Invalid("old should not be get, this is a bug."); + } + if (deletion_file_ == nullptr) { + return old; + } + old->Clean(); + return shared_from_this(); + } + + void Clean() override { + if (deletion_file_ != nullptr) { + dv_index_file_->Delete(deletion_file_); + } + } + + private: + std::shared_ptr deletion_file_; + std::shared_ptr dv_index_file_; + bool get_invoked_ = false; +}; + +/// A lazy generation implementation of `CompactDeletionFile`. +class LazyCompactDeletionFile : public CompactDeletionFile, + public std::enable_shared_from_this { + public: + explicit LazyCompactDeletionFile(const std::shared_ptr& maintainer) + : maintainer_(maintainer) {} + + Result>> GetOrCompute() override { + generated_ = true; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr generated, + CompactDeletionFile::GenerateFiles(maintainer_)); + return generated->GetOrCompute(); + } + + Result> MergeOldFile( + const std::shared_ptr& old) override { + auto derived = dynamic_cast(old.get()); + if (derived == nullptr) { + return Status::Invalid("old should be a LazyCompactDeletionFile, but it is not"); + } + if (derived->generated_) { + return Status::Invalid("old should not be generated, this is a bug."); + } + return shared_from_this(); + } + + void Clean() override {} + + private: + std::shared_ptr maintainer_; + bool generated_ = false; +}; + +inline Result> CompactDeletionFile::GenerateFiles( + const std::shared_ptr& maintainer) { + PAIMON_ASSIGN_OR_RAISE(std::optional> file, + maintainer->WriteDeletionVectorsIndex()); + return std::make_shared(file.value_or(nullptr), + maintainer->DvIndexFile()); +} + +inline Result> CompactDeletionFile::LazyGeneration( + const 
std::shared_ptr& maintainer) { + return std::make_shared(maintainer); +} + +} // namespace paimon diff --git a/src/paimon/core/compact/compact_deletion_file_test.cpp b/src/paimon/core/compact/compact_deletion_file_test.cpp new file mode 100644 index 0000000..826444f --- /dev/null +++ b/src/paimon/core/compact/compact_deletion_file_test.cpp @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/compact/compact_deletion_file.h" + +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/core/deletionvectors/bitmap_deletion_vector.h" +#include "paimon/fs/file_system_factory.h" +#include "paimon/testing/mock/mock_index_path_factory.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +namespace { + +std::shared_ptr CreateMaintainer( + const std::shared_ptr& fs, + const std::shared_ptr& path_factory, + const std::shared_ptr& pool, + const std::map>& deletion_vectors) { + auto dv_index_file = + std::make_shared(fs, path_factory, /*bitmap64=*/false, pool); + return std::make_shared(dv_index_file, deletion_vectors); +} + +class TestGeneratedDeletionFile : public GeneratedDeletionFile { + public: + using GeneratedDeletionFile::GeneratedDeletionFile; + + void Clean() override { + cleaned_ = true; + } + + bool Cleaned() const { + return cleaned_; + } + + private: + bool cleaned_ = false; +}; + +class NonGeneratedCompactDeletionFile : public CompactDeletionFile { + public: + Result>> GetOrCompute() override { + return std::optional>(); + } + + Result> MergeOldFile( + const std::shared_ptr&) override { + return Status::Invalid("not used in this test"); + } + + void Clean() override {} +}; + +} // namespace + +TEST(CompactDeletionFileTest, GenerateFilesShouldReturnFileWhenModified) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + auto pool = GetDefaultPool(); + + RoaringBitmap32 roaring; + roaring.Add(1); + std::map> deletion_vectors; + deletion_vectors["data-a"] = std::make_shared(roaring); + + auto maintainer = CreateMaintainer(fs, path_factory, pool, deletion_vectors); + maintainer->RemoveDeletionVectorOf("data-a"); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr generated, + CompactDeletionFile::GenerateFiles(maintainer)); + 
ASSERT_OK_AND_ASSIGN(auto file, generated->GetOrCompute()); + ASSERT_TRUE(file.has_value()); + ASSERT_NE(file.value(), nullptr); + ASSERT_EQ(file.value()->IndexType(), DeletionVectorsIndexFile::DELETION_VECTORS_INDEX); +} + +TEST(CompactDeletionFileTest, GenerateFilesShouldReturnNulloptWhenNotModified) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + auto pool = GetDefaultPool(); + + std::map> deletion_vectors; + auto maintainer = CreateMaintainer(fs, path_factory, pool, deletion_vectors); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr generated, + CompactDeletionFile::GenerateFiles(maintainer)); + ASSERT_OK_AND_ASSIGN(auto file, generated->GetOrCompute()); + ASSERT_FALSE(file.has_value()); +} + +TEST(CompactDeletionFileTest, MergeOldFileShouldRejectNonGeneratedType) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + auto pool = GetDefaultPool(); + + auto dv_index_file = + std::make_shared(fs, path_factory, /*bitmap64=*/false, pool); + auto meta = std::make_shared( + DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, "index-0", 1, 0, + LinkedHashMap(), std::nullopt); + + auto current = std::make_shared(meta, dv_index_file); + auto old = std::make_shared(); + + ASSERT_NOK_WITH_MSG(current->MergeOldFile(old), "old should be a GeneratedDeletionFile"); +} + +TEST(CompactDeletionFileTest, MergeOldFileShouldRejectInvokedOldFile) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + auto pool = GetDefaultPool(); + + auto dv_index_file = + std::make_shared(fs, path_factory, /*bitmap64=*/false, pool); + auto current_meta = std::make_shared( + 
DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, "index-current", 1, 0, + LinkedHashMap(), std::nullopt); + auto old_meta = std::make_shared( + DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, "index-old", 1, 0, + LinkedHashMap(), std::nullopt); + + auto current = std::make_shared(current_meta, dv_index_file); + auto old = std::make_shared(old_meta, dv_index_file); + ASSERT_OK_AND_ASSIGN(auto old_file, old->GetOrCompute()); + ASSERT_TRUE(old_file.has_value()); + + ASSERT_NOK_WITH_MSG(current->MergeOldFile(old), "old should not be get"); +} + +TEST(CompactDeletionFileTest, MergeOldFileShouldReturnOldWhenCurrentIsNull) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + auto pool = GetDefaultPool(); + + auto dv_index_file = + std::make_shared(fs, path_factory, /*bitmap64=*/false, pool); + auto old_meta = std::make_shared( + DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, "index-old", 1, 0, + LinkedHashMap(), std::nullopt); + + auto current = std::make_shared(nullptr, dv_index_file); + auto old = std::make_shared(old_meta, dv_index_file); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr merged, current->MergeOldFile(old)); + ASSERT_EQ(merged.get(), old.get()); + ASSERT_FALSE(old->Cleaned()); +} + +TEST(CompactDeletionFileTest, MergeOldFileShouldCleanOldAndReturnCurrent) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + auto pool = GetDefaultPool(); + + auto dv_index_file = + std::make_shared(fs, path_factory, /*bitmap64=*/false, pool); + auto current_meta = std::make_shared( + DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, "index-current", 1, 0, + LinkedHashMap(), std::nullopt); + auto old_meta = std::make_shared( + DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, "index-old", 1, 0, + 
LinkedHashMap(), std::nullopt); + + auto current = std::make_shared(current_meta, dv_index_file); + auto old = std::make_shared(old_meta, dv_index_file); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr merged, current->MergeOldFile(old)); + ASSERT_EQ(merged.get(), current.get()); + ASSERT_TRUE(old->Cleaned()); +} + +TEST(CompactDeletionFileTest, CleanShouldDeleteIndexFile) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + auto pool = GetDefaultPool(); + + auto dv_index_file = + std::make_shared(fs, path_factory, /*bitmap64=*/false, pool); + + auto file_meta = std::make_shared( + DeletionVectorsIndexFile::DELETION_VECTORS_INDEX, "index-clean-target", 1, 0, + LinkedHashMap(), std::nullopt); + const std::string file_path = PathUtil::JoinPath(dir->Str(), file_meta->FileName()); + ASSERT_OK_AND_ASSIGN(std::shared_ptr out, + fs->Create(file_path, /*overwrite=*/true)); + ASSERT_OK(out->Close()); + + ASSERT_OK_AND_ASSIGN(bool exists_before, dv_index_file->Exists(file_meta)); + ASSERT_TRUE(exists_before); + + GeneratedDeletionFile generated(file_meta, dv_index_file); + generated.Clean(); + + ASSERT_OK_AND_ASSIGN(bool exists_after, dv_index_file->Exists(file_meta)); + ASSERT_FALSE(exists_after); +} + +TEST(CompactDeletionFileTest, LazyGenerationShouldComputeWhenInvoked) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + auto pool = GetDefaultPool(); + + RoaringBitmap32 roaring; + roaring.Add(1); + std::map> deletion_vectors; + deletion_vectors["data-a"] = std::make_shared(roaring); + + auto maintainer = CreateMaintainer(fs, path_factory, pool, deletion_vectors); + maintainer->RemoveDeletionVectorOf("data-a"); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr lazy, + 
CompactDeletionFile::LazyGeneration(maintainer)); + ASSERT_OK_AND_ASSIGN(auto file, lazy->GetOrCompute()); + ASSERT_TRUE(file.has_value()); + ASSERT_NE(file.value(), nullptr); + ASSERT_EQ(file.value()->IndexType(), DeletionVectorsIndexFile::DELETION_VECTORS_INDEX); +} + +TEST(CompactDeletionFileTest, LazyMergeOldFileShouldRejectNonLazyType) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + auto pool = GetDefaultPool(); + + std::map> deletion_vectors; + auto maintainer = CreateMaintainer(fs, path_factory, pool, deletion_vectors); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr lazy, + CompactDeletionFile::LazyGeneration(maintainer)); + auto old = std::make_shared(); + + ASSERT_NOK_WITH_MSG(lazy->MergeOldFile(old), "LazyCompactDeletionFile"); +} + +TEST(CompactDeletionFileTest, LazyMergeOldFileShouldRejectGeneratedOldLazy) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr fs, + FileSystemFactory::Get("local", dir->Str(), {})); + auto path_factory = std::make_shared(dir->Str()); + auto pool = GetDefaultPool(); + + RoaringBitmap32 roaring; + roaring.Add(1); + std::map> deletion_vectors; + deletion_vectors["data-a"] = std::make_shared(roaring); + auto maintainer = CreateMaintainer(fs, path_factory, pool, deletion_vectors); + maintainer->RemoveDeletionVectorOf("data-a"); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr current, + CompactDeletionFile::LazyGeneration(maintainer)); + ASSERT_OK_AND_ASSIGN(std::shared_ptr old, + CompactDeletionFile::LazyGeneration(maintainer)); + + ASSERT_OK_AND_ASSIGN(auto old_file, old->GetOrCompute()); + ASSERT_TRUE(old_file.has_value()); + + ASSERT_NOK_WITH_MSG(current->MergeOldFile(old), "old should not be generated"); +} + +} // namespace paimon::test diff --git a/src/paimon/core/compact/compact_future_manager.h b/src/paimon/core/compact/compact_future_manager.h new file 
mode 100644 index 0000000..42ba7b2 --- /dev/null +++ b/src/paimon/core/compact/compact_future_manager.h @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "paimon/core/compact/compact_manager.h" + +namespace paimon { + +class CompactFutureManager : public CompactManager { + public: + ~CompactFutureManager() override { + if (task_future_.valid()) { + task_future_.wait(); + } + } + + /// Request cancellation for future-based compaction. + /// + /// `std::future` itself cannot be cancelled. Subclasses must override this + /// method to signal their concrete cancellation controller. + void RequestCancelCompaction() override = 0; + + /// Wait for the current compaction task to exit if it is running. + /// @note This is a blocking join operation and may leave behind orphan files. + void WaitForCompactionToExit() override { + // std::future does not support cancellation natively. + // Move away the active future first, then wait for completion so callers + // can safely start a new task without reusing cancellation state. 
+ if (task_future_.valid()) { + auto cancelled = std::move(task_future_); + cancelled.wait(); + } + } + + bool CompactNotCompleted() const override { + return task_future_.valid(); + } + + protected: + Result> ObtainCompactResult( + std::future>> task_future) { + return task_future.get(); + } + + Result>> InnerGetCompactionResult(bool blocking) { + if (!task_future_.valid()) { + return std::optional>(); + } + bool ready = blocking || + (task_future_.wait_for(std::chrono::seconds(0)) == std::future_status::ready); + if (ready) { + Result> result = + ObtainCompactResult(std::move(task_future_)); + if (result.status().ok()) { + return std::make_optional(std::move(result).value()); + } else if (result.status().IsCancelled()) { + return std::optional>(); + } else { + return result.status(); + } + } + return std::optional>(); + } + + std::future>> task_future_; +}; + +} // namespace paimon diff --git a/src/paimon/core/compact/compact_future_manager_test.cpp b/src/paimon/core/compact/compact_future_manager_test.cpp new file mode 100644 index 0000000..5938a8b --- /dev/null +++ b/src/paimon/core/compact/compact_future_manager_test.cpp @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/core/compact/compact_future_manager.h" + +#include +#include +#include + +#include "gtest/gtest.h" +#include "paimon/core/compact/compact_result.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { +class FakeCompactFutureManager : public CompactFutureManager { + public: + Status AddNewFile(const std::shared_ptr& /*file*/) override { + return Status::OK(); + } + + std::vector> AllFiles() const override { + return {}; + } + + Status TriggerCompaction(bool /*full_compaction*/) override { + return Status::OK(); + } + + Result>> GetCompactionResult( + bool blocking) override { + return InnerGetCompactionResult(blocking); + } + + void RequestCancelCompaction() override {} + + bool ShouldWaitForLatestCompaction() const override { + return false; + } + + bool ShouldWaitForPreparingCheckpoint() const override { + return false; + } + + Status Close() override { + return Status::OK(); + } + + void SetTaskFuture(std::future>> future) { + task_future_ = std::move(future); + } +}; + +TEST(CompactFutureManagerTest, TestCancelledStatusReturnsEmptyOptional) { + FakeCompactFutureManager manager; + + std::promise>> promise; + manager.SetTaskFuture(promise.get_future()); + + // Simulate compaction returning a Cancelled status. + promise.set_value(Status::Cancelled("Compaction is cancelled")); + + ASSERT_OK_AND_ASSIGN(auto result, manager.GetCompactionResult(/*blocking=*/true)); + // The IsCancelled branch should return an empty optional (not an error). + ASSERT_FALSE(result.has_value()); + // After consuming the cancelled future, task should no longer be active. 
+ ASSERT_FALSE(manager.CompactNotCompleted()); +} +} // namespace paimon::test diff --git a/src/paimon/core/compact/compact_manager.h b/src/paimon/core/compact/compact_manager.h new file mode 100644 index 0000000..9efe5a9 --- /dev/null +++ b/src/paimon/core/compact/compact_manager.h @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/core/compact/compact_result.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace paimon { + +class CompactManager { + public: + virtual ~CompactManager() = default; + /// Add a new file. + virtual Status AddNewFile(const std::shared_ptr& file) = 0; + + virtual std::vector> AllFiles() const = 0; + + /// Trigger a new compaction task. + /// + /// @param full_compaction if caller needs a guaranteed full compaction + virtual Status TriggerCompaction(bool full_compaction) = 0; + + /// Get compaction result. Wait finish if `blocking` is true. + virtual Result>> GetCompactionResult( + bool blocking) = 0; + + /// Request cancellation for currently running compaction task. + /// + /// This method should be non-blocking. 
It only signals cancellation and returns immediately. + virtual void RequestCancelCompaction() = 0; + + /// Wait until the current compaction task exits. + /// + /// This method may block until the running compaction task finishes. + virtual void WaitForCompactionToExit() = 0; + + /// Request cancellation and wait for compaction task exit. + void CancelAndWaitCompaction() { + RequestCancelCompaction(); + WaitForCompactionToExit(); + } + + /// Check if a compaction is in progress, or if a compaction result remains to be fetched, or if + /// a compaction should be triggered later. + virtual bool CompactNotCompleted() const = 0; + + virtual bool ShouldWaitForLatestCompaction() const = 0; + + virtual bool ShouldWaitForPreparingCheckpoint() const = 0; + + virtual Status Close() = 0; +}; + +} // namespace paimon diff --git a/src/paimon/core/compact/compact_result.h b/src/paimon/core/compact/compact_result.h new file mode 100644 index 0000000..6f57667 --- /dev/null +++ b/src/paimon/core/compact/compact_result.h @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include + +#include "paimon/core/io/data_file_meta.h" +#include "paimon/status.h" + +namespace paimon { +class CompactDeletionFile; + +/// Result of compaction. +class CompactResult { + public: + CompactResult() = default; + CompactResult(const std::vector>& before, + const std::vector>& after) + : CompactResult(before, after, {}) {} + + CompactResult(const std::vector>& before, + const std::vector>& after, + const std::vector>& changelog) + : before_(before), after_(after), changelog_(changelog) {} + + const std::vector>& Before() const { + return before_; + } + + const std::vector>& After() const { + return after_; + } + + const std::vector>& Changelog() const { + return changelog_; + } + + std::shared_ptr DeletionFile() const { + return deletion_file_; + } + + void SetDeletionFile(const std::shared_ptr& deletion_file) { + deletion_file_ = deletion_file; + } + + Status Merge(const CompactResult& other) { + before_.insert(before_.end(), other.Before().begin(), other.Before().end()); + after_.insert(after_.end(), other.After().begin(), other.After().end()); + changelog_.insert(changelog_.end(), other.Changelog().begin(), other.Changelog().end()); + + if (deletion_file_ != nullptr || other.deletion_file_ != nullptr) { + return Status::NotImplemented( + "There is a bug, deletion file can't be set before merge."); + } + return Status::OK(); + } + + private: + std::vector> before_; + std::vector> after_; + std::vector> changelog_; + std::shared_ptr deletion_file_; +}; + +} // namespace paimon diff --git a/src/paimon/core/compact/compact_task.h b/src/paimon/core/compact/compact_task.h new file mode 100644 index 0000000..edaee9a --- /dev/null +++ b/src/paimon/core/compact/compact_task.h @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/core/compact/compact_result.h" +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/operation/metrics/compaction_metrics.h" +#include "paimon/core/utils/duration.h" +#include "paimon/logging.h" +#include "paimon/result.h" + +namespace paimon { + +/// Base Compact task for metrics. +class CompactTask { + public: + explicit CompactTask(const std::shared_ptr& reporter) + : reporter_(reporter), logger_(Logger::GetLogger("CompactTask")) {} + + virtual ~CompactTask() = default; + + Result> Execute() { + Duration duration; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr result, DoCompact()); + if (reporter_) { + reporter_->ReportCompactionTime(static_cast(duration.Get())); + reporter_->IncreaseCompactionsCompletedCount(); + reporter_->ReportCompactionInputSize(CollectRewriteSize(result->Before())); + reporter_->ReportCompactionOutputSize(CollectRewriteSize(result->After())); + } + PAIMON_LOG_DEBUG( + logger_, + "Done compacting %zu files to %zu files in %lldms. Rewrite input file size " + "= %lld, output file size = %lld", + result->Before().size(), result->After().size(), static_cast(duration.Get()), + CollectRewriteSize(result->Before()), CollectRewriteSize(result->After())); + return result; + } + + protected: + /// Perform compaction. + /// + /// @return `CompactResult` of compact before and compact after files. 
+ virtual Result> DoCompact() = 0; + + private: + static int64_t CollectRewriteSize(const std::vector>& files) { + int64_t size = 0; + for (const auto& file : files) { + size += file->file_size; + } + return size; + } + + std::shared_ptr reporter_; + std::unique_ptr logger_; +}; + +} // namespace paimon diff --git a/src/paimon/core/compact/compact_unit.h b/src/paimon/core/compact/compact_unit.h new file mode 100644 index 0000000..e89b13b --- /dev/null +++ b/src/paimon/core/compact/compact_unit.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "paimon/core/io/data_file_meta.h" +#include "paimon/core/mergetree/level_sorted_run.h" +namespace paimon { +/// A files unit for compaction. 
+struct CompactUnit { + static CompactUnit FromLevelRuns(int32_t output_level, + const std::vector& runs) { + std::vector> files; + for (const auto& run : runs) { + const auto& files_in_run = run.run.Files(); + files.insert(files.end(), files_in_run.begin(), files_in_run.end()); + } + return FromFiles(output_level, files, /*file_rewrite=*/false); + } + + static CompactUnit FromFiles(int32_t output_level, + const std::vector>& files, + bool file_rewrite) { + return CompactUnit(output_level, files, file_rewrite); + } + + CompactUnit(int32_t _output_level, const std::vector>& _files, + bool _file_rewrite) + : output_level(_output_level), files(_files), file_rewrite(_file_rewrite) {} + + int32_t output_level; + std::vector> files; + bool file_rewrite; +}; +} // namespace paimon diff --git a/src/paimon/core/compact/noop_compact_manager.h b/src/paimon/core/compact/noop_compact_manager.h new file mode 100644 index 0000000..562937c --- /dev/null +++ b/src/paimon/core/compact/noop_compact_manager.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#pragma once
+
+#include
+#include
+
+#include "fmt/format.h"
+#include "paimon/core/compact/compact_manager.h"
+#include "paimon/core/compact/compact_result.h"
+#include "paimon/defs.h"
+
+namespace paimon {
+
+/// A `CompactManager` which never compacts.
+///
+/// Every operation is a no-op that reports success or "nothing to do"; the only call that
+/// fails is `TriggerCompaction(true)`, i.e. an explicitly requested full compaction.
+class NoopCompactManager : public CompactManager {
+ public:
+    NoopCompactManager() = default;
+    ~NoopCompactManager() override = default;
+
+    /// Ignores `file` and reports success; nothing is tracked.
+    Status AddNewFile(const std::shared_ptr& file) override {
+        (void)file;
+        return Status::OK();
+    }
+
+    /// Always returns an empty collection (a copy of a function-local static empty vector).
+    std::vector> AllFiles() const override {
+        static std::vector> empty;
+        return empty;
+    }
+
+    /// Succeeds without doing anything unless `full_compaction` is true, in which case an
+    /// `Invalid` status is returned whose message names the `Options::WRITE_ONLY` property.
+    Status TriggerCompaction(bool full_compaction) override {
+        if (full_compaction) {
+            return Status::Invalid(
+                fmt::format("NoopCompactManager does not support user triggered compaction.\n"
+                            "If you really need a guaranteed compaction, please set {} property of "
+                            "this table to false.",
+                            Options::WRITE_ONLY));
+        }
+        return Status::OK();
+    }
+
+    /// Get compaction result. This manager never produces one, so an empty optional is
+    /// returned immediately; `blocking` is not used.
+    Result>> GetCompactionResult(
+        bool blocking) override {
+        return std::optional>();
+    }
+
+    /// Request cancellation for currently running compaction task. No-op here.
+    void RequestCancelCompaction() override {}
+
+    /// Wait until the current compaction task exits. No-op here, returns immediately.
+    void WaitForCompactionToExit() override {}
+
+    /// Check if a compaction is in progress, or if a compaction result remains to be fetched, or if
+    /// a compaction should be triggered later. Always false for this manager.
+    bool CompactNotCompleted() const override {
+        return false;
+    }
+
+    /// Always false.
+    bool ShouldWaitForLatestCompaction() const override {
+        return false;
+    }
+
+    /// Always false.
+    bool ShouldWaitForPreparingCheckpoint() const override {
+        return false;
+    }
+
+    /// Nothing to release; always succeeds.
+    Status Close() override {
+        return Status::OK();
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/compact/noop_compact_manager_test.cpp b/src/paimon/core/compact/noop_compact_manager_test.cpp
new file mode 100644
index 0000000..ca47dd8
--- /dev/null
+++ b/src/paimon/core/compact/noop_compact_manager_test.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/compact/noop_compact_manager.h"
+
+#include
+#include
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Adding a file (even a null one) succeeds, nothing is tracked, and every
+// "is there pending work" query answers false.
+TEST(NoopCompactManagerTest, ShouldIgnoreAddedFilesAndStayIdle) {
+    NoopCompactManager manager;
+
+    ASSERT_OK(manager.AddNewFile(nullptr));
+    ASSERT_TRUE(manager.AllFiles().empty());
+    ASSERT_FALSE(manager.CompactNotCompleted());
+    ASSERT_FALSE(manager.ShouldWaitForLatestCompaction());
+    ASSERT_FALSE(manager.ShouldWaitForPreparingCheckpoint());
+}
+
+// A full compaction request is rejected, and the error message carries both the
+// explanatory text and the name of the write-only option.
+TEST(NoopCompactManagerTest, ShouldRejectUserTriggeredFullCompaction) {
+    NoopCompactManager manager;
+
+    ASSERT_NOK_WITH_MSG(manager.TriggerCompaction(/*full_compaction=*/true),
+                        "NoopCompactManager does not support user triggered compaction");
+    ASSERT_NOK_WITH_MSG(manager.TriggerCompaction(/*full_compaction=*/true), Options::WRITE_ONLY);
+}
+
+// The result is an empty optional for both the blocking and the non-blocking call.
+TEST(NoopCompactManagerTest, ShouldReturnEmptyCompactionResult) {
+    NoopCompactManager manager;
+
+    ASSERT_OK_AND_ASSIGN(std::optional> non_blocking,
+                         manager.GetCompactionResult(/*blocking=*/false));
+    ASSERT_FALSE(non_blocking.has_value());
+
+    ASSERT_OK_AND_ASSIGN(std::optional> blocking,
+                         manager.GetCompactionResult(/*blocking=*/true));
+    ASSERT_FALSE(blocking.has_value());
+}
+
+// Non-full trigger, cancel-and-wait and close all complete without error.
+// NOTE(review): CancelAndWaitCompaction is not declared in NoopCompactManager itself;
+// presumably it is inherited from CompactManager - confirm.
+TEST(NoopCompactManagerTest, ShouldAllowNoopLifecycleOperations) {
+    NoopCompactManager manager;
+
+    ASSERT_OK(manager.TriggerCompaction(/*full_compaction=*/false));
+    manager.CancelAndWaitCompaction();
+    ASSERT_OK(manager.Close());
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/core/postpone/postpone_bucket_file_store_write.h b/src/paimon/core/postpone/postpone_bucket_file_store_write.h
new file mode 100644
index 0000000..d1c2ef4
--- /dev/null
+++ b/src/paimon/core/postpone/postpone_bucket_file_store_write.h
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "paimon/common/utils/preconditions.h"
+#include "paimon/core/operation/abstract_file_store_write.h"
+#include "paimon/core/operation/file_store_scan.h"
+#include "paimon/core/postpone/postpone_bucket_writer.h"
+#include "paimon/core/schema/table_schema.h"
+#include "paimon/core/table/bucket_mode.h"
+#include "paimon/core/utils/file_store_path_factory.h"
+#include "paimon/format/file_format.h"
+
+namespace paimon {
+
+/// File store write for postpone-bucket tables. Writers created by it only accept the
+/// postpone bucket (`BucketModeDefine::POSTPONE_BUCKET`) and are `PostponeBucketWriter`s;
+/// scanning is not supported.
+class PostponeBucketFileStoreWrite : public AbstractFileStoreWrite {
+ public:
+    /// Builds a `PostponeBucketFileStoreWrite`.
+    ///
+    /// The data file prefix option is overridden with
+    /// `<original prefix>-u-<commit_user>-s-<id>-w-`, where `<id>` is `write_id` if given and
+    /// otherwise a random value drawn from [0, INT32_MAX - 1]. A new `CoreOptions` is derived
+    /// from the modified option map and used for the path factory and the returned object.
+    /// The object is created with `ignore_previous_files` set to true and without a deletion
+    /// vector maintainer factory.
+    ///
+    /// Returns the error of any failing step (option parsing, external path creation, path
+    /// factory creation).
+    static Result> Create(
+        const std::shared_ptr& snapshot_manager,
+        const std::shared_ptr& schema_manager, const std::string& commit_user,
+        const std::string& root_path, const std::shared_ptr& table_schema,
+        const std::shared_ptr& schema,
+        const std::shared_ptr& partition_schema,
+        const std::shared_ptr& io_manager, const CoreOptions& options,
+        bool is_streaming_mode, bool ignore_num_bucket_check,
+        const std::optional& write_id,
+        const std::map& fs_scheme_to_identifier_map,
+        const std::shared_ptr& executor, const std::shared_ptr& pool,
+        const std::shared_ptr& file_system) {
+        // Each writer should have its unique prefix, so files from the same writer can be consumed
+        // by the same compaction reader to keep the input order.
+        std::optional id = write_id;
+        if (id == std::nullopt) {
+            // No id supplied by the caller: pick a random one, seeded from std::random_device.
+            std::random_device rd;
+            std::mt19937 gen(rd());
+            std::uniform_int_distribution dist(0, INT32_MAX - 1);
+            id = std::optional(dist(gen));
+        }
+
+        auto options_map = options.ToMap();
+        options_map[Options::DATA_FILE_PREFIX] =
+            fmt::format("{}-u-{}-s-{}-w-", options.DataFilePrefix(), commit_user, id.value());
+        PAIMON_ASSIGN_OR_RAISE(
+            CoreOptions new_options,
+            CoreOptions::FromMap(options_map, file_system, fs_scheme_to_identifier_map));
+        // prepare FileStorePathFactory
+        PAIMON_ASSIGN_OR_RAISE(std::vector external_paths,
+                               new_options.CreateExternalPaths());
+        PAIMON_ASSIGN_OR_RAISE(std::optional global_index_external_path,
+                               new_options.CreateGlobalIndexExternalPath());
+
+        PAIMON_ASSIGN_OR_RAISE(
+            std::shared_ptr file_store_path_factory,
+            FileStorePathFactory::Create(root_path, schema, table_schema->PartitionKeys(),
+                                         new_options.GetPartitionDefaultName(),
+                                         new_options.GetWriteFileFormat(/*level=*/0)->Identifier(),
+                                         new_options.DataFilePrefix(),
+                                         new_options.LegacyPartitionNameEnabled(), external_paths,
+                                         global_index_external_path,
+                                         new_options.IndexFileInDataFileDir(), pool));
+
+        // Ignoring previous files saves scanning time.
+        // For postpone bucket tables, we only append new files to bucket = -2 directories.
+        // Also, we don't need to know current largest sequence id, because when compacting these
+        // files, we will read the records file by file without merging, and then give them to
+        // normal bucket writers.
+        // Because there is no merging when reading, sequence id across files are useless.
+        return std::unique_ptr(new PostponeBucketFileStoreWrite(
+            file_store_path_factory, snapshot_manager, schema_manager, commit_user, root_path,
+            table_schema, schema, partition_schema, /*dv_maintainer_factory=*/nullptr, io_manager,
+            new_options,
+            /*ignore_previous_files=*/true, is_streaming_mode, ignore_num_bucket_check, executor,
+            pool));
+    }
+
+ private:
+    /// Forwards everything to `AbstractFileStoreWrite`; `schema` is passed both as the schema
+    /// and as the write schema. Private: instances are created through `Create`.
+    PostponeBucketFileStoreWrite(
+        const std::shared_ptr& file_store_path_factory,
+        const std::shared_ptr& snapshot_manager,
+        const std::shared_ptr& schema_manager, const std::string& commit_user,
+        const std::string& root_path, const std::shared_ptr& table_schema,
+        const std::shared_ptr& schema,
+        const std::shared_ptr& partition_schema,
+        const std::shared_ptr& dv_maintainer_factory,
+        const std::shared_ptr& io_manager, const CoreOptions& options,
+        bool ignore_previous_files, bool is_streaming_mode, bool ignore_num_bucket_check,
+        const std::shared_ptr& executor, const std::shared_ptr& pool)
+        : AbstractFileStoreWrite(file_store_path_factory, snapshot_manager, schema_manager,
+                                 commit_user, root_path, table_schema, schema,
+                                 /*write_schema=*/schema, partition_schema, dv_maintainer_factory,
+                                 io_manager, options, ignore_previous_files, is_streaming_mode,
+                                 ignore_num_bucket_check, executor, pool) {}
+
+    /// Creates a `PostponeBucketWriter` for `partition`.
+    ///
+    /// Fails with the precondition error when `bucket` is not
+    /// `BucketModeDefine::POSTPONE_BUCKET`. `restore_data_files`, `restore_max_seq_number` and
+    /// `dv_maintainer` are not used by this implementation.
+    Result> CreateWriter(
+        const BinaryRow& partition, int32_t bucket,
+        const std::vector>& restore_data_files,
+        int64_t restore_max_seq_number,
+        const std::shared_ptr& dv_maintainer) override {
+        PAIMON_RETURN_NOT_OK(
+            Preconditions::CheckState(bucket == BucketModeDefine::POSTPONE_BUCKET,
+                                      "bucket mode is supposed to be postpone bucket"));
+        PAIMON_ASSIGN_OR_RAISE(std::vector trimmed_primary_keys,
+                               table_schema_->TrimmedPrimaryKeys());
+        PAIMON_ASSIGN_OR_RAISE(
+            std::shared_ptr data_file_path_factory,
+            file_store_path_factory_->CreateDataFilePathFactory(partition, bucket));
+        auto writer =
+            std::make_shared(trimmed_primary_keys, data_file_path_factory,
+                             table_schema_->Id(), schema_, options_,
+                             pool_);
+        return std::shared_ptr(writer);
+    }
+
+    /// Scanning is not supported here; always returns an `Invalid` status. `filter` is unused.
+    Result> CreateFileStoreScan(
+        const std::shared_ptr& filter) const override {
+        return Status::Invalid("do not support scan process in PostponeBucketFileStoreWrite");
+    }
+};
+
+}  // namespace paimon
diff --git a/src/paimon/core/postpone/postpone_bucket_writer.cpp b/src/paimon/core/postpone/postpone_bucket_writer.cpp
new file mode 100644
index 0000000..543f83d
--- /dev/null
+++ b/src/paimon/core/postpone/postpone_bucket_writer.cpp
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/postpone/postpone_bucket_writer.h"
+
+#include
+#include
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "arrow/array/array_primitive.h"
+#include "arrow/array/util.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/c/helpers.h"
+#include "arrow/scalar.h"
+#include "arrow/util/checked_cast.h"
+#include "fmt/format.h"
+#include "paimon/common/metrics/metrics_impl.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/types/row_kind.h"
+#include "paimon/common/utils/arrow/mem_utils.h"
+#include "paimon/common/utils/arrow/status_utils.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/core/io/compact_increment.h"
+#include "paimon/core/io/data_file_path_factory.h"
+#include "paimon/core/io/data_increment.h"
+#include "paimon/core/io/key_value_data_file_writer.h"
+#include "paimon/core/io/single_file_writer.h"
+#include "paimon/core/manifest/file_source.h"
+#include "paimon/core/utils/commit_increment.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/writer_builder.h"
+#include "paimon/metrics.h"
+
+namespace paimon {
+class InternalRow;
+class MemoryPool;
+
+/// Stores the collaborators and derives the two Arrow types used when writing:
+/// `value_type_` is a struct over the fields of `value_schema`, and `write_schema_` is the
+/// sequence-number field, then the value-kind field, then all fields of `value_schema`.
+PostponeBucketWriter::PostponeBucketWriter(const std::vector& trimmed_primary_keys,
+                                           const std::shared_ptr& path_factory,
+                                           int64_t schema_id,
+                                           const std::shared_ptr& value_schema,
+                                           const CoreOptions& options,
+                                           const std::shared_ptr& pool)
+    : pool_(pool),
+      arrow_pool_(GetArrowPool(pool)),
+      trimmed_primary_keys_(trimmed_primary_keys),
+      options_(options),
+      path_factory_(path_factory),
+      schema_id_(schema_id),
+      value_type_(arrow::struct_(value_schema->fields())),
+      metrics_(std::make_shared()) {
+    arrow::FieldVector target_fields;
+    target_fields.push_back(
+        DataField::ConvertDataFieldToArrowField(SpecialFields::SequenceNumber()));
+    target_fields.push_back(DataField::ConvertDataFieldToArrowField(SpecialFields::ValueKind()));
+    target_fields.insert(target_fields.end(), value_schema->fields().begin(),
+                         value_schema->fields().end());
+    write_schema_ = arrow::schema(target_fields);
+}
+
+/// Converts one input batch into a `KeyValueBatch` and passes it to the rolling file writer,
+/// which is created on first use.
+///
+/// A batch whose data has length 0 is ignored. Every row gets `KeyValue::UNKNOWN_SEQUENCE` as
+/// its sequence number; the row kinds come from the batch (all INSERT when the batch carries
+/// none). Min/max key are taken from the first and last row, see `PrepareMinMaxKey`.
+Status PostponeBucketWriter::Write(std::unique_ptr&& moved_batch) {
+    if (moved_batch->GetData()->length == 0) {
+        return Status::OK();
+    }
+    // Take ownership so the batch is destroyed when this function returns.
+    std::unique_ptr batch = std::move(moved_batch);
+    // check input array
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr value_struct_array,
+                           CheckAndCastValueArray(batch->GetData()));
+
+    // prepare sequence number & row kind array
+    // NOTE(review): both helpers take an int32_t length while an Arrow array length is 64-bit;
+    // this presumably relies on batches staying below 2^31 rows - confirm.
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr sequence_number_array,
+                           PrepareSequenceNumberArray(value_struct_array->length()));
+
+    PAIMON_ASSIGN_OR_RAISE(std::shared_ptr row_kind_array,
+                           PrepareRowKindArray(value_struct_array->length(), batch->GetRowKind()));
+
+    // make value array and special array into a write array
+    arrow::ArrayVector write_fields = {sequence_number_array, row_kind_array};
+    write_fields.insert(write_fields.end(), value_struct_array->fields().begin(),
+                        value_struct_array->fields().end());
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+        std::shared_ptr write_array,
+        arrow::StructArray::Make(write_fields, write_schema_->field_names()));
+    // prepare min max key
+    std::pair, std::shared_ptr> min_max_key;
+    PAIMON_ASSIGN_OR_RAISE(min_max_key, PrepareMinMaxKey(value_struct_array));
+
+    // construct KeyValueBatch
+    KeyValueBatch key_value_batch;
+    key_value_batch.min_sequence_number = KeyValue::UNKNOWN_SEQUENCE;
+    key_value_batch.max_sequence_number = KeyValue::UNKNOWN_SEQUENCE;
+    key_value_batch.delete_row_count = GetDeleteRowCount(batch->GetRowKind());
+    key_value_batch.batch = std::make_unique();
+    PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*write_array, key_value_batch.batch.get()));
+    key_value_batch.min_key = min_max_key.first;
+    key_value_batch.max_key = min_max_key.second;
+
+    // write KeyValueBatch to RollingFileWriter
+    if (!writer_) {
+        writer_ = CreateRollingRowWriter();
+    }
+    PAIMON_RETURN_NOT_OK(writer_->Write(std::move(key_value_batch)));
+    return Status::OK();
+}
+
+/// Flushes the open writer (if any) and returns all files produced since the previous call.
+/// `wait_compaction` is not used.
+Result PostponeBucketWriter::PrepareCommit(bool wait_compaction) {
+    PAIMON_RETURN_NOT_OK(Flush());
+    return DrainIncrement();
+}
+
+/// Imports the C-ABI array as `value_type_` and returns it as a struct array.
+/// Fails with `Invalid` when the C array has already been released, and with the converted
+/// Arrow error when the import fails.
+Result> PostponeBucketWriter::CheckAndCastValueArray(
+    ArrowArray* value_array) const {
+    if (ArrowArrayIsReleased(value_array)) {
+        return Status::Invalid("invalid batch: data is released");
+    }
+    PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_array,
+                                      arrow::ImportArray(value_array, value_type_));
+    // NOTE(review): the nullptr check below only helps if checked_pointer_cast yields null on
+    // a type mismatch - confirm that this holds for the build configuration in use.
+    auto value_struct_array =
+        arrow::internal::checked_pointer_cast(arrow_array);
+    if (value_struct_array == nullptr) {
+        return Status::Invalid("invalid RecordBatch: cannot cast to StructArray");
+    }
+    return value_struct_array;
+}
+
+/// Returns an array of `value_array_length` entries that all hold
+/// `KeyValue::UNKNOWN_SEQUENCE`.
+///
+/// The array is cached in `sequence_number_array_`: it is rebuilt when absent or shorter than
+/// requested, otherwise a slice of the cached array is returned.
+Result> PostponeBucketWriter::PrepareSequenceNumberArray(
+    int32_t value_array_length) {
+    if (!sequence_number_array_ || sequence_number_array_->length() < value_array_length) {
+        auto sequence_number_scalar =
+            std::make_shared(KeyValue::UNKNOWN_SEQUENCE);
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+            sequence_number_array_,
+            arrow::MakeArrayFromScalar(*sequence_number_scalar, value_array_length,
+                                       arrow_pool_.get()));
+        return sequence_number_array_;
+    }
+    assert(sequence_number_array_->length() >= value_array_length);
+    return sequence_number_array_->Slice(0, value_array_length);
+}
+
+/// Returns an array of `value_array_length` row-kind bytes.
+///
+/// The backing array is cached in `row_kind_array_` and rebuilt when absent or too short;
+/// otherwise a slice of it is used. Its first `value_array_length` values are then
+/// overwritten in place: copied from `row_kind_vec` when that is non-empty (its size must
+/// equal `value_array_length`, otherwise `Invalid` is returned), or set to INSERT.
+///
+/// NOTE(review): the values are written through a const_cast into the cached array's storage,
+/// so arrays returned by earlier calls presumably see the new values as well - confirm that
+/// no previously exported batch is still being read when the next batch is prepared.
+Result> PostponeBucketWriter::PrepareRowKindArray(
+    int32_t value_array_length, const std::vector& row_kind_vec) {
+    std::shared_ptr> row_kind_array;
+    if (!row_kind_array_ || row_kind_array_->length() < value_array_length) {
+        auto row_kind_scalar =
+            std::make_shared(RowKind::Insert()->ToByteValue());
+        PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(
+            std::shared_ptr scalar_array,
+            arrow::MakeArrayFromScalar(*row_kind_scalar, value_array_length, arrow_pool_.get()));
+        auto typed_row_kind_array =
+            arrow::internal::checked_pointer_cast>(
+                scalar_array);
+        assert(typed_row_kind_array);
+        row_kind_array_ = std::move(typed_row_kind_array);
+        row_kind_array = row_kind_array_;
+    } else {
+        assert(row_kind_array_->length() >= value_array_length);
+        row_kind_array =
+            arrow::internal::checked_pointer_cast>(
+                row_kind_array_->Slice(0, value_array_length));
+        assert(row_kind_array);
+    }
+
+    if (!row_kind_vec.empty()) {
+        if (row_kind_vec.size() != static_cast(value_array_length)) {
+            return Status::Invalid(
+                fmt::format("length of row_kind {} mismatches length of value array {}",
+                            row_kind_vec.size(), value_array_length));
+        }
+        // assumes each element of row_kind_vec occupies exactly one byte - TODO confirm
+        memcpy(const_cast(row_kind_array->raw_values()), row_kind_vec.data(),
+               sizeof(int8_t) * value_array_length);
+    } else {
+        // all are INSERT
+        memset(const_cast(row_kind_array->raw_values()),
+               static_cast(RecordBatch::RowKind::INSERT),
+               sizeof(int8_t) * value_array_length);
+    }
+    return row_kind_array;
+}
+
+/// Counts the entries of `row_kind_vec` that are UPDATE_BEFORE or DELETE.
+/// An empty vector means every row is an INSERT, so the count is 0.
+int64_t PostponeBucketWriter::GetDeleteRowCount(
+    const std::vector& row_kind_vec) {
+    if (row_kind_vec.empty()) {
+        // all are INSERT
+        return 0;
+    }
+    int64_t delete_row_count = 0;
+    for (const auto& row_kind : row_kind_vec) {
+        if (row_kind == RecordBatch::RowKind::UPDATE_BEFORE ||
+            row_kind == RecordBatch::RowKind::DELETE) {
+            delete_row_count++;
+        }
+    }
+    return delete_row_count;
+}
+
+/// Builds the (min key, max key) pair for a batch: rows over the trimmed primary key columns
+/// at index 0 and at index `length() - 1` of `value_struct_array`.
+///
+/// Returns `Invalid` when the array is empty or when a primary key column is not found by
+/// name in the array.
+Result, std::shared_ptr>>
+PostponeBucketWriter::PrepareMinMaxKey(
+    const std::shared_ptr& value_struct_array) const {
+    if (value_struct_array->length() <= 0) {
+        return Status::Invalid(
+            "in PostponeBucketWriter, for PrepareMinMaxKey, value array should not be empty");
+    }
+    // same as java, in postpone bucket, we do not sort pk, min and max key simply use the first and
+    // last key
+    arrow::ArrayVector key_array_vec;
+    key_array_vec.reserve(trimmed_primary_keys_.size());
+    for (const auto& pk : trimmed_primary_keys_) {
+        auto key_array = value_struct_array->GetFieldByName(pk);
+        if (key_array == nullptr) {
+            return Status::Invalid(
+                fmt::format("primary key {} not in input array in PostponeBucketWriter", pk));
+        }
+        key_array_vec.push_back(key_array);
+    }
+    return std::make_pair(
+        std::make_shared(value_struct_array, key_array_vec, pool_, 0),
+        std::make_shared(value_struct_array, key_array_vec, pool_,
+                         value_struct_array->length() - 1));
+}
+
+/// Creates the rolling writer used by `Write`.
+///
+/// The rolling writer is given the target file size from the options and a factory lambda
+/// that opens one level-0, append-source data file writer per file at a fresh path from
+/// `path_factory_`. The lambda captures by reference (including `this`).
+std::unique_ptr>>
+PostponeBucketWriter::CreateRollingRowWriter() const {
+    auto create_file_writer = [&]()
+        -> Result>>> {
+        // NOTE(review): arrow_schema is not initialised before the guard is armed; if
+        // ExportSchema fails without filling it in, the guard would presumably release an
+        // uninitialised struct - consider value-initialising it. Confirm against Arrow.
+        ::ArrowSchema arrow_schema;
+        ScopeGuard guard([&arrow_schema]() { ArrowSchemaRelease(&arrow_schema); });
+        PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportSchema(*write_schema_, &arrow_schema));
+        auto format = options_.GetWriteFileFormat(/*level=*/0);
+        PAIMON_ASSIGN_OR_RAISE(
+            std::shared_ptr writer_builder,
+            format->CreateWriterBuilder(&arrow_schema, options_.GetWriteBatchSize()));
+        writer_builder->WithMemoryPool(pool_);
+        // Hands the already exported C array of the batch over to the file writer.
+        auto converter = [](KeyValueBatch key_value_batch, ArrowArray* array) -> Status {
+            ArrowArrayMove(key_value_batch.batch.get(), array);
+            return Status::OK();
+        };
+        auto writer = std::make_unique(
+            options_.GetWriteFileCompression(0), converter, schema_id_, /*level=*/0,
+            FileSource::Append(), trimmed_primary_keys_, /*stats_extractor=*/nullptr, write_schema_,
+            path_factory_->IsExternalPath(), pool_);
+        PAIMON_RETURN_NOT_OK(
+            writer->Init(options_.GetFileSystem(), path_factory_->NewPath(), writer_builder));
+        return writer;
+    };
+    return std::make_unique>>(
+        options_.GetTargetFileSize(/*has_primary_key=*/true), create_file_writer);
+}
+
+/// Closes the open rolling writer, appends its result files to `new_files_`, merges its
+/// metrics into `metrics_` and drops the writer. Does nothing when no writer is open.
+Status PostponeBucketWriter::Flush() {
+    if (!writer_) {
+        return Status::OK();
+    }
+    PAIMON_RETURN_NOT_OK(writer_->Close());
+    PAIMON_ASSIGN_OR_RAISE(std::vector> flushed_files,
+                           writer_->GetResult());
+    new_files_.insert(new_files_.end(), flushed_files.begin(), flushed_files.end());
+    metrics_->Merge(writer_->GetMetrics());
+    writer_.reset();
+    return Status::OK();
+}
+
+/// Moves the accumulated `new_files_` into a `CommitIncrement` whose compact increment is
+/// empty and which carries no compact deletion file, leaving `new_files_` empty.
+Result PostponeBucketWriter::DrainIncrement() {
+    DataIncrement data_increment(std::move(new_files_), /*deleted_files=*/{}, {});
+    CompactIncrement compact_increment({}, {}, {});
+    // new_files_ was moved from above; clear() puts it back into a known empty state.
+    new_files_.clear();
+    return CommitIncrement(data_increment, compact_increment, /*compact_deletion_file=*/nullptr);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/core/postpone/postpone_bucket_writer.h b/src/paimon/core/postpone/postpone_bucket_writer.h
new file mode 100644
index 0000000..9da8f32
--- /dev/null
+++ b/src/paimon/core/postpone/postpone_bucket_writer.h
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include
+#include
+#include
+#include
+#include
+
+#include "arrow/api.h"
+#include "arrow/array/array_base.h"
+#include "arrow/array/array_nested.h"
+#include "paimon/common/data/columnar/columnar_row.h"
+#include "paimon/core/core_options.h"
+#include "paimon/core/io/data_file_meta.h"
+#include "paimon/core/io/data_file_path_factory.h"
+#include "paimon/core/io/rolling_file_writer.h"
+#include "paimon/core/key_value.h"
+#include "paimon/core/utils/batch_writer.h"
+#include "paimon/core/utils/commit_increment.h"
+#include "paimon/record_batch.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace arrow {
+template
+class NumericArray;
+}  // namespace arrow
+struct ArrowArray;
+
+namespace paimon {
+class DataFilePathFactory;
+class MemoryPool;
+class Metrics;
+
+/// `BatchWriter` for the postpone bucket.
+///
+/// Each written batch is prefixed with a sequence-number column and a row-kind column and
+/// appended to rolling data files. Compaction and sync are not implemented. Files become
+/// visible to the caller through `PrepareCommit`; `Close` (and the destructor) abort a writer
+/// that is still open instead of flushing it.
+class PostponeBucketWriter : public BatchWriter {
+ public:
+    PostponeBucketWriter(const std::vector& trimmed_primary_keys,
+                         const std::shared_ptr& path_factory,
+                         int64_t schema_id, const std::shared_ptr& value_schema,
+                         const CoreOptions& options, const std::shared_ptr& pool);
+
+    /// Releases cached arrays and aborts an open writer; the status is deliberately ignored.
+    ~PostponeBucketWriter() override {
+        [[maybe_unused]] auto status = DoClose();
+    }
+
+    /// Writes one batch; see the definition for details.
+    Status Write(std::unique_ptr&& batch) override;
+
+    /// Not supported; always returns `NotImplemented`.
+    Status Compact(bool full_compaction) override {
+        return Status::NotImplemented("not implemented");
+    }
+
+    /// Always false; this writer never compacts.
+    Result CompactNotCompleted() override {
+        return false;
+    }
+
+    /// Not supported; always returns `NotImplemented`.
+    Status Sync() override {
+        return Status::NotImplemented("not implemented");
+    }
+
+    uint64_t GetMemoryUsage() const override {
+        // used for spill, PostponeBucketWriter do not support spill, so return 0 to avoid
+        // triggering spill
+        return 0;
+    }
+
+    /// Same as `Flush()`: closes the current rolling writer and records its files.
+    Status FlushMemory() override {
+        return Flush();
+    }
+
+    /// Flushes and returns the files written since the previous call.
+    Result PrepareCommit(bool wait_compaction) override;
+
+    /// Releases cached arrays and aborts an open writer; data that was not flushed through
+    /// `FlushMemory` or `PrepareCommit` is not turned into result files.
+    Status Close() override {
+        return DoClose();
+    }
+
+    /// Metrics accumulated from the writers flushed so far.
+    std::shared_ptr GetMetrics() const override {
+        return metrics_;
+    }
+
+ private:
+    /// Drops the cached special-column arrays and, when a rolling writer is open, calls
+    /// `Abort()` on it and drops it. Always returns OK.
+    Status DoClose() {
+        sequence_number_array_.reset();
+        row_kind_array_.reset();
+        if (writer_) {
+            writer_->Abort();
+            writer_.reset();
+        }
+        return Status::OK();
+    }
+
+    /// Imports the C array as a struct array of `value_type_`.
+    Result> CheckAndCastValueArray(
+        ArrowArray* value_array) const;
+
+    /// Returns a (cached) array of unknown sequence numbers of the given length.
+    Result> PrepareSequenceNumberArray(int32_t value_array_length);
+
+    /// Returns a (cached) array of the given length filled with the row kinds of the batch.
+    Result> PrepareRowKindArray(
+        int32_t value_array_length, const std::vector& row_kind_vec);
+
+    /// Number of UPDATE_BEFORE / DELETE entries in `row_kind_vec`.
+    static int64_t GetDeleteRowCount(const std::vector& row_kind_vec);
+
+    /// Keys of the first and last row of the batch.
+    Result, std::shared_ptr>> PrepareMinMaxKey(
+        const std::shared_ptr& value_struct_array) const;
+
+    /// Closes the open rolling writer and appends its result files to `new_files_`.
+    Status Flush();
+    /// Moves `new_files_` into a `CommitIncrement`.
+    Result DrainIncrement();
+
+    /// Creates the rolling writer that `Write` appends to.
+    std::unique_ptr>>
+    CreateRollingRowWriter() const;
+
+ private:
+    std::shared_ptr pool_;
+    std::unique_ptr arrow_pool_;
+    std::vector trimmed_primary_keys_;
+    CoreOptions options_;
+    std::shared_ptr path_factory_;
+    int64_t schema_id_;
+    // write_schema = value_schema + special fields
+    std::shared_ptr value_type_;
+    std::shared_ptr write_schema_;
+    std::shared_ptr metrics_;
+    // Files produced by flushed writers and not yet handed out by PrepareCommit.
+    std::vector> new_files_;
+    // Current rolling writer; null until the first non-empty Write after a flush.
+    std::unique_ptr>> writer_;
+    // Caches reused across Write calls for the two special columns.
+    std::shared_ptr sequence_number_array_;
+    std::shared_ptr> row_kind_array_;
+};
+}  // namespace paimon
diff --git a/src/paimon/core/postpone/postpone_bucket_writer_test.cpp b/src/paimon/core/postpone/postpone_bucket_writer_test.cpp
new file mode 100644
index 0000000..4e2d211
--- /dev/null
+++ b/src/paimon/core/postpone/postpone_bucket_writer_test.cpp
@@ -0,0 +1,623 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/postpone/postpone_bucket_writer.h"
+
+#include
+#include
+#include
+#include
+
+#include "arrow/api.h"
+#include "arrow/c/abi.h"
+#include "arrow/c/bridge.h"
+#include "arrow/ipc/json_simple.h"
+#include "gtest/gtest.h"
+#include "paimon/common/data/data_define.h"
+#include "paimon/common/factories/io_hook.h"
+#include "paimon/common/fs/external_path_provider.h"
+#include "paimon/common/table/special_fields.h"
+#include "paimon/common/types/data_field.h"
+#include "paimon/common/utils/scope_guard.h"
+#include "paimon/core/io/compact_increment.h"
+#include "paimon/core/io/data_file_path_factory.h"
+#include "paimon/core/io/data_increment.h"
+#include "paimon/core/manifest/file_source.h"
+#include "paimon/core/stats/simple_stats.h"
+#include "paimon/core/utils/commit_increment.h"
+#include "paimon/defs.h"
+#include "paimon/format/file_format.h"
+#include "paimon/format/file_format_factory.h"
+#include "paimon/format/reader_builder.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/memory/memory_pool.h"
+#include "paimon/metrics.h"
+#include "paimon/reader/file_batch_reader.h"
+#include "paimon/testing/utils/binary_row_generator.h"
+#include "paimon/testing/utils/io_exception_helper.h"
+#include "paimon/testing/utils/read_result_collector.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace arrow {
+class Array;
+}  // namespace arrow
+
+namespace paimon::test {
+/// Fixture for `PostponeBucketWriter` tests, parameterised by the file format identifier.
+/// It sets up a four-column value schema (f0 utf8, f1 int32, f2 int32, f3 float64) with
+/// primary key f0, and the matching write type with the two special fields in front.
+class PostponeBucketWriterTest : public ::testing::Test,
+                                 public ::testing::WithParamInterface {
+ public:
+    void SetUp() override {
+        pool_ = GetDefaultPool();
+        file_system_ = std::make_shared();
+        value_fields_ = {DataField(0, arrow::field("f0", arrow::utf8())),
+                         DataField(1, arrow::field("f1", arrow::int32())),
+                         DataField(2, arrow::field("f2", arrow::int32())),
+                         DataField(3, arrow::field("f3", arrow::float64()))};
+        value_schema_ = DataField::ConvertDataFieldsToArrowSchema(value_fields_);
+        value_type_ = DataField::ConvertDataFieldsToArrowStructType(value_fields_);
+        primary_keys_ = {"f0"};
+        // Expected on-disk layout: sequence number, value kind, then the value fields.
+        std::vector write_fields = {SpecialFields::SequenceNumber(),
+                                    SpecialFields::ValueKind()};
+        write_fields.insert(write_fields.end(), value_fields_.begin(), value_fields_.end());
+        write_type_ = DataField::ConvertDataFieldsToArrowStructType(write_fields);
+    }
+    void TearDown() override {}
+
+    /// Exports `array` through the Arrow C interface, wraps it with `row_kinds` into a record
+    /// batch and writes it with `writer`, asserting that every step succeeds.
+    void WriteBatch(const std::shared_ptr& array,
+                    const std::vector& row_kinds,
+                    PostponeBucketWriter* writer) const {
+        ::ArrowArray c_array;
+        ASSERT_TRUE(arrow::ExportArray(*array, &c_array).ok());
+        RecordBatchBuilder batch_builder(&c_array);
+        batch_builder.SetRowKinds(row_kinds);
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr batch, batch_builder.Finish());
+        ASSERT_OK(writer->Write(std::move(batch)));
+    }
+
+    /// Reads `data_file_name` back with the given format and `file_schema` (batch size 10, no
+    /// predicate, no selection bitmap) and asserts the content equals `expected_array`.
+    void CheckFileContent(const std::string& file_format_str, const std::string& data_file_name,
+                          const std::shared_ptr& file_schema,
+                          const std::shared_ptr& expected_array) const {
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream,
+                             file_system_->Open(data_file_name));
+        ASSERT_OK_AND_ASSIGN(std::unique_ptr file_format,
+                             FileFormatFactory::Get(file_format_str, {}));
+        ASSERT_OK_AND_ASSIGN(auto reader_builder,
+                             file_format->CreateReaderBuilder(/*batch_size=*/10));
+        ASSERT_OK_AND_ASSIGN(auto batch_reader, reader_builder->Build(input_stream));
+        auto c_schema = std::make_unique<::ArrowSchema>();
+        ASSERT_TRUE(arrow::ExportType(*file_schema, c_schema.get()).ok());
+        ASSERT_OK(batch_reader->SetReadSchema(c_schema.get(), /*predicate=*/nullptr,
+                                              /*selection_bitmap=*/std::nullopt));
+        ASSERT_OK_AND_ASSIGN(std::shared_ptr result_array,
+                             ReadResultCollector::CollectResult(batch_reader.get()));
+        ASSERT_TRUE(expected_array->Equals(result_array)) << result_array->ToString() << "\n != \n"
+                                                          << expected_array->ToString();
+    }
+
+ private:
+    // NOTE(review): these members are private yet used directly inside the TEST_P bodies;
+    // presumably the test build relaxes access control - confirm.
+    std::shared_ptr pool_;
+    std::shared_ptr file_system_;
+    std::vector value_fields_;
+    std::shared_ptr value_schema_;
+    std::shared_ptr value_type_;
+    std::vector primary_keys_;
+    std::shared_ptr write_type_;
+};
+
+/// File formats to run the suite with: always "parquet", plus "orc" and "avro" when the
+/// corresponding PAIMON_ENABLE_* macro is defined.
+std::vector GetTestValuesForPostponeBucketWriterTest() {
+    std::vector values = {"parquet"};
+#ifdef PAIMON_ENABLE_ORC
+    values.emplace_back("orc");
+#endif
+#ifdef PAIMON_ENABLE_AVRO
+    values.emplace_back("avro");
+#endif
+    return values;
+}
+
+INSTANTIATE_TEST_SUITE_P(FileFormat, PostponeBucketWriterTest,
+                         ::testing::ValuesIn(GetTestValuesForPostponeBucketWriterTest()));
+
+// Writes a single four-row batch without row kinds, commits, and verifies the file content
+// (sequence number -1 and value kind 0 on every row, rows in input order) as well as the
+// resulting file metadata (min key = first row's key, max key = last row's key).
+TEST_P(PostponeBucketWriterTest, TestSimple) {
+    auto file_format = GetParam();
+    ASSERT_OK_AND_ASSIGN(CoreOptions options,
+                         CoreOptions::FromMap({{Options::FILE_FORMAT, file_format}}));
+
+    auto dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(dir);
+    auto path_factory = std::make_shared();
+    ASSERT_OK(path_factory->Init(dir->Str(), file_format, options.DataFilePrefix(), nullptr));
+    std::string uuid = path_factory->uuid_;
+
+    auto postpone_bucket_writer = std::make_shared(
+        primary_keys_, path_factory, /*schema_id=*/1, value_schema_, options, pool_);
+
+    // write batch
+    std::shared_ptr array1 =
+        arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([
+        ["Lucy", 20, 1, 14.1],
+        ["Paul", 40, 2, null],
+        ["Lucy", 30, 3, 15.1],
+        ["Alice", 10, 0, 13.1]
+    ])")
+            .ValueOrDie();
+    WriteBatch(array1, /*row_kinds=*/{}, postpone_bucket_writer.get());
+
+    // prepare commit
+    ASSERT_OK_AND_ASSIGN(CommitIncrement commit_increment,
+                         postpone_bucket_writer->PrepareCommit(/*wait_compaction=*/false));
+    ASSERT_OK(postpone_bucket_writer->Close());
+
+    // check data file exist and read ok
+    std::string expected_data_file_name = "data-" + uuid + "-0." + file_format;
+    std::string expected_data_file_path = dir->Str() + "/" + expected_data_file_name;
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr data_file_status,
+                         options.GetFileSystem()->GetFileStatus(expected_data_file_path));
+
+    std::shared_ptr expected_array;
+    auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(write_type_, {R"([
+        [-1, 0, "Lucy", 20, 1, 14.1],
+        [-1, 0, "Paul", 40, 2, null],
+        [-1, 0, "Lucy", 30, 3, 15.1],
+        [-1, 0, "Alice", 10, 0, 13.1]
+    ])"},
+                                                                         &expected_array);
+    ASSERT_TRUE(array_status.ok());
+    CheckFileContent(file_format, expected_data_file_path, write_type_, expected_array);
+
+    // check data file meta
+    ASSERT_TRUE(commit_increment.GetCompactIncrement().IsEmpty());
+    ASSERT_EQ(1, commit_increment.GetNewFilesIncrement().NewFiles().size());
+    // creation_time is copied from the actual meta, so it never causes a mismatch.
+    auto expected_data_file_meta = std::make_shared(
+        expected_data_file_name, /*file_size=*/data_file_status->GetLen(), /*row_count=*/4,
+        /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Lucy")}, pool_.get()),
+        /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Alice")}, pool_.get()),
+        /*key_stats=*/
+        BinaryRowGenerator::GenerateStats({NullType()}, {NullType()}, {-1}, pool_.get()),
+        /*value_stats=*/SimpleStats::EmptyStats(),
+        /*min_sequence_number=*/-1, /*max_sequence_number=*/-1, /*schema_id=*/1,
+        /*level=*/0, /*extra_files=*/std::vector>(),
+        /*creation_time=*/commit_increment.GetNewFilesIncrement().NewFiles()[0]->creation_time,
+        /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(),
+        /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt,
+        /*first_row_id=*/std::nullopt,
+        /*write_cols=*/std::nullopt);
+    DataIncrement expected_data_increment({expected_data_file_meta}, /*deleted_files=*/{},
+                                          /*changelog_files=*/{});
+    ASSERT_EQ(expected_data_increment, commit_increment.GetNewFilesIncrement());
+}
+
+TEST_P(PostponeBucketWriterTest, TestNestedType) {
+    auto file_format
= GetParam(); + arrow::FieldVector fields = { + arrow::field("key", arrow::utf8()), + arrow::field("f0", arrow::list(arrow::struct_( + {field("a", arrow::int64()), field("b", arrow::boolean())}))), + arrow::field("f1", arrow::map(arrow::struct_({field("a", arrow::int64()), + field("b", arrow::boolean())}), + arrow::boolean()))}; + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, file_format}})); + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), file_format, options.DataFilePrefix(), nullptr)); + std::string uuid = path_factory->uuid_; + + auto postpone_bucket_writer = std::make_shared( + std::vector{"key"}, path_factory, /*schema_id=*/1, arrow::schema(fields), + options, pool_); + + // write batch + auto array1 = arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + ["Lucy", [null, [1, true], null], [[[1, true], true]]], + ["Bob", [[2, false], null], null], + ["David", [[2, false], [3, true], [4, null]], [[[1, true], true], [[5, false], null]]], + ["Alice", null, null] + ])") + .ValueOrDie(); + ASSERT_TRUE(array1); + WriteBatch(array1, /*row_kinds=*/{}, postpone_bucket_writer.get()); + + // prepare commit + ASSERT_OK_AND_ASSIGN(CommitIncrement commit_increment, + postpone_bucket_writer->PrepareCommit(/*wait_compaction=*/false)); + ASSERT_OK(postpone_bucket_writer->Close()); + + // check data file exist and read ok + std::string expected_data_file_name = "data-" + uuid + "-0." 
+ file_format; + std::string expected_data_file_path = dir->Str() + "/" + expected_data_file_name; + ASSERT_OK_AND_ASSIGN(std::unique_ptr data_file_status, + options.GetFileSystem()->GetFileStatus(expected_data_file_path)); + + arrow::FieldVector write_fields = {arrow::field("_SEQUENCE_NUMBER", arrow::int64()), + arrow::field("_VALUE_KIND", arrow::int8())}; + write_fields.insert(write_fields.end(), fields.begin(), fields.end()); + std::shared_ptr expected_array; + auto write_type = arrow::struct_(write_fields); + auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(write_type, {R"([ + [-1, 0, "Lucy", [null, [1, true], null], [[[1, true], true]]], + [-1, 0, "Bob", [[2, false], null], null], + [-1, 0, "David", [[2, false], [3, true], [4, null]], [[[1, true], true], [[5, false], null]]], + [-1, 0, "Alice", null, null] + ])"}, + &expected_array); + ASSERT_TRUE(array_status.ok()); + CheckFileContent(file_format, expected_data_file_path, write_type, expected_array); + + // check data file meta + ASSERT_TRUE(commit_increment.GetCompactIncrement().IsEmpty()); + ASSERT_EQ(1, commit_increment.GetNewFilesIncrement().NewFiles().size()); + auto expected_data_file_meta = std::make_shared( + expected_data_file_name, /*file_size=*/data_file_status->GetLen(), /*row_count=*/4, + /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Lucy")}, pool_.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Alice")}, pool_.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({NullType()}, {NullType()}, {-1}, pool_.get()), + /*value_stats=*/SimpleStats::EmptyStats(), + /*min_sequence_number=*/-1, /*max_sequence_number=*/-1, /*schema_id=*/1, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/commit_increment.GetNewFilesIncrement().NewFiles()[0]->creation_time, + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, 
+ /*write_cols=*/std::nullopt); + DataIncrement expected_data_increment({expected_data_file_meta}, /*deleted_files=*/{}, + /*changelog_files=*/{}); + ASSERT_EQ(expected_data_increment, commit_increment.GetNewFilesIncrement()); +} + +TEST_P(PostponeBucketWriterTest, TestWriteMultiBatch) { + auto file_format = GetParam(); + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, file_format}})); + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), file_format, options.DataFilePrefix(), nullptr)); + std::string uuid = path_factory->uuid_; + + auto postpone_bucket_writer = std::make_shared( + primary_keys_, path_factory, /*schema_id=*/1, value_schema_, options, pool_); + + // write batch 1, batch size = 3 + std::shared_ptr array1 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["David", 120, 11, null], + ["Bob", 140, 12, null], + ["Alex", 110, 10, null] + ])") + .ValueOrDie(); + WriteBatch(array1, /*row_kinds=*/{}, postpone_bucket_writer.get()); + + // write batch 2, batch size = 4, with retracted row + std::shared_ptr array2 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Lucy", 20, 1, 14.1], + ["Paul", 40, 2, null], + ["Lucy", 30, 3, 15.1], + ["Alice", 10, 0, 13.1] + ])") + .ValueOrDie(); + WriteBatch(array2, /*row_kinds=*/ + {RecordBatch::RowKind::INSERT, RecordBatch::RowKind::UPDATE_BEFORE, + RecordBatch::RowKind::UPDATE_AFTER, RecordBatch::RowKind::DELETE}, + postpone_bucket_writer.get()); + + // write batch 3, batch size = 2 + std::shared_ptr array3 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Judy", 220, 21, 24.1], + ["Tom", 210, 20, 23.1] + ])") + .ValueOrDie(); + WriteBatch(array3, /*row_kinds=*/{}, postpone_bucket_writer.get()); + + // prepare commit + ASSERT_OK_AND_ASSIGN(CommitIncrement commit_increment, + 
postpone_bucket_writer->PrepareCommit(/*wait_compaction=*/false)); + ASSERT_OK(postpone_bucket_writer->Close()); + + // check data file exist and read ok + std::string expected_data_file_name = "data-" + uuid + "-0." + file_format; + std::string expected_data_file_path = dir->Str() + "/" + expected_data_file_name; + ASSERT_OK_AND_ASSIGN(std::unique_ptr data_file_status, + options.GetFileSystem()->GetFileStatus(expected_data_file_path)); + + std::shared_ptr expected_array; + auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(write_type_, {R"([ + [-1, 0, "David", 120, 11, null], + [-1, 0, "Bob", 140, 12, null], + [-1, 0, "Alex", 110, 10, null], + [-1, 0, "Lucy", 20, 1, 14.1], + [-1, 1, "Paul", 40, 2, null], + [-1, 2, "Lucy", 30, 3, 15.1], + [-1, 3, "Alice", 10, 0, 13.1], + [-1, 0, "Judy", 220, 21, 24.1], + [-1, 0, "Tom", 210, 20, 23.1] + ])"}, + &expected_array); + ASSERT_TRUE(array_status.ok()); + CheckFileContent(file_format, expected_data_file_path, write_type_, expected_array); + + // check data file meta + ASSERT_TRUE(commit_increment.GetCompactIncrement().IsEmpty()); + ASSERT_EQ(1, commit_increment.GetNewFilesIncrement().NewFiles().size()); + auto expected_data_file_meta = std::make_shared( + expected_data_file_name, /*file_size=*/data_file_status->GetLen(), /*row_count=*/9, + /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("David")}, pool_.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Tom")}, pool_.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({NullType()}, {NullType()}, {-1}, pool_.get()), + /*value_stats=*/SimpleStats::EmptyStats(), + /*min_sequence_number=*/-1, /*max_sequence_number=*/-1, /*schema_id=*/1, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/commit_increment.GetNewFilesIncrement().NewFiles()[0]->creation_time, + /*delete_row_count=*/2, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + 
/*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement expected_data_increment({expected_data_file_meta}, /*deleted_files=*/{}, + /*changelog_files=*/{}); + ASSERT_EQ(expected_data_increment, commit_increment.GetNewFilesIncrement()); +} + +TEST_P(PostponeBucketWriterTest, TestMultiplePrepareCommit) { + auto file_format = GetParam(); + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, file_format}, + {"orc.write.enable-metrics", "true"}})); + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), file_format, options.DataFilePrefix(), nullptr)); + std::string uuid = path_factory->uuid_; + + auto postpone_bucket_writer = std::make_shared( + primary_keys_, path_factory, /*schema_id=*/1, value_schema_, options, pool_); + + // write batch 1, batch size = 3 + std::shared_ptr array1 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["David", 120, 11, null], + ["Bob", 140, 12, null], + ["Alex", 110, 10, null] + ])") + .ValueOrDie(); + WriteBatch(array1, /*row_kinds=*/{}, postpone_bucket_writer.get()); + // prepare commit 1 + ASSERT_OK_AND_ASSIGN(CommitIncrement commit_increment1, + postpone_bucket_writer->PrepareCommit(/*wait_compaction=*/false)); + // check metrics + auto metrics = postpone_bucket_writer->GetMetrics(); + uint64_t write_io_count = 0; + if (file_format == "orc") { + ASSERT_OK_AND_ASSIGN(write_io_count, metrics->GetCounter("orc.write.io.count")); + ASSERT_GT(write_io_count, 0); + } + + // write batch 2, batch size = 2 + std::shared_ptr array2 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Judy", 220, 21, 24.1], + ["Tom", 210, 20, 23.1] + ])") + .ValueOrDie(); + WriteBatch(array2, /*row_kinds=*/{}, postpone_bucket_writer.get()); + // prepare commit 2 + ASSERT_OK_AND_ASSIGN(CommitIncrement commit_increment2, + 
postpone_bucket_writer->PrepareCommit(/*wait_compaction=*/false)); + // check metrics + metrics = postpone_bucket_writer->GetMetrics(); + if (file_format == "orc") { + ASSERT_OK_AND_ASSIGN(uint64_t write_io_count2, metrics->GetCounter("orc.write.io.count")); + ASSERT_GT(write_io_count2, write_io_count); + } + + ASSERT_OK(postpone_bucket_writer->Close()); + + // check data file exist and read ok + std::string expected_data_file_name1 = "data-" + uuid + "-0." + file_format; + std::string expected_data_file_name2 = "data-" + uuid + "-1." + file_format; + + std::string expected_data_file_dir = dir->Str() + "/"; + ASSERT_OK_AND_ASSIGN( + std::unique_ptr data_file_status1, + options.GetFileSystem()->GetFileStatus(expected_data_file_dir + expected_data_file_name1)); + ASSERT_OK_AND_ASSIGN( + std::unique_ptr data_file_status2, + options.GetFileSystem()->GetFileStatus(expected_data_file_dir + expected_data_file_name2)); + + std::shared_ptr expected_array1; + auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(write_type_, {R"([ + [-1, 0, "David", 120, 11, null], + [-1, 0, "Bob", 140, 12, null], + [-1, 0, "Alex", 110, 10, null] + ])"}, + &expected_array1); + ASSERT_TRUE(array_status.ok()); + CheckFileContent(file_format, expected_data_file_dir + expected_data_file_name1, write_type_, + expected_array1); + + std::shared_ptr expected_array2; + array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(write_type_, {R"([ + [-1, 0, "Judy", 220, 21, 24.1], + [-1, 0, "Tom", 210, 20, 23.1] + ])"}, + &expected_array2); + ASSERT_TRUE(array_status.ok()); + CheckFileContent(file_format, expected_data_file_dir + expected_data_file_name2, write_type_, + expected_array2); + + // check data file meta + ASSERT_TRUE(commit_increment1.GetCompactIncrement().IsEmpty()); + ASSERT_EQ(1, commit_increment1.GetNewFilesIncrement().NewFiles().size()); + auto expected_data_file_meta1 = std::make_shared( + expected_data_file_name1, /*file_size=*/data_file_status1->GetLen(), 
/*row_count=*/3, + /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("David")}, pool_.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Alex")}, pool_.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({NullType()}, {NullType()}, {-1}, pool_.get()), + /*value_stats=*/SimpleStats::EmptyStats(), + /*min_sequence_number=*/-1, /*max_sequence_number=*/-1, /*schema_id=*/1, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/commit_increment1.GetNewFilesIncrement().NewFiles()[0]->creation_time, + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement expected_data_increment1({expected_data_file_meta1}, /*deleted_files=*/{}, + /*changelog_files=*/{}); + ASSERT_EQ(expected_data_increment1, commit_increment1.GetNewFilesIncrement()); + + ASSERT_TRUE(commit_increment2.GetCompactIncrement().IsEmpty()); + ASSERT_EQ(1, commit_increment2.GetNewFilesIncrement().NewFiles().size()); + auto expected_data_file_meta2 = std::make_shared( + expected_data_file_name2, /*file_size=*/data_file_status2->GetLen(), /*row_count=*/2, + /*min_key=*/BinaryRowGenerator::GenerateRow({std::string("Judy")}, pool_.get()), + /*max_key=*/BinaryRowGenerator::GenerateRow({std::string("Tom")}, pool_.get()), + /*key_stats=*/ + BinaryRowGenerator::GenerateStats({NullType()}, {NullType()}, {-1}, pool_.get()), + /*value_stats=*/SimpleStats::EmptyStats(), + /*min_sequence_number=*/-1, /*max_sequence_number=*/-1, /*schema_id=*/1, + /*level=*/0, /*extra_files=*/std::vector>(), + /*creation_time=*/commit_increment2.GetNewFilesIncrement().NewFiles()[0]->creation_time, + /*delete_row_count=*/0, /*embedded_index=*/nullptr, FileSource::Append(), + /*value_stats_cols=*/std::nullopt, /*external_path=*/std::nullopt, + /*first_row_id=*/std::nullopt, + /*write_cols=*/std::nullopt); + DataIncrement 
expected_data_increment2({expected_data_file_meta2}, /*deleted_files=*/{}, + /*changelog_files=*/{}); + ASSERT_EQ(expected_data_increment2, commit_increment2.GetNewFilesIncrement()); +} + +TEST_P(PostponeBucketWriterTest, TestPrepareCommitForEmptyData) { + auto file_format = GetParam(); + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, file_format}})); + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), file_format, options.DataFilePrefix(), nullptr)); + std::string uuid = path_factory->uuid_; + + auto postpone_bucket_writer = std::make_shared( + primary_keys_, path_factory, /*schema_id=*/1, value_schema_, options, pool_); + + // prepare commit, without write + ASSERT_OK_AND_ASSIGN(CommitIncrement commit_increment, + postpone_bucket_writer->PrepareCommit(/*wait_compaction=*/false)); + // check data file meta empty + ASSERT_TRUE(commit_increment.GetCompactIncrement().IsEmpty()); + ASSERT_TRUE(commit_increment.GetNewFilesIncrement().NewFiles().empty()); + + // write empty batch + std::shared_ptr array = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([])").ValueOrDie(); + WriteBatch(array, /*row_kinds=*/{}, postpone_bucket_writer.get()); + // prepare commit, without write + ASSERT_OK_AND_ASSIGN(commit_increment, + postpone_bucket_writer->PrepareCommit(/*wait_compaction=*/false)); + // check data file meta empty + ASSERT_TRUE(commit_increment.GetCompactIncrement().IsEmpty()); + ASSERT_TRUE(commit_increment.GetNewFilesIncrement().NewFiles().empty()); + + ASSERT_OK(postpone_bucket_writer->Close()); + + // check data file not exist + std::string expected_data_file_name = "data-" + uuid + "-0." 
+ file_format; + std::string expected_data_file_path = dir->Str() + "/" + expected_data_file_name; + ASSERT_FALSE(options.GetFileSystem()->Exists(expected_data_file_path).value()); +} + +TEST_P(PostponeBucketWriterTest, TestCloseBeforePrepareCommit) { + auto file_format = GetParam(); + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, file_format}})); + + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), file_format, options.DataFilePrefix(), nullptr)); + std::string uuid = path_factory->uuid_; + + auto postpone_bucket_writer = std::make_shared( + primary_keys_, path_factory, /*schema_id=*/1, value_schema_, options, pool_); + + // write batch + std::shared_ptr array1 = + arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Lucy", 20, 1, 14.1], + ["Paul", 40, 2, null], + ["Lucy", 30, 3, 15.1], + ["Alice", 10, 0, 13.1] + ])") + .ValueOrDie(); + WriteBatch(array1, /*row_kinds=*/{}, postpone_bucket_writer.get()); + ASSERT_OK(postpone_bucket_writer->Close()); +} + +TEST_P(PostponeBucketWriterTest, TestIOException) { + auto file_format = GetParam(); + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::FILE_FORMAT, file_format}})); + + bool run_complete = false; + auto io_hook = IOHook::GetInstance(); + for (size_t i = 0; i < 200; i++) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + ScopeGuard guard([&io_hook]() { io_hook->Clear(); }); + io_hook->Reset(i, IOHook::Mode::RETURN_ERROR); + auto path_factory = std::make_shared(); + ASSERT_OK(path_factory->Init(dir->Str(), file_format, options.DataFilePrefix(), nullptr)); + std::string uuid = path_factory->uuid_; + + auto postpone_bucket_writer = std::make_shared( + primary_keys_, path_factory, /*schema_id=*/1, value_schema_, options, pool_); + + // write batch + std::shared_ptr array = + 
arrow::ipc::internal::json::ArrayFromJSON(value_type_, R"([ + ["Lucy", 20, 1, 14.1], + ["Paul", 40, 2, null], + ["Lucy", 30, 3, 15.1], + ["Alice", 10, 0, 13.1] + ])") + .ValueOrDie(); + ::ArrowArray c_array; + ASSERT_TRUE(arrow::ExportArray(*array, &c_array).ok()); + RecordBatchBuilder batch_builder(&c_array); + ASSERT_OK_AND_ASSIGN(std::unique_ptr batch, batch_builder.Finish()); + CHECK_HOOK_STATUS(postpone_bucket_writer->Write(std::move(batch)), i); + auto commit_increment = postpone_bucket_writer->PrepareCommit(/*wait_compaction=*/false); + CHECK_HOOK_STATUS(commit_increment.status(), i); + ASSERT_FALSE(commit_increment.value().GetNewFilesIncrement().NewFiles().empty()); + ASSERT_OK(postpone_bucket_writer->Close()); + run_complete = true; + break; + } + ASSERT_TRUE(run_complete); +} + +} // namespace paimon::test