From 25bedd2aa11d6330e8c42051ac27b669843f638b Mon Sep 17 00:00:00 2001
From: Liam
Date: Sat, 11 Jan 2025 12:02:19 -0500
Subject: [PATCH] Fix reindex concurrency

---
 lib/philomena_query/search.ex | 32 ++++++++++++++++++++------------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/lib/philomena_query/search.ex b/lib/philomena_query/search.ex
index cd02137c..ff6a29b1 100644
--- a/lib/philomena_query/search.ex
+++ b/lib/philomena_query/search.ex
@@ -201,23 +201,31 @@ defmodule PhilomenaQuery.Search do
   """
   @spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok
   def reindex(queryable, module, opts \\ []) do
+    max_concurrency = Keyword.get(opts, :max_concurrency, 1)
     index = @policy.index_for(module)
 
     queryable
-    |> Batch.record_batches(opts)
-    |> Enum.each(fn records ->
-      lines =
-        Enum.flat_map(records, fn record ->
-          doc = index.as_json(record)
+    |> Batch.query_batches(opts)
+    |> Task.async_stream(
+      fn query ->
+        lines =
+          query
+          |> Repo.all()
+          |> Enum.flat_map(fn record ->
+            doc = index.as_json(record)
 
-          [
-            %{index: %{_index: index.index_name(), _id: doc.id}},
-            doc
-          ]
-        end)
+            [
+              %{index: %{_index: index.index_name(), _id: doc.id}},
+              doc
+            ]
+          end)
 
-      Api.bulk(@policy.opensearch_url(), lines)
-    end)
+        Api.bulk(@policy.opensearch_url(), lines)
+      end,
+      timeout: :infinity,
+      max_concurrency: max_concurrency
+    )
+    |> Stream.run()
   end
 
   @doc ~S"""