diff --git a/lib/philomena/search_indexer.ex b/lib/philomena/search_indexer.ex index 32ed018b..cf564a5c 100644 --- a/lib/philomena/search_indexer.ex +++ b/lib/philomena/search_indexer.ex @@ -63,11 +63,7 @@ defmodule Philomena.SearchIndexer do @spec recreate_reindex_all_destructive! :: :ok def recreate_reindex_all_destructive! do @schemas - |> Task.async_stream( - &recreate_reindex_schema_destructive!/1, - ordered: false, - timeout: :infinity - ) + |> Stream.map(&recreate_reindex_schema_destructive!/1) |> Stream.run() end @@ -101,11 +97,7 @@ defmodule Philomena.SearchIndexer do @spec reindex_all :: :ok def reindex_all do @schemas - |> Task.async_stream( - &reindex_schema/1, - ordered: false, - timeout: :infinity - ) + |> Stream.map(&reindex_schema/1) |> Stream.run() end @@ -126,11 +118,16 @@ defmodule Philomena.SearchIndexer do Report |> preload([:user, :admin]) |> Batch.record_batches(batch_size: @batch_sizes[Report]) - |> Enum.each(fn records -> - records - |> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type]) - |> Enum.map(&Search.index_document(&1, Report)) - end) + |> Task.async_stream( + fn records -> + records + |> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type]) + |> Enum.map(&Search.index_document(&1, Report)) + end, + timeout: :infinity, + max_concurrency: max_concurrency() + ) + |> Stream.run() end def reindex_schema(schema) when schema in @schemas do @@ -139,6 +136,14 @@ defmodule Philomena.SearchIndexer do schema |> preload(^context.indexing_preloads()) - |> Search.reindex(schema, batch_size: @batch_sizes[schema]) + |> Search.reindex(schema, + batch_size: @batch_sizes[schema], + max_concurrency: max_concurrency() + ) + end + + @spec max_concurrency() :: pos_integer() + defp max_concurrency do + System.schedulers_online() end end