Merge pull request #397 from philomena-dev/faster-indexing

Faster indexing
This commit is contained in:
liamwhite 2025-01-11 12:32:42 -05:00 committed by GitHub
commit db46e2c314
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 54 additions and 31 deletions

View file

@@ -40,6 +40,16 @@ defmodule Philomena.SearchIndexer do
Tag => Tags Tag => Tags
} }
@batch_sizes %{
Comment => 2048,
Filter => 2048,
Gallery => 1024,
Image => 32,
Post => 2048,
Report => 128,
Tag => 2048
}
@doc """ @doc """
Recreate the index corresponding to all schemas, and then reindex all of the Recreate the index corresponding to all schemas, and then reindex all of the
documents within. documents within.
@@ -53,11 +63,7 @@ defmodule Philomena.SearchIndexer do
@spec recreate_reindex_all_destructive! :: :ok @spec recreate_reindex_all_destructive! :: :ok
def recreate_reindex_all_destructive! do def recreate_reindex_all_destructive! do
@schemas @schemas
|> Task.async_stream( |> Stream.map(&recreate_reindex_schema_destructive!/1)
&recreate_reindex_schema_destructive!/1,
ordered: false,
timeout: :infinity
)
|> Stream.run() |> Stream.run()
end end
@@ -91,11 +97,7 @@ defmodule Philomena.SearchIndexer do
@spec reindex_all :: :ok @spec reindex_all :: :ok
def reindex_all do def reindex_all do
@schemas @schemas
|> Task.async_stream( |> Stream.map(&reindex_schema/1)
&reindex_schema/1,
ordered: false,
timeout: :infinity
)
|> Stream.run() |> Stream.run()
end end
@@ -115,12 +117,17 @@ defmodule Philomena.SearchIndexer do
# Reports currently require handling for their polymorphic nature # Reports currently require handling for their polymorphic nature
Report Report
|> preload([:user, :admin]) |> preload([:user, :admin])
|> Batch.record_batches() |> Batch.record_batches(batch_size: @batch_sizes[Report])
|> Enum.each(fn records -> |> Task.async_stream(
fn records ->
records records
|> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type]) |> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type])
|> Enum.map(&Search.index_document(&1, Report)) |> Enum.map(&Search.index_document(&1, Report))
end) end,
timeout: :infinity,
max_concurrency: max_concurrency()
)
|> Stream.run()
end end
def reindex_schema(schema) when schema in @schemas do def reindex_schema(schema) when schema in @schemas do
@@ -129,6 +136,14 @@ defmodule Philomena.SearchIndexer do
schema schema
|> preload(^context.indexing_preloads()) |> preload(^context.indexing_preloads())
|> Search.reindex(schema) |> Search.reindex(schema,
batch_size: @batch_sizes[schema],
max_concurrency: max_concurrency()
)
end
@spec max_concurrency() :: pos_integer()
defp max_concurrency do
System.schedulers_online()
end end
end end

View file

@@ -201,13 +201,17 @@ defmodule PhilomenaQuery.Search do
""" """
@spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok @spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok
def reindex(queryable, module, opts \\ []) do def reindex(queryable, module, opts \\ []) do
max_concurrency = Keyword.get(opts, :max_concurrency, 1)
index = @policy.index_for(module) index = @policy.index_for(module)
queryable queryable
|> Batch.record_batches(opts) |> Batch.query_batches(opts)
|> Enum.each(fn records -> |> Task.async_stream(
fn query ->
lines = lines =
Enum.flat_map(records, fn record -> query
|> Repo.all()
|> Enum.flat_map(fn record ->
doc = index.as_json(record) doc = index.as_json(record)
[ [
@@ -217,7 +221,11 @@ defmodule PhilomenaQuery.Search do
end) end)
Api.bulk(@policy.opensearch_url(), lines) Api.bulk(@policy.opensearch_url(), lines)
end) end,
timeout: :infinity,
max_concurrency: max_concurrency
)
|> Stream.run()
end end
@doc ~S""" @doc ~S"""

View file

@@ -46,10 +46,10 @@ defmodule PhilomenaQuery.Search.Client do
end end
defp encode_body(body) when is_map(body), defp encode_body(body) when is_map(body),
do: Jason.encode!(body) do: Jason.encode_to_iodata!(body)
defp encode_body(body) when is_list(body), defp encode_body(body) when is_list(body),
do: [Enum.map_intersperse(body, "\n", &Jason.encode!(&1)), "\n"] do: [Enum.map_intersperse(body, "\n", &Jason.encode_to_iodata!(&1)), "\n"]
defp encode_options, defp encode_options,
do: [headers: request_headers(), receive_timeout: @receive_timeout] do: [headers: request_headers(), receive_timeout: @receive_timeout]