Use concurrency at the lowest level

Liam 2025-01-11 12:14:26 -05:00
parent d9309a275e
commit 4cce622332


@@ -63,11 +63,7 @@ defmodule Philomena.SearchIndexer do
   @spec recreate_reindex_all_destructive! :: :ok
   def recreate_reindex_all_destructive! do
     @schemas
-    |> Task.async_stream(
-      &recreate_reindex_schema_destructive!/1,
-      ordered: false,
-      timeout: :infinity
-    )
+    |> Stream.map(&recreate_reindex_schema_destructive!/1)
     |> Stream.run()
   end
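With the Task.async_stream/3 wrapper removed, the schema-level loop is now sequential: Stream.map/2 piped into Stream.run/1 simply applies the function to each schema in turn and discards the results. A minimal sketch of that pattern (illustrative values, not code from this repo):

[:a, :b, :c]
|> Stream.map(&IO.inspect/1)
|> Stream.run()
# elements are processed one at a time, in order; Stream.run/1 forces the
# stream purely for side effects and returns :ok, so this behaves roughly
# like Enum.each([:a, :b, :c], &IO.inspect/1)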
@@ -101,11 +97,7 @@ defmodule Philomena.SearchIndexer do
   @spec reindex_all :: :ok
   def reindex_all do
     @schemas
-    |> Task.async_stream(
-      &reindex_schema/1,
-      ordered: false,
-      timeout: :infinity
-    )
+    |> Stream.map(&reindex_schema/1)
     |> Stream.run()
   end
@@ -126,11 +118,16 @@ defmodule Philomena.SearchIndexer do
     Report
     |> preload([:user, :admin])
     |> Batch.record_batches(batch_size: @batch_sizes[Report])
-    |> Enum.each(fn records ->
+    |> Task.async_stream(
+      fn records ->
         records
         |> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type])
         |> Enum.map(&Search.index_document(&1, Report))
-    end)
+      end,
+      timeout: :infinity,
+      max_concurrency: max_concurrency()
+    )
+    |> Stream.run()
   end

   def reindex_schema(schema) when schema in @schemas do
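For reference, a minimal sketch (not from the repo) of what the Task.async_stream/3 options above do: each batch is handed to its own task, at most max_concurrency tasks run at once, and timeout: :infinity disables the per-task deadline; Stream.run/1 then drives the lazy stream purely for its side effects.

1..10
|> Task.async_stream(
  fn batch ->
    # stand-in for the real per-batch indexing work
    Process.sleep(50)
    batch
  end,
  timeout: :infinity,
  max_concurrency: System.schedulers_online()
)
|> Stream.run()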
@@ -139,6 +136,14 @@ defmodule Philomena.SearchIndexer do
     schema
     |> preload(^context.indexing_preloads())
-    |> Search.reindex(schema, batch_size: @batch_sizes[schema])
+    |> Search.reindex(schema,
+      batch_size: @batch_sizes[schema],
+      max_concurrency: max_concurrency()
+    )
   end
+
+  @spec max_concurrency() :: pos_integer()
+  defp max_concurrency do
+    System.schedulers_online()
+  end
 end
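The concurrency cap comes from the runtime rather than a config value: System.schedulers_online/0 returns the number of online BEAM scheduler threads, which by default matches the number of logical CPU cores, so indexing parallelism tracks the host it runs on. For example, on a hypothetical 8-core machine:

iex> System.schedulers_online()
8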