2024-05-25 20:03:45 +02:00
|
|
|
defmodule PhilomenaQuery.Batch do
|
|
|
|
@moduledoc """
|
|
|
|
Locking-reduced database batch operations.
|
|
|
|
|
|
|
|
These operations are non-transactional by their very nature. This prevents inadvertent
|
|
|
|
downtimes due to blocking, but can result in consistency errors in the database,
|
|
|
|
and so should be used sparingly.
|
|
|
|
|
|
|
|
They are best suited for when large numbers of rows can be expected to be processed,
|
|
|
|
as doing so may otherwise result in Ecto timing out the query.
|
|
|
|
"""
|
|
|
|
|
|
|
|
alias Philomena.Repo
|
|
|
|
import Ecto.Query
|
|
|
|
|
2024-06-18 19:18:12 +02:00
|
|
|
@typedoc """
|
|
|
|
Represents an object which may be operated on via `m:Ecto.Query`.
|
|
|
|
|
|
|
|
This could be a schema object (e.g. `m:Philomena.Images.Image`) or a fully formed query
|
|
|
|
`from i in Image, where: i.hidden_from_users == false`.
|
|
|
|
"""
|
2024-05-25 20:03:45 +02:00
|
|
|
@type queryable :: any()
|
|
|
|
|
|
|
|
@type batch_size :: {:batch_size, integer()}
|
|
|
|
@type id_field :: {:id_field, atom()}
|
|
|
|
@type batch_options :: [batch_size() | id_field()]
|
|
|
|
|
2024-11-07 19:46:16 +01:00
|
|
|
@doc """
|
|
|
|
Stream schema structures on a queryable, using batches to avoid locking.
|
2024-06-18 19:18:12 +02:00
|
|
|
|
2024-11-07 19:46:16 +01:00
|
|
|
Valid options:
|
|
|
|
* `batch_size` (integer) - the number of records to load per batch
|
|
|
|
* `id_field` (atom) - the name of the field containing the ID
|
2024-06-18 19:18:12 +02:00
|
|
|
|
2024-11-07 19:46:16 +01:00
|
|
|
## Example
|
|
|
|
|
|
|
|
queryable = from i in Image, where: i.image_width >= 1920
|
|
|
|
|
|
|
|
queryable
|
|
|
|
|> PhilomenaQuery.Batch.record_batches()
|
|
|
|
|> Enum.each(fn image -> IO.inspect(image.id) end)
|
2024-06-18 19:18:12 +02:00
|
|
|
|
|
|
|
"""
|
2024-11-07 19:46:16 +01:00
|
|
|
@spec records(queryable(), batch_options()) :: Enumerable.t()
|
|
|
|
def records(queryable, opts \\ []) do
|
|
|
|
queryable
|
|
|
|
|> query_batches(opts)
|
|
|
|
|> Stream.flat_map(&Repo.all/1)
|
|
|
|
end
|
2024-05-25 20:03:45 +02:00
|
|
|
|
|
|
|
@doc """
|
2024-11-07 19:46:16 +01:00
|
|
|
Stream lists of schema structures on a queryable, using batches to avoid
|
|
|
|
locking.
|
2024-05-25 20:03:45 +02:00
|
|
|
|
|
|
|
Valid options:
|
|
|
|
* `batch_size` (integer) - the number of records to load per batch
|
|
|
|
* `id_field` (atom) - the name of the field containing the ID
|
|
|
|
|
|
|
|
## Example
|
|
|
|
|
|
|
|
queryable = from i in Image, where: i.image_width >= 1920
|
|
|
|
|
|
|
|
cb = fn images ->
|
|
|
|
Enum.each(images, &IO.inspect(&1.id))
|
|
|
|
end
|
|
|
|
|
2024-11-07 19:46:16 +01:00
|
|
|
queryable
|
|
|
|
|> PhilomenaQuery.Batch.record_batches()
|
|
|
|
|> Enum.each(cb)
|
2024-05-25 20:03:45 +02:00
|
|
|
|
|
|
|
"""
|
2024-11-07 19:46:16 +01:00
|
|
|
@spec record_batches(queryable(), batch_options()) :: Enumerable.t()
|
|
|
|
def record_batches(queryable, opts \\ []) do
|
|
|
|
queryable
|
|
|
|
|> query_batches(opts)
|
|
|
|
|> Stream.map(&Repo.all/1)
|
2024-05-25 20:03:45 +02:00
|
|
|
end
|
|
|
|
|
|
|
|
@doc """
|
2024-11-07 19:46:16 +01:00
|
|
|
Stream bulk queries on a queryable, using batches to avoid locking.
|
2024-05-25 20:03:45 +02:00
|
|
|
|
|
|
|
Valid options:
|
|
|
|
* `batch_size` (integer) - the number of records to load per batch
|
|
|
|
* `id_field` (atom) - the name of the field containing the ID
|
|
|
|
|
|
|
|
> #### Info {: .info}
|
|
|
|
>
|
|
|
|
> If you are looking to receive schema structures (e.g., you are querying for `Image`s,
|
|
|
|
> and you want to receive `Image` objects, then use `record_batches/3` instead.
|
|
|
|
|
2024-11-07 19:46:16 +01:00
|
|
|
`m:Ecto.Query` structs which select the IDs in each batch are streamed out.
|
2024-05-25 20:03:45 +02:00
|
|
|
|
|
|
|
## Example
|
|
|
|
|
|
|
|
queryable = from ui in ImageVote, where: ui.user_id == 1234
|
|
|
|
|
2024-11-07 19:46:16 +01:00
|
|
|
queryable
|
|
|
|
|> PhilomenaQuery.Batch.query_batches(id_field: :image_id)
|
|
|
|
|> Enum.each(fn batch_query -> Repo.delete_all(batch_query) end)
|
2024-05-25 20:03:45 +02:00
|
|
|
|
|
|
|
"""
|
2024-11-07 19:46:16 +01:00
|
|
|
@spec query_batches(queryable(), batch_options()) :: Enumerable.t(Ecto.Query.t())
|
|
|
|
def query_batches(queryable, opts \\ []) do
|
2024-05-25 20:03:45 +02:00
|
|
|
id_field = Keyword.get(opts, :id_field, :id)
|
|
|
|
|
2024-11-07 19:46:16 +01:00
|
|
|
Stream.unfold(
|
|
|
|
load_ids(queryable, -1, opts),
|
|
|
|
fn
|
|
|
|
[] ->
|
|
|
|
# Stop when no more results are produced
|
|
|
|
nil
|
2024-05-25 20:03:45 +02:00
|
|
|
|
2024-11-07 19:46:16 +01:00
|
|
|
ids ->
|
|
|
|
# Process results and output next query
|
|
|
|
output = where(queryable, [m], field(m, ^id_field) in ^ids)
|
|
|
|
next_ids = load_ids(queryable, Enum.max(ids), opts)
|
2024-05-25 20:03:45 +02:00
|
|
|
|
2024-11-07 19:46:16 +01:00
|
|
|
{output, next_ids}
|
|
|
|
end
|
|
|
|
)
|
2024-05-25 20:03:45 +02:00
|
|
|
end
|
|
|
|
|
|
|
|
defp load_ids(queryable, max_id, opts) do
|
|
|
|
id_field = Keyword.get(opts, :id_field, :id)
|
|
|
|
batch_size = Keyword.get(opts, :batch_size, 1000)
|
|
|
|
|
|
|
|
queryable
|
|
|
|
|> exclude(:order_by)
|
2024-11-04 01:17:01 +01:00
|
|
|
|> exclude(:preload)
|
|
|
|
|> exclude(:select)
|
2024-05-25 20:03:45 +02:00
|
|
|
|> order_by(asc: ^id_field)
|
|
|
|
|> where([m], field(m, ^id_field) > ^max_id)
|
|
|
|
|> select([m], field(m, ^id_field))
|
|
|
|
|> limit(^batch_size)
|
|
|
|
|> Repo.all()
|
|
|
|
end
|
|
|
|
end
|