philomena/lib/philomena_query/search.ex

625 lines
18 KiB
Elixir
Raw Normal View History

defmodule PhilomenaQuery.Search do
@moduledoc """
Low-level search engine interaction.
This module generates and delivers search bodies to the OpenSearch backend.
Note that before an index can be used to index or query documents, a call to
`create_index!/1` must be made. When setting up an application, or dealing with data loss
in the search engine, you must call `create_index!/1` before running an indexing task.
"""
alias PhilomenaQuery.Batch
2024-06-20 22:11:39 +02:00
alias PhilomenaQuery.Search.Api
alias Philomena.Repo
require Logger
import Ecto.Query
# todo: fetch through compile_env?
@policy Philomena.SearchPolicy
@typedoc """
Any schema module which has an associated search index. See the policy module
for more information.
"""
@type schema_module :: @policy.schema_module()
@typedoc """
Represents an object which may be operated on via `m:Ecto.Query`.
This could be a schema object (e.g. `m:Philomena.Images.Image`) or a fully formed query
`from i in Image, where: i.hidden_from_users == false`.
"""
@type queryable :: any()
@typedoc """
A query body, as deliverable to any index's `_search` endpoint.
See the query DSL documentation for additional information:
https://opensearch.org/docs/latest/query-dsl/
"""
@type query_body :: map()
@typedoc """
Given a term at the given path, replace the old term with the new term.
`path` is a list of names to be followed to find the old term. For example,
a document containing `{"condiments": "dijon"}` would permit `["condiments"]`
as the path, and a document containing `{"namespaced_tags": {"name": ["old"]}}`
would permit `["namespaced_tags", "name"]` as the path.
"""
@type replacement :: %{
path: [String.t()],
old: term(),
new: term()
}
@type search_definition :: %{
module: schema_module(),
body: query_body(),
page_number: integer(),
page_size: integer()
}
@type pagination_params :: %{
optional(:page_number) => integer(),
optional(:page_size) => integer()
}
@doc ~S"""
Create the index with the module's index name and mapping.
`PUT /#{index_name}`
You **must** use this function before indexing documents in order for the mapping to be created
correctly. If you index documents without a mapping created, the search engine will create a
mapping which does not contain the correct types for mapping fields, which will require
destroying and recreating the index.
## Example
iex> Search.create_index!(Image)
"""
@spec create_index!(schema_module()) :: any()
def create_index!(module) do
index = @policy.index_for(module)
2024-06-20 22:11:39 +02:00
Api.create_index(@policy.opensearch_url(), index.index_name(), index.mapping())
end
@doc ~S"""
Delete the index with the module's index name.
`DELETE /#{index_name}`
This undoes the effect of `create_index!/1` and removes the index permanently, deleting
all indexed documents within.
## Example
iex> Search.delete_index!(Image)
"""
@spec delete_index!(schema_module()) :: any()
def delete_index!(module) do
index = @policy.index_for(module)
2024-06-20 22:11:39 +02:00
Api.delete_index(@policy.opensearch_url(), index.index_name())
end
@doc ~S"""
Update the schema mapping for the module's index name.
`PUT /#{index_name}/_mapping`
This is used to add new fields to an existing search mapping. This cannot be used to
remove fields; removing fields requires recreating the index.
## Example
iex> Search.update_mapping!(Image)
"""
@spec update_mapping!(schema_module()) :: any()
def update_mapping!(module) do
index = @policy.index_for(module)
index_name = index.index_name()
mapping = index.mapping().mappings.properties
2024-06-20 22:11:39 +02:00
Api.update_index_mapping(@policy.opensearch_url(), index_name, %{properties: mapping})
end
@doc ~S"""
Add a single document to the index named by the module.
`PUT /#{index_name}/_doc/#{id}`
This allows the search engine to query the document.
Note that indexing is near real-time and requires an index refresh before the document will
become visible. Unless changed in the mapping, this happens after 5 seconds have elapsed.
## Example
iex> Search.index_document(%Image{...}, Image)
"""
@spec index_document(struct(), schema_module()) :: any()
def index_document(doc, module) do
index = @policy.index_for(module)
data = index.as_json(doc)
2024-06-20 22:11:39 +02:00
Api.index_document(@policy.opensearch_url(), index.index_name(), data, data.id)
end
@doc ~S"""
Remove a single document from the index named by the module.
`DELETE /#{index_name}/_doc/#{id}`
This undoes the effect of `index_document/2`; it instructs the search engine to discard
the document and no longer return it in queries.
Note that indexing is near real-time and requires an index refresh before the document will
be removed. Unless changed in the mapping, this happens after 5 seconds have elapsed.
## Example
iex> Search.delete_document(image.id, Image)
"""
@spec delete_document(term(), schema_module()) :: any()
def delete_document(id, module) do
index = @policy.index_for(module)
2024-06-20 22:11:39 +02:00
Api.delete_document(@policy.opensearch_url(), index.index_name(), id)
end
@doc """
Efficiently index a batch of documents in the index named by the module.
This function is substantially more efficient than running `index_document/2` for
each instance of a schema struct and can index with hundreds of times the throughput.
The queryable should be a schema type with its indexing preloads included in
the query. The options are forwarded to `PhilomenaQuery.Batch.record_batches/3`.
Note that indexing is near real-time and requires an index refresh before documents will
become visible. Unless changed in the mapping, this happens after 5 seconds have elapsed.
## Example
query =
from i in Image,
where: i.id < 100_000,
preload: ^Images.indexing_preloads()
Search.reindex(query, Image, batch_size: 5000)
"""
@spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok
def reindex(queryable, module, opts \\ []) do
index = @policy.index_for(module)
process =
fn records ->
lines =
Enum.flat_map(records, fn record ->
doc = index.as_json(record)
[
%{index: %{_index: index.index_name(), _id: doc.id}},
doc
]
end)
Api.bulk(@policy.opensearch_url(), lines)
end
queryable
|> Batch.record_batches(opts)
|> Task.async_stream(process, ordered: false, timeout: :infinity)
|> Stream.run()
end
@doc ~S"""
Asynchronously update all documents in the given index matching a query.
`POST /#{index_name}/_update_by_query`
This is used to replace values in documents on the fly without requiring a more-expensive
reindex operation from the database.
`set_replacements` are used to rename values in fields which are conceptually sets (arrays).
`replacements` are used to rename values in fields which are standalone terms.
Both `replacements` and `set_replacements` may be specified. Specifying neither will waste
the search engine's time evaluating the query and indexing the documents, so be sure to
specify at least one.
This function does not wait for completion of the update.
## Examples
query_body = %{term: %{"namespaced_tags.name" => old_name}}
replacement = %{path: ["namespaced_tags", "name"], old: old_name, new: new_name}
Search.update_by_query(Image, query_body, [], [replacement])
query_body = %{term: %{author: old_name}}
set_replacement = %{path: ["author"], old: old_name, new: new_name}
Search.update_by_query(Post, query_body, [set_replacement], [])
"""
@spec update_by_query(schema_module(), query_body(), [replacement()], [replacement()]) :: any()
def update_by_query(module, query_body, set_replacements, replacements) do
index = @policy.index_for(module)
# "Painless" scripting language
script = """
// Replace values in "sets" (arrays in the source document)
for (int i = 0; i < params.set_replacements.length; ++i) {
def replacement = params.set_replacements[i];
def path = replacement.path;
def old_value = replacement.old;
def new_value = replacement.new;
def reference = ctx._source;
for (int j = 0; j < path.length; ++j) {
reference = reference[path[j]];
}
for (int j = 0; j < reference.length; ++j) {
if (reference[j].equals(old_value)) {
reference[j] = new_value;
}
}
}
// Replace values in standalone fields
for (int i = 0; i < params.replacements.length; ++i) {
def replacement = params.replacements[i];
def path = replacement.path;
def old_value = replacement.old;
def new_value = replacement.new;
def reference = ctx._source;
// A little bit more complicated: go up to the last one before it
// so that the value can actually be replaced
for (int j = 0; j < path.length - 1; ++j) {
reference = reference[path[j]];
}
if (reference[path[path.length - 1]] != null && reference[path[path.length - 1]].equals(old_value)) {
reference[path[path.length - 1]] = new_value;
}
}
"""
body =
2024-06-20 22:11:39 +02:00
%{
script: %{
source: script,
params: %{
set_replacements: set_replacements,
replacements: replacements
}
},
query: query_body
2024-06-20 22:11:39 +02:00
}
2024-06-20 22:11:39 +02:00
Api.update_by_query(@policy.opensearch_url(), index.index_name(), body)
end
@doc ~S"""
Search the index named by the module.
`GET /#{index_name}/_search`
Given a query body, this returns the raw query results.
## Example
iex> Search.search(Image, %{query: %{match_all: %{}}})
%{
"_shards" => %{"failed" => 0, "skipped" => 0, "successful" => 5, "total" => 5},
"hits" => %{
"hits" => [%{"_id" => "1", "_index" => "images", "_score" => 1.0, ...}, ...]
"max_score" => 1.0,
"total" => %{"relation" => "eq", "value" => 6}
},
"timed_out" => false,
"took" => 1
}
"""
@spec search(schema_module(), query_body()) :: map()
def search(module, query_body) do
index = @policy.index_for(module)
2024-06-20 22:11:39 +02:00
{:ok, %{body: results, status: 200}} =
Api.search(@policy.opensearch_url(), index.index_name(), query_body)
results
end
@doc ~S"""
Given maps of module and body, searches each index with the respective body.
`GET /_all/_search`
This is more efficient than performing a `search/1` for each index individually.
Like `search/1`, this returns the raw query results.
## Example
iex> Search.msearch([
...> %{module: Image, body: %{query: %{match_all: %{}}}},
...> %{module: Post, body: %{query: %{match_all: %{}}}}
...> ])
[
%{"_shards" => ..., "hits" => ..., "timed_out" => false, "took" => 1},
%{"_shards" => ..., "hits" => ..., "timed_out" => false, "took" => 2}
]
"""
@spec msearch([search_definition()]) :: [map()]
def msearch(definitions) do
msearch_body =
Enum.flat_map(definitions, fn def ->
[
%{index: @policy.index_for(def.module).index_name()},
def.body
]
end)
2024-06-20 22:11:39 +02:00
{:ok, %{body: results, status: 200}} =
Api.msearch(@policy.opensearch_url(), msearch_body)
results["responses"]
end
@doc """
Transforms an index module, query body, and pagination parameters into a query suitable
for submission to the search engine.
Any of the following functions may be used for submission:
- `search_results/1`
- `msearch_results/1`
- `search_records/2`
- `msearch_records/2`
- `search_records_with_hits/2`
- `msearch_records_with_hits/2`
## Example
iex> Search.search_definition(Image, %{query: %{match_all: %{}}}, %{page_number: 3, page_size: 50})
%{
module: Image,
body: %{
size: 50,
query: %{match_all: %{}},
from: 100,
_source: false,
track_total_hits: true
},
page_size: 50,
page_number: 3
}
"""
@spec search_definition(schema_module(), query_body(), pagination_params()) ::
search_definition()
def search_definition(module, search_query, pagination_params \\ %{}) do
page_number = pagination_params[:page_number] || 1
page_size = pagination_params[:page_size] || 25
search_query =
Map.merge(search_query, %{
from: (page_number - 1) * page_size,
size: page_size,
_source: false,
track_total_hits: true
})
%{
module: module,
body: search_query,
page_number: page_number,
page_size: page_size
}
end
defp process_results(results, definition) do
time = results["took"]
count = results["hits"]["total"]["value"]
entries = Enum.map(results["hits"]["hits"], &{String.to_integer(&1["_id"]), &1})
Logger.debug("[Search] Query took #{time}ms")
Logger.debug("[Search] #{Jason.encode!(definition.body)}")
%Scrivener.Page{
entries: entries,
page_number: definition.page_number,
page_size: definition.page_size,
total_entries: count,
total_pages: div(count + definition.page_size - 1, definition.page_size)
}
end
@doc """
Given a search definition generated by `search_definition/3`, submit the query and return
a `m:Scrivener.Page` of results.
The `entries` in the page are a list of tuples of record IDs paired with the hit that generated
them.
## Example
iex> Search.search_results(definition)
%Scrivener.Page{
entries: [{1, %{"_id" => "1", ...}}, ...],
page_number: 1,
page_size: 25,
total_entries: 6,
total_pages: 1
}
"""
@spec search_results(search_definition()) :: Scrivener.Page.t()
def search_results(definition) do
process_results(search(definition.module, definition.body), definition)
end
@doc """
Given a list of search definitions, each generated by `search_definition/3`, submit the query
and return a corresponding list of `m:Scrivener.Page` for each query.
The `entries` in the page are a list of tuples of record IDs paired with the hit that generated
them.
## Example
iex> Search.msearch_results([definition])
[
%Scrivener.Page{
entries: [{1, %{"_id" => "1", ...}}, ...],
page_number: 1,
page_size: 25,
total_entries: 6,
total_pages: 1
}
]
"""
@spec msearch_results([search_definition()]) :: [Scrivener.Page.t()]
def msearch_results(definitions) do
Enum.map(Enum.zip(msearch(definitions), definitions), fn {result, definition} ->
process_results(result, definition)
end)
end
defp load_records_from_results(results, ecto_queries) do
Enum.map(Enum.zip(results, ecto_queries), fn {page, ecto_query} ->
{ids, hits} = Enum.unzip(page.entries)
records =
ecto_query
|> where([m], m.id in ^ids)
|> Repo.all()
|> Enum.sort_by(&Enum.find_index(ids, fn el -> el == &1.id end))
%{page | entries: Enum.zip(records, hits)}
end)
end
@doc """
Given a search definition generated by `search_definition/3`, submit the query and return a
`m:Scrivener.Page` of results.
The `entries` in the page are a list of tuples of schema structs paired with the hit that
generated them.
## Example
iex> Search.search_records_with_hits(definition, preload(Image, :tags))
%Scrivener.Page{
entries: [{%Image{id: 1, ...}, %{"_id" => "1", ...}}, ...],
page_number: 1,
page_size: 25,
total_entries: 6,
total_pages: 1
}
"""
@spec search_records_with_hits(search_definition(), queryable()) :: Scrivener.Page.t()
def search_records_with_hits(definition, ecto_query) do
[page] = load_records_from_results([search_results(definition)], [ecto_query])
page
end
@doc """
Given a list of search definitions, each generated by `search_definition/3`, submit the query
and return a corresponding list of `m:Scrivener.Page` for each query.
The `entries` in the page are a list of tuples of schema structs paired with the hit that
generated them.
## Example
iex> Search.msearch_records_with_hits([definition], [preload(Image, :tags)])
[
%Scrivener.Page{
entries: [{%Image{id: 1, ...}, %{"_id" => "1", ...}}, ...],
page_number: 1,
page_size: 25,
total_entries: 6,
total_pages: 1
}
]
"""
@spec msearch_records_with_hits([search_definition()], [queryable()]) :: [Scrivener.Page.t()]
def msearch_records_with_hits(definitions, ecto_queries) do
load_records_from_results(msearch_results(definitions), ecto_queries)
end
@doc """
Given a search definition generated by `search_definition/3`, submit the query and return a
`m:Scrivener.Page` of results.
The `entries` in the page are a list of schema structs.
## Example
iex> Search.search_records(definition, preload(Image, :tags))
%Scrivener.Page{
entries: [%Image{id: 1, ...}, ...],
page_number: 1,
page_size: 25,
total_entries: 6,
total_pages: 1
}
"""
@spec search_records(search_definition(), queryable()) :: Scrivener.Page.t()
def search_records(definition, ecto_query) do
page = search_records_with_hits(definition, ecto_query)
{records, _hits} = Enum.unzip(page.entries)
%{page | entries: records}
end
@doc """
Given a list of search definitions, each generated by `search_definition/3`, submit the query
and return a corresponding list of `m:Scrivener.Page` for each query.
The `entries` in the page are a list of schema structs.
## Example
iex> Search.msearch_records([definition], [preload(Image, :tags)])
[
%Scrivener.Page{
entries: [%Image{id: 1, ...}, ...],
page_number: 1,
page_size: 25,
total_entries: 6,
total_pages: 1
}
]
"""
@spec msearch_records([search_definition()], [queryable()]) :: [Scrivener.Page.t()]
def msearch_records(definitions, ecto_queries) do
Enum.map(load_records_from_results(msearch_results(definitions), ecto_queries), fn page ->
{records, _hits} = Enum.unzip(page.entries)
%{page | entries: records}
end)
end
end