diff --git a/include/paimon/snapshot/snapshot_info.h b/include/paimon/snapshot/snapshot_info.h new file mode 100644 index 0000000..726b015 --- /dev/null +++ b/include/paimon/snapshot/snapshot_info.h @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/visibility.h" + +namespace paimon { + +/// Plain snapshot metadata returned by Catalog::ListSnapshots(). +struct PAIMON_EXPORT SnapshotInfo { + /// Commit kind exposed through the public Catalog API. + enum class PAIMON_EXPORT CommitKind : int8_t { + APPEND, + COMPACT, + OVERWRITE, + ANALYZE, + UNKNOWN, + }; + + /// Convert a CommitKind to its canonical string representation. + static std::string CommitKindToString(CommitKind kind); + + int64_t snapshot_id; + int64_t schema_id; + std::string commit_user; + CommitKind commit_kind; + int64_t time_millis; + std::optional total_record_count; + std::optional delta_record_count; + std::optional watermark; +}; + +} // namespace paimon diff --git a/src/paimon/core/snapshot.cpp b/src/paimon/core/snapshot.cpp new file mode 100644 index 0000000..5ba177e --- /dev/null +++ b/src/paimon/core/snapshot.cpp @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/snapshot.h" + +#include +#include +#include + +#include "paimon/common/utils/rapidjson_util.h" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "rapidjson/allocators.h" +#include "rapidjson/document.h" +#include "rapidjson/rapidjson.h" + +namespace paimon { + +const Snapshot::CommitKind Snapshot::CommitKind::Append() { + static const Snapshot::CommitKind commit_kind = Snapshot::CommitKind(0); + return commit_kind; +} + +const Snapshot::CommitKind Snapshot::CommitKind::Compact() { + static const Snapshot::CommitKind commit_kind = Snapshot::CommitKind(1); + return commit_kind; +} + +const Snapshot::CommitKind Snapshot::CommitKind::Overwrite() { + static const Snapshot::CommitKind commit_kind = Snapshot::CommitKind(2); + return commit_kind; +} + +const Snapshot::CommitKind Snapshot::CommitKind::Analyze() { + static const Snapshot::CommitKind commit_kind = Snapshot::CommitKind(3); + return commit_kind; +} + +const Snapshot::CommitKind Snapshot::CommitKind::Unknown() { + static const Snapshot::CommitKind commit_kind = Snapshot::CommitKind(-1); + return commit_kind; +} +bool Snapshot::TEST_Equal(const Snapshot& other) const { + if (this == &other) { + return true; + } + + if ((base_manifest_list_size_ && !other.base_manifest_list_size_) || + (!base_manifest_list_size_ && other.base_manifest_list_size_)) { + return false; + } + if ((delta_manifest_list_size_ && !other.delta_manifest_list_size_) || + (!delta_manifest_list_size_ && other.delta_manifest_list_size_)) { + return false; + } + if ((changelog_manifest_list_ && !other.changelog_manifest_list_) || + (!changelog_manifest_list_ && other.changelog_manifest_list_)) { + return false; + } + if ((changelog_manifest_list_size_ && !other.changelog_manifest_list_size_) || + (!changelog_manifest_list_size_ && other.changelog_manifest_list_size_)) { + return false; + } + + return version_ == other.version_ && id_ == other.id_ && schema_id_ == other.schema_id_ && + index_manifest_ == other.index_manifest_ && commit_user_ == other.commit_user_ && + commit_identifier_ == other.commit_identifier_ && commit_kind_ == other.commit_kind_ && + log_offsets_ == other.log_offsets_ && total_record_count_ == other.total_record_count_ && + delta_record_count_ == other.delta_record_count_ && + changelog_record_count_ == other.changelog_record_count_ && + watermark_ == other.watermark_ && statistics_ == other.statistics_ && + properties_ == other.properties_ && next_row_id_ == other.next_row_id_; +} + +bool Snapshot::operator==(const Snapshot& other) const { + if (this == &other) { + return true; + } + return version_ == other.version_ && id_ == other.id_ && schema_id_ == other.schema_id_ && + base_manifest_list_ == other.base_manifest_list_ && + base_manifest_list_size_ == other.base_manifest_list_size_ && + delta_manifest_list_ == other.delta_manifest_list_ && + delta_manifest_list_size_ == other.delta_manifest_list_size_ && + changelog_manifest_list_ == other.changelog_manifest_list_ && + changelog_manifest_list_size_ == other.changelog_manifest_list_size_ && + index_manifest_ == other.index_manifest_ && commit_user_ == other.commit_user_ && + commit_identifier_ == other.commit_identifier_ && commit_kind_ == other.commit_kind_ && + time_millis_ == other.time_millis_ && log_offsets_ == other.log_offsets_ && + total_record_count_ == other.total_record_count_ && + delta_record_count_ == other.delta_record_count_ && + changelog_record_count_ == other.changelog_record_count_ && + watermark_ == other.watermark_ && statistics_ == other.statistics_ && + properties_ == other.properties_ && next_row_id_ == other.next_row_id_; +} + +std::string Snapshot::CommitKind::ToString(const Snapshot::CommitKind& kind) { + switch (kind.value_) { + case 0: + return "APPEND"; + case 1: + return "COMPACT"; + case 2: + return "OVERWRITE"; + case 3: + return "ANALYZE"; + default: + assert(false); + return "UNKNOWN"; + } +} +Snapshot::CommitKind Snapshot::CommitKind::FromString(const std::string& kind) { + if (kind == "APPEND") { + return Append(); + } else if (kind == "COMPACT") { + return Compact(); + } else if (kind == "OVERWRITE") { + return Overwrite(); + } else if (kind == "ANALYZE") { + return Analyze(); + } + assert(false); + return Unknown(); +} + +Snapshot::Snapshot(const std::optional& version, int64_t id, int64_t schema_id, + const std::string& base_manifest_list, + const std::optional& base_manifest_list_size, + const std::string& delta_manifest_list, + const std::optional& delta_manifest_list_size, + const std::optional& changelog_manifest_list, + const std::optional& changelog_manifest_list_size, + const std::optional& index_manifest, const std::string& commit_user, + int64_t commit_identifier, CommitKind commit_kind, int64_t time_millis, + const std::optional>& log_offsets, + const std::optional& total_record_count, + const std::optional& delta_record_count, + const std::optional& changelog_record_count, + const std::optional& watermark, + const std::optional& statistics, + const std::optional>& properties, + const std::optional& next_row_id) + : version_(version), + id_(id), + schema_id_(schema_id), + base_manifest_list_(base_manifest_list), + base_manifest_list_size_(base_manifest_list_size), + delta_manifest_list_(delta_manifest_list), + delta_manifest_list_size_(delta_manifest_list_size), + changelog_manifest_list_(changelog_manifest_list), + changelog_manifest_list_size_(changelog_manifest_list_size), + index_manifest_(index_manifest), + commit_user_(commit_user), + commit_identifier_(commit_identifier), + commit_kind_(commit_kind), + time_millis_(time_millis), + log_offsets_(log_offsets), + total_record_count_(total_record_count), + delta_record_count_(delta_record_count), + changelog_record_count_(changelog_record_count), + watermark_(watermark), + statistics_(statistics), + properties_(properties), + next_row_id_(next_row_id) {} + +rapidjson::Value Snapshot::ToJson(rapidjson::Document::AllocatorType* allocator) const + noexcept(false) { + rapidjson::Value obj(rapidjson::kObjectType); + obj.AddMember(rapidjson::StringRef(FIELD_VERSION), + RapidJsonUtil::SerializeValue(Version(), allocator).Move(), *allocator); + obj.AddMember(rapidjson::StringRef(FIELD_ID), + RapidJsonUtil::SerializeValue(id_, allocator).Move(), *allocator); + obj.AddMember(rapidjson::StringRef(FIELD_SCHEMA_ID), + RapidJsonUtil::SerializeValue(schema_id_, allocator).Move(), *allocator); + obj.AddMember(rapidjson::StringRef(FIELD_BASE_MANIFEST_LIST), + RapidJsonUtil::SerializeValue(base_manifest_list_, allocator).Move(), *allocator); + if (base_manifest_list_size_) { + obj.AddMember( + rapidjson::StringRef(FIELD_BASE_MANIFEST_LIST_SIZE), + RapidJsonUtil::SerializeValue(base_manifest_list_size_.value(), allocator).Move(), + *allocator); + } + obj.AddMember(rapidjson::StringRef(FIELD_DELTA_MANIFEST_LIST), + RapidJsonUtil::SerializeValue(delta_manifest_list_, allocator).Move(), + *allocator); + if (delta_manifest_list_size_) { + obj.AddMember(rapidjson::StringRef(FIELD_DELTA_MANIFEST_LIST_SIZE), + RapidJsonUtil::SerializeValue(delta_manifest_list_size_, allocator).Move(), + *allocator); + } + obj.AddMember(rapidjson::StringRef(FIELD_CHANGELOG_MANIFEST_LIST), + RapidJsonUtil::SerializeValue(changelog_manifest_list_, allocator).Move(), + *allocator); + if (changelog_manifest_list_size_) { + obj.AddMember( + rapidjson::StringRef(FIELD_CHANGELOG_MANIFEST_LIST_SIZE), + RapidJsonUtil::SerializeValue(changelog_manifest_list_size_, allocator).Move(), + *allocator); + } + if (index_manifest_ != std::nullopt) { + obj.AddMember(rapidjson::StringRef(FIELD_INDEX_MANIFEST), + RapidJsonUtil::SerializeValue(index_manifest_.value(), allocator).Move(), + *allocator); + } + + obj.AddMember(rapidjson::StringRef(FIELD_COMMIT_USER), + RapidJsonUtil::SerializeValue(commit_user_, allocator).Move(), *allocator); + obj.AddMember(rapidjson::StringRef(FIELD_COMMIT_IDENTIFIER), + RapidJsonUtil::SerializeValue(commit_identifier_, allocator).Move(), *allocator); + obj.AddMember( + rapidjson::StringRef(FIELD_COMMIT_KIND), + RapidJsonUtil::SerializeValue(Snapshot::CommitKind::ToString(commit_kind_), allocator) + .Move(), + *allocator); + + obj.AddMember(rapidjson::StringRef(FIELD_TIME_MILLIS), + RapidJsonUtil::SerializeValue(time_millis_, allocator).Move(), *allocator); + if (log_offsets_ != std::nullopt) { + obj.AddMember(rapidjson::StringRef(FIELD_LOG_OFFSETS), + RapidJsonUtil::SerializeValue(log_offsets_.value(), allocator).Move(), + *allocator); + } + obj.AddMember(rapidjson::StringRef(FIELD_TOTAL_RECORD_COUNT), + RapidJsonUtil::SerializeValue(total_record_count_.value(), allocator).Move(), + *allocator); + obj.AddMember(rapidjson::StringRef(FIELD_DELTA_RECORD_COUNT), + RapidJsonUtil::SerializeValue(delta_record_count_.value(), allocator).Move(), + *allocator); + + if (changelog_record_count_ != std::nullopt) { + obj.AddMember( + rapidjson::StringRef(FIELD_CHANGELOG_RECORD_COUNT), + RapidJsonUtil::SerializeValue(changelog_record_count_.value(), allocator).Move(), + *allocator); + } + + if (watermark_ != std::nullopt) { + obj.AddMember(rapidjson::StringRef(FIELD_WATERMARK), + RapidJsonUtil::SerializeValue(watermark_.value(), allocator).Move(), + *allocator); + } + + if (statistics_ != std::nullopt) { + obj.AddMember(rapidjson::StringRef(FIELD_STATISTICS), + RapidJsonUtil::SerializeValue(statistics_.value(), allocator).Move(), + *allocator); + } + if (properties_ != std::nullopt) { + obj.AddMember(rapidjson::StringRef(FIELD_PROPERTIES), + RapidJsonUtil::SerializeValue(properties_.value(), allocator).Move(), + *allocator); + } + if (next_row_id_ != std::nullopt) { + obj.AddMember(rapidjson::StringRef(FIELD_NEXT_ROW_ID), + RapidJsonUtil::SerializeValue(next_row_id_.value(), allocator).Move(), + *allocator); + } + + return obj; +} + +void Snapshot::FromJson(const rapidjson::Value& obj) noexcept(false) { + version_ = RapidJsonUtil::DeserializeKeyValue(obj, FIELD_VERSION, -1); + id_ = RapidJsonUtil::DeserializeKeyValue(obj, FIELD_ID); + schema_id_ = RapidJsonUtil::DeserializeKeyValue(obj, FIELD_SCHEMA_ID); + base_manifest_list_ = + RapidJsonUtil::DeserializeKeyValue(obj, FIELD_BASE_MANIFEST_LIST); + base_manifest_list_size_ = RapidJsonUtil::DeserializeKeyValue>( + obj, FIELD_BASE_MANIFEST_LIST_SIZE); + delta_manifest_list_ = + RapidJsonUtil::DeserializeKeyValue(obj, FIELD_DELTA_MANIFEST_LIST); + delta_manifest_list_size_ = RapidJsonUtil::DeserializeKeyValue>( + obj, FIELD_DELTA_MANIFEST_LIST_SIZE); + changelog_manifest_list_ = RapidJsonUtil::DeserializeKeyValue>( + obj, FIELD_CHANGELOG_MANIFEST_LIST); + changelog_manifest_list_size_ = RapidJsonUtil::DeserializeKeyValue>( + obj, FIELD_CHANGELOG_MANIFEST_LIST_SIZE); + index_manifest_ = + RapidJsonUtil::DeserializeKeyValue>(obj, FIELD_INDEX_MANIFEST); + commit_user_ = RapidJsonUtil::DeserializeKeyValue(obj, FIELD_COMMIT_USER); + commit_identifier_ = RapidJsonUtil::DeserializeKeyValue(obj, FIELD_COMMIT_IDENTIFIER); + commit_kind_ = Snapshot::CommitKind::FromString( + RapidJsonUtil::DeserializeKeyValue(obj, FIELD_COMMIT_KIND)); + if (commit_kind_ == Snapshot::CommitKind::Unknown()) { + throw std::invalid_argument("deserialize CommitKind failed"); + } + time_millis_ = RapidJsonUtil::DeserializeKeyValue(obj, FIELD_TIME_MILLIS); + log_offsets_ = RapidJsonUtil::DeserializeKeyValue>>( + obj, FIELD_LOG_OFFSETS); + total_record_count_ = + RapidJsonUtil::DeserializeKeyValue>(obj, FIELD_TOTAL_RECORD_COUNT); + delta_record_count_ = + RapidJsonUtil::DeserializeKeyValue>(obj, FIELD_DELTA_RECORD_COUNT); + changelog_record_count_ = RapidJsonUtil::DeserializeKeyValue>( + obj, FIELD_CHANGELOG_RECORD_COUNT); + watermark_ = RapidJsonUtil::DeserializeKeyValue>(obj, FIELD_WATERMARK); + statistics_ = + RapidJsonUtil::DeserializeKeyValue>(obj, FIELD_STATISTICS); + properties_ = + RapidJsonUtil::DeserializeKeyValue>>( + obj, FIELD_PROPERTIES); + next_row_id_ = + RapidJsonUtil::DeserializeKeyValue>(obj, FIELD_NEXT_ROW_ID); +} + +Result Snapshot::FromPath(const std::shared_ptr& fs, + const std::string& path) { + std::string json_str; + PAIMON_RETURN_NOT_OK(fs->ReadFile(path, &json_str)); + PAIMON_ASSIGN_OR_RAISE(Snapshot snapshot, Snapshot::FromJsonString(json_str)); + return snapshot; +} + +} // namespace paimon diff --git a/src/paimon/core/snapshot.h b/src/paimon/core/snapshot.h new file mode 100644 index 0000000..827d416 --- /dev/null +++ b/src/paimon/core/snapshot.h @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/common/utils/jsonizable.h" +#include "paimon/result.h" +#include "paimon/type_fwd.h" +#include "rapidjson/allocators.h" +#include "rapidjson/document.h" +#include "rapidjson/rapidjson.h" + +namespace paimon { +class FileSystem; + +// This file is the entrance to all data committed at some specific time point. +class Snapshot : public Jsonizable { + public: + class CommitKind { + public: + explicit CommitKind(int8_t kind) : value_(kind) {} + /// Changes flushed from the mem table. + static const CommitKind Append(); + + /// Changes by compacting existing data files. + static const CommitKind Compact(); + + /// Changes that clear up the whole partition and then add new records. + static const CommitKind Overwrite(); + + /// Collect statistics. + static const CommitKind Analyze(); + + static const CommitKind Unknown(); + + bool operator==(const CommitKind& other) const { + return value_ == other.value_; + } + static std::string ToString(const CommitKind& kind); + static CommitKind FromString(const std::string& kind); + + private: + int8_t value_; + }; + + static constexpr char FIELD_VERSION[] = "version"; + static constexpr char FIELD_ID[] = "id"; + static constexpr char FIELD_SCHEMA_ID[] = "schemaId"; + static constexpr char FIELD_BASE_MANIFEST_LIST[] = "baseManifestList"; + static constexpr char FIELD_BASE_MANIFEST_LIST_SIZE[] = "baseManifestListSize"; + static constexpr char FIELD_DELTA_MANIFEST_LIST[] = "deltaManifestList"; + static constexpr char FIELD_DELTA_MANIFEST_LIST_SIZE[] = "deltaManifestListSize"; + static constexpr char FIELD_CHANGELOG_MANIFEST_LIST[] = "changelogManifestList"; + static constexpr char FIELD_CHANGELOG_MANIFEST_LIST_SIZE[] = "changelogManifestListSize"; + static constexpr char FIELD_INDEX_MANIFEST[] = "indexManifest"; + static constexpr char FIELD_COMMIT_USER[] = "commitUser"; + static constexpr char FIELD_COMMIT_IDENTIFIER[] = "commitIdentifier"; + static constexpr char FIELD_COMMIT_KIND[] = "commitKind"; + static constexpr char FIELD_TIME_MILLIS[] = "timeMillis"; + static constexpr char FIELD_LOG_OFFSETS[] = "logOffsets"; + static constexpr char FIELD_TOTAL_RECORD_COUNT[] = "totalRecordCount"; + static constexpr char FIELD_DELTA_RECORD_COUNT[] = "deltaRecordCount"; + static constexpr char FIELD_CHANGELOG_RECORD_COUNT[] = "changelogRecordCount"; + static constexpr char FIELD_WATERMARK[] = "watermark"; + static constexpr char FIELD_STATISTICS[] = "statistics"; + static constexpr char FIELD_PROPERTIES[] = "properties"; + static constexpr char FIELD_NEXT_ROW_ID[] = "nextRowId"; + + JSONIZABLE_FRIEND_AND_DEFAULT_CTOR(Snapshot); + + Snapshot(int64_t id, int64_t schema_id, const std::string& base_manifest_list, + const std::optional& base_manifest_list_size, + const std::string& delta_manifest_list, + const std::optional& delta_manifest_list_size, + const std::optional& changelog_manifest_list, + const std::optional& changelog_manifest_list_size, + const std::optional& index_manifest, const std::string& commit_user, + int64_t commit_identifier, CommitKind commit_kind, int64_t time_millis, + const std::optional>& log_offsets, + const std::optional& total_record_count, + const std::optional& delta_record_count, + const std::optional& changelog_record_count, + const std::optional& watermark, const std::optional& statistics, + const std::optional>& properties, + const std::optional& next_row_id) + : Snapshot(CURRENT_VERSION, id, schema_id, base_manifest_list, base_manifest_list_size, + delta_manifest_list, delta_manifest_list_size, changelog_manifest_list, + changelog_manifest_list_size, index_manifest, commit_user, commit_identifier, + commit_kind, time_millis, log_offsets, total_record_count, delta_record_count, + changelog_record_count, watermark, statistics, properties, next_row_id) {} + + Snapshot(const std::optional& version, int64_t id, int64_t schema_id, + const std::string& base_manifest_list, + const std::optional& base_manifest_list_size, + const std::string& delta_manifest_list, + const std::optional& delta_manifest_list_size, + const std::optional& changelog_manifest_list, + const std::optional& changelog_manifest_list_size, + const std::optional& index_manifest, const std::string& commit_user, + int64_t commit_identifier, CommitKind commit_kind, int64_t time_millis, + const std::optional>& log_offsets, + const std::optional& total_record_count, + const std::optional& delta_record_count, + const std::optional& changelog_record_count, + const std::optional& watermark, const std::optional& statistics, + const std::optional>& properties, + const std::optional& next_row_id); + + bool operator==(const Snapshot& other) const; + bool TEST_Equal(const Snapshot& other) const; + + public: + static constexpr int64_t FIRST_SNAPSHOT_ID = 1; + static constexpr int32_t TABLE_STORE_02_VERSION = 1; + static constexpr int32_t CURRENT_VERSION = 3; + + public: + int32_t Version() const { + // there is no version field for paimon <= 0.2 + return version_ == std::nullopt ? TABLE_STORE_02_VERSION : version_.value(); + } + + int64_t Id() const { + return id_; + } + + int64_t SchemaId() const { + return schema_id_; + } + + const std::string& BaseManifestList() const { + return base_manifest_list_; + } + + const std::optional& BaseManifestListSize() const { + return base_manifest_list_size_; + } + + const std::string& DeltaManifestList() const { + return delta_manifest_list_; + } + + const std::optional& DeltaManifestListSize() const { + return delta_manifest_list_size_; + } + + const std::optional& ChangelogManifestList() const { + return changelog_manifest_list_; + } + + const std::optional& ChangelogManifestListSize() const { + return changelog_manifest_list_size_; + } + + const std::optional& IndexManifest() const { + return index_manifest_; + } + + const std::string& CommitUser() const { + return commit_user_; + } + + int64_t CommitIdentifier() const { + return commit_identifier_; + } + + CommitKind GetCommitKind() const { + return commit_kind_; + } + + int64_t TimeMillis() const { + return time_millis_; + } + + const std::optional>& LogOffsets() const { + return log_offsets_; + } + + const std::optional& TotalRecordCount() const { + return total_record_count_; + } + + const std::optional& DeltaRecordCount() const { + return delta_record_count_; + } + + const std::optional& ChangelogRecordCount() const { + return changelog_record_count_; + } + + const std::optional& Watermark() const { + return watermark_; + } + + const std::optional& Statistics() const { + return statistics_; + } + + const std::optional>& Properties() const { + return properties_; + } + + const std::optional& NextRowId() const { + return next_row_id_; + } + + rapidjson::Value ToJson(rapidjson::Document::AllocatorType* allocator) const + noexcept(false) override; + + void FromJson(const rapidjson::Value& obj) noexcept(false) override; + + static Result FromPath(const std::shared_ptr& fs, + const std::string& path); + + private: + // version of snapshot + // null for paimon <= 0.2 + std::optional version_; + int64_t id_ = -1; + int64_t schema_id_ = -1; + + // a manifest list recording all changes from the previous snapshots + std::string base_manifest_list_; + std::optional base_manifest_list_size_; + + // a manifest list recording all new changes occurred in this snapshot + // for faster expire and streaming reads + std::string delta_manifest_list_; + std::optional delta_manifest_list_size_; + + // a manifest list recording all changelog produced in this snapshot + // null if no changelog is produced, or for paimon <= 0.2 + std::optional changelog_manifest_list_; + std::optional changelog_manifest_list_size_; + + // a manifest recording all index files of this table + // null if no index file + std::optional index_manifest_; + + std::string commit_user_; + + // Mainly for snapshot deduplication. + // + // If multiple snapshots have the same commitIdentifier, reading from any of these + // snapshots must produce the same table. + // + // If snapshot A has a smaller commitIdentifier than snapshot B, then snapshot A must + // be committed before snapshot B, and thus snapshot A must contain older records than + // snapshot B. + int64_t commit_identifier_ = std::numeric_limits::min(); + + CommitKind commit_kind_ = CommitKind::Unknown(); + + int64_t time_millis_; + + std::optional> log_offsets_; + + // record count of all changes occurred in this snapshot + // null for paimon <= 0.3 + std::optional total_record_count_; + + // record count of all new changes occurred in this snapshot + // null for paimon <= 0.3 + std::optional delta_record_count_; + + // record count of all changelog produced in this snapshot + // null for paimon <= 0.3 + std::optional changelog_record_count_; + + // watermark for input records + // null for paimon <= 0.3 + // null if there is no watermark in new committing, and the previous snapshot does not + // Have a watermark + std::optional watermark_; + + // stats file name for statistics of this table + // null if no stats file + std::optional statistics_; + + // properties + // null for paimon <= 1.1 or empty properties + std::optional> properties_; + + std::optional next_row_id_; +}; + +} // namespace paimon diff --git a/src/paimon/core/snapshot_info.cpp b/src/paimon/core/snapshot_info.cpp new file mode 100644 index 0000000..46b6467 --- /dev/null +++ b/src/paimon/core/snapshot_info.cpp @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/snapshot/snapshot_info.h" + +namespace paimon { + +std::string SnapshotInfo::CommitKindToString(CommitKind kind) { + switch (kind) { + case CommitKind::APPEND: + return "APPEND"; + case CommitKind::COMPACT: + return "COMPACT"; + case CommitKind::OVERWRITE: + return "OVERWRITE"; + case CommitKind::ANALYZE: + return "ANALYZE"; + case CommitKind::UNKNOWN: + return "UNKNOWN"; + } + return "UNKNOWN"; +} + +} // namespace paimon diff --git a/src/paimon/core/snapshot_test.cpp b/src/paimon/core/snapshot_test.cpp new file mode 100644 index 0000000..b5694e4 --- /dev/null +++ b/src/paimon/core/snapshot_test.cpp @@ -0,0 +1,407 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/snapshot.h" + +#include "gtest/gtest.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/result.h" +#include "paimon/snapshot/snapshot_info.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +class SnapshotTest : public testing::Test { + public: + std::string ReplaceAll(const std::string& str) { + std::string replaced_str = StringUtils::Replace(str, " ", ""); + replaced_str = StringUtils::Replace(replaced_str, "\t", ""); + replaced_str = StringUtils::Replace(replaced_str, "\n", ""); + return replaced_str; + } +}; + +TEST_F(SnapshotTest, TestSimple) { + std::map log_offset = {{25, 30}}; + std::map properties = {{"key1", "value1"}, {"key2", "value2"}}; + Snapshot snapshot( + /*version=*/5, /*id=*/10, /*schema_id=*/15, /*base_manifest_list=*/"base_manifest_list", 10, + /*delta_manifest_list=*/"delta_manifest_list", 20, + /*changelog_manifest_list=*/"changelog_manifest_list", 30, + /*index_manifest=*/"index_manifest", + /*commit_user=*/"commit_user_01", /*commit_identifier=*/20, + /*commit_kind=*/Snapshot::CommitKind::Compact(), /*time_millis=*/1234, log_offset, + /*total_record_count=*/35, + /*delta_record_count=*/40, /*changelog_record_count=*/45, /*watermark=*/50, + /*statistics=*/"statistic_test", properties, /*next_row_id=*/0); + ASSERT_EQ(5, snapshot.Version()); + ASSERT_EQ(10, snapshot.Id()); + ASSERT_EQ(15, snapshot.SchemaId()); + ASSERT_EQ("base_manifest_list", snapshot.BaseManifestList()); + ASSERT_EQ(10, snapshot.BaseManifestListSize().value()); + ASSERT_EQ("delta_manifest_list", snapshot.DeltaManifestList()); + ASSERT_EQ(20, snapshot.DeltaManifestListSize().value()); + ASSERT_EQ("changelog_manifest_list", snapshot.ChangelogManifestList().value()); + ASSERT_EQ(30, snapshot.ChangelogManifestListSize().value()); + ASSERT_EQ("index_manifest", snapshot.IndexManifest().value()); + ASSERT_EQ("commit_user_01", snapshot.CommitUser()); + ASSERT_EQ(20, snapshot.CommitIdentifier()); + ASSERT_EQ(Snapshot::CommitKind::Compact(), snapshot.GetCommitKind()); + ASSERT_EQ(1234, snapshot.TimeMillis()); + ASSERT_EQ(log_offset, snapshot.LogOffsets().value()); + ASSERT_EQ(35, snapshot.TotalRecordCount().value()); + ASSERT_EQ(40, snapshot.DeltaRecordCount().value()); + ASSERT_EQ(45, snapshot.ChangelogRecordCount().value()); + ASSERT_EQ(50, snapshot.Watermark().value()); + ASSERT_EQ("statistic_test", snapshot.Statistics().value()); + ASSERT_EQ(properties, snapshot.Properties().value()); + ASSERT_EQ(0, snapshot.NextRowId().value()); +} + +TEST_F(SnapshotTest, TestFromPath) { + std::string data_path = + paimon::test::GetDataDir() + "/orc/append_09.db/append_09/snapshot/snapshot-1"; + auto fs = std::make_shared(); + ASSERT_OK_AND_ASSIGN(Snapshot snapshot, Snapshot::FromPath(fs, data_path)); + ASSERT_EQ(3, snapshot.Version()); + ASSERT_EQ(1, snapshot.Id()); + ASSERT_EQ(0, snapshot.SchemaId()); + ASSERT_EQ("manifest-list-616d1847-a02c-495f-9cca-2c8b7def0fec-0", snapshot.BaseManifestList()); + ASSERT_EQ(std::nullopt, snapshot.BaseManifestListSize()); + ASSERT_EQ("manifest-list-616d1847-a02c-495f-9cca-2c8b7def0fec-1", snapshot.DeltaManifestList()); + ASSERT_EQ(std::nullopt, snapshot.DeltaManifestListSize()); + ASSERT_EQ(std::nullopt, snapshot.ChangelogManifestList()); + ASSERT_EQ(std::nullopt, snapshot.ChangelogManifestListSize()); + ASSERT_EQ(std::nullopt, snapshot.IndexManifest()); + ASSERT_EQ("b02e4322-9c5f-41e1-a560-c0156fdf7b9c", snapshot.CommitUser()); + ASSERT_EQ(9223372036854775807ll, snapshot.CommitIdentifier()); + ASSERT_EQ(Snapshot::CommitKind::Append(), snapshot.GetCommitKind()); + ASSERT_EQ(1721614343270ll, snapshot.TimeMillis()); + ASSERT_EQ((std::map()), snapshot.LogOffsets().value()); + ASSERT_EQ(5, snapshot.TotalRecordCount().value()); + ASSERT_EQ(5, snapshot.DeltaRecordCount().value()); + ASSERT_EQ(0, snapshot.ChangelogRecordCount().value()); + ASSERT_EQ(std::nullopt, snapshot.Watermark()); + ASSERT_EQ(std::nullopt, snapshot.Statistics()); + ASSERT_EQ(std::nullopt, snapshot.Properties()); + ASSERT_EQ(std::nullopt, snapshot.NextRowId()); +} + +TEST_F(SnapshotTest, TestJsonizable) { + std::string json_str = R"({ + "version" : 3, + "id" : 1, + "schemaId" : 0, + "baseManifestList" : "manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-0", + "baseManifestListSize" : 20, + "deltaManifestList" : "manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-1", + "deltaManifestListSize" : 50, + "changelogManifestList" : null, + "commitUser" : "0e4d92f7-53b0-40d6-a7c0-102bf3801e6a", + "commitIdentifier" : 9223372036854775807, + "commitKind" : "OVERWRITE", + "timeMillis" : 1711692199281, + "logOffsets" : { }, + "totalRecordCount" : 3, + "deltaRecordCount" : 3, + "changelogRecordCount" : 0 + })"; + + ASSERT_OK_AND_ASSIGN(Snapshot snapshot, Snapshot::FromJsonString(json_str)); + + Snapshot expected_snapshot( + /*version=*/3, /*id=*/1, /*schema_id=*/0, /*base_manifest_list=*/ + "manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-0", /*base_manifest_list_size=*/20, + /*delta_manifest_list=*/"manifest-list-d96fcc30-99e8-4f45-962b-a1157c56f378-1", + /*delta_manifest_list_size=*/50, /*changelog_manifest_list=*/std::nullopt, + /*changelog_manifest_list_size=*/std::nullopt, /*index_manifest=*/std::nullopt, + /*commit_user=*/"0e4d92f7-53b0-40d6-a7c0-102bf3801e6a", + /*commit_identifier=*/9223372036854775807ll, + /*commit_kind=*/Snapshot::CommitKind::Overwrite(), /*time_millis=*/1711692199281ll, + /*log_offsets=*/std::map(), + /*total_record_count=*/3, /*delta_record_count=*/3, /*changelog_record_count=*/0, + /*watermark=*/std::nullopt, /*statistics=*/std::nullopt, /*properties=*/std::nullopt, + /*next_row_id=*/std::nullopt); + ASSERT_EQ(expected_snapshot, snapshot); + + ASSERT_OK_AND_ASSIGN(std::string new_json_str, snapshot.ToJsonString()); + ASSERT_EQ(ReplaceAll(json_str), ReplaceAll(new_json_str)); +} + +TEST_F(SnapshotTest, TestSerializeAndDeserialize) { + auto se_and_de = [&](const std::string& data_path) { + auto fs = std::make_shared(); + std::string json_str; + ASSERT_OK(fs->ReadFile(data_path, &json_str)); + ASSERT_OK_AND_ASSIGN(Snapshot snapshot, Snapshot::FromPath(fs, data_path)); + ASSERT_EQ(snapshot, snapshot); + ASSERT_OK_AND_ASSIGN(std::string se_json_str, snapshot.ToJsonString()); + ASSERT_EQ(ReplaceAll(json_str), ReplaceAll(se_json_str)); + }; + auto se_and_de_from_str = [&](const std::string& json_str) { + ASSERT_OK_AND_ASSIGN(Snapshot snapshot, Snapshot::FromJsonString(json_str)); + ASSERT_EQ(snapshot, snapshot); + ASSERT_OK_AND_ASSIGN(std::string se_json_str, snapshot.ToJsonString()); + ASSERT_EQ(ReplaceAll(json_str), ReplaceAll(se_json_str)); + }; + + { + // without indexManifest + std::string data_path = + paimon::test::GetDataDir() + + "/orc/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/snapshot/snapshot-1"; + se_and_de(data_path); + } + { + // with indexManifest + std::string data_path = + paimon::test::GetDataDir() + + "/orc/pk_table_scan_and_read_dv.db/pk_table_scan_and_read_dv/snapshot/snapshot-6"; + se_and_de(data_path); + } + { + // with ManifestListSize + std::string data_path = paimon::test::GetDataDir() + + "/orc/append_with_bsi_bitmap_bloomfilter.db/" + "append_with_bsi_bitmap_bloomfilter/snapshot/snapshot-1"; + se_and_de(data_path); + } + { + // with properties + std::string json_str = R"({ + "version" : 3, + "id" : 10, + "schemaId" : 2, + "baseManifestList" : "base-manifest-list-1", + "baseManifestListSize" : 100, + "deltaManifestList" : "delta-manifest-list-2", + "deltaManifestListSize" : 200, + "changelogManifestList" : null, + "commitUser" : "commit-usr-3", + "commitIdentifier" : 12, + "commitKind" : "APPEND", + "timeMillis" : 1749724197266, + "logOffsets" : { + "0" : 1, + "1" : 3 + }, + "totalRecordCount" : 1024, + "deltaRecordCount" : 4096, + "watermark" : 1749724196266, + "statistics" : "statistics-4", + "properties" : { + "key0" : "value0", + "key1" : "value1" + } + })"; + se_and_de_from_str(json_str); + } + { + // with next_row_id + std::string json_str = R"({ + "version" : 3, + "id" : 10, + "schemaId" : 2, + "baseManifestList" : "base-manifest-list-1", + "baseManifestListSize" : 100, + "deltaManifestList" : "delta-manifest-list-2", + "deltaManifestListSize" : 200, + "changelogManifestList" : null, + "commitUser" : "commit-usr-3", + "commitIdentifier" : 12, + "commitKind" : "APPEND", + "timeMillis" : 1749724197266, + "logOffsets" : { + "0" : 1, + "1" : 3 + }, + "totalRecordCount" : 1024, + "deltaRecordCount" : 4096, + "watermark" : 1749724196266, + "statistics" : "statistics-4", + "properties" : { + "key0" : "value0", + "key1" : "value1" + }, + "nextRowId" : 0 + })"; + se_and_de_from_str(json_str); + } +} + +TEST_F(SnapshotTest, TestCommitKindAnalyze) { + // Test constructing a Snapshot with CommitKind::Analyze + Snapshot snapshot( + /*version=*/3, /*id=*/20, /*schema_id=*/5, + /*base_manifest_list=*/"base-manifest-analyze", + /*base_manifest_list_size=*/100, + /*delta_manifest_list=*/"delta-manifest-analyze", + /*delta_manifest_list_size=*/200, + /*changelog_manifest_list=*/std::nullopt, + /*changelog_manifest_list_size=*/std::nullopt, + /*index_manifest=*/std::nullopt, + /*commit_user=*/"analyze-user", + /*commit_identifier=*/42, + /*commit_kind=*/Snapshot::CommitKind::Analyze(), + /*time_millis=*/1700000000000ll, + /*log_offsets=*/std::map(), + /*total_record_count=*/0, + /*delta_record_count=*/0, + /*changelog_record_count=*/0, + /*watermark=*/std::nullopt, + /*statistics=*/"test-statistics", + /*properties=*/std::nullopt, + /*next_row_id=*/std::nullopt); + + ASSERT_EQ(Snapshot::CommitKind::Analyze(), snapshot.GetCommitKind()); + ASSERT_EQ("ANALYZE", Snapshot::CommitKind::ToString(snapshot.GetCommitKind())); +} + +TEST_F(SnapshotTest, TestCommitKindAnalyzeSerializeAndDeserialize) { + std::string json_str = R"({ + "version" : 3, + "id" : 20, + "schemaId" : 5, + "baseManifestList" : "base-manifest-analyze", + "baseManifestListSize" : 100, + "deltaManifestList" : "delta-manifest-analyze", + "deltaManifestListSize" : 200, + "changelogManifestList" : null, + "commitUser" : "analyze-user", + "commitIdentifier" : 42, + "commitKind" : "ANALYZE", + "timeMillis" : 1700000000000, + "logOffsets" : { }, + "totalRecordCount" : 0, + "deltaRecordCount" : 0, + "changelogRecordCount" : 0, + "statistics" : "test-statistics" + })"; + + ASSERT_OK_AND_ASSIGN(Snapshot snapshot, Snapshot::FromJsonString(json_str)); + + // Verify deserialization + ASSERT_EQ(20, snapshot.Id()); + ASSERT_EQ(5, snapshot.SchemaId()); + ASSERT_EQ(Snapshot::CommitKind::Analyze(), snapshot.GetCommitKind()); + ASSERT_EQ("test-statistics", snapshot.Statistics().value()); + + // Verify round-trip serialization + ASSERT_OK_AND_ASSIGN(std::string serialized, snapshot.ToJsonString()); + ASSERT_EQ(ReplaceAll(json_str), ReplaceAll(serialized)); + + // Verify re-deserialization produces equal snapshot + ASSERT_OK_AND_ASSIGN(Snapshot deserialized, Snapshot::FromJsonString(serialized)); + ASSERT_EQ(snapshot, deserialized); +} + +TEST_F(SnapshotTest, TestCommitKindToStringAndFromString) { + // Verify all CommitKind values round-trip through ToString/FromString + ASSERT_EQ("APPEND", Snapshot::CommitKind::ToString(Snapshot::CommitKind::Append())); + ASSERT_EQ("COMPACT", Snapshot::CommitKind::ToString(Snapshot::CommitKind::Compact())); + ASSERT_EQ("OVERWRITE", Snapshot::CommitKind::ToString(Snapshot::CommitKind::Overwrite())); + ASSERT_EQ("ANALYZE", Snapshot::CommitKind::ToString(Snapshot::CommitKind::Analyze())); + + ASSERT_EQ(Snapshot::CommitKind::Append(), Snapshot::CommitKind::FromString("APPEND")); + ASSERT_EQ(Snapshot::CommitKind::Compact(), Snapshot::CommitKind::FromString("COMPACT")); + ASSERT_EQ(Snapshot::CommitKind::Overwrite(), Snapshot::CommitKind::FromString("OVERWRITE")); + ASSERT_EQ(Snapshot::CommitKind::Analyze(), Snapshot::CommitKind::FromString("ANALYZE")); + + // Verify equality/inequality + ASSERT_FALSE(Snapshot::CommitKind::Analyze() == Snapshot::CommitKind::Append()); + ASSERT_FALSE(Snapshot::CommitKind::Analyze() == Snapshot::CommitKind::Compact()); + ASSERT_FALSE(Snapshot::CommitKind::Analyze() == Snapshot::CommitKind::Overwrite()); + ASSERT_TRUE(Snapshot::CommitKind::Analyze() == Snapshot::CommitKind::Analyze()); +} + +TEST_F(SnapshotTest, TestSnapshotInfoCommitKindToString) { + ASSERT_EQ("APPEND", SnapshotInfo::CommitKindToString(SnapshotInfo::CommitKind::APPEND)); + ASSERT_EQ("COMPACT", SnapshotInfo::CommitKindToString(SnapshotInfo::CommitKind::COMPACT)); + ASSERT_EQ("OVERWRITE", SnapshotInfo::CommitKindToString(SnapshotInfo::CommitKind::OVERWRITE)); + ASSERT_EQ("ANALYZE", SnapshotInfo::CommitKindToString(SnapshotInfo::CommitKind::ANALYZE)); + ASSERT_EQ("UNKNOWN", SnapshotInfo::CommitKindToString(SnapshotInfo::CommitKind::UNKNOWN)); +} + +TEST_F(SnapshotTest, TestChangelogManifestListSerialization) { + // Test with changelog_manifest_list set to a non-null value + { + std::string json_str = R"({ + "version" : 3, + "id" : 1, + "schemaId" : 0, + "baseManifestList" : "base-manifest-list", + "deltaManifestList" : "delta-manifest-list", + "changelogManifestList" : "changelog-manifest-list", + "changelogManifestListSize" : 42, + "commitUser" : "user-01", + "commitIdentifier" : 100, + "commitKind" : "APPEND", + "timeMillis" : 1700000000000, + "logOffsets" : { }, + "totalRecordCount" : 10, + "deltaRecordCount" : 5, + "changelogRecordCount" : 3 + })"; + + ASSERT_OK_AND_ASSIGN(Snapshot snapshot, Snapshot::FromJsonString(json_str)); + ASSERT_EQ("changelog-manifest-list", snapshot.ChangelogManifestList().value()); + ASSERT_EQ(42, snapshot.ChangelogManifestListSize().value()); + + ASSERT_OK_AND_ASSIGN(std::string serialized, snapshot.ToJsonString()); + ASSERT_EQ(ReplaceAll(json_str), ReplaceAll(serialized)); + + // Verify round-trip + ASSERT_OK_AND_ASSIGN(Snapshot deserialized, Snapshot::FromJsonString(serialized)); + ASSERT_EQ(snapshot, deserialized); + } + + // Test with changelog_manifest_list set to null + { + std::string json_str = R"({ + "version" : 3, + "id" : 2, + "schemaId" : 0, + "baseManifestList" : "base-manifest-list", + "deltaManifestList" : "delta-manifest-list", + "changelogManifestList" : null, + "commitUser" : "user-02", + "commitIdentifier" : 200, + "commitKind" : "COMPACT", + "timeMillis" : 1700000001000, + "logOffsets" : { }, + "totalRecordCount" : 20, + "deltaRecordCount" : 10, + "changelogRecordCount" : 0 + })"; + + ASSERT_OK_AND_ASSIGN(Snapshot snapshot, Snapshot::FromJsonString(json_str)); + ASSERT_EQ(std::nullopt, snapshot.ChangelogManifestList()); + ASSERT_EQ(std::nullopt, snapshot.ChangelogManifestListSize()); + + ASSERT_OK_AND_ASSIGN(std::string serialized, snapshot.ToJsonString()); + ASSERT_EQ(ReplaceAll(json_str), ReplaceAll(serialized)); + + // Verify round-trip + ASSERT_OK_AND_ASSIGN(Snapshot deserialized, Snapshot::FromJsonString(serialized)); + ASSERT_EQ(snapshot, deserialized); + } +} + +} // namespace paimon::test diff --git a/src/paimon/core/table/bucket_mode.h b/src/paimon/core/table/bucket_mode.h new file mode 100644 index 0000000..2891dac --- /dev/null +++ b/src/paimon/core/table/bucket_mode.h @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace paimon { + +/// Bucket mode of the table, it affects the writing process and also affects the data skipping in +/// reading. +enum class BucketMode { + /// The fixed number of buckets configured by the user can only be modified through offline + /// commands. The data is distributed to the corresponding buckets according to the hash value + /// of + /// bucket key (default is primary key), and the reading end can perform bucket skipping based + /// on + /// the filtering conditions of the bucket key. + HASH_FIXED = 0, + + /// The dynamic bucket mode records which bucket the key corresponds to through the index files. + /// The index records the correspondence between the hash value of the primary-key and the + /// bucket. This mode cannot support multiple concurrent writes or bucket skipping for reading + /// filter conditions. This mode only works for changelog table. + HASH_DYNAMIC, + + /// The cross partition mode is for cross partition upsert (primary keys not contain all + /// partition fields). It directly maintains the mapping of primary keys to partition and + /// bucket, + /// uses local disks, and initializes indexes by reading all existing keys in the table when + /// starting stream write job. + CROSS_PARTITION, + + /// Ignoring bucket concept, although all data is written to bucket-0, the parallelism of reads + /// and writes is unrestricted. This mode only works for append-only table. + BUCKET_UNAWARE, + + /// Configured by 'bucket' = '-2' (postpone bucket) for primary key table. This mode aims to + /// solve the difficulty to determine a fixed number of buckets and support different buckets + /// for + /// different partitions. The bucket will be adaptively adjusted to the appropriate value in the + /// background. + POSTPONE_MODE +}; + +class BucketModeDefine { + public: + static constexpr int32_t UNAWARE_BUCKET = 0; + static constexpr int32_t POSTPONE_BUCKET = -2; +}; + +} // namespace paimon diff --git a/src/paimon/core/table/table.cpp b/src/paimon/core/table/table.cpp new file mode 100644 index 0000000..557c452 --- /dev/null +++ b/src/paimon/core/table/table.cpp @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/catalog/table.h" + +#include + +#include "fmt/format.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/fs/file_system.h" + +namespace paimon { + +Result> Table::Create(const std::shared_ptr& file_system, + const std::string& table_path, + const Identifier& identifier) { + PAIMON_ASSIGN_OR_RAISE(bool exist, file_system->Exists(table_path)); + if (!exist) { + return Status::NotExist(fmt::format("{} not exist", identifier.ToString())); + } + + SchemaManager schema_manager(file_system, table_path); + PAIMON_ASSIGN_OR_RAISE(std::optional> latest_schema, + schema_manager.Latest()); + if (!latest_schema) { + return Status::NotExist( + fmt::format("load table schema for {} failed", identifier.ToString())); + } + + auto schema = std::static_pointer_cast(*latest_schema); + return std::make_shared(schema, identifier.GetDatabaseName(), identifier.GetTableName()); +} + +std::string Table::FullName() const { + return database_ == Identifier::kUnknownDatabase ? table_name_ : database_ + "." + table_name_; +} + +} // namespace paimon diff --git a/src/paimon/core/table/table_test.cpp b/src/paimon/core/table/table_test.cpp new file mode 100644 index 0000000..f2d12f2 --- /dev/null +++ b/src/paimon/core/table/table_test.cpp @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/catalog/table.h" + +#include "arrow/api.h" +#include "gtest/gtest.h" +#include "paimon/core/schema/schema_manager.h" +#include "paimon/core/schema/table_schema.h" +#include "paimon/schema/schema.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::test { + +TEST(TableTest, TestCreateWithUnknownDatabase) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + + SchemaManager schema_manager(dir->GetFileSystem(), dir->Str()); + auto schema = + arrow::schema({arrow::field("id", arrow::int32(), /*nullable=*/false), + arrow::field("name", arrow::utf8()), arrow::field("value", arrow::int64())}); + std::vector partition_keys = {"name"}; + std::vector primary_keys = {"id"}; + std::map options = { + {"file.format", "orc"}, + {"commit.force-compact", "true"}, + }; + + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::unique_ptr created_schema, + schema_manager.CreateTable(schema, partition_keys, primary_keys, options)); + + ASSERT_OK_AND_ASSIGN(std::shared_ptr
table, + Table::Create(dir->GetFileSystem(), dir->Str(), Identifier("tbl1"))); + + EXPECT_EQ(table->Name(), "tbl1"); + EXPECT_EQ(table->FullName(), "tbl1"); + + std::shared_ptr latest_schema = table->LatestSchema(); + ASSERT_NE(latest_schema, nullptr); + auto data_schema = std::dynamic_pointer_cast(latest_schema); + ASSERT_TRUE(data_schema != nullptr); + ASSERT_TRUE(std::dynamic_pointer_cast(latest_schema) != nullptr); + EXPECT_EQ(data_schema->Id(), 0); + EXPECT_EQ(data_schema->PartitionKeys(), partition_keys); + EXPECT_EQ(data_schema->PrimaryKeys(), primary_keys); +} + +TEST(TableTest, TestCreateFailedWithNonExistSchema) { + auto dir = UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + auto fs = dir->GetFileSystem(); + SchemaManager schema_manager(fs, dir->Str()); + auto schema = + arrow::schema({arrow::field("id", arrow::int32(), /*nullable=*/false), + arrow::field("name", arrow::utf8()), arrow::field("value", arrow::int64())}); + std::vector partition_keys = {"name"}; + std::vector primary_keys = {"id"}; + std::map options = { + {"file.format", "orc"}, + {"commit.force-compact", "true"}, + }; + + ASSERT_OK_AND_ASSIGN([[maybe_unused]] std::unique_ptr created_schema, + schema_manager.CreateTable(schema, partition_keys, primary_keys, options)); + + // remove schema + std::string schema_path = schema_manager.ToSchemaPath(0); + ASSERT_OK(fs->Delete(schema_path, /*recursive=*/false)); + // check create table failed + ASSERT_NOK_WITH_MSG( + Table::Create(fs, dir->Str(), Identifier("tbl1")), + "load table schema for Identifier{database='unknown', table='tbl1'} failed"); +} + +} // namespace paimon::test