diff --git a/src/paimon/format/orc/orc_adapter.cpp b/src/paimon/format/orc/orc_adapter.cpp new file mode 100644 index 0000000..b1dd16c --- /dev/null +++ b/src/paimon/format/orc/orc_adapter.cpp @@ -0,0 +1,1580 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Adapted from Apache Arrow +// https://github.com/apache/arrow/blob/main/cpp/src/arrow/adapters/orc/util.cc + +#include "paimon/format/orc/orc_adapter.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/array_base.h" +#include "arrow/array/array_decimal.h" +#include "arrow/array/array_nested.h" +#include "arrow/array/builder_base.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_decimal.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/array/data.h" +#include "arrow/array/util.h" +#include "arrow/buffer_builder.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" +#include "arrow/type_traits.h" +#include "arrow/util/bitmap_ops.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/decimal.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/util/range.h" +#include "arrow/visit_data_inline.h" +#include "fmt/format.h" +#include "orc/Int128.hh" +#include "orc/MemoryPool.hh" +#include "orc/Type.hh" +#include "orc/Vector.hh" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/date_time_utils.h" +#include "paimon/data/timestamp.h" + +namespace paimon::orc { +namespace { +template +class OrcBackedArrowBuffer : public arrow::ResizableBuffer { + public: + explicit OrcBackedArrowBuffer(::orc::DataBuffer&& buffer) + : ResizableBuffer(reinterpret_cast(buffer.data()), buffer.size() * sizeof(T)), + orc_buffer_(std::move(buffer)) {} + + arrow::Status Resize(const int64_t new_size, bool shrink_to_fit) override { + try { + // arrow buffer size is equal to orc buffer size * sizeof(T) + orc_buffer_.resize(new_size / sizeof(T)); + } catch (const std::exception& e) { + return arrow::Status::Invalid( + fmt::format("OrcBackedArrowBuffer resize failed for {}", e.what())); + } + arrow::ResizableBuffer::data_ = reinterpret_cast(orc_buffer_.data()); + 
arrow::ResizableBuffer::size_ = orc_buffer_.size() * sizeof(T); + arrow::ResizableBuffer::capacity_ = orc_buffer_.capacity() * sizeof(T); + return arrow::Status::OK(); + } + + arrow::Status Reserve(const int64_t new_capacity) override { + try { + // arrow buffer capacity is equal to orc buffer capacity * sizeof(T) + orc_buffer_.reserve(new_capacity / sizeof(T)); + } catch (const std::exception& e) { + return arrow::Status::Invalid( + fmt::format("OrcBackedArrowBuffer reserve failed for {}", e.what())); + } + arrow::ResizableBuffer::data_ = reinterpret_cast(orc_buffer_.data()); + arrow::ResizableBuffer::capacity_ = orc_buffer_.capacity() * sizeof(T); + return arrow::Status::OK(); + } + + private: + ::orc::DataBuffer orc_buffer_; +}; + +class EmptyBuilder : public arrow::ArrayBuilder { + public: + using arrow::ArrayBuilder::SetNotNull; + explicit EmptyBuilder(arrow::MemoryPool* pool) : arrow::ArrayBuilder(pool) {} + arrow::Status AppendNulls(int64_t length) override { + return arrow::Status::NotImplemented("AppendNulls is not implemented"); + } + + arrow::Status AppendNull() override { + return arrow::Status::NotImplemented("AppendNull is not implemented"); + } + + arrow::Status AppendEmptyValue() override { + return arrow::Status::NotImplemented("AppendEmptyValue is not implemented"); + } + + arrow::Status AppendEmptyValues(int64_t length) override { + return arrow::Status::NotImplemented("AppendEmptyValues is not implemented"); + } + + // when null_count == 0 in array, null_bitmap buffer can be null, thus, there is no need to + // construct null bitmap, use IncreaseLength() to increase the length_ of array + virtual void IncreaseLength(int64_t length) { + length_ += length; + has_nulls_ = false; + } + + protected: + bool has_nulls_ = true; +}; + +class UnPooledBooleanBuilder : public EmptyBuilder { + public: + explicit UnPooledBooleanBuilder(arrow::MemoryPool* pool) + : EmptyBuilder(pool), + type_(arrow::boolean()), + data_builder_(std::make_shared>(pool)) {} + + 
std::shared_ptr type() const override { + return type_; + } + + arrow::Status SetNulls(const uint8_t* valid_bytes, int64_t length) { + return arrow::ArrayBuilder::AppendToBitmap(valid_bytes, length); + } + + arrow::Status SetData(const uint8_t* data, int64_t length) { + ARROW_RETURN_NOT_OK(data_builder_->Reserve(length)); + data_builder_->UnsafeAppend(data, length); + return arrow::Status::OK(); + } + + arrow::Status FinishInternal(std::shared_ptr* out) override { + std::shared_ptr null_bitmap; + if (has_nulls_) { + ARROW_ASSIGN_OR_RAISE(null_bitmap, null_bitmap_builder_.FinishWithLength(length_)); + } + ARROW_ASSIGN_OR_RAISE(auto data, data_builder_->FinishWithLength(length_)); + *out = arrow::ArrayData::Make(type_, length_, {null_bitmap, data}, null_count_); + Reset(); + return arrow::Status::OK(); + } + + void Reset() override { + arrow::ArrayBuilder::Reset(); + } + + private: + const std::shared_ptr type_; + std::shared_ptr> data_builder_; +}; + +template +class UnPooledPrimitiveBuilder : public arrow::NumericBuilder { + public: + UnPooledPrimitiveBuilder(const std::shared_ptr& type, arrow::MemoryPool* pool) + : arrow::NumericBuilder(type, pool) {} + + void SetData(const std::shared_ptr& data) { + data_ = data; + } + + void IncreaseLength(int64_t length) { + has_nulls_ = false; + this->length_ += length; + } + + arrow::Status SetNulls(const uint8_t* valid_bytes, int64_t length) { + return arrow::ArrayBuilder::AppendToBitmap(valid_bytes, length); + } + + arrow::Status FinishInternal(std::shared_ptr* out) override { + std::shared_ptr null_bitmap; + if (has_nulls_) { + ARROW_RETURN_NOT_OK(this->null_bitmap_builder_.Finish(&null_bitmap)); + } + *out = arrow::ArrayData::Make(this->type_, this->length_, {null_bitmap, data_}, + this->null_count_); + Reset(); + return arrow::Status::OK(); + } + + void Reset() override { + arrow::ArrayBuilder::Reset(); + } + + private: + bool has_nulls_ = true; + std::shared_ptr data_; +}; + +class UnPooledLargeBinaryBuilder : public 
arrow::LargeBinaryBuilder { + public: + using arrow::ArrayBuilder::SetNotNull; + UnPooledLargeBinaryBuilder(const std::shared_ptr& type, + arrow::MemoryPool* pool) + : arrow::LargeBinaryBuilder(pool), type_(type) {} + + void SetOffsets(const std::shared_ptr& offsets) { + offsets_ = offsets; + } + void SetData(const std::shared_ptr& data) { + data_ = data; + } + void IncreaseLength(int64_t length) { + has_nulls_ = false; + length_ += length; + } + arrow::Status SetNulls(const uint8_t* valid_bytes, int64_t length) { + return arrow::ArrayBuilder::AppendToBitmap(valid_bytes, length); + } + arrow::Status FinishInternal(std::shared_ptr* out) override { + std::shared_ptr null_bitmap; + if (has_nulls_) { + ARROW_RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); + } + *out = arrow::ArrayData::Make(type_, length_, {null_bitmap, offsets_, data_}, null_count_); + Reset(); + return arrow::Status::OK(); + } + std::shared_ptr type() const override { + return type_; + } + void Reset() override { + arrow::ArrayBuilder::Reset(); + } + + private: + bool has_nulls_ = true; + std::shared_ptr type_; + std::shared_ptr offsets_; + std::shared_ptr data_; +}; + +class UnPooledBinaryBuilder : public arrow::BinaryBuilder { + public: + using arrow::ArrayBuilder::SetNotNull; + UnPooledBinaryBuilder(const std::shared_ptr& type, arrow::MemoryPool* pool) + : arrow::BinaryBuilder(pool), type_(type) {} + + void SetOffsets(const std::shared_ptr& offsets) { + offsets_ = offsets; + } + + void SetData(const std::shared_ptr& data) { + data_ = data; + } + + void IncreaseLength(int64_t length) { + has_nulls_ = false; + length_ += length; + } + + arrow::Status SetNulls(const uint8_t* valid_bytes, int64_t length) { + return arrow::ArrayBuilder::AppendToBitmap(valid_bytes, length); + } + + arrow::Status FinishInternal(std::shared_ptr* out) override { + std::shared_ptr null_bitmap; + if (has_nulls_) { + ARROW_RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); + } + *out = 
arrow::ArrayData::Make(type_, length_, {null_bitmap, offsets_, data_}, null_count_); + Reset(); + return arrow::Status::OK(); + } + + std::shared_ptr type() const override { + return type_; + } + + void Reset() override { + arrow::ArrayBuilder::Reset(); + } + + private: + bool has_nulls_ = true; + std::shared_ptr type_; + std::shared_ptr offsets_; + std::shared_ptr data_; +}; + +class UnPooledListBuilder : public EmptyBuilder { + public: + UnPooledListBuilder(const std::shared_ptr& type, + const std::shared_ptr& value_builder, + arrow::MemoryPool* pool) + : EmptyBuilder(pool), type_(type), value_builder_(value_builder) {} + + std::shared_ptr type() const override { + return arrow::list(value_builder_->type()); + } + + arrow::Status SetNulls(const uint8_t* valid_bytes, int64_t length) { + return arrow::ArrayBuilder::AppendToBitmap(valid_bytes, length); + } + + void SetOffsets(const std::shared_ptr& offsets) { + offsets_ = offsets; + } + + arrow::Status FinishInternal(std::shared_ptr* out) override { + // Offset padding zeroed by BufferBuilder + std::shared_ptr null_bitmap; + if (has_nulls_) { + ARROW_RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); + } + std::shared_ptr items; + ARROW_RETURN_NOT_OK(value_builder_->FinishInternal(&items)); + + *out = + arrow::ArrayData::Make(type(), length_, {null_bitmap, offsets_}, {items}, null_count_); + Reset(); + return arrow::Status::OK(); + } + + void Reset() override { + arrow::ArrayBuilder::Reset(); + } + + private: + std::shared_ptr type_; + std::shared_ptr offsets_; + std::shared_ptr value_builder_; +}; +class UnPooledStructBuilder : public EmptyBuilder { + public: + UnPooledStructBuilder(const std::shared_ptr& type, + const std::vector>& field_builders, + arrow::MemoryPool* pool) + : EmptyBuilder(pool), type_(type), children_(field_builders) {} + + std::shared_ptr type() const override { + arrow::FieldVector fields; + fields.reserve(children_.size()); + for (size_t i = 0; i < children_.size(); i++) { + 
fields.emplace_back(arrow::field(type_->field(i)->name(), children_[i]->type())); + } + return arrow::struct_(fields); + } + + arrow::Status SetNulls(const uint8_t* valid_bytes, int64_t length) { + return arrow::ArrayBuilder::AppendToBitmap(valid_bytes, length); + } + + arrow::Status FinishInternal(std::shared_ptr* out) override { + std::shared_ptr null_bitmap; + if (has_nulls_) { + ARROW_RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); + } + std::vector> child_data(children_.size()); + for (size_t i = 0; i < children_.size(); i++) { + ARROW_RETURN_NOT_OK(children_[i]->FinishInternal(&child_data[i])); + } + + *out = arrow::ArrayData::Make(type(), length_, {null_bitmap}, null_count_); + (*out)->child_data = std::move(child_data); + Reset(); + return arrow::Status::OK(); + } + + void Reset() override { + arrow::ArrayBuilder::Reset(); + } + + private: + std::shared_ptr type_; + std::vector> children_; +}; + +class UnPooledMapBuilder : public EmptyBuilder { + public: + UnPooledMapBuilder(const std::shared_ptr& type, + const std::shared_ptr& struct_builder, + arrow::MemoryPool* pool) + : EmptyBuilder(pool), type_(type) { + list_builder_ = std::make_shared(arrow::list(struct_builder->type()), + struct_builder, pool); + } + + std::shared_ptr type() const override { + auto map_type = arrow::internal::checked_cast(type_.get()); + auto list_type = + arrow::internal::checked_pointer_cast(list_builder_->type()); + return std::make_shared(arrow::field("entries", list_type->value_type()), + map_type->keys_sorted()); + } + + arrow::Status SetNulls(const uint8_t* valid_bytes, int64_t length) { + return list_builder_->SetNulls(valid_bytes, length); + } + + void IncreaseLength(int64_t length) override { + list_builder_->IncreaseLength(length); + } + + void SetOffsets(const std::shared_ptr& offsets) { + list_builder_->SetOffsets(offsets); + } + + arrow::Status FinishInternal(std::shared_ptr* out) override { + ARROW_RETURN_NOT_OK(list_builder_->FinishInternal(out)); + 
(*out)->type = type(); + Reset(); + return arrow::Status::OK(); + } + + void Reset() override { + arrow::ArrayBuilder::Reset(); + } + + private: + std::shared_ptr type_; + std::shared_ptr list_builder_; +}; + +template +Status BuilderSetNulls(::orc::ColumnVectorBatch* column_vector_batch, BuilderType* builder) { + if (column_vector_batch->hasNulls) { + PAIMON_RETURN_NOT_OK_FROM_ARROW( + builder->SetNulls(reinterpret_cast(column_vector_batch->notNull.data()), + column_vector_batch->numElements)); + } else { + builder->IncreaseLength(column_vector_batch->numElements); + } + return Status::OK(); +} + +class OrcStringDictionary : public arrow::ArrayData { + public: + OrcStringDictionary(const std::shared_ptr& data, + const std::shared_ptr<::orc::StringDictionary>& orc_dictionary) + : arrow::ArrayData(*data), orc_dictionary_(orc_dictionary) {} + + private: + const std::shared_ptr<::orc::StringDictionary> orc_dictionary_; +}; + +class UnPooledStringDictionaryBuilder : public EmptyBuilder { + public: + explicit UnPooledStringDictionaryBuilder(arrow::MemoryPool* pool) + : EmptyBuilder(pool), pool_(pool) {} + + arrow::Status SetDictionary(const std::shared_ptr<::orc::StringDictionary>& orc_dictionary) { + auto dict_builder = + std::make_shared(arrow::large_utf8(), pool_); + // We do not transfer buffer ownership to arrow here. The memory still + // owned by dictionary which is hold in a shared_ptr. 
+ const auto& dict_offset = orc_dictionary->dictionaryOffset; + const auto& dict_data = orc_dictionary->dictionaryBlob; + auto offsets = + std::make_shared(reinterpret_cast(dict_offset.data()), + dict_offset.size() * sizeof(int64_t)); + auto data = std::make_shared( + reinterpret_cast(dict_data.data()), dict_data.size()); + dict_builder->SetData(data); + dict_builder->SetOffsets(offsets); + dict_builder->IncreaseLength(dict_offset.size() - 1); + std::shared_ptr dictionary; + ARROW_RETURN_NOT_OK(dict_builder->Finish(&dictionary)); + dictionary_ = std::make_shared(dictionary->data(), orc_dictionary); + return arrow::Status::OK(); + } + + void SetIndices(const std::shared_ptr& indices) { + indices_ = indices; + } + arrow::Status SetNulls(const uint8_t* valid_bytes, int64_t length) { + return arrow::ArrayBuilder::AppendToBitmap(valid_bytes, length); + } + arrow::Status FinishInternal(std::shared_ptr* out) override { + std::shared_ptr null_bitmap; + if (has_nulls_) { + ARROW_RETURN_NOT_OK(null_bitmap_builder_.Finish(&null_bitmap)); + } + *out = arrow::ArrayData::Make(type(), length_, {null_bitmap, indices_}, null_count_); + (*out)->dictionary = dictionary_; + Reset(); + return arrow::Status::OK(); + } + std::shared_ptr type() const override { + static std::shared_ptr dict_type = + arrow::dictionary(arrow::int64(), arrow::large_utf8()); + return dict_type; + } + void Reset() override { + arrow::ArrayBuilder::Reset(); + } + + private: + arrow::MemoryPool* pool_; + std::shared_ptr indices_; + std::shared_ptr dictionary_; +}; + +Status CheckOutOfBounds(const ::orc::StringVectorBatch* string_vector_batch) { + int64_t total_length = 0; + bool has_nulls = string_vector_batch->hasNulls; + for (uint64_t i = 0; i < string_vector_batch->numElements; ++i) { + if (has_nulls && !string_vector_batch->notNull[i]) { + continue; + } + if (string_vector_batch->length[i] > std::numeric_limits::max()) { + return Status::Invalid(fmt::format("index {} with length {} is out-of-bounds", i, + 
string_vector_batch->length[i])); + } + total_length += string_vector_batch->length[i]; + if (total_length > std::numeric_limits::max()) { + return Status::Invalid("total length is out-of-bounds"); + } + } + return Status::OK(); +} + +Result> PrepareOffsetsBufferForString( + const ::orc::StringVectorBatch* string_vector_batch, arrow::MemoryPool* pool) { + // TODO(liancheng.lsz): length for StringVectorBatch in orc is int64, offset for string in arrow + // array is int32, we only support int32 for now. large_string & large_binary will be supported + // in the future. + PAIMON_RETURN_NOT_OK(CheckOutOfBounds(string_vector_batch)); + int64_t buffer_size = (string_vector_batch->numElements + 1) * sizeof(int32_t); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr data_buffer, + arrow::AllocateBuffer(buffer_size, pool)); + auto* offsets_data = data_buffer->mutable_data_as(); + int32_t cur_offset = 0; + bool has_nulls = string_vector_batch->hasNulls; + for (size_t i = 0; i < string_vector_batch->numElements; i++) { + int32_t tmp_length = + (!has_nulls) ? string_vector_batch->length[i] + : (string_vector_batch->notNull[i] ? 
string_vector_batch->length[i] : 0); + offsets_data[i] = cur_offset; + cur_offset += tmp_length; + } + offsets_data[string_vector_batch->numElements] = cur_offset; + return data_buffer; +} + +Result> DeepCopyDataBufferForString( + const ::orc::StringVectorBatch* string_vector_batch, arrow::MemoryPool* pool) { + int64_t data_size = 0; + bool has_nulls = string_vector_batch->hasNulls; + for (uint64_t i = 0; i < string_vector_batch->numElements; ++i) { + if (!has_nulls || string_vector_batch->notNull[i]) { + data_size += string_vector_batch->length[i]; + } + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr data_buffer, + arrow::AllocateBuffer(data_size, pool)); + char* data = data_buffer->mutable_data_as(); + int32_t dest_offset = 0; + for (uint64_t i = 0; i < string_vector_batch->numElements; ++i) { + if (!has_nulls || string_vector_batch->notNull[i]) { + memcpy(data + dest_offset, string_vector_batch->data[i], + string_vector_batch->length[i]); + dest_offset += string_vector_batch->length[i]; + } + } + return data_buffer; +} + +template +Result> MakeOrcBackedPrimitiveBuilder( + const std::shared_ptr& type, ::orc::ColumnVectorBatch* column_vector_batch, + arrow::MemoryPool* pool) { + auto typed_batch = dynamic_cast(column_vector_batch); + assert(typed_batch); + auto builder = std::make_shared(type, pool); + builder->SetData( + std::make_shared>(std::move(typed_batch->data))); + PAIMON_RETURN_NOT_OK(BuilderSetNulls(column_vector_batch, builder.get())); + return builder; +} + +Result> MakeOrcBackedBooleanBuilder( + ::orc::ColumnVectorBatch* column_vector_batch, arrow::MemoryPool* pool) { + auto typed_batch = dynamic_cast<::orc::ByteVectorBatch*>(column_vector_batch); + assert(typed_batch); + auto builder = std::make_shared(pool); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->SetData( + reinterpret_cast(typed_batch->data.data()), typed_batch->numElements)); + PAIMON_RETURN_NOT_OK( + BuilderSetNulls(column_vector_batch, builder.get())); + return builder; +} + +Result> 
MakeOrcBackedDate32Builder( + const std::shared_ptr& type, ::orc::ColumnVectorBatch* column_vector_batch, + arrow::MemoryPool* pool) { + auto typed_batch = dynamic_cast<::orc::LongVectorBatch*>(column_vector_batch); + assert(typed_batch); + auto builder = std::make_shared(type, pool); + + const uint8_t* valid_bytes = typed_batch->hasNulls + ? reinterpret_cast(typed_batch->notNull.data()) + : nullptr; + auto transform_iter = arrow::internal::MakeLazyRange( + [&typed_batch](int64_t index) { + return static_cast(typed_batch->data.data()[index]); + }, + typed_batch->numElements); + + PAIMON_RETURN_NOT_OK_FROM_ARROW( + builder->AppendValues(transform_iter.begin(), transform_iter.end(), valid_bytes)); + return builder; +} + +Result> MakeOrcBackedTimestampBuilder( + const std::shared_ptr& type, ::orc::ColumnVectorBatch* column_vector_batch, + arrow::MemoryPool* pool) { + auto typed_batch = dynamic_cast<::orc::TimestampVectorBatch*>(column_vector_batch); + assert(typed_batch); + auto builder = std::make_shared(type, pool); + + const uint8_t* valid_bytes = typed_batch->hasNulls + ? 
reinterpret_cast(typed_batch->notNull.data()) + : nullptr; + const int64_t* seconds = typed_batch->data.data(); + const int64_t* nanos = typed_batch->nanoseconds.data(); + auto timestamp_type = arrow::internal::checked_pointer_cast(type); + assert(timestamp_type); + int32_t precision = DateTimeUtils::GetPrecisionFromType(timestamp_type); + // TODO(lisizhuo.lsz): check nano overflow in arrow + if (precision == Timestamp::MIN_PRECISION) { + auto transform_iter = arrow::internal::MakeLazyRange( + [seconds](int64_t index) { return seconds[index]; }, typed_batch->numElements); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + builder->AppendValues(transform_iter.begin(), transform_iter.end(), valid_bytes)); + return builder; + } else if (precision == Timestamp::MILLIS_PRECISION) { + auto transform_iter = arrow::internal::MakeLazyRange( + [seconds, nanos](int64_t index) { + return seconds[index] * + DateTimeUtils::CONVERSION_FACTORS[DateTimeUtils::TimeType::MILLISECOND] + + nanos[index] / + DateTimeUtils::CONVERSION_FACTORS[DateTimeUtils::TimeType::MICROSECOND]; + }, + typed_batch->numElements); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + builder->AppendValues(transform_iter.begin(), transform_iter.end(), valid_bytes)); + return builder; + } else if (precision == Timestamp::DEFAULT_PRECISION) { + auto transform_iter = arrow::internal::MakeLazyRange( + [seconds, nanos](int64_t index) { + return seconds[index] * + DateTimeUtils::CONVERSION_FACTORS[DateTimeUtils::TimeType::MICROSECOND] + + nanos[index] / + DateTimeUtils::CONVERSION_FACTORS[DateTimeUtils::TimeType::MILLISECOND]; + }, + typed_batch->numElements); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + builder->AppendValues(transform_iter.begin(), transform_iter.end(), valid_bytes)); + return builder; + } else if (precision == Timestamp::MAX_PRECISION) { + auto transform_iter = arrow::internal::MakeLazyRange( + [seconds, nanos](int64_t index) { + return seconds[index] * + DateTimeUtils::CONVERSION_FACTORS[DateTimeUtils::TimeType::NANOSECOND] + + 
nanos[index]; + }, + typed_batch->numElements); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + builder->AppendValues(transform_iter.begin(), transform_iter.end(), valid_bytes)); + return builder; + } + return Status::Invalid(fmt::format("invalid timestamp precision {}", precision)); +} + +Result> MakeOrcBackedDecimal128Builder( + const std::shared_ptr& type, ::orc::ColumnVectorBatch* column_vector_batch, + arrow::MemoryPool* pool) { + auto builder = std::make_shared(type, pool); + const bool has_nulls = column_vector_batch->hasNulls; + auto decimal_type = arrow::internal::checked_cast(type.get()); + assert(decimal_type); + if (decimal_type->precision() == 0 || decimal_type->precision() > 18) { + auto typed_batch = + arrow::internal::checked_cast(column_vector_batch); + for (size_t i = 0; i < typed_batch->numElements; i++) { + if (!has_nulls || typed_batch->notNull[i]) { + int64_t high_bits = typed_batch->values[i].getHighBits(); + uint64_t low_bits = typed_batch->values[i].getLowBits(); + PAIMON_RETURN_NOT_OK_FROM_ARROW( + builder->Append(arrow::Decimal128(high_bits, low_bits))); + } else { + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->AppendNull()); + } + } + } else { + auto typed_batch = + arrow::internal::checked_cast(column_vector_batch); + for (size_t i = 0; i < typed_batch->numElements; i++) { + if (!has_nulls || typed_batch->notNull[i]) { + PAIMON_RETURN_NOT_OK_FROM_ARROW( + builder->Append(arrow::Decimal128(typed_batch->values[i]))); + } else { + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->AppendNull()); + } + } + } + return builder; +} + +Result> MakeOrcBackedBinaryBuilder( + const std::shared_ptr& type, ::orc::ColumnVectorBatch* column_vector_batch, + arrow::MemoryPool* pool) { + assert(column_vector_batch->numElements > 0); + auto typed_batch = dynamic_cast<::orc::StringVectorBatch*>(column_vector_batch); + assert(typed_batch); + auto encoded_batch = dynamic_cast<::orc::EncodedStringVectorBatch*>(typed_batch); + if (typed_batch->blob.size() != 0) { + 
assert(!typed_batch->isEncoded); + // condition1: orc batch is normal string, return arrow::string + auto data = std::make_shared>(std::move(typed_batch->blob)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr offsets, + PrepareOffsetsBufferForString(typed_batch, pool)); + auto builder = std::make_shared(type, pool); + builder->SetData(data); + builder->SetOffsets(offsets); + PAIMON_RETURN_NOT_OK( + BuilderSetNulls(column_vector_batch, builder.get())); + return builder; + } else if (encoded_batch && encoded_batch->dictionary) { + assert(type->id() == arrow::Type::type::STRING); + // condition2: orc batch is dict batch, return arrow::dict + auto builder = std::make_shared(pool); + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->SetDictionary(encoded_batch->dictionary)); + auto indices = + std::make_shared>(std::move(encoded_batch->index)); + builder->SetIndices(indices); + PAIMON_RETURN_NOT_OK( + BuilderSetNulls(column_vector_batch, builder.get())); + return builder; + } else { + // condition3: disable dict batch in orc, while blob is also invalid, degrade to deep copy + // version + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr data, + DeepCopyDataBufferForString(typed_batch, pool)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr offsets, + PrepareOffsetsBufferForString(typed_batch, pool)); + auto builder = std::make_shared(type, pool); + builder->SetData(data); + builder->SetOffsets(offsets); + PAIMON_RETURN_NOT_OK( + BuilderSetNulls(column_vector_batch, builder.get())); + return builder; + } + return Status::Invalid("convert to arrow array failed, because of invalid string vector batch"); +} + +Result> MakeArrowBuilder( + const std::shared_ptr& type, ::orc::ColumnVectorBatch* column_vector_batch, + arrow::MemoryPool* pool); + +Result> MakeOrcBackedListBuilder( + const std::shared_ptr& type, ::orc::ColumnVectorBatch* column_vector_batch, + arrow::MemoryPool* pool) { + using OffsetType = arrow::ListType::offset_type; + auto typed_batch = 
dynamic_cast<::orc::ListVectorBatch*>(column_vector_batch); + assert(typed_batch); + auto list_type = arrow::internal::checked_cast(type.get()); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr elements_builder, + MakeArrowBuilder(list_type->value_type(), typed_batch->elements.get(), pool)); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr offsets, + arrow::AllocateBuffer(sizeof(OffsetType) * (typed_batch->numElements + 1), pool)); + auto* typed_offsets = offsets->mutable_data_as(); + for (size_t i = 0; i < typed_batch->numElements + 1; i++) { + typed_offsets[i] = static_cast(typed_batch->offsets[i]); + } + auto list_builder = std::make_shared(type, elements_builder, pool); + list_builder->SetOffsets(std::move(offsets)); + PAIMON_RETURN_NOT_OK( + BuilderSetNulls(column_vector_batch, list_builder.get())); + return list_builder; +} + +Result> MakeOrcBackedStructBuilder( + const std::shared_ptr& type, ::orc::ColumnVectorBatch* column_vector_batch, + arrow::MemoryPool* pool) { + auto typed_batch = dynamic_cast<::orc::StructVectorBatch*>(column_vector_batch); + assert(typed_batch); + auto struct_type = arrow::internal::checked_cast(type.get()); + std::vector> children_builders; + children_builders.reserve(typed_batch->fields.size()); + for (size_t i = 0; i < typed_batch->fields.size(); i++) { + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr child_builder, + MakeArrowBuilder(struct_type->field(i)->type(), typed_batch->fields[i], pool)); + children_builders.push_back(child_builder); + } + auto struct_builder = std::make_shared(type, children_builders, pool); + PAIMON_RETURN_NOT_OK( + BuilderSetNulls(column_vector_batch, struct_builder.get())); + return struct_builder; +} + +Result> MakeOrcBackedMapBuilder( + const std::shared_ptr& type, ::orc::ColumnVectorBatch* column_vector_batch, + arrow::MemoryPool* pool) { + using OffsetType = arrow::ListType::offset_type; + auto typed_batch = dynamic_cast<::orc::MapVectorBatch*>(column_vector_batch); + assert(typed_batch); + auto 
map_type = arrow::internal::checked_cast(type.get()); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr key_builder, + MakeArrowBuilder(map_type->key_type(), typed_batch->keys.get(), pool)); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr item_builder, + MakeArrowBuilder(map_type->item_type(), typed_batch->elements.get(), pool)); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW( + std::shared_ptr offsets, + arrow::AllocateBuffer(sizeof(OffsetType) * (typed_batch->numElements + 1), pool)); + auto* typed_offsets = offsets->mutable_data_as(); + for (size_t i = 0; i < typed_batch->numElements + 1; i++) { + typed_offsets[i] = static_cast(typed_batch->offsets[i]); + } + + std::vector> child_builders = {key_builder, item_builder}; + auto struct_builder = + std::make_shared(map_type->value_type(), child_builders, pool); + // key cannot be null + struct_builder->IncreaseLength(key_builder->length()); + + auto map_builder = std::make_shared(type, struct_builder, pool); + map_builder->SetOffsets(std::move(offsets)); + PAIMON_RETURN_NOT_OK( + BuilderSetNulls(column_vector_batch, map_builder.get())); + return map_builder; +} + +Result> MakeArrowBuilder( + const std::shared_ptr& type, ::orc::ColumnVectorBatch* column_vector_batch, + arrow::MemoryPool* pool) { + if (column_vector_batch->numElements == 0) { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr builder, + arrow::MakeBuilder(type, pool)); + return builder; + } + arrow::Type::type kind = type->id(); + switch (kind) { + case arrow::Type::type::BOOL: + return MakeOrcBackedBooleanBuilder(column_vector_batch, pool); + case arrow::Type::type::INT8: + return MakeOrcBackedPrimitiveBuilder, + ::orc::ByteVectorBatch, int8_t>( + type, column_vector_batch, pool); + case arrow::Type::type::INT16: + return MakeOrcBackedPrimitiveBuilder, + ::orc::ShortVectorBatch, int16_t>( + type, column_vector_batch, pool); + case arrow::Type::type::INT32: + return MakeOrcBackedPrimitiveBuilder, + ::orc::IntVectorBatch, int32_t>( + type, column_vector_batch, pool); + + 
case arrow::Type::type::INT64: + return MakeOrcBackedPrimitiveBuilder, + ::orc::LongVectorBatch, int64_t>( + type, column_vector_batch, pool); + case arrow::Type::type::FLOAT: + return MakeOrcBackedPrimitiveBuilder, + ::orc::FloatVectorBatch, float>( + type, column_vector_batch, pool); + case arrow::Type::type::DOUBLE: + return MakeOrcBackedPrimitiveBuilder, + ::orc::DoubleVectorBatch, double>( + type, column_vector_batch, pool); + case arrow::Type::type::BINARY: + case arrow::Type::type::STRING: + return MakeOrcBackedBinaryBuilder(type, column_vector_batch, pool); + case arrow::Type::type::DATE32: + return MakeOrcBackedDate32Builder(type, column_vector_batch, pool); + case arrow::Type::type::TIMESTAMP: + return MakeOrcBackedTimestampBuilder(type, column_vector_batch, pool); + case arrow::Type::type::DECIMAL128: + return MakeOrcBackedDecimal128Builder(type, column_vector_batch, pool); + case arrow::Type::type::STRUCT: + return MakeOrcBackedStructBuilder(type, column_vector_batch, pool); + case arrow::Type::type::LIST: + return MakeOrcBackedListBuilder(type, column_vector_batch, pool); + case arrow::Type::type::MAP: + return MakeOrcBackedMapBuilder(type, column_vector_batch, pool); + default: { + return Status::NotImplemented("Unknown or unsupported Arrow type: ", type->ToString()); + } + } +} + +} // namespace + +Result> OrcAdapter::AppendBatch( + const std::shared_ptr& type, ::orc::ColumnVectorBatch* batch, + arrow::MemoryPool* pool) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr builder, + MakeArrowBuilder(type, batch, pool)); + std::shared_ptr array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(builder->Finish(&array)); + return array; +} + +namespace { + +arrow::Status WriteBatch(const arrow::Array& array, ::orc::ColumnVectorBatch* column_vector_batch); + +// Make sure children of StructArray have appropriate null. 
+arrow::Result> NormalizeArray( + const std::shared_ptr& array) { + arrow::Type::type kind = array->type_id(); + switch (kind) { + case arrow::Type::type::STRUCT: { + auto struct_array = arrow::internal::checked_cast(array.get()); + const std::shared_ptr bitmap = struct_array->null_bitmap(); + std::shared_ptr struct_type = struct_array->type(); + std::size_t size = struct_type->fields().size(); + arrow::ArrayVector new_children(size, nullptr); + for (std::size_t i = 0; i < size; i++) { + auto child_length = struct_array->data()->child_data[i]->length; + auto child_offset = struct_array->data()->child_data[i]->offset; + // field function will change length & offset in child data + std::shared_ptr child = struct_array->field(static_cast(i)); + const std::shared_ptr child_bitmap = child->null_bitmap(); + std::shared_ptr final_child_bitmap; + if (child_bitmap == nullptr && bitmap == nullptr) { + final_child_bitmap = nullptr; + } else if (child_bitmap == nullptr) { + final_child_bitmap = bitmap; + } else if (bitmap == nullptr) { + final_child_bitmap = child_bitmap; + } else { + ARROW_ASSIGN_OR_RAISE(final_child_bitmap, + arrow::internal::BitmapAnd( + arrow::default_memory_pool(), bitmap->data(), 0, + child_bitmap->data(), 0, struct_array->length(), 0)); + } + std::shared_ptr child_array_data = child->data(); + std::vector> child_buffers = + child_array_data->buffers; + child_buffers[0] = final_child_bitmap; + // When slicing, we do not know the null count of the sliced range without + // doing some computation. To avoid doing this eagerly, we set the null count + // to -1. 
+ std::shared_ptr new_child_array_data = arrow::ArrayData::Make( + child->type(), child_length, child_buffers, child_array_data->child_data, + child_array_data->dictionary, + /*null_count=*/arrow::kUnknownNullCount, child_offset); + ARROW_ASSIGN_OR_RAISE(new_children[i], + NormalizeArray(arrow::MakeArray(new_child_array_data))); + } + return std::make_shared( + struct_type, struct_array->length(), new_children, bitmap, + struct_array->null_count(), struct_array->offset()); + } + case arrow::Type::type::LIST: { + auto list_array = arrow::internal::checked_cast(array.get()); + ARROW_ASSIGN_OR_RAISE(auto value_array, NormalizeArray(list_array->values())); + return std::make_shared( + list_array->type(), list_array->length(), list_array->value_offsets(), value_array, + list_array->null_bitmap(), list_array->null_count(), list_array->offset()); + } + case arrow::Type::type::MAP: { + auto map_array = arrow::internal::checked_cast(array.get()); + ARROW_ASSIGN_OR_RAISE(auto key_array, NormalizeArray(map_array->keys())); + ARROW_ASSIGN_OR_RAISE(auto item_array, NormalizeArray(map_array->items())); + return std::make_shared( + map_array->type(), map_array->length(), map_array->value_offsets(), key_array, + item_array, map_array->null_bitmap(), map_array->null_count(), map_array->offset()); + } + default: { + return array; + } + } +} + +template +struct Appender {}; + +// Types for long/double-like Appender, that is, numeric, boolean or date32 +template +using is_generic_type = std::integral_constant::value || + std::is_same_v || + arrow::is_boolean_type::value>; +template +using enable_if_generic = arrow::enable_if_t::value, R>; + +// Number-like +template +struct Appender> { + using ArrayType = typename arrow::TypeTraits::ArrayType; + using ValueType = typename arrow::TypeTraits::CType; + arrow::Status VisitNull() { + batch->notNull[running_orc_offset] = false; + running_orc_offset++; + batch->numElements = running_orc_offset; + running_arrow_offset++; + return 
arrow::Status::OK(); + } + arrow::Status VisitValue(ValueType v) { + batch->data[running_orc_offset] = array.Value(running_arrow_offset); + batch->notNull[running_orc_offset] = true; + running_orc_offset++; + batch->numElements = running_orc_offset; + running_arrow_offset++; + return arrow::Status::OK(); + } + const ArrayType& array; + BatchType* batch; + int64_t running_orc_offset, running_arrow_offset; +}; + +// Binary +template +struct Appender { + using ArrayType = typename arrow::TypeTraits::ArrayType; + using COffsetType = typename arrow::TypeTraits::OffsetType::c_type; + arrow::Status VisitNull() { + batch->notNull[running_orc_offset] = false; + running_orc_offset++; + batch->numElements = running_orc_offset; + running_arrow_offset++; + return arrow::Status::OK(); + } + arrow::Status VisitValue(std::string_view v) { + batch->notNull[running_orc_offset] = true; + COffsetType data_length = 0; + batch->data[running_orc_offset] = reinterpret_cast( + const_cast(array.GetValue(running_arrow_offset, &data_length))); + batch->length[running_orc_offset] = data_length; + running_orc_offset++; + batch->numElements = running_orc_offset; + running_arrow_offset++; + return arrow::Status::OK(); + } + const ArrayType& array; + ::orc::StringVectorBatch* batch; + int64_t running_orc_offset, running_arrow_offset; +}; + +// Decimal +template <> +struct Appender { + arrow::Status VisitNull() { + batch->notNull[running_orc_offset] = false; + running_orc_offset++; + batch->numElements = running_orc_offset; + running_arrow_offset++; + return arrow::Status::OK(); + } + arrow::Status VisitValue(std::string_view v) { + batch->notNull[running_orc_offset] = true; + const arrow::Decimal128 dec_value(array.GetValue(running_arrow_offset)); + batch->values[running_orc_offset] = static_cast(dec_value.low_bits()); + running_orc_offset++; + batch->numElements = running_orc_offset; + running_arrow_offset++; + return arrow::Status::OK(); + } + const arrow::Decimal128Array& array; + 
::orc::Decimal64VectorBatch* batch; + int64_t running_orc_offset, running_arrow_offset; +}; + +template <> +struct Appender { + arrow::Status VisitNull() { + batch->notNull[running_orc_offset] = false; + running_orc_offset++; + batch->numElements = running_orc_offset; + running_arrow_offset++; + return arrow::Status::OK(); + } + arrow::Status VisitValue(std::string_view v) { + batch->notNull[running_orc_offset] = true; + const arrow::Decimal128 dec_value(array.GetValue(running_arrow_offset)); + batch->values[running_orc_offset] = + ::orc::Int128(dec_value.high_bits(), dec_value.low_bits()); + running_orc_offset++; + batch->numElements = running_orc_offset; + running_arrow_offset++; + return arrow::Status::OK(); + } + const arrow::Decimal128Array& array; + ::orc::Decimal128VectorBatch* batch; + int64_t running_orc_offset, running_arrow_offset; +}; + +// Timestamp +template +struct TimestampAppender { + using ArrayType = typename arrow::TypeTraits::ArrayType; + arrow::Status VisitNull() { + batch->notNull[running_orc_offset] = false; + running_orc_offset++; + batch->numElements = running_orc_offset; + running_arrow_offset++; + return arrow::Status::OK(); + } + arrow::Status VisitValue(int64_t v) { + int64_t data = array.Value(running_arrow_offset); + batch->notNull[running_orc_offset] = true; + auto [second, nanosecond] = DateTimeUtils::TimestampConverter( + data, time_type, DateTimeUtils::SECOND, DateTimeUtils::NANOSECOND); + batch->data[running_orc_offset] = second; + batch->nanoseconds[running_orc_offset] = nanosecond; + running_orc_offset++; + batch->numElements = running_orc_offset; + running_arrow_offset++; + return arrow::Status::OK(); + } + DateTimeUtils::TimeType time_type = DateTimeUtils::TimeType::MICROSECOND; + const ArrayType& array; + ::orc::TimestampVectorBatch* batch; + int64_t running_orc_offset, running_arrow_offset; +}; + +template +arrow::Status ShallowCopyGenericBatch(const arrow::Array& array, + ::orc::ColumnVectorBatch* column_vector_batch) { + 
using ArrayType = typename arrow::TypeTraits::ArrayType; + using value_type = typename ArrayType::value_type; + const auto& array_(arrow::internal::checked_cast(array)); + auto batch = arrow::internal::checked_cast(column_vector_batch); + if (array.null_count()) { + batch->hasNulls = true; + } + int64_t length = array_.length(); + for (int64_t i = 0; i < length; i++) { + batch->notNull[i] = array_.IsValid(i) ? static_cast(1) : static_cast(0); + } + auto* raw_values = const_cast(array_.raw_values()); + batch->data.setData(raw_values, sizeof(value_type) * length); + batch->numElements = length; + return arrow::Status::OK(); +} + +// static_cast from int64_t or double to itself shouldn't introduce overhead +// Please see +// https://stackoverflow.com/questions/19106826/ +// can-static-cast-to-same-type-introduce-runtime-overhead +template +arrow::Status WriteGenericBatch(const arrow::Array& array, + ::orc::ColumnVectorBatch* column_vector_batch) { + using ArrayType = typename arrow::TypeTraits::ArrayType; + const auto& array_(arrow::internal::checked_cast(array)); + auto batch = arrow::internal::checked_cast(column_vector_batch); + if (array.null_count()) { + batch->hasNulls = true; + } + Appender appender{array_, batch, /*orc_offset=*/0, 0}; + arrow::ArraySpanVisitor visitor; + ARROW_RETURN_NOT_OK(visitor.Visit(*array_.data(), &appender)); + return arrow::Status::OK(); +} + +template +arrow::Status WriteTimestampBatch(const arrow::Array& array, + ::orc::ColumnVectorBatch* column_vector_batch) { + using ArrayType = typename arrow::TypeTraits::ArrayType; + const auto& array_(arrow::internal::checked_cast(array)); + auto batch = arrow::internal::checked_cast<::orc::TimestampVectorBatch*>(column_vector_batch); + if (array.null_count()) { + batch->hasNulls = true; + } + auto timestamp_type = arrow::internal::checked_pointer_cast(array.type()); + auto time_type = DateTimeUtils::GetTimeTypeFromArrowType(timestamp_type); + TimestampAppender appender{time_type, array_, batch, 
+ /*orc_offset=*/0, 0}; + arrow::ArraySpanVisitor visitor; + ARROW_RETURN_NOT_OK(visitor.Visit(*array_.data(), &appender)); + return arrow::Status::OK(); +} + +arrow::Status WriteStructBatch(const arrow::Array& array, + ::orc::ColumnVectorBatch* column_vector_batch) { + std::shared_ptr array_ = arrow::MakeArray(array.data()); + auto* struct_array = arrow::internal::checked_cast(array_.get()); + assert(struct_array); + auto batch = arrow::internal::checked_cast<::orc::StructVectorBatch*>(column_vector_batch); + std::size_t size = array.type()->fields().size(); + int64_t arrow_length = array.length(); + batch->numElements = arrow_length; + int64_t running_arrow_offset = 0, running_orc_offset = 0; + // First fill fields of ColumnVectorBatch + if (array.null_count()) { + batch->hasNulls = true; + } + for (; running_arrow_offset < arrow_length; running_orc_offset++, running_arrow_offset++) { + batch->notNull[running_orc_offset] = + array.IsValid(running_arrow_offset) ? static_cast(1) : static_cast(0); + } + // Fill the fields + for (std::size_t i = 0; i < size; i++) { + batch->fields[i]->resize(arrow_length); + ARROW_RETURN_NOT_OK( + WriteBatch(*(struct_array->field(static_cast(i))), batch->fields[i])); + } + return arrow::Status::OK(); +} + +template +arrow::Status WriteListBatch(const arrow::Array& array, + ::orc::ColumnVectorBatch* column_vector_batch) { + const auto& list_array(arrow::internal::checked_cast(array)); + auto batch = arrow::internal::checked_cast<::orc::ListVectorBatch*>(column_vector_batch); + ::orc::ColumnVectorBatch* element_batch = (batch->elements).get(); + int64_t arrow_length = array.length(); + batch->numElements = arrow_length; + int64_t running_arrow_offset = 0, running_orc_offset = 0; + if (running_orc_offset == 0) { + batch->offsets[0] = 0; + } + if (array.null_count()) { + batch->hasNulls = true; + } + element_batch->resize(list_array.values()->length()); + ARROW_RETURN_NOT_OK(WriteBatch(*list_array.values(), element_batch)); + // 
TODO(liancheng.lsz): if array large list, can memcpy offsets + for (; running_arrow_offset < arrow_length; running_orc_offset++, running_arrow_offset++) { + if (array.IsValid(running_arrow_offset)) { + batch->notNull[running_orc_offset] = true; + batch->offsets[running_orc_offset + 1] = + batch->offsets[running_orc_offset] + + list_array.value_offset(running_arrow_offset + 1) - + list_array.value_offset(running_arrow_offset); + } else { + batch->notNull[running_orc_offset] = false; + batch->offsets[running_orc_offset + 1] = batch->offsets[running_orc_offset]; + } + } + return arrow::Status::OK(); +} + +arrow::Status WriteMapBatch(const arrow::Array& array, + ::orc::ColumnVectorBatch* column_vector_batch) { + const auto& map_array(arrow::internal::checked_cast(array)); + auto batch = arrow::internal::checked_cast<::orc::MapVectorBatch*>(column_vector_batch); + ::orc::ColumnVectorBatch* key_batch = (batch->keys).get(); + ::orc::ColumnVectorBatch* element_batch = (batch->elements).get(); + std::shared_ptr key_array = map_array.keys(); + std::shared_ptr element_array = map_array.items(); + int64_t arrow_length = array.length(); + batch->numElements = arrow_length; + int64_t running_arrow_offset = 0, running_orc_offset = 0; + if (running_orc_offset == 0) { + batch->offsets[0] = 0; + } + if (array.null_count()) { + batch->hasNulls = true; + } + key_batch->resize(key_array->length()); + element_batch->resize(element_array->length()); + ARROW_RETURN_NOT_OK(WriteBatch(*key_array, key_batch)); + ARROW_RETURN_NOT_OK(WriteBatch(*element_array, element_batch)); + for (; running_arrow_offset < arrow_length; running_orc_offset++, running_arrow_offset++) { + if (array.IsValid(running_arrow_offset)) { + batch->notNull[running_orc_offset] = true; + batch->offsets[running_orc_offset + 1] = + batch->offsets[running_orc_offset] + + map_array.value_offset(running_arrow_offset + 1) - + map_array.value_offset(running_arrow_offset); + } else { + batch->notNull[running_orc_offset] = false; 
+ batch->offsets[running_orc_offset + 1] = batch->offsets[running_orc_offset]; + } + } + return arrow::Status::OK(); +} + +arrow::Status WriteBatch(const arrow::Array& array, ::orc::ColumnVectorBatch* column_vector_batch) { + arrow::Type::type kind = array.type_id(); + column_vector_batch->numElements = 0; + switch (kind) { + case arrow::Type::type::BOOL: + return WriteGenericBatch( + array, column_vector_batch); + case arrow::Type::type::INT8: + return ShallowCopyGenericBatch( + array, column_vector_batch); + case arrow::Type::type::INT16: + return ShallowCopyGenericBatch( + array, column_vector_batch); + case arrow::Type::type::INT32: + return ShallowCopyGenericBatch( + array, column_vector_batch); + case arrow::Type::type::INT64: + return ShallowCopyGenericBatch( + array, column_vector_batch); + case arrow::Type::type::FLOAT: + return ShallowCopyGenericBatch( + array, column_vector_batch); + case arrow::Type::type::DOUBLE: + return ShallowCopyGenericBatch( + array, column_vector_batch); + case arrow::Type::type::BINARY: + return WriteGenericBatch( + array, column_vector_batch); + case arrow::Type::type::STRING: + return WriteGenericBatch( + array, column_vector_batch); + case arrow::Type::type::DATE32: + return WriteGenericBatch( + array, column_vector_batch); + case arrow::Type::type::TIMESTAMP: + return WriteTimestampBatch(array, column_vector_batch); + case arrow::Type::type::DECIMAL128: { + int32_t precision = + arrow::internal::checked_cast(array.type().get()) + ->precision(); + if (precision > 18 || precision == 0) { + return WriteGenericBatch( + array, column_vector_batch); + } else { + return WriteGenericBatch( + array, column_vector_batch); + } + } + case arrow::Type::type::STRUCT: + return WriteStructBatch(array, column_vector_batch); + case arrow::Type::type::LIST: + return WriteListBatch(array, column_vector_batch); + case arrow::Type::type::MAP: + return WriteMapBatch(array, column_vector_batch); + default: { + return 
// Converts an Arrow data type into the equivalent ORC type.
//
// Nested types are converted recursively, and the metadata of each nested Arrow field is
// copied onto the ORC subtype as attributes (see SetAttributes).
// Returns Status::NotImplemented for an Arrow type without a case below.
arrow::Result<std::unique_ptr<::orc::Type>> GetOrcType(const arrow::DataType& type) {
    arrow::Type::type kind = type.id();
    switch (kind) {
        case arrow::Type::type::BOOL:
            return ::orc::createPrimitiveType(::orc::TypeKind::BOOLEAN);
        case arrow::Type::type::INT8:
            return ::orc::createPrimitiveType(::orc::TypeKind::BYTE);
        case arrow::Type::type::INT16:
            return ::orc::createPrimitiveType(::orc::TypeKind::SHORT);
        case arrow::Type::type::INT32:
            return ::orc::createPrimitiveType(::orc::TypeKind::INT);
        case arrow::Type::type::INT64:
            return ::orc::createPrimitiveType(::orc::TypeKind::LONG);
        case arrow::Type::type::FLOAT:
            return ::orc::createPrimitiveType(::orc::TypeKind::FLOAT);
        case arrow::Type::type::DOUBLE:
            return ::orc::createPrimitiveType(::orc::TypeKind::DOUBLE);
        // Use STRING instead of VARCHAR for now, both use UTF-8
        case arrow::Type::type::STRING:
            return ::orc::createPrimitiveType(::orc::TypeKind::STRING);
        case arrow::Type::type::BINARY:
            return ::orc::createPrimitiveType(::orc::TypeKind::BINARY);
        case arrow::Type::type::DATE32:
            return ::orc::createPrimitiveType(::orc::TypeKind::DATE);
        case arrow::Type::type::TIMESTAMP: {
            // A timestamp without a timezone maps to TIMESTAMP, one with a timezone to
            // TIMESTAMP_INSTANT.
            const auto& timestamp_type =
                arrow::internal::checked_cast<const arrow::TimestampType&>(type);
            if (timestamp_type.timezone().empty()) {
                return ::orc::createPrimitiveType(::orc::TypeKind::TIMESTAMP);
            }
            return ::orc::createPrimitiveType(::orc::TypeKind::TIMESTAMP_INSTANT);
        }
        case arrow::Type::type::DECIMAL128: {
            const auto& decimal_type =
                arrow::internal::checked_cast<const arrow::Decimal128Type&>(type);
            const auto precision = static_cast<uint64_t>(decimal_type.precision());
            const auto scale = static_cast<uint64_t>(decimal_type.scale());
            return ::orc::createDecimalType(precision, scale);
        }
        case arrow::Type::type::LIST: {
            const auto& value_field =
                arrow::internal::checked_cast<const arrow::ListType&>(type).value_field();
            ARROW_ASSIGN_OR_RAISE(auto orc_subtype, paimon::orc::GetOrcType(*value_field->type()));
            SetAttributes(value_field, orc_subtype.get());
            return ::orc::createListType(std::move(orc_subtype));
        }
        case arrow::Type::type::STRUCT: {
            std::unique_ptr<::orc::Type> out_type = ::orc::createStructType();
            // Bind by reference: the field list lives as long as `type`, no copy is needed.
            const arrow::FieldVector& arrow_fields =
                arrow::internal::checked_cast<const arrow::StructType&>(type).fields();
            for (const auto& arrow_field : arrow_fields) {
                ARROW_ASSIGN_OR_RAISE(auto orc_subtype, GetOrcType(*arrow_field->type()));
                SetAttributes(arrow_field, orc_subtype.get());
                out_type->addStructField(arrow_field->name(), std::move(orc_subtype));
            }
            return out_type;
        }
        case arrow::Type::type::MAP: {
            const auto& map_type = arrow::internal::checked_cast<const arrow::MapType&>(type);
            const auto& key_field = map_type.key_field();
            const auto& item_field = map_type.item_field();
            ARROW_ASSIGN_OR_RAISE(auto key_orc_type, GetOrcType(*key_field->type()));
            ARROW_ASSIGN_OR_RAISE(auto item_orc_type, GetOrcType(*item_field->type()));
            SetAttributes(key_field, key_orc_type.get());
            SetAttributes(item_field, item_orc_type.get());
            return ::orc::createMapType(std::move(key_orc_type), std::move(item_orc_type));
        }
        default: {
            return arrow::Status::NotImplemented("Unknown or unsupported Arrow type: ",
                                                 type.ToString());
        }
    }
}
normal_array->length(), column_vector_batch->capacity)); + } + uint64_t num_written_elements = normal_array->length(); + if (num_written_elements > 0) { + PAIMON_RETURN_NOT_OK_FROM_ARROW( + paimon::orc::WriteBatch(*normal_array, column_vector_batch)); + } + column_vector_batch->numElements = num_written_elements; + return Status::OK(); +} + +Result> OrcAdapter::GetArrowType(const ::orc::Type* type) { + // When subselecting fields on read, orc will set some nodes to nullptr, + // so we need to check for nullptr before progressing + if (type == nullptr) { + return arrow::null(); + } + ::orc::TypeKind kind = type->getKind(); + const auto subtype_count = static_cast(type->getSubtypeCount()); + + switch (kind) { + case ::orc::BOOLEAN: + return arrow::boolean(); + case ::orc::BYTE: + return arrow::int8(); + case ::orc::SHORT: + return arrow::int16(); + case ::orc::INT: + return arrow::int32(); + case ::orc::LONG: + return arrow::int64(); + case ::orc::FLOAT: + return arrow::float32(); + case ::orc::DOUBLE: + return arrow::float64(); + case ::orc::CHAR: + case ::orc::VARCHAR: + case ::orc::STRING: + return arrow::utf8(); + case ::orc::BINARY: + return arrow::binary(); + case ::orc::TIMESTAMP: + // Values of TIMESTAMP type are stored in the writer timezone in the Orc file. + // Values are read back in the reader timezone. However, the writer timezone + // information in the Orc stripe footer is optional and may be missing. What is + // more, stripes in the same Orc file may have different writer timezones (though + // unlikely). So we cannot tell the exact timezone of values read back in the + // arrow::TimestampArray. In the adapter implementations, we set both writer and + // reader timezone to UTC to avoid any conversion so users can get the same values + // as written. To get rid of this burden, TIMESTAMP_INSTANT type is always preferred + // over TIMESTAMP type. 
+ // for timestamp type, precision info is missing from orc type + return timestamp(arrow::TimeUnit::NANO); + case ::orc::TIMESTAMP_INSTANT: { + auto timezone = DateTimeUtils::GetLocalTimezoneName(); + return timestamp(arrow::TimeUnit::NANO, timezone); + } + case ::orc::DATE: + return arrow::date32(); + case ::orc::DECIMAL: { + const auto precision = static_cast(type->getPrecision()); + const auto scale = static_cast(type->getScale()); + return arrow::decimal128(precision, scale); + } + case ::orc::LIST: { + if (subtype_count != 1) { + return Status::TypeError("Invalid Orc List type"); + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr elem_field, + GetArrowField("item", type->getSubtype(0))); + return list(std::move(elem_field)); + } + case ::orc::MAP: { + if (subtype_count != 2) { + return Status::TypeError("Invalid Orc Map type"); + } + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr key_field, + GetArrowField("key", type->getSubtype(0), /*nullable=*/false)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr value_field, + GetArrowField("value", type->getSubtype(1))); + return std::make_shared(std::move(key_field), std::move(value_field)); + } + case ::orc::STRUCT: { + arrow::FieldVector fields(subtype_count); + for (int32_t child = 0; child < subtype_count; ++child) { + const auto& name = type->getFieldName(child); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr elem_field, + GetArrowField(name, type->getSubtype(child))); + fields[child] = std::move(elem_field); + } + return arrow::struct_(std::move(fields)); + } + default: + return Status::TypeError("Unknown Orc type kind: ", type->toString()); + } +} + +Result> OrcAdapter::GetOrcType(const arrow::Schema& schema) { + int32_t numFields = schema.num_fields(); + std::unique_ptr<::orc::Type> out_type = ::orc::createStructType(); + for (int32_t i = 0; i < numFields; i++) { + const auto& field = schema.field(i); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::unique_ptr<::orc::Type> orc_subtype, + paimon::orc::GetOrcType(*field->type())); + 
SetAttributes(field, orc_subtype.get()); + out_type->addStructField(field->name(), std::move(orc_subtype)); + } + return out_type; +} + +Result> OrcAdapter::GetFieldMetadata( + const ::orc::Type* type) { + if (type == nullptr) { + return std::shared_ptr(); + } + const auto keys = type->getAttributeKeys(); + if (keys.empty()) { + return std::shared_ptr(); + } + auto metadata = std::make_shared(); + for (const auto& key : keys) { + metadata->Append(key, type->getAttributeValue(key)); + } + return std::const_pointer_cast(metadata); +} + +Result> OrcAdapter::GetArrowField(const std::string& name, + const ::orc::Type* type, + bool nullable) { + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr arrow_type, GetArrowType(type)); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr metadata, + GetFieldMetadata(type)); + return field(name, std::move(arrow_type), nullable, std::move(metadata)); +} + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_adapter.h b/src/paimon/format/orc/orc_adapter.h new file mode 100644 index 0000000..382c6f0 --- /dev/null +++ b/src/paimon/format/orc/orc_adapter.h @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Adapted from Apache Arrow +// https://github.com/apache/arrow/blob/main/cpp/src/arrow/adapters/orc/util.h + +#pragma once + +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/array_base.h" +#include "arrow/array/builder_base.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" +#include "arrow/util/key_value_metadata.h" +#include "orc/OrcFile.hh" +#include "orc/Type.hh" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace orc { +struct ColumnVectorBatch; +} // namespace orc + +namespace arrow { +class MemoryPool; +} // namespace arrow + +namespace paimon::orc { +class OrcAdapter { + public: + OrcAdapter() = delete; + ~OrcAdapter() = delete; + + static Result> GetArrowType(const ::orc::Type* type); + + static Result> GetOrcType(const arrow::Schema& schema); + + static Result> GetFieldMetadata( + const ::orc::Type* type); + + static Result> GetArrowField(const std::string& name, + const ::orc::Type* type, + bool nullable = true); + + static Result> AppendBatch( + const std::shared_ptr& type, ::orc::ColumnVectorBatch* batch, + arrow::MemoryPool* pool); + + static Status WriteBatch(const std::shared_ptr& array, + ::orc::ColumnVectorBatch* column_vector_batch); +}; +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_file_format.h b/src/paimon/format/orc/orc_file_format.h new file mode 100644 index 0000000..d75c67b --- /dev/null +++ b/src/paimon/format/orc/orc_file_format.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/format/file_format.h" +#include "paimon/format/orc/orc_reader_builder.h" +#include "paimon/format/orc/orc_stats_extractor.h" +#include "paimon/format/orc/orc_writer_builder.h" + +namespace paimon::orc { + +class OrcFileFormat : public FileFormat { + public: + explicit OrcFileFormat(const std::map& options) + : identifier_("orc"), options_(options) {} + + const std::string& Identifier() const override { + return identifier_; + } + + Result> CreateReaderBuilder(int32_t batch_size) const override { + return std::make_unique(options_, batch_size); + } + + Result> CreateWriterBuilder(::ArrowSchema* schema, + int32_t batch_size) const override { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr typed_schema, + arrow::ImportSchema(schema)); + return std::make_unique(typed_schema, batch_size, options_); + } + + Result> CreateStatsExtractor( + ::ArrowSchema* schema) const override { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr typed_schema, + arrow::ImportSchema(schema)); + return std::make_unique(typed_schema); + } + + private: + std::string identifier_; + std::map options_; +}; +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_file_format_factory.cpp b/src/paimon/format/orc/orc_file_format_factory.cpp new file mode 100644 index 0000000..3ce73fb --- /dev/null +++ b/src/paimon/format/orc/orc_file_format_factory.cpp @@ -0,0 +1,36 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/orc/orc_file_format_factory.h" + +#include + +#include "paimon/factories/factory.h" +#include "paimon/format/orc/orc_file_format.h" + +namespace paimon::orc { +const char OrcFileFormatFactory::IDENTIFIER[] = "orc"; + +Result> OrcFileFormatFactory::Create( + const std::map& options) const { + return std::make_unique(options); +} + +REGISTER_PAIMON_FACTORY(OrcFileFormatFactory); + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_file_format_factory.h b/src/paimon/format/orc/orc_file_format_factory.h new file mode 100644 index 0000000..945df97 --- /dev/null +++ b/src/paimon/format/orc/orc_file_format_factory.h @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/format/file_format.h" +#include "paimon/format/file_format_factory.h" +#include "paimon/result.h" + +namespace paimon::orc { + +class OrcFileFormatFactory : public FileFormatFactory { + public: + static const char IDENTIFIER[]; + + const char* Identifier() const override { + return IDENTIFIER; + } + + Result> Create( + const std::map& options) const override; +}; + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_format_defs.h b/src/paimon/format/orc/orc_format_defs.h new file mode 100644 index 0000000..4a03dc1 --- /dev/null +++ b/src/paimon/format/orc/orc_format_defs.h @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
namespace paimon::orc {

// ---------------------------------------------------------------------------
// Write options: option key followed by its default value (where one exists).
// ---------------------------------------------------------------------------
static constexpr char ORC_STRIPE_SIZE[] = "orc.stripe.size";
static constexpr size_t DEFAULT_STRIPE_SIZE = size_t{64} << 20;  // 64 * 1024 * 1024

static constexpr char ORC_ROW_INDEX_STRIDE[] = "orc.row.index.stride";
static constexpr size_t DEFAULT_ROW_INDEX_STRIDE = 10'000;

static constexpr char ORC_COMPRESSION_BLOCK_SIZE[] = "orc.compression.block-size";
static constexpr size_t DEFAULT_COMPRESSION_BLOCK_SIZE = size_t{64} << 10;  // 64 * 1024

static constexpr char ORC_DICTIONARY_KEY_SIZE_THRESHOLD[] = "orc.dictionary-key-size-threshold";
static constexpr double DEFAULT_DICTIONARY_KEY_SIZE_THRESHOLD = 0.8;

// ORC_WRITE_ENABLE_METRICS defaults to false.
static constexpr char ORC_WRITE_ENABLE_METRICS[] = "orc.write.enable-metrics";

// ORC_TIMESTAMP_LTZ_LEGACY_TYPE defaults to true. The option exists for compatibility with
// paimon-orc's old behavior for the `timestamp_ltz` data type. Details at
// https://github.com/apache/paimon/issues/5066.
static constexpr char ORC_TIMESTAMP_LTZ_LEGACY_TYPE[] = "orc.timestamp-ltz.legacy.type";

// ---------------------------------------------------------------------------
// Read options.
// ---------------------------------------------------------------------------
// ORC_READ_ENABLE_LAZY_DECODING defaults to false.
static constexpr char ORC_READ_ENABLE_LAZY_DECODING[] = "orc.read.enable-lazy-decoding";

static constexpr char ORC_NATURAL_READ_SIZE[] = "orc.read.natural-read-size";
static constexpr uint64_t DEFAULT_NATURAL_READ_SIZE = uint64_t{1} << 20;  // 1024 * 1024

// ORC_READ_ENABLE_METRICS defaults to false.
static constexpr char ORC_READ_ENABLE_METRICS[] = "orc.read.enable-metrics";

static constexpr uint64_t MIN_ROW_GROUP_COUNT_IN_ONE_NATURAL_READ = 1;

static constexpr char ENABLE_PREFETCH_READ_SIZE_THRESHOLD[] =
    "orc.read.enable-prefetch-read-size-threshold";
// Prefetching is not enabled when the total amount of data queried is below this threshold,
// because prefetching for very small data sets is not beneficial.
static constexpr uint64_t DEFAULT_ENABLE_PREFETCH_READ_SIZE_THRESHOLD =
    uint64_t{10} << 20;  // 10 * 1024 * 1024

}  // namespace paimon::orc
+ */ + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "gtest/gtest.h" +#include "orc/MemoryPool.hh" +#include "orc/OrcFile.hh" +#include "orc/Reader.hh" +#include "orc/Type.hh" +#include "orc/Vector.hh" +#include "orc/Writer.hh" +#include "paimon/format/orc/orc_format_defs.h" +#include "paimon/format/orc/orc_input_stream_impl.h" +#include "paimon/format/orc/orc_output_stream_impl.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/result.h" +#include "paimon/status.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::orc::test { +TEST(OrcInputOutputStreamTest, TestInOutStream) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + std::shared_ptr file_system = std::make_shared(); + ASSERT_OK(file_system->Mkdirs(test_root)); + std::string file_name = test_root + "/test.orc"; + + // out stream + ASSERT_OK_AND_ASSIGN(std::shared_ptr out, + file_system->Create(file_name, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr out_stream, + OrcOutputStreamImpl::Create(out)); + ASSERT_EQ(out_stream->getName(), file_name); + ASSERT_EQ(out_stream->getNaturalWriteSize(), 128 * 1024); + ASSERT_EQ(out_stream->getLength(), 0); + + std::string data = "hello"; + out_stream->write(data.data(), data.length()); + // noted that OrcOutputStreamImpl::close() api do nothing + ASSERT_OK(out_stream->output_stream_->Close()); + + // in stream + ASSERT_OK_AND_ASSIGN(std::shared_ptr in, file_system->Open(file_name)); + ASSERT_OK_AND_ASSIGN(auto in_stream, OrcInputStreamImpl::Create(in, DEFAULT_NATURAL_READ_SIZE)); + + char ret[10]; + in_stream->read(ret, data.length(), /*offset=*/0); + ASSERT_EQ(data, std::string(ret, data.length())); +} + +TEST(OrcInputOutputStreamTest, TestSimple) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + 
ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + std::shared_ptr file_system = std::make_shared(); + ASSERT_OK(file_system->Mkdirs(test_root)); + std::string file_name = test_root + "/test.orc"; + + // write process + ASSERT_OK_AND_ASSIGN(std::shared_ptr out, + file_system->Create(file_name, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr out_stream, + OrcOutputStreamImpl::Create(out)); + ASSERT_EQ(out_stream->getName(), file_name); + ASSERT_EQ(out_stream->getNaturalWriteSize(), 128 * 1024); + ASSERT_EQ(out_stream->getLength(), 0); + + std::string orc_schema = "struct"; + std::unique_ptr<::orc::Type> type = ::orc::Type::buildTypeFromString(orc_schema); + ::orc::WriterOptions writer_options; + std::unique_ptr<::orc::Writer> writer = + ::orc::createWriter(*type, out_stream.get(), writer_options); + size_t batch_size = 11; + std::unique_ptr<::orc::ColumnVectorBatch> batch = writer->createRowBatch(batch_size); + auto* struct_batch = dynamic_cast<::orc::StructVectorBatch*>(batch.get()); + ASSERT_TRUE(struct_batch); + auto* int_batch = dynamic_cast<::orc::LongVectorBatch*>(struct_batch->fields[0]); + ASSERT_TRUE(int_batch); + auto* double_batch = dynamic_cast<::orc::DoubleVectorBatch*>(struct_batch->fields[1]); + ASSERT_TRUE(double_batch); + auto* string_batch = dynamic_cast<::orc::StringVectorBatch*>(struct_batch->fields[2]); + ASSERT_TRUE(string_batch); + + std::vector> raw_data; + raw_data.reserve(batch_size); + for (size_t i = 0; i < batch_size; i++) { + raw_data.emplace_back(i, 0.1 + i, "str_" + std::to_string(i)); + } + for (size_t i = 0; i < batch_size; i++) { + const auto& [ivalue, dvalue, svalue] = raw_data[i]; + int_batch->data[i] = ivalue; + double_batch->data[i] = dvalue; + string_batch->data[i] = const_cast(svalue.c_str()); + string_batch->length[i] = static_cast(svalue.length()); + } + struct_batch->numElements = batch_size; + struct_batch->fields[0]->numElements = batch_size; + struct_batch->fields[1]->numElements 
= batch_size; + struct_batch->fields[2]->numElements = batch_size; + writer->add(*batch); + + writer->close(); + ASSERT_OK(out->Close()); + ASSERT_TRUE(file_system->Exists(file_name).value()); + + // read process + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, file_system->Open(file_name)); + ASSERT_OK_AND_ASSIGN(auto in_stream, + OrcInputStreamImpl::Create(input_stream, DEFAULT_NATURAL_READ_SIZE)); + auto length = file_system->GetFileStatus(file_name).value()->GetLen(); + ASSERT_EQ(in_stream->getName(), file_name); + ASSERT_EQ(in_stream->getLength(), length); + ASSERT_EQ(in_stream->getNaturalReadSize(), 1024 * 1024); + + ::orc::ReaderOptions reader_options; + std::unique_ptr<::orc::Reader> reader = + ::orc::createReader(std::move(in_stream), reader_options); + ASSERT_EQ("struct", reader->getType().toString()); + ::orc::RowReaderOptions options; + // read with reversed field sequence + options.include(std::list({"Col1", "Col2", "Col3"})); + std::unique_ptr<::orc::RowReader> row_reader = reader->createRowReader(options); + auto sub_type_count = row_reader->getSelectedType().getSubtypeCount(); + ASSERT_EQ(3, sub_type_count); + std::list expect_selected_type = {"Col3", "Col2", "Col1"}; + std::list result_selected_type; + for (size_t i = 0; i < sub_type_count; i++) { + result_selected_type.push_back(row_reader->getSelectedType().getFieldName(i)); + } + ASSERT_EQ(expect_selected_type, result_selected_type); + + batch_size = 10; + batch = row_reader->createRowBatch(batch_size); + ASSERT_TRUE(row_reader->next(*batch)); + ASSERT_EQ(10, batch->numElements); + + struct_batch = dynamic_cast<::orc::StructVectorBatch*>(batch.get()); + ASSERT_TRUE(struct_batch); + int_batch = dynamic_cast<::orc::LongVectorBatch*>(struct_batch->fields[0]); + ASSERT_TRUE(int_batch); + double_batch = dynamic_cast<::orc::DoubleVectorBatch*>(struct_batch->fields[1]); + ASSERT_TRUE(double_batch); + string_batch = dynamic_cast<::orc::StringVectorBatch*>(struct_batch->fields[2]); + 
ASSERT_TRUE(string_batch); + + for (size_t i = 0; i < batch_size; i++) { + const auto& [ivalue, dvalue, svalue] = raw_data[i]; + ASSERT_EQ(ivalue, int_batch->data[i]); + ASSERT_NEAR(dvalue, double_batch->data[i], 0.001); + ASSERT_EQ(svalue, std::string(string_batch->data[i], string_batch->length[i])); + } +} +} // namespace paimon::orc::test diff --git a/src/paimon/format/orc/orc_input_stream_impl.cpp b/src/paimon/format/orc/orc_input_stream_impl.cpp new file mode 100644 index 0000000..bc5dec2 --- /dev/null +++ b/src/paimon/format/orc/orc_input_stream_impl.cpp @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/orc/orc_input_stream_impl.h" + +#include +#include + +#include "fmt/format.h" +#include "orc/Exceptions.hh" +#include "orc/Reader.hh" +#include "paimon/fs/file_system.h" +#include "paimon/status.h" + +namespace paimon::orc { +Result> OrcInputStreamImpl::Create( + const std::shared_ptr& input_stream, uint64_t natural_read_size) { + PAIMON_ASSIGN_OR_RAISE(std::string name, input_stream->GetUri()); + PAIMON_ASSIGN_OR_RAISE(uint64_t length, input_stream->Length()); + return std::unique_ptr( + new OrcInputStreamImpl(input_stream, name, length, natural_read_size)); +} + +OrcInputStreamImpl::OrcInputStreamImpl(const std::shared_ptr& input_stream, + const std::string& name, uint64_t length, + uint64_t natural_read_size) + : input_stream_(input_stream), + uri_name_(name), + length_(length), + natural_read_size_(natural_read_size) {} + +OrcInputStreamImpl::~OrcInputStreamImpl() { + if (input_stream_ != nullptr) { + [[maybe_unused]] auto status = input_stream_->Close(); + } +} + +uint64_t OrcInputStreamImpl::getLength() const { + return length_; +} + +uint64_t OrcInputStreamImpl::getNaturalReadSize() const { + return natural_read_size_; +} + +void OrcInputStreamImpl::read(void* buf, uint64_t length, uint64_t offset) { + if (metrics_) { + metrics_->IOCount.fetch_add(1); + } + + Result read_bytes = input_stream_->Read(static_cast(buf), length, offset); + if (!read_bytes.ok()) { + throw ::orc::ParseError("read failed, status: " + read_bytes.status().ToString()); + } + if (static_cast(read_bytes.value()) != length) { + throw ::orc::ParseError( + fmt::format("read failed, expected length: {}, actual read length: {}", length, + read_bytes.value())); + } +} + +std::future OrcInputStreamImpl::readAsync(void* buf, uint64_t length, uint64_t offset) { + auto promise = std::make_shared>(); + auto future = promise->get_future(); + auto callback = [this, promise, length, offset](const Status& status) mutable { + try { + if (status.ok()) { + 
read_bytes_.fetch_add(length, std::memory_order_relaxed); + promise->set_value(); + } else { + promise->set_exception(std::make_exception_ptr(::orc::ParseError( + "Async read failed at offset " + std::to_string(offset) + ", length " + + std::to_string(length) + ": " + status.ToString()))); + } + } catch (...) { + promise->set_exception(std::current_exception()); + } + --pending_request_; + }; + + ++pending_request_; + input_stream_->ReadAsync(static_cast(buf), length, offset, std::move(callback)); + + return future; +} + +const std::string& OrcInputStreamImpl::getName() const { + return uri_name_; +} + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_input_stream_impl.h b/src/paimon/format/orc/orc_input_stream_impl.h new file mode 100644 index 0000000..e5d2b83 --- /dev/null +++ b/src/paimon/format/orc/orc_input_stream_impl.h @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "orc/OrcFile.hh" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" + +namespace orc { +struct ReaderMetrics; +} // namespace orc + +namespace paimon { +class InputStream; +} // namespace paimon + +namespace paimon::orc { +class OrcInputStreamImpl : public ::orc::InputStream { + public: + static Result> Create( + const std::shared_ptr& input_stream, uint64_t natural_read_size); + + ~OrcInputStreamImpl() override; + + public: + // in orc, metrics IOCount is accumulated in InputStream + void SetMetrics(::orc::ReaderMetrics* metrics) { + metrics_ = metrics; + } + uint64_t getLength() const override; + uint64_t getNaturalReadSize() const override; + void read(void* buf, uint64_t length, uint64_t offset) override; + std::future readAsync(void* buf, uint64_t length, uint64_t offset) override; + const std::string& getName() const override; + + private: + OrcInputStreamImpl(const std::shared_ptr& input_stream, + const std::string& name, uint64_t length, uint64_t natural_read_size); + + private: + std::atomic read_bytes_ = {0}; + std::atomic pending_request_ = {0}; + std::shared_ptr input_stream_; + const std::string uri_name_; + const uint64_t length_; + const uint64_t natural_read_size_; + ::orc::ReaderMetrics* metrics_ = nullptr; +}; +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_memory_pool.h b/src/paimon/format/orc/orc_memory_pool.h new file mode 100644 index 0000000..d2d7182 --- /dev/null +++ b/src/paimon/format/orc/orc_memory_pool.h @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "orc/MemoryPool.hh" +#include "paimon/common/utils/concurrent_hash_map.h" +#include "paimon/memory/memory_pool.h" + +namespace paimon::orc { + +class OrcMemoryPool : public ::orc::MemoryPool { + public: + using SizeType = uint64_t; + explicit OrcMemoryPool(const std::shared_ptr& pool) : pool_(pool) {} + char* malloc(SizeType size) override { + if (size == 0) { + return ZERO_SIZE_AREA; + } + if (size > std::numeric_limits::max() - HEADER_SIZE) { + return nullptr; + } + if (void* ret = pool_->Malloc(size + HEADER_SIZE)) { + *reinterpret_cast(ret) = size; + return reinterpret_cast(ret) + HEADER_SIZE; + } + return nullptr; + } + void free(char* p) override { + if (p == nullptr || p == ZERO_SIZE_AREA) { + return; + } + char* raw = p - HEADER_SIZE; + SizeType size = *reinterpret_cast(raw); + pool_->Free(raw, size + HEADER_SIZE); + } + + private: + static constexpr size_t ALIGNMENT = 64; + static constexpr size_t HEADER_SIZE = (sizeof(SizeType) + ALIGNMENT - 1) & ~(ALIGNMENT - 1); + alignas(ALIGNMENT) inline static char ZERO_SIZE_AREA[1]; + + std::shared_ptr pool_; +}; + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_metrics.h b/src/paimon/format/orc/orc_metrics.h new file mode 100644 index 0000000..f95cd45 --- /dev/null +++ b/src/paimon/format/orc/orc_metrics.h @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
namespace paimon::orc {

/// Names of the metrics defined for the ORC format.
class OrcMetrics {
 public:
    // Write-side metric names.
    static constexpr char WRITE_IO_COUNT[] = "orc.write.io.count";

