diff --git a/src/paimon/format/orc/orc_file_batch_reader.cpp b/src/paimon/format/orc/orc_file_batch_reader.cpp new file mode 100644 index 0000000..ed25327 --- /dev/null +++ b/src/paimon/format/orc/orc_file_batch_reader.cpp @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/orc/orc_file_batch_reader.h" + +#include +#include +#include +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "fmt/format.h" +#include "orc/OrcFile.hh" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/scope_guard.h" +#include "paimon/core/schema/arrow_schema_validator.h" +#include "paimon/format/orc/orc_adapter.h" +#include "paimon/format/orc/orc_format_defs.h" +#include "paimon/format/orc/orc_input_stream_impl.h" +#include "paimon/format/orc/orc_memory_pool.h" +#include "paimon/format/orc/orc_metrics.h" +#include "paimon/format/orc/predicate_converter.h" + +namespace paimon::orc { + /* Constructor: copies the options, keeps shared references to the Arrow and ORC memory pools, takes ownership of the ORC reader metrics (may be null) and of the reader wrapper, and creates the metrics_ object. Declared private in the header; instances are built through Create(). */ +OrcFileBatchReader::OrcFileBatchReader(std::unique_ptr<::orc::ReaderMetrics>&& reader_metrics, + std::unique_ptr&& reader, + const std::map& options, + const std::shared_ptr& arrow_pool, + const std::shared_ptr<::orc::MemoryPool>& orc_pool) + : options_(options), + arrow_pool_(arrow_pool), + orc_pool_(orc_pool), + reader_metrics_(std::move(reader_metrics)), + reader_(std::move(reader)), + metrics_(std::make_shared()) {} + /* Factory for a reader over input_stream. Returns Status::Invalid when pool is null. Builds an ORC pool and an Arrow pool from pool, enables ORC reader metrics only when ORC_READ_ENABLE_METRICS is set (default false), wraps the ORC reader in an OrcReaderWrapper, and finally calls SetReadSchema with the file schema, no predicate and no selection bitmap. Exceptions thrown inside the try block are converted to Status::Invalid (std::exception) or Status::UnknownError (anything else), both carrying the file name. NOTE(review): input_stream is checked only by assert() before getName() is called on it, so a null stream is not rejected when assert is compiled out - confirm callers never pass null. */ +Result> OrcFileBatchReader::Create( + std::unique_ptr<::orc::InputStream>&& input_stream, const std::shared_ptr& pool, + const std::map& options, int32_t batch_size) { + assert(input_stream); + std::string file_name = input_stream->getName(); + try { + ::orc::ReaderOptions reader_options; + if (pool == nullptr) { + return Status::Invalid("memory pool is nullptr"); + } + uint64_t natural_read_size = input_stream->getNaturalReadSize(); + auto orc_pool = std::make_shared(pool); + std::shared_ptr arrow_pool = GetArrowPool(pool); + reader_options.setMemoryPool(*orc_pool); + + std::unique_ptr<::orc::ReaderMetrics> reader_metrics; + PAIMON_ASSIGN_OR_RAISE( + bool read_enable_metrics, + OptionsUtils::GetValueFromMap(options, 
ORC_READ_ENABLE_METRICS, false)); + if (read_enable_metrics) { + reader_metrics = std::make_unique<::orc::ReaderMetrics>(); + reader_options.setReaderMetrics(reader_metrics.get()); + auto orc_input_stream = dynamic_cast(input_stream.get()); + if (orc_input_stream) { + orc_input_stream->SetMetrics(reader_metrics.get()); + } + } + std::unique_ptr<::orc::Reader> reader = + ::orc::createReader(std::move(input_stream), reader_options); + + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr reader_wrapper, + OrcReaderWrapper::Create(std::move(reader), file_name, batch_size, natural_read_size, + options, arrow_pool, orc_pool)); + auto orc_file_batch_reader = std::unique_ptr(new OrcFileBatchReader( + std::move(reader_metrics), std::move(reader_wrapper), options, arrow_pool, orc_pool)); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::ArrowSchema> file_schema, + orc_file_batch_reader->GetFileSchema()); + PAIMON_RETURN_NOT_OK(orc_file_batch_reader->SetReadSchema( + file_schema.get(), /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt)); + return orc_file_batch_reader; + } catch (const std::exception& e) { + return Status::Invalid(fmt::format( + "create orc file batch reader failed for file {}, with {} error", file_name, e.what())); + } catch (...) 
{ + return Status::UnknownError(fmt::format( + "create orc file batch reader failed for file {}, with unknown error", file_name)); + } +} + /* Converts the ORC type reported by reader_ into an Arrow type through OrcAdapter::GetArrowType, exports it with arrow::ExportType into a freshly allocated C ArrowSchema, and returns that schema. reader_ is only checked with assert(). */ +Result> OrcFileBatchReader::GetFileSchema() const { + assert(reader_); + const auto& orc_file_type = reader_->GetOrcType(); + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr arrow_file_type, + OrcAdapter::GetArrowType(&orc_file_type)); + auto c_schema = std::make_unique<::ArrowSchema>(); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportType(*arrow_file_type, c_schema.get())); + return c_schema; +} + /* Selects the columns and optional predicate used by subsequent reads. read_schema must be non-null and is handed to arrow::ImportSchema. Returns Status::Invalid when the schema contains a timestamp-with-timezone field while ORC_TIMESTAMP_LTZ_LEGACY_TYPE is true (the default used here). The predicate is converted to an ORC search argument against the file type, and the row reader options come from CreateRowReaderOptions. selection_bitmap is accepted but currently ignored (see the TODO in the body). NOTE(review): target_column_ids_ is assigned before the result of reader_->SetReadSchema() is checked, so it is updated even when that call fails - confirm this is acceptable. */ +Status OrcFileBatchReader::SetReadSchema(::ArrowSchema* read_schema, + const std::shared_ptr& predicate, + const std::optional& selection_bitmap) { + if (!read_schema) { + return Status::Invalid("SetReadSchema failed: read schema cannot be nullptr"); + } + if (selection_bitmap) { + // TODO(liancheng.lsz): support bitmap + } + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_schema, + arrow::ImportSchema(read_schema)); + if (ArrowSchemaValidator::ContainTimestampWithTimezone( + *arrow::struct_(arrow_schema->fields()))) { + PAIMON_ASSIGN_OR_RAISE(bool ltz_legacy, OptionsUtils::GetValueFromMap( + options_, ORC_TIMESTAMP_LTZ_LEGACY_TYPE, true)); + if (ltz_legacy) { + return Status::Invalid( + "invalid config, do not support reading timestamp with timezone in legacy format " + "for orc"); + } + } + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::orc::Type> orc_target_type, + OrcAdapter::GetOrcType(*arrow_schema)); + const auto& orc_src_type = reader_->GetOrcType(); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::orc::SearchArgument> search_arg, + PredicateConverter::Convert(orc_src_type, predicate)); + auto target_type = arrow::struct_(arrow_schema->fields()); + std::vector target_column_ids; + PAIMON_ASSIGN_OR_RAISE( + ::orc::RowReaderOptions row_reader_options, + CreateRowReaderOptions(&orc_src_type, orc_target_type.get(), std::move(search_arg), + options_, &target_column_ids)); + + target_column_ids_ = target_column_ids; + 
PAIMON_RETURN_NOT_OK(reader_->SetReadSchema(target_type, row_reader_options)); + return Status::OK(); +} + /* Forwards row_number to reader_->SeekToRow and returns its status. */ +Status OrcFileBatchReader::SeekToRow(uint64_t row_number) { + return reader_->SeekToRow(row_number); +} + /* Forwards to reader_->PreBufferRange with the column ids recorded by the last SetReadSchema call. */ +Result>> OrcFileBatchReader::PreBufferRange() { + return reader_->PreBufferRange(target_column_ids_); +} + /* Returns whatever reader_->Next() yields. */ +Result OrcFileBatchReader::NextBatch() { + return reader_->Next(); +} + /* When ORC reader metrics exist, copies ReaderInclusiveLatencyUs and IOCount into the READ_INCLUSIVE_LATENCY_US and READ_IO_COUNT counters of metrics_, then returns metrics_. Although the method is const, it updates the object that metrics_ points to. */ +std::shared_ptr OrcFileBatchReader::GetReaderMetrics() const { + if (reader_metrics_) { + metrics_->SetCounter(OrcMetrics::READ_INCLUSIVE_LATENCY_US, + reader_metrics_->ReaderInclusiveLatencyUs); + metrics_->SetCounter(OrcMetrics::READ_IO_COUNT, reader_metrics_->IOCount); + } + return metrics_; +} + /* Validates the top-level fields of target_type against src_type and returns their names in target order. Each target field must exist in src_type by name, must have exactly the same type string as the source field, and must have a source column id strictly greater than that of the previous target field; otherwise Status::Invalid is returned. The column ids of each matched field and of all its descendants are appended to target_column_ids. The vector is not cleared first, and the ids of a field are appended before its ordering check runs, so it may hold partial results on failure. */ +Result> OrcFileBatchReader::GetAndCheckIncludedFields( + const ::orc::Type* src_type, const ::orc::Type* target_type, + std::vector* target_column_ids) { + std::list include_fields; + std::unordered_map src_type_map; + for (uint64_t i = 0; i < src_type->getSubtypeCount(); i++) { + src_type_map[src_type->getFieldName(i)] = src_type->getSubtype(i); + } + int64_t prev_target_field_col_id = -1; + for (uint64_t i = 0; i < target_type->getSubtypeCount(); i++) { + auto& field_name = target_type->getFieldName(i); + auto iter = src_type_map.find(field_name); + if (iter == src_type_map.end()) { + return Status::Invalid( + fmt::format("field {} not in file schema {}", field_name, src_type->toString())); + } + // Note: reading only some sub-fields of a nested type is not supported + if (iter->second->toString() != target_type->getSubtype(i)->toString()) { + return Status::Invalid( + fmt::format("target_type {} not match src_type {}, mismatch field name {}", + target_type->toString(), src_type->toString(), field_name)); + } + int64_t target_field_col_id = iter->second->getColumnId(); + GetSubColumnIds(iter->second, target_column_ids); + if (prev_target_field_col_id >= target_field_col_id) { + return Status::Invalid( + "The column id of the target field should be monotonically increasing in " + 
"format reader"); + } + prev_target_field_col_id = target_field_col_id; + include_fields.push_back(field_name); + } + return include_fields; +} + /* Appends the column id of type to col_ids and then recurses into each of its subtypes, so a parent id always precedes the ids of its children. */ +void OrcFileBatchReader::GetSubColumnIds(const ::orc::Type* type, std::vector* col_ids) { + col_ids->push_back(type->getColumnId()); + for (uint64_t i = 0; i < type->getSubtypeCount(); i++) { + GetSubColumnIds(type->getSubtype(i), col_ids); + } +} + /* Builds the ORC RowReaderOptions for a read: includes the field names validated by GetAndCheckIncludedFields (which also fills target_column_ids), fixes the timezone name to GMT, installs search_arg, sets lazy decoding from ORC_READ_ENABLE_LAZY_DECODING (default false), and always enables the tight numeric vector. Errors from validation or option parsing are propagated. */ +Result<::orc::RowReaderOptions> OrcFileBatchReader::CreateRowReaderOptions( + const ::orc::Type* src_type, const ::orc::Type* target_type, + std::unique_ptr<::orc::SearchArgument>&& search_arg, + const std::map& options, std::vector* target_column_ids) { + PAIMON_ASSIGN_OR_RAISE(std::list include_fields, + GetAndCheckIncludedFields(src_type, target_type, target_column_ids)); + ::orc::RowReaderOptions row_reader_options; + row_reader_options.include(include_fields); + // In order to avoid issue like https://github.com/alibaba/paimon-cpp/issues/42, we explicitly + // set GMT timezone. + row_reader_options.setTimezoneName("GMT"); + row_reader_options.searchArgument(std::move(search_arg)); + + PAIMON_ASSIGN_OR_RAISE( + bool enable_lazy_decoding, + OptionsUtils::GetValueFromMap(options, ORC_READ_ENABLE_LAZY_DECODING, false)); + row_reader_options.setEnableLazyDecoding(enable_lazy_decoding); + + // always use tight numeric vector + row_reader_options.setUseTightNumericVector(true); + + return row_reader_options; +} + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_file_batch_reader.h b/src/paimon/format/orc/orc_file_batch_reader.h new file mode 100644 index 0000000..56e7a57 --- /dev/null +++ b/src/paimon/format/orc/orc_file_batch_reader.h @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "arrow/memory_pool.h" +#include "arrow/type.h" +#include "orc/OrcFile.hh" +#include "orc/Reader.hh" +#include "paimon/format/orc/orc_reader_wrapper.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/predicate/predicate.h" +#include "paimon/reader/prefetch_file_batch_reader.h" + +namespace orc { +class InputStream; +} // namespace orc + +namespace paimon::orc { + /* Batch reader for a single ORC file. Implements PrefetchFileBatchReader; the inline methods below forward directly to the wrapped reader_. Instances are created through the static Create() factory because the constructor is private. */ +class OrcFileBatchReader : public PrefetchFileBatchReader { + public: + ~OrcFileBatchReader() override = default; + /* Factory: takes ownership of input_stream and builds a reader that uses pool, options and batch_size. */ static Result> Create( + std::unique_ptr<::orc::InputStream>&& input_stream, const std::shared_ptr& pool, + const std::map& options, int32_t batch_size); + + // For timestamp type, precision info is missing from file + Result> GetFileSchema() const override; + + Status SetReadSchema(::ArrowSchema* read_schema, const std::shared_ptr& predicate, + const std::optional& selection_bitmap) override; + + Status SeekToRow(uint64_t row_number) override; + /* Forwards read_ranges to reader_->SetReadRanges. */ + Status SetReadRanges(const std::vector>& read_ranges) override { + return reader_->SetReadRanges(read_ranges); + } + + // Important: output ArrowArray is allocated on arrow_pool_ whose lifecycle holds in + // OrcFileBatchReader. Therefore, we need to hold BatchReader when using output ArrowArray. 
+ Result NextBatch() override; + /* Returns reader_->GetRowNumber(). */ + Result GetPreviousBatchFirstRowNumber() const override { + return reader_->GetRowNumber(); + } + /* Returns reader_->GetNumberOfRows(). */ + Result GetNumberOfRows() const override { + return reader_->GetNumberOfRows(); + } + /* Returns reader_->GetNextRowToRead(). */ + uint64_t GetNextRowToRead() const override { + return reader_->GetNextRowToRead(); + } + + std::shared_ptr GetReaderMetrics() const override; + /* Asks reader_ to generate read ranges for the currently selected column ids, passing 0 and the total row count (presumably the row span to cover - confirm against OrcReaderWrapper). need_prefetch is passed through to reader_. */ + Result>> GenReadRanges( + bool* need_prefetch) const override { + PAIMON_ASSIGN_OR_RAISE(uint64_t num_rows, GetNumberOfRows()); + return reader_->GenReadRanges(target_column_ids_, 0, num_rows, need_prefetch); + } + /* Captures the current metrics into metrics_, then releases the reader wrapper and the ORC reader metrics. NOTE(review): reader_ is null afterwards and the forwarding methods of this class dereference it without a check - confirm they are never called after Close(). */ + void Close() override { + metrics_ = GetReaderMetrics(); + reader_.reset(); + reader_metrics_.reset(); + } + /* Always reports false. */ + bool SupportPreciseBitmapSelection() const override { + return false; + } + + Result>> PreBufferRange() override; + + private: + OrcFileBatchReader(std::unique_ptr<::orc::ReaderMetrics>&& reader_metrics, + std::unique_ptr&& reader, + const std::map& options, + const std::shared_ptr& arrow_pool, + const std::shared_ptr<::orc::MemoryPool>& orc_pool); + + static void GetSubColumnIds(const ::orc::Type* type, std::vector* col_ids); + + static Result<::orc::RowReaderOptions> CreateRowReaderOptions( + const ::orc::Type* src_type, const ::orc::Type* target_type, + std::unique_ptr<::orc::SearchArgument>&& search_arg, + const std::map& options, + std::vector* target_column_ids); + + static Result> GetAndCheckIncludedFields( + const ::orc::Type* src_type, const ::orc::Type* target_type, + std::vector* target_column_ids); + + std::map options_; + + std::shared_ptr arrow_pool_; + std::shared_ptr<::orc::MemoryPool> orc_pool_; + + std::unique_ptr<::orc::ReaderMetrics> reader_metrics_; + std::unique_ptr reader_; + std::shared_ptr metrics_; + std::vector target_column_ids_; + /* NOTE(review): cache_ranges_ is not referenced by any code visible in this patch - confirm whether it is still needed. */ std::vector> cache_ranges_; +}; + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_file_batch_reader_test.cpp b/src/paimon/format/orc/orc_file_batch_reader_test.cpp new file mode 100644 index 0000000..afb407b 
--- /dev/null +++ b/src/paimon/format/orc/orc_file_batch_reader_test.cpp @@ -0,0 +1,750 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/orc/orc_file_batch_reader.h" + +#include +#include +#include +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "arrow/ipc/api.h" +#include "gtest/gtest.h" +#include "paimon/common/types/data_field.h" +#include "paimon/defs.h" +#include "paimon/format/orc/orc_adapter.h" +#include "paimon/format/orc/orc_format_defs.h" +#include "paimon/format/orc/orc_format_writer.h" +#include "paimon/format/orc/orc_input_stream_impl.h" +#include "paimon/format/orc/orc_memory_pool.h" +#include "paimon/format/orc/orc_metrics.h" +#include "paimon/format/orc/orc_output_stream_impl.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/predicate/predicate_builder.h" +#include "paimon/testing/utils/read_result_collector.h" +#include "paimon/testing/utils/testharness.h" +#include "paimon/testing/utils/timezone_guard.h" + +namespace paimon::orc::test { + +struct TestParam { + uint64_t natural_read_size; + bool enable_tz; +}; + +class OrcFileBatchReaderTest : public ::testing::Test, + public ::testing::WithParamInterface { + public: + void 
SetUp() override { + pool_ = GetDefaultPool(); + batch_size_ = 10; + + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + + struct_array_ = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + ["Bob", 10, 0, 12.1], ["Emily", 10, 0, 13.1], ["Tony", 10, 0, 14.1], ["Emily", 10, 0, 15.1], + ["Bob", 10, 0, 12.1], ["Alex", 10, 0, 16.1], ["David", 10, 0, 17.1], ["Lily", 10, 0, 17.1] + ])") + .ValueOrDie()); + } + void TearDown() override {} + + std::pair, std::shared_ptr> + ReadBatchWithCustomizedData(const std::shared_ptr& src_array, + int32_t write_batch_size, int32_t write_stripe_size, + int32_t write_row_index_stride, const arrow::Schema* read_schema, + const std::shared_ptr& predicate, + const std::optional& selection_bitmap, + int32_t read_batch_size, double dict_key_size_threshold, + bool enable_lazy_decoding) const { + arrow::Schema src_schema(src_array->type()->fields()); + EXPECT_OK_AND_ASSIGN(std::unique_ptr<::orc::Type> orc_type, + OrcAdapter::GetOrcType(src_schema)); + EXPECT_TRUE(orc_type); + auto dir = paimon::test::UniqueTestDirectory::Create(); + EXPECT_TRUE(dir); + auto fs = dir->GetFileSystem(); + std::string data_path = dir->Str() + "/test.data"; + EXPECT_OK_AND_ASSIGN(std::shared_ptr output_stream, + fs->Create(data_path, /*overwrite=*/true)); + EXPECT_TRUE(output_stream); + EXPECT_OK_AND_ASSIGN(auto orc_output_stream, OrcOutputStreamImpl::Create(output_stream)); + EXPECT_TRUE(orc_output_stream); + ::orc::WriterOptions writer_options; + writer_options.setDictionaryKeySizeThreshold(dict_key_size_threshold); + if (write_stripe_size != -1) { + writer_options.setStripeSize(write_stripe_size); + } + if (write_row_index_stride != -1) { + writer_options.setRowIndexStride(write_row_index_stride); + } + std::unique_ptr<::orc::Writer> writer = + ::orc::createWriter(*orc_type, 
orc_output_stream.get(), writer_options); + // for simple case, assume that src_array.length() % write_batch_size == 0 + EXPECT_EQ(src_array->length() % write_batch_size, 0); + int32_t write_batch_count = src_array->length() / write_batch_size; + for (int32_t i = 0; i < write_batch_count; i++) { + auto src_slice = src_array->Slice(i * write_batch_size, write_batch_size); + auto write_batch = writer->createRowBatch(src_slice->length()); + // Convert from arrow array to orc batch + EXPECT_OK(OrcAdapter::WriteBatch(src_slice, write_batch.get())); + writer->add(*write_batch); + } + writer->close(); + EXPECT_OK(output_stream->Close()); + + EXPECT_OK_AND_ASSIGN(std::shared_ptr input_stream, + fs->Open(data_path)); + EXPECT_TRUE(input_stream); + EXPECT_OK_AND_ASSIGN(auto orc_input_stream, + OrcInputStreamImpl::Create(input_stream, DEFAULT_NATURAL_READ_SIZE)); + EXPECT_TRUE(orc_input_stream); + std::string enable_lazy_decoding_str = enable_lazy_decoding ? "true" : "false"; + std::map options = { + {ORC_READ_ENABLE_LAZY_DECODING, enable_lazy_decoding_str}}; + auto orc_batch_reader = + PrepareOrcFileBatchReader(std::move(orc_input_stream), options, read_schema, predicate, + selection_bitmap, read_batch_size); + EXPECT_OK_AND_ASSIGN( + auto result, paimon::test::ReadResultCollector::CollectResult(orc_batch_reader.get())); + return std::make_pair(std::move(orc_batch_reader), result); + } + + std::unique_ptr PrepareOrcFileBatchReader( + const std::string& file_name, const arrow::Schema* read_schema, int32_t batch_size, + uint64_t natural_read_size) const { + std::shared_ptr file_system = std::make_shared(); + EXPECT_OK_AND_ASSIGN(std::shared_ptr input_stream, + file_system->Open(file_name)); + EXPECT_TRUE(input_stream); + EXPECT_OK_AND_ASSIGN(auto in_stream, + OrcInputStreamImpl::Create(input_stream, natural_read_size)); + EXPECT_TRUE(in_stream); + std::map options = {{ORC_READ_ENABLE_LAZY_DECODING, "true"}, + {ORC_READ_ENABLE_METRICS, "true"}, + 
{"orc.timestamp-ltz.legacy.type", "false"}}; + return PrepareOrcFileBatchReader(std::move(in_stream), options, read_schema, + /*predicate=*/nullptr, /*selection_bitmap=*/std::nullopt, + batch_size); + } + + std::unique_ptr PrepareOrcFileBatchReader( + std::unique_ptr<::orc::InputStream>&& in_stream, + const std::map& options, const arrow::Schema* read_schema, + const std::shared_ptr& predicate, + const std::optional& selection_bitmap, int32_t batch_size) const { + EXPECT_OK_AND_ASSIGN( + auto orc_batch_reader, + OrcFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size)) + EXPECT_TRUE(orc_batch_reader); + std::unique_ptr c_schema = std::make_unique(); + auto arrow_status = arrow::ExportSchema(*read_schema, c_schema.get()); + EXPECT_TRUE(arrow_status.ok()); + EXPECT_OK(orc_batch_reader->SetReadSchema(c_schema.get(), predicate, selection_bitmap)); + return orc_batch_reader; + } + + void WriteArray(const std::shared_ptr& fs, const std::string& file_path, + const std::shared_ptr& src_array, + const std::shared_ptr& arrow_schema, + const std::map& options) const { + ASSERT_OK_AND_ASSIGN(std::shared_ptr out, + fs->Create(file_path, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr output_stream, + OrcOutputStreamImpl::Create(out)); + ASSERT_OK_AND_ASSIGN( + auto format_writer, + OrcFormatWriter::Create(std::move(output_stream), *arrow_schema, options, "zstd", + /*batch_size=*/src_array->length(), pool_)); + auto arrow_array = std::make_unique(); + ASSERT_TRUE(arrow::ExportArray(*src_array, arrow_array.get()).ok()); + ASSERT_OK(format_writer->AddBatch(arrow_array.get())); + ASSERT_OK(format_writer->Flush()); + ASSERT_OK(format_writer->Finish()); + ASSERT_OK(out->Flush()); + ASSERT_OK(out->Close()); + } + + private: + std::shared_ptr pool_; + int32_t batch_size_; + std::shared_ptr struct_array_; +}; + +INSTANTIATE_TEST_SUITE_P(TestParam, OrcFileBatchReaderTest, + ::testing::Values(TestParam{128 * 1024, false}, TestParam{16, false}, + 
TestParam{16, true})); + +TEST_F(OrcFileBatchReaderTest, TestSetReadSchema) { + std::string file_name = paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/f1=10/bucket-1/" + "data-b9e7c41f-66e8-4dad-b25a-e6e1963becc4-0.orc"; + std::shared_ptr file_system = std::make_shared(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, file_system->Open(file_name)); + ASSERT_OK_AND_ASSIGN(auto in_stream, + OrcInputStreamImpl::Create(input_stream, DEFAULT_NATURAL_READ_SIZE)); + std::map options = {{ORC_READ_ENABLE_LAZY_DECODING, "true"}}; + ASSERT_OK_AND_ASSIGN( + auto orc_batch_reader, + OrcFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size_)); + // test GetFileSchema() + ASSERT_OK_AND_ASSIGN(auto c_file_schema, orc_batch_reader->GetFileSchema()); + auto arrow_file_schema = arrow::ImportSchema(c_file_schema.get()).ValueOrDie(); + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + arrow::Schema file_schema(fields); + ASSERT_TRUE(arrow_file_schema->Equals(file_schema)); + // NextBatch() without SetReadSchema(), will return data with + // file schema + ASSERT_OK_AND_ASSIGN(auto result_with_file_schema, + paimon::test::ReadResultCollector::CollectResult(orc_batch_reader.get())); + auto expected_file_chunk_array = std::make_shared(struct_array_); + ASSERT_TRUE(result_with_file_schema->Equals(expected_file_chunk_array)); + // NextBatch() with SetReadSchema(), will return data with read schema + arrow::Schema read_schema({fields[0], fields[2]}); + std::unique_ptr c_read_schema = std::make_unique(); + ASSERT_TRUE(arrow::ExportSchema(read_schema, c_read_schema.get()).ok()); + ASSERT_OK(orc_batch_reader->SetReadSchema(c_read_schema.get(), + /*predicate=*/nullptr, + /*selection_bitmap=*/std::nullopt)); + auto expected_read_array = + arrow::StructArray::Make({struct_array_->field(0), struct_array_->field(2)}, + {fields[0], 
fields[2]}) + .ValueOrDie(); + auto expected_read_chunk_array = std::make_shared(expected_read_array); + ASSERT_OK_AND_ASSIGN(auto result_with_read_schema, + paimon::test::ReadResultCollector::CollectResult(orc_batch_reader.get())); + ASSERT_TRUE(result_with_read_schema->Equals(expected_read_chunk_array)); + + // NextBatch() with predicate + auto predicate = PredicateBuilder::GreaterThan( + /*field_index=*/3, /*field_name=*/"f2", FieldType::INT, Literal(2)); + c_read_schema = std::make_unique(); + ASSERT_TRUE(arrow::ExportSchema(read_schema, c_read_schema.get()).ok()); + ASSERT_OK(orc_batch_reader->SetReadSchema(c_read_schema.get(), predicate, + /*selection_bitmap=*/std::nullopt)); + ASSERT_OK_AND_ASSIGN(result_with_read_schema, + paimon::test::ReadResultCollector::CollectResult(orc_batch_reader.get())); + ASSERT_FALSE(result_with_read_schema); +} + +TEST_F(OrcFileBatchReaderTest, TestCreateRowReaderOptions) { + auto orc_pool = std::make_shared(pool_); + std::vector target_column_ids; + { + // read all fields && default options + std::map options; + std::string orc_schema = "struct"; + std::unique_ptr<::orc::Type> src_type = ::orc::Type::buildTypeFromString(orc_schema); + std::unique_ptr<::orc::Type> target_type = ::orc::Type::buildTypeFromString(orc_schema); + ASSERT_OK_AND_ASSIGN(auto row_reader_option, + OrcFileBatchReader::CreateRowReaderOptions( + src_type.get(), target_type.get(), + /*search_arg=*/nullptr, options, &target_column_ids)); + ASSERT_EQ(std::list({"col1", "col2", "col3"}), + row_reader_option.getIncludeNames()); + ASSERT_EQ(row_reader_option.getEnableLazyDecoding(), false); + } + { + // read partial fields && set options + std::map options = {{ORC_READ_ENABLE_LAZY_DECODING, "true"}}; + std::string src_orc_schema = "struct"; + std::unique_ptr<::orc::Type> src_type = ::orc::Type::buildTypeFromString(src_orc_schema); + std::string target_orc_schema = "struct"; + std::unique_ptr<::orc::Type> target_type = + 
::orc::Type::buildTypeFromString(target_orc_schema); + ASSERT_OK_AND_ASSIGN(auto row_reader_option, + OrcFileBatchReader::CreateRowReaderOptions( + src_type.get(), target_type.get(), + /*search_arg=*/nullptr, options, &target_column_ids)); + ASSERT_EQ(std::list({"col1", "col3"}), row_reader_option.getIncludeNames()); + ASSERT_EQ(row_reader_option.getEnableLazyDecoding(), true); + } + { + // read partial fields, sequence mismatch + std::map options; + std::string src_orc_schema = "struct"; + std::unique_ptr<::orc::Type> src_type = ::orc::Type::buildTypeFromString(src_orc_schema); + std::string target_orc_schema = "struct"; + std::unique_ptr<::orc::Type> target_type = + ::orc::Type::buildTypeFromString(target_orc_schema); + ASSERT_NOK_WITH_MSG( + OrcFileBatchReader::CreateRowReaderOptions(src_type.get(), target_type.get(), + /*search_arg=*/nullptr, options, + &target_column_ids), + "The column id of the target field should be monotonically increasing in format " + "reader"); + } + { + // read non exist column + std::map options; + std::string src_orc_schema = "struct"; + std::unique_ptr<::orc::Type> src_type = ::orc::Type::buildTypeFromString(src_orc_schema); + std::string target_orc_schema = "struct"; + std::unique_ptr<::orc::Type> target_type = + ::orc::Type::buildTypeFromString(target_orc_schema); + ASSERT_NOK_WITH_MSG(OrcFileBatchReader::CreateRowReaderOptions( + src_type.get(), target_type.get(), + /*search_arg=*/nullptr, options, &target_column_ids), + "field non_exist_col not in file schema"); + } + { + std::map options; + std::string src_orc_schema = + "struct>,sub3:int>,col2:double,col3:" + "map>"; + std::unique_ptr<::orc::Type> src_type = ::orc::Type::buildTypeFromString(src_orc_schema); + std::string target_orc_schema = + "struct>,sub3:int>,col3:map>"; + std::unique_ptr<::orc::Type> target_type = + ::orc::Type::buildTypeFromString(target_orc_schema); + ASSERT_OK_AND_ASSIGN(auto row_reader_option, + OrcFileBatchReader::CreateRowReaderOptions( + 
src_type.get(), target_type.get(), + /*search_arg=*/nullptr, options, &target_column_ids)); + ASSERT_EQ(std::list({"col1", "col3"}), row_reader_option.getIncludeNames()); + } + { + // read with type mismatch + std::map options; + std::string src_orc_schema = + "struct,col2:double,col3:string>"; + std::unique_ptr<::orc::Type> src_type = ::orc::Type::buildTypeFromString(src_orc_schema); + std::string target_orc_schema = + "struct,col2:double,col3:string>"; + std::unique_ptr<::orc::Type> target_type = + ::orc::Type::buildTypeFromString(target_orc_schema); + + ASSERT_NOK_WITH_MSG( + OrcFileBatchReader::CreateRowReaderOptions(src_type.get(), target_type.get(), + /*search_arg=*/nullptr, options, + &target_column_ids), + "target_type " + "struct,col2:double,col3:string> not match " + "src_type struct,col2:double,col3:string>, " + "mismatch field name col1"); + } +} + +/* Reads the whole file once per batch size (1, 2, 3, 5, 8, 10) and checks that the collected rows equal struct_array_, that GetPreviousBatchFirstRowNumber goes from -1 before reading to 8 after the final batch, and that the IO count and latency counters are positive. */ TEST_P(OrcFileBatchReaderTest, TestNextBatchSimple) { + std::string file_name = paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/f1=10/bucket-1/" + "data-b9e7c41f-66e8-4dad-b25a-e6e1963becc4-0.orc"; + auto data_type = struct_array_->struct_type(); + arrow::Schema read_schema(data_type->fields()); + auto [natural_read_size, _] = GetParam(); + for (auto batch_size : {1, 2, 3, 5, 8, 10}) { + auto orc_batch_reader = + PrepareOrcFileBatchReader(file_name, &read_schema, batch_size, natural_read_size); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), -1); + ASSERT_OK_AND_ASSIGN(auto result_array, paimon::test::ReadResultCollector::CollectResult( + orc_batch_reader.get())); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 8); + orc_batch_reader->Close(); + auto expected_array = std::make_shared(struct_array_); + ASSERT_TRUE(result_array->Equals(expected_array)); + // test metrics + auto reader_metrics = orc_batch_reader->GetReaderMetrics(); + ASSERT_OK_AND_ASSIGN(uint64_t io_count, + reader_metrics->GetCounter(OrcMetrics::READ_IO_COUNT)); + ASSERT_GT(io_count, 
0); + ASSERT_OK_AND_ASSIGN(uint64_t latency, + reader_metrics->GetCounter(OrcMetrics::READ_INCLUSIVE_LATENCY_US)); + ASSERT_GT(latency, 0); + } +} + +/* Projects f0, f1 and f3 (f2 is left out) and checks that the result equals the matching columns of struct_array_. */ TEST_P(OrcFileBatchReaderTest, TestNextBatchWithTargetSchema) { + std::string file_name = paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/f1=10/bucket-1/" + "data-b9e7c41f-66e8-4dad-b25a-e6e1963becc4-0.orc"; + // read without f2 + auto data_type = struct_array_->struct_type(); + arrow::FieldVector fields = {data_type->GetFieldByName("f0"), data_type->GetFieldByName("f1"), + data_type->GetFieldByName("f3")}; + auto [natural_read_size, _] = GetParam(); + arrow::Schema read_schema(fields); + + auto orc_batch_reader = + PrepareOrcFileBatchReader(file_name, &read_schema, batch_size_, natural_read_size); + ASSERT_OK_AND_ASSIGN(auto result_array, + paimon::test::ReadResultCollector::CollectResult(orc_batch_reader.get())); + auto expected_array = std::make_shared( + arrow::StructArray::Make( + {struct_array_->GetFieldByName("f0"), struct_array_->GetFieldByName("f1"), + struct_array_->GetFieldByName("f3")}, + fields) + .ValueOrDie()); + ASSERT_TRUE(result_array->Equals(expected_array)); +} + +/* Requests the columns as f3, f1, f0, which differs from the file order, and expects SetReadSchema to fail with the monotonically increasing column id message. */ TEST_F(OrcFileBatchReaderTest, TestNextBatchWithOutofOrderTargetSchema) { + std::string file_name = paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/f1=10/bucket-1/" + "data-b9e7c41f-66e8-4dad-b25a-e6e1963becc4-0.orc"; + // file type is f0, f1, f2, f3 + // read with f3, f1, f0 + auto data_type = struct_array_->struct_type(); + arrow::FieldVector fields = {data_type->GetFieldByName("f3"), data_type->GetFieldByName("f1"), + data_type->GetFieldByName("f0")}; + arrow::Schema read_schema(fields); + std::shared_ptr file_system = std::make_shared(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, file_system->Open(file_name)); + ASSERT_OK_AND_ASSIGN(auto in_stream, + OrcInputStreamImpl::Create(input_stream, DEFAULT_NATURAL_READ_SIZE)); + std::map options = {{ORC_READ_ENABLE_LAZY_DECODING, "true"}}; + 
ASSERT_OK_AND_ASSIGN( + auto orc_batch_reader, + OrcFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size_)); + std::unique_ptr c_schema = std::make_unique(); + ASSERT_TRUE(arrow::ExportSchema(read_schema, c_schema.get()).ok()); + ASSERT_NOK_WITH_MSG(orc_batch_reader->SetReadSchema(c_schema.get(), /*predicate=*/nullptr, + /*selection_bitmap=*/std::nullopt), + "The column id of the target field should be " + "monotonically increasing in " + "format reader"); +} + +/* Reads a file whose expected content is a single row with a null f3 and compares it with an expectation built from JSON. */ TEST_P(OrcFileBatchReaderTest, TestNextBatchWithNullValue) { + std::string file_name = paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/f1=20/bucket-0/" + "data-b913a160-a4d1-4084-af2a-18333c35668e-0.orc"; + arrow::FieldVector fields = { + arrow::field("f0", arrow::utf8()), arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), arrow::field("f3", arrow::float64())}; + auto arrow_data_type = arrow::struct_(fields); + arrow::Schema read_schema(fields); + auto [natural_read_size, _] = GetParam(); + auto orc_batch_reader = + PrepareOrcFileBatchReader(file_name, &read_schema, batch_size_, natural_read_size); + ASSERT_OK_AND_ASSIGN(auto result_array, + paimon::test::ReadResultCollector::CollectResult(orc_batch_reader.get())); + std::shared_ptr expected_array; + std::string json = R"([ + ["Paul", 20, 1, null] + ])"; + auto array_status = + arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {json}, &expected_array); + ASSERT_TRUE(array_status.ok()); + ASSERT_TRUE(result_array->Equals(expected_array)); +} + +/* Round-trips list, map and struct columns containing repeated strings through ReadBatchWithCustomizedData, once for each combination of dictionary key size threshold (0.9, 0.1) and lazy decoding (true, false), and expects the data back unchanged. */ TEST_F(OrcFileBatchReaderTest, TestNextBatchWithDictionary) { + auto f0 = arrow::field("f0", arrow::list(arrow::utf8())); + auto f1 = arrow::field("f1", arrow::map(arrow::utf8(), arrow::binary())); + auto f2 = arrow::field( + "f2", arrow::struct_({field("sub1", arrow::int64()), field("sub2", arrow::binary()), + field("sub3", arrow::utf8())})); + + arrow::FieldVector fields = {f0, f1, f2}; + auto src_array = std::dynamic_pointer_cast( + 
arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ + [["a", "a", "b"], [["a", "q"], ["b", "w"]], [10, "q", "a"]], + [["a", "c"], [["a", "e"], ["b", "r"], ["c", "e"]], [20, "w", "a"]], + [["a", "d"], [["d", "r"], ["e", "t"]], [null, "e", "b"]], + [["a"], [["a", "q"]], [null, "w", "c"]], + [null, [["a", "w"], ["f", "y"]], [50, "r", null]], + [["a"], null, [60, "r", "b"]], + [["a", "b", "e"], [["a", null], ["b", "w"]], null] + ])") + .ValueOrDie()); + + arrow::FieldVector read_fields = {f0, f1, f2}; + auto read_schema = arrow::schema(read_fields); + auto expected_chunk_array = std::make_shared(src_array); + auto check_result = [&](double dict_key_size_threshold, bool enable_lazy_decoding) { + auto [orc_reader_holder, target_array] = ReadBatchWithCustomizedData( + src_array, /*write_batch_size=*/src_array->length(), /*write_stripe_size=*/-1, + /*write_row_index_stride=*/-1, read_schema.get(), /*predicate=*/nullptr, + /*selection_bitmap=*/std::nullopt, 10, dict_key_size_threshold, enable_lazy_decoding); + ASSERT_TRUE(target_array->Equals(expected_chunk_array)); + ASSERT_TRUE(expected_chunk_array->Equals(target_array)); + }; + // touch all conditions + check_result(0.9, true); + check_result(0.1, true); + check_result(0.9, false); + check_result(0.1, false); +} + +/* Reads int32, date32, nanosecond timestamp, decimal128(23, 5) and binary columns and compares them with expected rows built from JSON, including null cells. */ TEST_P(OrcFileBatchReaderTest, TestComplexType) { + std::string file_name = paimon::test::GetDataDir() + + "/orc/append_complex_data.db/append_complex_data/f1=10/bucket-0/" + "data-14a30421-7650-486c-9876-66a1fa4356ff-0.orc"; + arrow::FieldVector fields = {arrow::field("f1", arrow::int32()), + arrow::field("f2", arrow::int32()), + arrow::field("f3", arrow::date32()), + arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("f5", arrow::decimal128(23, 5)), + arrow::field("f6", arrow::binary())}; + auto arrow_data_type = arrow::struct_(fields); + arrow::Schema read_schema(fields); + auto [natural_read_size, _] = GetParam(); + auto orc_batch_reader = + 
PrepareOrcFileBatchReader(file_name, &read_schema, batch_size_, natural_read_size); + ASSERT_OK_AND_ASSIGN(auto result_array, + paimon::test::ReadResultCollector::CollectResult(orc_batch_reader.get())); + std::shared_ptr expected_array; + auto array_status = arrow::ipc::internal::json::ChunkedArrayFromJSON(arrow_data_type, {R"([ + [10, 1, 1234, "2033-05-18 03:33:20.0", "123456789987654321.45678", "add"], + [10, 1, 19909, "2033-05-18 03:33:20.000001001", "12.30000", "cat"], + [10, 1, 0, "2008-12-28 00:00:00.000123456", null, "dad"], + [10, 1, 100, "2008-12-28 00:00:00.00012345", "-123.45000", "eat"], + [10, 1, null, "1899-01-01 00:59:20.001001001", "0.00000", "fat"], + [10, 1, 20006, "2024-10-10 10:10:10.1001001", "1728551410100.10010", null] + ])"}, + &expected_array); + ASSERT_TRUE(array_status.ok()); + ASSERT_TRUE(result_array->Equals(*expected_array)); +} + +/* Imports the file schema through the Arrow C bridge and checks DataField conversion: it must fail for a file without field ids and must yield the expected ids for a simple schema and for a schema with map, list and struct columns. */ TEST_F(OrcFileBatchReaderTest, TestGetFileSchemaWithFieldId) { + auto get_file_schema = [&](const std::string& file_name) -> std::shared_ptr { + std::shared_ptr file_system = std::make_shared(); + EXPECT_OK_AND_ASSIGN(std::shared_ptr input_stream, + file_system->Open(file_name)); + EXPECT_TRUE(input_stream); + EXPECT_OK_AND_ASSIGN(auto in_stream, + OrcInputStreamImpl::Create(input_stream, DEFAULT_NATURAL_READ_SIZE)); + EXPECT_TRUE(in_stream); + std::map options = {{ORC_READ_ENABLE_LAZY_DECODING, "true"}}; + EXPECT_OK_AND_ASSIGN( + auto orc_batch_reader, + OrcFileBatchReader::Create(std::move(in_stream), pool_, options, batch_size_)); + EXPECT_TRUE(orc_batch_reader); + auto c_file_schema = orc_batch_reader->GetFileSchema(); + EXPECT_TRUE(c_file_schema.ok()); + auto arrow_file_schema = arrow::ImportSchema(c_file_schema.value().get()); + EXPECT_TRUE(arrow_file_schema.ok()); + return arrow_file_schema.ValueOrDie(); + }; + { + // test file without field id + std::string file_name = paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/f1=10/bucket-1/" + "data-b9e7c41f-66e8-4dad-b25a-e6e1963becc4-0.orc"; + 
auto file_schema = get_file_schema(file_name); + ASSERT_NOK(DataField::ConvertArrowSchemaToDataFields(file_schema)); + } + { + // test file with field id, simple schema + std::string file_name = paimon::test::GetDataDir() + + "/orc/append_table_with_alter_table_build_in_fieldid.db/" + "append_table_with_alter_table_build_in_fieldid/key0=0/" + "key1=1/bucket-0/" + "data-35e7027e-b12a-4ebf-ae15-4c0fe8d6a895-0.orc"; + auto file_schema = get_file_schema(file_name); + ASSERT_OK_AND_ASSIGN(auto data_fields, + DataField::ConvertArrowSchemaToDataFields(file_schema)); + std::vector expected_data_fields = { + DataField(0, arrow::field("key0", arrow::int32())), + DataField(1, arrow::field("key1", arrow::int32())), + DataField(2, arrow::field("a", arrow::int32())), + DataField(3, arrow::field("b", arrow::int32())), + DataField(4, arrow::field("c", arrow::int32())), + DataField(5, arrow::field("d", arrow::int32())), + DataField(6, arrow::field("k", arrow::int32()))}; + ASSERT_EQ(data_fields, expected_data_fields); + } + { + // test file with field id, complex schema + std::string file_name = paimon::test::GetDataDir() + + "/orc/append_complex_build_in_fieldid.db/" + "append_complex_build_in_fieldid/bucket-0/" + "data-6dac9052-36d8-4950-8f74-b2bbc082e489-0.orc"; + auto file_schema = get_file_schema(file_name); + ASSERT_OK_AND_ASSIGN(auto data_fields, + DataField::ConvertArrowSchemaToDataFields(file_schema)); + // map type will lose meta info in c <-> c++ bridge (see + // TestBridgeForMapType in OrcAdapterTest + auto map_type = arrow::map(arrow::int8(), arrow::int16()); + auto list_type = arrow::list(DataField::ConvertDataFieldToArrowField( + DataField(536871936, arrow::field("item", arrow::float32())))); + std::vector struct_fields = {DataField(3, arrow::field("f0", arrow::boolean())), + DataField(4, arrow::field("f1", arrow::int64()))}; + auto struct_type = DataField::ConvertDataFieldsToArrowStructType(struct_fields); + std::vector expected_data_fields = { + DataField(0, 
arrow::field("f1", map_type)), + DataField(1, arrow::field("f2", list_type)), + DataField(2, arrow::field("f3", struct_type)), + DataField(5, arrow::field("f4", arrow::timestamp(arrow::TimeUnit::NANO))), + DataField(6, arrow::field("f5", arrow::date32())), + DataField(7, arrow::field("f6", arrow::decimal128(2, 2)))}; + ASSERT_EQ(data_fields, expected_data_fields); + } +} + +/* Writes six string rows with write batch size, stripe size and row index stride all set to 3 (intended to force two stripes) and checks that a lazy-decoding read returns the same data. */ TEST_F(OrcFileBatchReaderTest, TestDictionaryWithMultiStripe) { + arrow::FieldVector fields = {arrow::field("f0", arrow::utf8())}; + auto arrow_type = arrow::struct_(fields); + auto src_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow_type, R"([ + ["abc"], + ["abc"], + ["abc"], + ["de"], + ["de"], + ["de"] + ])") + .ValueOrDie()); + auto src_schema = arrow::schema(fields); + // force generate two stripe in one file + auto [orc_reader_holder, target_array] = ReadBatchWithCustomizedData( + src_array, /*write_batch_size=*/3, /*write_stripe_size=*/3, + /*write_row_index_stride=*/3, /*read_schema=*/src_schema.get(), /*predicate=*/nullptr, + /*selection_bitmap=*/std::nullopt, /*read_batch_size=*/10, + /*dict_key_size_threshold=*/0.9, + /*enable_lazy_decoding=*/true); + auto expected_array = arrow::ChunkedArray::Make({src_array}).ValueOrDie(); + ASSERT_TRUE(target_array->Equals(expected_array)); + ASSERT_TRUE(expected_array->Equals(target_array)); +} + +/* With an empty read schema and batch size 3, expects batches of 3, 3 and 2 empty structs followed by an EOF batch, with GetPreviousBatchFirstRowNumber reporting 0, 3, 6 and finally 8. */ TEST_F(OrcFileBatchReaderTest, TestReadNoField) { + // if only read partition fields, format reader will set empty read schema + std::string file_name = paimon::test::GetDataDir() + + "/orc/append_09.db/append_09/f1=10/bucket-1/" + "data-b9e7c41f-66e8-4dad-b25a-e6e1963becc4-0.orc"; + // read no field + arrow::Schema read_schema({}); + auto orc_batch_reader = PrepareOrcFileBatchReader(file_name, &read_schema, /*batch_size=*/3, + /*natural_read_size=*/10); + // read 3 rows + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), -1); + ASSERT_OK_AND_ASSIGN(auto batch1, orc_batch_reader->NextBatch()); + 
ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 0); + // read 3 rows + ASSERT_OK_AND_ASSIGN(auto batch2, orc_batch_reader->NextBatch()); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 3); + // read 2 rows + ASSERT_OK_AND_ASSIGN(auto batch3, orc_batch_reader->NextBatch()); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 6); + // read rows with eof + ASSERT_OK_AND_ASSIGN(auto batch4, orc_batch_reader->NextBatch()); + ASSERT_EQ(orc_batch_reader->GetPreviousBatchFirstRowNumber().value(), 8); + ASSERT_TRUE(BatchReader::IsEofBatch(batch4)); + orc_batch_reader->Close(); + + arrow::FieldVector fields; + auto arrow_type = arrow::struct_(fields); + auto expected_array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow_type, R"([ +[], +[], +[] + ])") + .ValueOrDie()); + + auto check_batch = [](BatchReader::ReadBatch&& result_batch, + const std::shared_ptr& expected_array) { + auto& [c_array, c_schema] = result_batch; + auto result_array = arrow::ImportArray(c_array.get(), c_schema.get()).ValueOr(nullptr); + ASSERT_TRUE(result_array); + ASSERT_TRUE(result_array->Equals(expected_array)); + }; + + check_batch(std::move(batch1), expected_array); + check_batch(std::move(batch2), expected_array); + check_batch(std::move(batch3), expected_array->Slice(0, 2)); +} + +/* Checks timestamp columns in second, milli, micro and nano units, with and without a timezone, for a file written by Java Paimon and for a file written by this library. For the locally written file the reported file schema is expected to use NANO for every column. The process timezone is set to Asia/Tokyo or the local timezone depending on the test parameter. */ TEST_P(OrcFileBatchReaderTest, TestTimestampType) { + auto [natural_read_size, enable_tz] = GetParam(); + auto timezone = DateTimeUtils::GetLocalTimezoneName(); + std::string timezone_str = enable_tz ? 
"Asia/Tokyo" : timezone; + paimon::test::TimezoneGuard tz_guard(timezone_str); + arrow::FieldVector fields = { + arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::SECOND)), + arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::MILLI)), + arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::MICRO)), + arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::SECOND, timezone_str)), + arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::MILLI, timezone_str)), + arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::MICRO, timezone_str)), + arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO, timezone_str)), + }; + + auto array = std::dynamic_pointer_cast( + arrow::ipc::internal::json::ArrayFromJSON(arrow::struct_(fields), R"([ +["1970-01-01 00:00:01", "1970-01-01 00:00:00.001", "1970-01-01 00:00:00.000001", "1970-01-01 00:00:00.000000001", "1970-01-01 00:00:02", "1970-01-01 00:00:00.002", "1970-01-01 00:00:00.000002", "1970-01-01 00:00:00.000000002"], +["1970-01-01 00:00:03", "1970-01-01 00:00:00.003", null, "1970-01-01 00:00:00.000000003", "1970-01-01 00:00:04", "1970-01-01 00:00:00.004", "1970-01-01 00:00:00.000004", "1970-01-01 00:00:00.000000004"], +["1970-01-01 00:00:05", "1970-01-01 00:00:00.005", null, null, "1970-01-01 00:00:06", null, "1970-01-01 00:00:00.000006", null] + ])") + .ValueOrDie()); + auto arrow_schema = arrow::schema(fields); + std::shared_ptr expected_array = + std::make_shared(array); + { + // read data generated by Java Paimon + std::string file_name = paimon::test::GetDataDir() + + "/orc/append_with_multiple_ts_precision_and_timezone.db/" + "append_with_multiple_ts_precision_and_timezone/bucket-0/" + "data-3f58c403-1672-49a3-93c0-d90cfff9bd8a-0.orc"; + auto orc_batch_reader = PrepareOrcFileBatchReader(file_name, arrow_schema.get(), + batch_size_, natural_read_size); + ASSERT_OK_AND_ASSIGN( + std::shared_ptr result_array, + 
paimon::test::ReadResultCollector::CollectResult(orc_batch_reader.get())); + ASSERT_TRUE(result_array->Equals(*expected_array)) << result_array->ToString() << std::endl + << expected_array->ToString(); + } + { + // read data generated by C++ Paimon + auto dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(dir); + std::string data_path = dir->Str() + "/test.data"; + WriteArray(dir->GetFileSystem(), data_path, array, arrow_schema, + /*options=*/{{"orc.timestamp-ltz.legacy.type", "false"}}); + auto orc_batch_reader = PrepareOrcFileBatchReader(data_path, arrow_schema.get(), + batch_size_, natural_read_size); + // check file schema + ASSERT_OK_AND_ASSIGN(auto c_file_schema, orc_batch_reader->GetFileSchema()); + auto result_file_schema = arrow::ImportSchema(c_file_schema.get()).ValueOr(nullptr); + ASSERT_TRUE(result_file_schema); + + arrow::FieldVector expected_fields = { + arrow::field("ts_sec", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("ts_milli", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("ts_micro", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("ts_nano", arrow::timestamp(arrow::TimeUnit::NANO)), + arrow::field("ts_tz_sec", arrow::timestamp(arrow::TimeUnit::NANO, timezone_str)), + arrow::field("ts_tz_milli", arrow::timestamp(arrow::TimeUnit::NANO, timezone_str)), + arrow::field("ts_tz_micro", arrow::timestamp(arrow::TimeUnit::NANO, timezone_str)), + arrow::field("ts_tz_nano", arrow::timestamp(arrow::TimeUnit::NANO, timezone_str)), + }; + auto expected_file_schema = arrow::schema(expected_fields); + ASSERT_TRUE(result_file_schema->Equals(expected_file_schema)); + + // check array + ASSERT_OK_AND_ASSIGN( + std::shared_ptr result_array, + paimon::test::ReadResultCollector::CollectResult(orc_batch_reader.get())); + ASSERT_TRUE(result_array->Equals(expected_array)) << result_array->ToString(); + } +} + +// TODO(liancheng.lsz): TestBitmapPushDownWithMultiRowGroups, TestPredicateAndBitmapPushDown +// TODO(liancheng.lsz): 
TestGenReadRanges +} // namespace paimon::orc::test diff --git a/src/paimon/format/orc/orc_format_writer.cpp b/src/paimon/format/orc/orc_format_writer.cpp new file mode 100644 index 0000000..870d755 --- /dev/null +++ b/src/paimon/format/orc/orc_format_writer.cpp @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/orc/orc_format_writer.h" + +#include +#include +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/array_base.h" +#include "arrow/c/bridge.h" +#include "fmt/format.h" +#include "orc/Common.hh" +#include "orc/OrcFile.hh" +#include "orc/Type.hh" +#include "orc/Vector.hh" +#include "orc/Writer.hh" +#include "paimon/common/metrics/metrics_impl.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/schema/arrow_schema_validator.h" +#include "paimon/format/orc/orc_adapter.h" +#include "paimon/format/orc/orc_format_defs.h" +#include "paimon/format/orc/orc_memory_pool.h" +#include "paimon/format/orc/orc_metrics.h" +#include "paimon/macros.h" + +namespace paimon { +class MemoryPool; +} // namespace paimon +struct ArrowArray; + +namespace paimon::orc { + +/* Private constructor used by Create. Takes ownership of the output stream, writer metrics, writer, row batch and ORC type, copies the memory pool handle, writer options and Arrow data type, and allocates the metrics object returned by GetWriterMetrics. */ OrcFormatWriter::OrcFormatWriter(const std::shared_ptr& orc_memory_pool, + std::unique_ptr<::orc::OutputStream>&& output_stream, + std::unique_ptr<::orc::WriterMetrics>&& writer_metrics, + std::unique_ptr<::orc::Writer>&& writer, + std::unique_ptr<::orc::ColumnVectorBatch>&& orc_batch, + std::unique_ptr<::orc::Type>&& orc_type, + const ::orc::WriterOptions& writer_options, + const std::shared_ptr& data_type) + : orc_memory_pool_(orc_memory_pool), + output_stream_(std::move(output_stream)), + writer_metrics_(std::move(writer_metrics)), + writer_(std::move(writer)), + orc_batch_(std::move(orc_batch)), + orc_type_(std::move(orc_type)), + writer_options_(writer_options), + data_type_(data_type), + metrics_(std::make_shared()) {} + +/* Factory. Converts the Arrow schema to an ORC type, builds the writer options with PrepareWriterOptions, attaches an ORC memory pool wrapping pool when pool is non-null, attaches ::orc::WriterMetrics when ORC_WRITE_ENABLE_METRICS is set (default false), then creates the ORC writer and a row batch with capacity batch_size. A std::exception is reported as Status::Invalid and any other exception as Status::UnknownError. NOTE(review): both catch handlers dereference output_stream; if the OrcFormatWriter constructor throws after its output_stream_ member was move-initialised, output_stream would be null here. Consider caching the file name before the try block - TODO confirm. */ Result> OrcFormatWriter::Create( + std::unique_ptr<::orc::OutputStream>&& output_stream, const arrow::Schema& schema, + const std::map& options, const std::string& compression, + int32_t batch_size, const std::shared_ptr& pool) { + assert(output_stream); + PAIMON_ASSIGN_OR_RAISE(std::unique_ptr<::orc::Type> 
orc_type, OrcAdapter::GetOrcType(schema)); + auto data_type = arrow::struct_(schema.fields()); + try { + PAIMON_ASSIGN_OR_RAISE(::orc::WriterOptions writer_options, + PrepareWriterOptions(options, compression, data_type)); + std::shared_ptr orc_memory_pool; + if (pool) { + orc_memory_pool = std::make_shared(pool); + writer_options.setMemoryPool(orc_memory_pool.get()); + } + + std::unique_ptr<::orc::WriterMetrics> writer_metrics; + PAIMON_ASSIGN_OR_RAISE( + bool write_enable_metrics, + OptionsUtils::GetValueFromMap(options, ORC_WRITE_ENABLE_METRICS, false)); + if (write_enable_metrics) { + writer_metrics = std::make_unique<::orc::WriterMetrics>(); + writer_options.setWriterMetrics(writer_metrics.get()); + } + std::unique_ptr<::orc::Writer> writer = + ::orc::createWriter(*orc_type, output_stream.get(), writer_options); + assert(writer); + std::unique_ptr<::orc::ColumnVectorBatch> orc_batch = writer->createRowBatch(batch_size); + return std::unique_ptr(new OrcFormatWriter( + orc_memory_pool, std::move(output_stream), std::move(writer_metrics), std::move(writer), + std::move(orc_batch), std::move(orc_type), writer_options, data_type)); + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("create orc format writer failed for file {}, with {} error", + output_stream->getName(), e.what())); + } catch (...) { + return Status::UnknownError( + fmt::format("create orc format writer failed for file {}, with unknown error", + output_stream->getName())); + } +} + +/* Replaces orc_batch_ with a new row batch of expect_size rows obtained from writer_. Exceptions are converted to Status::Invalid or Status::UnknownError. */ Status OrcFormatWriter::ExpandBatch(uint64_t expect_size) { + try { + orc_batch_.reset(writer_->createRowBatch(expect_size).release()); + return Status::OK(); + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("expand orc batch to {} failed for file {}, with {} error", expect_size, + output_stream_->getName(), e.what())); + } catch (...) 
{ + return Status::UnknownError( + fmt::format("expand orc batch to {} failed for file {}, with unknown error", + expect_size, output_stream_->getName())); + } +} + +/* Imports the C ArrowArray using data_type_, grows orc_batch_ through ExpandBatch when the array has more rows than the batch capacity, copies the rows into orc_batch_ with OrcAdapter::WriteBatch and then calls Flush, so every AddBatch call hands its rows to the ORC writer immediately. */ Status OrcFormatWriter::AddBatch(ArrowArray* batch) { + assert(batch); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_array, + arrow::ImportArray(batch, data_type_)); + if (PAIMON_UNLIKELY(static_cast(arrow_array->length()) > orc_batch_->capacity)) { + PAIMON_RETURN_NOT_OK(ExpandBatch(arrow_array->length())); + } + PAIMON_RETURN_NOT_OK(OrcAdapter::WriteBatch(arrow_array, orc_batch_.get())); + assert(orc_batch_->numElements == static_cast(arrow_array->length())); + PAIMON_RETURN_NOT_OK(Flush()); + return Status::OK(); +} + +/* Adds orc_batch_ to the ORC writer when it holds at least one element and then clears the batch. Exceptions are converted to Status::Invalid or Status::UnknownError. */ Status OrcFormatWriter::Flush() { + try { + if (orc_batch_->numElements > 0) { + writer_->add(*orc_batch_); + } + orc_batch_->clear(); + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("orc format writer flush failed for file {}, with {} error", + output_stream_->getName(), e.what())); + } catch (...) { + return Status::UnknownError( + fmt::format("orc format writer flush failed for file {}, with unknown error", + output_stream_->getName())); + } + return Status::OK(); +} + +/* Flushes pending rows, refreshes metrics_ from the ORC writer metrics, closes the ORC writer and releases the row batch, the writer and the writer metrics. NOTE(review): orc_batch_ and writer_ are null afterwards, so a later Flush, AddBatch or second Finish would dereference a null pointer - confirm callers never do that. */ Status OrcFormatWriter::Finish() { + PAIMON_RETURN_NOT_OK(Flush()); + try { + metrics_ = GetWriterMetrics(); + orc_batch_.reset(); + writer_->close(); + writer_.reset(); + writer_metrics_.reset(); + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("orc format writer finish failed for file {}, with {} error", + output_stream_->getName(), e.what())); + } catch (...) 
{ + return Status::UnknownError( + fmt::format("orc format writer finish failed for file {}, with unknown error", + output_stream_->getName())); + } + return Status::OK(); +} + +/* Returns false without computing anything when suggested_check is false. Otherwise reports whether the estimated file length is at least target_size. */ Result OrcFormatWriter::ReachTargetSize(bool suggested_check, int64_t target_size) const { + if (suggested_check) { + PAIMON_ASSIGN_OR_RAISE(uint64_t length, GetEstimateLength()); + return length >= static_cast(target_size); + } + return false; +} + +/* Estimates the file size as the current output stream length plus one configured stripe size (presumably to account for stripe data still buffered in the writer - TODO confirm). Exceptions are converted to Status. */ Result OrcFormatWriter::GetEstimateLength() const { + try { + return output_stream_->getLength() + writer_options_.getStripeSize(); + } catch (const std::exception& e) { + return Status::Invalid(fmt::format( + "orc format writer get estimated file size failed for file {}, with {} error", + output_stream_->getName(), e.what())); + } catch (...) { + return Status::UnknownError(fmt::format( + "orc format writer get estimated file size failed for file {}, with unknown error", + output_stream_->getName())); + } +} + +/* Copies the ORC IOCount into the WRITE_IO_COUNT counter when writer metrics exist (metrics enabled and Finish not yet called) and returns metrics_. */ std::shared_ptr OrcFormatWriter::GetWriterMetrics() const { + if (writer_metrics_) { + metrics_->SetCounter(OrcMetrics::WRITE_IO_COUNT, writer_metrics_->IOCount); + } + return metrics_; +} + +/* Builds ::orc::WriterOptions from the option map. A schema containing a timestamp with timezone is rejected while ORC_TIMESTAMP_LTZ_LEGACY_TYPE is true, which is its default. Then sets stripe size, compression kind (from the lower-cased file_compression), compression block size, dictionary key size threshold, tight numeric vectors, row index stride and the GMT timezone; missing options fall back to the DEFAULT_ constants. */ Result<::orc::WriterOptions> OrcFormatWriter::PrepareWriterOptions( + const std::map& options, const std::string& file_compression, + const std::shared_ptr& data_type) { + if (ArrowSchemaValidator::ContainTimestampWithTimezone(*data_type)) { + PAIMON_ASSIGN_OR_RAISE(bool ltz_legacy, OptionsUtils::GetValueFromMap( + options, ORC_TIMESTAMP_LTZ_LEGACY_TYPE, true)); + if (ltz_legacy) { + return Status::Invalid( + "invalid config, do not support writing timestamp with timezone in legacy format " + "for orc"); + } + } + ::orc::WriterOptions writer_options; + PAIMON_ASSIGN_OR_RAISE(size_t stripe_size, OptionsUtils::GetValueFromMap( + options, ORC_STRIPE_SIZE, DEFAULT_STRIPE_SIZE)); + writer_options.setStripeSize(stripe_size); + PAIMON_ASSIGN_OR_RAISE(::orc::CompressionKind compression, + ToOrcCompressionKind(StringUtils::ToLowerCase(file_compression))); + 
writer_options.setCompression(compression); + PAIMON_ASSIGN_OR_RAISE(size_t compression_block_size, OptionsUtils::GetValueFromMap( + options, ORC_COMPRESSION_BLOCK_SIZE, + DEFAULT_COMPRESSION_BLOCK_SIZE)); + writer_options.setCompressionBlockSize(compression_block_size); + PAIMON_ASSIGN_OR_RAISE( + double dictionary_key_threshold, + OptionsUtils::GetValueFromMap(options, ORC_DICTIONARY_KEY_SIZE_THRESHOLD, + DEFAULT_DICTIONARY_KEY_SIZE_THRESHOLD)); + writer_options.setDictionaryKeySizeThreshold(dictionary_key_threshold); + // always use tight numeric vector + writer_options.setUseTightNumericVector(true); + PAIMON_ASSIGN_OR_RAISE(size_t row_index_stride, + OptionsUtils::GetValueFromMap(options, ORC_ROW_INDEX_STRIDE, + DEFAULT_ROW_INDEX_STRIDE)); + writer_options.setRowIndexStride(row_index_stride); + // In order to avoid issue like https://github.com/alibaba/paimon-cpp/issues/42, we explicitly + // set GMT timezone. + writer_options.setTimezoneName("GMT"); + return writer_options; +} + +/* Maps a compression name to the ORC enum. The comparison is exact, so the name must already be lower case (PrepareWriterOptions lower-cases it). Accepted names are zstd, lz4, snappy, zlib, lzo and none; anything else yields Status::Invalid. */ Result<::orc::CompressionKind> OrcFormatWriter::ToOrcCompressionKind( + const std::string& file_compression) { + if (file_compression == "zstd") { + return ::orc::CompressionKind_ZSTD; + } else if (file_compression == "lz4") { + return ::orc::CompressionKind_LZ4; + } else if (file_compression == "snappy") { + return ::orc::CompressionKind_SNAPPY; + } else if (file_compression == "zlib") { + return ::orc::CompressionKind_ZLIB; + } else if (file_compression == "lzo") { + return ::orc::CompressionKind_LZO; + } else if (file_compression == "none") { + return ::orc::CompressionKind_NONE; + } else { + return Status::Invalid("unknown compression " + file_compression); + } +} + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_format_writer.h b/src/paimon/format/orc/orc_format_writer.h new file mode 100644 index 0000000..409e7a8 --- /dev/null +++ b/src/paimon/format/orc/orc_format_writer.h @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "orc/Common.hh" +#include "orc/OrcFile.hh" +#include "orc/Type.hh" +#include "orc/Vector.hh" +#include "orc/Writer.hh" +#include "paimon/format/format_writer.h" +#include "paimon/format/orc/orc_memory_pool.h" +#include "paimon/metrics.h" +#include "paimon/result.h" +#include "paimon/status.h" + +namespace arrow { +class DataType; +class Schema; +} // namespace arrow +namespace paimon { +class MemoryPool; +namespace orc { +class OrcMemoryPool; +} // namespace orc +} // namespace paimon +struct ArrowArray; + +namespace paimon::orc { +/// A `FormatWriter` implementation that writes data in ORC format. 
+class OrcFormatWriter : public FormatWriter { + public: + /* Creates a writer for schema that writes to output_stream. compression names the codec, batch_size is the capacity of the initial ORC row batch, options carries the ORC settings and pool may be null. */ static Result> Create( + std::unique_ptr<::orc::OutputStream>&& output_stream, const arrow::Schema& schema, + const std::map& options, const std::string& compression, + int32_t batch_size, const std::shared_ptr& pool); + + /* Imports the Arrow C array and hands its rows to the ORC writer. */ Status AddBatch(ArrowArray* batch) override; + + /* Passes any rows held in the internal row batch to the ORC writer. */ Status Flush() override; + + /* Flushes, closes the ORC writer and releases the writer resources. */ Status Finish() override; + + /* When suggested_check is true, reports whether the estimated file length has reached target_size; otherwise returns false. */ Result ReachTargetSize(bool suggested_check, int64_t target_size) const override; + + /* Returns the metrics object, refreshed from the ORC writer metrics when those are enabled. */ std::shared_ptr GetWriterMetrics() const override; + + private: + /* Instances are built only through Create. */ OrcFormatWriter(const std::shared_ptr& orc_memory_pool, + std::unique_ptr<::orc::OutputStream>&& output_stream, + std::unique_ptr<::orc::WriterMetrics>&& writer_metrics, + std::unique_ptr<::orc::Writer>&& writer, + std::unique_ptr<::orc::ColumnVectorBatch>&& orc_batch, + std::unique_ptr<::orc::Type>&& orc_type, + const ::orc::WriterOptions& writer_options, + const std::shared_ptr& data_type); + + /* Output stream length plus one configured stripe size. */ Result GetEstimateLength() const; + /* Replaces the row batch with one able to hold expect_size rows. */ Status ExpandBatch(uint64_t expect_size); + + /* Translates the option map and compression name into ::orc::WriterOptions. */ static Result<::orc::WriterOptions> PrepareWriterOptions( + const std::map& options, const std::string& file_compression, + const std::shared_ptr& data_type); + /* Maps a lower-case compression name to the ORC compression enum. */ static Result<::orc::CompressionKind> ToOrcCompressionKind(const std::string& file_compression); + + private: + /* Create passes raw pointers to the memory pool, output stream and writer metrics into the ORC writer options, so these members are kept alongside writer_. */ std::shared_ptr orc_memory_pool_; + std::unique_ptr<::orc::OutputStream> output_stream_; + std::unique_ptr<::orc::WriterMetrics> writer_metrics_; + std::unique_ptr<::orc::Writer> writer_; + std::unique_ptr<::orc::ColumnVectorBatch> orc_batch_; + std::unique_ptr<::orc::Type> orc_type_; + ::orc::WriterOptions writer_options_; + std::shared_ptr data_type_; + std::shared_ptr metrics_; +}; +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_format_writer_test.cpp b/src/paimon/format/orc/orc_format_writer_test.cpp new file mode 100644 index 0000000..5026e95 --- /dev/null +++ b/src/paimon/format/orc/orc_format_writer_test.cpp @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/orc/orc_format_writer.h" + +#include +#include +#include +#include + +#include "arrow/api.h" +#include "arrow/array/builder_binary.h" +#include "arrow/array/builder_nested.h" +#include "arrow/array/builder_primitive.h" +#include "arrow/c/abi.h" +#include "arrow/c/bridge.h" +#include "gtest/gtest.h" +#include "orc/Common.hh" +#include "orc/MemoryPool.hh" +#include "orc/OrcFile.hh" +#include "orc/Reader.hh" +#include "orc/Type.hh" +#include "orc/Vector.hh" +#include "orc/Writer.hh" +#include "paimon/format/orc/orc_format_defs.h" +#include "paimon/format/orc/orc_input_stream_impl.h" +#include "paimon/format/orc/orc_metrics.h" +#include "paimon/format/orc/orc_output_stream_impl.h" +#include "paimon/fs/file_system.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/record_batch.h" +#include "paimon/testing/utils/testharness.h" + +namespace arrow { +class Array; +} // namespace arrow + +namespace paimon::orc::test { + +/* Fixture with helpers that build a three-column schema (col1 utf8, col2 int32, col3 boolean) and matching data, feed batches to a format writer, and verify a written file through the raw ORC reader. */ class OrcFormatWriterTest : public ::testing::Test { + public: + void SetUp() override {} + void TearDown() override {} + + /* Returns the Arrow schema col1 utf8, col2 int32, col3 boolean together with the struct type made of the same fields. */ std::pair> PrepareArrowSchema() const { + auto string_field = arrow::field("col1", 
arrow::utf8()); + auto int_field = arrow::field("col2", arrow::int32()); + auto bool_field = arrow::field("col3", arrow::boolean()); + auto struct_type = arrow::struct_({string_field, int_field, bool_field}); + return std::make_pair( + arrow::Schema(arrow::FieldVector({string_field, int_field, bool_field})), struct_type); + } + + /* Builds record_batch_size struct rows for indices offset up to offset + record_batch_size - 1. Rows with an even index have all three children null; rows with an odd index i hold the string str_ followed by i, the integer i and true. */ std::shared_ptr PrepareArray(const std::shared_ptr& data_type, + int32_t record_batch_size, + int32_t offset = 0) const { + arrow::StructBuilder struct_builder( + data_type, arrow::default_memory_pool(), + {std::make_shared(), std::make_shared(), + std::make_shared()}); + auto string_builder = static_cast(struct_builder.field_builder(0)); + auto int_builder = static_cast(struct_builder.field_builder(1)); + auto bool_builder = static_cast(struct_builder.field_builder(2)); + for (int32_t i = 0 + offset; i < record_batch_size + offset; ++i) { + EXPECT_TRUE(struct_builder.Append().ok()); + if (i % 2 == 0) { + EXPECT_TRUE(string_builder->AppendNull().ok()); + EXPECT_TRUE(int_builder->AppendNull().ok()); + EXPECT_TRUE(bool_builder->AppendNull().ok()); + } else { + EXPECT_TRUE(string_builder->Append("str_" + std::to_string(i)).ok()); + EXPECT_TRUE(int_builder->Append(i).ok()); + EXPECT_TRUE(bool_builder->Append(true).ok()); + } + } + std::shared_ptr array; + EXPECT_TRUE(struct_builder.Finish(&array).ok()); + return array; + } + + /* Builds one array with PrepareArray, exports it through the Arrow C data interface, wraps it in a record batch and passes the batch data to format_writer->AddBatch. */ void AddRecordBatchOnce(const std::shared_ptr& format_writer, + const std::shared_ptr& struct_type, + int32_t record_batch_size, int32_t offset) const { + auto array = PrepareArray(struct_type, record_batch_size, offset); + auto arrow_array = std::make_unique(); + ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok()); + auto batch = std::make_shared( + /*partition=*/std::map(), /*bucket=*/-1, + /*row_kinds=*/std::vector(), arrow_array.get()); + ASSERT_OK(format_writer->AddBatch(batch->GetData())); + } + + /* Re-opens file_name with the ORC reader, checks that the selected columns are col1, col2 and col3, reads one batch and verifies that it holds batch_size rows following the PrepareArray pattern for offset 0 (even rows null, odd rows populated). */ void CheckResult(const std::string& file_name, int32_t batch_size) const { + 
ASSERT_OK_AND_ASSIGN(std::shared_ptr input_stream, + file_system_->Open(file_name)); + ASSERT_OK_AND_ASSIGN(auto in_stream, + OrcInputStreamImpl::Create(input_stream, DEFAULT_NATURAL_READ_SIZE)); + ::orc::ReaderOptions reader_options; + std::unique_ptr<::orc::Reader> reader = + ::orc::createReader(std::move(in_stream), reader_options); + ::orc::RowReaderOptions options; + options.setUseTightNumericVector(true); + std::unique_ptr<::orc::RowReader> row_reader = reader->createRowReader(options); + auto sub_type_count = row_reader->getSelectedType().getSubtypeCount(); + ASSERT_EQ(3, sub_type_count); + std::list expect_selected_type = {"col1", "col2", "col3"}; + std::list result_selected_type; + for (size_t i = 0; i < sub_type_count; i++) { + result_selected_type.push_back(row_reader->getSelectedType().getFieldName(i)); + } + ASSERT_EQ(expect_selected_type, result_selected_type); + + auto batch = row_reader->createRowBatch(batch_size); + ASSERT_TRUE(row_reader->next(*batch)); + ASSERT_EQ(batch_size, batch->numElements); + + auto struct_batch = dynamic_cast<::orc::StructVectorBatch*>(batch.get()); + ASSERT_TRUE(struct_batch); + auto string_batch = static_cast<::orc::StringVectorBatch*>(struct_batch->fields[0]); + ASSERT_TRUE(string_batch); + auto int_batch = dynamic_cast<::orc::IntVectorBatch*>(struct_batch->fields[1]); + ASSERT_TRUE(int_batch); + auto bool_batch = dynamic_cast<::orc::ByteVectorBatch*>(struct_batch->fields[2]); + ASSERT_TRUE(bool_batch); + ASSERT_TRUE(string_batch->hasNulls); + ASSERT_TRUE(int_batch->hasNulls); + ASSERT_TRUE(bool_batch->hasNulls); + for (int32_t i = 0; i < batch_size; i++) { + if (i % 2 == 0) { + ASSERT_FALSE(string_batch->notNull[i]); + ASSERT_FALSE(int_batch->notNull[i]); + ASSERT_FALSE(bool_batch->notNull[i]); + } else { + ASSERT_EQ("str_" + std::to_string(i), + std::string(string_batch->data[i], string_batch->length[i])); + ASSERT_EQ(i, int_batch->data[i]); + ASSERT_EQ(true, bool_batch->data[i]); + } + } + } + + private: + 
std::shared_ptr file_system_ = std::make_shared(); + std::shared_ptr pool_ = GetDefaultPool(); +}; + +/* For every pairing of record batch size and initial ORC batch capacity (each taken from 1, 2, 3, 5, 20), writes a single batch with lz4 compression and metrics enabled, finishes the writer, validates the file with CheckResult and checks that the WRITE_IO_COUNT counter is positive. Pairings where the record batch is larger than the capacity go through the batch expansion path of AddBatch. */ TEST_F(OrcFormatWriterTest, TestWriteWithVariousBatchSize) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + ASSERT_OK(file_system_->Mkdirs(test_root)); + std::string file_name = test_root + "/test.orc"; + + auto schema_pair = PrepareArrowSchema(); + const auto& arrow_schema = schema_pair.first; + const auto& struct_type = schema_pair.second; + std::map options = {{ORC_WRITE_ENABLE_METRICS, "true"}}; + for (auto record_batch_size : {1, 2, 3, 5, 20}) { + for (auto orc_batch_capacity : {1, 2, 3, 5, 20}) { + ASSERT_OK_AND_ASSIGN(std::shared_ptr out, + file_system_->Create(file_name, /*overwrite=*/true)); + ASSERT_OK_AND_ASSIGN(std::unique_ptr output_stream, + OrcOutputStreamImpl::Create(out)); + ASSERT_OK_AND_ASSIGN( + auto format_writer, + OrcFormatWriter::Create(std::move(output_stream), arrow_schema, options, + /*compression=*/"lz4", orc_batch_capacity, pool_)); + auto array = PrepareArray(struct_type, record_batch_size); + auto arrow_array = std::make_unique(); + ASSERT_TRUE(arrow::ExportArray(*array, arrow_array.get()).ok()); + + auto batch = std::make_shared( + /*partition=*/std::map(), /*bucket=*/-1, + /*row_kinds=*/std::vector(), arrow_array.get()); + ASSERT_OK(format_writer->AddBatch(batch->GetData())); + ASSERT_OK(format_writer->Finish()); + ASSERT_OK(out->Flush()); + ASSERT_OK(out->Close()); + CheckResult(file_name, record_batch_size); + // test metrics + auto writer_metrics = format_writer->GetWriterMetrics(); + ASSERT_OK_AND_ASSIGN(uint64_t io_count, + writer_metrics->GetCounter(OrcMetrics::WRITE_IO_COUNT)); + ASSERT_GT(io_count, 0); + } + } +} +TEST_F(OrcFormatWriterTest, TestPrepareOptionsFileCompression) { + arrow::FieldVector fields; + std::shared_ptr data_type = arrow::struct_(fields); + { + ASSERT_OK_AND_ASSIGN(auto writer_options, 
OrcFormatWriter::PrepareWriterOptions({}, "lz4", data_type)); + ASSERT_EQ(writer_options.getCompression(), ::orc::CompressionKind_LZ4); + } + { + ASSERT_OK_AND_ASSIGN(auto writer_options, + OrcFormatWriter::PrepareWriterOptions({}, "zstd", data_type)); + ASSERT_EQ(writer_options.getCompression(), ::orc::CompressionKind_ZSTD); + } + { + ASSERT_OK_AND_ASSIGN(auto writer_options, + OrcFormatWriter::PrepareWriterOptions({}, "LZ4", data_type)); + ASSERT_EQ(writer_options.getCompression(), ::orc::CompressionKind_LZ4); + } + { + ASSERT_OK_AND_ASSIGN(auto writer_options, + OrcFormatWriter::PrepareWriterOptions({}, "ZSTd", data_type)); + ASSERT_EQ(writer_options.getCompression(), ::orc::CompressionKind_ZSTD); + } + { + ASSERT_OK_AND_ASSIGN(auto writer_options, + OrcFormatWriter::PrepareWriterOptions({}, "zlib", data_type)); + ASSERT_EQ(writer_options.getCompression(), ::orc::CompressionKind_ZLIB); + } + { + ASSERT_OK_AND_ASSIGN(auto writer_options, + OrcFormatWriter::PrepareWriterOptions({}, "snappy", data_type)); + ASSERT_EQ(writer_options.getCompression(), ::orc::CompressionKind_SNAPPY); + } + { + ASSERT_OK_AND_ASSIGN(auto writer_options, + OrcFormatWriter::PrepareWriterOptions({}, "lzo", data_type)); + ASSERT_EQ(writer_options.getCompression(), ::orc::CompressionKind_LZO); + } + { + ASSERT_NOK_WITH_MSG(OrcFormatWriter::PrepareWriterOptions({}, "unknown", data_type), + "unknown compression"); + } +} +TEST_F(OrcFormatWriterTest, TestPrepareWriterOptions) { + arrow::FieldVector fields; + std::shared_ptr data_type = arrow::struct_(fields); + { + // test default value + std::map options = {}; + ASSERT_OK_AND_ASSIGN(auto writer_options, + OrcFormatWriter::PrepareWriterOptions(options, "zstd", data_type)); + ASSERT_EQ(writer_options.getCompression(), ::orc::CompressionKind_ZSTD); + ASSERT_TRUE(writer_options.getEnableDictionary()); + ASSERT_EQ(writer_options.getDictionaryKeySizeThreshold(), + DEFAULT_DICTIONARY_KEY_SIZE_THRESHOLD); + ASSERT_EQ(writer_options.getStripeSize(), 
DEFAULT_STRIPE_SIZE); + ASSERT_EQ(writer_options.getRowIndexStride(), DEFAULT_ROW_INDEX_STRIDE); + ASSERT_EQ(writer_options.getCompressionBlockSize(), DEFAULT_COMPRESSION_BLOCK_SIZE); + ASSERT_TRUE(writer_options.getUseTightNumericVector()); + } + { + // test specific value + std::map options = {{ORC_STRIPE_SIZE, "4096"}, + {ORC_ROW_INDEX_STRIDE, "10"}, + {ORC_COMPRESSION_BLOCK_SIZE, "2048"}, + {ORC_DICTIONARY_KEY_SIZE_THRESHOLD, "0.98"}}; + ASSERT_OK_AND_ASSIGN(auto writer_options, + OrcFormatWriter::PrepareWriterOptions(options, "zstd", data_type)); + ASSERT_EQ(writer_options.getCompression(), ::orc::CompressionKind_ZSTD); + ASSERT_TRUE(writer_options.getEnableDictionary()); + ASSERT_EQ(writer_options.getDictionaryKeySizeThreshold(), 0.98); + ASSERT_EQ(writer_options.getStripeSize(), 4096); + ASSERT_EQ(writer_options.getRowIndexStride(), 10); + ASSERT_EQ(writer_options.getCompressionBlockSize(), 2048); + ASSERT_TRUE(writer_options.getUseTightNumericVector()); + } + { + // test disable dictionary + std::map options = {{ORC_DICTIONARY_KEY_SIZE_THRESHOLD, "0"}}; + ASSERT_OK_AND_ASSIGN(auto writer_options, + OrcFormatWriter::PrepareWriterOptions(options, "zstd", data_type)); + ASSERT_FALSE(writer_options.getEnableDictionary()); + } + { + // test disable config for timestamp with timezone + arrow::FieldVector invalid_fields = { + arrow::field("f0", arrow::timestamp(arrow::TimeUnit::NANO, "Asia/Shanghai"))}; + auto invalid_data_type = arrow::struct_(invalid_fields); + std::map options = {}; + ASSERT_NOK_WITH_MSG( + OrcFormatWriter::PrepareWriterOptions(options, "zstd", invalid_data_type), + "invalid config, do not support writing timestamp with timezone in legacy format"); + } +} +// TODO(liancheng.lsz): add tests for GetEstimateLength + +} // namespace paimon::orc::test diff --git a/src/paimon/format/orc/orc_reader_wrapper.cpp b/src/paimon/format/orc/orc_reader_wrapper.cpp new file mode 100644 index 0000000..2095606 --- /dev/null +++ 
b/src/paimon/format/orc/orc_reader_wrapper.cpp @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/format/orc/orc_reader_wrapper.h" + +#include +#include +#include +#include + +#include "fmt/format.h" +#include "orc/OrcFile.hh" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/scope_guard.h" + +namespace paimon::orc { + +/* Positions the row reader at row_number via row_reader_->seekToRow and records it as the next row to read. A std::exception is reported as Status::Invalid, any other exception as Status::UnknownError. */ +Status OrcReaderWrapper::SeekToRow(uint64_t row_number) { + try { + row_reader_->seekToRow(row_number); + next_row_ = row_number; + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("orc file batch reader seek to row {} failed for file {}, with {} error", + row_number, file_name_, e.what())); + } catch (...) 
{ + return Status::UnknownError(fmt::format( + "orc file batch reader seek to row {} failed for file {}, with unknown error", + row_number, file_name_)); + } + return Status::OK(); +} + +/* Creates a new row reader from row_reader_options and remembers target_type, which Next() passes to OrcAdapter::AppendBatch. Exceptions from createRowReader are converted to Status::Invalid / Status::UnknownError. */ +Status OrcReaderWrapper::SetReadSchema(const std::shared_ptr& target_type, + const ::orc::RowReaderOptions& row_reader_options) { + try { + row_reader_ = reader_->createRowReader(row_reader_options); + target_type_ = target_type; + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("orc file batch reader create row reader failed for file {}, with {} error", + file_name_, e.what())); + } catch (...) { + return Status::UnknownError(fmt::format( + "orc file batch reader create row reader failed for file {}, with unknown error", + file_name_)); + } + return Status::OK(); +} + +/* Reads the next batch. Fails immediately once has_error_ is set; returns BatchReader::MakeEofBatch() when row_reader_->next reports no more rows; otherwise converts the ORC batch with OrcAdapter::AppendBatch, exports it with arrow::ExportArray and sets next_row_ to GetRowNumber() + numElements. The ScopeGuard sets has_error_ unless guard.Release() is reached (presumably it fires on scope exit - confirm against scope_guard.h). */ +Result OrcReaderWrapper::Next() { + if (has_error_) { + return Status::Invalid(fmt::format( + "Since an error has occurred, next batch has been prohibited. file '{}'", file_name_)); + } + std::unique_ptr c_array = std::make_unique(); + std::unique_ptr c_schema = std::make_unique(); + try { + auto orc_batch = row_reader_->createRowBatch(batch_size_); + bool eof = !row_reader_->next(*orc_batch); + if (eof) { + return BatchReader::MakeEofBatch(); + } + ScopeGuard guard([this]() { has_error_ = true; }); + assert(orc_batch->numElements > 0); + PAIMON_ASSIGN_OR_RAISE( + std::shared_ptr array, + OrcAdapter::AppendBatch(target_type_, orc_batch.get(), arrow_pool_.get())); + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, c_array.get(), c_schema.get())); + next_row_ = GetRowNumber() + orc_batch->numElements; + guard.Release(); + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("orc file batch reader get next batch failed for file {}, with {} error", + file_name_, e.what())); + } catch (...) 
{ + return Status::UnknownError(fmt::format( + "orc file batch reader get next batch failed for file {}, with unknown error", + file_name_)); + } + return make_pair(std::move(c_array), std::move(c_schema)); +} + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_reader_wrapper.h b/src/paimon/format/orc/orc_reader_wrapper.h new file mode 100644 index 0000000..e98ad3d --- /dev/null +++ b/src/paimon/format/orc/orc_reader_wrapper.h @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "arrow/c/bridge.h" +#include "arrow/memory_pool.h" +#include "fmt/format.h" +#include "paimon/format/orc/orc_adapter.h" +#include "paimon/format/orc/read_range_generator.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/reader/batch_reader.h" + +namespace paimon::orc { + +// The OrcReaderWrapper is a decorator class designed to support GetNextRowToRead. 
+class OrcReaderWrapper { + public: + ~OrcReaderWrapper() { + /* Release the row reader before the reader it was created from. */ + row_reader_.reset(); + reader_.reset(); + } + + /* Builds a wrapper around reader; fails if the ReadRangeGenerator for this reader cannot be created. */ + static Result> Create( + std::unique_ptr<::orc::Reader> reader, const std::string& file_name, int32_t batch_size, + uint64_t natural_read_size, const std::map& options, + const std::shared_ptr& arrow_pool, + const std::shared_ptr<::orc::MemoryPool>& orc_pool) { + PAIMON_ASSIGN_OR_RAISE( + std::unique_ptr range_generator, + ReadRangeGenerator::Create(reader.get(), natural_read_size, options)); + return std::unique_ptr( + new OrcReaderWrapper(std::move(reader), std::move(range_generator), file_name, + batch_size, arrow_pool, orc_pool)); + } + + Status SeekToRow(uint64_t row_number); + Result Next(); + Status SetReadSchema(const std::shared_ptr& target_type, + const ::orc::RowReaderOptions& row_reader_options); + + /* Row index recorded by the last successful SeekToRow() / Next(). */ + uint64_t GetNextRowToRead() const { + return next_row_; + } + + /* Forwards to row_reader_->getRowNumber(); a row reader must already have been set through SetReadSchema(). */ + uint64_t GetRowNumber() const { + return row_reader_->getRowNumber(); + } + + uint64_t GetNumberOfRows() const { + return reader_->getNumberOfRows(); + } + + /* NOTE(review): read_ranges is currently ignored and OK is always returned - confirm that callers do not rely on the ranges being applied. */ + Status SetReadRanges(const std::vector>& read_ranges) { + // Intentionally a no-op: SetReadRanges is a best-effort hint only. 
+ return Status::OK(); + } + + const ::orc::Type& GetOrcType() const { + return reader_->getType(); + } + + /* Delegates to the ReadRangeGenerator created in Create(). */ + Result>> GenReadRanges( + std::vector target_column_ids, uint64_t begin_row_num, uint64_t end_row_num, + bool* need_prefetch) const { + return range_generator_->GenReadRanges(target_column_ids, begin_row_num, end_row_num, + need_prefetch); + } + + /* Calls reader_->preBufferRange for the given columns over every stripe of the file. A std::exception is reported as Status::Invalid, any other exception as Status::UnknownError. */ + Result>> PreBufferRange( + const std::vector& target_column_ids) { + uint64_t stripe_number = reader_->getNumberOfStripes(); + std::vector stripes; + /* The final size is known up front, so allocate once. */ + stripes.reserve(stripe_number); + for (uint64_t i = 0; i < stripe_number; i++) { + stripes.push_back(static_cast(i)); + } + std::list include_types(target_column_ids.begin(), target_column_ids.end()); + try { + return reader_->preBufferRange(stripes, include_types); + } catch (const std::exception& e) { + return Status::Invalid(fmt::format( + "orc pre buffer range failed for file {}, with {} error", file_name_, e.what())); + } catch (...) { + /* Not a std::exception: report UnknownError, as the other catch-all handlers of this wrapper do. */ + return Status::UnknownError(fmt::format( + "orc pre buffer range failed for file {}, with unknown error", file_name_)); + } + } + + private: + OrcReaderWrapper(std::unique_ptr<::orc::Reader> reader, + std::unique_ptr range_generator, + const std::string& file_name, int32_t batch_size, + const std::shared_ptr& arrow_pool, + const std::shared_ptr<::orc::MemoryPool>& orc_pool) + : reader_(std::move(reader)), + range_generator_(std::move(range_generator)), + file_name_(file_name), + batch_size_(batch_size), + arrow_pool_(arrow_pool), + orc_pool_(orc_pool) {} + + std::unique_ptr<::orc::Reader> reader_; + std::unique_ptr<::orc::RowReader> row_reader_; + + std::unique_ptr range_generator_; + + const std::string file_name_; + const int32_t batch_size_; + + std::shared_ptr arrow_pool_; + std::shared_ptr<::orc::MemoryPool> orc_pool_; + + std::shared_ptr target_type_; + + // The next absolute row index to read. 
+ uint64_t next_row_ = 0; + bool has_error_ = false; +}; + +} // namespace paimon::orc diff --git a/src/paimon/format/orc/orc_reader_wrapper_test.cpp b/src/paimon/format/orc/orc_reader_wrapper_test.cpp new file mode 100644 index 0000000..c31ec5e --- /dev/null +++ b/src/paimon/format/orc/orc_reader_wrapper_test.cpp @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "paimon/format/orc/orc_reader_wrapper.h" + +#include "arrow/api.h" +#include "arrow/io/api.h" +#include "gtest/gtest.h" +#include "orc/OrcFile.hh" +#include "paimon/common/reader/reader_utils.h" +#include "paimon/common/utils/arrow/mem_utils.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::orc::test { + +class OrcReaderWrapperTest : public ::testing::Test { + protected: + void SetUp() override {} + + void TearDown() override {} +}; + +/* Writes a three-row ORC file, reads it back through OrcReaderWrapper with batch_size 2 and checks GetNextRowToRead() after the first batch (2), after the second batch (3) and after the following Next() call (still 3). */ +TEST_F(OrcReaderWrapperTest, NextRowToRead) { + auto dir = paimon::test::UniqueTestDirectory::Create(); + /* Stop here instead of dereferencing a null directory below. */ + ASSERT_TRUE(dir); + std::string file_path = dir->Str() + "/file.orc"; + { + std::unique_ptr<::orc::OutputStream> outStream = ::orc::writeLocalFile(file_path); + ::orc::WriterOptions options; + std::unique_ptr<::orc::Type> schema = + ::orc::Type::buildTypeFromString("struct"); + std::unique_ptr<::orc::Writer> writer = createWriter(*schema, outStream.get(), options); + auto col_batch = writer->createRowBatch(3); + auto batch = dynamic_cast<::orc::StructVectorBatch*>(col_batch.get()); + /* A failed dynamic_cast yields nullptr; check before the pointers are dereferenced. */ + ASSERT_TRUE(batch); + auto* col1 = dynamic_cast<::orc::LongVectorBatch*>(batch->fields[0]); + ASSERT_TRUE(col1); + auto* col2 = dynamic_cast<::orc::StringVectorBatch*>(batch->fields[1]); + ASSERT_TRUE(col2); + batch->numElements = 3; + col1->numElements = 3; + col2->numElements = 3; + col1->data[0] = 1; + col1->data[1] = 2; + col1->data[2] = 3; + col2->data[0] = const_cast("a"); + col2->length[0] = 1; + col2->data[1] = const_cast("b"); + col2->length[1] = 1; + col2->data[2] = const_cast("c"); + col2->length[2] = 1; + writer->add(*batch); + writer->close(); + } + + ::orc::ReaderOptions reader_opts; + std::unique_ptr<::orc::Reader> reader = + ::orc::createReader(::orc::readLocalFile(file_path), reader_opts); + std::map options; + ASSERT_OK_AND_ASSIGN(auto wrapper, OrcReaderWrapper::Create( + /*reader=*/std::move(reader), + /*file_name=*/file_path, + /*batch_size=*/2, + /*natural_read_size=*/0, + /*options=*/options, + /*arrow_pool=*/GetArrowPool(GetDefaultPool()), + /*orc_pool=*/nullptr)); + auto 
data_types = + arrow::struct_({arrow::field("col1", arrow::int64()), arrow::field("col2", arrow::utf8())}); + ::orc::RowReaderOptions row_opts; + ASSERT_OK(wrapper->SetReadSchema(data_types, row_opts)); + + ASSERT_OK_AND_ASSIGN(auto batch1, wrapper->Next()); + EXPECT_EQ(wrapper->GetNextRowToRead(), 2u); // batch_size=2 + ReaderUtils::ReleaseReadBatch(std::move(batch1)); + + ASSERT_OK_AND_ASSIGN(auto batch2, wrapper->Next()); + EXPECT_EQ(wrapper->GetNextRowToRead(), 3u); // only 1 row left + ReaderUtils::ReleaseReadBatch(std::move(batch2)); + + ASSERT_OK_AND_ASSIGN(auto batch3, wrapper->Next()); + EXPECT_EQ(wrapper->GetNextRowToRead(), 3u); + ReaderUtils::ReleaseReadBatch(std::move(batch3)); +} + +} // namespace paimon::orc::test