Spawn for upload persistence

This commit is contained in:
byte[] 2022-07-18 10:13:24 -04:00
parent c0f64cd635
commit 6cdcfb2dcd
4 changed files with 60 additions and 8 deletions

View file

@ -95,8 +95,7 @@ config :ex_aws, :hackney_opts,
use_default_pool: false, use_default_pool: false,
pool: false pool: false
config :ex_aws, :retries, config :ex_aws, :retries, max_attempts: 20
max_attempts: 20
if config_env() != :test do if config_env() != :test do
# Database config # Database config

View file

@ -4,6 +4,7 @@ defmodule Philomena.Images do
""" """
import Ecto.Query, warn: false import Ecto.Query, warn: false
require Logger
alias Ecto.Multi alias Ecto.Multi
alias Philomena.Repo alias Philomena.Repo
@ -108,9 +109,7 @@ defmodule Philomena.Images do
|> Repo.transaction() |> Repo.transaction()
|> case do |> case do
{:ok, %{image: image}} = result -> {:ok, %{image: image}} = result ->
Uploader.persist_upload(image) async_upload(image, attrs["image"])
repair_image(image)
reindex_image(image) reindex_image(image)
Tags.reindex_tags(image.added_tags) Tags.reindex_tags(image.added_tags)
maybe_approve_image(image, attribution[:user]) maybe_approve_image(image, attribution[:user])
@ -122,6 +121,44 @@ defmodule Philomena.Images do
end end
end end
defp async_upload(image, plug_upload) do
linked_pid =
spawn(fn ->
# Make sure task will finish before VM exit
Process.flag(:trap_exit, true)
# Wait to be freed up by the caller
receive do
:ready -> nil
end
# Start trying to upload
try_upload(image, 0)
end)
# Give the upload to the linked process
Plug.Upload.give_away(plug_upload, linked_pid, self())
# Free up the linked process
send(linked_pid, :ready)
end
defp try_upload(image, retry_count) when retry_count < 100 do
try do
Uploader.persist_upload(image)
repair_image(image)
rescue
e ->
Logger.error("Upload failed: #{inspect(e)} [try ##{retry_count}]")
Process.sleep(5000)
try_upload(image, retry_count + 1)
end
end
defp try_upload(image, retry_count) do
Logger.error("Aborting upload of #{image.id} after #{retry_count} retries")
end
defp maybe_create_subscription_on_upload(multi, %User{watch_on_upload: true} = user) do defp maybe_create_subscription_on_upload(multi, %User{watch_on_upload: true} = user) do
multi multi
|> Multi.run(:subscribe, fn _repo, %{image: image} -> |> Multi.run(:subscribe, fn _repo, %{image: image} ->

View file

@ -86,10 +86,26 @@ defmodule Philomena.Objects do
end) end)
end end
defp run_all(fun) do defp run_all(wrapped) do
fun = fn opts ->
try do
wrapped.(opts)
:ok
rescue
_ -> :error
end
end
backends() backends()
|> Task.async_stream(fun, timeout: :infinity) |> Task.async_stream(fun, timeout: :infinity)
|> Stream.run() |> Enum.any?(fn {_, v} -> v == :error end)
|> case do
true ->
raise "Failed to operate on all backends"
_ ->
:ok
end
end end
defp backends do defp backends do

View file

@ -34,7 +34,7 @@ defmodule PhilomenaWeb.ScraperPlug do
params_name = Keyword.get(opts, :params_name, "image") params_name = Keyword.get(opts, :params_name, "image")
params_key = Keyword.get(opts, :params_key, "image") params_key = Keyword.get(opts, :params_key, "image")
name = extract_filename(url, headers) name = extract_filename(url, headers)
file = Briefly.create!() file = Plug.Upload.random_file!(UUID.uuid1())
File.write!(file, body) File.write!(file, body)