Fix reindex concurrency

This commit is contained in:
Liam 2025-01-11 12:02:19 -05:00
parent e69d3aca4e
commit 25bedd2aa1

View file

@@ -201,23 +201,31 @@ defmodule PhilomenaQuery.Search do
""" """
@spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok @spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok
def reindex(queryable, module, opts \\ []) do def reindex(queryable, module, opts \\ []) do
max_concurrency = Keyword.get(opts, :max_concurrency, 1)
index = @policy.index_for(module) index = @policy.index_for(module)
queryable queryable
|> Batch.record_batches(opts) |> Batch.query_batches(opts)
|> Enum.each(fn records -> |> Task.async_stream(
lines = fn query ->
Enum.flat_map(records, fn record -> lines =
doc = index.as_json(record) query
|> Repo.all()
|> Enum.flat_map(fn record ->
doc = index.as_json(record)
[ [
%{index: %{_index: index.index_name(), _id: doc.id}}, %{index: %{_index: index.index_name(), _id: doc.id}},
doc doc
] ]
end) end)
Api.bulk(@policy.opensearch_url(), lines) Api.bulk(@policy.opensearch_url(), lines)
end) end,
timeout: :infinity,
max_concurrency: max_concurrency
)
|> Stream.run()
end end
@doc ~S""" @doc ~S"""