batched updater for user profile info

This commit is contained in:
byte[] 2019-12-18 14:32:21 -05:00
parent fe746dac0f
commit d6e89a1449
4 changed files with 130 additions and 31 deletions

View file

@ -18,6 +18,8 @@ defmodule Philomena.Application do
Philomena.Servers.UserLinkUpdater, Philomena.Servers.UserLinkUpdater,
Philomena.Servers.PicartoChannelUpdater, Philomena.Servers.PicartoChannelUpdater,
Philomena.Servers.PiczelChannelUpdater, Philomena.Servers.PiczelChannelUpdater,
Philomena.Servers.UserFingerprintUpdater,
Philomena.Servers.UserIpUpdater,
Pow.Store.Backend.MnesiaCache, Pow.Store.Backend.MnesiaCache,
{Redix, name: :redix, host: Application.get_env(:philomena, :redis_host)} {Redix, name: :redix, host: Application.get_env(:philomena, :redis_host)}
] ]

View file

@ -0,0 +1,60 @@
defmodule Philomena.Servers.UserFingerprintUpdater do
alias Philomena.UserFingerprints.UserFingerprint
alias Philomena.Repo
import Ecto.Query
def child_spec([]) do
%{
id: Philomena.Servers.UserFingerprintUpdater,
start: {Philomena.Servers.UserFingerprintUpdater, :start_link, [[]]}
}
end
def start_link([]) do
{:ok, spawn_link(&init/0)}
end
def cast(user_id, <<"c", _rest::binary>> = fingerprint, updated_at) when byte_size(fingerprint) <= 12 do
pid = Process.whereis(:fingerprint_updater)
if pid, do: send(pid, {user_id, fingerprint, updated_at})
end
def cast(_user_id, _fingerprint, _updated_at), do: nil
defp init do
Process.register(self(), :fingerprint_updater)
run()
end
defp run do
user_fps = Enum.map(receive_all(), &into_insert_all/1)
update_query = update(UserFingerprint, inc: [uses: 1], set: [updated_at: fragment("EXCLUDED.updated_at")])
Repo.insert_all(UserFingerprint, user_fps, on_conflict: update_query, conflict_target: [:user_id, :fingerprint])
:timer.sleep(:timer.seconds(60))
run()
end
defp receive_all(user_fps \\ %{}) do
receive do
{user_id, fingerprint, updated_at} ->
user_fps
|> Map.put({user_id, fingerprint}, updated_at)
|> receive_all()
after
0 ->
user_fps
end
end
defp into_insert_all({{user_id, fingerprint}, updated_at}) do
%{
user_id: user_id,
fingerprint: fingerprint,
uses: 1,
created_at: updated_at,
updated_at: updated_at
}
end
end

View file

@ -0,0 +1,62 @@
defmodule Philomena.Servers.UserIpUpdater do
alias Philomena.UserIps.UserIp
alias Philomena.Repo
import Ecto.Query
def child_spec([]) do
%{
id: Philomena.Servers.UserIpUpdater,
start: {Philomena.Servers.UserIpUpdater, :start_link, [[]]}
}
end
def start_link([]) do
{:ok, spawn_link(&init/0)}
end
def cast(user_id, ip_address, updated_at) do
pid = Process.whereis(:ip_updater)
if pid, do: send(pid, {user_id, ip_address, updated_at})
end
defp init do
Process.register(self(), :ip_updater)
run()
end
defp run do
user_ips = Enum.map(receive_all(), &into_insert_all/1)
update_query = update(UserIp, inc: [uses: 1], set: [updated_at: fragment("EXCLUDED.updated_at")])
Repo.insert_all(UserIp, user_ips, on_conflict: update_query, conflict_target: [:user_id, :ip])
:timer.sleep(:timer.seconds(60))
run()
end
defp receive_all(user_ips \\ %{}) do
receive do
{user_id, ip_address, updated_at} ->
user_ips
|> Map.put({user_id, ip_address}, updated_at)
|> receive_all()
after
0 ->
user_ips
end
end
defp into_insert_all({{user_id, ip_address}, updated_at}) do
%{
user_id: user_id,
ip: cast_ip(ip_address),
uses: 1,
created_at: updated_at,
updated_at: updated_at
}
end
# There exists no EctoNetwork.INET.cast!/1
defp cast_ip(ip), do: elem(EctoNetwork.INET.cast(ip), 1)
end

View file

@ -3,10 +3,8 @@ defmodule PhilomenaWeb.ReloadUserPlug do
alias Pow.Plug alias Pow.Plug
alias Philomena.Users alias Philomena.Users
alias Philomena.UserIps.UserIp alias Philomena.Servers.UserIpUpdater
alias Philomena.UserFingerprints.UserFingerprint alias Philomena.Servers.UserFingerprintUpdater
alias Philomena.Repo
import Ecto.Query
def init(opts), do: opts def init(opts), do: opts
@ -18,41 +16,18 @@ defmodule PhilomenaWeb.ReloadUserPlug do
conn conn
user -> user ->
spawn fn -> update_usages(conn, user) end update_usages(conn, user)
reloaded_user = Users.get_by(id: user.id) reloaded_user = Users.get_by(id: user.id)
Plug.assign_current_user(conn, reloaded_user, config) Plug.assign_current_user(conn, reloaded_user, config)
end end
end end
# TODO: move this to a background server instead of spawning
# off for every connection
defp update_usages(conn, user) do defp update_usages(conn, user) do
now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second)
conn = Conn.fetch_cookies(conn) conn = Conn.fetch_cookies(conn)
{:ok, ip} = EctoNetwork.INET.cast(conn.remote_ip) UserIpUpdater.cast(user.id, conn.remote_ip, now)
fp = conn.cookies["_ses"] UserFingerprintUpdater.cast(user.id, conn.cookies["_ses"], now)
if ip do
update = update(UserIp, inc: [uses: 1], set: [updated_at: fragment("now()")])
Repo.insert_all(
UserIp,
[%{user_id: user.id, ip: ip, uses: 1}],
conflict_target: [:user_id, :ip],
on_conflict: update
)
end
if fp do
update = update(UserFingerprint, inc: [uses: 1], set: [updated_at: fragment("now()")])
Repo.insert_all(
UserFingerprint,
[%{user_id: user.id, fingerprint: fp, uses: 1}],
conflict_target: [:user_id, :fingerprint],
on_conflict: update
)
end
end end
end end