diff --git a/src/paimon/core/core_options.cpp b/src/paimon/core/core_options.cpp new file mode 100644 index 0000000..4cb0c81 --- /dev/null +++ b/src/paimon/core/core_options.cpp @@ -0,0 +1,1452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/core_options.h" + +#include +#include +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/common/fs/resolving_file_system.h" +#include "paimon/common/options/memory_size.h" +#include "paimon/common/options/time_duration.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/options/expire_config.h" +#include "paimon/core/options/lookup_strategy.h" +#include "paimon/core/options/sort_order.h" +#include "paimon/core/utils/branch_manager.h" +#include "paimon/defs.h" +#include "paimon/format/file_format.h" +#include "paimon/format/file_format_factory.h" +#include "paimon/status.h" + +namespace paimon { + +// ConfigParser is a helper class for parsing configurations from a map of strings. +class ConfigParser { + public: + explicit ConfigParser(const std::map& map) : config_map_(map) {} + + // Parse basic type configurations + template + Status Parse(const std::string& key, T* value) const { + auto iter = config_map_.find(key); + if (iter != config_map_.end()) { + auto result = StringUtils::StringToValue(iter->second); + if (result) { + *value = result.value(); + return Status::OK(); + } + return Status::Invalid(fmt::format("Invalid Config [{}: {}]", key, iter->second)); + } + return Status::OK(); // Return success even if the configuration does not exist + } + + // Parse optional basic type configurations + template + Status Parse(const std::string& key, std::optional* value) const { + auto iter = config_map_.find(key); + if (iter != config_map_.end()) { + auto result = StringUtils::StringToValue(iter->second); + if (result) { + *value = result.value(); + return Status::OK(); + } + return Status::Invalid(fmt::format("Invalid Config [{}: {}]", key, iter->second)); + } + return Status::OK(); // Return success even if the configuration does not exist + } + + // Parse list configurations + template + Status ParseList(const std::string& key, const std::string& delimiter, std::vector* list, + bool need_trim = false) const { + auto iter = config_map_.find(key); + if (iter != config_map_.end()) { + auto value_str_vec = StringUtils::Split(iter->second, delimiter, /*ignore_empty=*/true); + for (auto& value_str : value_str_vec) { + if (need_trim) { + StringUtils::Trim(&value_str); + } + if constexpr (std::is_same_v) { + list->emplace_back(value_str); + } else { + auto value = StringUtils::StringToValue(value_str); + if (!value) { + return Status::Invalid( + fmt::format("Invalid Config [{}: {}]", key, iter->second)); + } + list->emplace_back(value.value()); + } + } + } + return Status::OK(); // Return success even if the configuration does not exist + } + + // Parse memory size configurations + template + Status ParseMemorySize(const std::string& key, T* value) const { + static_assert(std::is_same_v || std::is_same_v>, + "ParseMemorySize only supports int64_t and std::optional"); + auto iter = config_map_.find(key); + if (iter != config_map_.end()) { + PAIMON_ASSIGN_OR_RAISE(*value, MemorySize::ParseBytes(iter->second)); + } + return Status::OK(); + } + + // Parse time duration configurations + template + Status ParseTimeDuration(const std::string& key, T* value) const { + static_assert(std::is_same_v || std::is_same_v>, + "ParseTimeDuration only supports int64_t and std::optional"); + auto iter = config_map_.find(key); + if (iter != config_map_.end()) { + PAIMON_ASSIGN_OR_RAISE(*value, TimeDuration::Parse(iter->second)); + } + return Status::OK(); + } + + // Parse object configurations + template + Status ParseObject(const std::string& key, const std::string& default_identifier, + std::shared_ptr* value) const { + auto iter = config_map_.find(key); + if (iter != config_map_.end()) { + std::string normalized_value = StringUtils::ToLowerCase(iter->second); + PAIMON_ASSIGN_OR_RAISE(*value, Factory::Get(normalized_value, config_map_)); + } else { + PAIMON_ASSIGN_OR_RAISE( + *value, Factory::Get(StringUtils::ToLowerCase(default_identifier), config_map_)); + } + return Status::OK(); + } + + // Parse file system + Status ParseFileSystem(const std::map& fs_scheme_to_identifier_map, + const std::shared_ptr& specified_file_system, + std::shared_ptr* value) const { + if (specified_file_system) { + // if exists user specified file system, first use + *value = specified_file_system; + return Status::OK(); + } + std::string default_fs_identifier = "local"; + auto iter = config_map_.find(Options::FILE_SYSTEM); + if (iter != config_map_.end()) { + default_fs_identifier = StringUtils::ToLowerCase(iter->second); + } + *value = std::make_shared(fs_scheme_to_identifier_map, + default_fs_identifier, config_map_); + return Status::OK(); + } + + // Parse SortOrder + Status ParseSortOrder(SortOrder* sort_order) const { + auto iter = config_map_.find(Options::SEQUENCE_FIELD_SORT_ORDER); + if (iter != config_map_.end()) { + std::string str = StringUtils::ToLowerCase(iter->second); + if (str == "ascending") { + *sort_order = SortOrder::ASCENDING; + } else if (str == "descending") { + *sort_order = SortOrder::DESCENDING; + } else { + return Status::Invalid(fmt::format("invalid sort order: {}", str)); + } + } + return Status::OK(); + } + + // Parse LookupCompactMode + Status ParseLookupCompactMode(LookupCompactMode* mode) const { + auto iter = config_map_.find(Options::LOOKUP_COMPACT); + if (iter != config_map_.end()) { + std::string str = StringUtils::ToLowerCase(iter->second); + if (str == "radical") { + *mode = LookupCompactMode::RADICAL; + } else if (str == "gentle") { + *mode = LookupCompactMode::GENTLE; + } else { + return Status::Invalid(fmt::format("invalid lookup mode: {}", str)); + } + } + return Status::OK(); + } + + // Parse SortEngine + Status ParseSortEngine(SortEngine* sort_engine) const { + auto iter = config_map_.find(Options::SORT_ENGINE); + if (iter != config_map_.end()) { + std::string str = StringUtils::ToLowerCase(iter->second); + if (str == "min-heap") { + *sort_engine = SortEngine::MIN_HEAP; + } else if (str == "loser-tree") { + *sort_engine = SortEngine::LOSER_TREE; + } else { + return Status::Invalid(fmt::format("invalid sort engine: {}", str)); + } + } + return Status::OK(); + } + + // Parse MergeEngine + Status ParseMergeEngine(MergeEngine* merge_engine) const { + auto iter = config_map_.find(Options::MERGE_ENGINE); + if (iter != config_map_.end()) { + std::string str = StringUtils::ToLowerCase(iter->second); + if (str == "deduplicate") { + *merge_engine = MergeEngine::DEDUPLICATE; + } else if (str == "partial-update") { + *merge_engine = MergeEngine::PARTIAL_UPDATE; + } else if (str == "aggregation") { + *merge_engine = MergeEngine::AGGREGATE; + } else if (str == "first-row") { + *merge_engine = MergeEngine::FIRST_ROW; + } else { + return Status::Invalid(fmt::format("invalid merge engine: {}", str)); + } + } + return Status::OK(); + } + + // Parse ChangelogProducer + Status ParseChangelogProducer(ChangelogProducer* changelog_producer) const { + auto iter = config_map_.find(Options::CHANGELOG_PRODUCER); + if (iter != config_map_.end()) { + std::string str = StringUtils::ToLowerCase(iter->second); + if (str == "none") { + *changelog_producer = ChangelogProducer::NONE; + } else if (str == "input") { + *changelog_producer = ChangelogProducer::INPUT; + } else if (str == "full-compaction") { + *changelog_producer = ChangelogProducer::FULL_COMPACTION; + } else if (str == "lookup") { + *changelog_producer = ChangelogProducer::LOOKUP; + } else { + return Status::Invalid(fmt::format("invalid changelog producer: {}", str)); + } + } + return Status::OK(); + } + + // Parse ExternalPathStrategy + Status ParseExternalPathStrategy(ExternalPathStrategy* external_path_strategy) const { + auto iter = config_map_.find(Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY); + if (iter != config_map_.end()) { + std::string str = StringUtils::ToLowerCase(iter->second); + if (str == "none") { + *external_path_strategy = ExternalPathStrategy::NONE; + } else if (str == "specific-fs") { + *external_path_strategy = ExternalPathStrategy::SPECIFIC_FS; + } else if (str == "round-robin") { + *external_path_strategy = ExternalPathStrategy::ROUND_ROBIN; + } else { + return Status::Invalid(fmt::format("invalid external path strategy: {}", str)); + } + } + return Status::OK(); + } + + // Parse BucketFunctionType + Status ParseBucketFunctionType(BucketFunctionType* bucket_function_type) const { + auto iter = config_map_.find(Options::BUCKET_FUNCTION_TYPE); + if (iter != config_map_.end()) { + std::string str = StringUtils::ToLowerCase(iter->second); + if (str == "default") { + *bucket_function_type = BucketFunctionType::DEFAULT; + } else if (str == "mod") { + *bucket_function_type = BucketFunctionType::MOD; + } else if (str == "hive") { + *bucket_function_type = BucketFunctionType::HIVE; + } else { + return Status::Invalid(fmt::format("invalid bucket function type: {}", str)); + } + } + return Status::OK(); + } + + // Parse StartupMode + Status ParseStartupMode(StartupMode* startup_mode) const { + auto iter = config_map_.find(Options::SCAN_MODE); + if (iter != config_map_.end()) { + std::string str = StringUtils::ToLowerCase(iter->second); + PAIMON_ASSIGN_OR_RAISE(*startup_mode, StartupMode::FromString(str)); + } + return Status::OK(); + } + + // parse file.format.per.level + Status ParseFileFormatPerLevel( + std::map>* file_format_per_level_ptr) const { + auto& file_format_per_level = *file_format_per_level_ptr; + std::string file_format_per_level_str; + PAIMON_RETURN_NOT_OK(Parse(Options::FILE_FORMAT_PER_LEVEL, &file_format_per_level_str)); + auto level2format = + StringUtils::Split(file_format_per_level_str, std::string(","), std::string(":")); + for (const auto& single_level : level2format) { + if (single_level.size() != 2) { + return Status::Invalid( + fmt::format("fail to parse key {}, value {} (usage example: 0:avro,3:parquet)", + Options::FILE_FORMAT_PER_LEVEL, file_format_per_level_str)); + } + auto level = StringUtils::StringToValue(single_level[0]); + if (!level || level.value() < 0) { + return Status::Invalid( + fmt::format("fail to parse level {} from string to int in {}", single_level[0], + Options::FILE_FORMAT_PER_LEVEL)); + } + std::shared_ptr file_format; + PAIMON_RETURN_NOT_OK(ParseObject( + "_no_use", /*default_identifier=*/single_level[1], &file_format)); + file_format_per_level[level.value()] = file_format; + } + return Status::OK(); + } + + // parse file.compression.per.level + Status ParseFileCompressionPerLevel( + std::map* file_compression_per_level_ptr) const { + auto& file_compression_per_level = *file_compression_per_level_ptr; + std::string file_compression_per_level_str; + PAIMON_RETURN_NOT_OK( + Parse(Options::FILE_COMPRESSION_PER_LEVEL, &file_compression_per_level_str)); + auto level2compression = + StringUtils::Split(file_compression_per_level_str, std::string(","), std::string(":")); + for (const auto& single_level : level2compression) { + if (single_level.size() != 2) { + return Status::Invalid(fmt::format( + "fail to parse key {}, value {} (usage example: 0:lz4,1:zstd)", + Options::FILE_COMPRESSION_PER_LEVEL, file_compression_per_level_str)); + } + auto level = StringUtils::StringToValue(single_level[0]); + if (!level || level.value() < 0) { + return Status::Invalid( + fmt::format("fail to parse level {} from string to int in {}", single_level[0], + Options::FILE_COMPRESSION_PER_LEVEL)); + } + file_compression_per_level[level.value()] = single_level[1]; + } + return Status::OK(); + } + + bool ContainsKey(const std::string& key) const { + return config_map_.find(key) != config_map_.end(); + } + + private: + const std::map config_map_; +}; + +// Impl is a private implementation of CoreOptions, +// storing various configurable fields and their default values. +struct CoreOptions::Impl { + int64_t page_size = 64 * 1024; + std::optional target_file_size; + std::optional blob_target_file_size; + int64_t source_split_target_size = 128 * 1024 * 1024; + int64_t source_split_open_file_cost = 4 * 1024 * 1024; + int64_t manifest_target_file_size = 8 * 1024 * 1024; + int64_t deletion_vector_target_file_size = 2 * 1024 * 1024; + int64_t manifest_full_compaction_file_size = 16 * 1024 * 1024; + int64_t write_buffer_size = 256 * 1024 * 1024; + int64_t commit_timeout = std::numeric_limits::max(); + + std::shared_ptr file_format; + std::shared_ptr file_system; + std::shared_ptr manifest_file_format; + + std::optional scan_snapshot_id; + std::optional scan_timestamp_millis; + ExpireConfig expire_config; + std::vector sequence_field; + std::vector remove_record_on_sequence_group; + std::vector blob_fields; + std::vector blob_descriptor_fields; + std::vector blob_view_fields; + std::vector blob_external_storage_fields; + + std::string partition_default_name = "__DEFAULT_PARTITION__"; + StartupMode startup_mode = StartupMode::Default(); + std::string file_compression = "zstd"; + std::string manifest_compression = "zstd"; + std::string branch = BranchManager::DEFAULT_MAIN_BRANCH; + std::string data_file_prefix = "data-"; + std::string file_system_scheme_to_identifier_map_str; + + std::optional field_default_func; + std::optional scan_fallback_branch; + std::optional data_file_external_paths; + std::optional blob_external_storage_path; + + std::map raw_options; + + int32_t bucket = -1; + + int32_t manifest_merge_min_count = 30; + int32_t read_batch_size = 1024; + int32_t write_batch_size = 1024; + int32_t local_sort_max_num_file_handles = 128; + int32_t commit_max_retries = 10; + int32_t compaction_min_file_num = 5; + int32_t compaction_max_size_amplification_percent = 200; + int32_t compaction_size_ratio = 1; + int32_t num_sorted_runs_compaction_trigger = 5; + std::optional num_sorted_runs_stop_trigger; + std::optional num_levels; + + SortOrder sequence_field_sort_order = SortOrder::ASCENDING; + MergeEngine merge_engine = MergeEngine::DEDUPLICATE; + SortEngine sort_engine = SortEngine::LOSER_TREE; + ChangelogProducer changelog_producer = ChangelogProducer::NONE; + ExternalPathStrategy external_path_strategy = ExternalPathStrategy::NONE; + LookupCompactMode lookup_compact_mode = LookupCompactMode::RADICAL; + std::optional lookup_compact_max_interval; + BucketFunctionType bucket_function_type = BucketFunctionType::DEFAULT; + + int32_t file_compression_zstd_level = 1; + int64_t write_buffer_spill_max_disk_size = std::numeric_limits::max(); + + bool ignore_delete = false; + bool write_buffer_spillable = true; + bool write_only = false; + bool deletion_vectors_enabled = false; + bool deletion_vectors_bitmap64 = false; + bool force_lookup = false; + bool lookup_wait = true; + bool partial_update_remove_record_on_delete = false; + bool aggregation_remove_record_on_delete = false; + bool table_read_sequence_number_enabled = false; + bool key_value_sequence_number_enabled = false; + bool file_index_read_enabled = true; + bool enable_adaptive_prefetch_strategy = true; + bool index_file_in_data_file_dir = false; + bool row_tracking_enabled = false; + bool row_tracking_partition_group_on_commit = true; + bool data_evolution_enabled = false; + bool legacy_partition_name_enabled = true; + bool global_index_enabled = true; + std::optional global_index_thread_num; + bool commit_force_compact = false; + bool compaction_force_rewrite_all_files = false; + bool compaction_force_up_level_0 = false; + std::optional global_index_external_path; + + std::optional scan_tag_name; + std::optional optimized_compaction_interval; + std::optional compaction_total_size_threshold; + std::optional compaction_incremental_size_threshold; + int32_t compact_off_peak_start_hour = -1; + int32_t compact_off_peak_end_hour = -1; + int32_t compact_off_peak_ratio = 0; + bool lookup_cache_bloom_filter = true; + double lookup_cache_bloom_filter_fpp = 0.05; + bool lookup_remote_file_enabled = false; + int32_t lookup_remote_level_threshold = INT32_MIN; + CompressOptions lookup_compress_options{"zstd", 1}; + CompressOptions spill_compress_options{"zstd", 1}; + int64_t cache_page_size = 64 * 1024; // 64KB + std::map> file_format_per_level; + std::map file_compression_per_level; + int64_t lookup_cache_max_memory = 256 * 1024 * 1024; + double lookup_cache_high_prio_pool_ratio = 0.25; + int64_t lookup_cache_file_retention_ms = 1 * 3600 * 1000; // 1 hour + int64_t lookup_cache_max_disk_size = INT64_MAX; + + // Parse basic table options: bucket, partition, file sizes, batch sizes, file system, etc. + Status ParseBasicOptions( + const ConfigParser& parser, const std::shared_ptr& specified_file_system, + const std::map& fs_scheme_to_identifier_map) { + // Parse bucket - bucket number, -1 for dynamic bucket mode, >0 for fixed bucket mode + PAIMON_RETURN_NOT_OK(parser.Parse(Options::BUCKET, &bucket)); + // Parse partition.default-name - default partition name for null/empty partition values + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::PARTITION_DEFAULT_NAME, &partition_default_name)); + // Parse page-size - memory page size, default 64 kb + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::PAGE_SIZE, &page_size)); + // Parse target-file-size - target size of a data file + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::TARGET_FILE_SIZE, &target_file_size)); + // Parse blob.target-file-size - target size of a blob file + PAIMON_RETURN_NOT_OK( + parser.ParseMemorySize(Options::BLOB_TARGET_FILE_SIZE, &blob_target_file_size)); + // Parse source.split.target-size - target size of a source split when scanning a bucket + PAIMON_RETURN_NOT_OK( + parser.ParseMemorySize(Options::SOURCE_SPLIT_TARGET_SIZE, &source_split_target_size)); + // Parse source.split.open-file-cost - open file cost to avoid reading too many files + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::SOURCE_SPLIT_OPEN_FILE_COST, + &source_split_open_file_cost)); + // Parse read.batch-size - read batch size for file formats + PAIMON_RETURN_NOT_OK(parser.Parse(Options::READ_BATCH_SIZE, &read_batch_size)); + // Parse write.batch-size - write batch size for file formats + PAIMON_RETURN_NOT_OK(parser.Parse(Options::WRITE_BATCH_SIZE, &write_batch_size)); + // Parse write-buffer-size - data to build up in memory before flushing to disk + PAIMON_RETURN_NOT_OK( + parser.ParseMemorySize(Options::WRITE_BUFFER_SIZE, &write_buffer_size)); + // Parse write-buffer-spillable - whether write buffer may spill to disk + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::WRITE_BUFFER_SPILLABLE, &write_buffer_spillable)); + // Parse write-buffer-spill.max-disk-size - max disk size for spill files + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::WRITE_BUFFER_SPILL_MAX_DISK_SIZE, + &write_buffer_spill_max_disk_size)); + // Parse local-sort.max-num-file-handles - spill file handle cap for local merge + PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, + &local_sort_max_num_file_handles)); + // Parse spill-compression - compression codec for spill files + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::SPILL_COMPRESSION, &spill_compress_options.compress)); + // Parse spill-compression.zstd-level - zstd level for spill compression, default 1 + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SPILL_COMPRESSION_ZSTD_LEVEL, + &(spill_compress_options.zstd_level))); + // Parse file-system - file system type, default "local" + PAIMON_RETURN_NOT_OK(parser.ParseFileSystem(fs_scheme_to_identifier_map, + specified_file_system, &file_system)); + // Parse write-only - if true, compactions and snapshot expiration will be skipped + PAIMON_RETURN_NOT_OK(parser.Parse(Options::WRITE_ONLY, &write_only)); + // Parse partition.legacy-name - use legacy ToString for partition names, default true + PAIMON_RETURN_NOT_OK(parser.Parse(Options::PARTITION_GENERATE_LEGACY_NAME, + &legacy_partition_name_enabled)); + // Only for test, parse enable-adaptive-prefetch-strategy + PAIMON_RETURN_NOT_OK(parser.Parse("test.enable-adaptive-prefetch-strategy", + &enable_adaptive_prefetch_strategy)); + // Parse data-file.external-paths - external paths for data files, comma separated + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::DATA_FILE_EXTERNAL_PATHS, &data_file_external_paths)); + // Parse data-file.external-paths.strategy - strategy for selecting external path + PAIMON_RETURN_NOT_OK(parser.ParseExternalPathStrategy(&external_path_strategy)); + // Parse data-file.prefix - file name prefix of data files, default "data-" + PAIMON_RETURN_NOT_OK(parser.Parse(Options::DATA_FILE_PREFIX, &data_file_prefix)); + // Parse row-tracking.enabled - whether to enable unique row id for append table + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::ROW_TRACKING_ENABLED, &row_tracking_enabled)); + // Parse row-tracking.partition-group-on-commit - whether to group delta files by partition + PAIMON_RETURN_NOT_OK(parser.Parse(Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT, + &row_tracking_partition_group_on_commit)); + // Parse data-evolution.enabled - whether to enable data evolution for row tracking + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::DATA_EVOLUTION_ENABLED, &data_evolution_enabled)); + // Parse bucket-function - bucket function type, default "DEFAULT" + PAIMON_RETURN_NOT_OK(parser.ParseBucketFunctionType(&bucket_function_type)); + // Parse blob-field - column names to store as blob type, comma separated + PAIMON_RETURN_NOT_OK(parser.ParseList( + Options::BLOB_FIELD, Options::FIELDS_SEPARATOR, &blob_fields, /*need_trim=*/true)); + // Parse blob-descriptor-field - BLOB fields stored inline as serialized descriptors + PAIMON_RETURN_NOT_OK( + parser.ParseList(Options::BLOB_DESCRIPTOR_FIELD, Options::FIELDS_SEPARATOR, + &blob_descriptor_fields, /*need_trim=*/true)); + if (blob_descriptor_fields.empty()) { + PAIMON_RETURN_NOT_OK(parser.ParseList( + Options::FALLBACK_BLOB_DESCRIPTOR_FIELD, Options::FIELDS_SEPARATOR, + &blob_descriptor_fields, /*need_trim=*/true)); + } + // Parse blob-view-field - BLOB fields stored inline as serialized view metadata + PAIMON_RETURN_NOT_OK(parser.ParseList(Options::BLOB_VIEW_FIELD, + Options::FIELDS_SEPARATOR, + &blob_view_fields, /*need_trim=*/true)); + // Parse blob-external-storage-field - descriptor BLOB fields written to external storage + PAIMON_RETURN_NOT_OK(parser.ParseList( + Options::BLOB_EXTERNAL_STORAGE_FIELD, Options::FIELDS_SEPARATOR, + &blob_external_storage_fields, /*need_trim=*/true)); + // Parse blob-external-storage-path - external storage path for configured BLOB fields + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::BLOB_EXTERNAL_STORAGE_PATH, &blob_external_storage_path)); + return Status::OK(); + } + + // Parse data file format, compression, and per-level format/compression configurations. + Status ParseFileFormatOptions(const ConfigParser& parser) { + // Parse file.format - data file format, default "parquet" + PAIMON_RETURN_NOT_OK(parser.ParseObject( + Options::FILE_FORMAT, /*default_identifier=*/"parquet", &file_format)); + // Parse file.compression - default file compression, default "zstd" + PAIMON_RETURN_NOT_OK(parser.Parse(Options::FILE_COMPRESSION, &file_compression)); + // Parse file.compression.zstd-level - zstd compression level, default 1 + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::FILE_COMPRESSION_ZSTD_LEVEL, &file_compression_zstd_level)); + // Parse file.format.per.level - different file format for different levels + PAIMON_RETURN_NOT_OK(parser.ParseFileFormatPerLevel(&file_format_per_level)); + // Parse file.compression.per.level - different compression for different levels + PAIMON_RETURN_NOT_OK(parser.ParseFileCompressionPerLevel(&file_compression_per_level)); + return Status::OK(); + } + + // Parse manifest file configurations: format, compression, merge, and compaction thresholds. + Status ParseManifestOptions(const ConfigParser& parser) { + // Parse manifest.format - manifest file format, default "avro" + PAIMON_RETURN_NOT_OK(parser.ParseObject( + Options::MANIFEST_FORMAT, /*default_identifier=*/"avro", &manifest_file_format)); + // Parse manifest.compression - manifest file compression, default "zstd" + PAIMON_RETURN_NOT_OK(parser.Parse(Options::MANIFEST_COMPRESSION, &manifest_compression)); + // Parse manifest.target-file-size - suggested manifest file size, default 8MB + PAIMON_RETURN_NOT_OK( + parser.ParseMemorySize(Options::MANIFEST_TARGET_FILE_SIZE, &manifest_target_file_size)); + // Parse manifest.merge-min-count - minimum ManifestFileMeta count to trigger merge + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::MANIFEST_MERGE_MIN_COUNT, &manifest_merge_min_count)); + // Parse manifest.full-compaction-threshold-size - size threshold for full compaction + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::MANIFEST_FULL_COMPACTION_FILE_SIZE, + &manifest_full_compaction_file_size)); + return Status::OK(); + } + + // Parse snapshot expiration and retention configurations. + Status ParseExpireOptions(const ConfigParser& parser) { + // Parse snapshot.num-retained.min - minimum completed snapshots to retain, default 10 + int32_t snapshot_num_retain_min = 10; + // Parse snapshot.num-retained.max - maximum completed snapshots to retain + int32_t snapshot_num_retain_max = std::numeric_limits::max(); + // Parse snapshot.expire.limit - maximum snapshots allowed to expire at a time, default 50 + int32_t snapshot_expire_limit = 50; + // Parse snapshot.time-retained - maximum time of completed snapshots to retain + int64_t snapshot_time_retained = 1 * 3600 * 1000; // 1 hour + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::SNAPSHOT_NUM_RETAINED_MIN, &snapshot_num_retain_min)); + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::SNAPSHOT_NUM_RETAINED_MAX, &snapshot_num_retain_max)); + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SNAPSHOT_EXPIRE_LIMIT, &snapshot_expire_limit)); + PAIMON_RETURN_NOT_OK( + parser.ParseTimeDuration(Options::SNAPSHOT_TIME_RETAINED, &snapshot_time_retained)); + // Parse snapshot.clean-empty-directories - whether to clean empty dirs on expiration + bool snapshot_clean_empty_directories = false; + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SNAPSHOT_CLEAN_EMPTY_DIRECTORIES, + &snapshot_clean_empty_directories)); + expire_config = + ExpireConfig(snapshot_num_retain_max, snapshot_num_retain_min, snapshot_time_retained, + snapshot_expire_limit, snapshot_clean_empty_directories); + return Status::OK(); + } + + // Parse commit configurations: timeout, retries, and force-compact. + Status ParseCommitOptions(const ConfigParser& parser) { + // Parse commit.force-compact - whether to force compaction before commit, default false + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::COMMIT_FORCE_COMPACT, &commit_force_compact)); + // Parse commit.timeout - timeout duration of retry when commit failed + PAIMON_RETURN_NOT_OK(parser.ParseTimeDuration(Options::COMMIT_TIMEOUT, &commit_timeout)); + // Parse commit.max-retries - maximum retries when commit failed, default 10 + PAIMON_RETURN_NOT_OK(parser.Parse(Options::COMMIT_MAX_RETRIES, &commit_max_retries)); + return Status::OK(); + } + + // Parse merge engine, sort engine, sequence field, changelog, and partial-update options. + Status ParseMergeAndSequenceOptions(const ConfigParser& parser) { + // Parse sequence.field - field that generates sequence number for primary key table + PAIMON_RETURN_NOT_OK(parser.ParseList( + Options::SEQUENCE_FIELD, Options::FIELDS_SEPARATOR, &sequence_field)); + // Parse sequence.field.sort-order - order of sequence field, default "ascending" + PAIMON_RETURN_NOT_OK(parser.ParseSortOrder(&sequence_field_sort_order)); + // Parse sort-engine - sort engine for primary key table, default "loser-tree" + PAIMON_RETURN_NOT_OK(parser.ParseSortEngine(&sort_engine)); + // Parse merge-engine - merge engine for primary key table, default "deduplicate" + PAIMON_RETURN_NOT_OK(parser.ParseMergeEngine(&merge_engine)); + // Parse ignore-delete - whether to ignore delete records, default false + PAIMON_RETURN_NOT_OK(parser.Parse(Options::IGNORE_DELETE, &ignore_delete)); + // Parse fields.default-aggregate-function - default agg function for partial-update + PAIMON_RETURN_NOT_OK(parser.Parse(Options::FIELDS_DEFAULT_AGG_FUNC, &field_default_func)); + // Parse changelog-producer - whether to double write to a changelog file, default "none" + PAIMON_RETURN_NOT_OK(parser.ParseChangelogProducer(&changelog_producer)); + // Parse partial-update.remove-record-on-delete - remove whole row on delete + PAIMON_RETURN_NOT_OK(parser.Parse(Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE, + &partial_update_remove_record_on_delete)); + // Parse aggregation_remove_record_on_delete + PAIMON_RETURN_NOT_OK(parser.Parse(Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, + &aggregation_remove_record_on_delete)); + // Parse table-read.sequence-number.enabled - expose sequence number in system tables + PAIMON_RETURN_NOT_OK(parser.Parse(Options::TABLE_READ_SEQUENCE_NUMBER_ENABLED, + &table_read_sequence_number_enabled)); + // Parse key-value.sequence_number.enabled - internal sequence number read switch + PAIMON_RETURN_NOT_OK(parser.Parse(Options::KEY_VALUE_SEQUENCE_NUMBER_ENABLED, + &key_value_sequence_number_enabled)); + // Parse partial-update.remove-record-on-sequence-group + PAIMON_RETURN_NOT_OK(parser.ParseList( + Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP, Options::FIELDS_SEPARATOR, + &remove_record_on_sequence_group)); + return Status::OK(); + } + + // Parse deletion vector configurations. + Status ParseDeletionVectorOptions(const ConfigParser& parser) { + // Parse deletion-vectors.enabled - whether to enable deletion vectors mode, default false + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::DELETION_VECTORS_ENABLED, &deletion_vectors_enabled)); + // Parse deletion-vector.index-file.target-size - target size of dv index file, default 2MB + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::DELETION_VECTOR_INDEX_FILE_TARGET_SIZE, + &deletion_vector_target_file_size)); + // Parse deletion-vectors.bitmap64 - enable 64 bit bitmap implementation, default false + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::DELETION_VECTOR_BITMAP64, &deletion_vectors_bitmap64)); + return Status::OK(); + } + + // Parse scan, branch, and tag related configurations. + Status ParseScanAndBranchOptions(const ConfigParser& parser) { + // Parse scan.snapshot-id - optional snapshot id for "from-snapshot" scan mode + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_SNAPSHOT_ID, &scan_snapshot_id)); + // Parse scan.timestamp-millis and scan.timestamp + std::string scan_timestamp_str; + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_TIMESTAMP, &scan_timestamp_str)); + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_TIMESTAMP_MILLIS, &scan_timestamp_millis)); + if (scan_timestamp_millis != std::nullopt && !scan_timestamp_str.empty()) { + return Status::Invalid( + "scan.timestamp-millis and scan.timestamp cannot be set at the same time"); + } + if (!scan_timestamp_str.empty()) { + PAIMON_ASSIGN_OR_RAISE(int64_t millis, + StringUtils::StringToTimestampMillis(scan_timestamp_str)); + scan_timestamp_millis = millis; + } + // Parse scan.mode - scanning behavior of the source, default "default" + PAIMON_RETURN_NOT_OK(parser.ParseStartupMode(&startup_mode)); + // Parse scan.fallback-branch - fallback branch when partition not found + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_FALLBACK_BRANCH, &scan_fallback_branch)); + // Parse branch - branch name, default "main" + PAIMON_RETURN_NOT_OK(parser.Parse(Options::BRANCH, &branch)); + // Parse scan.tag-name - optional tag name for "from-snapshot" scan mode + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SCAN_TAG_NAME, &scan_tag_name)); + return Status::OK(); + } + + // Parse index-related configurations: file index, global index. + Status ParseIndexOptions(const ConfigParser& parser) { + // Parse file-index.read.enabled - whether to enable reading file index, default true + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::FILE_INDEX_READ_ENABLED, &file_index_read_enabled)); + // Parse index-file-in-data-file-dir - whether index file in data file directory + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::INDEX_FILE_IN_DATA_FILE_DIR, &index_file_in_data_file_dir)); + // Parse global-index.enabled - whether to enable global index for scan, default true + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::GLOBAL_INDEX_ENABLED, &global_index_enabled)); + // Parse global-index.thread-num - the maximum number of concurrent scanner for global + // index, no default value + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::GLOBAL_INDEX_THREAD_NUM, &global_index_thread_num)); + // Parse global-index.external-path - global index root directory + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::GLOBAL_INDEX_EXTERNAL_PATH, &global_index_external_path)); + return Status::OK(); + } + + // Parse compaction configurations: sorted run triggers, size ratios, thresholds, off-peak. + Status ParseCompactionOptions(const ConfigParser& parser) { + // Parse compaction.min.file-num - minimum file number to trigger compaction, default 5 + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::COMPACTION_MIN_FILE_NUM, &compaction_min_file_num)); + // Parse compaction.max-size-amplification-percent - size amplification percent, default 200 + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT, + &compaction_max_size_amplification_percent)); + // Parse compaction.size-ratio - percentage flexibility for sorted run comparison, default 1 + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::COMPACTION_SIZE_RATIO, &compaction_size_ratio)); + // Parse num-sorted-run.compaction-trigger - sorted run number to trigger compaction + PAIMON_RETURN_NOT_OK(parser.Parse(Options::NUM_SORTED_RUNS_COMPACTION_TRIGGER, + &num_sorted_runs_compaction_trigger)); + // Parse num-sorted-run.stop-trigger - sorted run number to stop writes + PAIMON_RETURN_NOT_OK(parser.Parse(Options::NUM_SORTED_RUNS_STOP_TRIGGER, + &num_sorted_runs_stop_trigger)); + // Parse num-levels - total level number for LSM tree + PAIMON_RETURN_NOT_OK(parser.Parse(Options::NUM_LEVELS, &num_levels)); + // Parse compaction.force-rewrite-all-files - force pick all files for full compaction + PAIMON_RETURN_NOT_OK(parser.Parse(Options::COMPACTION_FORCE_REWRITE_ALL_FILES, + &compaction_force_rewrite_all_files)); + // Parse compaction.force-up-level-0 - always include all level 0 files in candidates + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::COMPACTION_FORCE_UP_LEVEL_0, &compaction_force_up_level_0)); + // Parse compaction.optimization-interval - how often to perform optimization compaction + PAIMON_RETURN_NOT_OK(parser.ParseTimeDuration(Options::COMPACTION_OPTIMIZATION_INTERVAL, + &optimized_compaction_interval)); + // Parse compaction.total-size-threshold - force full compaction when total size is smaller + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::COMPACTION_TOTAL_SIZE_THRESHOLD, + &compaction_total_size_threshold)); + // Parse compaction.incremental-size-threshold - force full compaction when incremental size + // is bigger + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::COMPACTION_INCREMENTAL_SIZE_THRESHOLD, + &compaction_incremental_size_threshold)); + // Parse compaction.offpeak.start.hour - start of off-peak hours (0-23), -1 to disable + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::COMPACT_OFFPEAK_START_HOUR, &compact_off_peak_start_hour)); + // Parse compaction.offpeak.end.hour - end of off-peak hours (0-23), -1 to disable + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::COMPACT_OFFPEAK_END_HOUR, &compact_off_peak_end_hour)); + // Parse compaction.offpeak-ratio - more aggressive ratio during off-peak hours, default 0 + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::COMPACTION_OFFPEAK_RATIO, &compact_off_peak_ratio)); + return Status::OK(); + } + + // Parse lookup configurations: compact mode, bloom filter, remote file, cache, compression. + Status ParseLookupOptions(const ConfigParser& parser) { + // Parse force-lookup - whether to force lookup for compaction, default false + PAIMON_RETURN_NOT_OK(parser.Parse(Options::FORCE_LOOKUP, &force_lookup)); + // Parse lookup-wait - commit will wait for compaction by lookup, default true + PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOOKUP_WAIT, &lookup_wait)); + // Parse lookup-compact - lookup compact mode, default RADICAL + PAIMON_RETURN_NOT_OK(parser.ParseLookupCompactMode(&lookup_compact_mode)); + // Parse lookup-compact.max-interval - max interval for gentle mode lookup compaction + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::LOOKUP_COMPACT_MAX_INTERVAL, &lookup_compact_max_interval)); + // Parse lookup.cache.bloom.filter.enabled - enable bloom filter for lookup cache + PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOOKUP_CACHE_BLOOM_FILTER_ENABLED, + &lookup_cache_bloom_filter)); + // Parse lookup.cache.bloom.filter.fpp - false positive probability, default 0.05 + PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOOKUP_CACHE_BLOOM_FILTER_FPP, + &lookup_cache_bloom_filter_fpp)); + // Parse lookup.remote-file.enabled - whether to enable remote file for lookup + PAIMON_RETURN_NOT_OK( + parser.Parse(Options::LOOKUP_REMOTE_FILE_ENABLED, &lookup_remote_file_enabled)); + // Parse lookup.remote-file.level-threshold - level threshold for remote lookup files + PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOOKUP_REMOTE_LEVEL_THRESHOLD, + &lookup_remote_level_threshold)); + // Parse lookup.cache-spill-compression - spill compression for lookup cache, default "zstd" + PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOOKUP_CACHE_SPILL_COMPRESSION, + &lookup_compress_options.compress)); + // Parse spill-compression.zstd-level - zstd level for spill compression, default 1 + PAIMON_RETURN_NOT_OK(parser.Parse(Options::SPILL_COMPRESSION_ZSTD_LEVEL, + &(lookup_compress_options.zstd_level))); + // Parse cache-page-size - memory page size for caching, default 64 kb + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::CACHE_PAGE_SIZE, &cache_page_size)); + // Parse lookup.cache-max-memory-size - max memory size for lookup cache, default 256 mb + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::LOOKUP_CACHE_MAX_MEMORY_SIZE, + &lookup_cache_max_memory)); + // Parse lookup.cache.high-priority-pool-ratio - fraction for high-priority data, default + // 0.25 + PAIMON_RETURN_NOT_OK(parser.Parse(Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, + &lookup_cache_high_prio_pool_ratio)); + if (lookup_cache_high_prio_pool_ratio < 0.0 || lookup_cache_high_prio_pool_ratio >= 1.0) { + return Status::Invalid(fmt::format( + "The high priority pool ratio should in the range [0, 1), while input is {}", + lookup_cache_high_prio_pool_ratio)); + } + // Parse lookup.cache-file-retention - cached files retention time, default "1 hour" + PAIMON_RETURN_NOT_OK(parser.ParseTimeDuration(Options::LOOKUP_CACHE_FILE_RETENTION, + &lookup_cache_file_retention_ms)); + // Parse lookup.cache-max-disk-size - max disk size for lookup cache, default unlimited + PAIMON_RETURN_NOT_OK(parser.ParseMemorySize(Options::LOOKUP_CACHE_MAX_DISK_SIZE, + &lookup_cache_max_disk_size)); + return Status::OK(); + } +}; + +// Parse configurations from a map and return a populated CoreOptions object. +Result CoreOptions::FromMap( + const std::map& options_map, + const std::shared_ptr& specified_file_system, + const std::map& fs_scheme_to_identifier_map) { + CoreOptions options; + auto& impl = options.impl_; + impl->raw_options = options_map; + ConfigParser parser(options_map); + + PAIMON_RETURN_NOT_OK( + impl->ParseBasicOptions(parser, specified_file_system, fs_scheme_to_identifier_map)); + PAIMON_RETURN_NOT_OK(impl->ParseFileFormatOptions(parser)); + PAIMON_RETURN_NOT_OK(impl->ParseManifestOptions(parser)); + PAIMON_RETURN_NOT_OK(impl->ParseExpireOptions(parser)); + PAIMON_RETURN_NOT_OK(impl->ParseCommitOptions(parser)); + PAIMON_RETURN_NOT_OK(impl->ParseMergeAndSequenceOptions(parser)); + PAIMON_RETURN_NOT_OK(impl->ParseDeletionVectorOptions(parser)); + PAIMON_RETURN_NOT_OK(impl->ParseScanAndBranchOptions(parser)); + PAIMON_RETURN_NOT_OK(impl->ParseIndexOptions(parser)); + PAIMON_RETURN_NOT_OK(impl->ParseCompactionOptions(parser)); + PAIMON_RETURN_NOT_OK(impl->ParseLookupOptions(parser)); + + return options; +} + +CoreOptions::CoreOptions() : impl_(std::make_unique()) {} + +CoreOptions::CoreOptions(const CoreOptions& rhs) + : impl_(std::make_unique(*(rhs.impl_.get()))) {} + +CoreOptions& CoreOptions::operator=(const CoreOptions& rhs) { + if (this != &rhs) { + impl_ = std::make_unique(*(rhs.impl_.get())); + } + return *this; +} + +CoreOptions::~CoreOptions() = default; + +int32_t CoreOptions::GetBucket() const { + return impl_->bucket; +} + +std::shared_ptr CoreOptions::GetWriteFileFormat(int32_t level) const { + auto iter = impl_->file_format_per_level.find(level); + if (iter != impl_->file_format_per_level.end()) { + return iter->second; + } + return impl_->file_format; +} + +std::shared_ptr CoreOptions::GetFileFormat() const { + return impl_->file_format; +} + +std::shared_ptr CoreOptions::GetFileSystem() const { + return impl_->file_system; +} + +const std::string& CoreOptions::GetFileCompression() const { + return impl_->file_compression; +} + +const std::string& CoreOptions::GetWriteFileCompression(int32_t level) const { + auto iter = impl_->file_compression_per_level.find(level); + if (iter != impl_->file_compression_per_level.end()) { + return iter->second; + } + return impl_->file_compression; +} + +int32_t CoreOptions::GetFileCompressionZstdLevel() const { + return impl_->file_compression_zstd_level; +} + +int64_t CoreOptions::GetPageSize() const { + return impl_->page_size; +} + +int64_t CoreOptions::GetTargetFileSize(bool has_primary_key) const { + if (impl_->target_file_size == std::nullopt) { + return has_primary_key ? 128 * 1024 * 1024 : 256 * 1024 * 1024; + } + return impl_->target_file_size.value(); +} + +int64_t CoreOptions::GetBlobTargetFileSize() const { + if (impl_->blob_target_file_size == std::nullopt) { + return GetTargetFileSize(/*has_primary_key=*/false); + } + return impl_->blob_target_file_size.value(); +} + +int64_t CoreOptions::GetCompactionFileSize(bool has_primary_key) const { + // file size to join the compaction, we don't process on middle file size to avoid + // compact a same file twice (the compression is not calculate so accurately. the output + // file maybe be less than target file generated by rolling file write). + return GetTargetFileSize(has_primary_key) / 10 * 7; +} + +std::string CoreOptions::GetPartitionDefaultName() const { + return impl_->partition_default_name; +} + +std::shared_ptr CoreOptions::GetManifestFormat() const { + return impl_->manifest_file_format; +} + +int64_t CoreOptions::GetSourceSplitTargetSize() const { + return impl_->source_split_target_size; +} +int64_t CoreOptions::GetSourceSplitOpenFileCost() const { + return impl_->source_split_open_file_cost; +} +std::optional CoreOptions::GetScanSnapshotId() const { + return impl_->scan_snapshot_id; +} +std::optional CoreOptions::GetScanTimestampMillis() const { + return impl_->scan_timestamp_millis; +} +int64_t CoreOptions::GetManifestTargetFileSize() const { + return impl_->manifest_target_file_size; +} + +int32_t CoreOptions::GetManifestMergeMinCount() const { + return impl_->manifest_merge_min_count; +} + +int64_t CoreOptions::GetManifestFullCompactionThresholdSize() const { + return impl_->manifest_full_compaction_file_size; +} + +const std::string& CoreOptions::GetManifestCompression() const { + return impl_->manifest_compression; +} + +StartupMode CoreOptions::GetStartupMode() const { + if (impl_->startup_mode == StartupMode::Default()) { + if (GetScanSnapshotId() != std::nullopt || GetScanTagName() != std::nullopt) { + return StartupMode::FromSnapshot(); + } + if (GetScanTimestampMillis() != std::nullopt) { + return StartupMode::FromTimestamp(); + } + return StartupMode::LatestFull(); + } + return impl_->startup_mode; +} + +int32_t CoreOptions::GetReadBatchSize() const { + return impl_->read_batch_size; +} + +int32_t CoreOptions::GetWriteBatchSize() const { + return impl_->write_batch_size; +} + +int64_t CoreOptions::GetWriteBufferSize() const { + return impl_->write_buffer_size; +} + +bool CoreOptions::GetWriteBufferSpillable() const { + return impl_->write_buffer_spillable; +} + +int64_t CoreOptions::GetWriteBufferSpillMaxDiskSize() const { + return impl_->write_buffer_spill_max_disk_size; +} + +int32_t CoreOptions::GetLocalSortMaxNumFileHandles() const { + return impl_->local_sort_max_num_file_handles; +} + +const CompressOptions& CoreOptions::GetSpillCompressOptions() const { + return impl_->spill_compress_options; +} + +bool CoreOptions::CommitForceCompact() const { + return impl_->commit_force_compact; +} + +int64_t CoreOptions::GetCommitTimeout() const { + return impl_->commit_timeout; +} + +int32_t CoreOptions::GetCommitMaxRetries() const { + return impl_->commit_max_retries; +} + +int32_t CoreOptions::GetCompactionMinFileNum() const { + return impl_->compaction_min_file_num; +} + +int32_t CoreOptions::GetCompactionMaxSizeAmplificationPercent() const { + return impl_->compaction_max_size_amplification_percent; +} + +int32_t CoreOptions::GetCompactionSizeRatio() const { + return impl_->compaction_size_ratio; +} + +int32_t CoreOptions::GetNumSortedRunsCompactionTrigger() const { + return impl_->num_sorted_runs_compaction_trigger; +} + +int32_t CoreOptions::GetNumSortedRunsStopTrigger() const { + int32_t compact_trigger = GetNumSortedRunsCompactionTrigger(); + int32_t stop_trigger = 0; + if (impl_->num_sorted_runs_stop_trigger.has_value()) { + stop_trigger = impl_->num_sorted_runs_stop_trigger.value(); + } else { + int64_t computed = static_cast(compact_trigger) + 3; + if (computed > std::numeric_limits::max()) { + computed = std::numeric_limits::max(); + } + stop_trigger = static_cast(computed); + } + return std::max(compact_trigger, stop_trigger); +} + +int32_t CoreOptions::GetNumLevels() const { + // By default, this ensures that the compaction does not fall to level 0, but at least to + // level 1 + if (impl_->num_levels.has_value()) { + return impl_->num_levels.value(); + } + + int64_t incremented = static_cast(GetNumSortedRunsCompactionTrigger()) + 1; + if (incremented > std::numeric_limits::max()) { + incremented = std::numeric_limits::max(); + } + return static_cast(incremented); +} + +LookupCompactMode CoreOptions::GetLookupCompactMode() const { + return impl_->lookup_compact_mode; +} + +int32_t CoreOptions::GetLookupCompactMaxInterval() const { + int32_t compact_trigger = GetNumSortedRunsCompactionTrigger(); + int32_t max_interval; + if (impl_->lookup_compact_max_interval.has_value()) { + max_interval = impl_->lookup_compact_max_interval.value(); + } else { + int64_t doubled = static_cast(compact_trigger) * 2; + if (doubled > std::numeric_limits::max()) { + doubled = std::numeric_limits::max(); + } + max_interval = static_cast(doubled); + } + + if (max_interval < compact_trigger) { + max_interval = compact_trigger; + } + return max_interval; +} + +const ExpireConfig& CoreOptions::GetExpireConfig() const { + return impl_->expire_config; +} + +const std::vector& CoreOptions::GetSequenceField() const { + return impl_->sequence_field; +} + +bool CoreOptions::SequenceFieldSortOrderIsAscending() const { + return impl_->sequence_field_sort_order == SortOrder::ASCENDING; +} + +MergeEngine CoreOptions::GetMergeEngine() const { + return impl_->merge_engine; +} + +SortEngine CoreOptions::GetSortEngine() const { + return impl_->sort_engine; +} + +bool CoreOptions::IgnoreDelete() const { + return impl_->ignore_delete; +} + +bool CoreOptions::WriteOnly() const { + return impl_->write_only; +} + +std::optional CoreOptions::GetFieldsDefaultFunc() const { + return impl_->field_default_func; +} + +bool CoreOptions::EnableAdaptivePrefetchStrategy() const { + return impl_->enable_adaptive_prefetch_strategy; +} + +Result> CoreOptions::GetFieldAggFunc( + const std::string& field_name) const { + ConfigParser parser(impl_->raw_options); + std::optional field_agg_func; + std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + + std::string(Options::AGG_FUNCTION); + PAIMON_RETURN_NOT_OK(parser.Parse(key, &field_agg_func)); + return field_agg_func; +} + +Result CoreOptions::FieldAggIgnoreRetract(const std::string& field_name) const { + ConfigParser parser(impl_->raw_options); + bool field_agg_ignore_retract = false; + std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + + std::string(Options::IGNORE_RETRACT); + PAIMON_RETURN_NOT_OK(parser.Parse(key, &field_agg_ignore_retract)); + return field_agg_ignore_retract; +} + +Result CoreOptions::FieldListAggDelimiter(const std::string& field_name) const { + ConfigParser parser(impl_->raw_options); + std::string delimiter = ","; + std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + + std::string(Options::LIST_AGG_DELIMITER); + PAIMON_RETURN_NOT_OK(parser.Parse(key, &delimiter)); + return delimiter; +} + +Result CoreOptions::FieldCollectAggDistinct(const std::string& field_name) const { + ConfigParser parser(impl_->raw_options); + bool distinct = false; + std::string key = std::string(Options::FIELDS_PREFIX) + "." + field_name + "." + + std::string(Options::DISTINCT); + PAIMON_RETURN_NOT_OK(parser.Parse(key, &distinct)); + return distinct; +} + +bool CoreOptions::DeletionVectorsEnabled() const { + return impl_->deletion_vectors_enabled; +} + +bool CoreOptions::DeletionVectorsBitmap64() const { + return impl_->deletion_vectors_bitmap64; +} +int64_t CoreOptions::DeletionVectorTargetFileSize() const { + return impl_->deletion_vector_target_file_size; +} + +ChangelogProducer CoreOptions::GetChangelogProducer() const { + return impl_->changelog_producer; +} + +LookupStrategy CoreOptions::GetLookupStrategy() const { + return LookupStrategy::From( + /*is_first_row=*/GetMergeEngine() == MergeEngine::FIRST_ROW, + /*produce_changelog=*/GetChangelogProducer() == ChangelogProducer::LOOKUP, + /*deletion_vector=*/DeletionVectorsEnabled(), + /*force_lookup=*/impl_->force_lookup); +} + +const std::map& CoreOptions::ToMap() const { + return impl_->raw_options; +} + +bool CoreOptions::NeedLookup() const { + return GetLookupStrategy().need_lookup; +} + +bool CoreOptions::PrepareCommitWaitCompaction() const { + if (!NeedLookup()) { + return false; + } + return impl_->lookup_wait; +} + +bool CoreOptions::CompactionForceRewriteAllFiles() const { + return impl_->compaction_force_rewrite_all_files; +} + +bool CoreOptions::CompactionForceUpLevel0() const { + return impl_->compaction_force_up_level_0; +} + +std::map CoreOptions::GetFieldsSequenceGroups() const { + auto raw_options = impl_->raw_options; + std::map sequence_groups; + for (const auto& [key, value] : raw_options) { + if (StringUtils::StartsWith(key, Options::FIELDS_PREFIX, /*start_pos=*/0) && + StringUtils::EndsWith(key, Options::SEQUENCE_GROUP)) { + std::string seq_fields_str = + key.substr(std::strlen(Options::FIELDS_PREFIX) + 1, + key.size() - std::strlen(Options::FIELDS_PREFIX) - + std::strlen(Options::SEQUENCE_GROUP) - 2); + sequence_groups[seq_fields_str] = value; + } + } + return sequence_groups; +} + +bool CoreOptions::PartialUpdateRemoveRecordOnDelete() const { + return impl_->partial_update_remove_record_on_delete; +} + +bool CoreOptions::AggregationRemoveRecordOnDelete() const { + return impl_->aggregation_remove_record_on_delete; +} + +bool CoreOptions::TableReadSequenceNumberEnabled() const { + return impl_->table_read_sequence_number_enabled; +} + +bool CoreOptions::KeyValueSequenceNumberEnabled() const { + return impl_->key_value_sequence_number_enabled; +} + +std::vector CoreOptions::GetPartialUpdateRemoveRecordOnSequenceGroup() const { + return impl_->remove_record_on_sequence_group; +} + +std::optional CoreOptions::GetScanFallbackBranch() const { + return impl_->scan_fallback_branch; +} + +std::string CoreOptions::GetBranch() const { + return impl_->branch; +} + +bool CoreOptions::FileIndexReadEnabled() const { + return impl_->file_index_read_enabled; +} + +std::optional CoreOptions::GetDataFileExternalPaths() const { + return impl_->data_file_external_paths; +} + +ExternalPathStrategy CoreOptions::GetExternalPathStrategy() const { + return impl_->external_path_strategy; +} + +Result> CoreOptions::CreateExternalPaths() const { + std::vector external_paths; + std::optional data_file_external_paths = GetDataFileExternalPaths(); + ExternalPathStrategy strategy = GetExternalPathStrategy(); + if (strategy == ExternalPathStrategy::SPECIFIC_FS) { + return Status::NotImplemented("do not support specific-fs external path strategy for now"); + } + if (data_file_external_paths == std::nullopt || data_file_external_paths->empty() || + strategy == ExternalPathStrategy::NONE) { + return external_paths; + } + for (const auto& p : StringUtils::Split(data_file_external_paths.value(), ",")) { + std::string tmp_path = p; + StringUtils::Trim(&tmp_path); + PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(tmp_path)); + if (path.scheme.empty()) { + return Status::Invalid(fmt::format("scheme is null, path is {}", p)); + } + external_paths.push_back(path.ToString()); + } + if (external_paths.empty()) { + return Status::Invalid("external paths is empty"); + } + return external_paths; +} + +std::string CoreOptions::DataFilePrefix() const { + return impl_->data_file_prefix; +} + +bool CoreOptions::IndexFileInDataFileDir() const { + return impl_->index_file_in_data_file_dir; +} + +bool CoreOptions::RowTrackingEnabled() const { + return impl_->row_tracking_enabled; +} + +bool CoreOptions::RowTrackingPartitionGroupOnCommit() const { + return impl_->row_tracking_partition_group_on_commit; +} + +bool CoreOptions::DataEvolutionEnabled() const { + return impl_->data_evolution_enabled; +} + +bool CoreOptions::LegacyPartitionNameEnabled() const { + return impl_->legacy_partition_name_enabled; +} + +bool CoreOptions::GlobalIndexEnabled() const { + return impl_->global_index_enabled; +} + +std::optional CoreOptions::GetGlobalIndexThreadNum() const { + return impl_->global_index_thread_num; +} + +std::optional CoreOptions::GetGlobalIndexExternalPath() const { + return impl_->global_index_external_path; +} + +Result> CoreOptions::CreateGlobalIndexExternalPath() const { + std::optional global_index_external_path = GetGlobalIndexExternalPath(); + if (global_index_external_path == std::nullopt || global_index_external_path->empty()) { + return std::optional(); + } + std::string tmp_path = global_index_external_path.value(); + StringUtils::Trim(&tmp_path); + PAIMON_ASSIGN_OR_RAISE(Path path, PathUtil::ToPath(tmp_path)); + if (path.scheme.empty()) { + return Status::Invalid(fmt::format("scheme is null, path is {}", tmp_path)); + } + return std::optional(path.ToString()); +} + +std::optional CoreOptions::GetScanTagName() const { + return impl_->scan_tag_name; +} + +std::optional CoreOptions::GetOptimizedCompactionInterval() const { + return impl_->optimized_compaction_interval; +} +std::optional CoreOptions::GetCompactionTotalSizeThreshold() const { + return impl_->compaction_total_size_threshold; +} +std::optional CoreOptions::GetCompactionIncrementalSizeThreshold() const { + return impl_->compaction_incremental_size_threshold; +} + +int32_t CoreOptions::GetCompactOffPeakStartHour() const { + return impl_->compact_off_peak_start_hour; +} +int32_t CoreOptions::GetCompactOffPeakEndHour() const { + return impl_->compact_off_peak_end_hour; +} +int32_t CoreOptions::GetCompactOffPeakRatio() const { + return impl_->compact_off_peak_ratio; +} + +bool CoreOptions::LookupCacheBloomFilterEnabled() const { + return impl_->lookup_cache_bloom_filter; +} + +double CoreOptions::GetLookupCacheBloomFilterFpp() const { + return impl_->lookup_cache_bloom_filter_fpp; +} + +const CompressOptions& CoreOptions::GetLookupCompressOptions() const { + return impl_->lookup_compress_options; +} + +bool CoreOptions::LookupRemoteFileEnabled() const { + return impl_->lookup_remote_file_enabled; +} + +int32_t CoreOptions::GetLookupRemoteLevelThreshold() const { + return impl_->lookup_remote_level_threshold; +} + +int32_t CoreOptions::GetCachePageSize() const { + return static_cast(impl_->cache_page_size); +} + +int64_t CoreOptions::GetLookupCacheMaxMemory() const { + return impl_->lookup_cache_max_memory; +} + +double CoreOptions::GetLookupCacheHighPrioPoolRatio() const { + return impl_->lookup_cache_high_prio_pool_ratio; +} + +BucketFunctionType CoreOptions::GetBucketFunctionType() const { + return impl_->bucket_function_type; +} + +const std::vector& CoreOptions::GetBlobFields() const { + return impl_->blob_fields; +} + +const std::vector& CoreOptions::GetBlobDescriptorFields() const { + return impl_->blob_descriptor_fields; +} + +const std::vector& CoreOptions::GetBlobViewFields() const { + return impl_->blob_view_fields; +} + +std::vector CoreOptions::GetBlobInlineFields() const { + std::vector blob_inline_fields = impl_->blob_descriptor_fields; + blob_inline_fields.insert(blob_inline_fields.end(), impl_->blob_view_fields.begin(), + impl_->blob_view_fields.end()); + return blob_inline_fields; +} + +const std::vector& CoreOptions::GetBlobExternalStorageFields() const { + return impl_->blob_external_storage_fields; +} + +std::optional CoreOptions::GetBlobExternalStoragePath() const { + return impl_->blob_external_storage_path; +} + +int64_t CoreOptions::GetLookupCacheFileRetentionMs() const { + return impl_->lookup_cache_file_retention_ms; +} + +int64_t CoreOptions::GetLookupCacheMaxDiskSize() const { + return impl_->lookup_cache_max_disk_size; +} + +} // namespace paimon diff --git a/src/paimon/core/core_options.h b/src/paimon/core/core_options.h new file mode 100644 index 0000000..c047700 --- /dev/null +++ b/src/paimon/core/core_options.h @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "paimon/bucket/bucket_function_type.h" +#include "paimon/core/options/changelog_producer.h" +#include "paimon/core/options/compress_options.h" +#include "paimon/core/options/external_path_strategy.h" +#include "paimon/core/options/lookup_compact_mode.h" +#include "paimon/core/options/lookup_strategy.h" +#include "paimon/core/options/merge_engine.h" +#include "paimon/core/options/sort_engine.h" +#include "paimon/format/file_format.h" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" +#include "paimon/table/source/startup_mode.h" +#include "paimon/type_fwd.h" +#include "paimon/visibility.h" + +namespace paimon { + +class ExpireConfig; + +class PAIMON_EXPORT CoreOptions { + public: + static Result FromMap( + const std::map& options_map, + const std::shared_ptr& specified_file_system = nullptr, + const std::map& fs_scheme_to_identifier_map = {}); + + CoreOptions(); + CoreOptions(const CoreOptions&); + CoreOptions& operator=(const CoreOptions&); + ~CoreOptions(); + + int32_t GetBucket() const; + std::shared_ptr GetFileFormat() const; + std::shared_ptr GetWriteFileFormat(int32_t level) const; + std::shared_ptr GetFileSystem() const; + const std::string& GetFileCompression() const; + const std::string& GetWriteFileCompression(int32_t level) const; + int32_t GetFileCompressionZstdLevel() const; + int64_t GetPageSize() const; + int64_t GetTargetFileSize(bool has_primary_key) const; + int64_t GetBlobTargetFileSize() const; + int64_t GetCompactionFileSize(bool has_primary_key) const; + std::string GetPartitionDefaultName() const; + + std::shared_ptr GetManifestFormat() const; + const std::string& GetManifestCompression() const; + int32_t GetManifestMergeMinCount() const; + int64_t GetManifestFullCompactionThresholdSize() const; + int64_t GetSourceSplitTargetSize() const; + int64_t GetSourceSplitOpenFileCost() const; + std::optional GetScanSnapshotId() const; + std::optional GetScanTimestampMillis() const; + + int64_t GetManifestTargetFileSize() const; + StartupMode GetStartupMode() const; + + int32_t GetReadBatchSize() const; + int32_t GetWriteBatchSize() const; + int64_t GetWriteBufferSize() const; + bool GetWriteBufferSpillable() const; + int64_t GetWriteBufferSpillMaxDiskSize() const; + int32_t GetLocalSortMaxNumFileHandles() const; + const CompressOptions& GetSpillCompressOptions() const; + + const ExpireConfig& GetExpireConfig() const; + + bool CommitForceCompact() const; + bool CompactionForceRewriteAllFiles() const; + bool CompactionForceUpLevel0() const; + int64_t GetCommitTimeout() const; + int32_t GetCommitMaxRetries() const; + int32_t GetCompactionMinFileNum() const; + int32_t GetCompactionMaxSizeAmplificationPercent() const; + int32_t GetCompactionSizeRatio() const; + int32_t GetNumSortedRunsCompactionTrigger() const; + int32_t GetNumSortedRunsStopTrigger() const; + int32_t GetNumLevels() const; + LookupCompactMode GetLookupCompactMode() const; + int32_t GetLookupCompactMaxInterval() const; + + const std::vector& GetSequenceField() const; + bool SequenceFieldSortOrderIsAscending() const; + MergeEngine GetMergeEngine() const; + SortEngine GetSortEngine() const; + bool IgnoreDelete() const; + bool WriteOnly() const; + + std::optional GetFieldsDefaultFunc() const; + Result> GetFieldAggFunc(const std::string& field_name) const; + Result FieldAggIgnoreRetract(const std::string& field_name) const; + Result FieldListAggDelimiter(const std::string& field_name) const; + Result FieldCollectAggDistinct(const std::string& field_name) const; + bool DeletionVectorsEnabled() const; + bool DeletionVectorsBitmap64() const; + int64_t DeletionVectorTargetFileSize() const; + ChangelogProducer GetChangelogProducer() const; + LookupStrategy GetLookupStrategy() const; + + bool NeedLookup() const; + bool PrepareCommitWaitCompaction() const; + bool FileIndexReadEnabled() const; + + std::map GetFieldsSequenceGroups() const; + bool PartialUpdateRemoveRecordOnDelete() const; + bool AggregationRemoveRecordOnDelete() const; + bool TableReadSequenceNumberEnabled() const; + bool KeyValueSequenceNumberEnabled() const; + std::vector GetPartialUpdateRemoveRecordOnSequenceGroup() const; + + std::optional GetScanFallbackBranch() const; + std::string GetBranch() const; + + ExternalPathStrategy GetExternalPathStrategy() const; + Result> CreateExternalPaths() const; + bool EnableAdaptivePrefetchStrategy() const; + + std::string DataFilePrefix() const; + + bool IndexFileInDataFileDir() const; + + bool RowTrackingEnabled() const; + bool RowTrackingPartitionGroupOnCommit() const; + bool DataEvolutionEnabled() const; + + bool LegacyPartitionNameEnabled() const; + + bool GlobalIndexEnabled() const; + Result> CreateGlobalIndexExternalPath() const; + + std::optional GetScanTagName() const; + + std::optional GetOptimizedCompactionInterval() const; + std::optional GetCompactionTotalSizeThreshold() const; + std::optional GetCompactionIncrementalSizeThreshold() const; + + int32_t GetCompactOffPeakStartHour() const; + int32_t GetCompactOffPeakEndHour() const; + int32_t GetCompactOffPeakRatio() const; + + bool LookupCacheBloomFilterEnabled() const; + double GetLookupCacheBloomFilterFpp() const; + + bool LookupRemoteFileEnabled() const; + int32_t GetLookupRemoteLevelThreshold() const; + + const CompressOptions& GetLookupCompressOptions() const; + int32_t GetCachePageSize() const; + + int64_t GetLookupCacheMaxMemory() const; + double GetLookupCacheHighPrioPoolRatio() const; + + int64_t GetLookupCacheFileRetentionMs() const; + int64_t GetLookupCacheMaxDiskSize() const; + + BucketFunctionType GetBucketFunctionType() const; + std::optional GetGlobalIndexThreadNum() const; + + const std::vector& GetBlobFields() const; + const std::vector& GetBlobDescriptorFields() const; + const std::vector& GetBlobViewFields() const; + std::vector GetBlobInlineFields() const; + const std::vector& GetBlobExternalStorageFields() const; + std::optional GetBlobExternalStoragePath() const; + + const std::map& ToMap() const; + + private: + std::optional GetDataFileExternalPaths() const; + std::optional GetGlobalIndexExternalPath() const; + + private: + struct Impl; + std::unique_ptr impl_; +}; + +} // namespace paimon diff --git a/src/paimon/core/core_options_test.cpp b/src/paimon/core/core_options_test.cpp new file mode 100644 index 0000000..39fb195 --- /dev/null +++ b/src/paimon/core/core_options_test.cpp @@ -0,0 +1,908 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/core/core_options.h" + +#include + +#include "gtest/gtest.h" +#include "paimon/bucket/bucket_function_type.h" +#include "paimon/common/fs/resolving_file_system.h" +#include "paimon/core/options/expire_config.h" +#include "paimon/defs.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/testing/mock/mock_file_system.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/testing/utils/timezone_guard.h" +namespace paimon::test { + +TEST(CoreOptionsTest, TestDefaultValue) { + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap({})); + ASSERT_EQ(core_options.GetManifestFormat()->Identifier(), "avro"); + ASSERT_EQ(core_options.GetFileFormat()->Identifier(), "parquet"); + ASSERT_EQ(core_options.GetWriteFileFormat(0)->Identifier(), "parquet"); + ASSERT_EQ(core_options.GetWriteFileFormat(3)->Identifier(), "parquet"); + ASSERT_TRUE(core_options.GetFileSystem()); + ASSERT_EQ(-1, core_options.GetBucket()); + ASSERT_EQ(64 * 1024L, core_options.GetPageSize()); + ASSERT_EQ(256 * 1024 * 1024L, core_options.GetTargetFileSize(/*has_primary_key=*/false)); + ASSERT_EQ(128 * 1024 * 1024L, core_options.GetTargetFileSize(/*has_primary_key=*/true)); + ASSERT_EQ(256 * 1024 * 1024L, core_options.GetBlobTargetFileSize()); + ASSERT_EQ(187904815, core_options.GetCompactionFileSize(/*has_primary_key=*/false)); + ASSERT_EQ(93952404, core_options.GetCompactionFileSize(/*has_primary_key=*/true)); + + ASSERT_EQ("__DEFAULT_PARTITION__", core_options.GetPartitionDefaultName()); + ASSERT_EQ(std::nullopt, core_options.GetScanSnapshotId()); + ASSERT_EQ("zstd", core_options.GetFileCompression()); + ASSERT_EQ("zstd", core_options.GetWriteFileCompression(0)); + ASSERT_EQ("zstd", core_options.GetWriteFileCompression(3)); + ASSERT_EQ("zstd", core_options.GetManifestCompression()); + ASSERT_EQ(1, core_options.GetFileCompressionZstdLevel()); + ASSERT_EQ(StartupMode::LatestFull(), core_options.GetStartupMode()); + ASSERT_EQ(8 * 1024 * 1024L, core_options.GetManifestTargetFileSize()); + ASSERT_EQ(16 * 1024 * 1024L, core_options.GetManifestFullCompactionThresholdSize()); + ASSERT_EQ(30, core_options.GetManifestMergeMinCount()); + ASSERT_EQ(128 * 1024 * 1024L, core_options.GetSourceSplitTargetSize()); + ASSERT_EQ(4 * 1024 * 1024L, core_options.GetSourceSplitOpenFileCost()); + ASSERT_EQ(1024, core_options.GetReadBatchSize()); + ASSERT_EQ(1024, core_options.GetWriteBatchSize()); + ASSERT_EQ(256 * 1024 * 1024, core_options.GetWriteBufferSize()); + ASSERT_TRUE(core_options.GetWriteBufferSpillable()); + ASSERT_EQ(std::numeric_limits::max(), core_options.GetWriteBufferSpillMaxDiskSize()); + ASSERT_EQ(128, core_options.GetLocalSortMaxNumFileHandles()); + ASSERT_EQ("zstd", core_options.GetSpillCompressOptions().compress); + ASSERT_EQ(1, core_options.GetSpillCompressOptions().zstd_level); + ASSERT_FALSE(core_options.CommitForceCompact()); + ASSERT_EQ(std::numeric_limits::max(), core_options.GetCommitTimeout()); + ASSERT_EQ(10, core_options.GetCommitMaxRetries()); + ExpireConfig expire_config = core_options.GetExpireConfig(); + ASSERT_EQ(10, expire_config.GetSnapshotRetainMin()); + ASSERT_EQ(std::numeric_limits::max(), expire_config.GetSnapshotRetainMax()); + ASSERT_EQ(50, expire_config.GetSnapshotMaxDeletes()); + ASSERT_FALSE(expire_config.CleanEmptyDirectories()); + ASSERT_EQ(1 * 3600 * 1000L, expire_config.GetSnapshotTimeRetainMs()); + ASSERT_EQ(std::vector(), core_options.GetSequenceField()); + ASSERT_TRUE(core_options.SequenceFieldSortOrderIsAscending()); + ASSERT_EQ(MergeEngine::DEDUPLICATE, core_options.GetMergeEngine()); + ASSERT_EQ(SortEngine::LOSER_TREE, core_options.GetSortEngine()); + ASSERT_FALSE(core_options.IgnoreDelete()); + ASSERT_FALSE(core_options.WriteOnly()); + ASSERT_EQ(5, core_options.GetCompactionMinFileNum()); + ASSERT_FALSE(core_options.CompactionForceRewriteAllFiles()); + ASSERT_FALSE(core_options.CompactionForceUpLevel0()); + ASSERT_EQ(std::nullopt, core_options.GetFieldsDefaultFunc()); + ASSERT_EQ(std::nullopt, core_options.GetFieldAggFunc("f0").value()); + ASSERT_FALSE(core_options.FieldAggIgnoreRetract("f1").value()); + ASSERT_EQ(",", core_options.FieldListAggDelimiter("f1").value()); + ASSERT_FALSE(core_options.FieldCollectAggDistinct("f1").value()); + ASSERT_FALSE(core_options.DeletionVectorsEnabled()); + ASSERT_FALSE(core_options.DeletionVectorsBitmap64()); + ASSERT_EQ(2 * 1024 * 1024, core_options.DeletionVectorTargetFileSize()); + ASSERT_EQ(ChangelogProducer::NONE, core_options.GetChangelogProducer()); + ASSERT_FALSE(core_options.NeedLookup()); + ASSERT_FALSE(core_options.PrepareCommitWaitCompaction()); + LookupStrategy expected_lookup_strategy = {/*is_first_row=*/false, + /*produce_changelog=*/false, + /*deletion_vector=*/false, /*force_lookup=*/false}; + ASSERT_EQ(expected_lookup_strategy, core_options.GetLookupStrategy()); + ASSERT_TRUE(core_options.GetFieldsSequenceGroups().empty()); + ASSERT_FALSE(core_options.AggregationRemoveRecordOnDelete()); + ASSERT_FALSE(core_options.PartialUpdateRemoveRecordOnDelete()); + ASSERT_TRUE(core_options.GetPartialUpdateRemoveRecordOnSequenceGroup().empty()); + ASSERT_EQ(std::nullopt, core_options.GetScanFallbackBranch()); + ASSERT_EQ("main", core_options.GetBranch()); + ASSERT_TRUE(core_options.FileIndexReadEnabled()); + ASSERT_EQ(std::nullopt, core_options.GetDataFileExternalPaths()); + ASSERT_EQ(ExternalPathStrategy::NONE, core_options.GetExternalPathStrategy()); + ASSERT_TRUE(core_options.EnableAdaptivePrefetchStrategy()); + ASSERT_EQ(core_options.DataFilePrefix(), "data-"); + ASSERT_FALSE(core_options.IndexFileInDataFileDir()); + ASSERT_FALSE(core_options.RowTrackingEnabled()); + ASSERT_TRUE(core_options.RowTrackingPartitionGroupOnCommit()); + ASSERT_FALSE(core_options.DataEvolutionEnabled()); + ASSERT_TRUE(core_options.GetBlobFields().empty()); + ASSERT_TRUE(core_options.GetBlobDescriptorFields().empty()); + ASSERT_TRUE(core_options.GetBlobViewFields().empty()); + ASSERT_TRUE(core_options.GetBlobInlineFields().empty()); + ASSERT_TRUE(core_options.GetBlobExternalStorageFields().empty()); + ASSERT_EQ(std::nullopt, core_options.GetBlobExternalStoragePath()); + ASSERT_TRUE(core_options.LegacyPartitionNameEnabled()); + ASSERT_TRUE(core_options.GlobalIndexEnabled()); + ASSERT_EQ(std::nullopt, core_options.GetGlobalIndexExternalPath()); + ASSERT_EQ(std::nullopt, core_options.GetGlobalIndexThreadNum()); + ASSERT_EQ(std::nullopt, core_options.GetScanTagName()); + ASSERT_EQ(std::nullopt, core_options.GetOptimizedCompactionInterval()); + ASSERT_EQ(std::nullopt, core_options.GetCompactionTotalSizeThreshold()); + ASSERT_EQ(std::nullopt, core_options.GetCompactionIncrementalSizeThreshold()); + ASSERT_EQ(-1, core_options.GetCompactOffPeakStartHour()); + ASSERT_EQ(-1, core_options.GetCompactOffPeakEndHour()); + ASSERT_EQ(0, core_options.GetCompactOffPeakRatio()); + ASSERT_TRUE(core_options.LookupCacheBloomFilterEnabled()); + ASSERT_EQ(0.05, core_options.GetLookupCacheBloomFilterFpp()); + ASSERT_EQ("zstd", core_options.GetLookupCompressOptions().compress); + ASSERT_EQ(1, core_options.GetLookupCompressOptions().zstd_level); + ASSERT_EQ(64 * 1024, core_options.GetCachePageSize()); + ASSERT_EQ(200, core_options.GetCompactionMaxSizeAmplificationPercent()); + ASSERT_EQ(1, core_options.GetCompactionSizeRatio()); + ASSERT_EQ(5, core_options.GetNumSortedRunsCompactionTrigger()); + ASSERT_EQ(8, core_options.GetNumSortedRunsStopTrigger()); + ASSERT_EQ(6, core_options.GetNumLevels()); + ASSERT_EQ(LookupCompactMode::RADICAL, core_options.GetLookupCompactMode()); + ASSERT_EQ(10, core_options.GetLookupCompactMaxInterval()); + ASSERT_EQ(256 * 1024 * 1024, core_options.GetLookupCacheMaxMemory()); + ASSERT_EQ(0.25, core_options.GetLookupCacheHighPrioPoolRatio()); + ASSERT_EQ(1 * 3600 * 1000, core_options.GetLookupCacheFileRetentionMs()); + ASSERT_FALSE(core_options.TableReadSequenceNumberEnabled()); + ASSERT_FALSE(core_options.KeyValueSequenceNumberEnabled()); + ASSERT_EQ(INT64_MAX, core_options.GetLookupCacheMaxDiskSize()); + ASSERT_FALSE(core_options.LookupRemoteFileEnabled()); + ASSERT_EQ(core_options.GetLookupRemoteLevelThreshold(), INT32_MIN); + ASSERT_EQ(BucketFunctionType::DEFAULT, core_options.GetBucketFunctionType()); +} + +TEST(CoreOptionsTest, TestFromMap) { + std::map options = { + {Options::FILE_SYSTEM, "Local"}, + {Options::FILE_FORMAT, "ORC"}, + {Options::MANIFEST_FORMAT, "avRo"}, + {Options::BUCKET, "3"}, + {Options::PAGE_SIZE, "128 kb"}, + {Options::TARGET_FILE_SIZE, "512MB"}, + {Options::BLOB_TARGET_FILE_SIZE, "1G"}, + {Options::PARTITION_DEFAULT_NAME, "foo"}, + {Options::MANIFEST_TARGET_FILE_SIZE, "16MB"}, + {Options::MANIFEST_FULL_COMPACTION_FILE_SIZE, "32MB"}, + {Options::MANIFEST_MERGE_MIN_COUNT, "2"}, + {Options::SOURCE_SPLIT_TARGET_SIZE, "24MB"}, + {Options::SOURCE_SPLIT_OPEN_FILE_COST, "32MB"}, + {Options::READ_BATCH_SIZE, "2048"}, + {Options::WRITE_BUFFER_SIZE, "16MB"}, + {Options::WRITE_BATCH_SIZE, "1234"}, + {Options::WRITE_BUFFER_SPILLABLE, "false"}, + {Options::WRITE_BUFFER_SPILL_MAX_DISK_SIZE, "7GB"}, + {Options::LOCAL_SORT_MAX_NUM_FILE_HANDLES, "64"}, + {Options::SPILL_COMPRESSION, "lz4"}, + {Options::COMMIT_FORCE_COMPACT, "true"}, + {Options::COMMIT_TIMEOUT, "120s"}, + {Options::COMMIT_MAX_RETRIES, "20"}, + {Options::SCAN_SNAPSHOT_ID, "5"}, + {Options::SCAN_MODE, "from-snapshot-full"}, + {Options::SNAPSHOT_NUM_RETAINED_MIN, "15"}, + {Options::SNAPSHOT_NUM_RETAINED_MAX, "30"}, + {Options::SNAPSHOT_EXPIRE_LIMIT, "20"}, + {Options::SNAPSHOT_TIME_RETAINED, "2h"}, + {Options::SNAPSHOT_CLEAN_EMPTY_DIRECTORIES, "true"}, + {Options::SEQUENCE_FIELD, "f1,f2,f3"}, + {Options::SEQUENCE_FIELD_SORT_ORDER, "descending"}, + {Options::MERGE_ENGINE, "partial-update"}, + {Options::SORT_ENGINE, "min-heap"}, + {Options::IGNORE_DELETE, "true"}, + {Options::FIELDS_DEFAULT_AGG_FUNC, "sum"}, + {"fields.f0.aggregate-function", "min"}, + {"fields.f1.ignore-retract", "true"}, + {"fields.f2.list-agg-delimiter", " | "}, + {"fields.f2.distinct", "true"}, + {Options::DELETION_VECTORS_ENABLED, "true"}, + {Options::DELETION_VECTOR_BITMAP64, "true"}, + {Options::DELETION_VECTOR_INDEX_FILE_TARGET_SIZE, "4MB"}, + {Options::CHANGELOG_PRODUCER, "full-compaction"}, + {Options::FORCE_LOOKUP, "true"}, + {"fields.g_1,g_3.sequence-group", "c,d"}, + {Options::AGGREGATION_REMOVE_RECORD_ON_DELETE, "true"}, + {Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE, "true"}, + {Options::PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP, "a,b"}, + {Options::SCAN_FALLBACK_BRANCH, "fallback"}, + {Options::BRANCH, "rt"}, + {Options::FILE_INDEX_READ_ENABLED, "false"}, + {Options::DATA_FILE_EXTERNAL_PATHS, "FILE:///tmp/index"}, + {Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "round-robin"}, + {Options::FILE_COMPRESSION, "snappy"}, + {Options::MANIFEST_COMPRESSION, "zlib"}, + {Options::FILE_COMPRESSION_ZSTD_LEVEL, "2"}, + {"test.enable-adaptive-prefetch-strategy", "false"}, + {Options::DATA_FILE_PREFIX, "test-data-"}, + {Options::INDEX_FILE_IN_DATA_FILE_DIR, "true"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::ROW_TRACKING_PARTITION_GROUP_ON_COMMIT, "false"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BLOB_FIELD, "blob1,blob2"}, + {Options::BLOB_DESCRIPTOR_FIELD, "blob3,blob4"}, + {Options::BLOB_VIEW_FIELD, "blob5"}, + {Options::BLOB_EXTERNAL_STORAGE_FIELD, "blob3,blob4"}, + {Options::BLOB_EXTERNAL_STORAGE_PATH, "FILE:///tmp/blob_external_storage/"}, + {Options::PARTITION_GENERATE_LEGACY_NAME, "false"}, + {Options::GLOBAL_INDEX_ENABLED, "false"}, + {Options::GLOBAL_INDEX_THREAD_NUM, "4"}, + {Options::GLOBAL_INDEX_EXTERNAL_PATH, "FILE:///tmp/global_index/"}, + {Options::SCAN_TAG_NAME, "test-tag"}, + {Options::WRITE_ONLY, "true"}, + {Options::COMPACTION_MIN_FILE_NUM, "10"}, + {Options::COMPACTION_FORCE_REWRITE_ALL_FILES, "true"}, + {Options::COMPACTION_FORCE_UP_LEVEL_0, "true"}, + {Options::COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT, "123"}, + {Options::COMPACTION_SIZE_RATIO, "9"}, + {Options::NUM_SORTED_RUNS_COMPACTION_TRIGGER, "11"}, + {Options::NUM_SORTED_RUNS_STOP_TRIGGER, "17"}, + {Options::NUM_LEVELS, "9"}, + {Options::LOOKUP_COMPACT, "gentle"}, + {Options::LOOKUP_COMPACT_MAX_INTERVAL, "7"}, + {Options::COMPACTION_OPTIMIZATION_INTERVAL, "2s"}, + {Options::COMPACTION_TOTAL_SIZE_THRESHOLD, "5 GB"}, + {Options::COMPACTION_INCREMENTAL_SIZE_THRESHOLD, "12 kB"}, + {Options::COMPACT_OFFPEAK_START_HOUR, "3"}, + {Options::COMPACT_OFFPEAK_END_HOUR, "16"}, + {Options::COMPACTION_OFFPEAK_RATIO, "8"}, + {Options::LOOKUP_CACHE_BLOOM_FILTER_ENABLED, "false"}, + {Options::LOOKUP_CACHE_BLOOM_FILTER_FPP, "0.5"}, + {Options::LOOKUP_CACHE_SPILL_COMPRESSION, "lz4"}, + {Options::SPILL_COMPRESSION_ZSTD_LEVEL, "2"}, + {Options::CACHE_PAGE_SIZE, "6MB"}, + {Options::FILE_FORMAT_PER_LEVEL, "0:AVRO,3:parquet"}, + {Options::FILE_COMPRESSION_PER_LEVEL, "0:lz4,3:none"}, + {Options::LOOKUP_CACHE_MAX_MEMORY_SIZE, "1MB"}, + {Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, "0.35"}, + {Options::LOOKUP_CACHE_FILE_RETENTION, "30min"}, + {Options::LOOKUP_CACHE_MAX_DISK_SIZE, "10GB"}, + {Options::LOOKUP_REMOTE_FILE_ENABLED, "True"}, + {Options::LOOKUP_REMOTE_LEVEL_THRESHOLD, "2"}, + {Options::TABLE_READ_SEQUENCE_NUMBER_ENABLED, "true"}, + {Options::KEY_VALUE_SEQUENCE_NUMBER_ENABLED, "true"}, + {Options::BUCKET_FUNCTION_TYPE, "mod"}}; + + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + auto fs = core_options.GetFileSystem(); + ASSERT_TRUE(fs); + + ASSERT_EQ(core_options.GetFileFormat()->Identifier(), "orc"); + ASSERT_EQ(core_options.GetWriteFileFormat(0)->Identifier(), "avro"); + ASSERT_EQ(core_options.GetWriteFileFormat(1)->Identifier(), "orc"); + ASSERT_EQ(core_options.GetWriteFileFormat(3)->Identifier(), "parquet"); + + auto manifest_format = core_options.GetManifestFormat(); + ASSERT_EQ(manifest_format->Identifier(), "avro"); + + ASSERT_EQ(3, core_options.GetBucket()); + ASSERT_EQ(128 * 1024L, core_options.GetPageSize()); + ASSERT_EQ(512 * 1024 * 1024L, core_options.GetTargetFileSize(/*has_primary_key=*/true)); + ASSERT_EQ(512 * 1024 * 1024L, core_options.GetTargetFileSize(/*has_primary_key=*/false)); + ASSERT_EQ(1024 * 1024 * 1024L, core_options.GetBlobTargetFileSize()); + ASSERT_EQ("foo", core_options.GetPartitionDefaultName()); + ASSERT_EQ(16 * 1024 * 1024L, core_options.GetManifestTargetFileSize()); + ASSERT_EQ(32 * 1024 * 1024L, core_options.GetManifestFullCompactionThresholdSize()); + ASSERT_EQ(2, core_options.GetManifestMergeMinCount()); + ASSERT_EQ(24 * 1024 * 1024L, core_options.GetSourceSplitTargetSize()); + ASSERT_EQ(32 * 1024 * 1024L, core_options.GetSourceSplitOpenFileCost()); + ASSERT_EQ(2048, core_options.GetReadBatchSize()); + ASSERT_EQ(1234, core_options.GetWriteBatchSize()); + ASSERT_EQ(16 * 1024 * 1024, core_options.GetWriteBufferSize()); + ASSERT_FALSE(core_options.GetWriteBufferSpillable()); + ASSERT_EQ(7L * 1024 * 1024 * 1024, core_options.GetWriteBufferSpillMaxDiskSize()); + ASSERT_EQ(64, core_options.GetLocalSortMaxNumFileHandles()); + ASSERT_EQ("lz4", core_options.GetSpillCompressOptions().compress); + ASSERT_EQ(2, core_options.GetSpillCompressOptions().zstd_level); + ASSERT_TRUE(core_options.CommitForceCompact()); + ASSERT_EQ(120 * 1000, core_options.GetCommitTimeout()); + ASSERT_EQ(20, core_options.GetCommitMaxRetries()); + ASSERT_EQ(5, core_options.GetScanSnapshotId().value_or(-1)); + ExpireConfig expire_config = core_options.GetExpireConfig(); + ASSERT_EQ(15, expire_config.GetSnapshotRetainMin()); + ASSERT_EQ(30, expire_config.GetSnapshotRetainMax()); + ASSERT_EQ(20, expire_config.GetSnapshotMaxDeletes()); + ASSERT_EQ(2 * 3600 * 1000L, expire_config.GetSnapshotTimeRetainMs()); + ASSERT_TRUE(expire_config.CleanEmptyDirectories()); + ASSERT_EQ(std::vector({"f1", "f2", "f3"}), core_options.GetSequenceField()); + ASSERT_FALSE(core_options.SequenceFieldSortOrderIsAscending()); + ASSERT_EQ(MergeEngine::PARTIAL_UPDATE, core_options.GetMergeEngine()); + ASSERT_EQ(SortEngine::MIN_HEAP, core_options.GetSortEngine()); + ASSERT_TRUE(core_options.IgnoreDelete()); + ASSERT_EQ("sum", core_options.GetFieldsDefaultFunc().value()); + ASSERT_EQ("min", core_options.GetFieldAggFunc("f0").value().value()); + ASSERT_TRUE(core_options.FieldAggIgnoreRetract("f1").value()); + ASSERT_TRUE(core_options.FieldAggIgnoreRetract("f1").value()); + ASSERT_EQ(" | ", core_options.FieldListAggDelimiter("f2").value()); + ASSERT_TRUE(core_options.FieldCollectAggDistinct("f2").value()); + ASSERT_TRUE(core_options.DeletionVectorsEnabled()); + ASSERT_TRUE(core_options.DeletionVectorsBitmap64()); + ASSERT_EQ(4 * 1024 * 1024, core_options.DeletionVectorTargetFileSize()); + ASSERT_EQ(ChangelogProducer::FULL_COMPACTION, core_options.GetChangelogProducer()); + ASSERT_TRUE(core_options.NeedLookup()); + ASSERT_TRUE(core_options.PrepareCommitWaitCompaction()); + LookupStrategy expected_lookup_strategy = {/*is_first_row=*/false, + /*produce_changelog=*/false, + /*deletion_vector=*/true, /*force_lookup=*/true}; + ASSERT_EQ(expected_lookup_strategy, core_options.GetLookupStrategy()); + + std::map seq_grp; + seq_grp["g_1,g_3"] = "c,d"; + ASSERT_EQ(core_options.GetFieldsSequenceGroups(), seq_grp); + ASSERT_TRUE(core_options.AggregationRemoveRecordOnDelete()); + ASSERT_TRUE(core_options.PartialUpdateRemoveRecordOnDelete()); + ASSERT_EQ(core_options.GetPartialUpdateRemoveRecordOnSequenceGroup(), + std::vector({"a", "b"})); + ASSERT_EQ(core_options.GetScanFallbackBranch(), std::optional("fallback")); + ASSERT_EQ(core_options.GetBranch(), "rt"); + ASSERT_FALSE(core_options.FileIndexReadEnabled()); + ASSERT_EQ(core_options.GetDataFileExternalPaths(), + std::optional("FILE:///tmp/index")); + ASSERT_EQ(core_options.GetExternalPathStrategy(), ExternalPathStrategy::ROUND_ROBIN); + ASSERT_EQ("snappy", core_options.GetFileCompression()); + ASSERT_EQ("lz4", core_options.GetWriteFileCompression(0)); + ASSERT_EQ("snappy", core_options.GetWriteFileCompression(1)); + ASSERT_EQ("none", core_options.GetWriteFileCompression(3)); + ASSERT_EQ("snappy", core_options.GetWriteFileCompression(5)); + ASSERT_EQ("zlib", core_options.GetManifestCompression()); + ASSERT_EQ(2, core_options.GetFileCompressionZstdLevel()); + ASSERT_FALSE(core_options.EnableAdaptivePrefetchStrategy()); + ASSERT_EQ(core_options.DataFilePrefix(), "test-data-"); + ASSERT_TRUE(core_options.IndexFileInDataFileDir()); + ASSERT_TRUE(core_options.RowTrackingEnabled()); + ASSERT_FALSE(core_options.RowTrackingPartitionGroupOnCommit()); + ASSERT_TRUE(core_options.DataEvolutionEnabled()); + ASSERT_EQ(core_options.GetBlobFields(), std::vector({"blob1", "blob2"})); + ASSERT_EQ(core_options.GetBlobDescriptorFields(), std::vector({"blob3", "blob4"})); + ASSERT_EQ(core_options.GetBlobViewFields(), std::vector({"blob5"})); + ASSERT_EQ(core_options.GetBlobInlineFields(), + std::vector({"blob3", "blob4", "blob5"})); + ASSERT_EQ(core_options.GetBlobExternalStorageFields(), + std::vector({"blob3", "blob4"})); + ASSERT_EQ(core_options.GetBlobExternalStoragePath(), + std::optional("FILE:///tmp/blob_external_storage/")); + ASSERT_FALSE(core_options.LegacyPartitionNameEnabled()); + ASSERT_FALSE(core_options.GlobalIndexEnabled()); + ASSERT_EQ(core_options.GetGlobalIndexThreadNum(), 4); + ASSERT_TRUE(core_options.GetGlobalIndexExternalPath()); + ASSERT_EQ(core_options.GetGlobalIndexExternalPath().value(), "FILE:///tmp/global_index/"); + ASSERT_EQ("test-tag", core_options.GetScanTagName().value()); + ASSERT_EQ(StartupMode::FromSnapshotFull(), core_options.GetStartupMode()); + ASSERT_EQ(375809637, core_options.GetCompactionFileSize(/*has_primary_key=*/true)); + ASSERT_EQ(375809637, core_options.GetCompactionFileSize(/*has_primary_key=*/false)); + ASSERT_TRUE(core_options.WriteOnly()); + ASSERT_EQ(10, core_options.GetCompactionMinFileNum()); + ASSERT_EQ(123, core_options.GetCompactionMaxSizeAmplificationPercent()); + ASSERT_EQ(9, core_options.GetCompactionSizeRatio()); + ASSERT_EQ(11, core_options.GetNumSortedRunsCompactionTrigger()); + ASSERT_EQ(17, core_options.GetNumSortedRunsStopTrigger()); + ASSERT_EQ(9, core_options.GetNumLevels()); + ASSERT_EQ(LookupCompactMode::GENTLE, core_options.GetLookupCompactMode()); + ASSERT_EQ(11, core_options.GetLookupCompactMaxInterval()); + ASSERT_TRUE(core_options.CompactionForceRewriteAllFiles()); + ASSERT_TRUE(core_options.CompactionForceUpLevel0()); + ASSERT_EQ(2000, core_options.GetOptimizedCompactionInterval().value()); + ASSERT_EQ(5l * 1024 * 1024 * 1024, core_options.GetCompactionTotalSizeThreshold().value()); + ASSERT_EQ(12l * 1024, core_options.GetCompactionIncrementalSizeThreshold().value()); + ASSERT_EQ(3, core_options.GetCompactOffPeakStartHour()); + ASSERT_EQ(16, core_options.GetCompactOffPeakEndHour()); + ASSERT_EQ(8, core_options.GetCompactOffPeakRatio()); + ASSERT_FALSE(core_options.LookupCacheBloomFilterEnabled()); + ASSERT_EQ(0.5, core_options.GetLookupCacheBloomFilterFpp()); + ASSERT_EQ("lz4", core_options.GetLookupCompressOptions().compress); + ASSERT_EQ(2, core_options.GetLookupCompressOptions().zstd_level); + ASSERT_EQ(6 * 1024 * 1024, core_options.GetCachePageSize()); + ASSERT_EQ(1024 * 1024, core_options.GetLookupCacheMaxMemory()); + ASSERT_EQ(0.35, core_options.GetLookupCacheHighPrioPoolRatio()); + ASSERT_EQ(30 * 60 * 1000, core_options.GetLookupCacheFileRetentionMs()); + ASSERT_EQ(10L * 1024 * 1024 * 1024, core_options.GetLookupCacheMaxDiskSize()); + ASSERT_TRUE(core_options.TableReadSequenceNumberEnabled()); + ASSERT_TRUE(core_options.KeyValueSequenceNumberEnabled()); + ASSERT_TRUE(core_options.LookupRemoteFileEnabled()); + ASSERT_EQ(core_options.GetLookupRemoteLevelThreshold(), 2); + ASSERT_EQ(BucketFunctionType::MOD, core_options.GetBucketFunctionType()); +} + +TEST(CoreOptionsTest, TestInvalidCase) { + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::BUCKET, "3.5"}}), + "Invalid Config [bucket: 3.5]"); + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::SCAN_SNAPSHOT_ID, "3.5"}}), + "Invalid Config [scan.snapshot-id: 3.5]"); + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::SEQUENCE_FIELD_SORT_ORDER, "invalid"}}), + "invalid sort order: invalid"); + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::SORT_ENGINE, "invalid"}}), + "invalid sort engine: invalid"); + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::MERGE_ENGINE, "invalid"}}), + "invalid merge engine: invalid"); + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::CHANGELOG_PRODUCER, "invalid"}}), + "invalid changelog producer: invalid"); + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::LOOKUP_COMPACT, "invalid"}}), + "invalid lookup mode: invalid"); + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::LOOKUP_COMPACT_MAX_INTERVAL, "invalid"}}), + "Invalid Config [lookup-compact.max-interval: invalid]"); + ASSERT_NOK_WITH_MSG( + CoreOptions::FromMap({{Options::LOOKUP_CACHE_HIGH_PRIO_POOL_RATIO, "1.1"}}), + "The high priority pool ratio should in the range [0, 1), while input is 1.1"); + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::BUCKET_FUNCTION_TYPE, "invalid"}}), + "invalid bucket function type: invalid"); +} + +TEST(CoreOptionsTest, TestLookupCompactMaxIntervalComputedValue) { + std::map options = { + {Options::NUM_SORTED_RUNS_COMPACTION_TRIGGER, "11"}, + {Options::LOOKUP_COMPACT_MAX_INTERVAL, "13"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_EQ(13, core_options.GetLookupCompactMaxInterval()); +} + +TEST(CoreOptionsTest, TestNumSortedRunsStopTriggerFloorAndDefault) { + { + std::map options = { + {Options::NUM_SORTED_RUNS_COMPACTION_TRIGGER, "11"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_EQ(14, core_options.GetNumSortedRunsStopTrigger()); + } + + { + std::map options = { + {Options::NUM_SORTED_RUNS_COMPACTION_TRIGGER, "11"}, + {Options::NUM_SORTED_RUNS_STOP_TRIGGER, "7"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_EQ(11, core_options.GetNumSortedRunsStopTrigger()); + } +} + +TEST(CoreOptionsTest, TestLookupStrategy) { + { + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap({})); + auto strategy = core_options.GetLookupStrategy(); + ASSERT_FALSE(strategy.is_first_row); + ASSERT_FALSE(strategy.produce_changelog); + ASSERT_FALSE(strategy.deletion_vector); + ASSERT_FALSE(strategy.need_lookup); + } + { + std::map options = { + {Options::MERGE_ENGINE, "first-row"}, + {Options::CHANGELOG_PRODUCER, "lookup"}, + {Options::DELETION_VECTORS_ENABLED, "true"}, + {Options::FORCE_LOOKUP, "true"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + auto strategy = core_options.GetLookupStrategy(); + ASSERT_TRUE(strategy.is_first_row); + ASSERT_TRUE(strategy.produce_changelog); + ASSERT_TRUE(strategy.deletion_vector); + ASSERT_TRUE(strategy.need_lookup); + } +} + +TEST(CoreOptionsTest, TestPrepareCommitWaitCompaction) { + { + std::map options = { + {Options::FORCE_LOOKUP, "true"}, + {Options::LOOKUP_WAIT, "false"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_TRUE(core_options.NeedLookup()); + ASSERT_FALSE(core_options.PrepareCommitWaitCompaction()); + } + + { + std::map options = { + {Options::FORCE_LOOKUP, "true"}, + {Options::LOOKUP_WAIT, "true"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_TRUE(core_options.NeedLookup()); + ASSERT_TRUE(core_options.PrepareCommitWaitCompaction()); + } + + { + std::map options = { + {Options::LOOKUP_WAIT, "true"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_FALSE(core_options.NeedLookup()); + ASSERT_FALSE(core_options.PrepareCommitWaitCompaction()); + } +} + +TEST(CoreOptionsTest, TestInvalidFileFormatPerLevel) { + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::FILE_FORMAT_PER_LEVEL, "0:AVRO:parquet"}}), + "fail to parse key file.format.per.level, value 0:AVRO:parquet"); + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::FILE_FORMAT_PER_LEVEL, "aaa:avro"}}), + "fail to parse level aaa from string to int in file.format.per.level"); +} + +TEST(CoreOptionsTest, TestInvalidFileCompressionPerLevel) { + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::FILE_COMPRESSION_PER_LEVEL, "0:lz4:zstd"}}), + "fail to parse key file.compression.per.level, value 0:lz4:zstd"); + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::FILE_COMPRESSION_PER_LEVEL, "abc:lz4"}}), + "fail to parse level abc from string to int in file.compression.per.level"); +} + +TEST(CoreOptionsTest, TestCreateExternalPath) { + std::map options = { + {Options::DATA_FILE_EXTERNAL_PATHS, + " FILE:///tmp/index1 ,FILE:///tmp/index2,FILE:///tmp/index3,,"}, + {Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "round-robin"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(std::vector external_paths, + core_options.CreateExternalPaths()); + ASSERT_EQ("FILE:/tmp/index1", external_paths[0]); + ASSERT_EQ("FILE:/tmp/index2", external_paths[1]); + ASSERT_EQ("FILE:/tmp/index3", external_paths[2]); +} + +TEST(CoreOptionsTest, TestInvalidCreateExternalPath) { + { + std::map options = { + {Options::DATA_FILE_EXTERNAL_PATHS, + "/tmp/index1,FILE:///tmp/index2,FILE:///tmp/index3, "}, + {Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "round-robin"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_NOK_WITH_MSG(core_options.CreateExternalPaths(), + "scheme is null, path is /tmp/index1"); + } + { + std::map options = { + {Options::DATA_FILE_EXTERNAL_PATHS, "FILE:///tmp/index"}, + {Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "specific-fs"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_NOK_WITH_MSG(core_options.CreateExternalPaths(), + "do not support specific-fs external path strategy for now"); + } + { + std::map options = { + {Options::DATA_FILE_EXTERNAL_PATHS, ","}, + {Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "round-robin"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_NOK_WITH_MSG(core_options.CreateExternalPaths(), "external paths is empty"); + } +} + +TEST(CoreOptionsTest, TestCreateGlobalIndexExternalPath) { + std::map options = { + {Options::GLOBAL_INDEX_EXTERNAL_PATH, " FILE:///tmp/index1"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_OK_AND_ASSIGN(std::optional external_path, + core_options.CreateGlobalIndexExternalPath()); + ASSERT_EQ("FILE:/tmp/index1", external_path.value()); +} + +TEST(CoreOptionsTest, TestInvalidCreateGlobalIndexExternalPath) { + std::map options = { + {Options::GLOBAL_INDEX_EXTERNAL_PATH, "/tmp/index1"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + ASSERT_NOK_WITH_MSG(core_options.CreateGlobalIndexExternalPath(), + "scheme is null, path is /tmp/index1"); +} + +TEST(CoreOptionsTest, TestFileSystem) { + { + auto mock_fs = std::make_shared(); + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, + CoreOptions::FromMap({}, + /*specified_file_system=*/mock_fs)); + auto fs = core_options.GetFileSystem(); + ASSERT_TRUE(std::dynamic_pointer_cast(fs)); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, + CoreOptions::FromMap({}, + /*specified_file_system=*/nullptr)); + auto fs = core_options.GetFileSystem(); + auto typed_fs = std::dynamic_pointer_cast(fs); + ASSERT_TRUE(typed_fs); + ASSERT_TRUE(std::dynamic_pointer_cast( + typed_fs->GetRealFileSystem("/tmp").value_or(nullptr))); + } + { + ASSERT_OK_AND_ASSIGN( + CoreOptions core_options, + CoreOptions::FromMap( + {}, /*specified_file_system=*/nullptr, + /*fs_scheme_to_identifier_map=*/{{"hdfs", "mock_fs"}, {"oss", "local"}})); + auto fs = core_options.GetFileSystem(); + auto typed_fs = std::dynamic_pointer_cast(fs); + ASSERT_TRUE(typed_fs); + ASSERT_TRUE(std::dynamic_pointer_cast( + typed_fs->GetRealFileSystem("/tmp").value_or(nullptr))); + ASSERT_TRUE(std::dynamic_pointer_cast( + typed_fs->GetRealFileSystem("hdfs:///tmp/").value_or(nullptr))); + ASSERT_TRUE(std::dynamic_pointer_cast( + typed_fs->GetRealFileSystem("oss:///tmp/").value_or(nullptr))); + } +} + +TEST(CoreOptionsTest, TestNormalizeValueInCoreOption) { + std::map options = { + {Options::SEQUENCE_FIELD_SORT_ORDER, "ASCENDING"}, + {Options::SORT_ENGINE, "MIN-heap"}, + {Options::MERGE_ENGINE, "first-ROW"}, + {Options::CHANGELOG_PRODUCER, "LOOKUP"}, + {Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "ROUND-ROBIN"}, + {Options::LOOKUP_COMPACT, "GENTLE"}, + {Options::SCAN_MODE, "DEFAULT"}, + {Options::BUCKET_FUNCTION_TYPE, "MOD"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap(options)); + + ASSERT_EQ(StartupMode::LatestFull(), core_options.GetStartupMode()); + ASSERT_EQ(ExternalPathStrategy::ROUND_ROBIN, core_options.GetExternalPathStrategy()); + ASSERT_EQ(ChangelogProducer::LOOKUP, core_options.GetChangelogProducer()); + ASSERT_EQ(MergeEngine::FIRST_ROW, core_options.GetMergeEngine()); + ASSERT_EQ(SortEngine::MIN_HEAP, core_options.GetSortEngine()); + ASSERT_EQ(LookupCompactMode::GENTLE, core_options.GetLookupCompactMode()); + ASSERT_TRUE(core_options.SequenceFieldSortOrderIsAscending()); + ASSERT_EQ(BucketFunctionType::MOD, core_options.GetBucketFunctionType()); +} + +TEST(CoreOptionsTest, TestScanTimestampMillis) { + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, + CoreOptions::FromMap({{Options::SCAN_TIMESTAMP_MILLIS, "1721614515032"}})); + ASSERT_EQ(1721614515032, core_options.GetScanTimestampMillis().value()); + ASSERT_EQ(StartupMode::FromTimestamp(), core_options.GetStartupMode()); +} + +TEST(CoreOptionsTest, TestScanTimestampMillisExplicitMode) { + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, + CoreOptions::FromMap({{Options::SCAN_MODE, "from-timestamp"}, + {Options::SCAN_TIMESTAMP_MILLIS, "1721614515032"}})); + ASSERT_EQ(StartupMode::FromTimestamp(), core_options.GetStartupMode()); + ASSERT_EQ(1721614515032, core_options.GetScanTimestampMillis().value()); +} + +TEST(CoreOptionsTest, TestScanTimestampMillisNotSet) { + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, CoreOptions::FromMap({})); + ASSERT_EQ(std::nullopt, core_options.GetScanTimestampMillis()); + ASSERT_EQ(StartupMode::LatestFull(), core_options.GetStartupMode()); +} + +TEST(CoreOptionsTest, TestScanTimestampString) { + TimezoneGuard tz_guard("Asia/Shanghai"); + ASSERT_OK_AND_ASSIGN(CoreOptions core_options, + CoreOptions::FromMap({{Options::SCAN_TIMESTAMP, "2023-06-01 00:00:00"}})); + ASSERT_EQ(core_options.GetScanTimestampMillis().value(), 1685548800000); + ASSERT_EQ(StartupMode::FromTimestamp(), core_options.GetStartupMode()); +} + +TEST(CoreOptionsTest, TestScanTimestampStringDateOnly) { + ASSERT_OK_AND_ASSIGN(CoreOptions opts1, + CoreOptions::FromMap({{Options::SCAN_TIMESTAMP, "2023-06-01"}})); + ASSERT_OK_AND_ASSIGN(CoreOptions opts2, + CoreOptions::FromMap({{Options::SCAN_TIMESTAMP, "2023-06-01 00:00:00"}})); + ASSERT_EQ(opts1.GetScanTimestampMillis().value(), opts2.GetScanTimestampMillis().value()); +} + +TEST(CoreOptionsTest, TestScanTimestampMillisAndStringMutuallyExclusive) { + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::SCAN_TIMESTAMP_MILLIS, "1721614515032"}, + {Options::SCAN_TIMESTAMP, "2023-06-01 00:00:00"}}), + "scan.timestamp-millis and scan.timestamp cannot be set at the same time"); +} + +TEST(CoreOptionsTest, TestScanTimestampInvalidString) { + ASSERT_NOK(CoreOptions::FromMap({{Options::SCAN_TIMESTAMP, "not-a-date"}})); +} + +TEST(CoreOptionsTest, TestOverflowProtection) { + std::string max_val = std::to_string(std::numeric_limits::max()); + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{Options::NUM_SORTED_RUNS_COMPACTION_TRIGGER, max_val}})); + + ASSERT_EQ(options.GetNumSortedRunsStopTrigger(), std::numeric_limits::max()); + ASSERT_EQ(options.GetNumLevels(), std::numeric_limits::max()); + ASSERT_EQ(options.GetLookupCompactMaxInterval(), std::numeric_limits::max()); +} + +TEST(CoreOptionsTest, TestExplicitNumLevels) { + ASSERT_OK_AND_ASSIGN(CoreOptions options, CoreOptions::FromMap({{Options::NUM_LEVELS, "10"}})); + ASSERT_EQ(options.GetNumLevels(), 10); +} + +TEST(CoreOptionsTest, TestParseChangelogProducer) { + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::CHANGELOG_PRODUCER, "none"}})); + ASSERT_EQ(options.GetChangelogProducer(), ChangelogProducer::NONE); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::CHANGELOG_PRODUCER, "input"}})); + ASSERT_EQ(options.GetChangelogProducer(), ChangelogProducer::INPUT); + } + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{Options::CHANGELOG_PRODUCER, "full-compaction"}})); + ASSERT_EQ(options.GetChangelogProducer(), ChangelogProducer::FULL_COMPACTION); + } + { + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::CHANGELOG_PRODUCER, "lookup"}})); + ASSERT_EQ(options.GetChangelogProducer(), ChangelogProducer::LOOKUP); + } + { + // case insensitive + ASSERT_OK_AND_ASSIGN(CoreOptions options, + CoreOptions::FromMap({{Options::CHANGELOG_PRODUCER, "LOOKUP"}})); + ASSERT_EQ(options.GetChangelogProducer(), ChangelogProducer::LOOKUP); + } + ASSERT_NOK_WITH_MSG(CoreOptions::FromMap({{Options::CHANGELOG_PRODUCER, "invalid"}}), + "invalid changelog producer: invalid"); +} + +TEST(CoreOptionsTest, TestParseExternalPathStrategy) { + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "none"}})); + ASSERT_EQ(options.GetExternalPathStrategy(), ExternalPathStrategy::NONE); + } + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "specific-fs"}})); + ASSERT_EQ(options.GetExternalPathStrategy(), ExternalPathStrategy::SPECIFIC_FS); + } + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "round-robin"}})); + ASSERT_EQ(options.GetExternalPathStrategy(), ExternalPathStrategy::ROUND_ROBIN); + } + { + // case insensitive + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "ROUND-ROBIN"}})); + ASSERT_EQ(options.GetExternalPathStrategy(), ExternalPathStrategy::ROUND_ROBIN); + } + ASSERT_NOK_WITH_MSG( + CoreOptions::FromMap({{Options::DATA_FILE_EXTERNAL_PATHS_STRATEGY, "invalid"}}), + "invalid external path strategy: invalid"); +} + +TEST(CoreOptionsTest, TestCopyAssignmentOperator) { + // Build a CoreOptions with non-default values + std::map options = { + {Options::BUCKET, "3"}, + {Options::PAGE_SIZE, "128 kb"}, + {Options::TARGET_FILE_SIZE, "512MB"}, + {Options::FILE_FORMAT, "ORC"}, + {Options::FILE_COMPRESSION, "lz4"}, + {Options::FILE_COMPRESSION_ZSTD_LEVEL, "5"}, + {Options::PARTITION_DEFAULT_NAME, "foo"}, + {Options::MANIFEST_MERGE_MIN_COUNT, "2"}, + {Options::READ_BATCH_SIZE, "2048"}, + {Options::WRITE_BATCH_SIZE, "1234"}, + {Options::WRITE_BUFFER_SIZE, "16MB"}, + {Options::WRITE_BUFFER_SPILLABLE, "false"}, + {Options::COMMIT_FORCE_COMPACT, "true"}, + {Options::COMMIT_MAX_RETRIES, "20"}, + {Options::SEQUENCE_FIELD, "f1,f2"}, + {Options::MERGE_ENGINE, "first-row"}, + {Options::SORT_ENGINE, "min-heap"}, + {Options::CHANGELOG_PRODUCER, "lookup"}, + {Options::DELETION_VECTORS_ENABLED, "true"}, + {Options::FORCE_LOOKUP, "true"}, + {Options::IGNORE_DELETE, "true"}, + {Options::WRITE_ONLY, "true"}, + {Options::COMPACTION_MIN_FILE_NUM, "10"}, + {Options::COMPACTION_FORCE_REWRITE_ALL_FILES, "true"}, + {Options::NUM_SORTED_RUNS_COMPACTION_TRIGGER, "11"}, + {Options::NUM_SORTED_RUNS_STOP_TRIGGER, "17"}, + {Options::NUM_LEVELS, "9"}, + {Options::LOOKUP_COMPACT, "gentle"}, + {Options::DATA_FILE_PREFIX, "test-data-"}, + {Options::ROW_TRACKING_ENABLED, "true"}, + {Options::DATA_EVOLUTION_ENABLED, "true"}, + {Options::BUCKET_FUNCTION_TYPE, "mod"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions source, CoreOptions::FromMap(options)); + + // Default-constructed target with different values + CoreOptions target; + + // Perform copy assignment + target = source; + + // Verify all fields are correctly copied + ASSERT_EQ(3, target.GetBucket()); + ASSERT_EQ(128 * 1024L, target.GetPageSize()); + ASSERT_EQ("orc", target.GetFileFormat()->Identifier()); + ASSERT_EQ("lz4", target.GetFileCompression()); + ASSERT_EQ(5, target.GetFileCompressionZstdLevel()); + ASSERT_EQ("foo", target.GetPartitionDefaultName()); + ASSERT_EQ(2, target.GetManifestMergeMinCount()); + ASSERT_EQ(2048, target.GetReadBatchSize()); + ASSERT_EQ(1234, target.GetWriteBatchSize()); + ASSERT_EQ(16 * 1024 * 1024L, target.GetWriteBufferSize()); + ASSERT_FALSE(target.GetWriteBufferSpillable()); + ASSERT_TRUE(target.CommitForceCompact()); + ASSERT_EQ(20, target.GetCommitMaxRetries()); + ASSERT_EQ(std::vector({"f1", "f2"}), target.GetSequenceField()); + ASSERT_EQ(MergeEngine::FIRST_ROW, target.GetMergeEngine()); + ASSERT_EQ(SortEngine::MIN_HEAP, target.GetSortEngine()); + ASSERT_EQ(ChangelogProducer::LOOKUP, target.GetChangelogProducer()); + ASSERT_TRUE(target.DeletionVectorsEnabled()); + ASSERT_TRUE(target.NeedLookup()); + ASSERT_TRUE(target.IgnoreDelete()); + ASSERT_TRUE(target.WriteOnly()); + ASSERT_EQ(10, target.GetCompactionMinFileNum()); + ASSERT_TRUE(target.CompactionForceRewriteAllFiles()); + ASSERT_EQ(11, target.GetNumSortedRunsCompactionTrigger()); + ASSERT_EQ(17, target.GetNumSortedRunsStopTrigger()); + ASSERT_EQ(9, target.GetNumLevels()); + ASSERT_EQ(LookupCompactMode::GENTLE, target.GetLookupCompactMode()); + ASSERT_EQ("test-data-", target.DataFilePrefix()); + ASSERT_TRUE(target.RowTrackingEnabled()); + ASSERT_TRUE(target.DataEvolutionEnabled()); + ASSERT_EQ(BucketFunctionType::MOD, target.GetBucketFunctionType()); + + // Verify the target's ToMap matches the source's ToMap + ASSERT_EQ(source.ToMap(), target.ToMap()); + + CoreOptions target2 = source; + ASSERT_EQ(source.ToMap(), target2.ToMap()); +} + +TEST(CoreOptionsTest, TestAssignmentIndependence) { + std::map options = { + {Options::BUCKET, "5"}, + {Options::MERGE_ENGINE, "first-row"}, + }; + ASSERT_OK_AND_ASSIGN(CoreOptions source, CoreOptions::FromMap(options)); + + CoreOptions target; + target = source; + + // Verify target matches source + ASSERT_EQ(5, target.GetBucket()); + ASSERT_EQ(MergeEngine::FIRST_ROW, target.GetMergeEngine()); + + // Modify source by reassigning a different config + std::map new_options = { + {Options::BUCKET, "99"}, + {Options::MERGE_ENGINE, "deduplicate"}, + }; + ASSERT_OK_AND_ASSIGN(source, CoreOptions::FromMap(new_options)); + + // Target should be unaffected (deep copy) + ASSERT_EQ(5, target.GetBucket()); + ASSERT_EQ(MergeEngine::FIRST_ROW, target.GetMergeEngine()); + + // Source should have new values + ASSERT_EQ(99, source.GetBucket()); + ASSERT_EQ(MergeEngine::DEDUPLICATE, source.GetMergeEngine()); +} + +TEST(CoreOptionsTest, TestFallback) { + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{Options::FALLBACK_BLOB_DESCRIPTOR_FIELD, "b1,b2"}})); + ASSERT_EQ(options.GetBlobDescriptorFields(), std::vector({"b1", "b2"})); + } + { + ASSERT_OK_AND_ASSIGN( + CoreOptions options, + CoreOptions::FromMap({{Options::FALLBACK_BLOB_DESCRIPTOR_FIELD, "b1,b2"}, + {Options::BLOB_DESCRIPTOR_FIELD, "new_b1 , new_b2"}})); + ASSERT_EQ(options.GetBlobDescriptorFields(), + std::vector({"new_b1", "new_b2"})); + } +} +} // namespace paimon::test diff --git a/src/paimon/core/key_value.h b/src/paimon/core/key_value.h new file mode 100644 index 0000000..c19a3c6 --- /dev/null +++ b/src/paimon/core/key_value.h @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "paimon/common/data/internal_row.h" +#include "paimon/common/types/row_kind.h" + +namespace paimon { +/// A key value, including user key, sequence number, value kind and value. This object can be +/// reused. +/// e.g., primary key: k0, k1 +/// table fields: k0, k1, v0, v1, v2 +/// InternalRow of key in KeyValue Object: k0, k1 +/// InternalRow of value in KeyValue Object: k0, k1, v0, v1, v2 +struct KeyValue { + static constexpr int32_t UNKNOWN_LEVEL = -1; + static constexpr int32_t UNKNOWN_SEQUENCE = -1; + KeyValue() = default; + + KeyValue(const RowKind* _value_kind, int64_t _sequence_number, int32_t _level, + std::shared_ptr _key, std::unique_ptr _value) + : value_kind(_value_kind), + sequence_number(_sequence_number), + level(_level), + key(std::move(_key)), + value(std::move(_value)) {} + + KeyValue(KeyValue&& other) noexcept { + *this = std::move(other); + } + + KeyValue& operator=(KeyValue&& other) noexcept { + if (&other == this) { + return *this; + } + value_kind = other.value_kind; + sequence_number = other.sequence_number; + level = other.level; + key = std::move(other.key); + value = std::move(other.value); + return *this; + } + + const RowKind* value_kind = nullptr; + // determined after written into memory table or read from file + int64_t sequence_number = -1; + // determined after read from file + int32_t level = -1; + std::shared_ptr key; + std::unique_ptr value; +}; + +struct KeyValueBatch { + KeyValueBatch() = default; + KeyValueBatch(KeyValueBatch&& other) noexcept { + *this = std::move(other); + } + KeyValueBatch& operator=(KeyValueBatch&& other) noexcept { + if (&other == this) { + return *this; + } + delete_row_count = other.delete_row_count; + min_sequence_number = other.min_sequence_number; + max_sequence_number = other.max_sequence_number; + min_key = std::move(other.min_key); + max_key = std::move(other.max_key); + batch = std::move(other.batch); + other.delete_row_count = 0; + return *this; + } + ~KeyValueBatch() { + if (batch) { + ArrowArrayRelease(batch.get()); + } + } + int64_t delete_row_count = 0; + int64_t min_sequence_number = std::numeric_limits::max(); + int64_t max_sequence_number = std::numeric_limits::min(); + std::shared_ptr min_key; + std::shared_ptr max_key; + std::unique_ptr batch; +}; +} // namespace paimon