diff --git a/lib/philomena/search_indexer.ex b/lib/philomena/search_indexer.ex
index 0e0a0c83..cf564a5c 100644
--- a/lib/philomena/search_indexer.ex
+++ b/lib/philomena/search_indexer.ex
@@ -40,6 +40,16 @@ defmodule Philomena.SearchIndexer do
     Tag => Tags
   }
 
+  @batch_sizes %{
+    Comment => 2048,
+    Filter => 2048,
+    Gallery => 1024,
+    Image => 32,
+    Post => 2048,
+    Report => 128,
+    Tag => 2048
+  }
+
   @doc """
   Recreate the index corresponding to all schemas, and then
   reindex all of the documents within.
@@ -53,11 +63,7 @@
   @spec recreate_reindex_all_destructive! :: :ok
   def recreate_reindex_all_destructive! do
     @schemas
-    |> Task.async_stream(
-      &recreate_reindex_schema_destructive!/1,
-      ordered: false,
-      timeout: :infinity
-    )
+    |> Stream.map(&recreate_reindex_schema_destructive!/1)
     |> Stream.run()
   end
 
@@ -91,11 +97,7 @@
   @spec reindex_all :: :ok
   def reindex_all do
     @schemas
-    |> Task.async_stream(
-      &reindex_schema/1,
-      ordered: false,
-      timeout: :infinity
-    )
+    |> Stream.map(&reindex_schema/1)
     |> Stream.run()
   end
 
@@ -115,12 +117,17 @@
     # Reports currently require handling for their polymorphic nature
     Report
     |> preload([:user, :admin])
-    |> Batch.record_batches()
-    |> Enum.each(fn records ->
-      records
-      |> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type])
-      |> Enum.map(&Search.index_document(&1, Report))
-    end)
+    |> Batch.record_batches(batch_size: @batch_sizes[Report])
+    |> Task.async_stream(
+      fn records ->
+        records
+        |> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type])
+        |> Enum.map(&Search.index_document(&1, Report))
+      end,
+      timeout: :infinity,
+      max_concurrency: max_concurrency()
+    )
+    |> Stream.run()
   end
 
   def reindex_schema(schema) when schema in @schemas do
@@ -129,6 +136,14 @@
 
     schema
     |> preload(^context.indexing_preloads())
-    |> Search.reindex(schema)
+    |> Search.reindex(schema,
+      batch_size: @batch_sizes[schema],
+      max_concurrency: max_concurrency()
+    )
+  end
+
+  @spec max_concurrency() :: pos_integer()
+  defp max_concurrency do
+    System.schedulers_online()
   end
 end
diff --git a/lib/philomena_query/search.ex b/lib/philomena_query/search.ex
index cd02137c..ff6a29b1 100644
--- a/lib/philomena_query/search.ex
+++ b/lib/philomena_query/search.ex
@@ -201,23 +201,31 @@
   """
   @spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok
   def reindex(queryable, module, opts \\ []) do
+    max_concurrency = Keyword.get(opts, :max_concurrency, 1)
     index = @policy.index_for(module)
 
     queryable
-    |> Batch.record_batches(opts)
-    |> Enum.each(fn records ->
-      lines =
-        Enum.flat_map(records, fn record ->
-          doc = index.as_json(record)
+    |> Batch.query_batches(opts)
+    |> Task.async_stream(
+      fn query ->
+        lines =
+          query
+          |> Repo.all()
+          |> Enum.flat_map(fn record ->
+            doc = index.as_json(record)
 
-          [
-            %{index: %{_index: index.index_name(), _id: doc.id}},
-            doc
-          ]
-        end)
+            [
+              %{index: %{_index: index.index_name(), _id: doc.id}},
+              doc
+            ]
+          end)
 
-      Api.bulk(@policy.opensearch_url(), lines)
-    end)
+        Api.bulk(@policy.opensearch_url(), lines)
+      end,
+      timeout: :infinity,
+      max_concurrency: max_concurrency
+    )
+    |> Stream.run()
   end
 
   @doc ~S"""
diff --git a/lib/philomena_query/search/client.ex b/lib/philomena_query/search/client.ex
index 98a81820..42e69e40 100644
--- a/lib/philomena_query/search/client.ex
+++ b/lib/philomena_query/search/client.ex
@@ -46,10 +46,10 @@
   end
 
   defp encode_body(body) when is_map(body),
-    do: Jason.encode!(body)
+    do: Jason.encode_to_iodata!(body)
 
   defp encode_body(body) when is_list(body),
-    do: [Enum.map_intersperse(body, "\n", &Jason.encode!(&1)), "\n"]
+    do: [Enum.map_intersperse(body, "\n", &Jason.encode_to_iodata!(&1)), "\n"]
 
   defp encode_options,
     do: [headers: request_headers(), receive_timeout: @receive_timeout]