From d6e89a144985fab5162604c128bf6f076bdc7e7d Mon Sep 17 00:00:00 2001 From: "byte[]" Date: Wed, 18 Dec 2019 14:32:21 -0500 Subject: [PATCH] batched updater for user profile info --- lib/philomena/application.ex | 2 + .../servers/user_fingerprint_updater.ex | 60 ++++++++++++++++++ lib/philomena/servers/user_ip_updater.ex | 62 +++++++++++++++++++ lib/philomena_web/plugs/reload_user_plug.ex | 37 ++--------- 4 files changed, 130 insertions(+), 31 deletions(-) create mode 100644 lib/philomena/servers/user_fingerprint_updater.ex create mode 100644 lib/philomena/servers/user_ip_updater.ex diff --git a/lib/philomena/application.ex b/lib/philomena/application.ex index f56b6323..524a98dd 100644 --- a/lib/philomena/application.ex +++ b/lib/philomena/application.ex @@ -18,6 +18,8 @@ defmodule Philomena.Application do Philomena.Servers.UserLinkUpdater, Philomena.Servers.PicartoChannelUpdater, Philomena.Servers.PiczelChannelUpdater, + Philomena.Servers.UserFingerprintUpdater, + Philomena.Servers.UserIpUpdater, Pow.Store.Backend.MnesiaCache, {Redix, name: :redix, host: Application.get_env(:philomena, :redis_host)} ] diff --git a/lib/philomena/servers/user_fingerprint_updater.ex b/lib/philomena/servers/user_fingerprint_updater.ex new file mode 100644 index 00000000..ef169620 --- /dev/null +++ b/lib/philomena/servers/user_fingerprint_updater.ex @@ -0,0 +1,60 @@ +defmodule Philomena.Servers.UserFingerprintUpdater do + alias Philomena.UserFingerprints.UserFingerprint + alias Philomena.Repo + import Ecto.Query + + def child_spec([]) do + %{ + id: Philomena.Servers.UserFingerprintUpdater, + start: {Philomena.Servers.UserFingerprintUpdater, :start_link, [[]]} + } + end + + def start_link([]) do + {:ok, spawn_link(&init/0)} + end + + def cast(user_id, <<"c", _rest::binary>> = fingerprint, updated_at) when byte_size(fingerprint) <= 12 do + pid = Process.whereis(:fingerprint_updater) + if pid, do: send(pid, {user_id, fingerprint, updated_at}) + end + def cast(_user_id, _fingerprint, _updated_at), do: nil + + defp init do + Process.register(self(), :fingerprint_updater) + run() + end + + defp run do + user_fps = Enum.map(receive_all(), &into_insert_all/1) + update_query = update(UserFingerprint, inc: [uses: 1], set: [updated_at: fragment("EXCLUDED.updated_at")]) + + Repo.insert_all(UserFingerprint, user_fps, on_conflict: update_query, conflict_target: [:user_id, :fingerprint]) + + :timer.sleep(:timer.seconds(60)) + + run() + end + + defp receive_all(user_fps \\ %{}) do + receive do + {user_id, fingerprint, updated_at} -> + user_fps + |> Map.put({user_id, fingerprint}, updated_at) + |> receive_all() + after + 0 -> + user_fps + end + end + + defp into_insert_all({{user_id, fingerprint}, updated_at}) do + %{ + user_id: user_id, + fingerprint: fingerprint, + uses: 1, + created_at: updated_at, + updated_at: updated_at + } + end +end diff --git a/lib/philomena/servers/user_ip_updater.ex b/lib/philomena/servers/user_ip_updater.ex new file mode 100644 index 00000000..096ae21f --- /dev/null +++ b/lib/philomena/servers/user_ip_updater.ex @@ -0,0 +1,62 @@ +defmodule Philomena.Servers.UserIpUpdater do + alias Philomena.UserIps.UserIp + alias Philomena.Repo + import Ecto.Query + + def child_spec([]) do + %{ + id: Philomena.Servers.UserIpUpdater, + start: {Philomena.Servers.UserIpUpdater, :start_link, [[]]} + } + end + + def start_link([]) do + {:ok, spawn_link(&init/0)} + end + + def cast(user_id, ip_address, updated_at) do + pid = Process.whereis(:ip_updater) + if pid, do: send(pid, {user_id, ip_address, updated_at}) + end + + defp init do + Process.register(self(), :ip_updater) + run() + end + + defp run do + user_ips = Enum.map(receive_all(), &into_insert_all/1) + update_query = update(UserIp, inc: [uses: 1], set: [updated_at: fragment("EXCLUDED.updated_at")]) + + Repo.insert_all(UserIp, user_ips, on_conflict: update_query, conflict_target: [:user_id, :ip]) + + :timer.sleep(:timer.seconds(60)) + + run() + end + + defp receive_all(user_ips \\ %{}) do + receive do + {user_id, ip_address, updated_at} -> + user_ips + |> Map.put({user_id, ip_address}, updated_at) + |> receive_all() + after + 0 -> + user_ips + end + end + + defp into_insert_all({{user_id, ip_address}, updated_at}) do + %{ + user_id: user_id, + ip: cast_ip(ip_address), + uses: 1, + created_at: updated_at, + updated_at: updated_at + } + end + + # There exists no EctoNetwork.INET.cast!/1 + defp cast_ip(ip), do: elem(EctoNetwork.INET.cast(ip), 1) +end diff --git a/lib/philomena_web/plugs/reload_user_plug.ex b/lib/philomena_web/plugs/reload_user_plug.ex index f154ff6a..355410c6 100644 --- a/lib/philomena_web/plugs/reload_user_plug.ex +++ b/lib/philomena_web/plugs/reload_user_plug.ex @@ -3,10 +3,8 @@ defmodule PhilomenaWeb.ReloadUserPlug do alias Pow.Plug alias Philomena.Users - alias Philomena.UserIps.UserIp - alias Philomena.UserFingerprints.UserFingerprint - alias Philomena.Repo - import Ecto.Query + alias Philomena.Servers.UserIpUpdater + alias Philomena.Servers.UserFingerprintUpdater def init(opts), do: opts @@ -18,41 +16,18 @@ defmodule PhilomenaWeb.ReloadUserPlug do conn user -> - spawn fn -> update_usages(conn, user) end + update_usages(conn, user) reloaded_user = Users.get_by(id: user.id) Plug.assign_current_user(conn, reloaded_user, config) end end - # TODO: move this to a background server instead of spawning - # off for every connection defp update_usages(conn, user) do + now = NaiveDateTime.utc_now() |> NaiveDateTime.truncate(:second) conn = Conn.fetch_cookies(conn) - {:ok, ip} = EctoNetwork.INET.cast(conn.remote_ip) - fp = conn.cookies["_ses"] - - if ip do - update = update(UserIp, inc: [uses: 1], set: [updated_at: fragment("now()")]) - - Repo.insert_all( - UserIp, - [%{user_id: user.id, ip: ip, uses: 1}], - conflict_target: [:user_id, :ip], - on_conflict: update - ) - end - - if fp do - update = update(UserFingerprint, inc: [uses: 1], set: [updated_at: fragment("now()")]) - - Repo.insert_all( - UserFingerprint, - [%{user_id: user.id, fingerprint: fp, uses: 1}], - conflict_target: [:user_id, :fingerprint], - on_conflict: update - ) - end + UserIpUpdater.cast(user.id, conn.remote_ip, now) + UserFingerprintUpdater.cast(user.id, conn.cookies["_ses"], now) end end