mirror of
https://github.com/philomena-dev/philomena.git
synced 2025-01-19 14:17:59 +01:00
Remove spawn calls (#80)
This commit is contained in:
parent
7f348ff5b5
commit
6ebf080826
28 changed files with 399 additions and 249 deletions
|
@ -20,7 +20,7 @@ config :elastix,
|
|||
json_codec: Jason
|
||||
|
||||
config :exq,
|
||||
max_retries: 2,
|
||||
max_retries: 5,
|
||||
scheduler_enable: true,
|
||||
start_on_application: false
|
||||
|
||||
|
|
|
@ -55,7 +55,12 @@ config :philomena,
|
|||
|
||||
config :exq,
|
||||
host: System.get_env("REDIS_HOST", "localhost"),
|
||||
queues: [{"videos", 2}, {"images", 4}, {"indexing", 16}]
|
||||
queues: [
|
||||
{"videos", 2},
|
||||
{"images", 4},
|
||||
{"indexing", 12},
|
||||
{"notifications", 2}
|
||||
]
|
||||
|
||||
if is_nil(System.get_env("START_WORKER")) do
|
||||
# Make queueing available but don't process any jobs
|
||||
|
|
|
@ -11,9 +11,11 @@ defmodule Philomena.Comments do
|
|||
alias Philomena.Reports.Report
|
||||
alias Philomena.Comments.Comment
|
||||
alias Philomena.Comments.ElasticsearchIndex, as: CommentIndex
|
||||
alias Philomena.IndexWorker
|
||||
alias Philomena.Images.Image
|
||||
alias Philomena.Images
|
||||
alias Philomena.Notifications
|
||||
alias Philomena.NotificationWorker
|
||||
alias Philomena.Versions
|
||||
alias Philomena.Reports
|
||||
|
||||
|
@ -64,31 +66,33 @@ defmodule Philomena.Comments do
|
|||
end
|
||||
|
||||
def notify_comment(comment) do
|
||||
spawn(fn ->
|
||||
image =
|
||||
comment
|
||||
|> Repo.preload(:image)
|
||||
|> Map.fetch!(:image)
|
||||
Exq.enqueue(Exq, "notifications", NotificationWorker, ["Comments", comment.id])
|
||||
end
|
||||
|
||||
subscriptions =
|
||||
image
|
||||
|> Repo.preload(:subscriptions)
|
||||
|> Map.fetch!(:subscriptions)
|
||||
def perform_notify(comment_id) do
|
||||
comment = get_comment!(comment_id)
|
||||
|
||||
Notifications.notify(
|
||||
comment,
|
||||
subscriptions,
|
||||
%{
|
||||
actor_id: image.id,
|
||||
actor_type: "Image",
|
||||
actor_child_id: comment.id,
|
||||
actor_child_type: "Comment",
|
||||
action: "commented on"
|
||||
}
|
||||
)
|
||||
end)
|
||||
image =
|
||||
comment
|
||||
|> Repo.preload(:image)
|
||||
|> Map.fetch!(:image)
|
||||
|
||||
comment
|
||||
subscriptions =
|
||||
image
|
||||
|> Repo.preload(:subscriptions)
|
||||
|> Map.fetch!(:subscriptions)
|
||||
|
||||
Notifications.notify(
|
||||
comment,
|
||||
subscriptions,
|
||||
%{
|
||||
actor_id: image.id,
|
||||
actor_type: "Image",
|
||||
actor_child_id: comment.id,
|
||||
actor_child_type: "Comment",
|
||||
action: "commented on"
|
||||
}
|
||||
)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -216,24 +220,13 @@ defmodule Philomena.Comments do
|
|||
end
|
||||
|
||||
def reindex_comment(%Comment{} = comment) do
|
||||
spawn(fn ->
|
||||
Comment
|
||||
|> preload(^indexing_preloads())
|
||||
|> where(id: ^comment.id)
|
||||
|> Repo.one()
|
||||
|> Elasticsearch.index_document(Comment)
|
||||
end)
|
||||
Exq.enqueue(Exq, "indexing", IndexWorker, ["Comments", "id", [comment.id]])
|
||||
|
||||
comment
|
||||
end
|
||||
|
||||
def reindex_comments(image) do
|
||||
spawn(fn ->
|
||||
Comment
|
||||
|> preload(^indexing_preloads())
|
||||
|> where(image_id: ^image.id)
|
||||
|> Elasticsearch.reindex(Comment)
|
||||
end)
|
||||
Exq.enqueue(Exq, "indexing", IndexWorker, ["Comments", "image_id", [image.id]])
|
||||
|
||||
image
|
||||
end
|
||||
|
@ -241,4 +234,11 @@ defmodule Philomena.Comments do
|
|||
def indexing_preloads do
|
||||
[:user, image: :tags]
|
||||
end
|
||||
|
||||
def perform_reindex(column, condition) do
|
||||
Comment
|
||||
|> preload(^indexing_preloads())
|
||||
|> where([c], field(c, ^column) in ^condition)
|
||||
|> Elasticsearch.reindex(Comment)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -11,7 +11,10 @@ defmodule Philomena.Galleries do
|
|||
alias Philomena.Galleries.Gallery
|
||||
alias Philomena.Galleries.Interaction
|
||||
alias Philomena.Galleries.ElasticsearchIndex, as: GalleryIndex
|
||||
alias Philomena.IndexWorker
|
||||
alias Philomena.GalleryReorderWorker
|
||||
alias Philomena.Notifications
|
||||
alias Philomena.NotificationWorker
|
||||
alias Philomena.Images
|
||||
|
||||
@doc """
|
||||
|
@ -117,21 +120,13 @@ defmodule Philomena.Galleries do
|
|||
end
|
||||
|
||||
def reindex_gallery(%Gallery{} = gallery) do
|
||||
spawn(fn ->
|
||||
Gallery
|
||||
|> preload(^indexing_preloads())
|
||||
|> where(id: ^gallery.id)
|
||||
|> Repo.one()
|
||||
|> Elasticsearch.index_document(Gallery)
|
||||
end)
|
||||
Exq.enqueue(Exq, "indexing", IndexWorker, ["Galleries", "id", [gallery.id]])
|
||||
|
||||
gallery
|
||||
end
|
||||
|
||||
def unindex_gallery(%Gallery{} = gallery) do
|
||||
spawn(fn ->
|
||||
Elasticsearch.delete_document(gallery.id, Gallery)
|
||||
end)
|
||||
Elasticsearch.delete_document(gallery.id, Gallery)
|
||||
|
||||
gallery
|
||||
end
|
||||
|
@ -140,6 +135,13 @@ defmodule Philomena.Galleries do
|
|||
[:subscribers, :creator, :interactions]
|
||||
end
|
||||
|
||||
def perform_reindex(column, condition) do
|
||||
Gallery
|
||||
|> preload(^indexing_preloads())
|
||||
|> where([g], field(g, ^column) in ^condition)
|
||||
|> Elasticsearch.reindex(Gallery)
|
||||
end
|
||||
|
||||
def add_image_to_gallery(gallery, image) do
|
||||
Multi.new()
|
||||
|> Multi.run(:lock, fn repo, %{} ->
|
||||
|
@ -231,92 +233,96 @@ defmodule Philomena.Galleries do
|
|||
end
|
||||
|
||||
def notify_gallery(gallery) do
|
||||
spawn(fn ->
|
||||
subscriptions =
|
||||
gallery
|
||||
|> Repo.preload(:subscriptions)
|
||||
|> Map.fetch!(:subscriptions)
|
||||
Exq.enqueue(Exq, "notifications", NotificationWorker, ["Galleries", gallery.id])
|
||||
end
|
||||
|
||||
Notifications.notify(
|
||||
gallery,
|
||||
subscriptions,
|
||||
%{
|
||||
actor_id: gallery.id,
|
||||
actor_type: "Gallery",
|
||||
actor_child_id: nil,
|
||||
actor_child_type: nil,
|
||||
action: "added images to"
|
||||
}
|
||||
)
|
||||
end)
|
||||
def perform_notify(gallery_id) do
|
||||
gallery = get_gallery!(gallery_id)
|
||||
|
||||
gallery
|
||||
subscriptions =
|
||||
gallery
|
||||
|> Repo.preload(:subscriptions)
|
||||
|> Map.fetch!(:subscriptions)
|
||||
|
||||
Notifications.notify(
|
||||
gallery,
|
||||
subscriptions,
|
||||
%{
|
||||
actor_id: gallery.id,
|
||||
actor_type: "Gallery",
|
||||
actor_child_id: nil,
|
||||
actor_child_type: nil,
|
||||
action: "added images to"
|
||||
}
|
||||
)
|
||||
end
|
||||
|
||||
def reorder_gallery(gallery, image_ids) do
|
||||
spawn(fn ->
|
||||
interactions =
|
||||
Interaction
|
||||
|> where([gi], gi.image_id in ^image_ids and gi.gallery_id == ^gallery.id)
|
||||
|> order_by(^position_order(gallery))
|
||||
|> Repo.all()
|
||||
Exq.enqueue(Exq, "indexing", GalleryReorderWorker, [gallery.id, image_ids])
|
||||
end
|
||||
|
||||
interaction_positions =
|
||||
interactions
|
||||
|> Enum.with_index()
|
||||
|> Map.new(fn {interaction, index} -> {index, interaction.position} end)
|
||||
def perform_reorder(gallery_id, image_ids) do
|
||||
gallery = get_gallery!(gallery_id)
|
||||
|
||||
images_present = Map.new(interactions, &{&1.image_id, true})
|
||||
interactions =
|
||||
Interaction
|
||||
|> where([gi], gi.image_id in ^image_ids and gi.gallery_id == ^gallery.id)
|
||||
|> order_by(^position_order(gallery))
|
||||
|> Repo.all()
|
||||
|
||||
requested =
|
||||
image_ids
|
||||
|> Enum.filter(&images_present[&1])
|
||||
|> Enum.with_index()
|
||||
|> Map.new()
|
||||
interaction_positions =
|
||||
interactions
|
||||
|> Enum.with_index()
|
||||
|> Map.new(fn {interaction, index} -> {index, interaction.position} end)
|
||||
|
||||
changes =
|
||||
interactions
|
||||
|> Enum.with_index()
|
||||
|> Enum.flat_map(fn {interaction, current_index} ->
|
||||
new_index = requested[interaction.image_id]
|
||||
images_present = Map.new(interactions, &{&1.image_id, true})
|
||||
|
||||
case new_index == current_index do
|
||||
true ->
|
||||
[]
|
||||
requested =
|
||||
image_ids
|
||||
|> Enum.filter(&images_present[&1])
|
||||
|> Enum.with_index()
|
||||
|> Map.new()
|
||||
|
||||
false ->
|
||||
changes =
|
||||
interactions
|
||||
|> Enum.with_index()
|
||||
|> Enum.flat_map(fn {interaction, current_index} ->
|
||||
new_index = requested[interaction.image_id]
|
||||
|
||||
case new_index == current_index do
|
||||
true ->
|
||||
[]
|
||||
|
||||
false ->
|
||||
[
|
||||
[
|
||||
[
|
||||
id: interaction.id,
|
||||
position: interaction_positions[new_index]
|
||||
]
|
||||
id: interaction.id,
|
||||
position: interaction_positions[new_index]
|
||||
]
|
||||
end
|
||||
end)
|
||||
|
||||
changes
|
||||
|> Enum.map(fn change ->
|
||||
id = Keyword.fetch!(change, :id)
|
||||
change = Keyword.delete(change, :id)
|
||||
|
||||
Interaction
|
||||
|> where([i], i.id == ^id)
|
||||
|> Repo.update_all(set: change)
|
||||
]
|
||||
end
|
||||
end)
|
||||
|
||||
# Do the update in a single statement
|
||||
# Repo.insert_all(
|
||||
# Interaction,
|
||||
# changes,
|
||||
# on_conflict: {:replace, [:position]},
|
||||
# conflict_target: [:id]
|
||||
# )
|
||||
changes
|
||||
|> Enum.map(fn change ->
|
||||
id = Keyword.fetch!(change, :id)
|
||||
change = Keyword.delete(change, :id)
|
||||
|
||||
# Now update all the associated images
|
||||
Images.reindex_images(Map.keys(requested))
|
||||
Interaction
|
||||
|> where([i], i.id == ^id)
|
||||
|> Repo.update_all(set: change)
|
||||
end)
|
||||
|
||||
gallery
|
||||
# Do the update in a single statement
|
||||
# Repo.insert_all(
|
||||
# Interaction,
|
||||
# changes,
|
||||
# on_conflict: {:replace, [:position]},
|
||||
# conflict_target: [:id]
|
||||
# )
|
||||
|
||||
# Now update all the associated images
|
||||
Images.reindex_images(Map.keys(requested))
|
||||
end
|
||||
|
||||
defp position_order(%{order_position_asc: true}), do: [asc: :position]
|
||||
|
|
|
@ -16,9 +16,11 @@ defmodule Philomena.Images do
|
|||
alias Philomena.Images.Uploader
|
||||
alias Philomena.Images.Tagging
|
||||
alias Philomena.Images.ElasticsearchIndex, as: ImageIndex
|
||||
alias Philomena.IndexWorker
|
||||
alias Philomena.ImageFeatures.ImageFeature
|
||||
alias Philomena.SourceChanges.SourceChange
|
||||
alias Philomena.Notifications.Notification
|
||||
alias Philomena.NotificationWorker
|
||||
alias Philomena.TagChanges.TagChange
|
||||
alias Philomena.Tags
|
||||
alias Philomena.UserStatistics
|
||||
|
@ -644,18 +646,13 @@ defmodule Philomena.Images do
|
|||
end
|
||||
|
||||
def reindex_image(%Image{} = image) do
|
||||
reindex_images([image.id])
|
||||
Exq.enqueue(Exq, "indexing", IndexWorker, ["Images", "id", [image.id]])
|
||||
|
||||
image
|
||||
end
|
||||
|
||||
def reindex_images(image_ids) do
|
||||
spawn(fn ->
|
||||
Image
|
||||
|> preload(^indexing_preloads())
|
||||
|> where([i], i.id in ^image_ids)
|
||||
|> Elasticsearch.reindex(Image)
|
||||
end)
|
||||
Exq.enqueue(Exq, "indexing", IndexWorker, ["Images", "id", image_ids])
|
||||
|
||||
image_ids
|
||||
end
|
||||
|
@ -673,6 +670,13 @@ defmodule Philomena.Images do
|
|||
]
|
||||
end
|
||||
|
||||
def perform_reindex(column, condition) do
|
||||
Image
|
||||
|> preload(^indexing_preloads())
|
||||
|> where([i], field(i, ^column) in ^condition)
|
||||
|> Elasticsearch.reindex(Image)
|
||||
end
|
||||
|
||||
alias Philomena.Images.Subscription
|
||||
|
||||
def subscribed?(_image, nil), do: false
|
||||
|
@ -740,24 +744,28 @@ defmodule Philomena.Images do
|
|||
end
|
||||
|
||||
def notify_merge(source, target) do
|
||||
spawn(fn ->
|
||||
subscriptions =
|
||||
target
|
||||
|> Repo.preload(:subscriptions)
|
||||
|> Map.fetch!(:subscriptions)
|
||||
Exq.enqueue(Exq, "notifications", NotificationWorker, ["Images", [source.id, target.id]])
|
||||
end
|
||||
|
||||
Notifications.notify(
|
||||
nil,
|
||||
subscriptions,
|
||||
%{
|
||||
actor_id: target.id,
|
||||
actor_type: "Image",
|
||||
actor_child_id: nil,
|
||||
actor_child_type: nil,
|
||||
action: "merged ##{source.id} into"
|
||||
}
|
||||
)
|
||||
end)
|
||||
def perform_notify([source_id, target_id]) do
|
||||
target = get_image!(target_id)
|
||||
|
||||
subscriptions =
|
||||
target
|
||||
|> Repo.preload(:subscriptions)
|
||||
|> Map.fetch!(:subscriptions)
|
||||
|
||||
Notifications.notify(
|
||||
nil,
|
||||
subscriptions,
|
||||
%{
|
||||
actor_id: target.id,
|
||||
actor_type: "Image",
|
||||
actor_child_id: nil,
|
||||
actor_child_type: nil,
|
||||
action: "merged ##{source_id} into"
|
||||
}
|
||||
)
|
||||
end
|
||||
|
||||
def clear_notification(_image, nil), do: nil
|
||||
|
|
|
@ -12,8 +12,10 @@ defmodule Philomena.Posts do
|
|||
alias Philomena.Topics
|
||||
alias Philomena.Posts.Post
|
||||
alias Philomena.Posts.ElasticsearchIndex, as: PostIndex
|
||||
alias Philomena.IndexWorker
|
||||
alias Philomena.Forums.Forum
|
||||
alias Philomena.Notifications
|
||||
alias Philomena.NotificationWorker
|
||||
alias Philomena.Versions
|
||||
alias Philomena.Reports
|
||||
alias Philomena.Reports.Report
|
||||
|
@ -93,31 +95,33 @@ defmodule Philomena.Posts do
|
|||
end
|
||||
|
||||
def notify_post(post) do
|
||||
spawn(fn ->
|
||||
topic =
|
||||
post
|
||||
|> Repo.preload(:topic)
|
||||
|> Map.fetch!(:topic)
|
||||
Exq.enqueue(Exq, "notifications", NotificationWorker, ["Posts", post.id])
|
||||
end
|
||||
|
||||
subscriptions =
|
||||
topic
|
||||
|> Repo.preload(:subscriptions)
|
||||
|> Map.fetch!(:subscriptions)
|
||||
def perform_notify(post_id) do
|
||||
post = get_post!(post_id)
|
||||
|
||||
Notifications.notify(
|
||||
post,
|
||||
subscriptions,
|
||||
%{
|
||||
actor_id: topic.id,
|
||||
actor_type: "Topic",
|
||||
actor_child_id: post.id,
|
||||
actor_child_type: "Post",
|
||||
action: "posted a new reply in"
|
||||
}
|
||||
)
|
||||
end)
|
||||
topic =
|
||||
post
|
||||
|> Repo.preload(:topic)
|
||||
|> Map.fetch!(:topic)
|
||||
|
||||
post
|
||||
subscriptions =
|
||||
topic
|
||||
|> Repo.preload(:subscriptions)
|
||||
|> Map.fetch!(:subscriptions)
|
||||
|
||||
Notifications.notify(
|
||||
post,
|
||||
subscriptions,
|
||||
%{
|
||||
actor_id: topic.id,
|
||||
actor_type: "Topic",
|
||||
actor_child_id: post.id,
|
||||
actor_child_type: "Post",
|
||||
action: "posted a new reply in"
|
||||
}
|
||||
)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
@ -232,13 +236,7 @@ defmodule Philomena.Posts do
|
|||
end
|
||||
|
||||
def reindex_post(%Post{} = post) do
|
||||
spawn(fn ->
|
||||
Post
|
||||
|> preload(^indexing_preloads())
|
||||
|> where(id: ^post.id)
|
||||
|> Repo.one()
|
||||
|> Elasticsearch.index_document(Post)
|
||||
end)
|
||||
Exq.enqueue(Exq, "indexing", IndexWorker, ["Posts", "id", [post.id]])
|
||||
|
||||
post
|
||||
end
|
||||
|
@ -246,4 +244,11 @@ defmodule Philomena.Posts do
|
|||
def indexing_preloads do
|
||||
[:user, topic: :forum]
|
||||
end
|
||||
|
||||
def perform_reindex(column, condition) do
|
||||
Post
|
||||
|> preload(^indexing_preloads())
|
||||
|> where([p], field(p, ^column) in ^condition)
|
||||
|> Elasticsearch.reindex(Post)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -9,6 +9,7 @@ defmodule Philomena.Reports do
|
|||
alias Philomena.Elasticsearch
|
||||
alias Philomena.Reports.Report
|
||||
alias Philomena.Reports.ElasticsearchIndex, as: ReportIndex
|
||||
alias Philomena.IndexWorker
|
||||
alias Philomena.Polymorphic
|
||||
|
||||
@doc """
|
||||
|
@ -143,24 +144,26 @@ defmodule Philomena.Reports do
|
|||
defp maybe_reindex_report(result), do: result
|
||||
|
||||
def reindex_reports(report_ids) do
|
||||
spawn(fn ->
|
||||
Report
|
||||
|> where([r], r.id in ^report_ids)
|
||||
|> preload([:user, :admin])
|
||||
|> Repo.all()
|
||||
|> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type])
|
||||
|> Enum.map(&Elasticsearch.index_document(&1, Report))
|
||||
end)
|
||||
Exq.enqueue(Exq, "indexing", IndexWorker, ["Reports", "id", report_ids])
|
||||
|
||||
report_ids
|
||||
end
|
||||
|
||||
def reindex_report(%Report{} = report) do
|
||||
reindex_reports([report.id])
|
||||
Exq.enqueue(Exq, "indexing", IndexWorker, ["Reports", "id", [report.id]])
|
||||
|
||||
report
|
||||
end
|
||||
|
||||
def perform_reindex(column, condition) do
|
||||
Report
|
||||
|> where([r], field(r, ^column) in ^condition)
|
||||
|> preload([:user, :admin])
|
||||
|> Repo.all()
|
||||
|> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type])
|
||||
|> Enum.map(&Elasticsearch.index_document(&1, Report))
|
||||
end
|
||||
|
||||
def count_reports(user) do
|
||||
if Canada.Can.can?(user, :index, Report) do
|
||||
Report
|
||||
|
|
|
@ -7,6 +7,11 @@ defmodule Philomena.Tags do
|
|||
alias Philomena.Repo
|
||||
|
||||
alias Philomena.Elasticsearch
|
||||
alias Philomena.IndexWorker
|
||||
alias Philomena.TagAliasWorker
|
||||
alias Philomena.TagUnaliasWorker
|
||||
alias Philomena.TagReindexWorker
|
||||
alias Philomena.TagDeleteWorker
|
||||
alias Philomena.Tags.Tag
|
||||
alias Philomena.Tags.Uploader
|
||||
alias Philomena.Images
|
||||
|
@ -73,7 +78,7 @@ defmodule Philomena.Tags do
|
|||
** (Ecto.NoResultsError)
|
||||
|
||||
"""
|
||||
def get_tag!(slug), do: Repo.get_by!(Tag, slug: slug)
|
||||
def get_tag!(id), do: Repo.get!(Tag, id)
|
||||
|
||||
@doc """
|
||||
Creates a tag.
|
||||
|
@ -162,6 +167,14 @@ defmodule Philomena.Tags do
|
|||
|
||||
"""
|
||||
def delete_tag(%Tag{} = tag) do
|
||||
Exq.enqueue(Exq, "indexing", TagDeleteWorker, [tag.id])
|
||||
|
||||
{:ok, tag}
|
||||
end
|
||||
|
||||
def perform_delete(tag_id) do
|
||||
tag = get_tag!(tag_id)
|
||||
|
||||
image_ids =
|
||||
Image
|
||||
|> join(:inner, [i], _ in assoc(i, :tags))
|
||||
|
@ -188,9 +201,7 @@ defmodule Philomena.Tags do
|
|||
|> Repo.update()
|
||||
|> case do
|
||||
{:ok, tag} ->
|
||||
spawn(fn ->
|
||||
perform_alias(tag, target_tag)
|
||||
end)
|
||||
Exq.enqueue(Exq, "indexing", TagAliasWorker, [tag.id, target_tag.id])
|
||||
|
||||
{:ok, tag}
|
||||
|
||||
|
@ -199,7 +210,10 @@ defmodule Philomena.Tags do
|
|||
end
|
||||
end
|
||||
|
||||
defp perform_alias(tag, target_tag) do
|
||||
def perform_alias(tag_id, target_tag_id) do
|
||||
tag = get_tag!(tag_id)
|
||||
target_tag = get_tag!(target_tag_id)
|
||||
|
||||
filters_hidden =
|
||||
where(Filter, [f], fragment("? @> ARRAY[?]::integer[]", f.hidden_tag_ids, ^tag.id))
|
||||
|
||||
|
@ -253,6 +267,14 @@ defmodule Philomena.Tags do
|
|||
end
|
||||
|
||||
def reindex_tag_images(%Tag{} = tag) do
|
||||
Exq.enqueue(Exq, "indexing", TagReindexWorker, [tag.id])
|
||||
|
||||
{:ok, tag}
|
||||
end
|
||||
|
||||
def perform_reindex_images(tag_id) do
|
||||
tag = get_tag!(tag_id)
|
||||
|
||||
# First recount the tag
|
||||
image_count =
|
||||
Image
|
||||
|
@ -273,6 +295,13 @@ defmodule Philomena.Tags do
|
|||
end
|
||||
|
||||
def unalias_tag(%Tag{} = tag) do
|
||||
Exq.enqueue(Exq, "indexing", TagUnaliasWorker, [tag.id])
|
||||
|
||||
{:ok, tag}
|
||||
end
|
||||
|
||||
def perform_unalias(tag_id) do
|
||||
tag = get_tag!(tag_id)
|
||||
former_alias = Repo.preload(tag, :aliased_tag).aliased_tag
|
||||
|
||||
tag
|
||||
|
@ -352,20 +381,13 @@ defmodule Philomena.Tags do
|
|||
end
|
||||
|
||||
def reindex_tag(%Tag{} = tag) do
|
||||
reindex_tags([%Tag{id: tag.id}])
|
||||
Exq.enqueue(Exq, "indexing", IndexWorker, ["Tags", "id", [tag.id]])
|
||||
|
||||
tag
|
||||
end
|
||||
|
||||
def reindex_tags(tags) do
|
||||
spawn(fn ->
|
||||
ids =
|
||||
tags
|
||||
|> Enum.map(& &1.id)
|
||||
|
||||
Tag
|
||||
|> preload(^indexing_preloads())
|
||||
|> where([t], t.id in ^ids)
|
||||
|> Elasticsearch.reindex(Tag)
|
||||
end)
|
||||
Exq.enqueue(Exq, "indexing", IndexWorker, ["Tags", "id", Enum.map(tags, & &1.id)])
|
||||
|
||||
tags
|
||||
end
|
||||
|
@ -374,6 +396,13 @@ defmodule Philomena.Tags do
|
|||
[:aliased_tag, :aliases, :implied_tags, :implied_by_tags]
|
||||
end
|
||||
|
||||
def perform_reindex(column, condition) do
|
||||
Tag
|
||||
|> preload(^indexing_preloads())
|
||||
|> where([t], field(t, ^column) in ^condition)
|
||||
|> Elasticsearch.reindex(Tag)
|
||||
end
|
||||
|
||||
alias Philomena.Tags.Implication
|
||||
|
||||
@doc """
|
||||
|
|
|
@ -9,7 +9,9 @@ defmodule Philomena.Topics do
|
|||
|
||||
alias Philomena.Topics.Topic
|
||||
alias Philomena.Forums.Forum
|
||||
alias Philomena.Posts
|
||||
alias Philomena.Notifications
|
||||
alias Philomena.NotificationWorker
|
||||
|
||||
@doc """
|
||||
Gets a single topic.
|
||||
|
@ -74,31 +76,34 @@ defmodule Philomena.Topics do
|
|||
end
|
||||
|
||||
def notify_topic(topic, post) do
|
||||
spawn(fn ->
|
||||
forum =
|
||||
topic
|
||||
|> Repo.preload(:forum)
|
||||
|> Map.fetch!(:forum)
|
||||
Exq.enqueue(Exq, "notifications", NotificationWorker, ["Topics", [topic.id, post.id]])
|
||||
end
|
||||
|
||||
subscriptions =
|
||||
forum
|
||||
|> Repo.preload(:subscriptions)
|
||||
|> Map.fetch!(:subscriptions)
|
||||
def perform_notify([topic_id, post_id]) do
|
||||
topic = get_topic!(topic_id)
|
||||
post = Posts.get_post!(post_id)
|
||||
|
||||
Notifications.notify(
|
||||
post,
|
||||
subscriptions,
|
||||
%{
|
||||
actor_id: topic.id,
|
||||
actor_type: "Topic",
|
||||
actor_child_id: post.id,
|
||||
actor_child_type: "Post",
|
||||
action: "posted a new topic in #{forum.name}"
|
||||
}
|
||||
)
|
||||
end)
|
||||
forum =
|
||||
topic
|
||||
|> Repo.preload(:forum)
|
||||
|> Map.fetch!(:forum)
|
||||
|
||||
topic
|
||||
subscriptions =
|
||||
forum
|
||||
|> Repo.preload(:subscriptions)
|
||||
|> Map.fetch!(:subscriptions)
|
||||
|
||||
Notifications.notify(
|
||||
post,
|
||||
subscriptions,
|
||||
%{
|
||||
actor_id: topic.id,
|
||||
actor_type: "Topic",
|
||||
actor_child_id: post.id,
|
||||
actor_child_type: "Post",
|
||||
action: "posted a new topic in #{forum.name}"
|
||||
}
|
||||
)
|
||||
end
|
||||
|
||||
@doc """
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
defmodule Philomena.UserDownvoteWipe do
|
||||
alias Philomena.Batch
|
||||
alias Philomena.Elasticsearch
|
||||
alias Philomena.Users
|
||||
alias Philomena.Users.User
|
||||
alias Philomena.Images.Image
|
||||
alias Philomena.Images
|
||||
|
@ -9,7 +10,9 @@ defmodule Philomena.UserDownvoteWipe do
|
|||
alias Philomena.Repo
|
||||
import Ecto.Query
|
||||
|
||||
def perform(user, upvotes_and_faves_too \\ false) do
|
||||
def perform(user_id, upvotes_and_faves_too \\ false) do
|
||||
user = Users.get_user!(user_id)
|
||||
|
||||
ImageVote
|
||||
|> where(user_id: ^user.id, up: false)
|
||||
|> Batch.query_batches([id_field: :image_id], fn queryable ->
|
||||
|
|
|
@ -10,11 +10,14 @@ defmodule Philomena.UserWipe do
|
|||
alias Philomena.TagChanges.TagChange
|
||||
alias Philomena.UserIps.UserIp
|
||||
alias Philomena.UserFingerprints.UserFingerprint
|
||||
alias Philomena.Users
|
||||
alias Philomena.Users.User
|
||||
alias Philomena.Repo
|
||||
import Ecto.Query
|
||||
|
||||
def perform(user) do
|
||||
def perform(user_id) do
|
||||
user = Users.get_user!(user_id)
|
||||
|
||||
random_hex = :crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower)
|
||||
|
||||
for schema <- [Comment, Image, Post, Report, SourceChange, TagChange] do
|
||||
|
|
|
@ -17,6 +17,7 @@ defmodule Philomena.Users do
|
|||
alias Philomena.Posts
|
||||
alias Philomena.Galleries
|
||||
alias Philomena.Reports
|
||||
alias Philomena.UserRenameWorker
|
||||
|
||||
## Database getters
|
||||
|
||||
|
@ -604,13 +605,7 @@ defmodule Philomena.Users do
|
|||
|> Repo.transaction()
|
||||
|> case do
|
||||
{:ok, %{account: %{name: new_name} = account}} ->
|
||||
spawn(fn ->
|
||||
Images.user_name_reindex(old_name, new_name)
|
||||
Comments.user_name_reindex(old_name, new_name)
|
||||
Posts.user_name_reindex(old_name, new_name)
|
||||
Galleries.user_name_reindex(old_name, new_name)
|
||||
Reports.user_name_reindex(old_name, new_name)
|
||||
end)
|
||||
Exq.enqueue(Exq, "indexing", UserRenameWorker, [old_name, new_name])
|
||||
|
||||
{:ok, account}
|
||||
|
||||
|
@ -619,6 +614,14 @@ defmodule Philomena.Users do
|
|||
end
|
||||
end
|
||||
|
||||
def perform_rename(old_name, new_name) do
|
||||
Images.user_name_reindex(old_name, new_name)
|
||||
Comments.user_name_reindex(old_name, new_name)
|
||||
Posts.user_name_reindex(old_name, new_name)
|
||||
Galleries.user_name_reindex(old_name, new_name)
|
||||
Reports.user_name_reindex(old_name, new_name)
|
||||
end
|
||||
|
||||
def reactivate_user(%User{} = user) do
|
||||
user
|
||||
|> User.reactivate_changeset()
|
||||
|
|
7
lib/philomena/workers/gallery_reorder_worker.ex
Normal file
7
lib/philomena/workers/gallery_reorder_worker.ex
Normal file
|
@ -0,0 +1,7 @@
|
|||
defmodule Philomena.GalleryReorderWorker do
|
||||
alias Philomena.Galleries
|
||||
|
||||
def perform(gallery_id, image_ids) do
|
||||
Galleries.perform_reorder(gallery_id, image_ids)
|
||||
end
|
||||
end
|
23
lib/philomena/workers/index_worker.ex
Normal file
23
lib/philomena/workers/index_worker.ex
Normal file
|
@ -0,0 +1,23 @@
|
|||
defmodule Philomena.IndexWorker do
|
||||
@modules %{
|
||||
"Comments" => Philomena.Comments,
|
||||
"Galleries" => Philomena.Galleries,
|
||||
"Images" => Philomena.Images,
|
||||
"Posts" => Philomena.Posts,
|
||||
"Reports" => Philomena.Reports,
|
||||
"Tags" => Philomena.Tags
|
||||
}
|
||||
|
||||
# Perform the queued index. Context function looks like the following:
|
||||
#
|
||||
# def perform_reindex(column, condition) do
|
||||
# Image
|
||||
# |> preload(^indexing_preloads())
|
||||
# |> where([i], field(i, ^column) in ^condition)
|
||||
# |> Elasticsearch.reindex(Image)
|
||||
# end
|
||||
#
|
||||
def perform(module, column, condition) do
|
||||
@modules[module].perform_reindex(String.to_existing_atom(column), condition)
|
||||
end
|
||||
end
|
13
lib/philomena/workers/notification_worker.ex
Normal file
13
lib/philomena/workers/notification_worker.ex
Normal file
|
@ -0,0 +1,13 @@
|
|||
defmodule Philomena.NotificationWorker do
|
||||
@modules %{
|
||||
"Comments" => Philomena.Comments,
|
||||
"Galleries" => Philomena.Galleries,
|
||||
"Images" => Philomena.Images,
|
||||
"Posts" => Philomena.Posts,
|
||||
"Topics" => Philomena.Topics
|
||||
}
|
||||
|
||||
def perform(module, args) do
|
||||
@modules[module].perform_notify(args)
|
||||
end
|
||||
end
|
7
lib/philomena/workers/tag_alias_worker.ex
Normal file
7
lib/philomena/workers/tag_alias_worker.ex
Normal file
|
@ -0,0 +1,7 @@
|
|||
defmodule Philomena.TagAliasWorker do
|
||||
alias Philomena.Tags
|
||||
|
||||
def perform(tag_id, target_tag_id) do
|
||||
Tags.perform_alias(tag_id, target_tag_id)
|
||||
end
|
||||
end
|
7
lib/philomena/workers/tag_delete_worker.ex
Normal file
7
lib/philomena/workers/tag_delete_worker.ex
Normal file
|
@ -0,0 +1,7 @@
|
|||
defmodule Philomena.TagDeleteWorker do
|
||||
alias Philomena.Tags
|
||||
|
||||
def perform(tag_id) do
|
||||
Tags.perform_delete(tag_id)
|
||||
end
|
||||
end
|
7
lib/philomena/workers/tag_reindex_worker.ex
Normal file
7
lib/philomena/workers/tag_reindex_worker.ex
Normal file
|
@ -0,0 +1,7 @@
|
|||
defmodule Philomena.TagReindexWorker do
|
||||
alias Philomena.Tags
|
||||
|
||||
def perform(tag_id) do
|
||||
Tags.perform_reindex_images(tag_id)
|
||||
end
|
||||
end
|
7
lib/philomena/workers/tag_unalias_worker.ex
Normal file
7
lib/philomena/workers/tag_unalias_worker.ex
Normal file
|
@ -0,0 +1,7 @@
|
|||
defmodule Philomena.TagUnaliasWorker do
|
||||
alias Philomena.Tags
|
||||
|
||||
def perform(tag_id) do
|
||||
Tags.perform_unalias(tag_id)
|
||||
end
|
||||
end
|
7
lib/philomena/workers/user_rename_worker.ex
Normal file
7
lib/philomena/workers/user_rename_worker.ex
Normal file
|
@ -0,0 +1,7 @@
|
|||
defmodule Philomena.UserRenameWorker do
|
||||
alias Philomena.Users
|
||||
|
||||
def perform(old_name, new_name) do
|
||||
Users.perform_rename(old_name, new_name)
|
||||
end
|
||||
end
|
7
lib/philomena/workers/user_unvote_worker.ex
Normal file
7
lib/philomena/workers/user_unvote_worker.ex
Normal file
|
@ -0,0 +1,7 @@
|
|||
defmodule Philomena.UserUnvoteWorker do
|
||||
alias Philomena.UserDownvoteWipe
|
||||
|
||||
def perform(user_id, votes_and_faves_too?) do
|
||||
UserDownvoteWipe.perform(user_id, votes_and_faves_too?)
|
||||
end
|
||||
end
|
7
lib/philomena/workers/user_wipe_worker.ex
Normal file
7
lib/philomena/workers/user_wipe_worker.ex
Normal file
|
@ -0,0 +1,7 @@
|
|||
defmodule Philomena.UserWipeWorker do
|
||||
alias Philomena.UserWipe
|
||||
|
||||
def perform(user_id) do
|
||||
UserWipe.perform(user_id)
|
||||
end
|
||||
end
|
|
@ -1,16 +1,14 @@
|
|||
defmodule PhilomenaWeb.Admin.User.DownvoteController do
|
||||
use PhilomenaWeb, :controller
|
||||
|
||||
alias Philomena.UserDownvoteWipe
|
||||
alias Philomena.UserUnvoteWorker
|
||||
alias Philomena.Users.User
|
||||
|
||||
plug :verify_authorized
|
||||
plug :load_resource, model: User, id_name: "user_id", id_field: "slug", persisted: true
|
||||
|
||||
def delete(conn, _params) do
|
||||
spawn(fn ->
|
||||
UserDownvoteWipe.perform(conn.assigns.user)
|
||||
end)
|
||||
Exq.enqueue(Exq, "indexing", UserUnvoteWorker, [conn.assigns.user.id, false])
|
||||
|
||||
conn
|
||||
|> put_flash(:info, "Downvote wipe started.")
|
||||
|
|
|
@ -1,16 +1,14 @@
|
|||
defmodule PhilomenaWeb.Admin.User.VoteController do
|
||||
use PhilomenaWeb, :controller
|
||||
|
||||
alias Philomena.UserDownvoteWipe
|
||||
alias Philomena.UserUnvoteWorker
|
||||
alias Philomena.Users.User
|
||||
|
||||
plug :verify_authorized
|
||||
plug :load_resource, model: User, id_name: "user_id", id_field: "slug", persisted: true
|
||||
|
||||
def delete(conn, _params) do
|
||||
spawn(fn ->
|
||||
UserDownvoteWipe.perform(conn.assigns.user, true)
|
||||
end)
|
||||
Exq.enqueue(Exq, "indexing", UserUnvoteWorker, [conn.assigns.user.id, true])
|
||||
|
||||
conn
|
||||
|> put_flash(:info, "Vote and fave wipe started.")
|
||||
|
|
|
@ -1,21 +1,19 @@
|
|||
defmodule PhilomenaWeb.Admin.User.WipeController do
|
||||
use PhilomenaWeb, :controller
|
||||
|
||||
alias Philomena.UserWipe
|
||||
alias Philomena.UserWipeWorker
|
||||
alias Philomena.Users.User
|
||||
|
||||
plug :verify_authorized
|
||||
plug :load_resource, model: User, id_name: "user_id", id_field: "slug", persisted: true
|
||||
|
||||
def create(conn, _params) do
|
||||
spawn(fn ->
|
||||
UserWipe.perform(conn.assigns.user)
|
||||
end)
|
||||
Exq.enqueue(Exq, "indexing", UserWipeWorker, [conn.assigns.user.id])
|
||||
|
||||
conn
|
||||
|> put_flash(
|
||||
:info,
|
||||
"PII wipe started, please verify and then deactivate the account as necessary."
|
||||
"PII wipe queued, please verify and then deactivate the account as necessary."
|
||||
)
|
||||
|> redirect(to: Routes.profile_path(conn, :show, conn.assigns.user))
|
||||
end
|
||||
|
|
|
@ -31,12 +31,10 @@ defmodule PhilomenaWeb.Tag.AliasController do
|
|||
end
|
||||
|
||||
def delete(conn, _params) do
|
||||
spawn(fn ->
|
||||
{:ok, _tag} = Tags.unalias_tag(conn.assigns.tag)
|
||||
end)
|
||||
{:ok, tag} = Tags.unalias_tag(conn.assigns.tag)
|
||||
|
||||
conn
|
||||
|> put_flash(:info, "Tag dealias queued.")
|
||||
|> redirect(to: Routes.tag_path(conn, :show, conn.assigns.tag))
|
||||
|> redirect(to: Routes.tag_path(conn, :show, tag))
|
||||
end
|
||||
end
|
||||
|
|
|
@ -14,12 +14,10 @@ defmodule PhilomenaWeb.Tag.ReindexController do
|
|||
persisted: true
|
||||
|
||||
def create(conn, _params) do
|
||||
spawn(fn ->
|
||||
Tags.reindex_tag_images(conn.assigns.tag)
|
||||
end)
|
||||
{:ok, tag} = Tags.reindex_tag_images(conn.assigns.tag)
|
||||
|
||||
conn
|
||||
|> put_flash(:info, "Tag reindex started.")
|
||||
|> redirect(to: Routes.tag_path(conn, :edit, conn.assigns.tag))
|
||||
|> redirect(to: Routes.tag_path(conn, :edit, tag))
|
||||
end
|
||||
end
|
||||
|
|
|
@ -107,12 +107,10 @@ defmodule PhilomenaWeb.TagController do
|
|||
end
|
||||
|
||||
def delete(conn, _params) do
|
||||
spawn(fn ->
|
||||
Tags.delete_tag(conn.assigns.tag)
|
||||
end)
|
||||
{:ok, _tag} = Tags.delete_tag(conn.assigns.tag)
|
||||
|
||||
conn
|
||||
|> put_flash(:info, "Tag scheduled for deletion.")
|
||||
|> put_flash(:info, "Tag queued for deletion.")
|
||||
|> redirect(to: "/")
|
||||
end
|
||||
|
||||
|
|
Loading…
Reference in a new issue