    // Read-side metric names.
    static constexpr char READ_INCLUSIVE_LATENCY_US[] = "orc.read.inclusive.latency.us";
    static constexpr char READ_IO_COUNT[] = "orc.read.io.count";
};

}  // namespace paimon::orc
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/orc/orc_output_stream_impl.h" + +#include +#include +#include + +#include "fmt/format.h" +#include "paimon/fs/file_system.h" +#include "paimon/status.h" + +namespace paimon::orc { +Result> OrcOutputStreamImpl::Create( + const std::shared_ptr& output_stream) { + PAIMON_ASSIGN_OR_RAISE(std::string name, output_stream->GetUri()); + return std::unique_ptr(new OrcOutputStreamImpl(output_stream, name)); +} + +OrcOutputStreamImpl::OrcOutputStreamImpl(const std::shared_ptr& output_stream, + const std::string& name) + : output_stream_(output_stream), file_name_(name) { + assert(output_stream_); +} + +uint64_t OrcOutputStreamImpl::getLength() const { + Result pos = output_stream_->GetPos(); + if (!pos.ok()) { + throw std::runtime_error(fmt::format("get length failed, file name {}, error msg {}", + file_name_, pos.status().ToString())); + } else { + return pos.value(); + } +} + +void OrcOutputStreamImpl::write(const void* buf, size_t length) { + Result write_len = output_stream_->Write(static_cast(buf), length); + if (!write_len.ok()) { + throw std::runtime_error("write failed, status: " + write_len.status().ToString()); + } + if (static_cast(write_len.value()) != length) { + throw std::runtime_error( + fmt::format("write failed, expected length: {}, actual write length: {}", length, + write_len.value())); + } +} + +void OrcOutputStreamImpl::close() { + // output stream close is called by paimon single file writer, no need to close here +} + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_output_stream_impl.h 
b/src/paimon/format/orc/orc_output_stream_impl.h new file mode 100644 index 0000000..c79a08b --- /dev/null +++ b/src/paimon/format/orc/orc_output_stream_impl.h @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "orc/OrcFile.hh" +#include "paimon/fs/file_system.h" +#include "paimon/result.h" + +namespace paimon { +class OutputStream; +} // namespace paimon + +namespace paimon::orc { +class OrcOutputStreamImpl : public ::orc::OutputStream { + public: + static Result> Create( + const std::shared_ptr& output_stream); + + ~OrcOutputStreamImpl() override = default; + + uint64_t getLength() const override; + uint64_t getNaturalWriteSize() const override { + return ORC_NATURAL_WRITE_SIZE; + } + void write(const void* buf, size_t length) override; + + const std::string& getName() const override { + return file_name_; + } + void close() override; + + private: + OrcOutputStreamImpl(const std::shared_ptr& output_stream, + const std::string& name); + + private: + static constexpr uint64_t ORC_NATURAL_WRITE_SIZE = 128 * 1024; + + std::shared_ptr output_stream_; + std::string file_name_; +}; +} // namespace paimon::orc diff --git 
a/src/paimon/format/orc/orc_reader_builder.h b/src/paimon/format/orc/orc_reader_builder.h new file mode 100644 index 0000000..3fd88d8 --- /dev/null +++ b/src/paimon/format/orc/orc_reader_builder.h @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/common/utils/options_utils.h" +#include "paimon/format/orc/orc_file_batch_reader.h" +#include "paimon/format/orc/orc_format_defs.h" +#include "paimon/format/orc/orc_input_stream_impl.h" +#include "paimon/format/reader_builder.h" +#include "paimon/fs/file_system.h" +namespace paimon::orc { +class OrcReaderBuilder : public ReaderBuilder { + public: + OrcReaderBuilder(const std::map& options, int32_t batch_size) + : batch_size_(batch_size), pool_(GetDefaultPool()), options_(options) {} + + ReaderBuilder* WithMemoryPool(const std::shared_ptr& pool) override { + pool_ = pool; + return this; + } + + Result> Build( + const std::shared_ptr& path) const override { + PAIMON_ASSIGN_OR_RAISE(uint64_t natural_read_size, + OptionsUtils::GetValueFromMap( + options_, ORC_NATURAL_READ_SIZE, DEFAULT_NATURAL_READ_SIZE)); + if (natural_read_size == 0) { + return Status::Invalid("natural read size should be greater than zero"); + } + + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr input_stream, + OrcInputStreamImpl::Create(path, natural_read_size)); + return OrcFileBatchReader::Create(std::move(input_stream), pool_, options_, batch_size_); + } + + Result> Build(const std::string& path) const override { + return Status::Invalid("do not support build reader with path in orc format"); + } + + private: + int32_t batch_size_ = -1; + std::shared_ptr pool_; + std::map options_; +}; +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_writer_builder.h b/src/paimon/format/orc/orc_writer_builder.h new file mode 100644 index 0000000..32f7bcd --- /dev/null +++ b/src/paimon/format/orc/orc_writer_builder.h @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "paimon/format/orc/orc_format_writer.h" +#include "paimon/format/orc/orc_output_stream_impl.h" +#include "paimon/format/writer_builder.h" + +namespace paimon::orc { + +class OrcWriterBuilder : public WriterBuilder { + public: + OrcWriterBuilder(const std::shared_ptr& schema, int32_t batch_size, + const std::map& options) + : batch_size_(batch_size), pool_(GetDefaultPool()), schema_(schema), options_(options) { + assert(schema); + } + + WriterBuilder* WithMemoryPool(const std::shared_ptr& pool) override { + pool_ = pool; + return this; + } + + Result> Build(const std::shared_ptr& out, + const std::string& compression) override { + assert(out); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr output_stream, + OrcOutputStreamImpl::Create(out)); + // each format writer has its own memory pool while shares other options + return OrcFormatWriter::Create(std::move(output_stream), *schema_, options_, compression, + batch_size_, pool_); + } + + private: + int32_t batch_size_ = -1; + std::shared_ptr pool_; + std::shared_ptr schema_; + std::map options_; +}; +} // namespace paimon::orc