mirror of
https://github.com/philomena-dev/philomena.git
synced 2024-11-23 20:18:00 +01:00
Merge pull request #361 from philomena-dev/streamout
Change PhilomenaQuery.Batch to Stream-based instead of callback-based
This commit is contained in:
commit
d752a6c128
5 changed files with 69 additions and 51 deletions
|
@ -121,7 +121,9 @@ defmodule Mix.Tasks.UploadToS3 do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp upload_typical(queryable, batch_size, file_root, new_file_root, field_name) do
|
defp upload_typical(queryable, batch_size, file_root, new_file_root, field_name) do
|
||||||
Batch.record_batches(queryable, [batch_size: batch_size], fn models ->
|
queryable
|
||||||
|
|> Batch.record_batches(batch_size: batch_size)
|
||||||
|
|> Enum.each(fn models ->
|
||||||
models
|
models
|
||||||
|> Task.async_stream(&upload_typical_model(&1, file_root, new_file_root, field_name),
|
|> Task.async_stream(&upload_typical_model(&1, file_root, new_file_root, field_name),
|
||||||
timeout: :infinity
|
timeout: :infinity
|
||||||
|
@ -142,7 +144,9 @@ defmodule Mix.Tasks.UploadToS3 do
|
||||||
end
|
end
|
||||||
|
|
||||||
defp upload_images(queryable, batch_size, file_root, new_file_root) do
|
defp upload_images(queryable, batch_size, file_root, new_file_root) do
|
||||||
Batch.record_batches(queryable, [batch_size: batch_size], fn models ->
|
queryable
|
||||||
|
|> Batch.record_batches(batch_size: batch_size)
|
||||||
|
|> Enum.each(fn models ->
|
||||||
models
|
models
|
||||||
|> Task.async_stream(&upload_image_model(&1, file_root, new_file_root), timeout: :infinity)
|
|> Task.async_stream(&upload_image_model(&1, file_root, new_file_root), timeout: :infinity)
|
||||||
|> Stream.run()
|
|> Stream.run()
|
||||||
|
|
|
@ -15,7 +15,8 @@ defmodule Philomena.UserDownvoteWipe do
|
||||||
|
|
||||||
ImageVote
|
ImageVote
|
||||||
|> where(user_id: ^user.id, up: false)
|
|> where(user_id: ^user.id, up: false)
|
||||||
|> Batch.query_batches([id_field: :image_id], fn queryable ->
|
|> Batch.query_batches(id_field: :image_id)
|
||||||
|
|> Enum.each(fn queryable ->
|
||||||
{_, image_ids} = Repo.delete_all(select(queryable, [i_v], i_v.image_id))
|
{_, image_ids} = Repo.delete_all(select(queryable, [i_v], i_v.image_id))
|
||||||
|
|
||||||
{count, nil} =
|
{count, nil} =
|
||||||
|
@ -31,7 +32,8 @@ defmodule Philomena.UserDownvoteWipe do
|
||||||
if upvotes_and_faves_too do
|
if upvotes_and_faves_too do
|
||||||
ImageVote
|
ImageVote
|
||||||
|> where(user_id: ^user.id, up: true)
|
|> where(user_id: ^user.id, up: true)
|
||||||
|> Batch.query_batches([id_field: :image_id], fn queryable ->
|
|> Batch.query_batches(id_field: :image_id)
|
||||||
|
|> Enum.each(fn queryable ->
|
||||||
{_, image_ids} = Repo.delete_all(select(queryable, [i_v], i_v.image_id))
|
{_, image_ids} = Repo.delete_all(select(queryable, [i_v], i_v.image_id))
|
||||||
|
|
||||||
{count, nil} =
|
{count, nil} =
|
||||||
|
@ -46,7 +48,8 @@ defmodule Philomena.UserDownvoteWipe do
|
||||||
|
|
||||||
ImageFave
|
ImageFave
|
||||||
|> where(user_id: ^user.id)
|
|> where(user_id: ^user.id)
|
||||||
|> Batch.query_batches([id_field: :image_id], fn queryable ->
|
|> Batch.query_batches(id_field: :image_id)
|
||||||
|
|> Enum.each(fn queryable ->
|
||||||
{_, image_ids} = Repo.delete_all(select(queryable, [i_f], i_f.image_id))
|
{_, image_ids} = Repo.delete_all(select(queryable, [i_f], i_f.image_id))
|
||||||
|
|
||||||
{count, nil} =
|
{count, nil} =
|
||||||
|
|
|
@ -27,7 +27,9 @@ defmodule Philomena.TagChangeRevertWorker do
|
||||||
batch_size = attributes["batch_size"] || 100
|
batch_size = attributes["batch_size"] || 100
|
||||||
attributes = Map.delete(attributes, "batch_size")
|
attributes = Map.delete(attributes, "batch_size")
|
||||||
|
|
||||||
Batch.query_batches(queryable, [batch_size: batch_size], fn queryable ->
|
queryable
|
||||||
|
|> Batch.query_batches(batch_size: batch_size)
|
||||||
|
|> Enum.each(fn queryable ->
|
||||||
ids = Repo.all(select(queryable, [tc], tc.id))
|
ids = Repo.all(select(queryable, [tc], tc.id))
|
||||||
TagChanges.mass_revert(ids, cast_ip(atomify_keys(attributes)))
|
TagChanges.mass_revert(ids, cast_ip(atomify_keys(attributes)))
|
||||||
end)
|
end)
|
||||||
|
|
|
@ -25,24 +25,32 @@ defmodule PhilomenaQuery.Batch do
|
||||||
@type id_field :: {:id_field, atom()}
|
@type id_field :: {:id_field, atom()}
|
||||||
@type batch_options :: [batch_size() | id_field()]
|
@type batch_options :: [batch_size() | id_field()]
|
||||||
|
|
||||||
@typedoc """
|
@doc """
|
||||||
The callback for `record_batches/3`.
|
Stream schema structures on a queryable, using batches to avoid locking.
|
||||||
|
|
||||||
|
Valid options:
|
||||||
|
* `batch_size` (integer) - the number of records to load per batch
|
||||||
|
* `id_field` (atom) - the name of the field containing the ID
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
queryable = from i in Image, where: i.image_width >= 1920
|
||||||
|
|
||||||
|
queryable
|
||||||
|
|> PhilomenaQuery.Batch.record_batches()
|
||||||
|
|> Enum.each(fn image -> IO.inspect(image.id) end)
|
||||||
|
|
||||||
Takes a list of schema structs which were returned in the batch. Return value is ignored.
|
|
||||||
"""
|
"""
|
||||||
@type record_batch_callback :: ([struct()] -> any())
|
@spec records(queryable(), batch_options()) :: Enumerable.t()
|
||||||
|
def records(queryable, opts \\ []) do
|
||||||
@typedoc """
|
queryable
|
||||||
The callback for `query_batches/3`.
|
|> query_batches(opts)
|
||||||
|
|> Stream.flat_map(&Repo.all/1)
|
||||||
Takes an `m:Ecto.Query` that can be processed with `m:Philomena.Repo` query commands, such
|
end
|
||||||
as `Philomena.Repo.update_all/3` or `Philomena.Repo.delete_all/2`. Return value is ignored.
|
|
||||||
"""
|
|
||||||
@type query_batch_callback :: ([Ecto.Query.t()] -> any())
|
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Execute a callback with lists of schema structures on a queryable,
|
Stream lists of schema structures on a queryable, using batches to avoid
|
||||||
using batches to avoid locking.
|
locking.
|
||||||
|
|
||||||
Valid options:
|
Valid options:
|
||||||
* `batch_size` (integer) - the number of records to load per batch
|
* `batch_size` (integer) - the number of records to load per batch
|
||||||
|
@ -56,16 +64,20 @@ defmodule PhilomenaQuery.Batch do
|
||||||
Enum.each(images, &IO.inspect(&1.id))
|
Enum.each(images, &IO.inspect(&1.id))
|
||||||
end
|
end
|
||||||
|
|
||||||
PhilomenaQuery.Batch.record_batches(queryable, cb)
|
queryable
|
||||||
|
|> PhilomenaQuery.Batch.record_batches()
|
||||||
|
|> Enum.each(cb)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@spec record_batches(queryable(), batch_options(), record_batch_callback()) :: []
|
@spec record_batches(queryable(), batch_options()) :: Enumerable.t()
|
||||||
def record_batches(queryable, opts \\ [], callback) do
|
def record_batches(queryable, opts \\ []) do
|
||||||
query_batches(queryable, opts, &callback.(Repo.all(&1)))
|
queryable
|
||||||
|
|> query_batches(opts)
|
||||||
|
|> Stream.map(&Repo.all/1)
|
||||||
end
|
end
|
||||||
|
|
||||||
@doc """
|
@doc """
|
||||||
Execute a callback with bulk queries on a queryable, using batches to avoid locking.
|
Stream bulk queries on a queryable, using batches to avoid locking.
|
||||||
|
|
||||||
Valid options:
|
Valid options:
|
||||||
* `batch_size` (integer) - the number of records to load per batch
|
* `batch_size` (integer) - the number of records to load per batch
|
||||||
|
@ -76,41 +88,36 @@ defmodule PhilomenaQuery.Batch do
|
||||||
> If you are looking to receive schema structures (e.g., you are querying for `Image`s,
|
> If you are looking to receive schema structures (e.g., you are querying for `Image`s,
|
||||||
> and you want to receive `Image` objects, then use `record_batches/3` instead.
|
> and you want to receive `Image` objects, then use `record_batches/3` instead.
|
||||||
|
|
||||||
An `m:Ecto.Query` which selects all IDs in the current batch is passed into the callback
|
`m:Ecto.Query` structs which select the IDs in each batch are streamed out.
|
||||||
during each invocation.
|
|
||||||
|
|
||||||
## Example
|
## Example
|
||||||
|
|
||||||
queryable = from ui in ImageVote, where: ui.user_id == 1234
|
queryable = from ui in ImageVote, where: ui.user_id == 1234
|
||||||
|
|
||||||
opts = [id_field: :image_id]
|
queryable
|
||||||
|
|> PhilomenaQuery.Batch.query_batches(id_field: :image_id)
|
||||||
cb = fn bulk_query ->
|
|> Enum.each(fn batch_query -> Repo.delete_all(batch_query) end)
|
||||||
Repo.delete_all(bulk_query)
|
|
||||||
end
|
|
||||||
|
|
||||||
PhilomenaQuery.Batch.query_batches(queryable, opts, cb)
|
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@spec query_batches(queryable(), batch_options(), query_batch_callback()) :: []
|
@spec query_batches(queryable(), batch_options()) :: Enumerable.t(Ecto.Query.t())
|
||||||
def query_batches(queryable, opts \\ [], callback) do
|
def query_batches(queryable, opts \\ []) do
|
||||||
ids = load_ids(queryable, -1, opts)
|
|
||||||
|
|
||||||
query_batches(queryable, opts, callback, ids)
|
|
||||||
end
|
|
||||||
|
|
||||||
defp query_batches(_queryable, _opts, _callback, []), do: []
|
|
||||||
|
|
||||||
defp query_batches(queryable, opts, callback, ids) do
|
|
||||||
id_field = Keyword.get(opts, :id_field, :id)
|
id_field = Keyword.get(opts, :id_field, :id)
|
||||||
|
|
||||||
queryable
|
Stream.unfold(
|
||||||
|> where([m], field(m, ^id_field) in ^ids)
|
load_ids(queryable, -1, opts),
|
||||||
|> callback.()
|
fn
|
||||||
|
[] ->
|
||||||
|
# Stop when no more results are produced
|
||||||
|
nil
|
||||||
|
|
||||||
ids = load_ids(queryable, Enum.max(ids), opts)
|
ids ->
|
||||||
|
# Process results and output next query
|
||||||
|
output = where(queryable, [m], field(m, ^id_field) in ^ids)
|
||||||
|
next_ids = load_ids(queryable, Enum.max(ids), opts)
|
||||||
|
|
||||||
query_batches(queryable, opts, callback, ids)
|
{output, next_ids}
|
||||||
|
end
|
||||||
|
)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp load_ids(queryable, max_id, opts) do
|
defp load_ids(queryable, max_id, opts) do
|
||||||
|
|
|
@ -199,11 +199,13 @@ defmodule PhilomenaQuery.Search do
|
||||||
Search.reindex(query, Image, batch_size: 5000)
|
Search.reindex(query, Image, batch_size: 5000)
|
||||||
|
|
||||||
"""
|
"""
|
||||||
@spec reindex(queryable(), schema_module(), Batch.batch_options()) :: []
|
@spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok
|
||||||
def reindex(queryable, module, opts \\ []) do
|
def reindex(queryable, module, opts \\ []) do
|
||||||
index = @policy.index_for(module)
|
index = @policy.index_for(module)
|
||||||
|
|
||||||
Batch.record_batches(queryable, opts, fn records ->
|
queryable
|
||||||
|
|> Batch.record_batches(opts)
|
||||||
|
|> Enum.each(fn records ->
|
||||||
lines =
|
lines =
|
||||||
Enum.flat_map(records, fn record ->
|
Enum.flat_map(records, fn record ->
|
||||||
doc = index.as_json(record)
|
doc = index.as_json(record)
|
||||||
|
|
Loading…
Reference in a new issue