philomena/lib/philomena/elasticsearch.ex

142 lines
3.8 KiB
Elixir
Raw Normal View History

2019-08-18 21:43:06 -04:00
defmodule Philomena.Elasticsearch do
defmacro __using__(opts) do
definition = Keyword.fetch!(opts, :definition)
index_name = Keyword.fetch!(opts, :index_name)
2019-08-28 21:18:40 -04:00
doc_type = Keyword.fetch!(opts, :doc_type)
2019-08-18 21:43:06 -04:00
elastic_url = Application.get_env(:philomena, :elasticsearch_url)
quote do
alias Philomena.Repo
import Ecto.Query, warn: false
2019-10-08 19:19:57 -04:00
require Logger
2019-08-18 21:43:06 -04:00
def create_index! do
Elastix.Index.create(
unquote(elastic_url),
unquote(index_name),
unquote(definition).mapping()
)
end
def delete_index! do
Elastix.Index.delete(unquote(elastic_url), unquote(index_name))
end
def index_document(doc) do
data = unquote(definition).as_json(doc)
2019-08-28 21:35:01 -04:00
Elastix.Document.index(
unquote(elastic_url),
unquote(index_name),
[unquote(doc_type)],
data.id,
data
)
2019-08-18 21:43:06 -04:00
end
2019-12-04 19:11:31 -05:00
def delete_document(id) do
Elastix.Document.delete(
unquote(elastic_url),
unquote(index_name),
unquote(doc_type),
id
)
end
2019-08-18 21:43:06 -04:00
def reindex(ecto_query, batch_size \\ 1000) do
ids =
ecto_query
|> exclude(:preload)
|> exclude(:order_by)
|> order_by(asc: :id)
|> select([m], m.id)
|> limit(^batch_size)
|> Repo.all()
reindex(ecto_query, batch_size, ids)
end
def reindex(ecto_query, batch_size, []), do: nil
def reindex(ecto_query, batch_size, ids) do
lines =
ecto_query
|> where([m], m.id in ^ids)
|> Repo.all()
|> Enum.flat_map(fn m ->
doc = unquote(definition).as_json(m)
[
2019-08-28 21:18:40 -04:00
%{index: %{_index: unquote(index_name), _type: unquote(doc_type), _id: doc.id}},
2019-08-18 21:43:06 -04:00
doc
]
end)
Elastix.Bulk.post(unquote(elastic_url), lines,
index: unquote(index_name),
httpoison_options: [timeout: 30_000]
)
ids =
ecto_query
|> exclude(:preload)
|> exclude(:order_by)
|> order_by(asc: :id)
|> where([m], m.id > ^Enum.max(ids))
|> select([m], m.id)
|> limit(^batch_size)
|> Repo.all()
reindex(ecto_query, batch_size, ids)
end
2019-11-29 17:45:44 -05:00
def search(query_body) do
2019-08-18 21:43:06 -04:00
{:ok, %{body: results, status_code: 200}} =
Elastix.Search.search(
unquote(elastic_url),
unquote(index_name),
2019-08-28 21:18:40 -04:00
[unquote(doc_type)],
2019-11-29 17:45:44 -05:00
query_body
2019-08-18 21:43:06 -04:00
)
2019-11-29 17:45:44 -05:00
results
end
def search_results(elastic_query, pagination_params \\ %{}) do
page_number = pagination_params[:page_number] || 1
page_size = pagination_params[:page_size] || 25
elastic_query = Map.merge(elastic_query, %{from: (page_number - 1) * page_size, size: page_size, _source: false})
results = search(elastic_query)
2019-10-08 19:19:57 -04:00
time = results["took"]
count = results["hits"]["total"]
entries = results["hits"]["hits"] |> Enum.map(&String.to_integer(&1["_id"]))
Logger.debug("[Elasticsearch] Query took #{time}ms")
%Scrivener.Page{
entries: entries,
page_number: page_number,
page_size: page_size,
total_entries: count,
total_pages: div(count + page_size - 1, page_size)
}
2019-08-18 21:43:06 -04:00
end
2019-10-08 19:19:57 -04:00
def search_records(elastic_query, pagination_params \\ %{}, ecto_query \\ __MODULE__) do
page = search_results(elastic_query, pagination_params)
ids = page.entries
2019-08-18 21:43:06 -04:00
2019-10-08 19:19:57 -04:00
records =
ecto_query
|> where([m], m.id in ^ids)
|> Repo.all()
|> Enum.sort_by(&Enum.find_index(ids, fn el -> el == &1.id end))
2019-08-18 21:43:06 -04:00
2019-10-08 19:19:57 -04:00
%{page | entries: records}
2019-08-18 21:43:06 -04:00
end
end
end
end