Liam 2024-08-04 01:10:29 -04:00
parent f91d9f2143
commit 7dc57cee6f
11 changed files with 1139 additions and 0 deletions

276 lib/philomena_job/jobs.ex Normal file

@@ -0,0 +1,276 @@
defmodule PhilomenaJob.Jobs do
@moduledoc """
The Jobs context.
"""
import Ecto.Query, warn: false
alias Ecto.Multi
@doc """
Return a `m:Ecto.Multi` to create a job from the `:id` key of a previous operation.
This function automatically raises an error when a deadlock condition could be created.
When inserting into multiple tables in the same transaction, you must run job creation in
order of the creating module name.
On successful completion, the table is notified for new work.
## Example
Multi.new()
|> Multi.insert(:image, image_changeset)
|> Jobs.create_job_from_op(ImageIndexRequest, :image, %{index_type: "update"})
|> Repo.transaction()
|> case do
{:ok, %{image: image}} ->
{:ok, image}
_ ->
{:error, image_changeset}
end
"""
def create_job_from_op(multi, job_module, key, map \\ %{}) do
[primary_key] = job_module.__schema__(:primary_key)
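    # On conflict, refresh the existing row's request_time from the proposed
    # insert (Postgres `ON CONFLICT ... DO UPDATE SET ... = EXCLUDED.request_time`).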
job_update = update(job_module, set: [request_time: fragment("EXCLUDED.request_time")])
multi
|> Multi.run({:lock_table, job_module}, lock_table(job_module))
|> Multi.run({:insert, job_module}, fn repo, %{^key => %{id: id}} ->
entry =
Map.merge(map, %{
primary_key => id,
request_time: DateTime.utc_now()
})
result =
repo.insert_all(
job_module,
[entry],
on_conflict: job_update,
conflict_target: primary_key
)
{:ok, result}
end)
|> Multi.run({:notify, job_module}, notify_table(job_module))
|> avoid_deadlock()
end
@doc """
Return a `m:Ecto.Multi` to bulk create and notify jobs from an input query.
Due to [this Ecto bug](https://github.com/elixir-ecto/ecto/issues/4430), the caller must
select the fields needed for the table insert to succeed. The input query should look like:
job_query =
from i in Image,
where: ...,
select: %{
image_id: i.id,
request_time: ^DateTime.utc_now(),
index_type: "update"
}
This function automatically raises an error when a deadlock condition could be created.
When inserting into multiple tables in the same transaction, you must run job creation in
order of the creating module name.
On successful completion, the table is notified for new work.
## Example
Multi.new()
|> Multi.update_all(:images, images, [])
|> Jobs.create_jobs_from_query(ImageIndexRequest, job_query)
|> Repo.transaction()
|> case do
{:ok, %{images: images}} ->
{:ok, images}
_ ->
{:error, ...}
end
"""
def create_jobs_from_query(multi \\ Multi.new(), job_module, query) do
primary_key = job_module.__schema__(:primary_key)
job_update = update(job_module, set: [request_time: fragment("EXCLUDED.request_time")])
multi
|> Multi.run({:lock_table, job_module}, lock_table(job_module))
|> Multi.insert_all({:insert, job_module}, job_module, query,
on_conflict: job_update,
conflict_target: primary_key
)
|> Multi.run({:notify, job_module}, notify_table(job_module))
|> avoid_deadlock()
end
@doc """
Return a `m:Ecto.Multi` to fetch and assign `limit` number of free jobs.
Jobs can be ordered by request_time `:asc` (default) or `:desc`. This can
be used e.g. to have workers sample from both ends of the queue.
Results are returned in the `jobs` key of the multi. The table is not notified
for new work.
## Example
ImageIndexRequest
|> Jobs.fetch_and_assign_jobs("images_0", 500, :desc)
|> Repo.transaction()
|> case do
{:ok, %{jobs: {_, jobs}}} ->
{:ok, jobs}
_ ->
{:error, :job_assignment_failed}
end
"""
  def fetch_and_assign_jobs(job_module, worker_name, limit, order \\ :asc) do
    [primary_key] = job_module.__schema__(:primary_key)

    # Postgres does not allow ORDER BY/LIMIT directly in UPDATE, so select the
    # target keys in a subquery first. The exclusive table lock taken below
    # keeps this free of write races.
    target_keys =
      from job in job_module,
        where: is_nil(job.worker_name),
        limit: ^limit,
        order_by: [{^order, :request_time}],
        select: field(job, ^primary_key)

    update_query =
      from job in job_module,
        where: field(job, ^primary_key) in subquery(target_keys),
        update: [set: [worker_name: ^worker_name]],
        select: job

    Multi.new()
    |> Multi.run(:lock_table, lock_table(job_module))
    |> Multi.update_all(:jobs, update_query, [])
  end
@doc """
Return a `m:Ecto.Multi` to release all jobs with the given worker name.
On successful completion, the table is notified for new work.
## Example
ImageIndexRequest
|> Jobs.release_jobs("images_0")
|> Repo.transaction()
"""
def release_jobs(job_module, worker_name) do
update_query =
from job in job_module,
where: job.worker_name == ^worker_name,
update: [set: [worker_name: nil]]
Multi.new()
|> Multi.run(:lock_table, lock_table(job_module))
|> Multi.update_all(:update, update_query, [])
|> Multi.run(:notify, notify_table(job_module))
end
@doc """
Return a `m:Ecto.Multi` to complete all jobs in the list of jobs.
Jobs where the request time is identical to the fetched job are deleted
entirely. Jobs where the request time is newer than the fetched job are
updated to reset their attempt count.
On successful completion, the table is notified for new work.
## Example
ImageIndexRequest
|> Jobs.complete_jobs(jobs)
|> Repo.transaction()
"""
def complete_jobs(job_module, jobs) do
[primary_key] = job_module.__schema__(:primary_key)
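    # Seed with an always-false predicate; the reducer below or_where's an
    # exact match for each fetched job on top of it.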
delete_query = where(job_module, fragment("'t' = 'f'"))
delete_query =
Enum.reduce(jobs, delete_query, fn job, query ->
or_where(
query,
[q],
field(q, ^primary_key) == ^Map.fetch!(job, primary_key) and
q.request_time == ^job.request_time
)
end)
job_keys = Enum.map(jobs, &Map.fetch!(&1, primary_key))
update_query =
from job in job_module,
where: field(job, ^primary_key) in ^job_keys,
update: [set: [attempt_count: 0, worker_name: nil]]
Multi.new()
|> Multi.run(:lock_table, lock_table(job_module))
|> Multi.delete_all(:delete, delete_query)
|> Multi.update_all(:update, update_query, [])
|> Multi.run(:notify, notify_table(job_module))
end
@doc """
Return a `m:Ecto.Multi` to fail the given job, incrementing its attempt
counter.
On successful completion, the table is notified for new work.
## Example
ImageIndexRequest
|> Jobs.fail_job(job)
|> Repo.transaction()
"""
def fail_job(job_module, job) do
[primary_key] = job_module.__schema__(:primary_key)
update_query =
from q in job_module,
where: field(q, ^primary_key) == ^Map.fetch!(job, primary_key),
update: [inc: [attempt_count: 1], set: [worker_name: nil]]
Multi.new()
|> Multi.run(:lock_table, lock_table(job_module))
|> Multi.update_all(:update, update_query, [])
|> Multi.run(:notify, notify_table(job_module))
end
defp avoid_deadlock(multi) do
table_lock_operations =
multi
|> Multi.to_list()
|> Enum.flat_map(fn
{{:lock_table, name}, _} -> [name]
_ -> []
end)
if table_lock_operations != Enum.sort(table_lock_operations) do
raise "Table lock operations do not occur in sorted order.\n" <>
"Got: #{inspect(table_lock_operations)}\n" <>
"Sort the lock operations to prevent deadlock."
else
multi
end
end
  defp lock_table(job_module) do
    fn repo, _changes ->
      # LOCK TABLE is a utility statement, so the table name cannot be bound
      # as a parameter; interpolate it instead. The name comes from the schema
      # source, not from user input.
      repo.query("LOCK TABLE #{table_name(job_module)} IN EXCLUSIVE MODE")
    end
  end

  defp notify_table(job_module) do
    fn repo, _changes ->
      # NOTIFY does not accept bound parameters either, but pg_notify/2 does.
      repo.query("SELECT pg_notify($1, '')", [table_name(job_module)])
    end
  end
defp table_name(job_module) do
job_module.__schema__(:source)
end
end
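
An aside on the ordering rule above: when one transaction creates jobs in more than one job table, the `create_*` calls must appear in sorted order of the job module names, or the deadlock check raises. A sketch, reusing the hypothetical `ImageIndexRequest` and `ImageStorageRequest` modules from the doc examples:

    # ImageIndexRequest sorts before ImageStorageRequest, so its job comes first.
    Multi.new()
    |> Multi.insert(:image, image_changeset)
    |> Jobs.create_job_from_op(ImageIndexRequest, :image, %{index_type: "update"})
    |> Jobs.create_job_from_op(ImageStorageRequest, :image, %{operation: "put", key: key})
    |> Repo.transaction()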

58 lib/philomena_job/jobs/job.ex Normal file

@@ -0,0 +1,58 @@
defmodule PhilomenaJob.Jobs.Job do
@moduledoc """
Base schema module for processing jobs.
"""
  @doc false
  defmacro __using__(_opts) do
    quote do
      use Ecto.Schema
      import unquote(__MODULE__), only: [job_schema: 2]
    end
  end
@doc """
Defines custom schema fields for processing jobs.
Processing jobs have three default fields, which are created automatically
by the `job_schema/2` macro and should not be redefined:
- `:request_time`
- `:attempt_count`
- `:worker_name`
The client should define the primary key and any additional fields.
## Examples
defmodule Philomena.Images.IndexRequest do
use Philomena.Jobs.Job
job_schema "image_index_requests" do
belongs_to Philomena.Images.Image, primary_key: true
field :index_type, :string, default: "update"
end
end
defmodule Philomena.Images.StorageRequest do
use Philomena.Jobs.Job
job_schema "image_storage_requests" do
belongs_to Philomena.Images.Image, primary_key: true
field :operation, :string, default: "put"
field :key, :string
field :data, :blob
end
end
"""
  defmacro job_schema(name, do: block) do
    quote do
      # The caller defines the primary key (see the examples above), so the
      # default autogenerated :id key is disabled.
      @primary_key false

      schema unquote(name) do
        field :request_time, :utc_datetime_usec
        field :attempt_count, :integer
        field :worker_name, :string

        unquote(block)
      end
    end
  end
end
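
The commit ships no migrations, so for orientation, a table backing the first example above might be created like this (a sketch: the column types mirror the schema fields, while the defaults, the delete rule, and the index are assumptions):

    defmodule Philomena.Repo.Migrations.CreateImageIndexRequests do
      use Ecto.Migration

      def change do
        create table(:image_index_requests, primary_key: false) do
          # Assumed: jobs vanish with their image.
          add :image_id, references(:images, on_delete: :delete_all), primary_key: true
          add :request_time, :utc_datetime_usec, null: false
          add :attempt_count, :integer, null: false, default: 0
          add :worker_name, :string
          add :index_type, :string, null: false, default: "update"
        end

        # Assumed: speeds up the is_nil(worker_name) scan in fetch_and_assign_jobs/4.
        create index(:image_index_requests, [:worker_name])
      end
    end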

71 lib/philomena_job/listener/server.ex Normal file

@@ -0,0 +1,71 @@
defmodule PhilomenaJob.Listener.Server do
@moduledoc """
A listener server which holds references to worker processes, and converts database
event notifications into messages for those worker processes.
A `PhilomenaJob.NotifierServer` reference must be provided. This is a server pid or name.
A supervision tree example:
children = [
{PhilomenaJob.Listener.Server, name: WorkerListener, notifier: WorkerNotifier}
]
"""
alias PhilomenaJob.Listener.State
alias PhilomenaJob.NotifierServer
alias PhilomenaJob.Worker
use GenServer, restart: :temporary, significant: true
@doc """
Registers the current process to activate when `channel_name` receives a notification.
Process listeners are automatically unregistered when the process exits.
## Example
iex> link_worker(listener_ref, "image_index_requests")
:ok
"""
def link_worker(listener, channel_name) do
:ok = GenServer.call(listener, {:link_worker, channel_name})
end
  @doc false
  def start_link(opts) do
    # Honor the :name option shown in the supervision tree example above.
    GenServer.start_link(__MODULE__, opts, Keyword.take(opts, [:name]))
  end
@doc false
@impl true
def init(opts) do
notifier = Keyword.fetch!(opts, :notifier)
unlisten = &NotifierServer.unlisten!(notifier, &1)
listen = &NotifierServer.listen!(notifier, &1)
notify = &Worker.notify/1
{:ok, State.new(unlisten: unlisten, listen: listen, notify: notify)}
end
@doc false
@impl true
def handle_call({:link_worker, channel_name}, {pid, _}, state) do
{:reply, :ok, State.add_worker(state, channel_name, pid)}
end
@doc false
@impl true
def handle_info(message, state)
def handle_info({:DOWN, _ref, :process, worker_pid, _reason}, state) do
{:noreply, State.remove_worker(state, worker_pid)}
end
def handle_info({:notification, _pid, listen_ref, _channel_name, _message}, state) do
{:noreply, State.notify_worker(state, listen_ref)}
end
end

137 lib/philomena_job/listener/state.ex Normal file

@@ -0,0 +1,137 @@
defmodule PhilomenaJob.Listener.State do
@moduledoc """
Internal listener state.
"""
defmodule Entry do
@moduledoc false
defstruct channel_name: nil,
pid: nil,
process_ref: nil,
listen_ref: nil
end
defstruct entries: [],
notify: nil,
unlisten: nil,
listen: nil,
demonitor: nil,
monitor: nil
@doc """
Create a new instance of the internal listener state.
Supported options:
- `:notify`: callback which receives a worker pid. Required.
- `:unlisten`: callback to stop listening to a channel. Required.
- `:listen`: callback to begin listening to a channel. Required.
- `:demonitor`: callback to de-monitor a process. Optional, defaults to `Process.demonitor/1`.
- `:monitor`: callback to monitor a process. Optional, defaults to `Process.monitor/1`.
## Examples
iex> State.new(listen: &Notifier.listen!/2, unlisten: &Notifier.unlisten!/2)
%State{}
"""
def new(opts) do
notify = Keyword.fetch!(opts, :notify)
unlisten = Keyword.fetch!(opts, :unlisten)
listen = Keyword.fetch!(opts, :listen)
demonitor = Keyword.get(opts, :demonitor, &Process.demonitor/1)
monitor = Keyword.get(opts, :monitor, &Process.monitor/1)
%__MODULE__{
notify: notify,
unlisten: unlisten,
listen: listen,
demonitor: demonitor,
monitor: monitor
}
end
@doc """
Registers the given `worker_pid` to activate when `channel_name` receives a notification.
Processes which are added are monitored, and exits with any added process may trigger a
mailbox `DOWN` message by the caller which must be handled. See the documentation
for `Process.monitor/1` for more information about the `DOWN` message.
## Example
iex> add_worker(state, "image_index_requests", self())
%State{}
"""
  def add_worker(%__MODULE__{} = state, channel_name, worker_pid) do
    # Ensure that there is no worker already registered with this pid.
    [] = filter_by(state, pid: worker_pid)

    # Monitor and begin listening.
    process_ref = state.monitor.(worker_pid)
    listen_ref = state.listen.(channel_name)

    # Add to state.
    entry = %Entry{
      channel_name: channel_name,
      pid: worker_pid,
      process_ref: process_ref,
      listen_ref: listen_ref
    }

    update_in(state.entries, &[entry | &1])
  end
@doc """
Unregisters the given `worker_pid` for notifications.
## Example
iex> remove_worker(state, self())
%State{}
"""
def remove_worker(%__MODULE__{} = state, worker_pid) do
case filter_by(state, pid: worker_pid) do
[%Entry{} = entry] ->
# Stop listening and unmonitor.
state.unlisten.(entry.listen_ref)
state.demonitor.(entry.process_ref)
# Remove from state.
erase_by(state, pid: worker_pid)
[] ->
state
end
end
@doc """
Sends a worker listening on the given `listen_ref` a notification using the
`notify` callback.
## Example
iex> notify_workers(state, listen_ref)
%State{}
"""
def notify_worker(%__MODULE__{} = state, listen_ref) do
for entry <- filter_by(state, listen_ref: listen_ref) do
state.notify.(entry.pid)
end
state
end
defp filter_by(%__MODULE__{} = state, [{key, value}]) do
Enum.filter(state.entries, &match?(%{^key => ^value}, &1))
end
  defp erase_by(%__MODULE__{} = state, [{key, value}]) do
    entries = Enum.reject(state.entries, &match?(%{^key => ^value}, &1))
    put_in(state.entries, entries)
  end
end

41 lib/philomena_job/notifier_server.ex Normal file

@@ -0,0 +1,41 @@
defmodule PhilomenaJob.NotifierServer do
@moduledoc """
Process wrapper to receive notifications from the Postgres LISTEN command.
A supervision tree example:
children = [
{PhilomenaJob.NotifierServer, repo_url: "ecto://postgres@postgres/philomena_dev"}
]
"""
alias Postgrex.Notifications
@doc false
def child_spec(opts) do
url = Keyword.fetch!(opts, :repo_url)
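    # Reuse Ecto's internal repo URL parser to turn the URL into Postgrex options.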
opts = Ecto.Repo.Supervisor.parse_url(url)
%{
id: __MODULE__,
start: {Notifications, :start_link, [opts]},
restart: :temporary,
significant: true
}
end
@doc """
Begin listening to the given channel. Returns a reference.
See `Postgrex.Notifications.listen!/3` for more information.
"""
defdelegate listen!(pid, channel, opts \\ []), to: Notifications
@doc """
Stop listening to the channel identified by the given reference.
See `Postgrex.Notifications.unlisten!/3` for more information.
"""
defdelegate unlisten!(pid, ref, opts \\ []), to: Notifications
end

13 lib/philomena_job/processor.ex Normal file

@@ -0,0 +1,13 @@
defmodule PhilomenaJob.Processor do
@moduledoc """
Interface implemented by processors passed to `PhilomenaJob.Supervisor`.
"""
@doc """
Check to see if work is available, and if so, run it.
Return false to temporarily yield control and get called again.
Return true only when no more work is available.
"""
@callback check_work(keyword()) :: boolean()
end
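
For orientation, a sketch of a conforming processor; the module name, channel, worker name, and `:batch_size` option are hypothetical, and `Jobs` refers to `PhilomenaJob.Jobs` from this commit:

    defmodule Philomena.Images.IndexProcessor do
      @behaviour PhilomenaJob.Processor

      alias PhilomenaJob.Jobs
      alias Philomena.Images.IndexRequest
      alias Philomena.Repo

      @impl true
      def channel, do: "image_index_requests"

      @impl true
      def check_work(opts) do
        batch_size = Keyword.get(opts, :batch_size, 500)

        # Take a batch of free jobs under this worker's name.
        {:ok, %{jobs: {count, jobs}}} =
          IndexRequest
          |> Jobs.fetch_and_assign_jobs("images_0", batch_size)
          |> Repo.transaction()

        # ... index each fetched job here, then mark the batch complete ...
        {:ok, _} =
          IndexRequest
          |> Jobs.complete_jobs(jobs)
          |> Repo.transaction()

        # A short batch means the queue is drained; returning true ends the cycle.
        count < batch_size
      end
    end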

114 lib/philomena_job/semaphore/server.ex Normal file

@@ -0,0 +1,114 @@
defmodule PhilomenaJob.Semaphore.Server do
@moduledoc """
A counting semaphore.
This is used to limit the concurrency of a potentially large group of processes by calling
`acquire/1` or `run/1` - only up to the number of processes indicated by the startup value
are allowed to run concurrently.
A supervision tree example:
children = [
{PhilomenaJob.Semaphore.Server, name: WorkerSemaphore, max_concurrency: 16}
]
"""
alias PhilomenaJob.Semaphore.State
use GenServer, restart: :temporary, significant: true
@doc """
Wraps the given callback with an acquire before the callback and a release after running
the callback.
Returns the return value of the callback.
See `acquire/1` and `release/1` for additional details.
## Example
iex> Semaphore.run(WorkerSemaphore, fn -> check_work(state) end)
true
"""
  def run(name, callback) do
    acquire(name)

    try do
      callback.()
    after
      # Ensure the hold is released even when the callback raises or throws.
      release(name)
    end
  end
@doc """
Decrement the semaphore with the given name.
This either returns immediately with the semaphore value acquired, or blocks indefinitely until
sufficient other processes have released their holds on the semaphore to allow a new one to
acquire.
Processes which acquire the semaphore are monitored, and exits with an acquired value trigger
automatic release, so exceptions will not break the semaphore.
## Example
iex> Semaphore.acquire(semaphore)
:ok
"""
def acquire(name) do
:ok = GenServer.call(name, :acquire, :infinity)
end
@doc """
Increment the semaphore with the given name.
This releases the hold given by the current process and allows another process to begin running.
## Example
iex> Semaphore.release(semaphore)
:ok
"""
def release(name) do
:ok = GenServer.call(name, :release, :infinity)
end
  @doc false
  def start_link(opts) do
    # Honor the :name option shown in the supervision tree example above.
    GenServer.start_link(__MODULE__, opts, Keyword.take(opts, [:name]))
  end
@doc false
@impl true
def init(opts) do
{:ok, State.new(opts)}
end
@doc false
@impl true
def handle_call(message, from, state)
def handle_call(:acquire, from, state) do
case State.add_pending_process(state, from) do
{:ok, new_state} ->
{:noreply, new_state}
error ->
{:reply, error, state}
end
end
def handle_call(:release, {pid, _}, state) do
{:ok, new_state} = State.release_active_process(state, pid)
{:reply, :ok, new_state}
end
@doc false
@impl true
def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do
{:ok, state} = State.release_active_process(state, pid)
{:noreply, state}
end
end

131 lib/philomena_job/semaphore/state.ex Normal file

@@ -0,0 +1,131 @@
defmodule PhilomenaJob.Semaphore.State do
@doc """
Internal semaphore state.
"""
defstruct active_processes: %{},
pending_processes: %{},
max_concurrency: nil,
demonitor: nil,
monitor: nil,
reply: nil
@doc """
Create a new instance of the internal semaphore state.
Supported options:
- `:max_concurrency`: the maximum number of processes which can be active. Required.
- `:demonitor`: callback to de-monitor a process. Optional, defaults to `Process.demonitor/1`.
- `:monitor`: callback to monitor a process. Optional, defaults to `Process.monitor/1`.
- `:reply`: callback to reply to a process. Optional, defaults to `GenServer.reply/2`.
## Examples
iex> State.new(max_concurrency: System.schedulers_online())
%State{}
"""
def new(opts) do
max_concurrency = Keyword.fetch!(opts, :max_concurrency)
demonitor = Keyword.get(opts, :demonitor, &Process.demonitor/1)
monitor = Keyword.get(opts, :monitor, &Process.monitor/1)
reply = Keyword.get(opts, :reply, &GenServer.reply/2)
%__MODULE__{
max_concurrency: max_concurrency,
demonitor: demonitor,
monitor: monitor,
reply: reply
}
end
@doc """
Decrement the semaphore with the given name.
This returns immediately with the state updated. The referenced process will be called with
the reply function once the semaphore is available.
Processes which acquire the semaphore are monitored, and exits with an acquired process may
trigger a mailbox `DOWN` message by the caller which must be handled. See the documentation
for `Process.monitor/1` for more information about the `DOWN` message.
## Example
iex> State.add_pending_process(state, {self(), 0})
{:ok, %State{}}
iex> State.add_pending_process(state, {self(), 0})
{:error, :already_pending_or_active}
"""
  def add_pending_process(%__MODULE__{} = state, {pid, _} = from) do
    if active?(state, pid) or pending?(state, pid) do
      {:error, :already_pending_or_active}
    else
      state = update_in(state.pending_processes, &Map.put(&1, pid, from))
      {:ok, try_acquire_process(state)}
    end
  end
@doc """
Increment the semaphore with the given name.
This returns immediately with the state updated, releases the hold given by the specified
process, and potentially allows another process to begin running.
## Example
iex> State.release_active_process(state, self())
{:ok, %State{}}
"""
def release_active_process(%__MODULE__{} = state, pid) do
if active?(state, pid) do
{:ok, release_process(state, pid)}
else
{:ok, state}
end
end
  defp try_acquire_process(%__MODULE__{} = state)
       when map_size(state.pending_processes) > 0 and
              map_size(state.active_processes) < state.max_concurrency do
# Start monitoring the process. We will automatically clean up when it exits.
{pid, from} = Enum.at(state.pending_processes, 0)
ref = state.monitor.(pid)
# Drop from pending and add to active.
state = update_in(state.pending_processes, &Map.delete(&1, pid))
state = update_in(state.active_processes, &Map.put(&1, pid, ref))
# Reply to the client which has now acquired the semaphore.
state.reply.(from, :ok)
state
end
defp try_acquire_process(state) do
# No pending processes or too many active processes, so nothing to do.
state
end
defp release_process(%__MODULE__{} = state, pid) do
# Stop watching the process.
ref = Map.fetch!(state.active_processes, pid)
state.demonitor.(ref)
# Drop from active set.
state = update_in(state.active_processes, &Map.delete(&1, pid))
# Try to acquire something new.
try_acquire_process(state)
end
defp active?(%__MODULE__{} = state, pid) do
Map.has_key?(state.active_processes, pid)
end
defp pending?(%__MODULE__{} = state, pid) do
Map.has_key?(state.pending_processes, pid)
end
end
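
The injected `:monitor`, `:demonitor`, and `:reply` callbacks exist as test seams. A sketch of how the state could be exercised without a real GenServer (a hypothetical test, not part of this commit):

    defmodule PhilomenaJob.Semaphore.StateTest do
      use ExUnit.Case, async: true

      alias PhilomenaJob.Semaphore.State

      test "a pending process acquires once capacity frees up" do
        parent = self()

        state =
          State.new(
            max_concurrency: 1,
            monitor: fn _pid -> make_ref() end,
            demonitor: fn _ref -> true end,
            reply: fn {pid, _tag}, msg -> send(parent, {:replied, pid, msg}) end
          )

        first = spawn(fn -> Process.sleep(:infinity) end)
        second = spawn(fn -> Process.sleep(:infinity) end)

        # The first caller is granted the semaphore immediately.
        {:ok, state} = State.add_pending_process(state, {first, make_ref()})
        assert_received {:replied, ^first, :ok}

        # The second caller waits: max_concurrency is exhausted.
        {:ok, state} = State.add_pending_process(state, {second, make_ref()})
        refute_received {:replied, ^second, :ok}

        # Releasing the first hold hands the semaphore to the waiting caller.
        {:ok, _state} = State.release_active_process(state, first)
        assert_received {:replied, ^second, :ok}
      end
    end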

92 lib/philomena_job/supervisor.ex Normal file

@@ -0,0 +1,92 @@
defmodule PhilomenaJob.Supervisor do
@moduledoc """
Main supervisor for jobs processing.
Supported options:
- `:max_concurrency`: the maximum number of processors which can run in parallel. Required.
This is global across all processors specified by this supervisor instance.
Lowering the maximum concurrency delays processors until the concurrency drops.
- `:repo_url`: the Ecto URL to the database. Can be fetched from application env. Required.
- `:processors`: A list of processor modules to create worker processes for. Required.
- `:name`: the global name for this supervisor instance. Required.
Processor modules should implement the processor behaviour. See the `PhilomenaJob.Processor`
documentation for more information on required callbacks.
## Example
children = [
{PhilomenaJob.Supervisor,
max_concurrency: System.schedulers_online(),
repo_url: Application.get_env(:philomena, Philomena.Repo)[:url],
processors: [
CommentIndexUpdater,
FilterIndexUpdater,
GalleryIndexUpdater,
ImageIndexUpdater,
PostIndexUpdater,
ReportIndexUpdater,
TagIndexUpdater
],
name: IndexWorkSupervisor
}
]
Supervisor.start_link(children, opts)
"""
alias PhilomenaJob.Semaphore.Server, as: SemaphoreServer
alias PhilomenaJob.Listener.Server, as: ListenerServer
alias PhilomenaJob.NotifierServer
alias PhilomenaJob.Worker
@doc false
def child_spec(opts) do
name = Keyword.fetch!(opts, :name)
%{
id: Module.concat(__MODULE__, name),
start: {__MODULE__, :start_link, [opts]}
}
end
  @doc false
  def start_link(opts) do
    processors = Keyword.fetch!(opts, :processors)
    name = Keyword.fetch!(opts, :name)

    # Children receive the original options minus :name, which names this
    # supervisor rather than any individual child.
    child_opts = Keyword.delete(opts, :name)

    # Start the main supervisor.
    {:ok, main_sup} =
      Supervisor.start_link(
        [],
        strategy: :one_for_one,
        auto_shutdown: :any_significant,
        name: name
      )

    # Start all three significant processes.
    # If any of these exit, the supervisor exits.
    child_opts =
      child_opts
      |> start_named_child(name, :notifier, NotifierServer)
      |> start_named_child(name, :semaphore, SemaphoreServer)
      |> start_named_child(name, :listener, ListenerServer)

    # Start workers. These can restart automatically. Each worker needs a
    # distinct child id, or the supervisor rejects all but the first.
    for processor <- processors do
      worker_opts = Keyword.merge(child_opts, processor: processor)
      spec = Supervisor.child_spec({Worker, worker_opts}, id: {Worker, processor})
      Supervisor.start_child(name, spec)
    end

    # Return the main supervisor.
    {:ok, main_sup}
  end
defp start_named_child(opts, sup, name, child_module) do
{:ok, child} = Supervisor.start_child(sup, {child_module, opts})
Keyword.merge(opts, [{name, child}])
end
end

63 lib/philomena_job/worker.ex Normal file

@@ -0,0 +1,63 @@
defmodule PhilomenaJob.Worker do
@moduledoc false
alias PhilomenaJob.Listener.Server, as: ListenerServer
alias PhilomenaJob.Semaphore.Server, as: SemaphoreServer
use GenServer
@doc """
Notify the given worker that work may be available.
"""
def notify(name) do
GenServer.cast(name, :check_work)
end
defstruct semaphore: nil,
listener: nil,
processor: nil,
opts: nil
  @doc false
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @doc false
  @impl true
  def init(opts) do
    state = %__MODULE__{
      semaphore: Keyword.fetch!(opts, :semaphore),
      listener: Keyword.fetch!(opts, :listener),
      processor: Keyword.fetch!(opts, :processor),
      opts: Keyword.drop(opts, [:semaphore, :listener, :processor])
    }

    # Start listening for events.
    ListenerServer.link_worker(state.listener, state.processor.channel())

    # Check for new work.
    {:ok, check_work(state)}
  end

  @doc false
  @impl true
  def handle_cast(:check_work, %__MODULE__{} = state) do
    {:noreply, check_work(state)}
  end
defp check_work(%__MODULE__{} = state) do
# We have just started or received notification that work may be available.
processor = state.processor
opts = state.opts
# Keep calling check_work until we run out of work.
cycle(fn ->
SemaphoreServer.run(state.semaphore, fn ->
processor.check_work(opts)
end)
end)
state
end
defp cycle(callback) do
if callback.() do
:ok
else
cycle(callback)
end
end
end

143 test/philomena_job/semaphore_test.exs Normal file

@@ -0,0 +1,143 @@
defmodule PhilomenaJob.SemaphoreTest do
use ExUnit.Case, async: true
alias PhilomenaJob.Semaphore.Server, as: SemaphoreServer
@max_concurrency 8
describe "Server functionality" do
setup do
{:ok, pid} = SemaphoreServer.start_link(max_concurrency: @max_concurrency)
on_exit(fn -> Process.exit(pid, :kill) end)
%{pid: pid}
end
test "allows max_concurrency processes to acquire", %{pid: pid} do
# Check acquire
results =
(1..@max_concurrency)
|> Task.async_stream(fn _ -> SemaphoreServer.acquire(pid) end)
|> Enum.map(fn {:ok, res} -> res end)
assert true == Enum.all?(results)
# Check linking to process exit
# If this hangs, linking to process exit does not release the semaphore
results =
(1..@max_concurrency)
|> Task.async_stream(fn _ -> SemaphoreServer.acquire(pid) end)
|> Enum.map(fn {:ok, res} -> res end)
assert true == Enum.all?(results)
end
test "does not allow max_concurrency + 1 processes to acquire (exit)", %{pid: pid} do
processes =
(1..@max_concurrency)
|> Enum.map(fn _ -> acquire_and_wait_for_release(pid) end)
# This task should not be able to acquire
task = Task.async(fn -> SemaphoreServer.acquire(pid) end)
assert nil == Task.yield(task, 10)
# Terminate processes holding the semaphore
Enum.each(processes, &Process.exit(&1, :kill))
# Now the task should be able to acquire
assert {:ok, :ok} == Task.yield(task, 10)
end
test "does not allow max_concurrency + 1 processes to acquire (release)", %{pid: pid} do
processes =
(1..@max_concurrency)
|> Enum.map(fn _ -> acquire_and_wait_for_release(pid) end)
# This task should not be able to acquire
task = Task.async(fn -> SemaphoreServer.acquire(pid) end)
assert nil == Task.yield(task, 10)
# Release processes holding the semaphore
Enum.each(processes, &send(&1, :release))
# Now the task should be able to acquire
assert {:ok, :ok} == Task.yield(task, 10)
end
test "does not allow max_concurrency + 1 processes to acquire (run)", %{pid: pid} do
processes =
(1..@max_concurrency)
|> Enum.map(fn _ -> run_and_wait_for_release(pid) end)
# This task should not be able to acquire
task = Task.async(fn -> SemaphoreServer.acquire(pid) end)
assert nil == Task.yield(task, 10)
# Release processes holding the semaphore
Enum.each(processes, &send(&1, :release))
# Now the task should be able to acquire
assert {:ok, :ok} == Task.yield(task, 10)
end
test "does not allow re-acquire from the same process", %{pid: pid} do
acquire = fn ->
try do
{:ok, SemaphoreServer.acquire(pid)}
rescue
err -> {:error, err}
end
end
task = Task.async(fn ->
acquire.()
acquire.()
end)
assert {:ok, {:error, %MatchError{}}} = Task.yield(task)
end
test "allows re-release from the same process", %{pid: pid} do
release = fn ->
try do
{:ok, SemaphoreServer.release(pid)}
rescue
err -> {:error, err}
end
end
task = Task.async(fn ->
release.()
release.()
end)
assert {:ok, {:ok, :ok}} = Task.yield(task)
end
end
defp run_and_wait_for_release(pid) do
spawn(fn ->
SemaphoreServer.run(pid, fn ->
wait_for_release()
end)
end)
end
defp acquire_and_wait_for_release(pid) do
spawn(fn ->
SemaphoreServer.acquire(pid)
wait_for_release()
SemaphoreServer.release(pid)
end)
end
defp wait_for_release do
receive do
:release ->
:ok
_ ->
wait_for_release()
end
end
end