From 918d3671f5b9cc2d73b92b4d710b641ec3884a06 Mon Sep 17 00:00:00 2001 From: "yonghao.fyh" Date: Thu, 25 Jun 2026 14:20:47 +0800 Subject: [PATCH] feat(catalog): add catalog interfaces, file system catalog and snapshot commit utilities --- include/paimon/catalog/catalog.h | 205 ++++ include/paimon/catalog/database.h | 55 + include/paimon/catalog/identifier.h | 62 + include/paimon/catalog/table.h | 79 ++ src/paimon/catalog/catalog.cpp | 40 + src/paimon/catalog/catalog_snapshot_commit.h | 50 + src/paimon/catalog/catalog_test.cpp | 44 + src/paimon/catalog/commit_table_request.h | 78 ++ .../catalog/commit_table_request_test.cpp | 106 ++ src/paimon/catalog/file_system_catalog.cpp | 559 +++++++++ src/paimon/catalog/file_system_catalog.h | 100 ++ .../catalog/file_system_catalog_test.cpp | 1072 +++++++++++++++++ src/paimon/catalog/identifier.cpp | 124 ++ src/paimon/catalog/identifier_test.cpp | 134 +++ src/paimon/catalog/renaming_snapshot_commit.h | 77 ++ .../catalog/renaming_snapshot_commit_test.cpp | 67 ++ src/paimon/catalog/snapshot_commit.h | 40 + 17 files changed, 2892 insertions(+) create mode 100644 include/paimon/catalog/catalog.h create mode 100644 include/paimon/catalog/database.h create mode 100644 include/paimon/catalog/identifier.h create mode 100644 include/paimon/catalog/table.h create mode 100644 src/paimon/catalog/catalog.cpp create mode 100644 src/paimon/catalog/catalog_snapshot_commit.h create mode 100644 src/paimon/catalog/catalog_test.cpp create mode 100644 src/paimon/catalog/commit_table_request.h create mode 100644 src/paimon/catalog/commit_table_request_test.cpp create mode 100644 src/paimon/catalog/file_system_catalog.cpp create mode 100644 src/paimon/catalog/file_system_catalog.h create mode 100644 src/paimon/catalog/file_system_catalog_test.cpp create mode 100644 src/paimon/catalog/identifier.cpp create mode 100644 src/paimon/catalog/identifier_test.cpp create mode 100644 src/paimon/catalog/renaming_snapshot_commit.h create mode 100644 
src/paimon/catalog/renaming_snapshot_commit_test.cpp create mode 100644 src/paimon/catalog/snapshot_commit.h diff --git a/include/paimon/catalog/catalog.h b/include/paimon/catalog/catalog.h new file mode 100644 index 0000000..4b0fd1e --- /dev/null +++ b/include/paimon/catalog/catalog.h @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include "paimon/catalog/identifier.h" +#include "paimon/result.h" +#include "paimon/snapshot/snapshot_info.h" +#include "paimon/status.h" +#include "paimon/type_fwd.h" +#include "paimon/visibility.h" + +struct ArrowSchema; + +namespace paimon { +// Tag name or snapshot id +using Instant = std::variant; + +class Database; +class Table; +class View; +class Schema; +class Snapshot; +class PartitionStatistics; +class Tag; +class Identifier; +/// This interface is responsible for reading and writing metadata such as database/table from a +/// paimon catalog. 
+class PAIMON_EXPORT Catalog {
+ public:
+    /// Name of the reserved system database (defined as "sys" in catalog.cpp).
+    static const char SYSTEM_DATABASE_NAME[];
+    /// Splitter string for system table names (defined as "$" in catalog.cpp).
+    static const char SYSTEM_TABLE_SPLITTER[];
+    /// Suffix the file system catalog appends to a database name to form its directory name
+    /// (defined as ".db" in catalog.cpp).
+    static const char DB_SUFFIX[];
+    /// Database option key for an explicit location (defined as "location" in catalog.cpp).
+    /// The file system catalog rejects database creation when this key is present.
+    static const char DB_LOCATION_PROP[];
+
+    /// %Factory method for creating a `Catalog` instance.
+    ///
+    /// @param root_path Path to the root directory where the catalog is located.
+    /// @param options Configuration options for catalog initialization.
+    /// @param file_system Specifies the file system for file operations.
+    ///                    If not set, use default file system (configured in
+    ///                    `Options::FILE_SYSTEM`)
+    /// @return A result containing a unique pointer to a `Catalog` instance, or an error status.
+    static Result> Create(
+        const std::string& root_path, const std::map& options,
+        const std::shared_ptr& file_system = nullptr);
+
+    virtual ~Catalog() = default;
+
+    /// Creates a database with the specified properties.
+    ///
+    /// @param name Name of the database to be created.
+    /// @param options Additional properties associated with the database.
+    /// @param ignore_if_exists If true, no action is taken if the database already exists.
+    ///                         If false, an error status is returned if the database exists.
+    /// @return A status indicating success or failure.
+    virtual Status CreateDatabase(const std::string& name,
+                                  const std::map& options,
+                                  bool ignore_if_exists) = 0;
+
+    /// Creates a new table in the catalog.
+    ///
+    /// @note System tables cannot be created using this method.
+    ///
+    /// @param identifier Identifier of the table to be created.
+    /// @param c_schema The schema of the table to be created.
+    ///                 NOTE(review): ownership of `c_schema` is not specified here. The file
+    ///                 system implementation passes it to `arrow::ImportSchema` only after its
+    ///                 existence checks succeed - confirm who releases it on early returns.
+    /// @param partition_keys List of columns that should be used as partition keys for the table.
+    /// @param primary_keys List of columns that should be used as primary keys for the table.
+    /// @param options Additional table-specific options.
+    /// @param ignore_if_exists If true, no action is taken if the table already exists.
+    ///                         If false, an error status is returned if the table exists.
+    /// @return A status indicating success or failure.
+    virtual Status CreateTable(const Identifier& identifier, ArrowSchema* c_schema,
+                               const std::vector& partition_keys,
+                               const std::vector& primary_keys,
+                               const std::map& options,
+                               bool ignore_if_exists) = 0;
+
+    /// Lists all the databases available in the catalog.
+    ///
+    /// @return A result containing a vector of database names, or an error status.
+    virtual Result> ListDatabases() const = 0;
+
+    /// Lists all the tables within a specified database.
+    ///
+    /// @note System tables will not be listed.
+    ///
+    /// @param db_name The name of the database to list tables from.
+    /// @return A result containing a vector of table names in the specified database, or an error
+    /// status.
+    virtual Result> ListTables(const std::string& db_name) const = 0;
+
+    /// Drops a database.
+    ///
+    /// @param name Name of the database to be dropped.
+    /// @param ignore_if_not_exists If true, no action is taken if the database does not exist.
+    /// @param cascade If true, drops all tables and functions in the database before dropping the
+    /// database.
+    /// @return A status indicating success or failure.
+    virtual Status DropDatabase(const std::string& name, bool ignore_if_not_exists,
+                                bool cascade) = 0;
+
+    /// Drops a table.
+    ///
+    /// @param identifier Identifier of the table to drop.
+    /// @param ignore_if_not_exists If true, no action is taken if the table does not exist.
+    /// @return A status indicating success or failure.
+    virtual Status DropTable(const Identifier& identifier, bool ignore_if_not_exists) = 0;
+
+    /// Renames a table.
+    ///
+    /// @param from_table Current identifier of the table.
+    /// @param to_table New identifier for the table.
+    /// @param ignore_if_not_exists If true, no action is taken if the table does not exist.
+    /// @return A status indicating success or failure.
+    virtual Status RenameTable(const Identifier& from_table, const Identifier& to_table,
+                               bool ignore_if_not_exists) = 0;
+
+    /// Gets a table.
+    ///
+    /// @param identifier Identifier of the table to get.
+    /// @return A result containing the table, or an error status.
+    virtual Result> GetTable(const Identifier& identifier) const = 0;
+
+    /// Checks whether a database with the specified name exists in the catalog.
+    ///
+    /// @param db_name The name of the database to check for existence.
+    /// @return A result containing true if the database exists, false otherwise, or an error
+    /// status.
+    virtual Result DatabaseExists(const std::string& db_name) const = 0;
+
+    /// Checks whether a table with the specified identifier exists in the catalog.
+    ///
+    /// @param identifier The identifier of the table to check for existence.
+    /// @return A result containing true if the table exists, false otherwise, or an error status.
+    virtual Result TableExists(const Identifier& identifier) const = 0;
+
+    /// Returns the expected location of a specified database.
+    ///
+    /// @note This does not check whether the database actually exists.
+    ///
+    /// @param db_name The name of the database to get the location for.
+    /// @return A string representing the expected location of the database.
+    virtual std::string GetDatabaseLocation(const std::string& db_name) const = 0;
+
+    /// Returns the expected location of a specified table.
+    ///
+    /// @note This does not check whether the table actually exists.
+    ///
+    /// @param identifier The table identifier containing database and table name.
+    /// @return A string representing the expected location of the table.
+    virtual std::string GetTableLocation(const Identifier& identifier) const = 0;
+
+    /// Returns the root path of the catalog.
+    ///
+    /// @return A string representing the root path of the catalog.
+    virtual std::string GetRootPath() const = 0;
+
+    /// Returns the file system used by the catalog.
+    ///
+    /// @return A shared pointer to the file system instance.
+    virtual std::shared_ptr GetFileSystem() const = 0;
+
+    /// Loads the latest schema of a specified table.
+    ///
+    /// @note System tables will not be supported.
+    ///
+    /// @param identifier The identifier (database and table name) of the table to load.
+    /// @return A result containing table schema if the table exists, or an error status on failure.
+    virtual Result> LoadTableSchema(const Identifier& identifier) const = 0;
+
+    /// Lists all snapshots of the specified table, ordered by snapshot id.
+    ///
+    /// @param identifier The identifier (database and table name) of the table.
+    /// @param branch Branch name; empty string means the main branch.
+    /// @return A result containing a vector of SnapshotInfo ordered by
+    ///         snapshot id ascending, or an error status.
+    virtual Result> ListSnapshots(
+        const Identifier& identifier, const std::string& branch = "") const = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/catalog/database.h b/include/paimon/catalog/database.h
new file mode 100644
index 0000000..4bbf529
--- /dev/null
+++ b/include/paimon/catalog/database.h
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/type_fwd.h"
+#include "paimon/visibility.h"
+
+struct ArrowSchema;
+
+namespace paimon {
+
+/// Interface of a database in a catalog.
+///
+/// Exposes read-only metadata only: a name, an option map and an optional comment.
+class PAIMON_EXPORT Database {
+ public:
+    virtual ~Database() = default;
+
+    /// ================== Database Metadata =====================
+
+    /// A name to identify this database.
+    virtual std::string Name() const = 0;
+
+    /// Get the database-level options associated with this database.
+    /// @return A const reference to the option map.
+    ///         NOTE(review): returned by reference, so the referenced map must outlive the
+    ///         caller's use - the required lifetime is not stated here; confirm per implementation.
+    virtual const std::map& Options() const = 0;
+
+    /// Get an optional comment describing the database.
+    /// @return The database comment if set, or std::nullopt otherwise.
+    virtual std::optional Comment() const = 0;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/catalog/identifier.h b/include/paimon/catalog/identifier.h
new file mode 100644
index 0000000..62d6ab8
--- /dev/null
+++ b/include/paimon/catalog/identifier.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+
+#include "paimon/result.h"
+#include "paimon/type_fwd.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// An identifier for a table containing database and table name.
+///
+/// The database and table strings are fixed at construction (both members are `const`, which
+/// also makes the type non-assignable). The remaining accessors return `Result`, i.e. they can
+/// fail; presumably they parse the table name on demand and cache the pieces in the `mutable`
+/// members below - confirm against identifier.cpp.
+class PAIMON_EXPORT Identifier {
+ public:
+    // String constants; their values are defined out of line (not visible in this header).
+    static const char kUnknownDatabase[];
+    static const char kSystemTableSplitter[];
+    static const char kSystemBranchPrefix[];
+    static const char kDefaultMainBranch[];
+
+    /// Builds an identifier from a table name only.
+    /// NOTE(review): presumably the database defaults to `kUnknownDatabase` - confirm.
+    explicit Identifier(const std::string& table);
+    /// Builds an identifier from an explicit database name and table name.
+    Identifier(const std::string& database, const std::string& table);
+
+    /// NOTE(review): this operator is not const-qualified, so it cannot be invoked on a const
+    /// left-hand operand. Making it `const` requires changing the out-of-line definition too.
+    bool operator==(const Identifier& other);
+    /// Returns the database name given at construction.
+    const std::string& GetDatabaseName() const;
+    /// Returns the table name given at construction.
+    const std::string& GetTableName() const;
+    /// Returns the data table part of the table name, or an error status.
+    Result GetDataTableName() const;
+    /// Returns the branch part of the table name if one is present, or an error status.
+    Result> GetBranchName() const;
+    /// Like GetBranchName(); presumably falls back to `kDefaultMainBranch` - confirm.
+    Result GetBranchNameOrDefault() const;
+    /// Returns the system table part of the table name if one is present, or an error status.
+    Result> GetSystemTableName() const;
+    /// Returns whether this identifier names a system table, or an error status.
+    Result IsSystemTable() const;
+    /// Returns a printable form of the identifier.
+    std::string ToString() const;
+
+ private:
+    /// Const helper that may write the `mutable` members below.
+    /// NOTE(review): no synchronization is visible in this header - confirm whether concurrent
+    /// const access is expected to be safe.
+    Status SplitTableName() const;
+
+    const std::string database_;
+    const std::string table_;
+    mutable bool parsed_ = false;
+    mutable std::string data_table_;
+    mutable std::optional branch_;
+    mutable std::optional system_table_;
+};
+
+}  // namespace paimon
diff --git a/include/paimon/catalog/table.h b/include/paimon/catalog/table.h
new file mode 100644
index 0000000..71fe2a1
--- /dev/null
+++ b/include/paimon/catalog/table.h
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include "paimon/catalog/identifier.h"
+#include "paimon/result.h"
+#include "paimon/schema/schema.h"
+#include "paimon/status.h"
+#include "paimon/type_fwd.h"
+#include "paimon/visibility.h"
+
+namespace paimon {
+
+/// A table provides basic abstraction for table type.
+///
+/// Holds a schema handle together with the database name and table name it was built with.
+class PAIMON_EXPORT Table {
+ public:
+    /// Creates a table by loading the latest schema from the specified table path.
+    ///
+    /// @param file_system File system used to access the table directory and schema files.
+    /// @param table_path Root path of the table in the file system.
+    /// @param identifier Logical table identifier used for naming and error messages.
+    /// @return A table initialized with the latest available schema.
+    static Result> Create(const std::shared_ptr& file_system,
+                          const std::string& table_path,
+                          const Identifier& identifier);
+
+    /// Builds a table from an already loaded schema; the arguments are stored as given and no
+    /// I/O is performed.
+    Table(const std::shared_ptr& schema, const std::string& database,
+          const std::string& table_name)
+        : schema_(schema), database_(database), table_name_(table_name) {}
+
+    ~Table() = default;
+
+    /// A name to identify this table (returned as a copy).
+    std::string Name() const {
+        return table_name_;
+    }
+
+    /// Full name of the table, default is database.tableName.
+    std::string FullName() const;
+
+    /// UUID of the table, metastore can provide the true UUID of this table, default is the full
+    /// name.
+    std::string Uuid() const {
+        return FullName();
+    }
+
+    /// Returns the schema this table was constructed with. Nothing is reloaded here, so
+    /// "latest" means latest as of construction.
+    std::shared_ptr LatestSchema() const {
+        return schema_;
+    }
+
+ private:
+    std::shared_ptr schema_;
+    std::string database_;
+    std::string table_name_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/catalog/catalog.cpp b/src/paimon/catalog/catalog.cpp
new file mode 100644
index 0000000..4c1b06b
--- /dev/null
+++ b/src/paimon/catalog/catalog.cpp
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/catalog/catalog.h"
+
+#include
+
+#include "paimon/core/catalog/file_system_catalog.h"
+#include "paimon/core/core_options.h"
+
+namespace paimon {
+
+// Out-of-line definitions of the string constants declared on Catalog.
+const char Catalog::SYSTEM_DATABASE_NAME[] = "sys";
+const char Catalog::SYSTEM_TABLE_SPLITTER[] = "$";
+const char Catalog::DB_SUFFIX[] = ".db";
+const char Catalog::DB_LOCATION_PROP[] = "location";
+
+// Parses `options` (together with the optional caller-supplied `file_system`) into CoreOptions
+// and propagates any parse error. Only the file system resolved by CoreOptions and `root_path`
+// are forwarded to the catalog constructor; the other options are not passed on.
+// NOTE(review): the constructed type is presumably FileSystemCatalog (its header is included
+// above) - the template argument is not visible in this chunk; confirm.
+Result> Catalog::Create(const std::string& root_path,
+                        const std::map& options,
+                        const std::shared_ptr& file_system) {
+    PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, CoreOptions::FromMap(options, file_system));
+    return std::make_unique(core_options.GetFileSystem(), root_path);
+}
+
+}  // namespace paimon
diff --git a/src/paimon/catalog/catalog_snapshot_commit.h b/src/paimon/catalog/catalog_snapshot_commit.h
new file mode 100644
index 0000000..27c0c88
--- /dev/null
+++ b/src/paimon/catalog/catalog_snapshot_commit.h
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+
+#include "paimon/core/catalog/commit_table_request.h"
+#include "paimon/core/catalog/snapshot_commit.h"
+
+namespace paimon {
+
+/// A `SnapshotCommit` using `Catalog` to commit.
+class CatalogSnapshotCommit : public SnapshotCommit { + public: + Result Commit(const Snapshot& snapshot, + const std::vector& statistics) override { + commit_table_request_ = CommitTableRequest(snapshot, statistics); + return true; + } + + Result GetLastCommitTableRequest() override { + if (commit_table_request_) { + return commit_table_request_.value().ToJsonString(); + } else { + return Status::Invalid("Should call Commit first before GetLastCommitTableRequest."); + } + } + + private: + std::optional commit_table_request_; +}; + +} // namespace paimon diff --git a/src/paimon/catalog/catalog_test.cpp b/src/paimon/catalog/catalog_test.cpp new file mode 100644 index 0000000..f5271e8 --- /dev/null +++ b/src/paimon/catalog/catalog_test.cpp @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "paimon/catalog/catalog.h"
+
+#include "gtest/gtest.h"
+#include "paimon/defs.h"
+#include "paimon/testing/mock/mock_file_system.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Catalog::Create succeeds when the file system is selected through options only
+// (no explicit file system argument is passed).
+TEST(CatalogTest, Create) {
+    std::map options;
+    options[Options::FILE_SYSTEM] = "local";
+    options[Options::FILE_FORMAT] = "orc";
+    ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create("path", options));
+}
+
+// When a file system instance is passed explicitly, the created catalog reports that same
+// instance and the given root path.
+TEST(CatalogTest, CreateWithSpecificFileSystem) {
+    std::map options;
+    const std::string path = "path";
+    const auto fs = std::make_shared();
+    ASSERT_OK_AND_ASSIGN(auto catalog, Catalog::Create(path, options, fs));
+    ASSERT_EQ(path, catalog->GetRootPath());
+    ASSERT_EQ(fs, catalog->GetFileSystem());
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/catalog/commit_table_request.h b/src/paimon/catalog/commit_table_request.h
new file mode 100644
index 0000000..fa53789
--- /dev/null
+++ b/src/paimon/catalog/commit_table_request.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+
+#include "paimon/common/utils/jsonizable.h"
+#include "paimon/common/utils/rapidjson_util.h"
+#include "paimon/core/partition/partition_statistics.h"
+#include "paimon/core/snapshot.h"
+#include "rapidjson/allocators.h"
+#include "rapidjson/document.h"
+#include "rapidjson/rapidjson.h"
+
+namespace paimon {
+
+/// Request payload that pairs a snapshot with a list of partition statistics.
+///
+/// Serialized as a JSON object with the two members "snapshot" and "statistics".
+class CommitTableRequest : public Jsonizable {
+ public:
+    /// Copies both arguments into the request.
+    CommitTableRequest(const Snapshot& snapshot, const std::vector& statistics)
+        : snapshot_(snapshot), statistics_(statistics) {}
+
+    /// Builds the JSON object for this request using `allocator`.
+    /// Declared `noexcept(false)`: serialization may throw.
+    rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) const
+        noexcept(false) override {
+        rapidjson::Value obj(rapidjson::kObjectType);
+        obj.AddMember(rapidjson::StringRef(FIELD_SNAPSHOT),
+                      RapidJsonUtil::SerializeValue(snapshot_, allocator).Move(), *allocator);
+        obj.AddMember(rapidjson::StringRef(FIELD_STATISTICS),
+                      RapidJsonUtil::SerializeValue(statistics_, allocator).Move(), *allocator);
+        return obj;
+    }
+
+    /// Overwrites both members from the "snapshot" and "statistics" keys of `obj`.
+    /// Declared `noexcept(false)`: deserialization may throw.
+    void FromJson(const rapidjson::Value& obj) noexcept(false) override {
+        snapshot_ = RapidJsonUtil::DeserializeKeyValue(obj, FIELD_SNAPSHOT);
+        statistics_ = RapidJsonUtil::DeserializeKeyValue>(
+            obj, FIELD_STATISTICS);
+    }
+
+    /// Test-only comparison: identical objects compare equal immediately; otherwise the
+    /// snapshots are compared with `Snapshot::TEST_Equal` and the statistics with `==`.
+    bool TEST_Equal(const CommitTableRequest& other) const {
+        if (this == &other) {
+            return true;
+        }
+        return snapshot_.TEST_Equal(other.snapshot_) && statistics_ == other.statistics_;
+    }
+
+    /// Member-wise equality using `operator==` of both members.
+    bool operator==(const CommitTableRequest& other) const {
+        return snapshot_ == other.snapshot_ && statistics_ == other.statistics_;
+    }
+
+ private:
+    JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(CommitTableRequest);
+
+ private:
+    // JSON member names used by ToJson/FromJson.
+    static constexpr const char* FIELD_SNAPSHOT = "snapshot";
+    static constexpr const char* FIELD_STATISTICS = "statistics";
+
+    Snapshot snapshot_;
+    std::vector statistics_;
+};
+
+}  // namespace paimon
diff --git a/src/paimon/catalog/commit_table_request_test.cpp b/src/paimon/catalog/commit_table_request_test.cpp
new file mode 100644
index 0000000..39b9ff9
--- /dev/null
+++ b/src/paimon/catalog/commit_table_request_test.cpp
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/catalog/commit_table_request.h"
+
+#include
+#include
+#include
+
+#include "gtest/gtest.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Round trip: serialize a request to JSON, compare with the expected text, parse it back,
+// check equality with the original, then serialize again and compare both JSON strings.
+TEST(CommitTableRequestTest, TestSimple) {
+    Snapshot snapshot(
+        /*version=*/3, /*id=*/1, /*schema_id=*/0,
+        /*base_manifest_list=*/"manifest-list-3879e56f-2f27-49ae-a2f3-3dcbb8eb0beb-0",
+        /*base_manifest_list_size=*/291,
+        /*delta_manifest_list=*/"manifest-list-3879e56f-2f27-49ae-a2f3-3dcbb8eb0beb-1",
+        /*delta_manifest_list_size=*/1342, /*changelog_manifest_list=*/std::nullopt,
+        /*changelog_manifest_list_size=*/std::nullopt, /*index_manifest=*/std::nullopt,
+        /*commit_user=*/"commit_user_1", /*commit_identifier=*/9223372036854775807,
+        /*commit_kind=*/Snapshot::CommitKind::Append(), /*time_millis=*/1758097357597,
+        /*log_offsets=*/std::map(), /*total_record_count=*/5,
+        /*delta_record_count=*/5, /*changelog_record_count=*/0, /*watermark=*/std::nullopt,
+        /*statistics=*/std::nullopt, /*properties=*/std::nullopt,
+        /*next_row_id=*/0);
+    std::vector partition_statistics = {
+        PartitionStatistics(/*spec=*/{{"f1", "20"}}, /*record_count=*/1, /*file_size_in_bytes=*/541,
+                            /*file_count=*/1, /*last_file_creation_time=*/1724090888743,
+                            /*total_buckets=*/-1),
+        PartitionStatistics(/*spec=*/{{"f1", "10"}}, /*record_count=*/4,
+                            /*file_size_in_bytes=*/1118, /*file_count=*/2,
+                            /*last_file_creation_time=*/1724090888727, /*total_buckets=*/-1)};
+    // NOTE(review): this raw string is compared byte-for-byte with the ToJsonString() output,
+    // so its whitespace is significant. The spacing below is exactly what this chunk shows;
+    // confirm the indentation against the original file.
+    std::string expected_request_str = R"({
+ "snapshot": {
+ "version": 3,
+ "id": 1,
+ "schemaId": 0,
+ "baseManifestList": "manifest-list-3879e56f-2f27-49ae-a2f3-3dcbb8eb0beb-0",
+ "baseManifestListSize": 291,
+ "deltaManifestList": "manifest-list-3879e56f-2f27-49ae-a2f3-3dcbb8eb0beb-1",
+ "deltaManifestListSize": 1342,
+ "changelogManifestList": null,
+ "commitUser": "commit_user_1",
+ "commitIdentifier": 9223372036854775807,
+ "commitKind": "APPEND",
+ "timeMillis": 1758097357597,
+ "logOffsets": {},
+ "totalRecordCount": 5,
+ "deltaRecordCount": 5,
+ "changelogRecordCount": 0,
+ "nextRowId": 0
+ },
+ "statistics": [
+ {
+ "spec": {
+ "f1": "20"
+ },
+ "recordCount": 1,
+ "fileSizeInBytes": 541,
+ "fileCount": 1,
+ "lastFileCreationTime": 1724090888743,
+ "totalBuckets": -1
+ },
+ {
+ "spec": {
+ "f1": "10"
+ },
+ "recordCount": 4,
+ "fileSizeInBytes": 1118,
+ "fileCount": 2,
+ "lastFileCreationTime": 1724090888727,
+ "totalBuckets": -1
+ }
+ ]
+})";
+
+    CommitTableRequest request1(snapshot, partition_statistics);
+    ASSERT_OK_AND_ASSIGN(std::string request_str1, request1.ToJsonString());
+    ASSERT_EQ(request_str1, expected_request_str);
+    ASSERT_OK_AND_ASSIGN(CommitTableRequest request2,
+                         CommitTableRequest::FromJsonString(request_str1));
+    ASSERT_EQ(request1, request2);
+    ASSERT_OK_AND_ASSIGN(std::string request_str2, request2.ToJsonString());
+    ASSERT_EQ(request_str1, request_str2);
+}
+
+}  // namespace paimon::test
diff --git a/src/paimon/catalog/file_system_catalog.cpp b/src/paimon/catalog/file_system_catalog.cpp
new file mode 100644
index
0000000..277f0b8 --- /dev/null +++ b/src/paimon/catalog/file_system_catalog.cpp @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/catalog/file_system_catalog.h" + +#include +#include +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "fmt/format.h" +#include "fmt/ranges.h" +#include "paimon/catalog/identifier.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/core_options.h" +#include "paimon/core/snapshot.h" +#include "paimon/core/table/system/system_table.h" +#include "paimon/core/table/system/system_table_schema.h" +#include "paimon/core/utils/branch_manager.h" +#include "paimon/core/utils/snapshot_manager.h" +#include "paimon/defs.h" +#include "paimon/fs/file_system.h" +#include "paimon/logging.h" +#include "paimon/result.h" + +namespace arrow { +class Schema; +} // namespace arrow +struct ArrowSchema; + +namespace paimon { +FileSystemCatalog::FileSystemCatalog(const std::shared_ptr& fs, + const std::string& warehouse) + : fs_(fs), warehouse_(warehouse), logger_(Logger::GetLogger("FileSystemCatalog")) {} + +Status 
FileSystemCatalog::CreateDatabase(const std::string& db_name, + const std::map& options, + bool ignore_if_exists) { + if (IsSystemDatabase(db_name)) { + return Status::Invalid( + fmt::format("Cannot create database for system database {}.", db_name)); + } + PAIMON_ASSIGN_OR_RAISE(bool exist, DatabaseExists(db_name)); + if (exist) { + if (ignore_if_exists) { + return Status::OK(); + } else { + return Status::Invalid(fmt::format("database {} already exist", db_name)); + } + } + return CreateDatabaseImpl(db_name, options); +} + +Status FileSystemCatalog::CreateDatabaseImpl(const std::string& db_name, + const std::map& options) { + if (options.find(Catalog::DB_LOCATION_PROP) != options.end()) { + return Status::Invalid( + "Cannot specify location for a database when using fileSystem catalog."); + } + if (!options.empty()) { + std::string log_msg = fmt::format( + "Currently filesystem catalog can't store database properties, discard properties: " + "{{{}}}", + fmt::join(options, ", ")); + PAIMON_LOG_DEBUG(logger_, "%s", log_msg.c_str()); + } + std::string db_path = NewDatabasePath(warehouse_, db_name); + PAIMON_RETURN_NOT_OK(fs_->Mkdirs(db_path)); + return Status::OK(); +} + +Result FileSystemCatalog::DatabaseExists(const std::string& db_name) const { + if (IsSystemDatabase(db_name)) { + return Status::NotImplemented( + "do not support checking DatabaseExists for system database."); + } + return fs_->Exists(NewDatabasePath(warehouse_, db_name)); +} + +Result FileSystemCatalog::TableExists(const Identifier& identifier) const { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable()); + if (is_system_table) { + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_name, + identifier.GetSystemTableName()); + if (!system_table_name || !SystemTableLoader::IsSupported(system_table_name.value())) { + return false; + } + PAIMON_ASSIGN_OR_RAISE(std::string data_table_name, identifier.GetDataTableName()); + Identifier data_identifier(identifier.GetDatabaseName(), 
data_table_name); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + TableSchemaExists(data_identifier)); + return latest_schema != std::nullopt; + } + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + TableSchemaExists(identifier)); + return latest_schema != std::nullopt; +} + +std::string FileSystemCatalog::GetDatabaseLocation(const std::string& db_name) const { + return NewDatabasePath(warehouse_, db_name); +} + +std::string FileSystemCatalog::GetTableLocation(const Identifier& identifier) const { + return NewDataTablePath(warehouse_, identifier); +} + +Status FileSystemCatalog::CreateTable(const Identifier& identifier, ArrowSchema* c_schema, + const std::vector& partition_keys, + const std::vector& primary_keys, + const std::map& options, + bool ignore_if_exists) { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, IsSystemTable(identifier)); + if (is_system_table) { + return Status::Invalid( + fmt::format("Cannot create table for system table {}, please use data table.", + identifier.ToString())); + } + PAIMON_ASSIGN_OR_RAISE(bool db_exist, DatabaseExists(identifier.GetDatabaseName())); + if (!db_exist) { + return Status::Invalid( + fmt::format("database {} is not exist", identifier.GetDatabaseName())); + } + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + TableSchemaExists(identifier)); + bool table_exist = (latest_schema != std::nullopt); + if (table_exist) { + if (ignore_if_exists) { + return Status::OK(); + } else { + return Status::Invalid( + fmt::format("table {} already exist", identifier.GetTableName())); + } + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr schema, + arrow::ImportSchema(c_schema)); + PAIMON_ASSIGN_OR_RAISE(bool is_object_store, FileSystem::IsObjectStore(warehouse_)); + if (is_object_store && + options.find("enable-object-store-catalog-in-inte-test") == options.end()) { + return Status::NotImplemented( + "create table operation does not support object store file system for now"); + } + SchemaManager 
schema_manager(fs_, NewDataTablePath(warehouse_, identifier)); + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr table_schema, + schema_manager.CreateTable(schema, partition_keys, primary_keys, options)); + return Status::OK(); +} + +Result>> FileSystemCatalog::TableSchemaExists( + const Identifier& identifier) const { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, IsSystemTable(identifier)); + if (is_system_table) { + return Status::NotImplemented( + "do not support checking TableSchemaExists for system table."); + } + SchemaManager schema_manager(fs_, NewDataTablePath(warehouse_, identifier)); + return schema_manager.Latest(); +} + +std::string FileSystemCatalog::GetRootPath() const { + return warehouse_; +} + +std::shared_ptr FileSystemCatalog::GetFileSystem() const { + return fs_; +} + +bool FileSystemCatalog::IsSystemDatabase(const std::string& db_name) { + return db_name == SYSTEM_DATABASE_NAME; +} + +Result FileSystemCatalog::IsSpecifiedSystemTable(const Identifier& identifier) { + return identifier.IsSystemTable(); +} + +Result FileSystemCatalog::IsSystemTable(const Identifier& identifier) { + if (IsSystemDatabase(identifier.GetDatabaseName())) { + return true; + } + return IsSpecifiedSystemTable(identifier); +} + +std::string FileSystemCatalog::NewDatabasePath(const std::string& warehouse, + const std::string& db_name) { + return PathUtil::JoinPath(warehouse, db_name + DB_SUFFIX); +} + +std::string FileSystemCatalog::NewDataTablePath(const std::string& warehouse, + const Identifier& identifier) { + return PathUtil::JoinPath(NewDatabasePath(warehouse, identifier.GetDatabaseName()), + identifier.GetTableName()); +} + +Result> FileSystemCatalog::ListDatabases() const { + std::vector> file_status_list; + PAIMON_RETURN_NOT_OK(fs_->ListDir(warehouse_, &file_status_list)); + std::vector db_names; + for (const auto& file_status : file_status_list) { + if (file_status->IsDir()) { + std::string name = PathUtil::GetName(file_status->GetPath()); + if 
(StringUtils::EndsWith(name, DB_SUFFIX)) { + db_names.push_back(name.substr(0, name.length() - std::strlen(DB_SUFFIX))); + } + } + } + return db_names; +} + +Result> FileSystemCatalog::ListTables(const std::string& db_name) const { + if (IsSystemDatabase(db_name)) { + return Status::NotImplemented("do not support listing tables for system database."); + } + std::string database_path = NewDatabasePath(warehouse_, db_name); + std::vector> file_status_list; + PAIMON_RETURN_NOT_OK(fs_->ListDir(database_path, &file_status_list)); + std::vector table_names; + for (const auto& file_status : file_status_list) { + if (file_status->IsDir()) { + std::string table_path = file_status->GetPath(); + PAIMON_ASSIGN_OR_RAISE(bool table_exist, TableExistsInFileSystem(table_path)); + if (table_exist) { + table_names.push_back(PathUtil::GetName(table_path)); + } + } + } + return table_names; +} + +Result FileSystemCatalog::TableExistsInFileSystem(const std::string& table_path) const { + SchemaManager schema_manager(fs_, table_path); + // in order to improve the performance, check the schema-0 firstly. 
+ PAIMON_ASSIGN_OR_RAISE(bool schema_zero_exists, schema_manager.SchemaExists(0)); + if (schema_zero_exists) { + return true; + } else { + // if schema-0 not exists, fallback to check other schemas + PAIMON_ASSIGN_OR_RAISE(std::vector schema_ids, schema_manager.ListAllIds()); + return !schema_ids.empty(); + } +} + +Result> FileSystemCatalog::LoadTableSchema( + const Identifier& identifier) const { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable()); + if (is_system_table) { + PAIMON_ASSIGN_OR_RAISE(std::optional system_table_name, + identifier.GetSystemTableName()); + if (!system_table_name || !SystemTableLoader::IsSupported(system_table_name.value())) { + return Status::NotExist(fmt::format("{} not exist", identifier.ToString())); + } + PAIMON_ASSIGN_OR_RAISE(std::string data_table_name, identifier.GetDataTableName()); + Identifier data_identifier(identifier.GetDatabaseName(), data_table_name); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + TableSchemaExists(data_identifier)); + if (!latest_schema) { + return Status::NotExist(fmt::format("{} not exist", data_identifier.ToString())); + } + std::map dynamic_options; + PAIMON_ASSIGN_OR_RAISE(std::optional branch, identifier.GetBranchName()); + if (branch) { + dynamic_options[Options::BRANCH] = branch.value(); + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr system_table, + SystemTableLoader::Load(system_table_name.value(), fs_, + GetTableLocation(data_identifier), + latest_schema.value(), dynamic_options)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr arrow_schema, + system_table->ArrowSchema()); + return std::make_shared(std::move(arrow_schema)); + } + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + TableSchemaExists(identifier)); + if (!latest_schema) { + return Status::NotExist(fmt::format("{} not exist", identifier.ToString())); + } + return std::static_pointer_cast(*latest_schema); +} + +Result> FileSystemCatalog::GetTable(const Identifier& identifier) const { + 
PAIMON_ASSIGN_OR_RAISE(bool is_system_table, identifier.IsSystemTable()); + if (is_system_table) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr schema, LoadTableSchema(identifier)); + return std::make_shared(schema, identifier.GetDatabaseName(), + identifier.GetTableName()); + } + return Table::Create(fs_, GetTableLocation(identifier), identifier); +} + +Status FileSystemCatalog::DropDatabase(const std::string& name, bool ignore_if_not_exists, + bool cascade) { + if (IsSystemDatabase(name)) { + return Status::Invalid(fmt::format("Cannot drop system database {}.", name)); + } + + PAIMON_ASSIGN_OR_RAISE(bool exist, DatabaseExists(name)); + if (!exist) { + if (ignore_if_not_exists) { + return Status::OK(); + } else { + return Status::NotExist(fmt::format("database {} does not exist", name)); + } + } + + std::string db_path = NewDatabasePath(warehouse_, name); + + if (cascade) { + // List all tables in the database and drop them + PAIMON_ASSIGN_OR_RAISE(std::vector tables, ListTables(name)); + for (const std::string& table_name : tables) { + Identifier table_id(name, table_name); + PAIMON_RETURN_NOT_OK(DropTable(table_id, false)); + } + } else { + // Check if database is empty + PAIMON_ASSIGN_OR_RAISE(std::vector tables, ListTables(name)); + if (!tables.empty()) { + return Status::Invalid( + fmt::format("Cannot drop non-empty database {}. 
Use cascade=true to force.", name)); + } + } + + // Delete the database directory + PAIMON_RETURN_NOT_OK(fs_->Delete(db_path)); + return Status::OK(); +} + +Result> FileSystemCatalog::GetSchemaExternalPaths( + const std::vector>& schemas) const { + std::set external_paths_set; + for (const auto& schema : schemas) { + const auto& options = schema->Options(); + PAIMON_ASSIGN_OR_RAISE(CoreOptions core_options, CoreOptions::FromMap(options)); + // collect external data file path + PAIMON_ASSIGN_OR_RAISE(std::vector data_external_paths, + core_options.CreateExternalPaths()); + for (const auto& path : data_external_paths) { + external_paths_set.insert(path); + } + // collect external global index path + PAIMON_ASSIGN_OR_RAISE(std::optional index_external_path, + core_options.CreateGlobalIndexExternalPath()); + if (index_external_path != std::nullopt) { + external_paths_set.insert(index_external_path.value()); + } + } + return std::vector(external_paths_set.begin(), external_paths_set.end()); +} + +Result> FileSystemCatalog::GetTableBranches( + const std::string& table_path) const { + std::vector branches; + std::string branch_dir = PathUtil::JoinPath(table_path, "branch"); + PAIMON_ASSIGN_OR_RAISE(bool branch_dir_exists, fs_->Exists(branch_dir)); + if (!branch_dir_exists) { + return branches; + } + + std::vector> file_status_list; + PAIMON_RETURN_NOT_OK(fs_->ListDir(branch_dir, &file_status_list)); + + for (const auto& file_status : file_status_list) { + if (file_status->IsDir()) { + std::string dir_name = PathUtil::GetName(file_status->GetPath()); + // Branch directory name format: branch-{branch_name} + const std::string branch_prefix = BranchManager::BRANCH_PREFIX; + if (StringUtils::StartsWith(dir_name, branch_prefix, /*start_pos=*/0)) { + std::string branch_name = dir_name.substr(branch_prefix.length()); + branches.push_back(branch_name); + } + } + } + return branches; +} + +Status FileSystemCatalog::DropTableImpl(const Identifier& identifier, + const std::vector& 
external_paths) { + std::string table_path = GetTableLocation(identifier); + + // Delete external paths first + for (const auto& external_path : external_paths) { + PAIMON_ASSIGN_OR_RAISE(bool exists, fs_->Exists(external_path)); + if (exists) { + PAIMON_RETURN_NOT_OK(fs_->Delete(external_path)); + } + } + + // Delete the table directory + PAIMON_RETURN_NOT_OK(fs_->Delete(table_path)); + return Status::OK(); +} + +Status FileSystemCatalog::DropTable(const Identifier& identifier, bool ignore_if_not_exists) { + PAIMON_ASSIGN_OR_RAISE(bool is_system_table, IsSystemTable(identifier)); + if (is_system_table) { + return Status::Invalid(fmt::format("Cannot drop system table {}.", identifier.ToString())); + } + + std::string table_path = GetTableLocation(identifier); + PAIMON_ASSIGN_OR_RAISE(bool exist, fs_->Exists(table_path)); + if (!exist) { + if (ignore_if_not_exists) { + return Status::OK(); + } else { + return Status::NotExist(fmt::format("table {} does not exist", identifier.ToString())); + } + } + + // Check if table has valid schema (table exists) + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + TableSchemaExists(identifier)); + if (!latest_schema) { + if (ignore_if_not_exists) { + return Status::OK(); + } else { + return Status::NotExist(fmt::format("table {} does not exist", identifier.ToString())); + } + } + + // Collect external paths from all schemas + std::set external_paths_set; + + // Get external paths from main branch + SchemaManager schema_manager(fs_, table_path); + PAIMON_ASSIGN_OR_RAISE(std::vector schema_ids, schema_manager.ListAllIds()); + std::vector> schemas; + for (int64_t id : schema_ids) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr schema, schema_manager.ReadSchema(id)); + schemas.push_back(schema); + } + PAIMON_ASSIGN_OR_RAISE(std::vector main_external_paths, + GetSchemaExternalPaths(schemas)); + external_paths_set.insert(main_external_paths.begin(), main_external_paths.end()); + + // Get external paths from all branches + 
PAIMON_ASSIGN_OR_RAISE(std::vector branches, GetTableBranches(table_path)); + for (const auto& branch : branches) { + SchemaManager branch_schema_manager(fs_, table_path, branch); + PAIMON_ASSIGN_OR_RAISE(std::vector branch_schema_ids, + branch_schema_manager.ListAllIds()); + std::vector> branch_schemas; + for (int64_t id : branch_schema_ids) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr schema, + branch_schema_manager.ReadSchema(id)); + branch_schemas.push_back(schema); + } + PAIMON_ASSIGN_OR_RAISE(std::vector branch_external_paths, + GetSchemaExternalPaths(branch_schemas)); + external_paths_set.insert(branch_external_paths.begin(), branch_external_paths.end()); + } + + std::vector external_paths(external_paths_set.begin(), external_paths_set.end()); + return DropTableImpl(identifier, external_paths); +} + +Status FileSystemCatalog::RenameTable(const Identifier& from_table, const Identifier& to_table, + bool ignore_if_not_exists) { + PAIMON_ASSIGN_OR_RAISE(bool is_from_system_table, IsSystemTable(from_table)); + PAIMON_ASSIGN_OR_RAISE(bool is_to_system_table, IsSystemTable(to_table)); + if (is_from_system_table || is_to_system_table) { + return Status::Invalid(fmt::format("Cannot rename system table {} or {}.", + from_table.ToString(), to_table.ToString())); + } + + if (from_table.GetDatabaseName() != to_table.GetDatabaseName()) { + return Status::Invalid( + "Cannot rename table across databases. 
Cross-database rename is not supported."); + } + + PAIMON_ASSIGN_OR_RAISE(bool from_exist, TableExists(from_table)); + if (!from_exist) { + if (ignore_if_not_exists) { + return Status::OK(); + } else { + return Status::NotExist( + fmt::format("source table {} does not exist", from_table.ToString())); + } + } + + PAIMON_ASSIGN_OR_RAISE(bool to_exist, TableExists(to_table)); + if (to_exist) { + return Status::Invalid(fmt::format("target table {} already exists", to_table.ToString())); + } + + std::string from_path = GetTableLocation(from_table); + std::string to_path = GetTableLocation(to_table); + PAIMON_RETURN_NOT_OK(fs_->Rename(from_path, to_path)); + return Status::OK(); +} + +namespace { +SnapshotInfo::CommitKind ConvertCommitKind(Snapshot::CommitKind internal) { + if (internal == Snapshot::CommitKind::Append()) { + return SnapshotInfo::CommitKind::APPEND; + } + if (internal == Snapshot::CommitKind::Compact()) { + return SnapshotInfo::CommitKind::COMPACT; + } + if (internal == Snapshot::CommitKind::Overwrite()) { + return SnapshotInfo::CommitKind::OVERWRITE; + } + if (internal == Snapshot::CommitKind::Analyze()) { + return SnapshotInfo::CommitKind::ANALYZE; + } + return SnapshotInfo::CommitKind::UNKNOWN; +} +} // namespace + +Result> FileSystemCatalog::ListSnapshots( + const Identifier& identifier, const std::string& branch) const { + PAIMON_ASSIGN_OR_RAISE(bool exists, TableExists(identifier)); + if (!exists) { + return Status::NotExist(fmt::format("table {} does not exist", identifier.ToString())); + } + + auto table_path = GetTableLocation(identifier); + SnapshotManager mgr(fs_, table_path, branch); + PAIMON_ASSIGN_OR_RAISE(std::vector snapshots, mgr.GetAllSnapshots()); + std::sort(snapshots.begin(), snapshots.end(), + [](const Snapshot& a, const Snapshot& b) { return a.Id() < b.Id(); }); + + std::vector result; + result.reserve(snapshots.size()); + + for (const auto& snap : snapshots) { + SnapshotInfo info; + info.snapshot_id = snap.Id(); + info.schema_id = 
snap.SchemaId(); + info.commit_user = snap.CommitUser(); + info.commit_kind = ConvertCommitKind(snap.GetCommitKind()); + info.time_millis = snap.TimeMillis(); + info.total_record_count = snap.TotalRecordCount(); + info.delta_record_count = snap.DeltaRecordCount(); + info.watermark = snap.Watermark(); + result.push_back(std::move(info)); + } + + return result; +} + +} // namespace paimon diff --git a/src/paimon/catalog/file_system_catalog.h b/src/paimon/catalog/file_system_catalog.h new file mode 100644 index 0000000..c4f8306 --- /dev/null +++ b/src/paimon/catalog/file_system_catalog.h @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/catalog/catalog.h" +#include "paimon/catalog/table.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/logging.h" +#include "paimon/result.h" +#include "paimon/status.h" + +struct ArrowSchema; + +namespace paimon { +class TableSchema; +class FileSystem; +class Identifier; +class Logger; + +class FileSystemCatalog : public Catalog { + public: + FileSystemCatalog(const std::shared_ptr& fs, const std::string& warehouse); + + Status CreateDatabase(const std::string& db_name, + const std::map& options, + bool ignore_if_exists) override; + Status CreateTable(const Identifier& identifier, ArrowSchema* c_schema, + const std::vector& partition_keys, + const std::vector& primary_keys, + const std::map& options, + bool ignore_if_exists) override; + Status DropDatabase(const std::string& name, bool ignore_if_not_exists, bool cascade) override; + Status DropTable(const Identifier& identifier, bool ignore_if_not_exists) override; + Status RenameTable(const Identifier& from_table, const Identifier& to_table, + bool ignore_if_not_exists) override; + Result> ListDatabases() const override; + Result> ListTables(const std::string& db_name) const override; + Result DatabaseExists(const std::string& db_name) const override; + Result TableExists(const Identifier& identifier) const override; + std::string GetDatabaseLocation(const std::string& db_name) const override; + std::string GetTableLocation(const Identifier& identifier) const override; + Result> LoadTableSchema(const Identifier& identifier) const override; + std::string GetRootPath() const override; + std::shared_ptr GetFileSystem() const override; + Result> GetTable(const Identifier& identifier) const override; + Result> ListSnapshots(const Identifier& identifier, + const std::string& branch) const override; + + private: + static std::string NewDatabasePath(const std::string& warehouse, const std::string& db_name); + static 
std::string NewDataTablePath(const std::string& warehouse, const Identifier& identifier); + static bool IsSystemDatabase(const std::string& db_name); + static Result IsSpecifiedSystemTable(const Identifier& identifier); + static Result IsSystemTable(const Identifier& identifier); + Result>> TableSchemaExists( + const Identifier& identifier) const; + + Status CreateDatabaseImpl(const std::string& db_name, + const std::map& options); + Result TableExistsInFileSystem(const std::string& table_path) const; + + // Get all external paths from a list of schemas + Result> GetSchemaExternalPaths( + const std::vector>& schemas) const; + + // Get all branch names for a table + Result> GetTableBranches(const std::string& table_path) const; + + // Drop table implementation with external paths cleanup + Status DropTableImpl(const Identifier& identifier, + const std::vector& external_paths); + + std::shared_ptr fs_; + std::string warehouse_; + + std::shared_ptr logger_; +}; + +} // namespace paimon diff --git a/src/paimon/catalog/file_system_catalog_test.cpp b/src/paimon/catalog/file_system_catalog_test.cpp new file mode 100644 index 0000000..0ab2ac2 --- /dev/null +++ b/src/paimon/catalog/file_system_catalog_test.cpp @@ -0,0 +1,1072 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/catalog/file_system_catalog.h" + +#include "arrow/api.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "gtest/gtest.h" +#include "paimon/catalog/identifier.h" +#include "paimon/common/data/blob_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/core/core_options.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/core/table/system/system_table_schema.h" +#include "paimon/defs.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/file_system_factory.h" +#include "paimon/snapshot/snapshot_info.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(FileSystemCatalogTest, TestDatabaseExists) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + + ASSERT_OK_AND_ASSIGN(auto exist, catalog.DatabaseExists("db1")); + ASSERT_FALSE(exist); + + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/false)); + ASSERT_NOK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true)); + + ASSERT_OK_AND_ASSIGN(exist, catalog.DatabaseExists("db1")); + ASSERT_TRUE(exist); + ASSERT_OK_AND_ASSIGN(std::vector db_names, catalog.ListDatabases()); + ASSERT_EQ(1, db_names.size()); + ASSERT_EQ(db_names[0], "db1"); + ASSERT_EQ(catalog.GetDatabaseLocation("db1"), PathUtil::JoinPath(dir->Str(), "db1.db")); +} + +TEST(FileSystemCatalogTest, TestInvalidCreateDatabase) { + { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + 
options[Catalog::DB_LOCATION_PROP] = "loc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + + ASSERT_NOK_WITH_MSG( + catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true), + "Cannot specify location for a database when using fileSystem catalog."); + } +} + +TEST(FileSystemCatalogTest, TestCreateSystemDatabaseAndTable) { + // do not support create system database + { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_NOK_WITH_MSG(catalog.CreateDatabase(Catalog::SYSTEM_DATABASE_NAME, options, + /*ignore_if_exists=*/true), + "Cannot create database for system database"); + } + // do not support create system table + { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true)); + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), + arrow::field("f3", arrow::int16()), + }; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + ASSERT_NOK_WITH_MSG( + catalog.CreateTable(Identifier("db1", "ta$ble"), &schema, {"f1"}, {}, options, false), + "Cannot create table for system table"); + ArrowSchemaRelease(&schema); + } +} + 
+TEST(FileSystemCatalogTest, TestCreateTable) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true)); + + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), arrow::field("f5", arrow::int32()), + arrow::field("f6", arrow::int32()), arrow::field("f7", arrow::int64()), + arrow::field("f8", arrow::int64()), arrow::field("f9", arrow::float32()), + arrow::field("f10", arrow::float64()), arrow::field("f11", arrow::utf8()), + arrow::field("f12", arrow::binary()), arrow::field("non-partition-field", arrow::int32())}; + + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + Identifier identifier("db1", "tbl1"); + ASSERT_OK_AND_ASSIGN(auto exist, catalog.TableExists(identifier)); + ASSERT_FALSE(exist); + + ASSERT_OK(catalog.CreateTable(identifier, &schema, + /*partition_keys=*/{"f1", "f2"}, /*primary_keys=*/{"f3"}, options, + false)); + + ASSERT_OK_AND_ASSIGN(exist, catalog.TableExists(identifier)); + ASSERT_TRUE(exist); + ArrowSchemaRelease(&schema); +} + +TEST(FileSystemCatalogTest, TestOptionsSystemTableCatalog) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + options["custom.option"] = "custom-value"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("db1", 
options, /*ignore_if_exists=*/true)); + + auto typed_schema = arrow::schema({arrow::field("f0", arrow::int32())}); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &schema).ok()); + ASSERT_OK(catalog.CreateTable(Identifier("db1", "tbl1"), &schema, + /*partition_keys=*/{}, /*primary_keys=*/{}, options, + /*ignore_if_exists=*/false)); + ArrowSchemaRelease(&schema); + + Identifier options_identifier("db1", "tbl1$options"); + ASSERT_OK_AND_ASSIGN(bool exists, catalog.TableExists(options_identifier)); + ASSERT_TRUE(exists); + ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(Identifier("db1", "tbl1$unknown"))); + ASSERT_FALSE(exists); + ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(Identifier("db1", "missing$options"))); + ASSERT_FALSE(exists); + ASSERT_EQ(catalog.GetTableLocation(options_identifier), + PathUtil::JoinPath(PathUtil::JoinPath(dir->Str(), "db1.db"), "tbl1$options")); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr system_schema, + catalog.LoadTableSchema(options_identifier)); + ASSERT_TRUE(std::dynamic_pointer_cast(system_schema) != nullptr); + ASSERT_TRUE(std::dynamic_pointer_cast(system_schema) != nullptr); + ASSERT_OK_AND_ASSIGN(auto c_schema, system_schema->GetArrowSchema()); + auto loaded_schema_result = arrow::ImportSchema(c_schema.get()); + ASSERT_TRUE(loaded_schema_result.ok()) << loaded_schema_result.status().ToString(); + auto loaded_schema = loaded_schema_result.ValueUnsafe(); + ASSERT_EQ(loaded_schema->field_names(), (std::vector{"key", "value"})); + ASSERT_EQ(loaded_schema->field(0)->type()->id(), arrow::Type::STRING); + ASSERT_EQ(loaded_schema->field(1)->type()->id(), arrow::Type::STRING); + ASSERT_FALSE(loaded_schema->field(0)->nullable()); + ASSERT_FALSE(loaded_schema->field(1)->nullable()); + + ASSERT_OK_AND_ASSIGN(auto system_table, catalog.GetTable(options_identifier)); + ASSERT_EQ(system_table->Name(), "tbl1$options"); + ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl1$unknown")), "not exist"); + 
ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "missing$options")), "not exist"); + + ::ArrowSchema system_create_schema; + ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &system_create_schema).ok()); + ASSERT_NOK_WITH_MSG( + catalog.CreateTable(options_identifier, &system_create_schema, {}, {}, options, false), + "Cannot create table for system table"); + ArrowSchemaRelease(&system_create_schema); + ASSERT_NOK_WITH_MSG(catalog.DropTable(options_identifier, false), "Cannot drop system table"); + ASSERT_NOK_WITH_MSG(catalog.RenameTable(options_identifier, Identifier("db1", "tbl2"), false), + "Cannot rename system table"); +} + +TEST(FileSystemCatalogTest, TestAuditLogAndBinlogSystemTableCatalog) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true)); + + auto typed_schema = + arrow::schema({arrow::field("pk", arrow::utf8()), arrow::field("v", arrow::int32(), true)}); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &schema).ok()); + ASSERT_OK(catalog.CreateTable(Identifier("db1", "tbl1"), &schema, + /*partition_keys=*/{}, /*primary_keys=*/{"pk"}, options, + /*ignore_if_exists=*/false)); + ArrowSchemaRelease(&schema); + + Identifier audit_log_identifier("db1", "tbl1$audit_log"); + Identifier binlog_identifier("db1", "tbl1$binlog"); + ASSERT_OK_AND_ASSIGN(bool exists, catalog.TableExists(audit_log_identifier)); + ASSERT_TRUE(exists); + ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(binlog_identifier)); + ASSERT_TRUE(exists); + ASSERT_OK_AND_ASSIGN(exists, catalog.TableExists(Identifier("db1", "tbl1$unknown"))); + ASSERT_FALSE(exists); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr 
audit_log_system_schema, + catalog.LoadTableSchema(audit_log_identifier)); + ASSERT_TRUE(std::dynamic_pointer_cast(audit_log_system_schema) != nullptr); + ASSERT_OK_AND_ASSIGN(auto audit_log_c_schema, audit_log_system_schema->GetArrowSchema()); + auto audit_log_schema_result = arrow::ImportSchema(audit_log_c_schema.get()); + ASSERT_TRUE(audit_log_schema_result.ok()) << audit_log_schema_result.status().ToString(); + auto audit_log_schema = audit_log_schema_result.ValueUnsafe(); + ASSERT_EQ(audit_log_schema->field_names(), (std::vector{"rowkind", "pk", "v"})); + ASSERT_EQ(audit_log_schema->field(0)->type()->id(), arrow::Type::STRING); + ASSERT_EQ(audit_log_schema->field(1)->type()->id(), arrow::Type::STRING); + ASSERT_EQ(audit_log_schema->field(2)->type()->id(), arrow::Type::INT32); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr binlog_system_schema, + catalog.LoadTableSchema(binlog_identifier)); + ASSERT_TRUE(std::dynamic_pointer_cast(binlog_system_schema) != nullptr); + ASSERT_OK_AND_ASSIGN(auto binlog_c_schema, binlog_system_schema->GetArrowSchema()); + auto binlog_schema_result = arrow::ImportSchema(binlog_c_schema.get()); + ASSERT_TRUE(binlog_schema_result.ok()) << binlog_schema_result.status().ToString(); + auto binlog_schema = binlog_schema_result.ValueUnsafe(); + ASSERT_EQ(binlog_schema->field_names(), (std::vector{"rowkind", "pk", "v"})); + ASSERT_EQ(binlog_schema->field(0)->type()->id(), arrow::Type::STRING); + ASSERT_EQ(binlog_schema->field(1)->type()->id(), arrow::Type::LIST); + ASSERT_EQ(binlog_schema->field(2)->type()->id(), arrow::Type::LIST); + auto binlog_pk_type = + std::dynamic_pointer_cast(binlog_schema->field(1)->type()); + auto binlog_v_type = + std::dynamic_pointer_cast(binlog_schema->field(2)->type()); + ASSERT_TRUE(binlog_pk_type); + ASSERT_TRUE(binlog_v_type); + ASSERT_EQ(binlog_pk_type->value_type()->id(), arrow::Type::STRING); + ASSERT_EQ(binlog_v_type->value_type()->id(), arrow::Type::INT32); + + ::ArrowSchema system_create_schema; + 
ASSERT_TRUE(arrow::ExportSchema(*typed_schema, &system_create_schema).ok()); + ASSERT_NOK_WITH_MSG( + catalog.CreateTable(audit_log_identifier, &system_create_schema, {}, {}, options, false), + "Cannot create table for system table"); + ArrowSchemaRelease(&system_create_schema); + ASSERT_NOK_WITH_MSG(catalog.DropTable(binlog_identifier, false), "Cannot drop system table"); + ASSERT_NOK_WITH_MSG(catalog.RenameTable(audit_log_identifier, Identifier("db1", "tbl2"), false), + "Cannot rename system table"); +} + +TEST(FileSystemCatalogTest, TestCreateTableWithBlob) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + options[Options::DATA_EVOLUTION_ENABLED] = "true"; + options[Options::ROW_TRACKING_ENABLED] = "true"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true)); + + arrow::FieldVector fields = {arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), + arrow::field("f3", arrow::int16()), + arrow::field("f4", arrow::int16()), + arrow::field("f5", arrow::int32()), + arrow::field("f6", arrow::int32()), + arrow::field("f7", arrow::int64()), + arrow::field("f8", arrow::int64()), + arrow::field("f9", arrow::float32()), + arrow::field("f10", arrow::float64()), + arrow::field("f11", arrow::utf8()), + arrow::field("f12", arrow::binary()), + arrow::field("non-partition-field", arrow::int32()), + BlobUtils::ToArrowField("f13", /*nullable=*/false)}; + + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + ASSERT_OK(catalog.CreateTable(Identifier("db1", "tbl1"), &schema, + /*partition_keys=*/{"f1", "f2"}, /*primary_keys=*/{}, options, + false)); + 
ASSERT_OK_AND_ASSIGN(std::vector table_names, catalog.ListTables("db1")); + ASSERT_EQ(1, table_names.size()); + ASSERT_EQ(table_names[0], "tbl1"); + ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, + catalog.LoadTableSchema(Identifier("db1", "tbl1"))); + ASSERT_TRUE(std::dynamic_pointer_cast(table_schema) != nullptr); + ASSERT_TRUE(std::dynamic_pointer_cast(table_schema) != nullptr); + ASSERT_OK_AND_ASSIGN(auto arrow_schema, table_schema->GetArrowSchema()); + auto loaded_schema = arrow::ImportSchema(arrow_schema.get()).ValueOrDie(); + ASSERT_TRUE(typed_schema.Equals(loaded_schema)); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr
table, catalog.GetTable(Identifier("db1", "tbl1"))); + ASSERT_OK_AND_ASSIGN(auto arrow_schema_from_get_table, table->LatestSchema()->GetArrowSchema()); + auto schema_from_get_table = + arrow::ImportSchema(arrow_schema_from_get_table.get()).ValueOrDie(); + ASSERT_TRUE(typed_schema.Equals(schema_from_get_table)); + ASSERT_EQ(table->FullName(), "db1.tbl1"); + + ASSERT_NOK_WITH_MSG(catalog.GetTable(Identifier("db1", "table_xaxa")), + "Identifier{database='db1', table='table_xaxa'} not exist"); + + ArrowSchemaRelease(&schema); +} + +TEST(FileSystemCatalogTest, TestInvalidCreateTable) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true)); + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::struct_({arrow::field("s1", arrow::int8()), + arrow::field("s1", arrow::int16())}))}; + + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + ASSERT_NOK_WITH_MSG( + catalog.CreateTable(Identifier("db1", "tbl1"), &schema, {"f0"}, {"f1"}, options, false), + "validate schema failed: read schema has duplicate field s1"); + ASSERT_OK_AND_ASSIGN(std::vector table_names, catalog.ListTables("db1")); + ASSERT_EQ(0, table_names.size()); + ArrowSchemaRelease(&schema); +} + +TEST(FileSystemCatalogTest, TestCreateTableWhileDbNotExist) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog 
catalog(core_options.GetFileSystem(), dir->Str()); + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), + arrow::field("f3", arrow::int16()), + }; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + ASSERT_NOK_WITH_MSG( + catalog.CreateTable(Identifier("db1", "table"), &schema, {"f1"}, {}, options, false), + "database db1 is not exist"); + ArrowSchemaRelease(&schema); +} + +TEST(FileSystemCatalogTest, TestCreateTableWhileTableExist) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + { + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true)); + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), + arrow::field("f3", arrow::int16()), + }; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + ASSERT_OK(catalog.CreateTable(Identifier("db1", "tbl1"), &schema, {"f1"}, {}, options, + /*ignore_if_exists=*/false)); + ASSERT_OK(catalog.CreateTable(Identifier("db1", "tbl1"), &schema, {"f1"}, {}, options, + /*ignore_if_exists=*/true)); + ASSERT_OK_AND_ASSIGN(std::vector table_names, catalog.ListTables("db1")); + ASSERT_EQ(1, table_names.size()); + ASSERT_EQ(table_names[0], "tbl1"); + // Release the exported schema, as the sibling block and other tests do. + ArrowSchemaRelease(&schema); + } + { + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true)); + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), + arrow::field("f3", arrow::int16()), + }; + 
arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + ASSERT_OK(catalog.CreateTable(Identifier("db1", "tbl1"), &schema, {"f1"}, {}, options, + /*ignore_if_exists=*/true)); + ASSERT_NOK_WITH_MSG( + catalog.CreateTable(Identifier("db1", "tbl1"), &schema, {"f1"}, {}, options, + /*ignore_if_exists=*/false), + "already exist"); + ArrowSchemaRelease(&schema); + } + { + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true)); + arrow::FieldVector fields = { + arrow::field("f0", arrow::boolean()), + arrow::field("f1", arrow::int8()), + arrow::field("f2", arrow::int8()), + arrow::field("f3", arrow::int16()), + }; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + Identifier identifier("db1", "tbl1"); + ASSERT_OK(catalog.CreateTable(identifier, &schema, {"f1"}, {}, options, + /*ignore_if_exists=*/true)); + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", dir->Str(), {})); + ASSERT_OK(fs->Delete( + PathUtil::JoinPath(catalog.GetTableLocation(identifier), "schema/schema-0"))); + ASSERT_OK(catalog.CreateTable(identifier, &schema, {"f1"}, {}, options, + /*ignore_if_exists=*/false)); + // Release the exported schema, as the sibling block and other tests do. + ArrowSchemaRelease(&schema); + } +} + +TEST(FileSystemCatalogTest, TestInvalidList) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_NOK_WITH_MSG(catalog.ListTables("sys"), + "do not support listing tables for system database."); +} + +TEST(FileSystemCatalogTest, TestValidateTableSchema) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = 
UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("db1", options, /*ignore_if_exists=*/true)); + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), + arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), + arrow::field("f3", arrow::float64()), + }; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + Identifier identifier("db1", "tbl1"); + ASSERT_OK(catalog.CreateTable(identifier, &schema, {"f1"}, {}, options, + /*ignore_if_exists=*/false)); + + ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db0", "tbl0")), + "Identifier{database=\'db0\', table=\'tbl0\'} not exist"); + ASSERT_OK_AND_ASSIGN(std::shared_ptr table_schema, catalog.LoadTableSchema(identifier)); + auto data_schema = std::dynamic_pointer_cast(table_schema); + ASSERT_TRUE(data_schema != nullptr); + ASSERT_EQ(0, data_schema->Id()); + ASSERT_EQ(3, data_schema->HighestFieldId()); + ASSERT_EQ(1, data_schema->PartitionKeys().size()); + ASSERT_EQ(0, data_schema->PrimaryKeys().size()); + ASSERT_EQ(-1, data_schema->NumBuckets()); + ASSERT_FALSE(table_schema->Comment().has_value()); + std::vector field_names = table_schema->FieldNames(); + std::vector expected_field_names = {"f0", "f1", "f2", "f3"}; + ASSERT_EQ(field_names, expected_field_names); + + FieldType type; + ASSERT_OK_AND_ASSIGN(type, table_schema->GetFieldType("f0")); + ASSERT_EQ(type, FieldType::STRING); + ASSERT_OK_AND_ASSIGN(type, table_schema->GetFieldType("f1")); + ASSERT_EQ(type, FieldType::INT); + ASSERT_OK_AND_ASSIGN(type, table_schema->GetFieldType("f2")); + ASSERT_EQ(type, FieldType::INT); + ASSERT_OK_AND_ASSIGN(type, table_schema->GetFieldType("f3")); + ASSERT_EQ(type, FieldType::DOUBLE); + ASSERT_NOK(table_schema->GetFieldType("f4")); + + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", dir->Str(), {})); + 
std::string schema_path = + PathUtil::JoinPath(catalog.GetTableLocation(identifier), "schema/schema-0"); + std::string expected_json_schema; + ASSERT_OK(fs->ReadFile(schema_path, &expected_json_schema)); + + ASSERT_OK_AND_ASSIGN(auto json_schema, table_schema->GetJsonSchema()); + ASSERT_EQ(expected_json_schema, json_schema); + + ASSERT_OK_AND_ASSIGN(auto arrow_schema, table_schema->GetArrowSchema()); + auto loaded_schema = arrow::ImportSchema(arrow_schema.get()).ValueOrDie(); + ASSERT_TRUE(typed_schema.Equals(loaded_schema)); + + ASSERT_OK(fs->Delete(schema_path)); + ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(identifier), + "Identifier{database=\'db1\', table=\'tbl1\'} not exist"); + + ASSERT_NOK_WITH_MSG(catalog.LoadTableSchema(Identifier("db1", "tbl$11")), "not exist"); + ArrowSchemaRelease(&schema); +} + +TEST(FileSystemCatalogTest, TestDropDatabase) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + + // Test 1: Drop non-existent database with ignore_if_not_exists=true + ASSERT_OK(catalog.DropDatabase("non_existent_db", /*ignore_if_not_exists=*/true, + /*cascade=*/false)); + + // Test 2: Drop non-existent database with ignore_if_not_exists=false + ASSERT_NOK_WITH_MSG(catalog.DropDatabase("non_existent_db", /*ignore_if_not_exists=*/false, + /*cascade=*/false), + "database non_existent_db does not exist"); + + // Test 3: Drop empty database + ASSERT_OK(catalog.CreateDatabase("test_db", options, /*ignore_if_exists=*/false)); + ASSERT_OK(catalog.DropDatabase("test_db", /*ignore_if_not_exists=*/false, + /*cascade=*/false)); + ASSERT_OK_AND_ASSIGN(bool exist, catalog.DatabaseExists("test_db")); + ASSERT_FALSE(exist); + + // Test 4: Drop non-empty database without cascade + 
ASSERT_OK(catalog.CreateDatabase("test_db2", options, /*ignore_if_exists=*/false)); + arrow::FieldVector fields = { + arrow::field("f0", arrow::int32()), + arrow::field("f1", arrow::utf8()), + }; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + ASSERT_OK(catalog.CreateTable(Identifier("test_db2", "tbl1"), &schema, {}, {}, options, false)); + ASSERT_NOK_WITH_MSG(catalog.DropDatabase("test_db2", /*ignore_if_not_exists=*/false, + /*cascade=*/false), + "Cannot drop non-empty database test_db2. Use cascade=true to force."); + + // Test 5: Drop non-empty database with cascade + ASSERT_OK(catalog.DropDatabase("test_db2", /*ignore_if_not_exists=*/false, + /*cascade=*/true)); + ASSERT_OK_AND_ASSIGN(exist, catalog.DatabaseExists("test_db2")); + ASSERT_FALSE(exist); + ASSERT_OK_AND_ASSIGN(std::vector tables, catalog.ListTables("test_db2")); + ASSERT_TRUE(tables.empty()); + + // Test 6: Drop system database + ASSERT_NOK_WITH_MSG(catalog.DropDatabase(Catalog::SYSTEM_DATABASE_NAME, + /*ignore_if_not_exists=*/false, + /*cascade=*/false), + "Cannot drop system database sys."); + + ArrowSchemaRelease(&schema); +} + +TEST(FileSystemCatalogTest, TestDropTable) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("test_db", options, /*ignore_if_exists=*/false)); + + // Test 1: Drop non-existent table with ignore_if_not_exists=true + ASSERT_OK(catalog.DropTable(Identifier("test_db", "non_existent_tbl"), + /*ignore_if_not_exists=*/true)); + + // Test 2: Drop non-existent table with ignore_if_not_exists=false + ASSERT_NOK_WITH_MSG( + catalog.DropTable(Identifier("test_db", "non_existent_tbl"), + 
/*ignore_if_not_exists=*/false), + "table Identifier{database='test_db', table='non_existent_tbl'} does not exist"); + + // Test 3: Drop existing table + arrow::FieldVector fields = { + arrow::field("f0", arrow::int32()), + arrow::field("f1", arrow::utf8()), + }; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + ASSERT_OK(catalog.CreateTable(Identifier("test_db", "tbl1"), &schema, {}, {}, options, false)); + ASSERT_OK(catalog.DropTable(Identifier("test_db", "tbl1"), /*ignore_if_not_exists=*/false)); + ASSERT_OK_AND_ASSIGN(bool exist, catalog.TableExists(Identifier("test_db", "tbl1"))); + ASSERT_FALSE(exist); + + // Test 4: Drop system table + ASSERT_NOK_WITH_MSG( + catalog.DropTable(Identifier("test_db", "tbl$system"), + /*ignore_if_not_exists=*/false), + "Cannot drop system table Identifier{database='test_db', table='tbl$system'}."); + + ArrowSchemaRelease(&schema); +} + +TEST(FileSystemCatalogTest, TestRenameTable) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("test_db", options, /*ignore_if_exists=*/false)); + + // Test 1: Rename non-existent table with ignore_if_not_exists=true + ASSERT_OK(catalog.RenameTable(Identifier("test_db", "non_existent_tbl"), + Identifier("test_db", "new_tbl"), + /*ignore_if_not_exists=*/true)); + + // Test 2: Rename non-existent table with ignore_if_not_exists=false + ASSERT_NOK_WITH_MSG( + catalog.RenameTable(Identifier("test_db", "non_existent_tbl"), + Identifier("test_db", "new_tbl"), + /*ignore_if_not_exists=*/false), + "source table Identifier{database='test_db', table='non_existent_tbl'} does not exist"); + + // Test 3: Normal rename + arrow::FieldVector 
fields = { + arrow::field("f0", arrow::int32()), + arrow::field("f1", arrow::utf8()), + }; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema1; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema1).ok()); + ASSERT_OK( + catalog.CreateTable(Identifier("test_db", "old_tbl"), &schema1, {}, {}, options, false)); + ASSERT_OK(catalog.RenameTable(Identifier("test_db", "old_tbl"), + Identifier("test_db", "new_tbl"), + /*ignore_if_not_exists=*/false)); + ASSERT_OK_AND_ASSIGN(bool old_exist, catalog.TableExists(Identifier("test_db", "old_tbl"))); + ASSERT_FALSE(old_exist); + ASSERT_OK_AND_ASSIGN(bool new_exist, catalog.TableExists(Identifier("test_db", "new_tbl"))); + ASSERT_TRUE(new_exist); + + // Test 4: Rename to existing table + ::ArrowSchema schema2; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema2).ok()); + ASSERT_OK(catalog.CreateTable(Identifier("test_db", "tbl2"), &schema2, {}, {}, options, false)); + ASSERT_NOK_WITH_MSG( + catalog.RenameTable(Identifier("test_db", "new_tbl"), Identifier("test_db", "tbl2"), + /*ignore_if_not_exists=*/false), + "target table Identifier{database='test_db', table='tbl2'} already exists"); + + // Test 5: Cross-database rename + ASSERT_OK(catalog.CreateDatabase("test_db2", options, /*ignore_if_exists=*/false)); + ASSERT_NOK_WITH_MSG( + catalog.RenameTable(Identifier("test_db", "new_tbl"), + Identifier("test_db2", "cross_db_tbl"), + /*ignore_if_not_exists=*/false), + "Cannot rename table across databases. 
Cross-database rename is not supported."); + + // Test 6: Rename system table + ASSERT_NOK_WITH_MSG(catalog.RenameTable(Identifier("test_db", "tbl$system"), + Identifier("test_db", "new_system_tbl"), + /*ignore_if_not_exists=*/false), + "Cannot rename system table"); + + ArrowSchemaRelease(&schema1); + ArrowSchemaRelease(&schema2); +} + +TEST(FileSystemCatalogTest, TestDropTableWithExternalPath) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("test_db", options, /*ignore_if_exists=*/false)); + + // Create external path directory + auto external_dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(external_dir); + std::string external_path = external_dir->Str(); + external_path = "FILE://" + external_path; + + // Create a file in external path to simulate external data + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", dir->Str(), {})); + std::string external_data_file = PathUtil::JoinPath(external_path, "data-file.parquet"); + ASSERT_OK(fs->WriteFile(external_data_file, "test data", /*overwrite=*/true)); + + // Verify external data file exists + ASSERT_OK_AND_ASSIGN(bool external_file_exists, fs->Exists(external_data_file)); + ASSERT_TRUE(external_file_exists); + + // Create table with external path + arrow::FieldVector fields = { + arrow::field("f0", arrow::int32()), + arrow::field("f1", arrow::utf8()), + }; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + + std::map table_options = options; + table_options[Options::DATA_FILE_EXTERNAL_PATHS] = external_path; + table_options[Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY] = "round-robin"; + + 
ASSERT_OK(catalog.CreateTable(Identifier("test_db", "tbl_with_external"), &schema, {}, {}, + table_options, false)); + + // Verify table exists + ASSERT_OK_AND_ASSIGN(bool table_exists, + catalog.TableExists(Identifier("test_db", "tbl_with_external"))); + ASSERT_TRUE(table_exists); + + // Drop the table + ASSERT_OK(catalog.DropTable(Identifier("test_db", "tbl_with_external"), + /*ignore_if_not_exists=*/false)); + + // Verify table is dropped + ASSERT_OK_AND_ASSIGN(table_exists, + catalog.TableExists(Identifier("test_db", "tbl_with_external"))); + ASSERT_FALSE(table_exists); + + // Verify external path is also cleaned up + ASSERT_OK_AND_ASSIGN(external_file_exists, fs->Exists(external_path)); + ASSERT_FALSE(external_file_exists); + + ArrowSchemaRelease(&schema); +} + +TEST(FileSystemCatalogTest, TestDropTableWithMultipleExternalPaths) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("test_db", options, /*ignore_if_exists=*/false)); + + // Create multiple external path directories + auto external_dir1 = UniqueTestDirectory::Create(); + auto external_dir2 = UniqueTestDirectory::Create(); + ASSERT_TRUE(external_dir1); + ASSERT_TRUE(external_dir2); + + std::string external_path1 = external_dir1->Str(); + external_path1 = "FILE://" + external_path1; + std::string external_path2 = external_dir2->Str(); + external_path2 = "FILE://" + external_path2; + + // Create files in external paths + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", dir->Str(), {})); + std::string external_data_file1 = PathUtil::JoinPath(external_path1, "data-1.parquet"); + std::string external_data_file2 = PathUtil::JoinPath(external_path2, "data-2.parquet"); + 
ASSERT_OK(fs->WriteFile(external_data_file1, "test data 1", /*overwrite=*/true)); + ASSERT_OK(fs->WriteFile(external_data_file2, "test data 2", /*overwrite=*/true)); + + // Create table with multiple external paths + arrow::FieldVector fields = { + arrow::field("f0", arrow::int32()), + arrow::field("f1", arrow::utf8()), + }; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + + std::map table_options = options; + table_options[Options::DATA_FILE_EXTERNAL_PATHS] = external_path1 + "," + external_path2; + table_options[Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY] = "round-robin"; + + ASSERT_OK(catalog.CreateTable(Identifier("test_db", "tbl_multi_external"), &schema, {}, {}, + table_options, false)); + + // Drop the table + ASSERT_OK(catalog.DropTable(Identifier("test_db", "tbl_multi_external"), + /*ignore_if_not_exists=*/false)); + + // Verify both external paths are cleaned up + ASSERT_OK_AND_ASSIGN(bool exists1, fs->Exists(external_path1)); + ASSERT_OK_AND_ASSIGN(bool exists2, fs->Exists(external_path2)); + ASSERT_FALSE(exists1); + ASSERT_FALSE(exists2); + + ArrowSchemaRelease(&schema); +} + +TEST(FileSystemCatalogTest, TestDropTableWithGlobalIndexExternalPathOnMainBranch) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + ASSERT_OK(catalog.CreateDatabase("test_db", options, /*ignore_if_exists=*/false)); + + // Create external path directory for global index + auto global_index_dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(global_index_dir); + std::string global_index_path = global_index_dir->Str(); + global_index_path = "FILE://" + global_index_path; + + // Create a file in external path to simulate global index data + 
ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", dir->Str(), {})); + std::string index_file = PathUtil::JoinPath(global_index_path, "index-file.bin"); + ASSERT_OK(fs->WriteFile(index_file, "index data", /*overwrite=*/true)); + + // Verify index file exists + ASSERT_OK_AND_ASSIGN(bool index_file_exists, fs->Exists(index_file)); + ASSERT_TRUE(index_file_exists); + + // Create table with global index external path on main branch + arrow::FieldVector fields = { + arrow::field("f0", arrow::int32()), + arrow::field("f1", arrow::utf8()), + }; + arrow::Schema typed_schema(fields); + ::ArrowSchema schema; + ASSERT_TRUE(arrow::ExportSchema(typed_schema, &schema).ok()); + + std::map table_options = options; + table_options[Options::GLOBAL_INDEX_EXTERNAL_PATH] = global_index_path; + + ASSERT_OK(catalog.CreateTable(Identifier("test_db", "tbl_global_index"), &schema, {}, {}, + table_options, false)); + + // Verify table exists + ASSERT_OK_AND_ASSIGN(bool table_exists, + catalog.TableExists(Identifier("test_db", "tbl_global_index"))); + ASSERT_TRUE(table_exists); + + // Drop the table + ASSERT_OK(catalog.DropTable(Identifier("test_db", "tbl_global_index"), + /*ignore_if_not_exists=*/false)); + + // Verify table is dropped + ASSERT_OK_AND_ASSIGN(table_exists, + catalog.TableExists(Identifier("test_db", "tbl_global_index"))); + ASSERT_FALSE(table_exists); + + // Verify global index external path is also cleaned up + ASSERT_OK_AND_ASSIGN(bool global_index_exists, fs->Exists(global_index_path)); + ASSERT_FALSE(global_index_exists); + + ArrowSchemaRelease(&schema); +} + +TEST(FileSystemCatalogTest, TestDropTableWithGlobalIndexExternalPathOnBranch) { + // Copy the real append_table_with_rt_branch.db test data, then patch the + // branch-rt schema-1 to include a GLOBAL_INDEX_EXTERNAL_PATH option. + // DropTable should discover and clean up that external path. 
+ std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + // Copy the real test_data db into a temp directory + std::string test_data_path = GetDataDir() + "/orc/append_table_with_rt_branch.db"; + std::string db_path = dir->Str() + "/test_db.db"; + ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, db_path)); + + // Create a temporary external directory that represents branch global index data + auto branch_external_dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(branch_external_dir); + std::string branch_external_path = branch_external_dir->Str(); + branch_external_path = "FILE://" + branch_external_path; + + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", dir->Str(), {})); + ASSERT_OK(fs->WriteFile(PathUtil::JoinPath(branch_external_path, "index.bin"), + "branch global index data", /*overwrite=*/true)); + + // Patch branch-rt/schema/schema-1: add GLOBAL_INDEX_EXTERNAL_PATH to options + std::string branch_schema_path = + PathUtil::JoinPath(db_path, "append_table_with_rt_branch/branch/branch-rt/schema/schema-1"); + std::string schema_content; + ASSERT_OK(fs->ReadFile(branch_schema_path, &schema_content)); + + // Insert the global index external path option into the JSON options block. 
+ // Original: "file.format" : "orc" + // Patched: "file.format" : "orc", + // "global-index.external-path" : "" + std::string search_str = R"("file.format" : "orc")"; + std::string replace_str = R"("file.format" : "orc", + "global-index.external-path" : ")" + + branch_external_path + R"(")"; + auto pos = schema_content.rfind(search_str); + ASSERT_NE(pos, std::string::npos); + schema_content.replace(pos, search_str.length(), replace_str); + ASSERT_OK(fs->WriteFile(branch_schema_path, schema_content, /*overwrite=*/true)); + + // Verify external path exists before drop + ASSERT_OK_AND_ASSIGN(bool external_exists, fs->Exists(branch_external_path)); + ASSERT_TRUE(external_exists); + + // Drop the table via catalog + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + Identifier identifier("test_db", "append_table_with_rt_branch"); + ASSERT_OK_AND_ASSIGN(bool table_exists, catalog.TableExists(identifier)); + ASSERT_TRUE(table_exists); + + ASSERT_OK(catalog.DropTable(identifier, /*ignore_if_not_exists=*/false)); + + // Verify table is dropped + ASSERT_OK_AND_ASSIGN(table_exists, catalog.TableExists(identifier)); + ASSERT_FALSE(table_exists); + + // Verify the branch global index external path is cleaned up by DropTable + ASSERT_OK_AND_ASSIGN(external_exists, fs->Exists(branch_external_path)); + ASSERT_FALSE(external_exists); +} + +TEST(FileSystemCatalogTest, TestListSnapshots) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + std::string test_data_path = GetDataDir() + "/append_table_with_multiple_file_format.db"; + std::string db_path = dir->Str() + "/test_db.db"; + ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, db_path)); + + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + + Identifier id("test_db", 
"append_table_with_multiple_file_format"); + ASSERT_OK_AND_ASSIGN(std::vector snapshots, catalog.ListSnapshots(id, "")); + + ASSERT_EQ(snapshots.size(), 2); + + ASSERT_EQ(snapshots[0].snapshot_id, 1); + ASSERT_EQ(snapshots[0].schema_id, 0); + ASSERT_EQ(snapshots[0].commit_kind, SnapshotInfo::CommitKind::APPEND); + ASSERT_EQ(snapshots[0].time_millis, 1755671728191); + ASSERT_EQ(snapshots[0].total_record_count, 5); + ASSERT_EQ(snapshots[0].delta_record_count, 5); + ASSERT_FALSE(snapshots[0].watermark.has_value()); + + ASSERT_EQ(snapshots[1].snapshot_id, 2); + ASSERT_EQ(snapshots[1].schema_id, 1); + ASSERT_EQ(snapshots[1].commit_kind, SnapshotInfo::CommitKind::APPEND); + ASSERT_EQ(snapshots[1].time_millis, 1755671956423); + ASSERT_EQ(snapshots[1].total_record_count, 7); + ASSERT_EQ(snapshots[1].delta_record_count, 2); + ASSERT_FALSE(snapshots[1].watermark.has_value()); + + // Verify ascending order by snapshot_id + ASSERT_LT(snapshots[0].snapshot_id, snapshots[1].snapshot_id); +} + +TEST(FileSystemCatalogTest, TestListSnapshotsTableNotExist) { + std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + + ASSERT_NOK_WITH_MSG( + catalog.ListSnapshots(Identifier("non_existent_db", "non_existent_table"), ""), + "does not exist"); +} + +TEST(FileSystemCatalogTest, TestDropTableWithBranchExternalPaths) { + // Copy the real append_table_with_rt_branch.db test data, then patch the + // branch-rt schema-1 to include a DATA_FILE_EXTERNAL_PATHS option. + // DropTable should discover and clean up that external path. 
+ std::map options; + options[Options::FILE_SYSTEM] = "local"; + options[Options::FILE_FORMAT] = "orc"; + ASSERT_OK_AND_ASSIGN(auto core_options, CoreOptions::FromMap(options)); + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + // Copy the real test_data db into a temp directory + std::string test_data_path = GetDataDir() + "/orc/append_table_with_rt_branch.db"; + std::string db_path = dir->Str() + "/test_db.db"; + ASSERT_TRUE(TestUtil::CopyDirectory(test_data_path, db_path)); + + // Create a temporary external directory that represents branch external data + auto branch_external_dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(branch_external_dir); + std::string branch_external_path = branch_external_dir->Str(); + branch_external_path = "FILE://" + branch_external_path; + + ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFactory::Get("local", dir->Str(), {})); + ASSERT_OK(fs->WriteFile(PathUtil::JoinPath(branch_external_path, "data.orc"), + "branch external data", /*overwrite=*/true)); + + // Patch branch-rt/schema/schema-1: add DATA_FILE_EXTERNAL_PATHS to options + std::string branch_schema_path = + PathUtil::JoinPath(db_path, "append_table_with_rt_branch/branch/branch-rt/schema/schema-1"); + std::string schema_content; + ASSERT_OK(fs->ReadFile(branch_schema_path, &schema_content)); + + // Insert the external path option into the JSON options block. 
+ // Original: "file.format" : "orc" + // Patched: "file.format" : "orc", + // "data-file.external-paths" : "" + std::string search_str = R"("file.format" : "orc")"; + std::string replace_str = R"("file.format" : "orc", + "data-file.external-paths.strategy" : "round-robin", + "data-file.external-paths" : ")" + + branch_external_path + R"(")"; + auto pos = schema_content.rfind(search_str); + ASSERT_NE(pos, std::string::npos); + schema_content.replace(pos, search_str.length(), replace_str); + ASSERT_OK(fs->WriteFile(branch_schema_path, schema_content, /*overwrite=*/true)); + + // Verify external path exists before drop + ASSERT_OK_AND_ASSIGN(bool external_exists, fs->Exists(branch_external_path)); + ASSERT_TRUE(external_exists); + + // Drop the table via catalog + FileSystemCatalog catalog(core_options.GetFileSystem(), dir->Str()); + Identifier identifier("test_db", "append_table_with_rt_branch"); + ASSERT_OK_AND_ASSIGN(bool table_exists, catalog.TableExists(identifier)); + ASSERT_TRUE(table_exists); + + ASSERT_OK(catalog.DropTable(identifier, /*ignore_if_not_exists=*/false)); + + // Verify table is dropped + ASSERT_OK_AND_ASSIGN(table_exists, catalog.TableExists(identifier)); + ASSERT_FALSE(table_exists); + + // Verify the branch external path is cleaned up by DropTable + ASSERT_OK_AND_ASSIGN(external_exists, fs->Exists(branch_external_path)); + ASSERT_FALSE(external_exists); +} + +} // namespace paimon::test diff --git a/src/paimon/catalog/identifier.cpp b/src/paimon/catalog/identifier.cpp new file mode 100644 index 0000000..3b403d3 --- /dev/null +++ b/src/paimon/catalog/identifier.cpp @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/catalog/identifier.h"
+
+#include
+#include
+#include
+
+#include "fmt/format.h"
+#include "paimon/common/utils/string_utils.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+
+// Database name used by the single-argument constructor.
+const char Identifier::kUnknownDatabase[] = "unknown";
+// Separator between the data table, branch and system table parts of a table name.
+const char Identifier::kSystemTableSplitter[] = "$";
+// A name part that starts with this prefix denotes a branch; the branch name is the remainder.
+const char Identifier::kSystemBranchPrefix[] = "branch_";
+// Branch name returned by GetBranchNameOrDefault() when the table name carries no branch part.
+const char Identifier::kDefaultMainBranch[] = "main";
+
+/// Creates an identifier for `table` in the database named kUnknownDatabase.
+Identifier::Identifier(const std::string& table)
+    : Identifier(std::string(kUnknownDatabase), table) {}
+
+/// Creates an identifier from a database name and a table name. Neither value is validated or
+/// parsed here; the table name is split on first use by SplitTableName().
+Identifier::Identifier(const std::string& database, const std::string& table)
+    : database_(database), table_(table) {}
+
+/// Two identifiers are equal when both the database name and the full table name match.
+// NOTE(review): this operator is not const-qualified, so it cannot be invoked on a const
+// left-hand operand. Adding `const` also requires changing the declaration in identifier.h.
+bool Identifier::operator==(const Identifier& other) {
+    return (database_ == other.database_ && table_ == other.table_);
+}
+
+/// Returns the database name exactly as passed to the constructor.
+const std::string& Identifier::GetDatabaseName() const {
+    return database_;
+}
+
+/// Returns the full, unparsed table name (including any `$` separated parts).
+const std::string& Identifier::GetTableName() const {
+    return table_;
+}
+
+/// Returns the data table part of the table name, i.e. the text before the first `$`.
+/// Fails with the status of SplitTableName() when the table name is malformed.
+Result Identifier::GetDataTableName() const {
+    PAIMON_RETURN_NOT_OK(SplitTableName());
+    return data_table_;
+}
+
+/// Returns the branch name (without the `branch_` prefix), or an empty optional when the table
+/// name has no branch part. Fails when the table name is malformed.
+Result> Identifier::GetBranchName() const {
+    PAIMON_RETURN_NOT_OK(SplitTableName());
+    return branch_;
+}
+
+/// Same as GetBranchName(), but substitutes kDefaultMainBranch when no branch part is present.
+Result Identifier::GetBranchNameOrDefault() const {
+    PAIMON_ASSIGN_OR_RAISE(std::optional branch, GetBranchName());
+    return branch.value_or(kDefaultMainBranch);
+}
+
+/// Returns the system table part of the table name, or an empty optional when there is none.
+/// Fails when the table name is malformed.
+Result> Identifier::GetSystemTableName() const {
+    PAIMON_RETURN_NOT_OK(SplitTableName());
+    return system_table_;
+}
+
+/// Returns true when the table name contains a system table part. Fails when the table name is
+/// malformed.
+Result Identifier::IsSystemTable() const {
+    PAIMON_ASSIGN_OR_RAISE(std::optional system_table, GetSystemTableName());
+    return system_table.has_value();
+}
+
+/// Formats the identifier as `Identifier{database='<db>', table='<table>'}`.
+std::string Identifier::ToString() const {
+    return fmt::format("Identifier{{database='{}', table='{}'}}", database_, table_);
+}
+
+/// Splits table_ on kSystemTableSplitter and stores the parts in data_table_, branch_ and
+/// system_table_.
+///
+/// Accepted forms:
+///   - `table`
+///   - `table$system_table`
+///   - `table$branch_<name>`
+///   - `table$branch_<name>$system_table`
+///
+/// Returns Status::Invalid when there are more than three parts, when a three-part name has a
+/// middle part that does not start with kSystemBranchPrefix, or when any recognized part is
+/// empty.
+///
+/// The result is cached: parsed_ is set only after a successful parse, so a failing name is
+/// parsed again on every call. The members written here are assigned from a const method, so
+/// they are presumably declared `mutable` in identifier.h.
+// NOTE(review): no synchronization is visible here; confirm whether an Identifier can be
+// parsed for the first time from several threads at once.
+Status Identifier::SplitTableName() const {
+    if (parsed_) {
+        return Status::OK();
+    }
+    // Parse into locals first so the members stay untouched when the name is rejected.
+    std::string data_table;
+    std::optional branch;
+    std::optional system_table;
+    std::vector splits =
+        StringUtils::Split(table_, kSystemTableSplitter, /*ignore_empty=*/false);
+    if (splits.size() == 1) {
+        data_table = table_;
+    } else if (splits.size() == 2) {
+        // The second part is either a branch (`branch_<name>`) or a system table.
+        data_table = splits[0];
+        if (StringUtils::StartsWith(splits[1], kSystemBranchPrefix, /*start_pos=*/0)) {
+            branch = splits[1].substr(std::strlen(kSystemBranchPrefix));
+        } else {
+            system_table = splits[1];
+        }
+    } else if (splits.size() == 3) {
+        // With three parts the middle one must be a branch.
+        if (!StringUtils::StartsWith(splits[1], kSystemBranchPrefix, /*start_pos=*/0)) {
+            return Status::Invalid(fmt::format(
+                "System table can only contain one '$' separator, but this is: {}", table_));
+        }
+        data_table = splits[0];
+        branch = splits[1].substr(std::strlen(kSystemBranchPrefix));
+        system_table = splits[2];
+    } else {
+        return Status::Invalid(fmt::format("Invalid table name: {}", table_));
+    }
+    // Reject empty components such as `$options`, `tbl$`, or `tbl$branch_`.
+    if (data_table.empty() || (branch && branch->empty()) ||
+        (system_table && system_table->empty())) {
+        return Status::Invalid(fmt::format("Invalid table name: {}", table_));
+    }
+    data_table_ = std::move(data_table);
+    branch_ = std::move(branch);
+    system_table_ = std::move(system_table);
+    parsed_ = true;
+    return Status::OK();
+}
+
+} // namespace paimon
diff --git a/src/paimon/catalog/identifier_test.cpp b/src/paimon/catalog/identifier_test.cpp
new file mode 100644
index 0000000..771c2f3
--- /dev/null
+++ b/src/paimon/catalog/identifier_test.cpp
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/catalog/identifier.h"
+
+#include "gtest/gtest.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// The two-argument constructor stores database and table names unchanged.
+TEST(IdentifierTest, ConstructorAndGetters) {
+    Identifier id("test_db", "test_table");
+    ASSERT_EQ(id.GetDatabaseName(), "test_db");
+    ASSERT_EQ(id.GetTableName(), "test_table");
+}
+
+// The single-argument constructor falls back to Identifier::kUnknownDatabase.
+TEST(IdentifierTest, SingleArgumentConstructorUsesUnknownDatabase) {
+    Identifier id("test_table");
+    ASSERT_EQ(id.GetDatabaseName(), Identifier::kUnknownDatabase);
+    ASSERT_EQ(id.GetTableName(), "test_table");
+}
+
+// Equality compares both database and table name.
+// NOTE(review): all operands here are non-const; comparison through a const Identifier is not
+// covered by this test.
+TEST(IdentifierTest, EqualityOperator) {
+    Identifier id1("db1", "table1");
+    Identifier id2("db1", "table1");
+    Identifier id3("db2", "table2");
+
+    ASSERT_TRUE(id1 == id2);
+    ASSERT_FALSE(id1 == id3);
+}
+
+// Pins the exact text produced by ToString().
+TEST(IdentifierTest, ToString) {
+    Identifier id("my_db", "my_table");
+    ASSERT_EQ(id.ToString(), "Identifier{database='my_db', table='my_table'}");
+}
+
+// An explicitly empty database name is kept as-is and is not replaced by kUnknownDatabase.
+TEST(IdentifierTest, EmptyDatabaseRemainsEmpty) {
+    Identifier id("", "my_table");
+    ASSERT_EQ(id.GetDatabaseName(), "");
+    ASSERT_EQ(id.GetTableName(), "my_table");
+}
+
+// Plain table name: no branch, no system table, default branch is kDefaultMainBranch.
+TEST(IdentifierTest, ParseDataTable) {
+    Identifier id("db", "tbl");
+    ASSERT_EQ(id.GetTableName(), "tbl");
+    ASSERT_OK_AND_ASSIGN(std::string data_table_name, id.GetDataTableName());
+    ASSERT_EQ(data_table_name, "tbl");
+    ASSERT_OK_AND_ASSIGN(std::optional branch_name, id.GetBranchName());
+    ASSERT_FALSE(branch_name);
+    ASSERT_OK_AND_ASSIGN(std::string branch_name_or_default, id.GetBranchNameOrDefault());
+    ASSERT_EQ(branch_name_or_default, Identifier::kDefaultMainBranch);
+    ASSERT_OK_AND_ASSIGN(std::optional system_table_name, id.GetSystemTableName());
+    ASSERT_FALSE(system_table_name);
+    ASSERT_OK_AND_ASSIGN(bool is_system_table, id.IsSystemTable());
+    ASSERT_FALSE(is_system_table);
+}
+
+// `table$system`: the part after `$` is the system table; GetTableName() keeps the full name.
+TEST(IdentifierTest, ParseSystemTable) {
+    Identifier id("db", "tbl$options");
+    ASSERT_EQ(id.GetTableName(), "tbl$options");
+    ASSERT_OK_AND_ASSIGN(std::string data_table_name, id.GetDataTableName());
+    ASSERT_EQ(data_table_name, "tbl");
+    ASSERT_OK_AND_ASSIGN(std::optional branch_name, id.GetBranchName());
+    ASSERT_FALSE(branch_name);
+    ASSERT_OK_AND_ASSIGN(std::optional system_table_name, id.GetSystemTableName());
+    ASSERT_TRUE(system_table_name);
+    ASSERT_EQ(system_table_name.value(), "options");
+    ASSERT_OK_AND_ASSIGN(bool is_system_table, id.IsSystemTable());
+    ASSERT_TRUE(is_system_table);
+}
+
+// `table$branch_<name>`: the `branch_` prefix is stripped from the reported branch name.
+TEST(IdentifierTest, ParseBranchTable) {
+    Identifier id("db", "tbl$branch_dev");
+    ASSERT_OK_AND_ASSIGN(std::string data_table_name, id.GetDataTableName());
+    ASSERT_EQ(data_table_name, "tbl");
+    ASSERT_OK_AND_ASSIGN(std::optional branch_name, id.GetBranchName());
+    ASSERT_TRUE(branch_name);
+    ASSERT_EQ(branch_name.value(), "dev");
+    ASSERT_OK_AND_ASSIGN(std::string branch_name_or_default, id.GetBranchNameOrDefault());
+    ASSERT_EQ(branch_name_or_default, "dev");
+    ASSERT_OK_AND_ASSIGN(std::optional system_table_name, id.GetSystemTableName());
+    ASSERT_FALSE(system_table_name);
+    ASSERT_OK_AND_ASSIGN(bool is_system_table, id.IsSystemTable());
+    ASSERT_FALSE(is_system_table);
+}
+
+// `table$branch_<name>$system`: both a branch and a system table are reported.
+TEST(IdentifierTest, ParseBranchSystemTable) {
+    Identifier id("db", "tbl$branch_dev$options");
+    ASSERT_OK_AND_ASSIGN(std::string data_table_name, id.GetDataTableName());
+    ASSERT_EQ(data_table_name, "tbl");
+    ASSERT_OK_AND_ASSIGN(std::optional branch_name, id.GetBranchName());
+    ASSERT_TRUE(branch_name);
+    ASSERT_EQ(branch_name.value(), "dev");
+    ASSERT_OK_AND_ASSIGN(std::optional system_table_name, id.GetSystemTableName());
+    ASSERT_TRUE(system_table_name);
+    ASSERT_EQ(system_table_name.value(), "options");
+    ASSERT_OK_AND_ASSIGN(bool is_system_table, id.IsSystemTable());
+    ASSERT_TRUE(is_system_table);
+}
+
+// A three-part name whose middle part is not a branch, and a four-part name, are both rejected.
+TEST(IdentifierTest, InvalidSystemTableName) {
+    Identifier invalid_middle("db", "tbl$bad$options");
+    ASSERT_NOK_WITH_MSG(invalid_middle.IsSystemTable(),
+                        "System table can only contain one '$' separator");
+
+    Identifier too_many("db", "tbl$branch_dev$options$extra");
+    ASSERT_NOK_WITH_MSG(too_many.IsSystemTable(), "Invalid table name");
+}
+
+// Names with an empty data table, branch or system table part are rejected. `tbl$$options` is
+// reported through the "one '$' separator" message instead of the generic one.
+TEST(IdentifierTest, InvalidEmptySystemTableNameParts) {
+    ASSERT_NOK_WITH_MSG(Identifier("db", "$options").IsSystemTable(), "Invalid table name");
+    ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$").IsSystemTable(), "Invalid table name");
+    ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$branch_").IsSystemTable(), "Invalid table name");
+    ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$branch_dev$").IsSystemTable(), "Invalid table name");
+    ASSERT_NOK_WITH_MSG(Identifier("db", "tbl$$options").IsSystemTable(),
+                        "System table can only contain one '$' separator");
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/catalog/renaming_snapshot_commit.h b/src/paimon/catalog/renaming_snapshot_commit.h
new file mode 100644
index 0000000..e493fe3
--- /dev/null
+++ b/src/paimon/catalog/renaming_snapshot_commit.h
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+// NOTE(review): this patch adds snapshot_commit.h under src/paimon/catalog/, while the include
+// below names paimon/core/catalog/ - confirm that the include root resolves this path.
+#include "paimon/core/catalog/snapshot_commit.h"
+#include "paimon/core/snapshot.h"
+#include "paimon/core/utils/snapshot_manager.h"
+#include "paimon/fs/file_system.h"
+#include "paimon/result.h"
+#include "paimon/status.h"
+
+namespace paimon {
+class PartitionStatistics;
+
+/// A `SnapshotCommit` using file renaming to commit.
+///
+/// @note When the file system is local or hdfs, rename is atomic. But if the file system is object
+/// storage, we need additional lock protection.
+// TODO(jinli.zjw): add additional lock protection for object storage.
+class RenamingSnapshotCommit : public SnapshotCommit {
+ public:
+    /// @param fs               File system used for the existence check and the atomic store.
+    /// @param snapshot_manager Supplies the snapshot path and records the latest-snapshot hint.
+    RenamingSnapshotCommit(const std::shared_ptr& fs,
+                           const std::shared_ptr& snapshot_manager)
+        : fs_(fs), snapshot_manager_(snapshot_manager) {}
+
+    /// Serializes `snapshot` to JSON and stores it at SnapshotManager::SnapshotPath(id).
+    ///
+    /// @param snapshot   Snapshot to persist.
+    /// @param statistics Not used by this implementation.
+    /// @return false when a file already exists at the snapshot path (nothing is written);
+    ///         true after the snapshot file was stored and the latest hint was committed.
+    ///         Failures from serialization, the file system or the snapshot manager are
+    ///         returned to the caller through the PAIMON_* macros.
+    /// @note The existence check and the store are two separate file system calls.
+    Result Commit(const Snapshot& snapshot,
+                  const std::vector& statistics) override {
+        PAIMON_ASSIGN_OR_RAISE(std::string json_str, snapshot.ToJsonString());
+        std::string snapshot_path = snapshot_manager_->SnapshotPath(snapshot.Id());
+        PAIMON_ASSIGN_OR_RAISE(bool is_exist, fs_->Exists(snapshot_path));
+        if (is_exist) {
+            return false;
+        }
+        // An atomic write can time out and still have succeeded. Retrying the commit here
+        // could then commit the same snapshot file more than once, so retries are left to
+        // the upper layer, which should call FilterAndCommit to avoid duplicate commits.
+        // For the same reason no cleanup is triggered here: it may delete meta files of a
+        // snapshot that was just written by ourselves, leaving that snapshot incomplete or
+        // corrupted.
+        PAIMON_RETURN_NOT_OK(fs_->AtomicStore(snapshot_path, json_str));
+        PAIMON_RETURN_NOT_OK(snapshot_manager_->CommitLatestHint(snapshot.Id()));
+        return true;
+    }
+
+    /// Not supported by this implementation: always returns Status::Invalid.
+    Result GetLastCommitTableRequest() override {
+        return Status::Invalid(
+            "renaming snapshot commit do not support get last commit table request");
+    }
+
+ private:
+    // File system used by Commit() for Exists() and AtomicStore().
+    std::shared_ptr fs_;
+    // Provides SnapshotPath() and CommitLatestHint().
+    std::shared_ptr snapshot_manager_;
+};
+
+} // namespace paimon
diff --git a/src/paimon/catalog/renaming_snapshot_commit_test.cpp b/src/paimon/catalog/renaming_snapshot_commit_test.cpp
new file mode 100644
index 0000000..f00da0e
--- /dev/null
+++ b/src/paimon/catalog/renaming_snapshot_commit_test.cpp
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "paimon/core/catalog/renaming_snapshot_commit.h"
+
+#include
+#include
+#include
+
+#include "gtest/gtest.h"
+#include "paimon/common/utils/path_util.h"
+#include "paimon/core/partition/partition_statistics.h"
+#include "paimon/fs/local/local_file_system.h"
+#include "paimon/testing/utils/testharness.h"
+
+namespace paimon::test {
+
+// Commits one snapshot into an empty temp directory, checks that the snapshot file and the
+// LATEST hint exist, then checks that committing the same snapshot again reports false.
+TEST(RenamingSnapshotCommitTest, TestSimple) {
+    auto fs = std::make_shared();
+    auto dir = UniqueTestDirectory::Create();
+    ASSERT_TRUE(dir);
+    auto snapshot_manager = std::make_shared(fs, dir->Str());
+
+    auto commit = std::make_shared(fs, snapshot_manager);
+    // The renaming implementation does not support this query and must say so.
+    ASSERT_NOK_WITH_MSG(commit->GetLastCommitTableRequest(),
+                        "renaming snapshot commit do not support get last commit table request");
+    Snapshot snapshot(
+        /*version=*/3, /*id=*/1, /*schema_id=*/0,
+        /*base_manifest_list=*/"manifest-list-3879e56f-2f27-49ae-a2f3-3dcbb8eb0beb-0",
+        /*base_manifest_list_size=*/291,
+        /*delta_manifest_list=*/"manifest-list-3879e56f-2f27-49ae-a2f3-3dcbb8eb0beb-1",
+        /*delta_manifest_list_size=*/1342, /*changelog_manifest_list=*/std::nullopt,
+        /*changelog_manifest_list_size=*/std::nullopt, /*index_manifest=*/std::nullopt,
+        /*commit_user=*/"commit_user_1", /*commit_identifier=*/9223372036854775807,
+        /*commit_kind=*/Snapshot::CommitKind::Append(), /*time_millis=*/1758097357597,
+        /*log_offsets=*/std::map(), /*total_record_count=*/5,
+        /*delta_record_count=*/5, /*changelog_record_count=*/0, /*watermark=*/std::nullopt,
+        /*statistics=*/std::nullopt, /*properties=*/std::nullopt, /*next_row_id=*/0);
+    // First commit of snapshot id 1 succeeds and creates both files under <dir>/snapshot/.
+    ASSERT_OK_AND_ASSIGN(bool success, commit->Commit(snapshot, /*statistics=*/{}));
+    ASSERT_TRUE(success);
+    ASSERT_OK_AND_ASSIGN(bool exist,
+                         fs->Exists(PathUtil::JoinPath(dir->Str(), "snapshot/snapshot-1")));
+    ASSERT_TRUE(exist);
+    ASSERT_OK_AND_ASSIGN(bool exist1,
+                         fs->Exists(PathUtil::JoinPath(dir->Str(), "snapshot/LATEST")));
+    ASSERT_TRUE(exist1);
+    // duplicate commit for snapshot-1
+    ASSERT_OK_AND_ASSIGN(bool success1, commit->Commit(snapshot, /*statistics=*/{}));
+    ASSERT_FALSE(success1);
+}
+
+} // namespace paimon::test
diff --git a/src/paimon/catalog/snapshot_commit.h b/src/paimon/catalog/snapshot_commit.h
new file mode 100644
index 0000000..0a5dc88
--- /dev/null
+++ b/src/paimon/catalog/snapshot_commit.h
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+
+#include "paimon/core/partition/partition_statistics.h"
+#include "paimon/core/snapshot.h"
+
+namespace paimon {
+
+/// Interface to commit snapshot atomically.
+class SnapshotCommit {
+ public:
+    virtual ~SnapshotCommit() = default;
+
+    /// Commits `snapshot` together with the partition `statistics` of this commit.
+    ///
+    /// NOTE(review): the meaning of the returned flag is not stated by this interface.
+    /// RenamingSnapshotCommit returns false when a file for the snapshot id already exists
+    /// and true after writing it; confirm that other implementations follow the same contract.
+    virtual Result Commit(const Snapshot& snapshot,
+                          const std::vector& statistics) = 0;
+
+    /// Returns the last commit table request. Implementations may not support this:
+    /// RenamingSnapshotCommit always answers with Status::Invalid.
+    virtual Result GetLastCommitTableRequest() = 0;
+};
+
+} // namespace paimon