diff --git a/LICENSE b/LICENSE index 2665c1e..a7381cc 100644 --- a/LICENSE +++ b/LICENSE @@ -502,6 +502,16 @@ POSSIBILITY OF SUCH DAMAGE. -------------------------------------------------------------------------------- +This product includes code from LucenePlusPlus. + +* LucenePlusPlus utility in src/paimon/global_index/lucene/ directory + +Copyright: 2009-2014 Alan Wright. +Home page: https://github.com/luceneplusplus/LucenePlusPlus +License: https://www.apache.org/licenses/LICENSE-2.0 + +-------------------------------------------------------------------------------- + This product includes code from cppjieba. * cppjieba utility in src/paimon/global_index/lucene/ directory diff --git a/NOTICE b/NOTICE index b98bb34..09913cb 100644 --- a/NOTICE +++ b/NOTICE @@ -29,5 +29,8 @@ Copyright (C) 2012-2023 Yann Collet This product includes software from CRoaring project (Apache 2.0) Copyright 2016-2022 The CRoaring authors +This product includes software from LucenePlusPlus project (Apache 2.0) +Copyright 2009-2014 Alan Wright. + This product includes software from cppjieba project (MIT) Copyright 2013 diff --git a/src/paimon/global_index/lucene/jieba_analyzer.cpp b/src/paimon/global_index/lucene/jieba_analyzer.cpp new file mode 100644 index 0000000..e508138 --- /dev/null +++ b/src/paimon/global_index/lucene/jieba_analyzer.cpp @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/jieba_analyzer.h" + +#include "paimon/common/utils/string_utils.h" +#include "paimon/global_index/lucene/lucene_utils.h" + +namespace paimon::lucene { +JiebaTokenizerContext::JiebaTokenizerContext(const std::string& _tokenize_mode, bool _with_position, + const std::shared_ptr& _jieba, + const std::shared_ptr& _pool, + int32_t _buffer_size) + : pool(_pool), + tokenize_mode(_tokenize_mode), + with_position(_with_position), + buffer_size(_buffer_size), + jieba(_jieba) {} + +JiebaTokenizer::JiebaTokenizer(const JiebaTokenizerContext& context, const Lucene::ReaderPtr& input) + : Lucene::Tokenizer(input), context_(context) { + term_att_ = addAttribute(); + pos_att_ = addAttribute(); + buffer_ = static_cast( + context_.pool->Malloc(context_.buffer_size * sizeof(wchar_t), /*alignment=*/8)); +} + +JiebaTokenizer::~JiebaTokenizer() { + if (buffer_) { + context_.pool->Free(reinterpret_cast(buffer_), + context_.buffer_size * sizeof(wchar_t), + /*alignment=*/8); + buffer_ = nullptr; + } +} + +bool JiebaTokenizer::incrementToken() { + if (term_index_ >= normalized_terms_.size()) { + return false; + } + + const auto& term = normalized_terms_[term_index_++]; + clearAttributes(); + + term_att_->setTermBuffer(LuceneUtils::StringToWstring(term)); + + if (context_.with_position) { + pos_att_->setPositionIncrement(1); + } else { + pos_att_->setPositionIncrement(0); + } + return true; +} + +void JiebaTokenizer::CutWithMode(const std::string& tokenize_mode, const cppjieba::Jieba* jieba, + const std::string& str, std::vector* terms_ptr) { + auto& terms = *terms_ptr; + if (tokenize_mode == "mp") { + jieba->CutSmall(str, terms, /*max_word_len=*/JiebaTokenizerContext::kMaxWordLen); + } else if (tokenize_mode == "hmm") { + jieba->CutHMM(str, terms); + } else if (tokenize_mode == "mix") { + jieba->Cut(str, terms, /*hmm=*/true); + } else if (tokenize_mode == "full") { + jieba->CutAll(str, terms); + } else if (tokenize_mode == "query") { + jieba->CutForSearch(str, terms, /*hmm=*/true); + } else { + throw Lucene::IllegalArgumentException( + L"only support mp/hmm/mix/full/query in jieba tokenizer"); + } +} + +void JiebaTokenizer::Normalize(const std::unordered_set& stop_words, + std::vector* input_ptr, + std::vector* output_ptr) { + auto& input = *input_ptr; + auto& output = *output_ptr; + output.clear(); + output.reserve(input.size()); + for (auto& term : input) { + if (StringUtils::IsNullOrWhitespaceOnly(term)) { + continue; + } + // remove stop words + if (stop_words.find(term) != stop_words.end()) { + continue; + } + // to lower case + bool is_alphanumeric = true; + for (const auto& c : term) { + if (!std::isalnum(static_cast(c))) { + is_alphanumeric = false; + break; + } + } + if (is_alphanumeric && !term.empty()) { + std::transform(term.begin(), term.end(), term.begin(), [](char ch) { + return static_cast(std::tolower(static_cast(ch))); + }); + } + output.emplace_back(term.data(), term.length()); + } +} + +void JiebaTokenizer::reset() { + Lucene::Tokenizer::reset(); + InnerReset(); +} + +void JiebaTokenizer::reset(const Lucene::ReaderPtr& input) { + Lucene::Tokenizer::reset(input); + InnerReset(); +} + +void JiebaTokenizer::InnerReset() { + terms_.clear(); + normalized_terms_.clear(); + term_index_ = 0; + + // read wchar from input + Lucene::String wstr; + wstr.reserve(context_.buffer_size); + while (true) { + int32_t length = input->read(buffer_, /*offset=*/0, context_.buffer_size); + if (length <= 0) { + break; + } + wstr.append(buffer_, length); + } + + // jieba tokenize + std::string doc_str = LuceneUtils::WstringToString(wstr); + // TODO(xinyu.lxy): support porter2 stemmer + CutWithMode(context_.tokenize_mode, context_.jieba.get(), doc_str, &terms_); + Normalize(context_.jieba->extractor.GetStopWords(), &terms_, &normalized_terms_); +} + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/jieba_analyzer.h b/src/paimon/global_index/lucene/jieba_analyzer.h new file mode 100644 index 0000000..6a179df --- /dev/null +++ b/src/paimon/global_index/lucene/jieba_analyzer.h @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include "cppjieba/Jieba.hpp" +#include "lucene++/LuceneHeaders.h" +#include "lucene++/MiscUtils.h" +#include "lucene++/PositionIncrementAttribute.h" +#include "lucene++/TermAttribute.h" +#include "paimon/global_index/lucene/lucene_utils.h" +#include "paimon/memory/memory_pool.h" +namespace paimon::lucene { +struct JiebaTokenizerContext { + JiebaTokenizerContext(const std::string& _tokenize_mode, bool _with_position, + const std::shared_ptr& _jieba, + const std::shared_ptr& _pool, + int32_t _buffer_size = kReadBufferSize); + + std::shared_ptr pool; + std::string tokenize_mode; + bool with_position; + int32_t buffer_size; + std::shared_ptr jieba; + + static inline const int32_t kReadBufferSize = 5 * 1024 * 1024; + static inline const int32_t kMaxWordLen = 1024; +}; + +class JiebaTokenizer : public Lucene::Tokenizer { + public: + JiebaTokenizer(const JiebaTokenizerContext& context, const Lucene::ReaderPtr& input); + + ~JiebaTokenizer() override; + + bool incrementToken() override; + + void reset(const Lucene::ReaderPtr& input) override; + + void reset() override; + + static void CutWithMode(const std::string& tokenize_mode, const cppjieba::Jieba* jieba, + const std::string& str, std::vector* terms_ptr); + + // In-place converts each string in `input` to lowercase to avoid data copying. + static void Normalize(const std::unordered_set& stop_words, + std::vector* input, std::vector* output); + + private: + void InnerReset(); + + private: + JiebaTokenizerContext context_; + size_t term_index_ = 0; + std::vector terms_; + std::vector normalized_terms_; + wchar_t* buffer_; + Lucene::TermAttributePtr term_att_; + Lucene::PositionIncrementAttributePtr pos_att_; +}; + +class JiebaAnalyzer : public Lucene::Analyzer { + public: + explicit JiebaAnalyzer(const JiebaTokenizerContext& context) : context_(context) {} + + ~JiebaAnalyzer() override = default; + + Lucene::TokenStreamPtr tokenStream(const Lucene::String& field_name, + const Lucene::ReaderPtr& reader) override { + return Lucene::newLucene(context_, reader); + } + + private: + JiebaTokenizerContext context_; +}; +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/jieba_analyzer_test.cpp b/src/paimon/global_index/lucene/jieba_analyzer_test.cpp new file mode 100644 index 0000000..c84f4b5 --- /dev/null +++ b/src/paimon/global_index/lucene/jieba_analyzer_test.cpp @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/jieba_analyzer.h" + +#include "cppjieba/Jieba.hpp" +#include "gtest/gtest.h" +#include "lucene++/LuceneHeaders.h" +#include "paimon/global_index/lucene/lucene_utils.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::lucene::test { +class JiebaAnalyzerTest : public ::testing::Test, public ::testing::WithParamInterface { + public: + void SetUp() override {} + void TearDown() override {} + Lucene::TokenStreamPtr CreateJiebaTokenizer(bool with_position) const { + return CreateJiebaTokenizer(with_position, L"我爱机器学习"); + } + + Lucene::TokenStreamPtr CreateJiebaTokenizer(bool with_position, + const Lucene::String& text) const { + auto pool = GetDefaultPool(); + std::string dictionary_dir = LuceneUtils::GetJiebaDictionaryDir().value(); + auto jieba = std::make_shared( + dictionary_dir + "/jieba.dict.utf8", dictionary_dir + "/hmm_model.utf8", + dictionary_dir + "/user.dict.utf8", dictionary_dir + "/idf.utf8", + dictionary_dir + "/stop_words.utf8"); + auto reader = Lucene::newLucene(text); + int32_t buffer_size = GetParam(); + JiebaTokenizerContext context(/*tokenize_mode=*/"query", with_position, jieba, pool, + buffer_size); + auto analyzer = Lucene::newLucene(context); + return analyzer->tokenStream(/*field_name*/ L"f0", reader); + } +}; + +TEST_P(JiebaAnalyzerTest, TestSimple) { + auto tokenizer = CreateJiebaTokenizer(/*with_position=*/false); + + auto term_att = tokenizer->addAttribute(); + + tokenizer->reset(); + std::vector results; + while (tokenizer->incrementToken()) { + results.push_back(term_att->term()); + } + tokenizer->end(); + tokenizer->close(); + std::vector expected = {L"爱", L"机器", L"学习"}; + ASSERT_EQ(expected, results); +} + +TEST_P(JiebaAnalyzerTest, TestWithPosition) { + auto tokenizer = CreateJiebaTokenizer(/*with_position=*/true); + + auto term_att = tokenizer->addAttribute(); + auto pos_att = tokenizer->addAttribute(); + + tokenizer->reset(); + std::vector results; + std::vector result_pos; + int32_t pos = 0; + while (tokenizer->incrementToken()) { + pos += pos_att->getPositionIncrement(); + result_pos.push_back(pos); + results.push_back(term_att->term()); + } + tokenizer->end(); + tokenizer->close(); + + std::vector expected = {L"爱", L"机器", L"学习"}; + std::vector expected_pos = {1, 2, 3}; + ASSERT_EQ(expected, results); + ASSERT_EQ(expected_pos, result_pos); +} + +TEST_P(JiebaAnalyzerTest, TestNormalize) { + auto tokenizer = CreateJiebaTokenizer( + /*with_position=*/false, + L"由于购买了Iphone14,我越来越热爱网上学习了!Happy work, happy day! \n\t"); + + auto term_att = tokenizer->addAttribute(); + + tokenizer->reset(); + std::vector results; + while (tokenizer->incrementToken()) { + results.push_back(term_att->term()); + } + tokenizer->end(); + tokenizer->close(); + std::vector expected = {L"购买", L"iphone14", L"越来", L"越来越", + L"热爱", L"网上", L"学习", L"happy", + L"work", L"happy", L"day"}; + ASSERT_EQ(expected, results); +} + +INSTANTIATE_TEST_SUITE_P(ReadBufferSize, JiebaAnalyzerTest, + ::testing::ValuesIn(std::vector({2, 5, 10, 100}))); + +} // namespace paimon::lucene::test diff --git a/src/paimon/global_index/lucene/jieba_api_test.cpp b/src/paimon/global_index/lucene/jieba_api_test.cpp new file mode 100644 index 0000000..9b32abf --- /dev/null +++ b/src/paimon/global_index/lucene/jieba_api_test.cpp @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include "cppjieba/Jieba.hpp" +#include "gtest/gtest.h" +#include "paimon/global_index/lucene/lucene_utils.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::lucene::test { +class JiebaInterfaceTest : public ::testing::Test { + public: + void SetUp() override {} + void TearDown() override {} +}; + +TEST_F(JiebaInterfaceTest, TestSimple) { + ASSERT_OK_AND_ASSIGN(std::string dictionary_dir, LuceneUtils::GetJiebaDictionaryDir()); + cppjieba::Jieba jieba(dictionary_dir + "/jieba.dict.utf8", dictionary_dir + "/hmm_model.utf8", + dictionary_dir + "/user.dict.utf8", dictionary_dir + "/idf.utf8", + dictionary_dir + "/stop_words.utf8"); + + { + std::vector words; + jieba.CutForSearch("我爱机器学习", words); + std::vector expected = {"我", "爱", "机器", "学习"}; + ASSERT_EQ(expected, words); + } + { + std::vector words; + jieba.CutForSearch("我爱机器学习 工作 good work nice Day,price12345,12345", words); + std::vector expected = {"我", "爱", "机器", "学习", " ", "工作", + " ", "good", " ", "work", " ", "nice", + " ", "Day", ",", "price12345", ",", "12345"}; + ASSERT_EQ(expected, words); + } +} + +TEST_F(JiebaInterfaceTest, TestMP) { + ASSERT_OK_AND_ASSIGN(std::string dictionary_dir, LuceneUtils::GetJiebaDictionaryDir()); + cppjieba::Jieba jieba(dictionary_dir + "/jieba.dict.utf8", dictionary_dir + "/hmm_model.utf8", + dictionary_dir + "/user.dict.utf8", dictionary_dir + "/idf.utf8", + dictionary_dir + "/stop_words.utf8"); + + { + std::vector words; + jieba.CutSmall("我爱机器学习", words, /*max_word_len=*/5); + std::vector expected = {"我", "爱", "机器", "学习"}; + ASSERT_EQ(expected, words); + } + { + std::vector words; + jieba.CutSmall("我爱机器学习", words, /*max_word_len=*/1); + std::vector expected = {"我", "爱", "机", "器", "学", "习"}; + ASSERT_EQ(expected, words); + } +} + +} // namespace paimon::lucene::test diff --git a/src/paimon/global_index/lucene/lucene_api_test.cpp b/src/paimon/global_index/lucene/lucene_api_test.cpp new file mode 100644 index 0000000..697dcbc --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_api_test.cpp @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "gtest/gtest.h" +#include "lucene++/FileUtils.h" +#include "lucene++/LuceneHeaders.h" +#include "lucene++/MiscUtils.h" +#include "paimon/global_index/lucene/jieba_analyzer.h" +#include "paimon/global_index/lucene/lucene_directory.h" +#include "paimon/global_index/lucene/lucene_utils.h" +#include "paimon/memory/memory_pool.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::lucene::test { +class LuceneInterfaceTest : public ::testing::Test { + public: + void SetUp() override {} + void TearDown() override {} + + class TestDocIdSetIterator : public Lucene::DocIdSetIterator { + public: + explicit TestDocIdSetIterator(const std::vector& ids) + : Lucene::DocIdSetIterator(), ids_(ids) {} + + int32_t advance(int32_t target) override { + int32_t doc_id = nextDoc(); + while (doc_id < target) { + doc_id = nextDoc(); + } + return doc_id; + } + int32_t docID() override { + if (cursor_ >= ids_.size()) { + return Lucene::DocIdSetIterator::NO_MORE_DOCS; + } + return ids_[cursor_]; + } + int32_t nextDoc() override { + if (cursor_ >= ids_.size()) { + return Lucene::DocIdSetIterator::NO_MORE_DOCS; + } + return ids_[cursor_++]; + } + + private: + size_t cursor_ = 0; + std::vector ids_; + }; + + class TestDocIdSet : public Lucene::DocIdSet { + public: + explicit TestDocIdSet(const std::vector& ids) : DocIdSet(), ids_(ids) {} + + Lucene::DocIdSetIteratorPtr iterator() override { + return Lucene::newLucene(ids_); + } + bool isCacheable() override { + return true; + } + + private: + std::vector ids_; + }; + + class TestFilter : public Lucene::Filter { + public: + explicit TestFilter(const std::vector& ids) : ids_(ids) {} + + Lucene::DocIdSetPtr getDocIdSet(const Lucene::IndexReaderPtr& reader) override { + return Lucene::newLucene(ids_); + } + + private: + std::vector ids_; + }; + + struct WriteContext { + Lucene::IndexWriterPtr writer; + Lucene::DocumentPtr doc; + Lucene::FieldPtr field; + Lucene::FieldPtr doc_id_field; + }; + + Lucene::AnalyzerPtr CreateJiebaAnalyzer() const { + auto pool = GetDefaultPool(); + std::string dictionary_dir = LuceneUtils::GetJiebaDictionaryDir().value(); + auto jieba = std::make_shared( + dictionary_dir + "/jieba.dict.utf8", dictionary_dir + "/hmm_model.utf8", + dictionary_dir + "/user.dict.utf8", dictionary_dir + "/idf.utf8", + dictionary_dir + "/stop_words.utf8"); + JiebaTokenizerContext context(/*tokenize_mode=*/"query", /*with_position=*/true, jieba, + pool); + return Lucene::newLucene(context); + } + + WriteContext CreateWriteContext(const Lucene::DirectoryPtr& lucene_dir, + const Lucene::AnalyzerPtr& analyzer) const { + auto lucene_analyzer = analyzer ? analyzer + : Lucene::newLucene( + Lucene::LuceneVersion::LUCENE_CURRENT); + Lucene::IndexWriterPtr writer = Lucene::newLucene( + lucene_dir, lucene_analyzer, + /*create=*/true, Lucene::IndexWriter::MaxFieldLengthLIMITED); + + Lucene::DocumentPtr doc = Lucene::newLucene(); + auto field = Lucene::newLucene(L"content", L"", Lucene::Field::STORE_NO, + Lucene::Field::INDEX_ANALYZED_NO_NORMS); + auto doc_id_field = Lucene::newLucene( + L"id", L"", Lucene::Field::STORE_YES, Lucene::Field::INDEX_NOT_ANALYZED_NO_NORMS); + + field->setOmitTermFreqAndPositions(false); + doc_id_field->setOmitTermFreqAndPositions(true); + doc->add(field); + doc->add(doc_id_field); + return {writer, doc, field, doc_id_field}; + } + + void AddDocument(const std::wstring& doc_str, int32_t doc_id, WriteContext* context) const { + context->field->setValue(doc_str); + context->doc_id_field->setValue(LuceneUtils::StringToWstring(std::to_string(doc_id))); + context->writer->addDocument(context->doc); + } + + struct ReadContext { + Lucene::IndexReaderPtr reader; + Lucene::IndexSearcherPtr searcher; + Lucene::QueryParserPtr parser; + }; + + ReadContext CreateReadContext(const Lucene::DirectoryPtr& lucene_dir, + const Lucene::AnalyzerPtr& analyzer) const { + auto lucene_analyzer = analyzer ? analyzer + : Lucene::newLucene( + Lucene::LuceneVersion::LUCENE_CURRENT); + Lucene::IndexReaderPtr reader = Lucene::IndexReader::open(lucene_dir, /*read_only=*/true); + Lucene::IndexSearcherPtr searcher = Lucene::newLucene(reader); + Lucene::QueryParserPtr parser = Lucene::newLucene( + Lucene::LuceneVersion::LUCENE_CURRENT, L"content", lucene_analyzer); + parser->setAllowLeadingWildcard(true); + return {reader, searcher, parser}; + } + + void Search(const std::wstring& query_str, int32_t limit, + const std::optional> selected_id, + const std::vector& expected_doc_id_vec, + const std::vector& expected_doc_id_content_vec, + ReadContext* context) const { + Lucene::QueryPtr query = context->parser->parse(query_str); + Lucene::TopDocsPtr results; + if (selected_id) { + Lucene::FilterPtr lucene_filter = Lucene::newLucene(selected_id.value()); + results = context->searcher->search(query, lucene_filter, limit); + } else { + results = context->searcher->search(query, limit); + } + ASSERT_EQ(expected_doc_id_vec.size(), results->scoreDocs.size()); + + std::vector result_doc_id_vec; + std::vector result_doc_id_content_vec; + for (auto score_doc : results->scoreDocs) { + Lucene::DocumentPtr result_doc = context->searcher->doc(score_doc->doc); + result_doc_id_vec.push_back(score_doc->doc); + result_doc_id_content_vec.push_back(result_doc->get(L"id")); + } + ASSERT_EQ(result_doc_id_vec, expected_doc_id_vec); + ASSERT_EQ(result_doc_id_content_vec, expected_doc_id_content_vec); + } +}; + +TEST_F(LuceneInterfaceTest, TestSimple) { + auto dir = paimon::test::UniqueTestDirectory::Create("local"); + std::string index_path = dir->Str() + "/lucene_test"; + auto lucene_dir = Lucene::FSDirectory::open(LuceneUtils::StringToWstring(index_path), + Lucene::NoLockFactory::getNoLockFactory()); + // write + auto write_context = CreateWriteContext(lucene_dir, /*analyzer=*/nullptr); + + AddDocument(L"This is an test document.", 0, &write_context); + AddDocument(L"This is an new document document document.", 1, &write_context); + AddDocument(L"Document document document document test.", 2, &write_context); + AddDocument(L"unordered user-defined doc id", 5, &write_context); + AddDocument(L"", 6, &write_context); // add a null doc + + write_context.writer->optimize(); + write_context.writer->close(); + + // read + auto read_context = CreateReadContext(lucene_dir, /*analyzer=*/nullptr); + + // result is sorted by tf-idf score + Search(L"document", /*limit=*/10, /*selected_id=*/std::nullopt, std::vector({2, 1, 0}), + std::vector({L"2", L"1", L"0"}), &read_context); + Search(L"document", /*limit=*/1, /*selected_id=*/std::nullopt, std::vector({2}), + std::vector({L"2"}), &read_context); + Search(L"test AND document", /*limit=*/10, /*selected_id=*/std::nullopt, + std::vector({2, 0}), std::vector({L"2", L"0"}), &read_context); + Search(L"test OR new", /*limit=*/10, /*selected_id=*/std::nullopt, + std::vector({1, 0, 2}), std::vector({L"1", L"0", L"2"}), + &read_context); + Search(L"\"test document\"", /*limit=*/10, /*selected_id=*/std::nullopt, + std::vector({0}), std::vector({L"0"}), &read_context); + Search(L"unordered", /*limit=*/10, /*selected_id=*/std::nullopt, std::vector({3}), + std::vector({L"5"}), &read_context); + Search(L"*orDer*", /*limit=*/10, /*selected_id=*/std::nullopt, std::vector({3}), + std::vector({L"5"}), &read_context); + + // test filter + Search(L"document", /*limit=*/10, /*selected_id=*/std::vector({0, 1}), + std::vector({1, 0}), std::vector({L"1", L"0"}), &read_context); + Search(L"document OR unordered", /*limit=*/10, + /*selected_id=*/std::vector({0, 1, 3}), std::vector({3, 1, 0}), + std::vector({L"5", L"1", L"0"}), &read_context); + Search(L"unordered", /*limit=*/10, /*selected_id=*/std::vector({0}), + std::vector(), std::vector(), &read_context); + + read_context.reader->close(); + lucene_dir->close(); +} + +TEST_F(LuceneInterfaceTest, TestWithAnalyzer) { + auto dir = paimon::test::UniqueTestDirectory::Create("local"); + std::string index_path = dir->Str() + "/lucene_test"; + auto lucene_dir = Lucene::FSDirectory::open(LuceneUtils::StringToWstring(index_path), + Lucene::NoLockFactory::getNoLockFactory()); + // write + auto analyzer = CreateJiebaAnalyzer(); + auto write_context = CreateWriteContext(lucene_dir, analyzer); + + AddDocument(L"我爱机器学习", 0, &write_context); + AddDocument(L"机器会学习吗?", 1, &write_context); + AddDocument(L"我爱工作", 2, &write_context); + AddDocument(L"Have a nice day", 3, &write_context); + + write_context.writer->optimize(); + write_context.writer->close(); + + // read + auto read_context = CreateReadContext(lucene_dir, analyzer); + + // result is sorted by tf-idf score + Search(L"机器", /*limit=*/10, /*selected_id=*/std::nullopt, std::vector({0, 1}), + std::vector({L"0", L"1"}), &read_context); + Search(L"机器 AND 学习", /*limit=*/10, /*selected_id=*/std::nullopt, + std::vector({0, 1}), std::vector({L"0", L"1"}), &read_context); + Search(L"\"机器学习\"", /*limit=*/10, /*selected_id=*/std::nullopt, std::vector({0}), + std::vector({L"0"}), &read_context); + Search(L"我爱", /*limit=*/10, /*selected_id=*/std::nullopt, std::vector({0, 2}), + std::vector({L"0", L"2"}), &read_context); + Search(L"爱 OR nice", /*limit=*/10, /*selected_id=*/std::nullopt, + std::vector({3, 0, 2}), std::vector({L"3", L"0", L"2"}), + &read_context); + + read_context.reader->close(); + lucene_dir->close(); +} + +} // namespace paimon::lucene::test diff --git a/src/paimon/global_index/lucene/lucene_collector.h b/src/paimon/global_index/lucene/lucene_collector.h new file mode 100644 index 0000000..e95eacb --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_collector.h @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include "lucene++/LuceneHeaders.h" +#include "paimon/utils/roaring_bitmap64.h" + +namespace paimon::lucene { +class LuceneCollector : public Lucene::Collector { + public: + LuceneCollector() : Lucene::Collector() {} + void setScorer(const Lucene::ScorerPtr& scorer) override { + // ignore scorer + } + void collect(int32_t doc) override { + bitmap_.Add(doc_base_ + doc); + } + void setNextReader(const Lucene::IndexReaderPtr& reader, int32_t doc_base) override { + doc_base_ = doc_base; + } + bool acceptsDocsOutOfOrder() override { + return true; + } + const RoaringBitmap64& GetBitmap() const { + return bitmap_; + } + + private: + RoaringBitmap64 bitmap_; + int64_t doc_base_ = 0; +}; +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_defs.h b/src/paimon/global_index/lucene/lucene_defs.h new file mode 100644 index 0000000..7fc3554 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_defs.h @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +namespace paimon::lucene { +static inline const int32_t kVersion = 1; +static inline const char kIdentifier[] = "lucene-fts"; +static inline const wchar_t kEmptyWstring[] = L""; + +static inline const char kOptionKeyPrefix[] = "lucene-fts."; + +static inline const int32_t kDefaultReadBufferSize = 1024 * 1024; +// default is 1MB +static inline const char kLuceneReadBufferSize[] = "read.buffer-size"; +// default is false +static inline const char kLuceneWriteOmitTermFreqAndPositions[] = + "write.omit-term-freq-and-position"; +// no default value, must explicit set +static inline const char kLuceneWriteTmpDir[] = "write.tmp.directory"; + +static inline const char kJiebaDictDirEnv[] = "PAIMON_JIEBA_DICT_DIR"; + +static inline const char kDefaultJiebaTokenizeMode[] = "mix"; +// default is "mix". Values can be "mp", "hmm", "mix", "full", "query". +static inline const char kJiebaTokenizeMode[] = "jieba.tokenize-mode"; +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_directory.cpp b/src/paimon/global_index/lucene/lucene_directory.cpp new file mode 100644 index 0000000..5ea7d91 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_directory.cpp @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/lucene_directory.h" + +#include "paimon/common/io/offset_input_stream.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/global_index/lucene/lucene_defs.h" +#include "paimon/global_index/lucene/lucene_input.h" +#include "paimon/global_index/lucene/lucene_utils.h" + +namespace paimon::lucene { +LuceneDirectory::LuceneDirectory( + const std::string& path, + const std::map>& file_name_to_offset_and_length, + const std::shared_ptr& paimon_input) + : LuceneDirectory::LuceneDirectory(path, file_name_to_offset_and_length, paimon_input, + kDefaultReadBufferSize) {} + +LuceneDirectory::LuceneDirectory( + const std::string& path, + const std::map>& file_name_to_offset_and_length, + const std::shared_ptr& paimon_input, int32_t input_buffer_size) + : Lucene::Directory(), + input_buffer_size_(input_buffer_size), + path_(path), + file_name_to_offset_and_length_(file_name_to_offset_and_length), + paimon_input_(paimon_input) { + Lucene::Directory::setLockFactory(Lucene::NoLockFactory::getNoLockFactory()); +} + +Lucene::HashSet LuceneDirectory::listAll() { + ensureOpen(); + Lucene::HashSet result_file_list( + Lucene::HashSet::newInstance()); + for (const auto& [file_name, _] : file_name_to_offset_and_length_) { + result_file_list.add(LuceneUtils::StringToWstring(file_name)); + } + return result_file_list; +} + +bool LuceneDirectory::fileExists(const Lucene::String& name) { + ensureOpen(); + auto iter = file_name_to_offset_and_length_.find(LuceneUtils::WstringToString(name)); + return iter != file_name_to_offset_and_length_.end(); +} + +uint64_t LuceneDirectory::fileModified(const Lucene::String& name) { + throw Lucene::IOException(L"LuceneDirectory not support fileModified()"); +} + +void LuceneDirectory::touchFile(const Lucene::String& name) { + throw Lucene::IOException(L"LuceneDirectory not support touchFile()"); +} + +void LuceneDirectory::deleteFile(const Lucene::String& name) { + throw Lucene::IOException(L"LuceneDirectory not support deleteFile()"); +} + +int64_t LuceneDirectory::fileLength(const Lucene::String& name) { + ensureOpen(); + auto iter = file_name_to_offset_and_length_.find(LuceneUtils::WstringToString(name)); + if (iter == file_name_to_offset_and_length_.end()) { + throw Lucene::IOException(L"file not exist in fileLength"); + } + return iter->second.second; +} + +Lucene::IndexOutputPtr LuceneDirectory::createOutput(const Lucene::String& name) { + throw Lucene::IOException(L"LuceneDirectory not support createOutput()"); +} + +Lucene::IndexInputPtr LuceneDirectory::openInput(const Lucene::String& name) { + ensureOpen(); + auto file_iter = file_name_to_offset_and_length_.find(LuceneUtils::WstringToString(name)); + if (file_iter == file_name_to_offset_and_length_.end()) { + throw Lucene::IOException(L"file not exist in openInput"); + } + const auto& [offset, length] = file_iter->second; + auto offset_input_result = OffsetInputStream::Create(paimon_input_, length, offset); + if (!offset_input_result.ok()) { + throw Lucene::IOException( + LuceneUtils::StringToWstring(offset_input_result.status().ToString())); + } + std::shared_ptr offset_input = std::move(offset_input_result).value(); + return Lucene::newLucene(Lucene::newLucene(offset_input), + input_buffer_size_); +} + +void LuceneDirectory::close() { + Lucene::SyncLock sync_lock(this); + isOpen = false; +} + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_directory.h b/src/paimon/global_index/lucene/lucene_directory.h new file mode 100644 index 0000000..2bb259f --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_directory.h @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include "lucene++/LuceneHeaders.h" +#include "lucene++/NoLockFactory.h" +#include "paimon/fs/file_system.h" + +namespace paimon::lucene { +/// This class wraps a Paimon FileSystem instance to enable Lucene index reading capabilities, +/// but only supports read operations (e.g., openInput, fileLength). Write operations +/// (e.g., createOutput, delete) will throw an exception or be unsupported. +class LuceneDirectory : public Lucene::Directory { + public: + LuceneDirectory( + const std::string& path, + const std::map>& file_name_to_offset_and_length, + const std::shared_ptr& paimon_input); + + LuceneDirectory( + const std::string& path, + const std::map>& file_name_to_offset_and_length, + const std::shared_ptr& paimon_input, int32_t input_buffer_size); + + Lucene::HashSet listAll() override; + + bool fileExists(const Lucene::String& name) override; + + uint64_t fileModified(const Lucene::String& name) override; + + void touchFile(const Lucene::String& name) override; + + void deleteFile(const Lucene::String& name) override; + + int64_t fileLength(const Lucene::String& name) override; + + Lucene::IndexOutputPtr createOutput(const Lucene::String& name) override; + + Lucene::IndexInputPtr openInput(const Lucene::String& name) override; + + void close() override; + + private: + int32_t input_buffer_size_; + std::string path_; + /// @note All files are concatenated into a single physical file for the Paimon global index. + /// Use `file_name_to_offset_and_length_` and `paimon_input_` to obtain the actual + /// offset and length of each logical file within the merged file, which are used + /// to create Lucene index inputs for `Lucene::Directory`. + std::map> file_name_to_offset_and_length_; + std::shared_ptr paimon_input_; +}; +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_directory_test.cpp b/src/paimon/global_index/lucene/lucene_directory_test.cpp new file mode 100644 index 0000000..ab15ed4 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_directory_test.cpp @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/lucene_directory.h" + +#include "gtest/gtest.h" +#include "lucene++/FileUtils.h" +#include "lucene++/LuceneHeaders.h" +#include "lucene++/MiscUtils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/global_index/lucene/lucene_utils.h" +#include "paimon/testing/utils/testharness.h" +namespace paimon::lucene::test { +class LuceneDirectoryTest : public ::testing::Test, public ::testing::WithParamInterface { + public: + void SetUp() override {} + void TearDown() override {} +}; + +TEST_P(LuceneDirectoryTest, TestSimple) { + int32_t read_buffer_size = GetParam(); + // write 3 files in a single concat file + std::vector data = {"helloworld", "abcdefg", "paimoncpp"}; + auto dir = paimon::test::UniqueTestDirectory::Create("local"); + auto lucene_directory = Lucene::FSDirectory::open(LuceneUtils::StringToWstring(dir->Str()), + Lucene::NoLockFactory::getNoLockFactory()); + std::string single_file_name = "lucene-file"; + Lucene::IndexOutputPtr output = + lucene_directory->createOutput(LuceneUtils::StringToWstring(single_file_name)); + std::map> file_name_to_offset_and_length; + int64_t offset = 0; + for (size_t i = 0; i < data.size(); i++) { + const auto& data_str = data[i]; + output->writeBytes(reinterpret_cast(data_str.data()), /*offset=*/0, + data_str.size()); + file_name_to_offset_and_length["file" + std::to_string(i)] = {offset, data_str.size()}; + offset += data_str.size(); + } + output->close(); + + // create paimon directory + auto fs = std::make_shared(); + ASSERT_OK_AND_ASSIGN(std::shared_ptr input, + fs->Open(PathUtil::JoinPath(dir->Str(), single_file_name))); + auto paimon_directory = std::make_shared( + dir->Str(), file_name_to_offset_and_length, input, read_buffer_size); + // test list all + auto list_file_names = paimon_directory->listAll(); + Lucene::HashSet expect_file_list( + Lucene::HashSet::newInstance()); + expect_file_list.add(L"file0"); + expect_file_list.add(L"file1"); + expect_file_list.add(L"file2"); + ASSERT_EQ(list_file_names, expect_file_list); + + // test non-exist file + ASSERT_FALSE(paimon_directory->fileExists(L"non-exist-file")); + ASSERT_THROW(paimon_directory->fileLength(L"non-exist-file"), Lucene::IOException); + + for (size_t i = 0; i < data.size(); i++) { + std::wstring file_name = LuceneUtils::StringToWstring("file" + std::to_string(i)); + // check file exist + ASSERT_TRUE(paimon_directory->fileExists(file_name)); + // check file length + ASSERT_EQ(paimon_directory->fileLength(file_name), data[i].size()); + // check read data + Lucene::IndexInputPtr data_input = paimon_directory->openInput(file_name); + ASSERT_TRUE(data_input); + std::string read_data(data[i].size(), '\0'); + data_input->readBytes(reinterpret_cast(read_data.data()), + /*offset=*/0, /*length=*/data[i].size(), + /*useBuffer=*/true); + ASSERT_EQ(read_data, data[i]); + // check seek + data_input->seek(1); + ASSERT_EQ(1l, data_input->getFilePointer()); + // check read after seek + read_data.resize(data[i].size() - 1, '\0'); + data_input->readBytes(reinterpret_cast(read_data.data()), + /*offset=*/0, /*length=*/data[i].size() - 1, + /*useBuffer=*/true); + ASSERT_EQ(read_data, data[i].substr(1)); + ASSERT_EQ(data[i].size(), data_input->length()); + data_input->close(); + } +} +INSTANTIATE_TEST_SUITE_P(ReadBufferSize, LuceneDirectoryTest, + ::testing::ValuesIn(std::vector({10, 100, 1024}))); + +} // namespace paimon::lucene::test diff --git a/src/paimon/global_index/lucene/lucene_filter.h b/src/paimon/global_index/lucene/lucene_filter.h new file mode 100644 index 0000000..c62719a --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_filter.h @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include "lucene++/LuceneHeaders.h" +#include "paimon/utils/roaring_bitmap64.h" + +namespace paimon::lucene { +class BitmapDocIdSetIterator : public Lucene::DocIdSetIterator { + public: + explicit BitmapDocIdSetIterator(const RoaringBitmap64* ids) + : Lucene::DocIdSetIterator(), ids_(ids), iter_(ids->Begin()) {} + + int32_t advance(int32_t target) override { + iter_.EqualOrLarger(static_cast(target)); + if (iter_ == ids_->End()) { + return Lucene::DocIdSetIterator::NO_MORE_DOCS; + } + return static_cast(*iter_); + } + + int32_t docID() override { + if (iter_ == ids_->End()) { + return Lucene::DocIdSetIterator::NO_MORE_DOCS; + } + return static_cast(*iter_); + } + + int32_t nextDoc() override { + if (iter_ == ids_->End()) { + return Lucene::DocIdSetIterator::NO_MORE_DOCS; + } + auto id = static_cast(*iter_); + ++iter_; + return id; + } + + private: + const RoaringBitmap64* ids_; + RoaringBitmap64::Iterator iter_; +}; + +class BitmapDocIdSet : public Lucene::DocIdSet { + public: + explicit BitmapDocIdSet(const RoaringBitmap64* ids) : DocIdSet(), ids_(ids) {} + + Lucene::DocIdSetIteratorPtr iterator() override { + return Lucene::newLucene(ids_); + } + + bool isCacheable() override { + return true; + } + + private: + const RoaringBitmap64* ids_; +}; + +class LuceneFilter : public Lucene::Filter { + public: + explicit LuceneFilter(const RoaringBitmap64* ids) : ids_(ids) {} + + Lucene::DocIdSetPtr getDocIdSet(const Lucene::IndexReaderPtr& reader) override { + return Lucene::newLucene(ids_); + } + + private: + const RoaringBitmap64* ids_; +}; + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_filter_test.cpp b/src/paimon/global_index/lucene/lucene_filter_test.cpp new file mode 100644 index 0000000..3168275 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_filter_test.cpp @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/lucene_filter.h" + +#include "gtest/gtest.h" +#include "lucene++/LuceneHeaders.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::lucene::test { +TEST(LuceneFilterTest, TestSimple) { + RoaringBitmap64 roaring = RoaringBitmap64::From({1l, 3l, 5l, 100l}); + LuceneFilter filter(&roaring); + + auto doc_id_set = filter.getDocIdSet(/*reader=*/Lucene::IndexReaderPtr()); + ASSERT_TRUE(doc_id_set); + + auto doc_iter = doc_id_set->iterator(); + ASSERT_TRUE(doc_iter); + ASSERT_TRUE(doc_id_set->isCacheable()); + + ASSERT_EQ(1, doc_iter->nextDoc()); + + ASSERT_EQ(5, doc_iter->advance(4)); + ASSERT_EQ(5, doc_iter->docID()); + + ASSERT_EQ(100, doc_iter->advance(100)); + + ASSERT_EQ(Lucene::DocIdSetIterator::NO_MORE_DOCS, doc_iter->advance(1000)); + ASSERT_EQ(Lucene::DocIdSetIterator::NO_MORE_DOCS, doc_iter->docID()); + ASSERT_EQ(Lucene::DocIdSetIterator::NO_MORE_DOCS, doc_iter->nextDoc()); +} + +} // namespace paimon::lucene::test diff --git a/src/paimon/global_index/lucene/lucene_global_index.cpp b/src/paimon/global_index/lucene/lucene_global_index.cpp new file mode 100644 index 0000000..fa54d37 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index.cpp @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/lucene_global_index.h" + +#include + +#include "arrow/c/bridge.h" +#include "lucene++/FileUtils.h" +#include "paimon/common/utils/options_utils.h" +#include "paimon/global_index/lucene/lucene_defs.h" +#include "paimon/global_index/lucene/lucene_global_index_reader.h" +#include "paimon/global_index/lucene/lucene_global_index_writer.h" +#include "paimon/global_index/lucene/lucene_utils.h" +namespace paimon::lucene { +#define CHECK_NOT_NULL(pointer, error_msg) \ + do { \ + if (!(pointer)) { \ + return Status::Invalid(error_msg); \ + } \ + } while (0) + +LuceneGlobalIndex::LuceneGlobalIndex(const std::map& options) + : options_(OptionsUtils::FetchOptionsWithPrefix(kOptionKeyPrefix, options)) {} + +Result> LuceneGlobalIndex::CreateWriter( + const std::string& field_name, ::ArrowSchema* arrow_schema, + const std::shared_ptr& file_writer, + const std::shared_ptr& pool) const { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_type, + arrow::ImportType(arrow_schema)); + // check data type + auto struct_type = std::dynamic_pointer_cast(arrow_type); + CHECK_NOT_NULL(struct_type, "arrow schema must be struct type when create LuceneIndexWriter"); + auto index_field = struct_type->GetFieldByName(field_name); + CHECK_NOT_NULL(index_field, + fmt::format("field {} not exist in arrow schema when create LuceneIndexWriter", + field_name)); + if (index_field->type()->id() != arrow::Type::type::STRING) { + return Status::Invalid("field type must be string"); + } + return LuceneGlobalIndexWriter::Create(field_name, arrow_type, file_writer, options_, pool); +} + +Result> LuceneGlobalIndex::CreateReader( + ::ArrowSchema* c_arrow_schema, const std::shared_ptr& file_reader, + const std::vector& files, const std::shared_ptr& pool) const { + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr arrow_schema, + arrow::ImportSchema(c_arrow_schema)); + if (files.size() != 1) { + return Status::Invalid("lucene index only has one index file per shard"); + } + const auto& io_meta = files[0]; + // check data type + if (arrow_schema->num_fields() != 1) { + return Status::Invalid("LuceneGlobalIndex now only support one field"); + } + auto index_field = arrow_schema->field(0); + if (index_field->type()->id() != arrow::Type::type::STRING) { + return Status::Invalid("field type must be string"); + } + return LuceneGlobalIndexReader::Create(index_field->name(), io_meta, file_reader, options_, + pool); +} + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_global_index.h b/src/paimon/global_index/lucene/lucene_global_index.h new file mode 100644 index 0000000..9f033ba --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index.h @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "arrow/type.h" +#include "lucene++/LuceneHeaders.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/global_index/global_indexer.h" +#include "paimon/global_index/lucene/lucene_defs.h" +#include "paimon/predicate/full_text_search.h" +namespace paimon::lucene { +class LuceneGlobalIndex : public GlobalIndexer { + public: + explicit LuceneGlobalIndex(const std::map& options); + + Result> CreateWriter( + const std::string& field_name, ::ArrowSchema* arrow_schema, + const std::shared_ptr& file_writer, + const std::shared_ptr& pool) const override; + + Result> CreateReader( + ::ArrowSchema* arrow_schema, const std::shared_ptr& file_reader, + const std::vector& files, + const std::shared_ptr& pool) const override; + + private: + std::map options_; +}; + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_global_index_factory.cpp b/src/paimon/global_index/lucene/lucene_global_index_factory.cpp new file mode 100644 index 0000000..ac72718 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index_factory.cpp @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/global_index/lucene/lucene_global_index_factory.h" + +#include +#include +#include +#include + +#include "paimon/global_index/lucene/lucene_global_index.h" +namespace paimon::lucene { + +const char LuceneGlobalIndexFactory::IDENTIFIER[] = "lucene-fts-global"; + +Result> LuceneGlobalIndexFactory::Create( + const std::map& options) const { + return std::make_unique(options); +} + +REGISTER_PAIMON_FACTORY(LuceneGlobalIndexFactory); + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_global_index_factory.h b/src/paimon/global_index/lucene/lucene_global_index_factory.h new file mode 100644 index 0000000..798254d --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index_factory.h @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "paimon/global_index/global_indexer.h" +#include "paimon/global_index/global_indexer_factory.h" +namespace paimon::lucene { +/// Factory for creating lucene global indexers. +class LuceneGlobalIndexFactory : public GlobalIndexerFactory { + public: + static const char IDENTIFIER[]; + + const char* Identifier() const override { + return IDENTIFIER; + } + + Result> Create( + const std::map& options) const override; +}; + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_global_index_reader.cpp b/src/paimon/global_index/lucene/lucene_global_index_reader.cpp new file mode 100644 index 0000000..603f902 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index_reader.cpp @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/lucene_global_index_reader.h" + +#include "arrow/c/bridge.h" +#include "lucene++/FileUtils.h" +#include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/rapidjson_util.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/global_index/bitmap_scored_global_index_result.h" +#include "paimon/global_index/lucene/jieba_analyzer.h" +#include "paimon/global_index/lucene/lucene_collector.h" +#include "paimon/global_index/lucene/lucene_defs.h" +#include "paimon/global_index/lucene/lucene_directory.h" +#include "paimon/global_index/lucene/lucene_filter.h" +#include "paimon/global_index/lucene/lucene_utils.h" +#include "paimon/io/data_input_stream.h" + +namespace paimon::lucene { +Result> LuceneGlobalIndexReader::Create( + const std::string& field_name, const GlobalIndexIOMeta& io_meta, + const std::shared_ptr& file_reader, + const std::map& options, const std::shared_ptr& pool) { + try { + auto meta_bytes = io_meta.metadata; + if (!meta_bytes) { + return Status::Invalid("Lucene global index must have meta data"); + } + std::map write_options; + PAIMON_RETURN_NOT_OK(RapidJsonUtil::FromJsonString( + std::string(meta_bytes->data(), meta_bytes->size()), &write_options)); + + std::map> file_name_to_offset_and_length; + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr paimon_input, + file_reader->GetInputStream(io_meta.file_path)); + DataInputStream data_input_stream(paimon_input); + PAIMON_ASSIGN_OR_RAISE(int32_t version, data_input_stream.ReadValue()); + if (version != kVersion) { + return Status::Invalid(fmt::format("LuceneGlobalIndex not support version {}"), + kVersion); + } + PAIMON_ASSIGN_OR_RAISE(int32_t num_files, data_input_stream.ReadValue()); + for (int32_t i = 0; i < num_files; i++) { + PAIMON_ASSIGN_OR_RAISE(int32_t file_name_len, data_input_stream.ReadValue()); + auto file_name_bytes = std::make_shared(file_name_len, pool.get()); + PAIMON_RETURN_NOT_OK(data_input_stream.ReadBytes(file_name_bytes.get())); + std::string file_name(file_name_bytes->data(), file_name_bytes->size()); + PAIMON_ASSIGN_OR_RAISE(int64_t file_len, data_input_stream.ReadValue()); + PAIMON_ASSIGN_OR_RAISE(int64_t pos, data_input_stream.GetPos()); + file_name_to_offset_and_length[file_name] = {pos, file_len}; + pos += file_len; + if (i != num_files - 1) { + PAIMON_RETURN_NOT_OK(data_input_stream.Seek(pos)); + } + } + PAIMON_ASSIGN_OR_RAISE( + int32_t read_buffer_size, + OptionsUtils::GetValueFromMap(options, kLuceneReadBufferSize, kDefaultReadBufferSize)); + Lucene::DirectoryPtr lucene_dir = Lucene::newLucene( + PathUtil::GetParentDirPath(io_meta.file_path), file_name_to_offset_and_length, + paimon_input, read_buffer_size); + + Lucene::IndexReaderPtr reader = Lucene::IndexReader::open(lucene_dir, /*read_only=*/true); + Lucene::IndexSearcherPtr searcher = Lucene::newLucene(reader); + + PAIMON_ASSIGN_OR_RAISE(std::string dictionary_dir, LuceneUtils::GetJiebaDictionaryDir()); + auto jieba = std::make_shared( + dictionary_dir + "/jieba.dict.utf8", dictionary_dir + "/hmm_model.utf8", + dictionary_dir + "/user.dict.utf8", dictionary_dir + "/idf.utf8", + dictionary_dir + "/stop_words.utf8"); + + // priority: read options > write options > kDefaultJiebaTokenizeMode + PAIMON_ASSIGN_OR_RAISE( + std::string tokenize_mode, + OptionsUtils::GetValueFromMap(options, kJiebaTokenizeMode, std::string(""))); + if (tokenize_mode.empty()) { + PAIMON_ASSIGN_OR_RAISE(tokenize_mode, OptionsUtils::GetValueFromMap( + write_options, kJiebaTokenizeMode, + std::string(kDefaultJiebaTokenizeMode))); + } + return std::shared_ptr(new LuceneGlobalIndexReader( + LuceneUtils::StringToWstring(field_name), searcher, tokenize_mode, jieba)); + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("create lucene global index reader failed, with {} error.", e.what())); + } catch (...) { + return Status::UnknownError( + "create lucene global index reader failed, with unknown error."); + } +} + +std::vector LuceneGlobalIndexReader::TokenizeQuery(const std::string& query) const { + std::vector terms; + JiebaTokenizer::CutWithMode(tokenize_mode_, jieba_.get(), query, &terms); + std::vector normalized_terms; + JiebaTokenizer::Normalize(jieba_->extractor.GetStopWords(), &terms, &normalized_terms); + std::vector wterms; + wterms.reserve(normalized_terms.size()); + for (const auto& term : normalized_terms) { + wterms.push_back(LuceneUtils::StringToWstring(term)); + } + return wterms; +} + +Lucene::QueryPtr LuceneGlobalIndexReader::ConstructMatchQuery( + const std::shared_ptr& full_text_search) const noexcept(false) { + assert(full_text_search->search_type == FullTextSearch::SearchType::MATCH_ALL || + full_text_search->search_type == FullTextSearch::SearchType::MATCH_ANY); + Lucene::BooleanClause::Occur occur = + full_text_search->search_type == FullTextSearch::SearchType::MATCH_ALL + ? Lucene::BooleanClause::Occur::MUST + : Lucene::BooleanClause::Occur::SHOULD; + std::vector query_terms = TokenizeQuery(full_text_search->query); + if (query_terms.size() == 1) { + return Lucene::newLucene( + Lucene::newLucene(wfield_name_, query_terms[0])); + } else { + auto typed_query = Lucene::newLucene(); + for (const auto& term : query_terms) { + typed_query->add(Lucene::newLucene( + Lucene::newLucene(wfield_name_, term)), + occur); + } + return typed_query; + } +} + +Lucene::QueryPtr LuceneGlobalIndexReader::ConstructPhraseQuery( + const std::shared_ptr& full_text_search) const noexcept(false) { + assert(full_text_search->search_type == FullTextSearch::SearchType::PHRASE); + std::vector query_terms = TokenizeQuery(full_text_search->query); + auto typed_query = Lucene::newLucene(); + for (const auto& term : query_terms) { + typed_query->add(Lucene::newLucene(wfield_name_, term)); + } + return typed_query; +} + +Lucene::QueryPtr LuceneGlobalIndexReader::ConstructPrefixQuery( + const std::shared_ptr& full_text_search) const noexcept(false) { + assert(full_text_search->search_type == FullTextSearch::SearchType::PREFIX); + return Lucene::newLucene(Lucene::newLucene( + wfield_name_, LuceneUtils::StringToWstring(full_text_search->query))); +} + +Lucene::QueryPtr LuceneGlobalIndexReader::ConstructWildCardQuery( + const std::shared_ptr& full_text_search) const noexcept(false) { + assert(full_text_search->search_type == FullTextSearch::SearchType::WILDCARD); + return Lucene::newLucene(Lucene::newLucene( + wfield_name_, LuceneUtils::StringToWstring(full_text_search->query))); +} + +Result> LuceneGlobalIndexReader::SearchWithLimit( + const Lucene::QueryPtr& query, const std::shared_ptr& full_text_search) const + noexcept(false) { + assert(full_text_search->limit); + Lucene::FilterPtr filter = + full_text_search->pre_filter + ? Lucene::newLucene(&(full_text_search->pre_filter.value())) + : Lucene::FilterPtr(); + + Lucene::TopDocsPtr results = searcher_->search(query, filter, full_text_search->limit.value()); + + // prepare BitmapScoredGlobalIndexResult + std::map id_to_score; + for (auto score_doc : results->scoreDocs) { + id_to_score[static_cast(score_doc->doc)] = static_cast(score_doc->score); + } + RoaringBitmap64 bitmap; + std::vector scores; + scores.reserve(id_to_score.size()); + for (const auto& [id, score] : id_to_score) { + bitmap.Add(id); + scores.push_back(score); + } + return std::make_shared(std::move(bitmap), std::move(scores)); +} + +std::shared_ptr LuceneGlobalIndexReader::SearchWithNoLimit( + const Lucene::QueryPtr& query, const std::shared_ptr& full_text_search) const + noexcept(false) { + assert(!full_text_search->limit); + Lucene::FilterPtr filter = + full_text_search->pre_filter + ? Lucene::newLucene(&(full_text_search->pre_filter.value())) + : Lucene::FilterPtr(); + + // with no limit & no score + auto collector = Lucene::newLucene(); + searcher_->search(query, filter, collector); + return std::make_shared( + [collector]() -> Result { return collector->GetBitmap(); }); +} + +Result> LuceneGlobalIndexReader::VisitFullTextSearch( + const std::shared_ptr& full_text_search) { + try { + Lucene::QueryPtr query; + switch (full_text_search->search_type) { + case FullTextSearch::SearchType::MATCH_ALL: + case FullTextSearch::SearchType::MATCH_ANY: { + query = ConstructMatchQuery(full_text_search); + break; + } + case FullTextSearch::SearchType::PHRASE: { + query = ConstructPhraseQuery(full_text_search); + break; + } + case FullTextSearch::SearchType::PREFIX: { + query = ConstructPrefixQuery(full_text_search); + break; + } + case FullTextSearch::SearchType::WILDCARD: { + query = ConstructWildCardQuery(full_text_search); + break; + } + default: + return Status::Invalid( + fmt::format("Not support for FullTextSearch SearchType {}", + static_cast(full_text_search->search_type))); + } + if (full_text_search->limit) { + return SearchWithLimit(query, full_text_search); + } else { + return SearchWithNoLimit(query, full_text_search); + } + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("visit full text search failed, with {} error.", e.what())); + } catch (...) { + return Status::UnknownError("visit full text search failed, with unknown error."); + } +} + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_global_index_reader.h b/src/paimon/global_index/lucene/lucene_global_index_reader.h new file mode 100644 index 0000000..b86c332 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index_reader.h @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include + +#include "arrow/type.h" +#include "cppjieba/Jieba.hpp" +#include "lucene++/LuceneHeaders.h" +#include "paimon/global_index/bitmap_global_index_result.h" +#include "paimon/global_index/global_index_io_meta.h" +#include "paimon/global_index/global_index_reader.h" +#include "paimon/global_index/io/global_index_file_reader.h" +#include "paimon/global_index/lucene/lucene_defs.h" +#include "paimon/predicate/full_text_search.h" + +namespace paimon::lucene { +class LuceneGlobalIndexReader : public GlobalIndexReader { + public: + static Result> Create( + const std::string& field_name, const GlobalIndexIOMeta& io_meta, + const std::shared_ptr& file_reader, + const std::map& options, const std::shared_ptr& pool); + + Result> VisitIsNotNull() override { + return CreateAllResult(); + } + + Result> VisitIsNull() override { + return CreateAllResult(); + } + + Result> VisitEqual(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitNotEqual(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitLessThan(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitLessOrEqual(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitGreaterThan(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitGreaterOrEqual( + const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitIn( + const std::vector& literals) override { + return CreateAllResult(); + } + + Result> VisitNotIn( + const std::vector& literals) override { + return CreateAllResult(); + } + + Result> VisitStartsWith(const Literal& prefix) override { + return CreateAllResult(); + } + + Result> VisitEndsWith(const Literal& suffix) override { + return CreateAllResult(); + } + + Result> VisitContains(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitLike(const Literal& literal) override { + return CreateAllResult(); + } + + Result> VisitVectorSearch( + const std::shared_ptr& vector_search) override { + return Status::Invalid( + "LuceneGlobalIndexReader is not supposed to handle vector search query"); + } + + Result> VisitFullTextSearch( + const std::shared_ptr& full_text_search) override; + + bool IsThreadSafe() const override { + return false; + } + + std::string GetIndexType() const override { + return kIdentifier; + } + + private: + LuceneGlobalIndexReader(const std::wstring& wfield_name, + const Lucene::IndexSearcherPtr& searcher, + const std::string& tokenize_mode, + const std::shared_ptr& jieba) + : wfield_name_(wfield_name), + searcher_(searcher), + tokenize_mode_(tokenize_mode), + jieba_(jieba) {} + + std::vector TokenizeQuery(const std::string& query) const; + + std::shared_ptr CreateAllResult() const { + return nullptr; + } + + Lucene::QueryPtr ConstructMatchQuery( + const std::shared_ptr& full_text_search) const noexcept(false); + + Lucene::QueryPtr ConstructPhraseQuery( + const std::shared_ptr& full_text_search) const noexcept(false); + + Lucene::QueryPtr ConstructPrefixQuery( + const std::shared_ptr& full_text_search) const noexcept(false); + + Lucene::QueryPtr ConstructWildCardQuery( + const std::shared_ptr& full_text_search) const noexcept(false); + + Result> SearchWithLimit( + const Lucene::QueryPtr& query, + const std::shared_ptr& full_text_search) const noexcept(false); + + std::shared_ptr SearchWithNoLimit( + const Lucene::QueryPtr& query, + const std::shared_ptr& full_text_search) const noexcept(false); + + private: + std::wstring wfield_name_; + Lucene::IndexSearcherPtr searcher_; + std::string tokenize_mode_; + std::shared_ptr jieba_; +}; +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_global_index_test.cpp b/src/paimon/global_index/lucene/lucene_global_index_test.cpp new file mode 100644 index 0000000..a65d630 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index_test.cpp @@ -0,0 +1,452 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/lucene_global_index.h" + +#include "arrow/c/bridge.h" +#include "arrow/ipc/api.h" +#include "gtest/gtest.h" +#include "paimon/common/utils/arrow/status_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/string_utils.h" +#include "paimon/core/global_index/global_index_file_manager.h" +#include "paimon/core/index/index_path_factory.h" +#include "paimon/fs/local/local_file_system.h" +#include "paimon/global_index/bitmap_scored_global_index_result.h" +#include "paimon/global_index/lucene/lucene_global_index_reader.h" +#include "paimon/global_index/lucene/lucene_global_index_writer.h" +#include "paimon/testing/utils/testharness.h" + +namespace paimon::lucene::test { +class LuceneGlobalIndexTest : public ::testing::Test, + public ::testing::WithParamInterface { + public: + void SetUp() override {} + void TearDown() override {} + + class FakeIndexPathFactory : public IndexPathFactory { + public: + explicit FakeIndexPathFactory(const std::string& index_path) : index_path_(index_path) {} + std::string NewPath() const override { + assert(false); + return ""; + } + std::string ToPath(const std::shared_ptr& file) const override { + assert(false); + return ""; + } + std::string ToPath(const std::string& file_name) const override { + return PathUtil::JoinPath(index_path_, file_name); + } + bool IsExternalPath() const override { + return false; + } + + private: + std::string index_path_; + }; + + std::unique_ptr<::ArrowSchema> CreateArrowSchema( + const std::shared_ptr& data_type) const { + auto c_schema = std::make_unique<::ArrowSchema>(); + EXPECT_TRUE(arrow::ExportType(*data_type, c_schema.get()).ok()); + return c_schema; + } + + Result WriteGlobalIndex(const std::string& index_root, + const std::shared_ptr& data_type, + const std::map& options, + const std::shared_ptr& array, + const Range& expected_range, + const std::string& tmp_dir) const { + auto global_index = std::make_shared(options); + auto path_factory = std::make_shared(index_root); + auto file_writer = std::make_shared(fs_, path_factory); + + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr global_writer, + global_index->CreateWriter("f0", CreateArrowSchema(data_type).get(), + file_writer, pool_)); + + ArrowArray c_array; + PAIMON_RETURN_NOT_OK_FROM_ARROW(arrow::ExportArray(*array, &c_array)); + std::vector row_ids(array->length(), 0); + std::iota(row_ids.begin(), row_ids.end(), 0); + PAIMON_RETURN_NOT_OK(global_writer->AddBatch(&c_array, std::move(row_ids))); + PAIMON_ASSIGN_OR_RAISE(auto result_metas, global_writer->Finish()); + + // check tmp dir + std::vector> file_status_list; + EXPECT_OK(fs_->ListDir(tmp_dir, &file_status_list)); + EXPECT_EQ(file_status_list.size(), 1); + + // check meta + EXPECT_EQ(result_metas.size(), 1); + auto file_name = PathUtil::GetName(result_metas[0].file_path); + EXPECT_TRUE(StringUtils::StartsWith(file_name, "lucene-fts-global-index-")); + EXPECT_TRUE(StringUtils::EndsWith(file_name, ".index")); + EXPECT_TRUE(result_metas[0].metadata); + + // after reset writer, rm tmp files + global_writer.reset(); + file_status_list.clear(); + EXPECT_OK(fs_->ListDir(tmp_dir, &file_status_list)); + EXPECT_TRUE(file_status_list.empty()); + + return result_metas[0]; + } + + Result> CreateGlobalIndexReader( + const std::string& index_root, const std::shared_ptr& data_type, + const std::map& options, const GlobalIndexIOMeta& meta) const { + auto global_index = std::make_shared(options); + auto path_factory = std::make_shared(index_root); + auto file_reader = std::make_shared(fs_, path_factory); + return global_index->CreateReader(CreateArrowSchema(data_type).get(), file_reader, {meta}, + pool_); + } + + void CheckResult(const std::shared_ptr& result, + const std::vector& expected_ids) const { + const RoaringBitmap64* bitmap = nullptr; + if (auto scored_result = std::dynamic_pointer_cast(result)) { + ASSERT_OK_AND_ASSIGN(bitmap, scored_result->GetBitmap()); + ASSERT_EQ(scored_result->GetScores().size(), expected_ids.size()); + } else if (auto bitmap_result = + std::dynamic_pointer_cast(result)) { + ASSERT_OK_AND_ASSIGN(bitmap, bitmap_result->GetBitmap()); + } + ASSERT_TRUE(bitmap); + ASSERT_EQ(*bitmap, RoaringBitmap64::From(expected_ids)) + << "result=" << bitmap->ToString() + << ", expected=" << RoaringBitmap64::From(expected_ids).ToString(); + } + + private: + std::shared_ptr pool_ = GetDefaultPool(); + std::shared_ptr fs_ = std::make_shared(); + std::shared_ptr data_type_ = + arrow::struct_({arrow::field("f0", arrow::utf8())}); +}; + +TEST_P(LuceneGlobalIndexTest, TestSimple) { + int32_t read_buffer_size = GetParam(); + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + auto tmp_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(tmp_dir); + std::string test_root = test_root_dir->Str(); + + std::map options = { + {"lucene-fts.write.omit-term-freq-and-position", "false"}, + {"lucene-fts.read.buffer-size", std::to_string(read_buffer_size)}, + {"lucene-fts.write.tmp.directory", tmp_dir->Str()}}; + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + ["This is an test document."], + ["This is an new document document document."], + ["Document document document document test."], + ["unordered user-defined doc id"] + ])") + .ValueOrDie(); + + // write index + ASSERT_OK_AND_ASSIGN(auto meta, WriteGlobalIndex(test_root, data_type_, options, array, + Range(0, 3), tmp_dir->Str())); + if (read_buffer_size == 10) { + ASSERT_EQ( + std::string(meta.metadata->data(), meta.metadata->size()), + R"({"read.buffer-size":"10","write.omit-term-freq-and-position":"false","write.tmp.directory":")" + + tmp_dir->Str() + R"("})"); + } + + // create reader + ASSERT_OK_AND_ASSIGN(auto reader, + CreateGlobalIndexReader(test_root, data_type_, options, meta)); + auto lucene_reader = std::dynamic_pointer_cast(reader); + ASSERT_TRUE(lucene_reader); + + // test visit + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "document", FullTextSearch::SearchType::MATCH_ALL, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {2l, 1l, 0l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/1, "document", FullTextSearch::SearchType::MATCH_ANY, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {2l}); + } + { + ASSERT_OK_AND_ASSIGN( + auto result, lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "test document", FullTextSearch::SearchType::MATCH_ALL, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {2l, 0l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "test new", FullTextSearch::SearchType::MATCH_ANY, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {1l, 0l, 2l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "test document", FullTextSearch::SearchType::PHRASE, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {0l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "unordered", FullTextSearch::SearchType::MATCH_ALL, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {3l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "unorder", FullTextSearch::SearchType::PREFIX, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {3l}); + } + // test wildcard query + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "*order*", FullTextSearch::SearchType::WILDCARD, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {3l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "*or*er*", FullTextSearch::SearchType::WILDCARD, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {3l}); + } + // test filter + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "document", FullTextSearch::SearchType::MATCH_ALL, + /*pre_filter=*/RoaringBitmap64::From({0l, 1l})))); + CheckResult(result, {0l, 1l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "document", FullTextSearch::SearchType::MATCH_ALL, + /*pre_filter=*/RoaringBitmap64::From({2l, 100l})))); + CheckResult(result, {2l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "document", FullTextSearch::SearchType::MATCH_ALL, + /*pre_filter=*/RoaringBitmap64::From({20l, 100l})))); + CheckResult(result, {}); + } + // test no limit + { + ASSERT_OK_AND_ASSIGN( + auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/std::nullopt, "document", FullTextSearch::SearchType::MATCH_ALL, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {0l, 1l, 2l}); + } + { + ASSERT_OK_AND_ASSIGN( + auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/std::nullopt, "document", FullTextSearch::SearchType::MATCH_ALL, + /*pre_filter=*/RoaringBitmap64::From({2l})))); + CheckResult(result, {2l}); + } + { + ASSERT_OK_AND_ASSIGN( + auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/std::nullopt, "document test", FullTextSearch::SearchType::MATCH_ALL, + /*pre_filter=*/RoaringBitmap64::From({1l, 2l, 3l, 100l})))); + CheckResult(result, {2l}); + } +} + +TEST_P(LuceneGlobalIndexTest, TestSimpleChinese) { + int32_t read_buffer_size = GetParam(); + + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + auto tmp_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(tmp_dir); + std::string test_root = test_root_dir->Str(); + + std::map options = { + {"lucene-fts.write.omit-term-freq-and-position", "false"}, + {"lucene-fts.read.buffer-size", std::to_string(read_buffer_size)}, + {"lucene-fts.jieba.tokenize-mode", "query"}, + {"lucene-fts.write.tmp.directory", tmp_dir->Str()}}; + + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ +["QianWen 是一个基于 AI 的智能助手,类似于 Siri 和 Alexa。我们正在用 Python 开发 QianWen 的 Natural Language Understanding 模块,该模块支持多轮对话和意图识别功能,是新一代智能助手的核心技术之一。"], +["最近开源了一个新项目叫qianwen(全角字符),功能类似之前的 Qianwen,是一个面向 AI 应用的智能助手。它不仅支持 Machine Learning 和 NLP 技术,还提供了可扩展的开发框架,便于开发者构建自己的智能助手系统。"], +["我们在测试 qianwen-core v1.2 和 ai-engine-alpha 中的 bug,重点优化了 qianwen 的响应速度和稳定性。本次更新增强了核心模块的功能,提升了智能助手的开发效率,并修复了与 NLP 模块相关的多个问题。"], +["AI 助手开发中常用的技术包括 Speech Recognition、Natural Language Processing 和 Recommendation System。我们使用 TensorFlow 和 PyTorch 构建模型,开发了多个智能助手原型,支持语音交互和上下文理解功能,是当前热门的人工智能发展应用方向。"], +["新一代的 AI 助手代号为「千问」,内部命名为 QianwenX-2024,计划在 next quarter 发布。QianwenX 将集成更强的 multimodel 能力,支持图像和文本联合处理,进一步提升智能助手的理解能力和交互体验,是未来智能助手的重要发展方向。"] + ])") + .ValueOrDie(); + + // write index + ASSERT_OK_AND_ASSIGN(auto meta, WriteGlobalIndex(test_root, data_type_, options, array, + Range(0, 4), tmp_dir->Str())); + if (read_buffer_size == 10) { + ASSERT_EQ( + std::string(meta.metadata->data(), meta.metadata->size()), + R"({"jieba.tokenize-mode":"query","read.buffer-size":"10","write.omit-term-freq-and-position":"false","write.tmp.directory":")" + + tmp_dir->Str() + R"("})"); + } + + // create reader + ASSERT_OK_AND_ASSIGN(auto reader, + CreateGlobalIndexReader(test_root, data_type_, options, meta)); + auto lucene_reader = std::dynamic_pointer_cast(reader); + ASSERT_TRUE(lucene_reader); + + // test visit + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "模块", FullTextSearch::SearchType::MATCH_ALL, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {0l, 2l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/1, "模块", FullTextSearch::SearchType::MATCH_ANY, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {0l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "模块技术", FullTextSearch::SearchType::MATCH_ALL, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {0l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "模块技术", FullTextSearch::SearchType::MATCH_ANY, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {0l, 1l, 2l, 3l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "发展方向", FullTextSearch::SearchType::PHRASE, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {4l}); + } + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "发", FullTextSearch::SearchType::PREFIX, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {3l, 4l}); + } + // test wildcard query + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "*发*", FullTextSearch::SearchType::WILDCARD, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {0l, 1l, 2l, 3l, 4l}); + } + // test filter + { + ASSERT_OK_AND_ASSIGN(auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/10, "模块技术", FullTextSearch::SearchType::MATCH_ANY, + /*pre_filter=*/RoaringBitmap64::From({1l, 3l, 4l})))); + + CheckResult(result, {1l, 3l}); + } + // test no limit + { + ASSERT_OK_AND_ASSIGN( + auto result, + lucene_reader->VisitFullTextSearch(std::make_shared( + "f0", + /*limit=*/std::nullopt, "模块技术", FullTextSearch::SearchType::MATCH_ANY, + /*pre_filter=*/std::nullopt))); + CheckResult(result, {0l, 1l, 2l, 3l}); + } +} + +TEST_F(LuceneGlobalIndexTest, TestInvalidWithoutTmpDir) { + auto test_root_dir = paimon::test::UniqueTestDirectory::Create(); + ASSERT_TRUE(test_root_dir); + std::string test_root = test_root_dir->Str(); + + std::map options = { + {"lucene-fts.write.omit-term-freq-and-position", "false"}}; + std::shared_ptr array = arrow::ipc::internal::json::ArrayFromJSON(data_type_, + R"([ + ["This is an test document."] + ])") + .ValueOrDie(); + + // write index + ASSERT_NOK_WITH_MSG(WriteGlobalIndex(test_root, data_type_, options, array, Range(0, 0), ""), + "key write.tmp.directory does not exist in map"); +} +INSTANTIATE_TEST_SUITE_P(ReadBufferSize, LuceneGlobalIndexTest, + ::testing::ValuesIn(std::vector({10, 100, 1024}))); + +} // namespace paimon::lucene::test diff --git a/src/paimon/global_index/lucene/lucene_global_index_writer.cpp b/src/paimon/global_index/lucene/lucene_global_index_writer.cpp new file mode 100644 index 0000000..2cd5291 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index_writer.cpp @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "paimon/global_index/lucene/lucene_global_index_writer.h" + +#include + +#include "arrow/c/bridge.h" +#include "arrow/c/helpers.h" +#include "lucene++/FileUtils.h" +#include "lucene++/NoLockFactory.h" +#include "paimon/common/global_index/global_index_utils.h" +#include "paimon/common/io/data_output_stream.h" +#include "paimon/common/utils/options_utils.h" +#include "paimon/common/utils/path_util.h" +#include "paimon/common/utils/rapidjson_util.h" +#include "paimon/common/utils/uuid.h" +#include "paimon/global_index/lucene/jieba_analyzer.h" +#include "paimon/global_index/lucene/lucene_defs.h" +#include "paimon/global_index/lucene/lucene_utils.h" +namespace paimon::lucene { +#define CHECK_NOT_NULL(pointer, error_msg) \ + do { \ + if (!(pointer)) { \ + return Status::Invalid(error_msg); \ + } \ + } while (0) + +LuceneGlobalIndexWriter::LuceneWriteContext::LuceneWriteContext( + const std::string& _tmp_index_path, const Lucene::FSDirectoryPtr& _lucene_dir, + const Lucene::IndexWriterPtr& _index_writer, const Lucene::DocumentPtr& _doc, + const Lucene::FieldPtr& _field) + : tmp_index_path(_tmp_index_path), + lucene_dir(_lucene_dir), + index_writer(_index_writer), + doc(_doc), + field(_field) {} + +Result> LuceneGlobalIndexWriter::Create( + const std::string& field_name, const std::shared_ptr& arrow_type, + const std::shared_ptr& file_writer, + const std::map& options, const std::shared_ptr& pool) { + try { + std::string uuid; + if (!UUID::Generate(&uuid)) { + return Status::Invalid("generate uuid for lucene tmp path failed."); + } + // get local tmp path + PAIMON_ASSIGN_OR_RAISE(std::string tmp_dir, OptionsUtils::GetValueFromMap( + options, std::string(kLuceneWriteTmpDir))); + std::string tmp_path = PathUtil::JoinPath(tmp_dir, "paimon-lucene-" + uuid); + + auto lucene_dir = Lucene::FSDirectory::open(LuceneUtils::StringToWstring(tmp_path), + Lucene::NoLockFactory::getNoLockFactory()); + // TODO(xinyu.lxy): support other tokenizer + // open lucene index writer + PAIMON_ASSIGN_OR_RAISE( + bool omit_term_freq_and_positions, + OptionsUtils::GetValueFromMap(options, kLuceneWriteOmitTermFreqAndPositions, false)); + PAIMON_ASSIGN_OR_RAISE( + std::string tokenize_mode, + OptionsUtils::GetValueFromMap(options, kJiebaTokenizeMode, + std::string(kDefaultJiebaTokenizeMode))); + PAIMON_ASSIGN_OR_RAISE(std::string dictionary_dir, LuceneUtils::GetJiebaDictionaryDir()); + auto jieba = std::make_shared( + dictionary_dir + "/jieba.dict.utf8", dictionary_dir + "/hmm_model.utf8", + dictionary_dir + "/user.dict.utf8", dictionary_dir + "/idf.utf8", + dictionary_dir + "/stop_words.utf8"); + JiebaTokenizerContext jieba_context(tokenize_mode, + /*with_position=*/!omit_term_freq_and_positions, jieba, + pool); + auto analyzer = Lucene::newLucene(jieba_context); + Lucene::IndexWriterPtr writer = Lucene::newLucene( + lucene_dir, analyzer, + /*create=*/true, Lucene::IndexWriter::MaxFieldLengthLIMITED); + + // prepare field and document + Lucene::DocumentPtr doc = Lucene::newLucene(); + auto field = Lucene::newLucene(LuceneUtils::StringToWstring(field_name), + kEmptyWstring, Lucene::Field::STORE_NO, + Lucene::Field::INDEX_ANALYZED_NO_NORMS); + field->setOmitTermFreqAndPositions(omit_term_freq_and_positions); + doc->add(field); + return std::shared_ptr(new LuceneGlobalIndexWriter( + field_name, arrow_type, LuceneWriteContext(tmp_path, lucene_dir, writer, doc, field), + file_writer, options, pool)); + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("create lucene global index writer failed, with {} error.", e.what())); + } catch (...) { + return Status::UnknownError( + "create lucene global index writer failed, with unknown error."); + } +} + +LuceneGlobalIndexWriter::LuceneGlobalIndexWriter( + const std::string& field_name, const std::shared_ptr& arrow_type, + LuceneWriteContext&& write_context, const std::shared_ptr& file_writer, + const std::map& options, const std::shared_ptr& pool) + : pool_(pool), + field_name_(field_name), + arrow_type_(arrow_type), + write_context_(std::move(write_context)), + file_writer_(file_writer), + options_(options) {} + +LuceneGlobalIndexWriter::~LuceneGlobalIndexWriter() { + try { + [[maybe_unused]] bool ec = Lucene::FileUtils::removeDirectory( + LuceneUtils::StringToWstring(write_context_.tmp_index_path)); + } catch (...) { + // do nothing + } +} + +Status LuceneGlobalIndexWriter::AddBatch(::ArrowArray* arrow_array, + std::vector&& relative_row_ids) { + PAIMON_RETURN_NOT_OK( + GlobalIndexUtils::CheckRelativeRowIds(arrow_array, relative_row_ids, row_id_)); + PAIMON_ASSIGN_OR_RAISE_FROM_ARROW(std::shared_ptr array, + arrow::ImportArray(arrow_array, arrow_type_)); + auto struct_array = std::dynamic_pointer_cast(array); + CHECK_NOT_NULL(struct_array, "invalid input array in LuceneIndexWriter, must be struct array"); + auto field_array = struct_array->GetFieldByName(field_name_); + CHECK_NOT_NULL( + field_array, + fmt::format("invalid input array in LuceneIndexWriter, field {} not in input array", + field_name_)); + auto string_array = std::dynamic_pointer_cast(field_array); + CHECK_NOT_NULL( + string_array, + fmt::format( + "invalid input array in LuceneIndexWriter, field array {} is not a string array", + field_name_)); + try { + for (int64_t i = 0; i < string_array->length(); i++) { + if (string_array->IsNull(i)) { + write_context_.field->setValue(kEmptyWstring); + } else { + auto view = string_array->Value(i); + write_context_.field->setValue(LuceneUtils::StringToWstring(view)); + } + row_id_++; + write_context_.index_writer->addDocument(write_context_.doc); + } + } catch (const std::exception& e) { + return Status::Invalid(fmt::format( + "add batch for lucene global index writer failed, with {} error.", e.what())); + } catch (...) { + return Status::UnknownError( + "add batch for lucene global index writer failed, with unknown error."); + } + return Status::OK(); +} + +Result LuceneGlobalIndexWriter::FlushIndexToFinal() { + try { + // flush index to tmp dir + if (write_context_.index_writer->numDocs() != row_id_) { + return Status::Invalid( + fmt::format("lucene writer row count {} mismatch paimon inner row count {}", + write_context_.index_writer->numDocs(), row_id_)); + } + write_context_.index_writer->optimize(); + write_context_.index_writer->close(); + + // list tmp dir + auto tmp_file_names = write_context_.lucene_dir->listAll(); + PAIMON_ASSIGN_OR_RAISE(std::string index_file_name, file_writer_->NewFileName(kIdentifier)); + // prepare output from file_writer + PAIMON_ASSIGN_OR_RAISE(std::shared_ptr out, + file_writer_->NewOutputStream(index_file_name)); + DataOutputStream data_output_stream(out); + PAIMON_RETURN_NOT_OK(data_output_stream.WriteValue(kVersion)); + PAIMON_RETURN_NOT_OK( + data_output_stream.WriteValue(static_cast(tmp_file_names.size()))); + // read all data from index files and write to target output + auto buffer = std::make_shared(kDefaultReadBufferSize, pool_.get()); + for (const auto& wfile_name : tmp_file_names) { + auto file_name = LuceneUtils::WstringToString(wfile_name); + PAIMON_RETURN_NOT_OK( + data_output_stream.WriteValue(static_cast(file_name.size()))); + PAIMON_RETURN_NOT_OK( + data_output_stream.WriteBytes(std::make_shared(file_name, pool_.get()))); + int64_t file_length = write_context_.lucene_dir->fileLength(wfile_name); + PAIMON_RETURN_NOT_OK(data_output_stream.WriteValue(file_length)); + + Lucene::IndexInputPtr input = write_context_.lucene_dir->openInput(wfile_name); + int64_t total_write_size = 0; + while (total_write_size < file_length) { + int64_t current_write_size = std::min(file_length - total_write_size, + static_cast(kDefaultReadBufferSize)); + input->readBytes(reinterpret_cast(buffer->data()), /*offset=*/0, + static_cast(current_write_size)); + PAIMON_ASSIGN_OR_RAISE( + int32_t actual_write_size, + out->Write(buffer->data(), static_cast(current_write_size))); + if (static_cast(actual_write_size) != current_write_size) { + return Status::Invalid( + fmt::format("invalid write, try to write {} while actual write {}", + current_write_size, actual_write_size)); + } + total_write_size += current_write_size; + } + input->close(); + } + PAIMON_RETURN_NOT_OK(out->Flush()); + PAIMON_RETURN_NOT_OK(out->Close()); + write_context_.lucene_dir->close(); + return index_file_name; + } catch (const std::exception& e) { + return Status::Invalid( + fmt::format("finish for lucene global index writer failed, with {} error.", e.what())); + } catch (...) { + return Status::UnknownError( + "finish for lucene global index writer failed, with unknown error."); + } +} + +Result> LuceneGlobalIndexWriter::Finish() { + PAIMON_ASSIGN_OR_RAISE(std::string index_file_name, FlushIndexToFinal()); + // prepare global index meta + PAIMON_ASSIGN_OR_RAISE(int64_t file_size, file_writer_->GetFileSize(index_file_name)); + std::string options_json; + PAIMON_RETURN_NOT_OK(RapidJsonUtil::ToJsonString(options_, &options_json)); + auto meta_bytes = std::make_shared(options_json, pool_.get()); + GlobalIndexIOMeta meta(file_writer_->ToPath(index_file_name), file_size, + /*metadata=*/meta_bytes); + return std::vector({meta}); +} + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_global_index_writer.h b/src/paimon/global_index/lucene/lucene_global_index_writer.h new file mode 100644 index 0000000..3806185 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_global_index_writer.h @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "arrow/type.h" +#include "cppjieba/Jieba.hpp" +#include "lucene++/LuceneHeaders.h" +#include "paimon/global_index/global_index_writer.h" +#include "paimon/global_index/io/global_index_file_writer.h" +#include "paimon/global_index/lucene/lucene_defs.h" + +namespace paimon::lucene { +class LuceneGlobalIndexWriter : public GlobalIndexWriter { + public: + struct LuceneWriteContext { + LuceneWriteContext(const std::string& _tmp_index_path, + const Lucene::FSDirectoryPtr& _lucene_dir, + const Lucene::IndexWriterPtr& _index_writer, + const Lucene::DocumentPtr& _doc, const Lucene::FieldPtr& _field); + + LuceneWriteContext(LuceneWriteContext&&) = default; + LuceneWriteContext& operator=(LuceneWriteContext&&) = default; + + std::string tmp_index_path; + Lucene::FSDirectoryPtr lucene_dir; + Lucene::IndexWriterPtr index_writer; + Lucene::DocumentPtr doc; + Lucene::FieldPtr field; + }; + + static Result> Create( + const std::string& field_name, const std::shared_ptr& arrow_type, + const std::shared_ptr& file_writer, + const std::map& options, const std::shared_ptr& pool); + + ~LuceneGlobalIndexWriter() override; + + Status AddBatch(::ArrowArray* c_arrow_array, std::vector&& relative_row_ids) override; + + Result> Finish() override; + + private: + LuceneGlobalIndexWriter(const std::string& field_name, + const std::shared_ptr& arrow_type, + LuceneWriteContext&& write_context, + const std::shared_ptr& file_writer, + const std::map& options, + const std::shared_ptr& pool); + + Result FlushIndexToFinal(); + + private: + std::shared_ptr pool_; + int32_t row_id_ = 0; + std::string field_name_; + std::shared_ptr arrow_type_; + LuceneWriteContext write_context_; + std::shared_ptr file_writer_; + std::map options_; +}; + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_input.h b/src/paimon/global_index/lucene/lucene_input.h new file mode 100644 index 0000000..22f60df --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_input.h @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "lucene++/BufferedIndexInput.h" +#include "paimon/fs/file_system.h" +#include "paimon/global_index/lucene/lucene_utils.h" + +namespace paimon::lucene { +class LuceneSyncInput : public Lucene::LuceneObject { + public: + explicit LuceneSyncInput(const std::shared_ptr& in_stream) : in_(in_stream) {} + const std::shared_ptr& GetInput() const { + return in_; + } + + private: + std::shared_ptr in_; +}; + +class LuceneIndexInput : public Lucene::BufferedIndexInput { + public: + LuceneIndexInput(const boost::shared_ptr& in_stream, int32_t buffer_size) + : Lucene::BufferedIndexInput(buffer_size), + input_buffer_size_(buffer_size), + in_stream_(in_stream) {} + + public: + int64_t length() override { + auto result = in_stream_->GetInput()->Length(); + if (!result.ok()) { + throw Lucene::IOException(LuceneUtils::StringToWstring(result.status().ToString())); + } + return static_cast(result.value()); + } + void close() override { + if (is_clone_) { + return; + } + if (in_stream_) { + in_stream_.reset(); + } + } + + private: + void readInternal(uint8_t* b, int32_t offset, int32_t length) override { + Lucene::SyncLock lock(in_stream_); + int64_t position = getFilePointer(); + auto read_result = + in_stream_->GetInput()->Read(reinterpret_cast(b + offset), length, position); + if (!read_result.ok()) { + throw Lucene::IOException( + LuceneUtils::StringToWstring(read_result.status().ToString())); + } + if (read_result.value() != length) { + throw Lucene::IOException(L"actual read len and expect read len mismatch"); + } + } + void seekInternal(int64_t pos) override {} + + Lucene::LuceneObjectPtr clone(const Lucene::LuceneObjectPtr& other) override { + Lucene::LuceneObjectPtr clone = Lucene::BufferedIndexInput::clone( + other ? other : Lucene::newLucene(in_stream_, input_buffer_size_)); + boost::shared_ptr clone_index_input( + boost::dynamic_pointer_cast(clone)); + clone_index_input->in_stream_ = in_stream_; + clone_index_input->is_clone_ = true; + return clone_index_input; + } + + private: + bool is_clone_ = false; + int32_t input_buffer_size_; + boost::shared_ptr in_stream_; +}; +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_utils.cpp b/src/paimon/global_index/lucene/lucene_utils.cpp new file mode 100644 index 0000000..960a397 --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_utils.cpp @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "paimon/global_index/lucene/lucene_utils.h" +namespace paimon::lucene { + +Result LuceneUtils::GetJiebaDictionaryDir() { + const char* env_dir = std::getenv(kJiebaDictDirEnv); + if (env_dir && *env_dir != '\0') { + return std::string(env_dir); + } +#ifdef JIEBA_TEST_DICT_DIR + return std::string(JIEBA_TEST_DICT_DIR); +#endif + return Status::Invalid( + fmt::format("cannot get dictionary dir for jieba, must set env {}", kJiebaDictDirEnv)); +} + +} // namespace paimon::lucene diff --git a/src/paimon/global_index/lucene/lucene_utils.h b/src/paimon/global_index/lucene/lucene_utils.h new file mode 100644 index 0000000..c3dea2d --- /dev/null +++ b/src/paimon/global_index/lucene/lucene_utils.h @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once +#include "fmt/format.h" +#include "lucene++/StringUtils.h" +#include "paimon/global_index/lucene/lucene_defs.h" +#include "paimon/result.h" +namespace paimon::lucene { +class LuceneUtils { + public: + LuceneUtils() = delete; + ~LuceneUtils() = delete; + + template + static Lucene::String StringToWstring(const StrType& str) { + return Lucene::StringUtils::toUnicode(reinterpret_cast(str.data()), + str.length()); + } + + static std::string WstringToString(const Lucene::String& wstr) { + return Lucene::StringUtils::toUTF8(wstr); + } + + static Result GetJiebaDictionaryDir(); +}; +} // namespace paimon::lucene