From 7dc57cee6f37b4d437d505256d73587c28240ae7 Mon Sep 17 00:00:00 2001 From: Liam Date: Sun, 4 Aug 2024 01:10:29 -0400 Subject: [PATCH] wip --- lib/philomena_job/jobs.ex | 276 ++++++++++++++++++++++++++ lib/philomena_job/jobs/job.ex | 58 ++++++ lib/philomena_job/listener/server.ex | 71 +++++++ lib/philomena_job/listener/state.ex | 137 +++++++++++++ lib/philomena_job/notifier_server.ex | 41 ++++ lib/philomena_job/processor.ex | 13 ++ lib/philomena_job/semaphore/server.ex | 114 +++++++++++ lib/philomena_job/semaphore/state.ex | 131 ++++++++++++ lib/philomena_job/supervisor.ex | 92 +++++++++ lib/philomena_job/worker.ex | 63 ++++++ test/philomena_job/semaphore_test.exs | 143 +++++++++++++ 11 files changed, 1139 insertions(+) create mode 100644 lib/philomena_job/jobs.ex create mode 100644 lib/philomena_job/jobs/job.ex create mode 100644 lib/philomena_job/listener/server.ex create mode 100644 lib/philomena_job/listener/state.ex create mode 100644 lib/philomena_job/notifier_server.ex create mode 100644 lib/philomena_job/processor.ex create mode 100644 lib/philomena_job/semaphore/server.ex create mode 100644 lib/philomena_job/semaphore/state.ex create mode 100644 lib/philomena_job/supervisor.ex create mode 100644 lib/philomena_job/worker.ex create mode 100644 test/philomena_job/semaphore_test.exs diff --git a/lib/philomena_job/jobs.ex b/lib/philomena_job/jobs.ex new file mode 100644 index 00000000..522b373a --- /dev/null +++ b/lib/philomena_job/jobs.ex @@ -0,0 +1,276 @@ +defmodule PhilomenaJob.Jobs do + @moduledoc """ + The Jobs context. + """ + + import Ecto.Query, warn: false + alias Ecto.Multi + + @doc """ + Return a `m:Ecto.Multi` to create a job from the `:id` key of a previous operation. + + This function automatically raises an error when a deadlock condition could be created. + When inserting into multiple tables in the same transaction, you must run job creation in + order of the creating module name. + + On successful completion, the table is notified for new work. + + ## Example + + Multi.new() + |> Multi.insert(:image, image_changeset) + |> Jobs.create_job_from_op(ImageIndexRequest, :image, %{index_type: "update"}) + |> Repo.transaction() + |> case do + {:ok, %{image: image}} -> + {:ok, image} + + _ -> + {:error, image_changeset} + end + + """ + def create_job_from_op(multi, job_module, key, map \\ %{}) do + [primary_key] = job_module.__schema__(:primary_key) + job_update = update(job_module, set: [request_time: fragment("EXCLUDED.request_time")]) + + multi + |> Multi.run({:lock_table, job_module}, lock_table(job_module)) + |> Multi.run({:insert, job_module}, fn repo, %{^key => %{id: id}} -> + entry = + Map.merge(map, %{ + primary_key => id, + request_time: DateTime.utc_now() + }) + + result = + repo.insert_all( + job_module, + [entry], + on_conflict: job_update, + conflict_target: primary_key + ) + + {:ok, result} + end) + |> Multi.run({:notify, job_module}, notify_table(job_module)) + |> avoid_deadlock() + end + + @doc """ + Return a `m:Ecto.Multi` to bulk create and notify jobs from an input query. + + Due to [this Ecto bug](https://github.com/elixir-ecto/ecto/issues/4430), the caller must + select the fields needed for the table insert to succeed. The input query should look like: + + job_query = + from i in Image, + where: ..., + select: %{ + image_id: i.id, + request_time: ^DateTime.utc_now(), + index_type: "update" + } + + This function automatically raises an error when a deadlock condition could be created. 
+ When inserting into multiple tables in the same transaction, you must run job creation in + order of the creating module name. + + On successful completion, the table is notified for new work. + + ## Example + + Multi.new() + |> Multi.update_all(:images, images, []) + |> Jobs.create_jobs_from_query(ImageIndexRequest, job_query) + |> Repo.transaction() + |> case do + {:ok, %{images: images}} -> + {:ok, images} + + _ -> + {:error, ...} + end + + """ + def create_jobs_from_query(multi \\ Multi.new(), job_module, query) do + primary_key = job_module.__schema__(:primary_key) + job_update = update(job_module, set: [request_time: fragment("EXCLUDED.request_time")]) + + multi + |> Multi.run({:lock_table, job_module}, lock_table(job_module)) + |> Multi.insert_all({:insert, job_module}, job_module, query, + on_conflict: job_update, + conflict_target: primary_key + ) + |> Multi.run({:notify, job_module}, notify_table(job_module)) + |> avoid_deadlock() + end + + @doc """ + Return a `m:Ecto.Multi` to fetch and assign `limit` number of free jobs. + + Jobs can be ordered by request_time `:asc` (default) or `:desc`. This can + be used e.g. to have workers sample from both ends of the queue. + + Results are returned in the `jobs` key of the multi. The table is not notified + for new work. + + ## Example + + ImageIndexRequest + |> Jobs.fetch_and_assign_jobs("images_0", 500, :desc) + |> Repo.transaction() + |> case do + {:ok, %{jobs: {_, jobs}}} -> + {:ok, jobs} + + _ -> + {:error, :job_assignment_failed} + end + + """ + def fetch_and_assign_jobs(job_module, worker_name, limit, order \\ :asc) do + update_query = + from job in job_module, + where: is_nil(job.worker_name), + limit: ^limit, + order_by: [{^order, :request_time}], + update: [set: [worker_name: ^worker_name]], + select: job + + Multi.new() + |> Multi.run(:lock_table, lock_table(job_module)) + |> Multi.update_all(:jobs, update_query, []) + end + + @doc """ + Return a `m:Ecto.Multi` to release all jobs with the given worker name. + + On successful completion, the table is notified for new work. + + ## Example + + ImageIndexRequest + |> Jobs.release_jobs("images_0") + |> Repo.transaction() + + """ + def release_jobs(job_module, worker_name) do + update_query = + from job in job_module, + where: job.worker_name == ^worker_name, + update: [set: [worker_name: nil]] + + Multi.new() + |> Multi.run(:lock_table, lock_table(job_module)) + |> Multi.update_all(:update, update_query, []) + |> Multi.run(:notify, notify_table(job_module)) + end + + @doc """ + Return a `m:Ecto.Multi` to complete all jobs in the list of jobs. + + Jobs where the request time is identical to the fetched job are deleted + entirely. Jobs where the request time is newer than the fetched job are + updated to reset their attempt count. + + On successful completion, the table is notified for new work. 
+ + ## Example + + ImageIndexRequest + |> Jobs.complete_jobs(jobs) + |> Repo.transaction() + + """ + def complete_jobs(job_module, jobs) do + [primary_key] = job_module.__schema__(:primary_key) + + delete_query = where(job_module, fragment("'t' = 'f'")) + + delete_query = + Enum.reduce(jobs, delete_query, fn job, query -> + or_where( + query, + [q], + field(q, ^primary_key) == ^Map.fetch!(job, primary_key) and + q.request_time == ^job.request_time + ) + end) + + job_keys = Enum.map(jobs, &Map.fetch!(&1, primary_key)) + + update_query = + from job in job_module, + where: field(job, ^primary_key) in ^job_keys, + update: [set: [attempt_count: 0, worker_name: nil]] + + Multi.new() + |> Multi.run(:lock_table, lock_table(job_module)) + |> Multi.delete_all(:delete, delete_query) + |> Multi.update_all(:update, update_query, []) + |> Multi.run(:notify, notify_table(job_module)) + end + + @doc """ + Return a `m:Ecto.Multi` to fail the given job, incrementing its attempt + counter. + + On successful completion, the table is notified for new work. + + ## Example + + ImageIndexRequest + |> Jobs.fail_job(job) + |> Repo.transaction() + + """ + def fail_job(job_module, job) do + [primary_key] = job_module.__schema__(:primary_key) + + update_query = + from q in job_module, + where: field(q, ^primary_key) == ^Map.fetch!(job, primary_key), + update: [inc: [attempt_count: 1], set: [worker_name: nil]] + + Multi.new() + |> Multi.run(:lock_table, lock_table(job_module)) + |> Multi.update_all(:update, update_query, []) + |> Multi.run(:notify, notify_table(job_module)) + end + + defp avoid_deadlock(multi) do + table_lock_operations = + multi + |> Multi.to_list() + |> Enum.flat_map(fn + {{:lock_table, name}, _} -> [name] + _ -> [] + end) + + if table_lock_operations != Enum.sort(table_lock_operations) do + raise "Table lock operations do not occur in sorted order.\n" <> + "Got: #{inspect(table_lock_operations)}\n" <> + "Sort the lock operations to prevent deadlock." + else + multi + end + end + + defp lock_table(job_module) do + fn repo, _changes -> + repo.query("LOCK TABLE $1 IN EXCLUSIVE MODE", [table_name(job_module)]) + end + end + + defp notify_table(job_module) do + fn repo, _changes -> + repo.query("NOTIFY $1", [table_name(job_module)]) + end + end + + defp table_name(job_module) do + job_module.__schema__(:source) + end +end diff --git a/lib/philomena_job/jobs/job.ex b/lib/philomena_job/jobs/job.ex new file mode 100644 index 00000000..fc3ad9f9 --- /dev/null +++ b/lib/philomena_job/jobs/job.ex @@ -0,0 +1,58 @@ +defmodule PhilomenaJob.Jobs.Job do + @moduledoc """ + Base schema module for processing jobs. + """ + + @doc false + defmacro __using__ do + quote do + use Ecto.Schema + end + end + + @doc """ + Defines custom schema fields for processing jobs. + + Processing jobs have three default fields, which are created automatically + by the `job_schema/2` macro and should not be redefined: + - `:request_time` + - `:attempt_count` + - `:worker_name` + + The client should define the primary key and any additional fields. 
+ + ## Examples + + defmodule Philomena.Images.IndexRequest do + use Philomena.Jobs.Job + + job_schema "image_index_requests" do + belongs_to Philomena.Images.Image, primary_key: true + field :index_type, :string, default: "update" + end + end + + defmodule Philomena.Images.StorageRequest do + use Philomena.Jobs.Job + + job_schema "image_storage_requests" do + belongs_to Philomena.Images.Image, primary_key: true + field :operation, :string, default: "put" + field :key, :string + field :data, :blob + end + end + + """ + defmacro job_schema(name, do: block) do + quote do + schema unquote(name) do + field :request_time, :utc_datetime_usec + field :attempt_count, :integer + field :worker_name, :string + + unquote(block) + end + end + end +end diff --git a/lib/philomena_job/listener/server.ex b/lib/philomena_job/listener/server.ex new file mode 100644 index 00000000..4b6ac277 --- /dev/null +++ b/lib/philomena_job/listener/server.ex @@ -0,0 +1,71 @@ +defmodule PhilomenaJob.Listener.Server do + @moduledoc """ + A listener server which holds references to worker processes, and converts database + event notifications into messages for those worker processes. + + A `PhilomenaJob.NotifierServer` reference must be provided. This is a server pid or name. + + A supervision tree example: + + children = [ + {PhilomenaJob.Listener.Server, name: WorkerListener, notifier: WorkerNotifier} + ] + + """ + + alias PhilomenaJob.Listener.State + alias PhilomenaJob.NotifierServer + alias PhilomenaJob.Worker + + use GenServer, restart: :temporary, significant: true + + @doc """ + Registers the current process to activate when `channel_name` receives a notification. + + Process listeners are automatically unregistered when the process exits. + + ## Example + + iex> link_worker(listener_ref, "image_index_requests") + :ok + + """ + def link_worker(listener, channel_name) do + :ok = GenServer.call(listener, {:link_worker, channel_name}) + end + + @doc false + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + @doc false + @impl true + def init(opts) do + notifier = Keyword.fetch!(opts, :notifier) + + unlisten = &NotifierServer.unlisten!(notifier, &1) + listen = &NotifierServer.listen!(notifier, &1) + notify = &Worker.notify/1 + + {:ok, State.new(unlisten: unlisten, listen: listen, notify: notify)} + end + + @doc false + @impl true + def handle_call({:link_worker, channel_name}, {pid, _}, state) do + {:reply, :ok, State.add_worker(state, channel_name, pid)} + end + + @doc false + @impl true + def handle_info(message, state) + + def handle_info({:DOWN, _ref, :process, worker_pid, _reason}, state) do + {:noreply, State.remove_worker(state, worker_pid)} + end + + def handle_info({:notification, _pid, listen_ref, _channel_name, _message}, state) do + {:noreply, State.notify_worker(state, listen_ref)} + end +end diff --git a/lib/philomena_job/listener/state.ex b/lib/philomena_job/listener/state.ex new file mode 100644 index 00000000..9d5aba97 --- /dev/null +++ b/lib/philomena_job/listener/state.ex @@ -0,0 +1,137 @@ +defmodule PhilomenaJob.Listener.State do + @moduledoc """ + Internal listener state. + """ + + defmodule Entry do + @moduledoc false + + defstruct channel_name: nil, + pid: nil, + process_ref: nil, + listen_ref: nil + end + + defstruct entries: [], + notify: nil, + unlisten: nil, + listen: nil, + demonitor: nil, + monitor: nil + + @doc """ + Create a new instance of the internal listener state. + + Supported options: + - `:notify`: callback which receives a worker pid. Required. 
+ - `:unlisten`: callback to stop listening to a channel. Required. + - `:listen`: callback to begin listening to a channel. Required. + - `:demonitor`: callback to de-monitor a process. Optional, defaults to `Process.demonitor/1`. + - `:monitor`: callback to monitor a process. Optional, defaults to `Process.monitor/1`. + + ## Examples + + iex> State.new(listen: &Notifier.listen!/2, unlisten: &Notifier.unlisten!/2) + %State{} + + """ + def new(opts) do + notify = Keyword.fetch!(opts, :notify) + unlisten = Keyword.fetch!(opts, :unlisten) + listen = Keyword.fetch!(opts, :listen) + demonitor = Keyword.get(opts, :demonitor, &Process.demonitor/1) + monitor = Keyword.get(opts, :monitor, &Process.monitor/1) + + %__MODULE__{ + notify: notify, + unlisten: unlisten, + listen: listen, + demonitor: demonitor, + monitor: monitor + } + end + + @doc """ + Registers the given `worker_pid` to activate when `channel_name` receives a notification. + + Processes which are added are monitored, and exits with any added process may trigger a + mailbox `DOWN` message by the caller which must be handled. See the documentation + for `Process.monitor/1` for more information about the `DOWN` message. + + ## Example + + iex> add_worker(state, "image_index_requests", self()) + %State{} + + """ + def add_worker(%__MODULE__{} = state, channel_name, worker_pid) do + # Ensure that there is no worker already registered with this pid. + [] = filter_by(state, pid: worker_pid) + + # Monitor and begin listening. + process_ref = state.monitor.(worker_pid) + listen_ref = state.listen.(channel_name) + + # Add to state. + e = %Entry{ + channel_name: channel_name, + pid: worker_pid, + process_ref: process_ref, + listen_ref: listen_ref + } + + update_in(state.workers, &([e] ++ &1)) + end + + @doc """ + Unregisters the given `worker_pid` for notifications. + + ## Example + + iex> remove_worker(state, self()) + %State{} + + """ + def remove_worker(%__MODULE__{} = state, worker_pid) do + case filter_by(state, pid: worker_pid) do + [%Entry{} = entry] -> + # Stop listening and unmonitor. + state.unlisten.(entry.listen_ref) + state.demonitor.(entry.process_ref) + + # Remove from state. + erase_by(state, pid: worker_pid) + + [] -> + state + end + end + + @doc """ + Sends a worker listening on the given `listen_ref` a notification using the + `notify` callback. + + ## Example + + iex> notify_workers(state, listen_ref) + %State{} + + """ + def notify_worker(%__MODULE__{} = state, listen_ref) do + for entry <- filter_by(state, listen_ref: listen_ref) do + state.notify.(entry.pid) + end + + state + end + + defp filter_by(%__MODULE__{} = state, [{key, value}]) do + Enum.filter(state.entries, &match?(%{^key => ^value}, &1)) + end + + defp erase_by(%__MODULE__{} = state, [{key, value}]) do + workers = Enum.filter(state.entries, &(not match?(%{^key => ^value}, &1))) + + put_in(state.workers, workers) + end +end diff --git a/lib/philomena_job/notifier_server.ex b/lib/philomena_job/notifier_server.ex new file mode 100644 index 00000000..66d0b5e6 --- /dev/null +++ b/lib/philomena_job/notifier_server.ex @@ -0,0 +1,41 @@ +defmodule PhilomenaJob.NotifierServer do + @moduledoc """ + Process wrapper to receive notifications from the Postgres LISTEN command. 
+ + A supervision tree example: + + children = [ + {PhilomenaJob.NotifierServer, repo_url: "ecto://postgres@postgres/philomena_dev"} + ] + + """ + + alias Postgrex.Notifications + + @doc false + def child_spec(opts) do + url = Keyword.fetch!(opts, :repo_url) + opts = Ecto.Repo.Supervisor.parse_url(url) + + %{ + id: __MODULE__, + start: {Notifications, :start_link, [opts]}, + restart: :temporary, + significant: true + } + end + + @doc """ + Begin listening to the given channel. Returns a reference. + + See `Postgrex.Notifications.listen!/3` for more information. + """ + defdelegate listen!(pid, channel, opts \\ []), to: Notifications + + @doc """ + Stop listening to the channel identified by the given reference. + + See `Postgrex.Notifications.unlisten!/3` for more information. + """ + defdelegate unlisten!(pid, ref, opts \\ []), to: Notifications +end diff --git a/lib/philomena_job/processor.ex b/lib/philomena_job/processor.ex new file mode 100644 index 00000000..881f59b9 --- /dev/null +++ b/lib/philomena_job/processor.ex @@ -0,0 +1,13 @@ +defmodule PhilomenaJob.Processor do + @moduledoc """ + Interface implemented by processors passed to `PhilomenaJob.Supervisor`. + """ + + @doc """ + Check to see if work is available, and if so, run it. + + Return false to temporarily yield control and get called again. + Return true only when no more work is available. + """ + @callback check_work(keyword()) :: boolean() +end diff --git a/lib/philomena_job/semaphore/server.ex b/lib/philomena_job/semaphore/server.ex new file mode 100644 index 00000000..0496a80d --- /dev/null +++ b/lib/philomena_job/semaphore/server.ex @@ -0,0 +1,114 @@ +defmodule PhilomenaJob.Semaphore.Server do + @moduledoc """ + A counting semaphore. + + This is used to limit the concurrency of a potentially large group of processes by calling + `acquire/1` or `run/1` - only up to the number of processes indicated by the startup value + are allowed to run concurrently. + + A supervision tree example: + + children = [ + {PhilomenaJob.Semaphore.Server, name: WorkerSemaphore, max_concurrency: 16} + ] + + """ + + alias PhilomenaJob.Semaphore.State + use GenServer, restart: :temporary, significant: true + + @doc """ + Wraps the given callback with an acquire before the callback and a release after running + the callback. + + Returns the return value of the callback. + + See `acquire/1` and `release/1` for additional details. + + ## Example + + iex> Semaphore.run(WorkerSemaphore, fn -> check_work(state) end) + true + + """ + def run(name, callback) do + acquire(name) + ret = callback.() + release(name) + ret + end + + @doc """ + Decrement the semaphore with the given name. + + This either returns immediately with the semaphore value acquired, or blocks indefinitely until + sufficient other processes have released their holds on the semaphore to allow a new one to + acquire. + + Processes which acquire the semaphore are monitored, and exits with an acquired value trigger + automatic release, so exceptions will not break the semaphore. + + ## Example + + iex> Semaphore.acquire(semaphore) + :ok + + """ + def acquire(name) do + :ok = GenServer.call(name, :acquire, :infinity) + end + + @doc """ + Increment the semaphore with the given name. + + This releases the hold given by the current process and allows another process to begin running. 
+ + ## Example + + iex> Semaphore.release(semaphore) + :ok + + """ + def release(name) do + :ok = GenServer.call(name, :release, :infinity) + end + + @doc false + def start_link(opts) do + GenServer.start_link(__MODULE__, opts) + end + + @doc false + @impl true + def init(opts) do + {:ok, State.new(opts)} + end + + @doc false + @impl true + def handle_call(message, from, state) + + def handle_call(:acquire, from, state) do + case State.add_pending_process(state, from) do + {:ok, new_state} -> + {:noreply, new_state} + + error -> + {:reply, error, state} + end + end + + def handle_call(:release, {pid, _}, state) do + {:ok, new_state} = State.release_active_process(state, pid) + + {:reply, :ok, new_state} + end + + @doc false + @impl true + def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + {:ok, state} = State.release_active_process(state, pid) + + {:noreply, state} + end +end diff --git a/lib/philomena_job/semaphore/state.ex b/lib/philomena_job/semaphore/state.ex new file mode 100644 index 00000000..f822bb44 --- /dev/null +++ b/lib/philomena_job/semaphore/state.ex @@ -0,0 +1,131 @@ +defmodule PhilomenaJob.Semaphore.State do + @doc """ + Internal semaphore state. + """ + + defstruct active_processes: %{}, + pending_processes: %{}, + max_concurrency: nil, + demonitor: nil, + monitor: nil, + reply: nil + + @doc """ + Create a new instance of the internal semaphore state. + + Supported options: + - `:max_concurrency`: the maximum number of processes which can be active. Required. + - `:demonitor`: callback to de-monitor a process. Optional, defaults to `Process.demonitor/1`. + - `:monitor`: callback to monitor a process. Optional, defaults to `Process.monitor/1`. + - `:reply`: callback to reply to a process. Optional, defaults to `GenServer.reply/2`. + + ## Examples + + iex> State.new(max_concurrency: System.schedulers_online()) + %State{} + + """ + def new(opts) do + max_concurrency = Keyword.fetch!(opts, :max_concurrency) + demonitor = Keyword.get(opts, :demonitor, &Process.demonitor/1) + monitor = Keyword.get(opts, :monitor, &Process.monitor/1) + reply = Keyword.get(opts, :reply, &GenServer.reply/2) + + %__MODULE__{ + max_concurrency: max_concurrency, + demonitor: demonitor, + monitor: monitor, + reply: reply + } + end + + @doc """ + Decrement the semaphore with the given name. + + This returns immediately with the state updated. The referenced process will be called with + the reply function once the semaphore is available. + + Processes which acquire the semaphore are monitored, and exits with an acquired process may + trigger a mailbox `DOWN` message by the caller which must be handled. See the documentation + for `Process.monitor/1` for more information about the `DOWN` message. + + ## Example + + iex> State.add_pending_process(state, {self(), 0}) + {:ok, %State{}} + + iex> State.add_pending_process(state, {self(), 0}) + {:error, :already_pending_or_active} + + """ + def add_pending_process(%__MODULE__{} = state, {pid, _} = from) do + if active?(state, pid) or pending?(state, pid) do + {:error, :already_pending_or_active, state} + else + state = update_in(state.pending_processes, &Map.put(&1, pid, from)) + {:ok, try_acquire_process(state)} + end + end + + @doc """ + Increment the semaphore with the given name. + + This returns immediately with the state updated, releases the hold given by the specified + process, and potentially allows another process to begin running. 
+ + ## Example + + iex> State.release_active_process(state, self()) + {:ok, %State{}} + + """ + def release_active_process(%__MODULE__{} = state, pid) do + if active?(state, pid) do + {:ok, release_process(state, pid)} + else + {:ok, state} + end + end + + defp try_acquire_process(%__MODULE__{} = state) + when state.pending_processes != %{} and + map_size(state.active_processes) < state.max_concurrency do + # Start monitoring the process. We will automatically clean up when it exits. + {pid, from} = Enum.at(state.pending_processes, 0) + ref = state.monitor.(pid) + + # Drop from pending and add to active. + state = update_in(state.pending_processes, &Map.delete(&1, pid)) + state = update_in(state.active_processes, &Map.put(&1, pid, ref)) + + # Reply to the client which has now acquired the semaphore. + state.reply.(from, :ok) + + state + end + + defp try_acquire_process(state) do + # No pending processes or too many active processes, so nothing to do. + state + end + + defp release_process(%__MODULE__{} = state, pid) do + # Stop watching the process. + ref = Map.fetch!(state.active_processes, pid) + state.demonitor.(ref) + + # Drop from active set. + state = update_in(state.active_processes, &Map.delete(&1, pid)) + + # Try to acquire something new. + try_acquire_process(state) + end + + defp active?(%__MODULE__{} = state, pid) do + Map.has_key?(state.active_processes, pid) + end + + defp pending?(%__MODULE__{} = state, pid) do + Map.has_key?(state.pending_processes, pid) + end +end diff --git a/lib/philomena_job/supervisor.ex b/lib/philomena_job/supervisor.ex new file mode 100644 index 00000000..0f6ea63b --- /dev/null +++ b/lib/philomena_job/supervisor.ex @@ -0,0 +1,92 @@ +defmodule PhilomenaJob.Supervisor do + @moduledoc """ + Main supervisor for jobs processing. + + Supported options: + - `:max_concurrency`: the maximum number of processors which can run in parallel. Required. + This is global across all processors specified by this supervisor instance. + Lowering the maximum concurrency delays processors until the concurrency drops. + - `:repo_url`: the Ecto URL to the database. Can be fetched from application env. Required. + - `:processors`: A list of processor modules to create worker processes for. Required. + - `:name`: the global name for this supervisor instance. Required. + + Processor modules should implement the processor behaviour. See the `PhilomenaJob.Processor` + documentation for more information on required callbacks. + + ## Example + + children = [ + {PhilomenaJob.Supervisor, + max_concurrency: System.schedulers_online(), + repo_url: Application.get_env(:philomena, Philomena.Repo)[:url], + processors: [ + CommentIndexUpdater, + FilterIndexUpdater, + GalleryIndexUpdater, + ImageIndexUpdater, + PostIndexUpdater, + ReportIndexUpdater, + TagIndexUpdater + ], + name: IndexWorkSupervisor + } + ] + + Supervisor.start_link(children, opts) + + """ + + alias PhilomenaJob.Semaphore.Server, as: SemaphoreServer + alias PhilomenaJob.Listener.Server, as: ListenerServer + alias PhilomenaJob.NotifierServer + alias PhilomenaJob.Worker + + @doc false + def child_spec(opts) do + name = Keyword.fetch!(opts, :name) + + %{ + id: Module.concat(__MODULE__, name), + start: {__MODULE__, :start_link, [opts]} + } + end + + @doc false + def start_link(opts) do + processors = Keyword.fetch!(opts, :processors) + name = Keyword.fetch!(opts, :name) + + # Start the main supervisor. 
+ {:ok, main_sup} = + Supervisor.start_link( + [], + strategy: :one_for_one, + auto_shutdown: :any_significant, + name: name + ) + + # Start all three significant processes. + # If any of these exit, the supervisor exits. + opts = + opts + |> start_named_child(name, :notifier, NotifierServer) + |> start_named_child(name, :semaphore, SemaphoreServer) + |> start_named_child(name, :listener, ListenerServer) + + # Start workers. These can restart automatically. + for processor <- processors do + opts = Keyword.merge(opts, processor: processor) + + Supervisor.start_child(name, {Worker, opts}) + end + + # Return the main supervisor. + {:ok, main_sup} + end + + defp start_named_child(opts, sup, name, child_module) do + {:ok, child} = Supervisor.start_child(sup, {child_module, opts}) + + Keyword.merge(opts, [{name, child}]) + end +end diff --git a/lib/philomena_job/worker.ex b/lib/philomena_job/worker.ex new file mode 100644 index 00000000..81c4ea46 --- /dev/null +++ b/lib/philomena_job/worker.ex @@ -0,0 +1,63 @@ +defmodule PhilomenaJob.Worker do + @moduledoc false + + alias PhilomenaJob.Listener.Server, as: ListenerServer + alias PhilomenaJob.Semaphore.Server, as: SemaphoreServer + use GenServer + + @doc """ + Notify the given worker that work may be available. + """ + def notify(name) do + GenServer.cast(name, :check_work) + end + + defstruct semaphore: nil, + listener: nil, + processor: nil, + opts: nil + + @doc false + def init(opts) do + state = %__MODULE__{ + semaphore: Keyword.fetch!(opts, :semaphore), + listener: Keyword.fetch!(opts, :listener), + processor: Keyword.fetch!(opts, :processor), + opts: Keyword.drop(opts, [:semaphore, :listener, :processor]) + } + + # Start listening for events. + ListenerServer.link_worker(state.listener, state.processor.channel()) + + # Check for new work. + {:ok, check_work(state)} + end + + @doc false + def handle_cast(:check_work, %__MODULE__{} = state) do + {:noreply, check_work(state)} + end + + defp check_work(%__MODULE__{} = state) do + # We have just started or received notification that work may be available. + processor = state.processor + opts = state.opts + + # Keep calling check_work until we run out of work. 
+ cycle(fn -> + SemaphoreServer.run(state.semaphore, fn -> + processor.check_work(opts) + end) + end) + + state + end + + defp cycle(callback) do + if callback.() do + :ok + else + cycle(callback) + end + end +end diff --git a/test/philomena_job/semaphore_test.exs b/test/philomena_job/semaphore_test.exs new file mode 100644 index 00000000..19e4d27e --- /dev/null +++ b/test/philomena_job/semaphore_test.exs @@ -0,0 +1,143 @@ +defmodule PhilomenaJob.SemaphoreTest do + use ExUnit.Case, async: true + + alias PhilomenaJob.Semaphore.Server, as: SemaphoreServer + + @max_concurrency 8 + + describe "Server functionality" do + setup do + {:ok, pid} = SemaphoreServer.start_link(max_concurrency: @max_concurrency) + on_exit(fn -> Process.exit(pid, :kill) end) + + %{pid: pid} + end + + test "allows max_concurrency processes to acquire", %{pid: pid} do + # Check acquire + results = + (1..@max_concurrency) + |> Task.async_stream(fn _ -> SemaphoreServer.acquire(pid) end) + |> Enum.map(fn {:ok, res} -> res end) + + assert true == Enum.all?(results) + + # Check linking to process exit + # If this hangs, linking to process exit does not release the semaphore + results = + (1..@max_concurrency) + |> Task.async_stream(fn _ -> SemaphoreServer.acquire(pid) end) + |> Enum.map(fn {:ok, res} -> res end) + + assert true == Enum.all?(results) + end + + test "does not allow max_concurrency + 1 processes to acquire (exit)", %{pid: pid} do + processes = + (1..@max_concurrency) + |> Enum.map(fn _ -> acquire_and_wait_for_release(pid) end) + + # This task should not be able to acquire + task = Task.async(fn -> SemaphoreServer.acquire(pid) end) + assert nil == Task.yield(task, 10) + + # Terminate processes holding the semaphore + Enum.each(processes, &Process.exit(&1, :kill)) + + # Now the task should be able to acquire + assert {:ok, :ok} == Task.yield(task, 10) + end + + test "does not allow max_concurrency + 1 processes to acquire (release)", %{pid: pid} do + processes = + (1..@max_concurrency) + |> Enum.map(fn _ -> acquire_and_wait_for_release(pid) end) + + # This task should not be able to acquire + task = Task.async(fn -> SemaphoreServer.acquire(pid) end) + assert nil == Task.yield(task, 10) + + # Release processes holding the semaphore + Enum.each(processes, &send(&1, :release)) + + # Now the task should be able to acquire + assert {:ok, :ok} == Task.yield(task, 10) + end + + test "does not allow max_concurrency + 1 processes to acquire (run)", %{pid: pid} do + processes = + (1..@max_concurrency) + |> Enum.map(fn _ -> run_and_wait_for_release(pid) end) + + # This task should not be able to acquire + task = Task.async(fn -> SemaphoreServer.acquire(pid) end) + assert nil == Task.yield(task, 10) + + # Release processes holding the semaphore + Enum.each(processes, &send(&1, :release)) + + # Now the task should be able to acquire + assert {:ok, :ok} == Task.yield(task, 10) + end + + test "does not allow re-acquire from the same process", %{pid: pid} do + acquire = fn -> + try do + {:ok, SemaphoreServer.acquire(pid)} + rescue + err -> {:error, err} + end + end + + task = Task.async(fn -> + acquire.() + acquire.() + end) + + assert {:ok, {:error, %MatchError{}}} = Task.yield(task) + end + + test "allows re-release from the same process", %{pid: pid} do + release = fn -> + try do + {:ok, SemaphoreServer.release(pid)} + rescue + err -> {:error, err} + end + end + + task = Task.async(fn -> + release.() + release.() + end) + + assert {:ok, {:ok, :ok}} = Task.yield(task) + end + end + + defp run_and_wait_for_release(pid) do + 
spawn(fn -> + SemaphoreServer.run(pid, fn -> + wait_for_release() + end) + end) + end + + defp acquire_and_wait_for_release(pid) do + spawn(fn -> + SemaphoreServer.acquire(pid) + wait_for_release() + SemaphoreServer.release(pid) + end) + end + + defp wait_for_release do + receive do + :release -> + :ok + + _ -> + wait_for_release() + end + end +end
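
A few usage sketches for the modules introduced above follow. They are not part of the patch, and any name that does not appear in the diff (migration module, reindex helper, application module) is an assumption. First, a migration sketch for the `Philomena.Images.IndexRequest` example schema: the primary key comes from the `belongs_to` in the schema example, the next three columns mirror the defaults injected by `job_schema/2`, and `index_type` is the custom field; column types and defaults here are assumed.

    defmodule Philomena.Repo.Migrations.CreateImageIndexRequests do
      use Ecto.Migration

      def change do
        create table(:image_index_requests, primary_key: false) do
          # Primary key from the belongs_to in the schema example.
          add :image_id, references(:images, on_delete: :delete_all), primary_key: true

          # Default fields injected by job_schema/2.
          add :request_time, :utc_datetime_usec, null: false
          add :attempt_count, :integer, null: false, default: 0
          add :worker_name, :string

          # Custom field from the IndexRequest example.
          add :index_type, :string, null: false, default: "update"
        end

        create index(:image_index_requests, [:worker_name])
      end
    end

The `worker_name` index is assumed to help `fetch_and_assign_jobs/4`, which filters on `is_nil(worker_name)` and updates rows by worker name.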
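
Next, the deadlock rule from the Jobs context: when a single transaction creates jobs in more than one table, the `create_job_from_op/4` calls must be ordered by the creating module name, or `avoid_deadlock/1` raises. A sketch using the two example schemas from `PhilomenaJob.Jobs.Job`, where `IndexRequest` sorts before `StorageRequest`:

    Multi.new()
    |> Multi.insert(:image, image_changeset)
    |> Jobs.create_job_from_op(Philomena.Images.IndexRequest, :image, %{index_type: "update"})
    |> Jobs.create_job_from_op(Philomena.Images.StorageRequest, :image, %{operation: "put"})
    |> Repo.transaction()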
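
A processor sketch that consumes those jobs. The `PhilomenaJob.Processor` behaviour only declares `check_work/1`, but `PhilomenaJob.Worker` also calls `processor.channel()` when registering with the listener, so the sketch exposes both. `Philomena.Images.reindex/1` and the `image_id` field are assumed names, and the worker name and batch size are arbitrary.

    defmodule Philomena.Images.IndexUpdater do
      @moduledoc false
      @behaviour PhilomenaJob.Processor

      alias PhilomenaJob.Jobs
      alias Philomena.Images.IndexRequest
      alias Philomena.Repo

      @worker_name "images_0"
      @batch_size 250

      # Channel the Worker listens on; assumed to match the job table name,
      # since the Jobs context NOTIFYs on the table's source name.
      def channel, do: "image_index_requests"

      @impl true
      def check_work(_opts) do
        # Claim a batch of unassigned jobs for this worker.
        {:ok, %{jobs: {_count, jobs}}} =
          IndexRequest
          |> Jobs.fetch_and_assign_jobs(@worker_name, @batch_size)
          |> Repo.transaction()

        case jobs do
          [] ->
            # No pending jobs: returning true tells the Worker to stop cycling.
            true

          jobs ->
            failed = Enum.reject(jobs, &process/1)

            # Completed jobs are deleted (or reset if re-requested meanwhile).
            {:ok, _} =
              IndexRequest
              |> Jobs.complete_jobs(jobs -- failed)
              |> Repo.transaction()

            # Failed jobs go back to the queue with attempt_count incremented.
            Enum.each(failed, fn job ->
              {:ok, _} = IndexRequest |> Jobs.fail_job(job) |> Repo.transaction()
            end)

            # Returning false yields the semaphore and requests another pass.
            false
        end
      end

      defp process(job) do
        # Hypothetical indexing helper; replace with the real call.
        Philomena.Images.reindex(job.image_id)
        true
      rescue
        _ -> false
      end
    end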
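
Finally, application wiring. Each `Worker` checks for outstanding work as soon as it boots (`init/1` calls `check_work/1`) and the notifier opens its own Postgres connection, so the job supervisor belongs after the Repo in the application's child list. A sketch of `Application.start/2`, assuming the processor above and typical Philomena module names:

    def start(_type, _args) do
      children = [
        Philomena.Repo,
        {PhilomenaJob.Supervisor,
         max_concurrency: System.schedulers_online(),
         repo_url: Application.get_env(:philomena, Philomena.Repo)[:url],
         processors: [Philomena.Images.IndexUpdater],
         name: IndexWorkSupervisor}
      ]

      Supervisor.start_link(children, strategy: :one_for_one, name: Philomena.Supervisor)
    end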