diff --git a/lib/mix/tasks/upload_to_s3.ex b/lib/mix/tasks/upload_to_s3.ex
index 87ab48d3..61c61f87 100644
--- a/lib/mix/tasks/upload_to_s3.ex
+++ b/lib/mix/tasks/upload_to_s3.ex
@@ -121,7 +121,9 @@ defmodule Mix.Tasks.UploadToS3 do
   end
 
   defp upload_typical(queryable, batch_size, file_root, new_file_root, field_name) do
-    Batch.record_batches(queryable, [batch_size: batch_size], fn models ->
+    queryable
+    |> Batch.record_batches(batch_size: batch_size)
+    |> Enum.each(fn models ->
       models
       |> Task.async_stream(&upload_typical_model(&1, file_root, new_file_root, field_name),
         timeout: :infinity
@@ -142,7 +144,9 @@ defmodule Mix.Tasks.UploadToS3 do
   end
 
   defp upload_images(queryable, batch_size, file_root, new_file_root) do
-    Batch.record_batches(queryable, [batch_size: batch_size], fn models ->
+    queryable
+    |> Batch.record_batches(batch_size: batch_size)
+    |> Enum.each(fn models ->
       models
       |> Task.async_stream(&upload_image_model(&1, file_root, new_file_root), timeout: :infinity)
       |> Stream.run()
diff --git a/lib/philomena/user_downvote_wipe.ex b/lib/philomena/user_downvote_wipe.ex
index 6fb217fe..b958ab9b 100644
--- a/lib/philomena/user_downvote_wipe.ex
+++ b/lib/philomena/user_downvote_wipe.ex
@@ -15,7 +15,8 @@ defmodule Philomena.UserDownvoteWipe do
 
     ImageVote
     |> where(user_id: ^user.id, up: false)
-    |> Batch.query_batches([id_field: :image_id], fn queryable ->
+    |> Batch.query_batches(id_field: :image_id)
+    |> Enum.each(fn queryable ->
       {_, image_ids} = Repo.delete_all(select(queryable, [i_v], i_v.image_id))
 
       {count, nil} =
@@ -31,7 +32,8 @@ defmodule Philomena.UserDownvoteWipe do
     if upvotes_and_faves_too do
       ImageVote
       |> where(user_id: ^user.id, up: true)
-      |> Batch.query_batches([id_field: :image_id], fn queryable ->
+      |> Batch.query_batches(id_field: :image_id)
+      |> Enum.each(fn queryable ->
         {_, image_ids} = Repo.delete_all(select(queryable, [i_v], i_v.image_id))
 
         {count, nil} =
@@ -46,7 +48,8 @@ defmodule Philomena.UserDownvoteWipe do
 
       ImageFave
       |> where(user_id: ^user.id)
-      |> Batch.query_batches([id_field: :image_id], fn queryable ->
+      |> Batch.query_batches(id_field: :image_id)
+      |> Enum.each(fn queryable ->
         {_, image_ids} = Repo.delete_all(select(queryable, [i_f], i_f.image_id))
 
         {count, nil} =
diff --git a/lib/philomena/workers/tag_change_revert_worker.ex b/lib/philomena/workers/tag_change_revert_worker.ex
index 519b8404..80058a31 100644
--- a/lib/philomena/workers/tag_change_revert_worker.ex
+++ b/lib/philomena/workers/tag_change_revert_worker.ex
@@ -27,7 +27,9 @@ defmodule Philomena.TagChangeRevertWorker do
     batch_size = attributes["batch_size"] || 100
     attributes = Map.delete(attributes, "batch_size")
 
-    Batch.query_batches(queryable, [batch_size: batch_size], fn queryable ->
+    queryable
+    |> Batch.query_batches(batch_size: batch_size)
+    |> Enum.each(fn queryable ->
       ids = Repo.all(select(queryable, [tc], tc.id))
       TagChanges.mass_revert(ids, cast_ip(atomify_keys(attributes)))
     end)
diff --git a/lib/philomena_query/batch.ex b/lib/philomena_query/batch.ex
index 918a3b5e..20bdc364 100644
--- a/lib/philomena_query/batch.ex
+++ b/lib/philomena_query/batch.ex
@@ -25,24 +25,32 @@ defmodule PhilomenaQuery.Batch do
   @type id_field :: {:id_field, atom()}
   @type batch_options :: [batch_size() | id_field()]
 
-  @typedoc """
-  The callback for `record_batches/3`.
+  @doc """
+  Stream schema structures on a queryable, using batches to avoid locking.
+
+  Valid options:
+    * `batch_size` (integer) - the number of records to load per batch
+    * `id_field` (atom) - the name of the field containing the ID
+
+  ## Example
+
+      queryable = from i in Image, where: i.image_width >= 1920
+
+      queryable
+      |> PhilomenaQuery.Batch.records()
+      |> Enum.each(fn image -> IO.inspect(image.id) end)
 
-  Takes a list of schema structs which were returned in the batch. Return value is ignored.
   """
-  @type record_batch_callback :: ([struct()] -> any())
-
-  @typedoc """
-  The callback for `query_batches/3`.
-
-  Takes an `m:Ecto.Query` that can be processed with `m:Philomena.Repo` query commands, such
-  as `Philomena.Repo.update_all/3` or `Philomena.Repo.delete_all/2`. Return value is ignored.
-  """
-  @type query_batch_callback :: ([Ecto.Query.t()] -> any())
+  @spec records(queryable(), batch_options()) :: Enumerable.t()
+  def records(queryable, opts \\ []) do
+    queryable
+    |> query_batches(opts)
+    |> Stream.flat_map(&Repo.all/1)
+  end
 
   @doc """
-  Execute a callback with lists of schema structures on a queryable,
-  using batches to avoid locking.
+  Stream lists of schema structures on a queryable, using batches to avoid
+  locking.
 
   Valid options:
     * `batch_size` (integer) - the number of records to load per batch
@@ -56,16 +64,20 @@ defmodule PhilomenaQuery.Batch do
         Enum.each(images, &IO.inspect(&1.id))
       end
 
-      PhilomenaQuery.Batch.record_batches(queryable, cb)
+      queryable
+      |> PhilomenaQuery.Batch.record_batches()
+      |> Enum.each(cb)
 
   """
-  @spec record_batches(queryable(), batch_options(), record_batch_callback()) :: []
-  def record_batches(queryable, opts \\ [], callback) do
-    query_batches(queryable, opts, &callback.(Repo.all(&1)))
+  @spec record_batches(queryable(), batch_options()) :: Enumerable.t()
+  def record_batches(queryable, opts \\ []) do
+    queryable
+    |> query_batches(opts)
+    |> Stream.map(&Repo.all/1)
   end
 
   @doc """
-  Execute a callback with bulk queries on a queryable, using batches to avoid locking.
+  Stream bulk queries on a queryable, using batches to avoid locking.
 
   Valid options:
     * `batch_size` (integer) - the number of records to load per batch
@@ -76,41 +88,36 @@ defmodule PhilomenaQuery.Batch do
   > If you are looking to receive schema structures (e.g., you are querying for `Image`s,
   > and you want to receive `Image` objects, then use `record_batches/3` instead.
 
-  An `m:Ecto.Query` which selects all IDs in the current batch is passed into the callback
-  during each invocation.
+  `m:Ecto.Query` structs which select the IDs in each batch are streamed out.
 
   ## Example
 
       queryable = from ui in ImageVote, where: ui.user_id == 1234
 
-      opts = [id_field: :image_id]
-
-      cb = fn bulk_query ->
-        Repo.delete_all(bulk_query)
-      end
-
-      PhilomenaQuery.Batch.query_batches(queryable, opts, cb)
+      queryable
+      |> PhilomenaQuery.Batch.query_batches(id_field: :image_id)
+      |> Enum.each(fn batch_query -> Repo.delete_all(batch_query) end)
 
   """
-  @spec query_batches(queryable(), batch_options(), query_batch_callback()) :: []
-  def query_batches(queryable, opts \\ [], callback) do
-    ids = load_ids(queryable, -1, opts)
-
-    query_batches(queryable, opts, callback, ids)
-  end
-
-  defp query_batches(_queryable, _opts, _callback, []), do: []
-
-  defp query_batches(queryable, opts, callback, ids) do
+  @spec query_batches(queryable(), batch_options()) :: Enumerable.t(Ecto.Query.t())
+  def query_batches(queryable, opts \\ []) do
     id_field = Keyword.get(opts, :id_field, :id)
 
-    queryable
-    |> where([m], field(m, ^id_field) in ^ids)
-    |> callback.()
+    Stream.unfold(
+      load_ids(queryable, -1, opts),
+      fn
+        [] ->
+          # Stop when no more results are produced
+          nil
 
-    ids = load_ids(queryable, Enum.max(ids), opts)
+        ids ->
+          # Process results and output next query
+          output = where(queryable, [m], field(m, ^id_field) in ^ids)
+          next_ids = load_ids(queryable, Enum.max(ids), opts)
 
-    query_batches(queryable, opts, callback, ids)
+          {output, next_ids}
+      end
+    )
   end
 
   defp load_ids(queryable, max_id, opts) do
diff --git a/lib/philomena_query/search.ex b/lib/philomena_query/search.ex
index 2519e580..cd02137c 100644
--- a/lib/philomena_query/search.ex
+++ b/lib/philomena_query/search.ex
@@ -199,11 +199,13 @@ defmodule PhilomenaQuery.Search do
       Search.reindex(query, Image, batch_size: 5000)
 
   """
-  @spec reindex(queryable(), schema_module(), Batch.batch_options()) :: []
+  @spec reindex(queryable(), schema_module(), Batch.batch_options()) :: :ok
  def reindex(queryable, module, opts \\ []) do
     index = @policy.index_for(module)
 
-    Batch.record_batches(queryable, opts, fn records ->
+    queryable
+    |> Batch.record_batches(opts)
+    |> Enum.each(fn records ->
       lines =
         Enum.flat_map(records, fn record ->
           doc = index.as_json(record)
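
Migration sketch for callers of `PhilomenaQuery.Batch` (illustrative only, mirroring the call sites changed above; assumes `import Ecto.Query`, a bound `user`, and the `Batch`, `Repo`, and `ImageVote` aliases used in `Philomena.UserDownvoteWipe`):

    # Before (callback API): Batch.query_batches/3 invoked the function once per batch.
    ImageVote
    |> where(user_id: ^user.id, up: false)
    |> Batch.query_batches([id_field: :image_id], fn query -> Repo.delete_all(query) end)

    # After (stream API): Batch.query_batches/2 returns a lazy enumerable of batch
    # queries, and the caller drives iteration with Enum/Stream functions.
    ImageVote
    |> where(user_id: ^user.id, up: false)
    |> Batch.query_batches(id_field: :image_id)
    |> Enum.each(fn query -> Repo.delete_all(query) end)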