From e69d3aca4ec6faabdc5aa189f77383aa5d4bf9cb Mon Sep 17 00:00:00 2001 From: Liam Date: Sat, 11 Jan 2025 11:53:53 -0500 Subject: [PATCH 1/4] Avoid unnecessarily collapsing iodata in request bodies --- lib/philomena_query/search/client.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/philomena_query/search/client.ex b/lib/philomena_query/search/client.ex index 98a81820..42e69e40 100644 --- a/lib/philomena_query/search/client.ex +++ b/lib/philomena_query/search/client.ex @@ -46,10 +46,10 @@ defmodule PhilomenaQuery.Search.Client do end defp encode_body(body) when is_map(body), - do: Jason.encode!(body) + do: Jason.encode_to_iodata!(body) defp encode_body(body) when is_list(body), - do: [Enum.map_intersperse(body, "\n", &Jason.encode!(&1)), "\n"] + do: [Enum.map_intersperse(body, "\n", &Jason.encode_to_iodata!(&1)), "\n"] defp encode_options, do: [headers: request_headers(), receive_timeout: @receive_timeout] From 25bedd2aa11d6330e8c42051ac27b669843f638b Mon Sep 17 00:00:00 2001 From: Liam Date: Sat, 11 Jan 2025 12:02:19 -0500 Subject: [PATCH 2/4] Fix reindex concurrency --- lib/philomena_query/search.ex | 32 ++++++++++++++++++++------------ 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/lib/philomena_query/search.ex b/lib/philomena_query/search.ex index cd02137c..ff6a29b1 100644 --- a/lib/philomena_query/search.ex +++ b/lib/philomena_query/search.ex @@ -201,23 +201,31 @@ defmodule PhilomenaQuery.Search do """ @spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok def reindex(queryable, module, opts \\ []) do + max_concurrency = Keyword.get(opts, :max_concurrency, 1) index = @policy.index_for(module) queryable - |> Batch.record_batches(opts) - |> Enum.each(fn records -> - lines = - Enum.flat_map(records, fn record -> - doc = index.as_json(record) + |> Batch.query_batches(opts) + |> Task.async_stream( + fn query -> + lines = + query + |> Repo.all() + |> Enum.flat_map(fn record -> + doc = index.as_json(record) - [ - %{index: %{_index: index.index_name(), _id: doc.id}}, - doc - ] - end) + [ + %{index: %{_index: index.index_name(), _id: doc.id}}, + doc + ] + end) - Api.bulk(@policy.opensearch_url(), lines) - end) + Api.bulk(@policy.opensearch_url(), lines) + end, + timeout: :infinity, + max_concurrency: max_concurrency + ) + |> Stream.run() end @doc ~S""" From d9309a275e2a0a9c272aaf691e2389daa680a6e3 Mon Sep 17 00:00:00 2001 From: Liam Date: Sat, 11 Jan 2025 12:07:05 -0500 Subject: [PATCH 3/4] Use custom batch sizes tailored to each schema --- lib/philomena/search_indexer.ex | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/philomena/search_indexer.ex b/lib/philomena/search_indexer.ex index 0e0a0c83..32ed018b 100644 --- a/lib/philomena/search_indexer.ex +++ b/lib/philomena/search_indexer.ex @@ -40,6 +40,16 @@ defmodule Philomena.SearchIndexer do Tag => Tags } + @batch_sizes %{ + Comment => 2048, + Filter => 2048, + Gallery => 1024, + Image => 32, + Post => 2048, + Report => 128, + Tag => 2048 + } + @doc """ Recreate the index corresponding to all schemas, and then reindex all of the documents within. @@ -115,7 +125,7 @@ defmodule Philomena.SearchIndexer do # Reports currently require handling for their polymorphic nature Report |> preload([:user, :admin]) - |> Batch.record_batches() + |> Batch.record_batches(batch_size: @batch_sizes[Report]) |> Enum.each(fn records -> records |> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type]) @@ -129,6 +139,6 @@ defmodule Philomena.SearchIndexer do schema |> preload(^context.indexing_preloads()) - |> Search.reindex(schema) + |> Search.reindex(schema, batch_size: @batch_sizes[schema]) end end From 4cce62233213d38ce5c95490b1850b4decf82a34 Mon Sep 17 00:00:00 2001 From: Liam Date: Sat, 11 Jan 2025 12:14:26 -0500 Subject: [PATCH 4/4] Use concurrency at the lowest level --- lib/philomena/search_indexer.ex | 37 +++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/lib/philomena/search_indexer.ex b/lib/philomena/search_indexer.ex index 32ed018b..cf564a5c 100644 --- a/lib/philomena/search_indexer.ex +++ b/lib/philomena/search_indexer.ex @@ -63,11 +63,7 @@ defmodule Philomena.SearchIndexer do @spec recreate_reindex_all_destructive! :: :ok def recreate_reindex_all_destructive! do @schemas - |> Task.async_stream( - &recreate_reindex_schema_destructive!/1, - ordered: false, - timeout: :infinity - ) + |> Stream.map(&recreate_reindex_schema_destructive!/1) |> Stream.run() end @@ -101,11 +97,7 @@ defmodule Philomena.SearchIndexer do @spec reindex_all :: :ok def reindex_all do @schemas - |> Task.async_stream( - &reindex_schema/1, - ordered: false, - timeout: :infinity - ) + |> Stream.map(&reindex_schema/1) |> Stream.run() end @@ -126,11 +118,16 @@ defmodule Philomena.SearchIndexer do Report |> preload([:user, :admin]) |> Batch.record_batches(batch_size: @batch_sizes[Report]) - |> Enum.each(fn records -> - records - |> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type]) - |> Enum.map(&Search.index_document(&1, Report)) - end) + |> Task.async_stream( + fn records -> + records + |> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type]) + |> Enum.map(&Search.index_document(&1, Report)) + end, + timeout: :infinity, + max_concurrency: max_concurrency() + ) + |> Stream.run() end def reindex_schema(schema) when schema in @schemas do @@ -139,6 +136,14 @@ defmodule Philomena.SearchIndexer do schema |> preload(^context.indexing_preloads()) - |> Search.reindex(schema, batch_size: @batch_sizes[schema]) + |> Search.reindex(schema, + batch_size: @batch_sizes[schema], + max_concurrency: max_concurrency() + ) + end + + @spec max_concurrency() :: pos_integer() + defp max_concurrency do + System.schedulers_online() end end