mirror of
https://github.com/philomena-dev/philomena.git
synced 2025-01-19 22:27:59 +01:00
Merge pull request #400 from philomena-dev/indexing-cli
Add reindexing command line with rate and time estimate
This commit is contained in:
commit
4db83d5a27
4 changed files with 205 additions and 27 deletions
|
@ -11,6 +11,6 @@ defmodule Mix.Tasks.ReindexAll do
|
||||||
raise "do not run this task unless you know what you're doing"
|
raise "do not run this task unless you know what you're doing"
|
||||||
end
|
end
|
||||||
|
|
||||||
SearchIndexer.recreate_reindex_all_destructive!()
|
SearchIndexer.recreate_reindex_all_destructive!(maintenance: false)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
108
lib/philomena/maintenance.ex
Normal file
108
lib/philomena/maintenance.ex
Normal file
|
@ -0,0 +1,108 @@
|
||||||
|
defmodule Philomena.Maintenance do
|
||||||
|
@moduledoc """
|
||||||
|
Functions related to online and offline maintenance tasks.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@typedoc "Progress from a stream job."
|
||||||
|
@type progress_report :: %{
|
||||||
|
curr: integer(),
|
||||||
|
rate: number(),
|
||||||
|
remaining_time: number()
|
||||||
|
}
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Periodically stream progress reports for a stream task that produces a range
|
||||||
|
of integers between `min` and `max`, estimating the rate of progress and time
|
||||||
|
remaining.
|
||||||
|
"""
|
||||||
|
@spec stream_progress(
|
||||||
|
id_stream :: Enumerable.t({:ok, integer()}),
|
||||||
|
min :: integer(),
|
||||||
|
max :: integer(),
|
||||||
|
report_period :: number()
|
||||||
|
) :: Enumerable.t(progress_report())
|
||||||
|
def stream_progress(id_stream, min, max, report_period \\ 1.0) do
|
||||||
|
# Reference point for comparison during the stream.
|
||||||
|
begin = now()
|
||||||
|
|
||||||
|
# Estimate progress counters based on how many objects have been
|
||||||
|
# processed since the initial reference point.
|
||||||
|
create_report = fn state, curr_id ->
|
||||||
|
curr_rate = (curr_id - min) / max(now() - begin, 1)
|
||||||
|
remaining_time = (max - curr_id) / max(curr_rate, 1)
|
||||||
|
|
||||||
|
%{
|
||||||
|
state: state,
|
||||||
|
curr: curr_id,
|
||||||
|
rate: round(curr_rate),
|
||||||
|
remaining_time: remaining_time
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
# Convert input items received after every period elapses into
|
||||||
|
# a report, then concatenate an additional report after all items
|
||||||
|
# are processed.
|
||||||
|
id_stream
|
||||||
|
|> Stream.transform(begin, fn {:ok, curr_id}, prev_time ->
|
||||||
|
curr_time = now()
|
||||||
|
|
||||||
|
if curr_time - prev_time > report_period do
|
||||||
|
{[create_report.(:in_progress, curr_id)], curr_time}
|
||||||
|
else
|
||||||
|
{[], prev_time}
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
|> Stream.concat(Stream.map([[]], fn _ -> create_report.(:done, max) end))
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Write progress reports to the console for a stream task that produces a range
|
||||||
|
of integers between `min` and `max`, estimating the rate of progress and time
|
||||||
|
remaining.
|
||||||
|
"""
|
||||||
|
@spec log_progress(
|
||||||
|
id_stream :: Enumerable.t({:ok, integer()}),
|
||||||
|
label :: String.t(),
|
||||||
|
min :: integer(),
|
||||||
|
max :: integer(),
|
||||||
|
report_period :: number()
|
||||||
|
) :: :ok
|
||||||
|
def log_progress(id_stream, label, min, max, report_period \\ 1.0) do
|
||||||
|
id_stream
|
||||||
|
|> stream_progress(min, max, report_period)
|
||||||
|
|> Enum.each(fn p ->
|
||||||
|
# Clear line
|
||||||
|
IO.write("\e[2K\r")
|
||||||
|
|
||||||
|
# Newline on report depends on whether stream is finished
|
||||||
|
case p.state do
|
||||||
|
:in_progress ->
|
||||||
|
eta = format_eta(p.remaining_time)
|
||||||
|
|
||||||
|
IO.write("#{label}: #{p.curr}/#{max} [#{p.rate}/sec], ETA: #{eta}")
|
||||||
|
|
||||||
|
:done ->
|
||||||
|
IO.puts("#{label}: #{p.curr}/#{max} [#{p.rate}/sec], done.")
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec format_eta(number()) :: String.t()
|
||||||
|
defp format_eta(remaining_time) do
|
||||||
|
seconds = round(remaining_time)
|
||||||
|
minutes = div(seconds, 60)
|
||||||
|
hours = div(minutes, 60)
|
||||||
|
|
||||||
|
cond do
|
||||||
|
seconds < 45 -> "about #{seconds} second(s)"
|
||||||
|
seconds < 90 -> "about a minute"
|
||||||
|
minutes < 45 -> "about #{minutes} minute(s)"
|
||||||
|
true -> "about #{hours} hour(s)"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec now() :: float()
|
||||||
|
defp now do
|
||||||
|
:erlang.system_time(:microsecond) / 1_000_000
|
||||||
|
end
|
||||||
|
end
|
|
@ -17,7 +17,9 @@ defmodule Philomena.SearchIndexer do
|
||||||
alias Philomena.Tags
|
alias Philomena.Tags
|
||||||
alias Philomena.Tags.Tag
|
alias Philomena.Tags.Tag
|
||||||
|
|
||||||
|
alias Philomena.Maintenance
|
||||||
alias Philomena.Polymorphic
|
alias Philomena.Polymorphic
|
||||||
|
alias Philomena.Repo
|
||||||
import Ecto.Query
|
import Ecto.Query
|
||||||
|
|
||||||
@schemas [
|
@schemas [
|
||||||
|
@ -60,10 +62,10 @@ defmodule Philomena.SearchIndexer do
|
||||||
:ok
|
:ok
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@spec recreate_reindex_all_destructive! :: :ok
|
@spec recreate_reindex_all_destructive!(opts :: Keyword.t()) :: :ok
|
||||||
def recreate_reindex_all_destructive! do
|
def recreate_reindex_all_destructive!(opts \\ []) do
|
||||||
@schemas
|
@schemas
|
||||||
|> Stream.map(&recreate_reindex_schema_destructive!/1)
|
|> Stream.map(&recreate_reindex_schema_destructive!(&1, opts))
|
||||||
|> Stream.run()
|
|> Stream.run()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -77,12 +79,12 @@ defmodule Philomena.SearchIndexer do
|
||||||
:ok
|
:ok
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@spec recreate_reindex_schema_destructive!(schema :: module()) :: :ok
|
@spec recreate_reindex_schema_destructive!(schema :: module(), opts :: Keyword.t()) :: :ok
|
||||||
def recreate_reindex_schema_destructive!(schema) when schema in @schemas do
|
def recreate_reindex_schema_destructive!(schema, opts \\ []) when schema in @schemas do
|
||||||
Search.delete_index!(schema)
|
Search.delete_index!(schema)
|
||||||
Search.create_index!(schema)
|
Search.create_index!(schema)
|
||||||
|
|
||||||
reindex_schema(schema)
|
reindex_schema(schema, opts)
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
|
@ -94,10 +96,10 @@ defmodule Philomena.SearchIndexer do
|
||||||
:ok
|
:ok
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@spec reindex_all :: :ok
|
@spec reindex_all(opts :: Keyword.t()) :: :ok
|
||||||
def reindex_all do
|
def reindex_all(opts \\ []) do
|
||||||
@schemas
|
@schemas
|
||||||
|> Stream.map(&reindex_schema/1)
|
|> Stream.map(&reindex_schema(&1, opts))
|
||||||
|> Stream.run()
|
|> Stream.run()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -110,10 +112,30 @@ defmodule Philomena.SearchIndexer do
|
||||||
:ok
|
:ok
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@spec reindex_schema(schema :: module()) :: :ok
|
@spec reindex_schema(schema :: module(), opts :: Keyword.t()) :: :ok
|
||||||
def reindex_schema(schema)
|
def reindex_schema(schema, opts \\ []) do
|
||||||
|
maintenance = Keyword.get(opts, :maintenance, true)
|
||||||
|
|
||||||
def reindex_schema(Report) do
|
if maintenance do
|
||||||
|
query = limit(schema, 1)
|
||||||
|
min = Repo.one(order_by(query, asc: :id)).id
|
||||||
|
max = Repo.one(order_by(query, desc: :id)).id
|
||||||
|
|
||||||
|
schema
|
||||||
|
|> reindex_schema_impl(opts)
|
||||||
|
|> Maintenance.log_progress(inspect(schema), min, max)
|
||||||
|
else
|
||||||
|
schema
|
||||||
|
|> reindex_schema_impl(opts)
|
||||||
|
|> Stream.run()
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec reindex_schema_impl(schema :: module(), opts :: Keyword.t()) ::
|
||||||
|
Enumerable.t({:ok, integer()})
|
||||||
|
defp reindex_schema_impl(schema, opts)
|
||||||
|
|
||||||
|
defp reindex_schema_impl(Report, opts) do
|
||||||
# Reports currently require handling for their polymorphic nature
|
# Reports currently require handling for their polymorphic nature
|
||||||
Report
|
Report
|
||||||
|> preload([:user, :admin])
|
|> preload([:user, :admin])
|
||||||
|
@ -125,25 +147,24 @@ defmodule Philomena.SearchIndexer do
|
||||||
|> Enum.map(&Search.index_document(&1, Report))
|
|> Enum.map(&Search.index_document(&1, Report))
|
||||||
end,
|
end,
|
||||||
timeout: :infinity,
|
timeout: :infinity,
|
||||||
max_concurrency: max_concurrency()
|
max_concurrency: max_concurrency(opts)
|
||||||
)
|
)
|
||||||
|> Stream.run()
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def reindex_schema(schema) when schema in @schemas do
|
defp reindex_schema_impl(schema, opts) when schema in @schemas do
|
||||||
# Normal schemas can simply be reindexed with indexing_preloads
|
# Normal schemas can simply be reindexed with indexing_preloads
|
||||||
context = Map.fetch!(@contexts, schema)
|
context = Map.fetch!(@contexts, schema)
|
||||||
|
|
||||||
schema
|
schema
|
||||||
|> preload(^context.indexing_preloads())
|
|> preload(^context.indexing_preloads())
|
||||||
|> Search.reindex(schema,
|
|> Search.reindex_stream(schema,
|
||||||
batch_size: @batch_sizes[schema],
|
batch_size: @batch_sizes[schema],
|
||||||
max_concurrency: max_concurrency()
|
max_concurrency: max_concurrency(opts)
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
@spec max_concurrency() :: pos_integer()
|
@spec max_concurrency(opts :: Keyword.t()) :: pos_integer()
|
||||||
defp max_concurrency do
|
defp max_concurrency(opts) do
|
||||||
System.schedulers_online()
|
Keyword.get(opts, :max_concurrency, System.schedulers_online())
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -189,6 +189,10 @@ defmodule PhilomenaQuery.Search do
|
||||||
Note that indexing is near real-time and requires an index refresh before documents will
|
Note that indexing is near real-time and requires an index refresh before documents will
|
||||||
become visible. Unless changed in the mapping, this happens after 5 seconds have elapsed.
|
become visible. Unless changed in the mapping, this happens after 5 seconds have elapsed.
|
||||||
|
|
||||||
|
> #### Warning {: .warning}
|
||||||
|
> The returned stream must be enumerated for the reindex to process. If you do not care
|
||||||
|
> about the progress IDs yielded, use `reindex/3` instead.
|
||||||
|
|
||||||
## Example
|
## Example
|
||||||
|
|
||||||
query =
|
query =
|
||||||
|
@ -196,11 +200,14 @@ defmodule PhilomenaQuery.Search do
|
||||||
where: i.id < 100_000,
|
where: i.id < 100_000,
|
||||||
preload: ^Images.indexing_preloads()
|
preload: ^Images.indexing_preloads()
|
||||||
|
|
||||||
Search.reindex(query, Image, batch_size: 5000)
|
query
|
||||||
|
|> Search.reindex_stream(Image, batch_size: 1024)
|
||||||
|
|> Enum.each(&IO.inspect/1)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok
|
@spec reindex_stream(queryable(), schema_module(), Batch.batch_options()) ::
|
||||||
def reindex(queryable, module, opts \\ []) do
|
Enumerable.t({:ok, integer()})
|
||||||
|
def reindex_stream(queryable, module, opts \\ []) do
|
||||||
max_concurrency = Keyword.get(opts, :max_concurrency, 1)
|
max_concurrency = Keyword.get(opts, :max_concurrency, 1)
|
||||||
index = @policy.index_for(module)
|
index = @policy.index_for(module)
|
||||||
|
|
||||||
|
@ -208,10 +215,10 @@ defmodule PhilomenaQuery.Search do
|
||||||
|> Batch.query_batches(opts)
|
|> Batch.query_batches(opts)
|
||||||
|> Task.async_stream(
|
|> Task.async_stream(
|
||||||
fn query ->
|
fn query ->
|
||||||
|
records = Repo.all(query)
|
||||||
|
|
||||||
lines =
|
lines =
|
||||||
query
|
Enum.flat_map(records, fn record ->
|
||||||
|> Repo.all()
|
|
||||||
|> Enum.flat_map(fn record ->
|
|
||||||
doc = index.as_json(record)
|
doc = index.as_json(record)
|
||||||
|
|
||||||
[
|
[
|
||||||
|
@ -221,10 +228,52 @@ defmodule PhilomenaQuery.Search do
|
||||||
end)
|
end)
|
||||||
|
|
||||||
Api.bulk(@policy.opensearch_url(), lines)
|
Api.bulk(@policy.opensearch_url(), lines)
|
||||||
|
|
||||||
|
last_id(records)
|
||||||
end,
|
end,
|
||||||
timeout: :infinity,
|
timeout: :infinity,
|
||||||
max_concurrency: max_concurrency
|
max_concurrency: max_concurrency
|
||||||
)
|
)
|
||||||
|
|> flatten_stream()
|
||||||
|
end
|
||||||
|
|
||||||
|
defp last_id([]), do: []
|
||||||
|
defp last_id(records), do: [Enum.max_by(records, & &1.id).id]
|
||||||
|
|
||||||
|
@spec flatten_stream(Enumerable.t({:ok, [integer()]})) :: Enumerable.t({:ok, integer()})
|
||||||
|
defp flatten_stream(stream) do
|
||||||
|
# Converts [{:ok, [1, 2]}] into [{:ok, 1}, {:ok, 2}]
|
||||||
|
Stream.transform(stream, [], fn {:ok, last_id}, _ ->
|
||||||
|
{Enum.map(last_id, &{:ok, &1}), []}
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
Efficiently index a batch of documents in the index named by the module.
|
||||||
|
|
||||||
|
This function is substantially more efficient than running `index_document/2` for
|
||||||
|
each instance of a schema struct and can index with hundreds of times the throughput.
|
||||||
|
|
||||||
|
The queryable should be a schema type with its indexing preloads included in
|
||||||
|
the query. The options are forwarded to `PhilomenaQuery.Batch.record_batches/3`.
|
||||||
|
|
||||||
|
Note that indexing is near real-time and requires an index refresh before documents will
|
||||||
|
become visible. Unless changed in the mapping, this happens after 5 seconds have elapsed.
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
query =
|
||||||
|
from i in Image,
|
||||||
|
where: i.id < 100_000,
|
||||||
|
preload: ^Images.indexing_preloads()
|
||||||
|
|
||||||
|
Search.reindex(query, Image, batch_size: 1024)
|
||||||
|
|
||||||
|
"""
|
||||||
|
@spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok
|
||||||
|
def reindex(queryable, module, opts \\ []) do
|
||||||
|
queryable
|
||||||
|
|> reindex_stream(module, opts)
|
||||||
|> Stream.run()
|
|> Stream.run()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue