Add reindexing command line with rate and time estimate

This commit is contained in:
Liam 2025-01-11 17:31:05 -05:00
parent bcae416e14
commit 07ee436d6d
3 changed files with 97 additions and 27 deletions

View file

@@ -11,6 +11,6 @@ defmodule Mix.Tasks.ReindexAll do
raise "do not run this task unless you know what you're doing" raise "do not run this task unless you know what you're doing"
end end
SearchIndexer.recreate_reindex_all_destructive!() SearchIndexer.recreate_reindex_all_destructive!(maintenance: false)
end end
end end

View file

@@ -17,7 +17,9 @@ defmodule Philomena.SearchIndexer do
alias Philomena.Tags alias Philomena.Tags
alias Philomena.Tags.Tag alias Philomena.Tags.Tag
alias Philomena.Maintenance
alias Philomena.Polymorphic alias Philomena.Polymorphic
alias Philomena.Repo
import Ecto.Query import Ecto.Query
@schemas [ @schemas [
@@ -60,10 +62,10 @@ defmodule Philomena.SearchIndexer do
:ok :ok
""" """
@spec recreate_reindex_all_destructive! :: :ok @spec recreate_reindex_all_destructive!(opts :: Keyword.t()) :: :ok
def recreate_reindex_all_destructive! do def recreate_reindex_all_destructive!(opts \\ []) do
@schemas @schemas
|> Stream.map(&recreate_reindex_schema_destructive!/1) |> Stream.map(&recreate_reindex_schema_destructive!(&1, opts))
|> Stream.run() |> Stream.run()
end end
@@ -77,12 +79,12 @@ defmodule Philomena.SearchIndexer do
:ok :ok
""" """
@spec recreate_reindex_schema_destructive!(schema :: module()) :: :ok @spec recreate_reindex_schema_destructive!(schema :: module(), opts :: Keyword.t()) :: :ok
def recreate_reindex_schema_destructive!(schema) when schema in @schemas do def recreate_reindex_schema_destructive!(schema, opts \\ []) when schema in @schemas do
Search.delete_index!(schema) Search.delete_index!(schema)
Search.create_index!(schema) Search.create_index!(schema)
reindex_schema(schema) reindex_schema(schema, opts)
end end
@doc """ @doc """
@@ -94,10 +96,10 @@ defmodule Philomena.SearchIndexer do
:ok :ok
""" """
@spec reindex_all :: :ok @spec reindex_all(opts :: Keyword.t()) :: :ok
def reindex_all do def reindex_all(opts \\ []) do
@schemas @schemas
|> Stream.map(&reindex_schema/1) |> Stream.map(&reindex_schema(&1, opts))
|> Stream.run() |> Stream.run()
end end
@@ -110,10 +112,30 @@ defmodule Philomena.SearchIndexer do
:ok :ok
""" """
@spec reindex_schema(schema :: module()) :: :ok @spec reindex_schema(schema :: module(), opts :: Keyword.t()) :: :ok
def reindex_schema(schema) def reindex_schema(schema, opts \\ []) do
maintenance = Keyword.get(opts, :maintenance, true)
def reindex_schema(Report) do if maintenance do
query = limit(schema, 1)
min = Repo.one(order_by(query, asc: :id)).id
max = Repo.one(order_by(query, desc: :id)).id
schema
|> reindex_schema_impl(opts)
|> Maintenance.log_progress(inspect(schema), min, max)
else
schema
|> reindex_schema_impl(opts)
|> Stream.run()
end
end
@spec reindex_schema_impl(schema :: module(), opts :: Keyword.t()) ::
Enumerable.t({:ok, integer()})
defp reindex_schema_impl(schema, opts)
defp reindex_schema_impl(Report, opts) do
# Reports currently require handling for their polymorphic nature # Reports currently require handling for their polymorphic nature
Report Report
|> preload([:user, :admin]) |> preload([:user, :admin])
@@ -125,25 +147,24 @@ defmodule Philomena.SearchIndexer do
|> Enum.map(&Search.index_document(&1, Report)) |> Enum.map(&Search.index_document(&1, Report))
end, end,
timeout: :infinity, timeout: :infinity,
max_concurrency: max_concurrency() max_concurrency: max_concurrency(opts)
) )
|> Stream.run()
end end
def reindex_schema(schema) when schema in @schemas do defp reindex_schema_impl(schema, opts) when schema in @schemas do
# Normal schemas can simply be reindexed with indexing_preloads # Normal schemas can simply be reindexed with indexing_preloads
context = Map.fetch!(@contexts, schema) context = Map.fetch!(@contexts, schema)
schema schema
|> preload(^context.indexing_preloads()) |> preload(^context.indexing_preloads())
|> Search.reindex(schema, |> Search.reindex_stream(schema,
batch_size: @batch_sizes[schema], batch_size: @batch_sizes[schema],
max_concurrency: max_concurrency() max_concurrency: max_concurrency(opts)
) )
end end
@spec max_concurrency() :: pos_integer() @spec max_concurrency(opts :: Keyword.t()) :: pos_integer()
defp max_concurrency do defp max_concurrency(opts) do
System.schedulers_online() Keyword.get(opts, :max_concurrency, System.schedulers_online())
end end
end end

