diff --git a/apps/labrinth/.env.docker-compose b/apps/labrinth/.env.docker-compose index 097a47c640..b900b2ec10 100644 --- a/apps/labrinth/.env.docker-compose +++ b/apps/labrinth/.env.docker-compose @@ -25,6 +25,8 @@ ELASTICSEARCH_INDEX_PREFIX=labrinth ELASTICSEARCH_USERNAME=elastic ELASTICSEARCH_PASSWORD=elastic SEARCH_INDEX_CHUNK_SIZE=5000 +SEARCH_INCREMENTAL_INDEX_BATCH_DELAY_SECONDS=5 +SEARCH_INCREMENTAL_INDEX_BATCH_MAX_SIZE=1000 TYPESENSE_URL=http://localhost:8108 TYPESENSE_API_KEY=modrinth TYPESENSE_INDEX_PREFIX=labrinth diff --git a/apps/labrinth/.env.local b/apps/labrinth/.env.local index 7925e52a1e..df34775b5e 100644 --- a/apps/labrinth/.env.local +++ b/apps/labrinth/.env.local @@ -43,6 +43,8 @@ ELASTICSEARCH_USERNAME= ELASTICSEARCH_PASSWORD= SEARCH_INDEX_CHUNK_SIZE=5000 +SEARCH_INCREMENTAL_INDEX_BATCH_DELAY_SECONDS=5 +SEARCH_INCREMENTAL_INDEX_BATCH_MAX_SIZE=1000 TYPESENSE_URL=http://localhost:8108 TYPESENSE_API_KEY=modrinth TYPESENSE_INDEX_PREFIX=labrinth diff --git a/apps/labrinth/.sqlx/query-8145d09f7c6c5d2e18a10ef814551f87424ee39aac49cae5575c0d3313fb7f24.json b/apps/labrinth/.sqlx/query-8145d09f7c6c5d2e18a10ef814551f87424ee39aac49cae5575c0d3313fb7f24.json new file mode 100644 index 0000000000..59ab74bde2 --- /dev/null +++ b/apps/labrinth/.sqlx/query-8145d09f7c6c5d2e18a10ef814551f87424ee39aac49cae5575c0d3313fb7f24.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT id FROM versions WHERE mod_id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "id", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "8145d09f7c6c5d2e18a10ef814551f87424ee39aac49cae5575c0d3313fb7f24" +} diff --git a/apps/labrinth/src/background_task.rs b/apps/labrinth/src/background_task.rs index 31c51e2367..0c3fb13ef0 100644 --- a/apps/labrinth/src/background_task.rs +++ b/apps/labrinth/src/background_task.rs @@ -19,7 +19,7 @@ use crate::util::anrok; use actix_web::web; use clap::ValueEnum; use eyre::WrapErr; -use tracing::info; +use tracing::{info, instrument}; #[derive(ValueEnum, Debug, Copy, Clone, PartialEq, Eq)] #[clap(rename_all = "kebab_case")] @@ -50,6 +50,7 @@ pub enum BackgroundTask { impl BackgroundTask { #[allow(clippy::too_many_arguments)] + #[instrument(skip_all, fields(background_task = ?self))] pub async fn run( self, pool: PgPool, diff --git a/apps/labrinth/src/env.rs b/apps/labrinth/src/env.rs index bf99822156..20c6472316 100644 --- a/apps/labrinth/src/env.rs +++ b/apps/labrinth/src/env.rs @@ -38,7 +38,7 @@ macro_rules! vars { )] let $field: Option<$ty> = { let mut default = None::<$ty>; - $( default = Some({ $default }.into()); )? + $( default = Some(<$ty>::from({ $default })); )? match parse_value::<$ty>(stringify!($field), default) { Ok(value) => Some(value), @@ -160,9 +160,12 @@ vars! { // search SEARCH_BACKEND: crate::search::SearchBackendKind = crate::search::SearchBackendKind::Typesense; SEARCH_INDEX_CHUNK_SIZE: i64 = 5000i64; + SEARCH_INCREMENTAL_INDEX_BATCH_DELAY_SECONDS: u64 = 5u64; + SEARCH_INCREMENTAL_INDEX_BATCH_MAX_SIZE: usize = 1000usize; TYPESENSE_URL: String = "http://localhost:8108"; TYPESENSE_API_KEY: String = "modrinth"; TYPESENSE_INDEX_PREFIX: String = "labrinth"; + TYPESENSE_DELETE_BATCH_SIZE: usize = 10_000usize; // storage STORAGE_BACKEND: crate::file_hosting::FileHostKind = crate::file_hosting::FileHostKind::Local; diff --git a/apps/labrinth/src/queue/server_ping.rs b/apps/labrinth/src/queue/server_ping.rs index 5b6305961b..95359701d2 100644 --- a/apps/labrinth/src/queue/server_ping.rs +++ b/apps/labrinth/src/queue/server_ping.rs @@ -1,9 +1,9 @@ use crate::database::DBProject; -use crate::database::models::DBProjectId; +use crate::database::models::{DBProjectId, DBVersionId}; use crate::database::redis::RedisPool; use crate::env::ENV; use crate::models::exp; -use crate::models::ids::ProjectId; +use crate::models::ids::{ProjectId, VersionId}; use crate::models::projects::ProjectStatus; use crate::search::incremental::IncrementalSearchQueue; use crate::{database::PgPool, util::error::Context}; @@ -175,14 +175,26 @@ impl ServerPingQueue { } if updated_project { + let version_ids = sqlx::query_scalar!( + "SELECT id FROM versions WHERE mod_id = $1", + DBProjectId::from(*project_id) as DBProjectId, + ) + .fetch_all(&self.db) + .await + .wrap_err("failed to fetch project version IDs")? + .into_iter() + .map(|version_id| VersionId::from(DBVersionId(version_id))) + .collect::>(); + let clear_cache = DBProject::clear_cache( (*project_id).into(), None, None, &self.redis, ); - let queue_search = - self.incremental_search_queue.push(*project_id); + let queue_search = self + .incremental_search_queue + .push(*project_id, version_ids); let (clear_cache_result, _) = join(clear_cache, queue_search).await; diff --git a/apps/labrinth/src/routes/internal/moderation/tech_review.rs b/apps/labrinth/src/routes/internal/moderation/tech_review.rs index cfa609a98e..1ccdbafb49 100644 --- a/apps/labrinth/src/routes/internal/moderation/tech_review.rs +++ b/apps/labrinth/src/routes/internal/moderation/tech_review.rs @@ -1142,6 +1142,7 @@ async fn submit_report( if verdict == DelphiVerdict::Unsafe { crate::routes::v3::projects::clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_id, diff --git a/apps/labrinth/src/routes/v3/organizations.rs b/apps/labrinth/src/routes/v3/organizations.rs index b59e74cefd..b27b3ce4dc 100644 --- a/apps/labrinth/src/routes/v3/organizations.rs +++ b/apps/labrinth/src/routes/v3/organizations.rs @@ -802,6 +802,7 @@ pub async fn organization_delete( for project_id in organization_project_ids { super::projects::clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_id, @@ -969,6 +970,7 @@ pub async fn organization_projects_add( ) .await?; super::projects::clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -1159,6 +1161,7 @@ pub async fn organization_projects_remove( ) .await?; super::projects::clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, diff --git a/apps/labrinth/src/routes/v3/project_creation.rs b/apps/labrinth/src/routes/v3/project_creation.rs index 1088302d2f..5e239543e7 100644 --- a/apps/labrinth/src/routes/v3/project_creation.rs +++ b/apps/labrinth/src/routes/v3/project_creation.rs @@ -349,6 +349,7 @@ pub async fn project_create_internal( } else { transaction.commit().await?; super::projects::clear_project_cache_and_queue_search( + &client, &redis, &search_state, project_id.into(), @@ -407,6 +408,7 @@ pub async fn project_create_with_id( } else { transaction.commit().await?; super::projects::clear_project_cache_and_queue_search( + &client, &redis, &search_state, project_id.into(), diff --git a/apps/labrinth/src/routes/v3/project_creation/new.rs b/apps/labrinth/src/routes/v3/project_creation/new.rs index 877ad9a70d..d3d025a1ee 100644 --- a/apps/labrinth/src/routes/v3/project_creation/new.rs +++ b/apps/labrinth/src/routes/v3/project_creation/new.rs @@ -342,6 +342,7 @@ pub async fn create( .wrap_internal_err("failed to commit transaction")?; super::super::projects::clear_project_cache_and_queue_search( + &db, &redis, &search_state, project_id.into(), diff --git a/apps/labrinth/src/routes/v3/projects.rs b/apps/labrinth/src/routes/v3/projects.rs index 902f62122f..ea3d9cda3b 100644 --- a/apps/labrinth/src/routes/v3/projects.rs +++ b/apps/labrinth/src/routes/v3/projects.rs @@ -75,11 +75,42 @@ pub fn utoipa_config( } pub async fn clear_project_cache_and_queue_search( + pool: &PgPool, redis: &RedisPool, search_state: &SearchState, project_id: db_ids::DBProjectId, slug: Option, clear_dependencies: Option, +) -> Result<(), ApiError> { + let version_ids = sqlx::query_scalar!( + "SELECT id FROM versions WHERE mod_id = $1", + project_id as db_ids::DBProjectId, + ) + .fetch_all(pool) + .await + .wrap_internal_err("failed to fetch project version IDs")? + .into_iter() + .map(|version_id| VersionId::from(db_ids::DBVersionId(version_id))) + .collect::>(); + + clear_project_cache_and_queue_search_versions( + redis, + search_state, + project_id, + slug, + clear_dependencies, + version_ids, + ) + .await +} + +pub async fn clear_project_cache_and_queue_search_versions( + redis: &RedisPool, + search_state: &SearchState, + project_id: db_ids::DBProjectId, + slug: Option, + clear_dependencies: Option, + version_ids: impl IntoIterator, ) -> Result<(), ApiError> { db_models::DBProject::clear_cache( project_id, @@ -88,7 +119,11 @@ pub async fn clear_project_cache_and_queue_search( redis, ) .await?; - search_state.queue.push(project_id.into()).await; + + search_state + .queue + .push(project_id.into(), version_ids) + .await; Ok(()) } @@ -1135,6 +1170,7 @@ pub async fn project_edit_internal( transaction.commit().await?; clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -1149,16 +1185,9 @@ pub async fn project_edit_internal( new_project.status.map(|status| status.is_searchable()), ) { search_state - .backend - .remove_documents( - &project_item - .versions - .into_iter() - .map(|x| x.into()) - .collect::>(), - ) - .await - .wrap_internal_err("failed to remove documents")?; + .queue + .push_project_removal(project_item.inner.id.into()) + .await; } Ok(HttpResponse::NoContent().body("")) @@ -1640,6 +1669,7 @@ pub async fn projects_edit( for (project_id, slug) in changed_projects { clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_id, @@ -1862,6 +1892,7 @@ pub async fn project_icon_edit_internal( transaction.commit().await?; clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -1977,6 +2008,7 @@ pub async fn delete_project_icon_internal( transaction.commit().await?; clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -2164,6 +2196,7 @@ pub async fn add_gallery_item_internal( transaction.commit().await?; clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -2370,6 +2403,7 @@ pub async fn edit_gallery_item_internal( transaction.commit().await?; clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -2510,6 +2544,7 @@ pub async fn delete_gallery_item_internal( transaction.commit().await?; clear_project_cache_and_queue_search( + &pool, &redis, &search_state, project_item.inner.id, @@ -2652,27 +2687,18 @@ pub async fn project_delete_internal( .await .wrap_internal_err("failed to commit transaction")?; - search_state - .backend - .remove_documents( - &project - .versions - .into_iter() - .map(|x| x.into()) - .collect::>(), - ) - .await - .wrap_internal_err("failed to remove project version documents")?; - if result.is_some() { - clear_project_cache_and_queue_search( - &redis, - &search_state, + db_models::DBProject::clear_cache( project.inner.id, project.inner.slug, None, + &redis, ) .await?; + search_state + .queue + .push_project_removal(project.inner.id.into()) + .await; Ok(()) } else { Err(ApiError::NotFound) diff --git a/apps/labrinth/src/routes/v3/version_creation.rs b/apps/labrinth/src/routes/v3/version_creation.rs index 9749efee3c..a1a3424418 100644 --- a/apps/labrinth/src/routes/v3/version_creation.rs +++ b/apps/labrinth/src/routes/v3/version_creation.rs @@ -145,19 +145,20 @@ pub async fn version_create( if let Err(e) = rollback_result { return Err(e.into()); } - } else if let Ok((_, project_id)) = &result { + } else if let Ok((_, project_id, version_id)) = &result { transaction.commit().await?; - super::projects::clear_project_cache_and_queue_search( + super::projects::clear_project_cache_and_queue_search_versions( &redis, &search_state, *project_id, None, Some(true), + [VersionId::from(*version_id)], ) .await?; } - result.map(|(response, _)| response) + result.map(|(response, _, _)| response) } #[allow(clippy::too_many_arguments)] @@ -172,7 +173,8 @@ async fn version_create_inner( session_queue: &AuthQueue, moderation_queue: &AutomatedModerationQueue, http: &reqwest::Client, -) -> Result<(HttpResponse, models::DBProjectId), CreateError> { +) -> Result<(HttpResponse, models::DBProjectId, models::DBVersionId), CreateError> +{ let mut initial_version_data = None; let mut version_builder = None; let mut selected_loaders = None; @@ -543,7 +545,11 @@ async fn version_create_inner( moderation_queue.projects.insert(project_id.into()); } - Ok((HttpResponse::Ok().json(response), project_id)) + Ok(( + HttpResponse::Ok().json(response), + project_id, + models::DBVersionId::from(version_id), + )) } pub async fn upload_file_to_version( @@ -560,17 +566,18 @@ pub async fn upload_file_to_version( let mut transaction = client.begin().await?; let mut uploaded_files = Vec::new(); - let version_id = models::DBVersionId::from(url_data.into_inner().0); + let version_id = url_data.into_inner().0; + let db_version_id = models::DBVersionId::from(version_id); let result = upload_file_to_version_inner( req, &mut payload, - client, + client.clone(), &mut transaction, redis.clone(), &**file_host, &mut uploaded_files, - version_id, + db_version_id, &session_queue, &http, ) @@ -590,12 +597,13 @@ pub async fn upload_file_to_version( } } else if let Ok((_, project_id)) = &result { transaction.commit().await?; - super::projects::clear_project_cache_and_queue_search( + super::projects::clear_project_cache_and_queue_search_versions( &redis, &search_state, *project_id, None, Some(true), + [version_id], ) .await?; } diff --git a/apps/labrinth/src/routes/v3/versions.rs b/apps/labrinth/src/routes/v3/versions.rs index 0e322cde51..c9b8cda4b3 100644 --- a/apps/labrinth/src/routes/v3/versions.rs +++ b/apps/labrinth/src/routes/v3/versions.rs @@ -761,12 +761,13 @@ pub async fn version_edit_helper( transaction.commit().await?; database::models::DBVersion::clear_cache(&version_item, &redis) .await?; - super::projects::clear_project_cache_and_queue_search( + super::projects::clear_project_cache_and_queue_search_versions( &redis, &search_state, version_item.inner.project_id, None, Some(true), + [VersionId::from(version_item.inner.id)], ) .await?; Ok(HttpResponse::NoContent().body("")) @@ -1097,12 +1098,13 @@ pub async fn version_delete( transaction.commit().await?; - super::projects::clear_project_cache_and_queue_search( + super::projects::clear_project_cache_and_queue_search_versions( &redis, &search_state, version.inner.project_id, None, Some(true), + [VersionId::from(version.inner.id)], ) .await?; search_backend diff --git a/apps/labrinth/src/search/backend/typesense/mod.rs b/apps/labrinth/src/search/backend/typesense/mod.rs index 6bbb141bf7..a6b2d69f6c 100644 --- a/apps/labrinth/src/search/backend/typesense/mod.rs +++ b/apps/labrinth/src/search/backend/typesense/mod.rs @@ -7,7 +7,7 @@ use regex::Regex; use reqwest::Method; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use crate::database::PgPool; use crate::database::redis::RedisPool; @@ -32,6 +32,7 @@ pub struct TypesenseConfig { pub index_prefix: String, pub meta_namespace: String, pub index_chunk_size: i64, + pub delete_batch_size: usize, } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -160,6 +161,7 @@ impl TypesenseConfig { index_prefix: ENV.TYPESENSE_INDEX_PREFIX.clone(), meta_namespace: meta_namespace.unwrap_or_default(), index_chunk_size: ENV.SEARCH_INDEX_CHUNK_SIZE, + delete_batch_size: ENV.TYPESENSE_DELETE_BATCH_SIZE, } } @@ -323,12 +325,13 @@ impl TypesenseClient { &self, collection: &str, filter_by: &str, + batch_size: usize, ) -> Result<()> { let resp = self .request( Method::DELETE, &format!( - "/collections/{collection}/documents?filter_by={}&batch_size=1000", + "/collections/{collection}/documents?filter_by={}&batch_size={batch_size}", urlencoding::encode(filter_by) ), ) @@ -721,6 +724,24 @@ impl Typesense { self.client.upsert_alias(alias, &name).await?; Ok(()) } + + async fn delete_documents_by_filter_if_exists( + &self, + collection: &str, + filter: &str, + ) -> Result<()> { + if self.client.collection_exists(collection).await? { + self.client + .delete_documents_by_filter( + collection, + filter, + self.config.delete_batch_size, + ) + .await?; + } + + Ok(()) + } } #[async_trait] @@ -995,6 +1016,7 @@ impl SearchBackend for Typesense { return Ok(()); } + let num_documents = documents.len(); let jsonl = documents_to_jsonl(documents)?; for alias in [ self.config.get_alias_name("projects"), @@ -1005,10 +1027,20 @@ impl SearchBackend for Typesense { let shadow_current = self.config.get_next_collection_name(&alias, false); + debug!( + ?alias, + ?live, + ?shadow_alt, + ?shadow_current, + num_documents, + "Inserting into alias", + ); + for collection in live.into_iter().chain([shadow_alt, shadow_current]) { if self.client.collection_exists(&collection).await? { + debug!("Inserting into existing collection {collection:?}"); self.client .import_documents(&collection, jsonl.clone()) .await?; @@ -1016,6 +1048,7 @@ impl SearchBackend for Typesense { } } + debug!("Done importing"); Ok(()) } @@ -1038,21 +1071,68 @@ impl SearchBackend for Typesense { self.config.get_alias_name("projects"), self.config.get_alias_name("projects_filtered"), ] { + debug!("Performing removal on alias {alias:?}"); + let live = self.client.get_alias(&alias).await?; + debug!("Got live alias {live:?}"); + let shadow_alt = self.config.get_next_collection_name(&alias, true); + debug!("Got shadow alt {shadow_alt:?}"); + let shadow_current = self.config.get_next_collection_name(&alias, false); + debug!("Got shadow current {shadow_current:?}"); + + let delete_live = async { + if let Some(collection) = live.as_deref() { + debug!("Working on collection {collection:?}"); + debug!( + filter_len = filter.len(), + "Collection exists, deleting by filter" + ); + self.delete_documents_by_filter_if_exists( + collection, &filter, + ) + .await?; + } - for collection in - live.into_iter().chain([shadow_alt, shadow_current]) - { - if self.client.collection_exists(&collection).await? { - self.client - .delete_documents_by_filter(&collection, &filter) - .await?; + Ok::<(), eyre::Report>(()) + }; + let delete_shadow_alt = async { + if live.as_deref() != Some(shadow_alt.as_str()) { + debug!("Working on collection {shadow_alt:?}"); + self.delete_documents_by_filter_if_exists( + &shadow_alt, + &filter, + ) + .await?; } - } + + Ok::<(), eyre::Report>(()) + }; + let delete_shadow_current = async { + if live.as_deref() != Some(shadow_current.as_str()) { + debug!("Working on collection {shadow_current:?}"); + self.delete_documents_by_filter_if_exists( + &shadow_current, + &filter, + ) + .await?; + } + + Ok::<(), eyre::Report>(()) + }; + let (live_result, shadow_alt_result, shadow_current_result) = tokio::join!( + delete_live, + delete_shadow_alt, + delete_shadow_current + ); + live_result?; + shadow_alt_result?; + shadow_current_result?; } + + debug!("Done"); Ok(()) } @@ -1078,15 +1158,46 @@ impl SearchBackend for Typesense { let shadow_current = self.config.get_next_collection_name(&alias, false); - for collection in - live.into_iter().chain([shadow_alt, shadow_current]) - { - if self.client.collection_exists(&collection).await? { - self.client - .delete_documents_by_filter(&collection, &filter) - .await?; + let delete_live = async { + if let Some(collection) = live.as_deref() { + self.delete_documents_by_filter_if_exists( + collection, &filter, + ) + .await?; } - } + + Ok::<(), eyre::Report>(()) + }; + let delete_shadow_alt = async { + if live.as_deref() != Some(shadow_alt.as_str()) { + self.delete_documents_by_filter_if_exists( + &shadow_alt, + &filter, + ) + .await?; + } + + Ok::<(), eyre::Report>(()) + }; + let delete_shadow_current = async { + if live.as_deref() != Some(shadow_current.as_str()) { + self.delete_documents_by_filter_if_exists( + &shadow_current, + &filter, + ) + .await?; + } + + Ok::<(), eyre::Report>(()) + }; + let (live_result, shadow_alt_result, shadow_current_result) = tokio::join!( + delete_live, + delete_shadow_alt, + delete_shadow_current + ); + live_result?; + shadow_alt_result?; + shadow_current_result?; } Ok(()) } diff --git a/apps/labrinth/src/search/incremental.rs b/apps/labrinth/src/search/incremental.rs index 07c5a05b55..b5ba4f0d0a 100644 --- a/apps/labrinth/src/search/incremental.rs +++ b/apps/labrinth/src/search/incremental.rs @@ -1,43 +1,62 @@ pub mod consume; -use std::{mem, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + mem, + sync::Arc, + time::Duration, +}; use rdkafka::{producer::FutureRecord, util::Timeout}; use serde::Serialize; use tokio::sync::Mutex; use crate::{ - models::ids::ProjectId, + models::ids::{ProjectId, VersionId}, util::kafka::{KAFKA_OPERATION_INTERVAL, KafkaClientState, KafkaEvent}, }; pub const SEARCH_PROJECT_INDEX_QUEUE_TOPIC: &str = "public.labrinth.search-project-index-queue.v1"; +const QUEUE_FLUSH_INTERVAL: Duration = Duration::from_secs(10); #[derive(Clone)] pub struct IncrementalSearchQueue { - operations: Arc>>, + operations: Arc>, kafka_client: actix_web::web::Data, } impl IncrementalSearchQueue { pub fn new(kafka_client: actix_web::web::Data) -> Self { Self { - operations: Arc::new(Mutex::new(Vec::new())), + operations: Arc::new(Mutex::new( + PendingSearchIndexOperations::default(), + )), kafka_client, } } - pub async fn push(&self, project_id: ProjectId) { + pub async fn push( + &self, + project_id: ProjectId, + version_ids: impl IntoIterator, + ) { + self.operations + .lock() + .await + .push_project_change(project_id, version_ids); + } + + pub async fn push_project_removal(&self, project_id: ProjectId) { self.operations .lock() .await - .push(SearchIndexOperation { project_id }); + .push_project_removal(project_id); } pub async fn run(self) { loop { - tokio::time::sleep(KAFKA_OPERATION_INTERVAL).await; + tokio::time::sleep(QUEUE_FLUSH_INTERVAL).await; if let Err(err) = self.drain().await { tracing::error!( @@ -57,13 +76,11 @@ impl IncrementalSearchQueue { return Ok(()); } - let mut operations = operations.into_iter(); + let mut operations = operations.into_events().into_iter(); while let Some(operation) = operations.next() { let event = KafkaEvent::new( SEARCH_PROJECT_INDEX_QUEUE_TOPIC, - SearchProjectIndexQueueEventData { - project_id: operation.project_id, - }, + operation.clone(), ); let event_id = event.event_metadata.event_id; let key = event_id.to_string(); @@ -79,8 +96,10 @@ impl IncrementalSearchQueue { .await { let mut queued_operations = self.operations.lock().await; - queued_operations.push(operation); - queued_operations.extend(operations); + queued_operations.push_event(operation); + for operation in operations { + queued_operations.push_event(operation); + } return Err(err.into()); } @@ -90,12 +109,101 @@ impl IncrementalSearchQueue { } } -#[derive(Debug, Clone)] -pub struct SearchIndexOperation { - pub project_id: ProjectId, +#[derive(Default)] +struct PendingSearchIndexOperations { + changed_project_ids: HashSet, + changed_project_versions: HashMap>, + removed_project_ids: HashSet, +} + +impl PendingSearchIndexOperations { + fn is_empty(&self) -> bool { + self.changed_project_ids.is_empty() + && self.changed_project_versions.is_empty() + && self.removed_project_ids.is_empty() + } + + fn push_project_change( + &mut self, + project_id: ProjectId, + version_ids: impl IntoIterator, + ) { + if !self.removed_project_ids.contains(&project_id) { + let version_ids = version_ids.into_iter().collect::>(); + if version_ids.is_empty() { + self.changed_project_versions.remove(&project_id); + self.changed_project_ids.insert(project_id); + } else if !self.changed_project_ids.contains(&project_id) { + self.changed_project_versions + .entry(project_id) + .or_default() + .extend(version_ids); + } + } + } + + fn push_project_removal(&mut self, project_id: ProjectId) { + self.changed_project_ids.remove(&project_id); + self.changed_project_versions.remove(&project_id); + self.removed_project_ids.insert(project_id); + } + + fn push_event(&mut self, event: SearchProjectIndexQueueEventData) { + match event { + SearchProjectIndexQueueEventData::ProjectChange { project_id } => { + self.push_project_change(project_id, []) + } + SearchProjectIndexQueueEventData::ProjectVersionChange { + project_id, + version_ids, + } => { + if !version_ids.is_empty() { + self.push_project_change(project_id, version_ids) + } + } + SearchProjectIndexQueueEventData::ProjectRemoval { project_id } => { + self.push_project_removal(project_id) + } + } + } + + fn into_events(self) -> Vec { + let mut events = Vec::with_capacity( + self.changed_project_ids.len() + + self.changed_project_versions.len() + + self.removed_project_ids.len(), + ); + + events.extend(self.removed_project_ids.into_iter().map(|project_id| { + SearchProjectIndexQueueEventData::ProjectRemoval { project_id } + })); + events.extend(self.changed_project_ids.into_iter().map(|project_id| { + SearchProjectIndexQueueEventData::ProjectChange { project_id } + })); + events.extend(self.changed_project_versions.into_iter().map( + |(project_id, version_ids)| { + SearchProjectIndexQueueEventData::ProjectVersionChange { + project_id, + version_ids: version_ids.into_iter().collect(), + } + }, + )); + + events + } } -#[derive(Debug, Serialize)] -pub struct SearchProjectIndexQueueEventData { - pub project_id: ProjectId, +#[derive(Debug, Clone, Serialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum SearchProjectIndexQueueEventData { + ProjectChange { + project_id: ProjectId, + }, + ProjectVersionChange { + project_id: ProjectId, + version_ids: Vec, + }, + ProjectRemoval { + project_id: ProjectId, + }, } diff --git a/apps/labrinth/src/search/incremental/consume.rs b/apps/labrinth/src/search/incremental/consume.rs index 69e3b30f54..5128712bc8 100644 --- a/apps/labrinth/src/search/incremental/consume.rs +++ b/apps/labrinth/src/search/incremental/consume.rs @@ -1,20 +1,25 @@ use actix_web::web; use eyre::WrapErr; -use futures::FutureExt; use rdkafka::{ Message, consumer::{CommitMode, Consumer, StreamConsumer}, message::BorrowedMessage, }; use serde::Deserialize; -use std::collections::HashSet; +use std::{ + collections::HashSet, + time::{Duration, Instant}, +}; +use tracing::{Instrument, info, info_span}; use crate::{ database::{PgPool, redis::RedisPool}, - models::ids::ProjectId, + env::ENV, + models::ids::{ProjectId, VersionId}, search::{ - SearchBackend, incremental::SEARCH_PROJECT_INDEX_QUEUE_TOPIC, - indexing::index_project_documents, + SearchBackend, + incremental::SEARCH_PROJECT_INDEX_QUEUE_TOPIC, + indexing::{index_project_documents, index_project_version_documents}, }, util::kafka::{ INCREMENTAL_INDEX_SEARCH_TASK, KAFKA_OPERATION_INTERVAL, @@ -22,8 +27,6 @@ use crate::{ }, }; -const BATCH_SIZE: usize = 100; - pub async fn run( ro_pool: PgPool, redis_pool: RedisPool, @@ -60,25 +63,60 @@ async fn consume( search_backend: &dyn SearchBackend, consumer: &StreamConsumer, ) -> eyre::Result<()> { + // keep buffer capacity (pre-)allocated + let mut messages = Vec::with_capacity(1024); loop { - let mut messages = Vec::with_capacity(BATCH_SIZE); - messages.push( - consumer - .recv() - .await - .wrap_err("failed to receive Kafka message")?, - ); + messages.clear(); - while messages.len() < BATCH_SIZE { - let Some(message) = consumer.recv().now_or_never() else { - break; - }; + // wait for a first message to come in... + let first_message = consumer + .recv() + .await + .wrap_err("failed to receive Kafka message")?; + messages.push(first_message); - messages.push(message.wrap_err("failed to receive Kafka message")?); + let delay = Duration::from_secs( + ENV.SEARCH_INCREMENTAL_INDEX_BATCH_DELAY_SECONDS, + ); + info!( + "Received initial Kafka message; waiting {delay:.2?} for more to batch", + ); + + // ..then wait a while for more messages to batch up + // so that we can process a big batch to reindex. + // we stop until either we've reached the max batch size, + // or we've waited enough time - whichever is first. + // + // do a little trick with an `AsyncFnMut` closure + // so that we can explicitly specify the return type + let mut collect_more_messages = async || -> eyre::Result<()> { + while messages.len() < ENV.SEARCH_INCREMENTAL_INDEX_BATCH_MAX_SIZE { + let message = consumer + .recv() + .await + .wrap_err("failed to receive Kafka message")?; + messages.push(message); + } + eyre::Ok(()) + }; + match tokio::time::timeout(delay, collect_more_messages()).await { + Ok(Ok(())) | Err(_) => {} + Ok(Err(err)) => { + return Err( + err.wrap_err("failed to receive more Kafka messages") + ); + } } - consume_batch(ro_pool, redis_pool, search_backend, consumer, messages) - .await?; + info!("Consuming batch of {} messages", messages.len()); + consume_batch( + ro_pool, + redis_pool, + search_backend, + consumer, + messages.drain(..), + ) + .await?; } } @@ -87,10 +125,14 @@ async fn consume_batch( redis_pool: &RedisPool, search_backend: &dyn SearchBackend, consumer: &StreamConsumer, - messages: Vec>, + messages: impl IntoIterator>, ) -> eyre::Result<()> { - let mut project_ids = Vec::new(); - let mut seen_project_ids = HashSet::new(); + let start = Instant::now(); + + let mut project_ids_to_change = HashSet::new(); + let mut project_ids_with_version_changes = HashSet::new(); + let mut project_ids_to_remove = HashSet::new(); + let mut version_ids_to_change = HashSet::new(); let mut messages_to_commit = Vec::new(); for message in messages { @@ -125,25 +167,130 @@ async fn consume_batch( } }; - if seen_project_ids.insert(event.project_id) { - project_ids.push(event.project_id); + match event.into_data() { + SearchProjectIndexQueueEventData::ProjectChange { project_id } => { + project_ids_to_change.insert(project_id); + } + SearchProjectIndexQueueEventData::ProjectVersionChange { + project_id, + version_ids, + } => { + if !version_ids.is_empty() { + project_ids_with_version_changes.insert(project_id); + version_ids_to_change.extend(version_ids); + } + } + SearchProjectIndexQueueEventData::ProjectRemoval { project_id } => { + project_ids_to_remove.insert(project_id); + } } messages_to_commit.push(message); } - if project_ids.is_empty() { - return Ok(()); - } + project_ids_to_change + .retain(|project_id| !project_ids_to_remove.contains(project_id)); + project_ids_with_version_changes + .retain(|project_id| !project_ids_to_remove.contains(project_id)); - tracing::info!( + let project_ids_to_change = + project_ids_to_change.into_iter().collect::>(); + let project_ids_with_version_changes = project_ids_with_version_changes + .into_iter() + .collect::>(); + let project_ids_to_remove = + project_ids_to_remove.into_iter().collect::>(); + let version_ids_to_change = + version_ids_to_change.into_iter().collect::>(); + + info!( kafka.message_count = messages_to_commit.len(), - project_count = project_ids.len(), - "Consumed incremental search index event batch" + "Read all Kafka messages in {:.2?}, found {} projects to change, {} projects with version changes, {} versions to change, and {} projects to remove", + start.elapsed(), + project_ids_to_change.len(), + project_ids_with_version_changes.len(), + version_ids_to_change.len(), + project_ids_to_remove.len(), ); + let start = Instant::now(); - reindex_projects(ro_pool, redis_pool, search_backend, &project_ids) + if !project_ids_to_remove.is_empty() { + let operation_start = Instant::now(); + info!( + project_count = project_ids_to_remove.len(), + "Removing project documents" + ); + search_backend + .remove_project_documents(&project_ids_to_remove) + .await + .wrap_err("failed to remove project documents")?; + info!( + project_count = project_ids_to_remove.len(), + "Removed project documents in {:.2?}", + operation_start.elapsed() + ); + } + + if !version_ids_to_change.is_empty() { + let operation_start = Instant::now(); + info!( + version_count = version_ids_to_change.len(), + "Removing changed version documents", + ); + search_backend + .remove_documents(&version_ids_to_change) + .await + .wrap_err("failed to remove changed version documents")?; + info!( + version_count = version_ids_to_change.len(), + "Removed changed version documents in {:.2?}", + operation_start.elapsed() + ); + } + + if !project_ids_with_version_changes.is_empty() { + let operation_start = Instant::now(); + info!( + project_count = project_ids_with_version_changes.len(), + version_count = version_ids_to_change.len(), + "Indexing changed project versions" + ); + index_changed_project_versions( + ro_pool, + redis_pool, + search_backend, + &project_ids_with_version_changes, + &version_ids_to_change, + ) .await - .wrap_err("failed to reindex project batch")?; + .wrap_err("failed to index changed project version batch")?; + info!( + project_count = project_ids_with_version_changes.len(), + version_count = version_ids_to_change.len(), + "Indexed changed project versions in {:.2?}", + operation_start.elapsed() + ); + } + + if !project_ids_to_change.is_empty() { + let operation_start = Instant::now(); + info!( + project_count = project_ids_to_change.len(), + "Indexing changed projects" + ); + index_changed_projects( + ro_pool, + redis_pool, + search_backend, + &project_ids_to_change, + ) + .await + .wrap_err("failed to index changed project batch")?; + info!( + project_count = project_ids_to_change.len(), + "Indexed changed projects in {:.2?}", + operation_start.elapsed() + ); + } for message in messages_to_commit { consumer @@ -151,6 +298,13 @@ async fn consume_batch( .wrap_err("failed to commit Kafka message")?; } + info!( + "Changed {} projects and removed {} projects in {:.2?}", + project_ids_to_change.len(), + project_ids_to_remove.len(), + start.elapsed() + ); + Ok(()) } @@ -169,20 +323,67 @@ pub async fn reindex_projects( search_backend: &dyn SearchBackend, project_ids: &[ProjectId], ) -> eyre::Result<()> { + info!("Removing documents for batch"); search_backend.remove_project_documents(project_ids).await?; - let mut documents = Vec::new(); - for project_id in project_ids { - documents.extend( - index_project_documents(ro_pool, redis_pool, *project_id) - .await - .wrap_err_with(|| { - format!( - "failed to build project {project_id} search documents" - ) - })?, - ); - } + info!("Creating project documents"); + index_changed_projects(ro_pool, redis_pool, search_backend, project_ids) + .await?; + + Ok(()) +} + +async fn index_changed_projects( + ro_pool: &PgPool, + redis_pool: &RedisPool, + search_backend: &dyn SearchBackend, + project_ids: &[ProjectId], +) -> eyre::Result<()> { + let documents = index_project_documents(ro_pool, redis_pool, project_ids) + .instrument(info_span!("index", batch_size = project_ids.len())) + .await + .wrap_err_with(|| { + format!( + "failed to build search documents for {} projects", + project_ids.len() + ) + })?; + + info!("Fetched all project documents, indexing into backend"); + + search_backend.index_documents(&documents).await?; + + Ok(()) +} + +async fn index_changed_project_versions( + ro_pool: &PgPool, + redis_pool: &RedisPool, + search_backend: &dyn SearchBackend, + project_ids: &[ProjectId], + version_ids: &[VersionId], +) -> eyre::Result<()> { + let documents = index_project_version_documents( + ro_pool, + redis_pool, + project_ids, + version_ids, + ) + .instrument(info_span!( + "index", + batch_size = project_ids.len(), + version_count = version_ids.len() + )) + .await + .wrap_err_with(|| { + format!( + "failed to build search documents for {} projects and {} versions", + project_ids.len(), + version_ids.len() + ) + })?; + + info!("Fetched all project version documents, indexing into backend"); search_backend.index_documents(&documents).await?; @@ -190,6 +391,34 @@ pub async fn reindex_projects( } #[derive(Debug, Deserialize)] -struct SearchProjectIndexQueueEvent { - project_id: ProjectId, +#[serde(untagged)] +enum SearchProjectIndexQueueEvent { + Current(SearchProjectIndexQueueEventData), + Legacy { project_id: ProjectId }, +} + +impl SearchProjectIndexQueueEvent { + fn into_data(self) -> SearchProjectIndexQueueEventData { + match self { + Self::Current(data) => data, + Self::Legacy { project_id } => { + SearchProjectIndexQueueEventData::ProjectChange { project_id } + } + } + } +} + +#[derive(Debug, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +enum SearchProjectIndexQueueEventData { + ProjectChange { + project_id: ProjectId, + }, + ProjectVersionChange { + project_id: ProjectId, + version_ids: Vec, + }, + ProjectRemoval { + project_id: ProjectId, + }, } diff --git a/apps/labrinth/src/search/indexing.rs b/apps/labrinth/src/search/indexing.rs index 3c4c8a762f..4ebe510abd 100644 --- a/apps/labrinth/src/search/indexing.rs +++ b/apps/labrinth/src/search/indexing.rs @@ -5,7 +5,7 @@ use futures::TryStreamExt; use heck::ToKebabCase; use itertools::Itertools; use regex::Regex; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::LazyLock; use tracing::{info, warn}; @@ -20,7 +20,7 @@ use crate::database::models::{ }; use crate::database::redis::RedisPool; use crate::models::exp; -use crate::models::ids::ProjectId; +use crate::models::ids::{ProjectId, VersionId}; use crate::models::projects::{DependencyType, from_duplicate_version_fields}; use crate::models::v2::projects::LegacyProject; use crate::routes::v2_reroute; @@ -114,17 +114,74 @@ pub async fn index_local( return Ok((vec![], i64::MAX)); }; - let uploads = build_search_documents(pool, redis, db_projects).await?; + let uploads = + build_search_documents(pool, redis, db_projects, None).await?; Ok((uploads, *largest)) } pub async fn index_project_documents( pool: &PgPool, redis: &RedisPool, - project_id: ProjectId, + project_ids: &[ProjectId], ) -> eyre::Result> { let searchable_statuses = searchable_statuses(); - let project_ids = vec![DBProjectId::from(project_id).0]; + let project_ids = project_ids + .iter() + .map(|project_id| DBProjectId::from(*project_id).0) + .collect::>(); + + let db_projects = sqlx::query!( + r#" + SELECT m.id id, m.name name, m.summary summary, m.downloads downloads, m.follows follows, + m.icon_url icon_url, m.updated updated, m.approved approved, m.published, m.license license, m.slug slug, m.color, + m.components AS "components: sqlx::types::Json" + FROM mods m + WHERE m.status = ANY($1) AND m.id = ANY($2) + GROUP BY m.id + ORDER BY m.id ASC; + "#, + &searchable_statuses, + &project_ids, + ) + .fetch(pool) + .map_ok(|m| PartialProject { + id: DBProjectId(m.id), + name: m.name, + summary: m.summary, + downloads: m.downloads, + follows: m.follows, + icon_url: m.icon_url, + updated: m.updated, + approved: m.approved.unwrap_or(m.published), + slug: m.slug, + color: m.color, + license: m.license, + components: m.components.0, + }) + .try_collect::>() + .await + .wrap_err("failed to fetch project")?; + + info!("Fetched partial projects"); + + build_search_documents(pool, redis, db_projects, None).await +} + +pub async fn index_project_version_documents( + pool: &PgPool, + redis: &RedisPool, + project_ids: &[ProjectId], + version_ids: &[VersionId], +) -> eyre::Result> { + let searchable_statuses = searchable_statuses(); + let project_ids = project_ids + .iter() + .map(|project_id| DBProjectId::from(*project_id).0) + .collect::>(); + let version_ids = version_ids + .iter() + .map(|version_id| DBVersionId::from(*version_id)) + .collect::>(); let db_projects = sqlx::query!( r#" @@ -158,13 +215,14 @@ pub async fn index_project_documents( .await .wrap_err("failed to fetch project")?; - build_search_documents(pool, redis, db_projects).await + build_search_documents(pool, redis, db_projects, Some(&version_ids)).await } async fn build_search_documents( pool: &PgPool, redis: &RedisPool, db_projects: Vec, + version_ids_to_index: Option<&HashSet>, ) -> eyre::Result> { let searchable_statuses = searchable_statuses(); let project_ids = db_projects.iter().map(|x| x.id.0).collect::>(); @@ -496,6 +554,12 @@ async fn build_search_documents( .collect::>(); for version in versions { + if let Some(version_ids_to_index) = version_ids_to_index + && !version_ids_to_index.contains(&version.id) + { + continue; + } + let version_fields = VersionField::from_query_json( version.version_fields, &loader_fields, diff --git a/apps/labrinth/src/search/mod.rs b/apps/labrinth/src/search/mod.rs index 9507aafc2e..d88a8ff656 100644 --- a/apps/labrinth/src/search/mod.rs +++ b/apps/labrinth/src/search/mod.rs @@ -232,16 +232,22 @@ impl FromStr for SearchBackendKind { } } +/// Nullable fields in Typesense-bound documents should use +/// `skip_serializing_if = "Option::is_none"` so they are omitted instead of +/// serialized as `null`. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct UploadSearchProject { pub version_id: String, pub project_id: String, // pub project_types: Vec, + #[serde(skip_serializing_if = "Option::is_none")] pub slug: Option, pub author: String, pub author_id: String, + #[serde(skip_serializing_if = "Option::is_none")] pub organization: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub organization_id: Option, pub indexed_author: String, pub name: String, @@ -252,9 +258,11 @@ pub struct UploadSearchProject { pub follows: i32, pub downloads: i32, pub log_downloads: f64, + #[serde(skip_serializing_if = "Option::is_none")] pub icon_url: Option, pub license: String, pub gallery: Vec, + #[serde(skip_serializing_if = "Option::is_none")] pub featured_gallery: Option, /// RFC 3339 formatted creation date of the project pub date_created: DateTime, @@ -267,6 +275,7 @@ pub struct UploadSearchProject { /// Unix timestamp of the publication date of the version pub version_published_timestamp: i64, pub open_source: bool, + #[serde(skip_serializing_if = "Option::is_none")] pub color: Option, #[serde(default)] pub dependency_project_ids: Vec, @@ -285,12 +294,17 @@ pub struct UploadSearchProject { pub loader_fields: HashMap>, } +/// Nullable fields in Typesense-bound documents should use +/// `skip_serializing_if = "Option::is_none"` so they are omitted instead of +/// serialized as `null`. #[derive(Serialize, Deserialize, Debug, Clone)] pub struct SearchProjectDependency { pub project_id: String, pub dependency_type: DependencyType, pub name: String, + #[serde(skip_serializing_if = "Option::is_none")] pub slug: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub icon_url: Option, } diff --git a/scripts/__pycache__/create-dummy-projects.cpython-314.pyc b/scripts/__pycache__/create-dummy-projects.cpython-314.pyc new file mode 100644 index 0000000000..5100c7d310 Binary files /dev/null and b/scripts/__pycache__/create-dummy-projects.cpython-314.pyc differ diff --git a/scripts/create-dummy-projects.py b/scripts/create-dummy-projects.py new file mode 100755 index 0000000000..c55ea25d2e --- /dev/null +++ b/scripts/create-dummy-projects.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3 +import argparse +import json +import time +import urllib.error +import urllib.request +from concurrent.futures import ThreadPoolExecutor, as_completed +from uuid import uuid4 + + +def make_body(boundary, slug, index): + data = { + "name": f"Dummy Load {index:04d}", + "slug": slug, + "summary": "A dummy project for local load testing.", + "description": "This project was generated locally for batch indexing tests.", + "initial_versions": [], + "is_draft": True, + "categories": [], + "license_id": "MIT", + } + + payload = json.dumps(data, separators=(",", ":")) + return ( + f"--{boundary}\r\n" + 'Content-Disposition: form-data; name="data"\r\n' + "Content-Type: application/json\r\n\r\n" + f"{payload}\r\n" + f"--{boundary}--\r\n" + ).encode() + + +def create_project(base_url, token, boundary, prefix, index, retries): + slug = f"{prefix}-{index:04d}" + body = make_body(boundary, slug, index) + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": f"multipart/form-data; boundary={boundary}", + } + + for attempt in range(retries + 1): + req = urllib.request.Request( + f"{base_url}/v3/project", + data=body, + headers=headers, + method="POST", + ) + + try: + with urllib.request.urlopen(req, timeout=60) as resp: + resp.read() + return True, slug, resp.status, "" + except urllib.error.HTTPError as err: + text = err.read().decode("utf-8", errors="replace") + if err.code < 500 or attempt == retries: + return False, slug, err.code, text + except Exception as err: + if attempt == retries: + return False, slug, "error", repr(err) + + time.sleep(min(2**attempt, 10)) + + raise RuntimeError("unreachable") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--base-url", default="http://localhost:8000") + parser.add_argument("--token", default="mra_admin") + parser.add_argument("--count", type=int, default=1000) + parser.add_argument("--concurrency", type=int, default=2) + parser.add_argument("--retries", type=int, default=5) + args = parser.parse_args() + + boundary = "----modrinth-dummy-project-boundary" + prefix = f"dummy-load-{int(time.time())}-{uuid4().hex[:6]}" + + ok = 0 + failures = [] + + with ThreadPoolExecutor(max_workers=args.concurrency) as executor: + futures = [ + executor.submit( + create_project, + args.base_url, + args.token, + boundary, + prefix, + index, + args.retries, + ) + for index in range(args.count) + ] + + for completed, future in enumerate(as_completed(futures), 1): + success, slug, status, text = future.result() + if success: + ok += 1 + else: + failures.append((slug, status, text[:500])) + + if completed % 50 == 0: + print( + f"completed={completed} created={ok} failed={len(failures)}", + flush=True, + ) + + print( + json.dumps( + { + "prefix": prefix, + "attempted": args.count, + "created": ok, + "failed": len(failures), + "failures": failures[:20], + }, + indent=2, + ) + ) + + +if __name__ == "__main__": + main()