View file

@@ -189,6 +189,10 @@ defmodule PhilomenaQuery.Search do
Note that indexing is near real-time and requires an index refresh before documents will Note that indexing is near real-time and requires an index refresh before documents will
become visible. Unless changed in the mapping, this happens after 5 seconds have elapsed. become visible. Unless changed in the mapping, this happens after 5 seconds have elapsed.
> #### Warning {: .warning}
> The returned stream must be enumerated for the reindex to process. If you do not care
> about the progress IDs yielded, use `reindex/3` instead.
## Example ## Example
query = query =
@@ -196,11 +200,14 @@ defmodule PhilomenaQuery.Search do
where: i.id < 100_000, where: i.id < 100_000,
preload: ^Images.indexing_preloads() preload: ^Images.indexing_preloads()
Search.reindex(query, Image, batch_size: 5000) query
|> Search.reindex_stream(Image, batch_size: 1024)
|> Enum.each(&IO.inspect/1)
""" """
@spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok @spec reindex_stream(queryable(), schema_module(), Batch.batch_options()) ::
def reindex(queryable, module, opts \\ []) do Enumerable.t({:ok, integer()})
def reindex_stream(queryable, module, opts \\ []) do
max_concurrency = Keyword.get(opts, :max_concurrency, 1) max_concurrency = Keyword.get(opts, :max_concurrency, 1)
index = @policy.index_for(module) index = @policy.index_for(module)
@@ -208,10 +215,10 @@ defmodule PhilomenaQuery.Search do
|> Batch.query_batches(opts) |> Batch.query_batches(opts)
|> Task.async_stream( |> Task.async_stream(
fn query -> fn query ->
records = Repo.all(query)
lines = lines =
query Enum.flat_map(records, fn record ->
|> Repo.all()
|> Enum.flat_map(fn record ->
doc = index.as_json(record) doc = index.as_json(record)
[ [
@@ -221,10 +228,52 @@ defmodule PhilomenaQuery.Search do
end) end)
Api.bulk(@policy.opensearch_url(), lines) Api.bulk(@policy.opensearch_url(), lines)
last_id(records)
end, end,
timeout: :infinity, timeout: :infinity,
max_concurrency: max_concurrency max_concurrency: max_concurrency
) )
|> flatten_stream()
end
defp last_id([]), do: []
defp last_id(records), do: [Enum.max_by(records, & &1.id).id]
@spec flatten_stream(Enumerable.t({:ok, [integer()]})) :: Enumerable.t({:ok, integer()})
defp flatten_stream(stream) do
# Converts [{:ok, [1, 2]}] into [{:ok, 1}, {:ok, 2}]
Stream.transform(stream, [], fn {:ok, last_id}, _ ->
{Enum.map(last_id, &{:ok, &1}), []}
end)
end
@doc """
Efficiently index a batch of documents in the index named by the module.
This function is substantially more efficient than running `index_document/2` for
each instance of a schema struct and can index with hundreds of times the throughput.
The queryable should be a schema type with its indexing preloads included in
the query. The options are forwarded to `PhilomenaQuery.Batch.record_batches/3`.
Note that indexing is near real-time and requires an index refresh before documents will
become visible. Unless changed in the mapping, this happens after 5 seconds have elapsed.
## Example
query =
from i in Image,
where: i.id < 100_000,
preload: ^Images.indexing_preloads()
Search.reindex(query, Image, batch_size: 1024)
"""
@spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok
def reindex(queryable, module, opts \\ []) do
queryable
|> reindex_stream(module, opts)
|> Stream.run() |> Stream.run()
end end