From 5b9bebc076c34c26577d2c26534c73b63a4a9bff Mon Sep 17 00:00:00 2001 From: Liam Date: Sun, 8 Dec 2024 21:11:41 -0500 Subject: [PATCH] Remove makefile indexer and improve Elixir-side indexing --- config/runtime.exs | 4 +- index/all.mk | 25 ----- index/comments.mk | 49 ---------- index/filters.mk | 47 ---------- index/galleries.mk | 45 --------- index/images.mk | 156 -------------------------------- index/posts.mk | 51 ----------- index/reports.mk | 51 ----------- index/tags.mk | 54 ----------- lib/mix/tasks/reindex_all.ex | 48 +--------- lib/philomena/search_indexer.ex | 134 +++++++++++++++++++++++++++ lib/philomena_query/search.ex | 30 +++--- 12 files changed, 156 insertions(+), 538 deletions(-) delete mode 100644 index/all.mk delete mode 100644 index/comments.mk delete mode 100644 index/filters.mk delete mode 100644 index/galleries.mk delete mode 100644 index/images.mk delete mode 100644 index/posts.mk delete mode 100644 index/reports.mk delete mode 100644 index/tags.mk create mode 100644 lib/philomena/search_indexer.ex diff --git a/config/runtime.exs b/config/runtime.exs index 5ebb59e9..e5c35a87 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -101,7 +101,9 @@ if config_env() != :test do url: System.fetch_env!("DATABASE_URL"), pool_size: String.to_integer(System.get_env("POOL_SIZE", "16")), timeout: 60_000, - ownership_timeout: 60_000 + ownership_timeout: 60_000, + queue_target: 20_000, + queue_interval: 20_000 end if config_env() == :prod do diff --git a/index/all.mk b/index/all.mk deleted file mode 100644 index cfa1b0c3..00000000 --- a/index/all.mk +++ /dev/null @@ -1,25 +0,0 @@ -all: comments galleries images posts reports tags filters - -comments: - $(MAKE) -f comments.mk - -galleries: - $(MAKE) -f galleries.mk - -images: - $(MAKE) -f images.mk - -posts: - $(MAKE) -f posts.mk - -reports: - $(MAKE) -f reports.mk - -tags: - $(MAKE) -f tags.mk - -filters: - $(MAKE) -f filters.mk - -clean: - rm -f ./*.jsonl diff --git a/index/comments.mk b/index/comments.mk deleted file mode 100644 index 9c7403da..00000000 --- a/index/comments.mk +++ /dev/null @@ -1,49 +0,0 @@ -DATABASE ?= philomena -OPENSEARCH_URL ?= http://localhost:9200/ -ELASTICDUMP ?= elasticdump -.ONESHELL: - -all: import_es - -import_es: dump_jsonl - $(ELASTICDUMP) --input=comments.jsonl --output=$OPENSEARCH_URL --output-index=comments --limit 10000 --retryAttempts=5 --type=data --transform="doc._source = Object.assign({},doc); doc._id = doc.id" - -dump_jsonl: metadata authors tags - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'copy (select temp_comments.jsonb_object_agg(object) from temp_comments.comment_search_json group by comment_id) to stdout;' > comments.jsonl - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'drop schema temp_comments cascade;' - sed -i comments.jsonl -e 's/\\\\/\\/g' - -metadata: comment_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_comments.comment_search_json (comment_id, object) select c.id, jsonb_build_object( - 'id', c.id, - 'posted_at', c.created_at, - 'ip', c.ip, - 'fingerprint', c.fingerprint, - 'image_id', c.image_id, - 'user_id', c.user_id, - 'anonymous', c.anonymous, - 'body', c.body, - 'hidden_from_users', (c.hidden_from_users or i.hidden_from_users) - ) from comments c inner join images i on c.image_id=i.id; - SQL - -authors: comment_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_comments.comment_search_json (comment_id, object) select c.id, jsonb_build_object('author', (case when c.anonymous='t' then null else u.name end)) from 
comments c left join users u on c.user_id=u.id; - SQL - -tags: comment_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - create unlogged table temp_comments.image_tags (image_id bigint not null, tags jsonb not null); - insert into temp_comments.image_tags (image_id, tags) select it.image_id, jsonb_agg(it.tag_id) from image_taggings it group by it.image_id; - insert into temp_comments.comment_search_json (comment_id, object) select c.id, jsonb_build_object('image_tag_ids', it.tags) from comments c inner join temp_comments.image_tags it on c.image_id=it.image_id; - SQL - -comment_search_json: - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - drop schema if exists temp_comments cascade; - create schema temp_comments; - create unlogged table temp_comments.comment_search_json (comment_id bigint not null, object jsonb not null); - create or replace aggregate temp_comments.jsonb_object_agg(jsonb) (sfunc = 'jsonb_concat', stype = jsonb, initcond='{}'); - SQL diff --git a/index/filters.mk b/index/filters.mk deleted file mode 100644 index 93d260cb..00000000 --- a/index/filters.mk +++ /dev/null @@ -1,47 +0,0 @@ -DATABASE ?= philomena -OPENSEARCH_URL ?= http://localhost:9200/ -ELASTICDUMP ?= elasticdump -.ONESHELL: - -all: import_es - -import_es: dump_jsonl - $(ELASTICDUMP) --input=filters.jsonl --output=$OPENSEARCH_URL --output-index=filters --limit 10000 --retryAttempts=5 --type=data --transform="doc._source = Object.assign({},doc); doc._id = doc.id" - -dump_jsonl: metadata creators - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'copy (select temp_filters.jsonb_object_agg(object) from temp_filters.filter_search_json group by filter_id) to stdout;' > filters.jsonl - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'drop schema temp_filters cascade;' - sed -i filters.jsonl -e 's/\\\\/\\/g' - -metadata: filter_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_filters.filter_search_json (filter_id, object) select f.id, jsonb_build_object( - 'id', f.id, - 'created_at', f.created_at, - 'user_id', f.user_id, - 'public', f.public or f.system, - 'system', f.system, - 'name', lower(f.name), - 'description', f.description, - 'spoilered_count', array_length(f.spoilered_tag_ids, 1), - 'hidden_count', array_length(f.hidden_tag_ids, 1), - 'spoilered_tag_ids', f.spoilered_tag_ids, - 'hidden_tag_ids', f.hidden_tag_ids, - 'spoilered_complex_str', lower(f.spoilered_complex_str), - 'hidden_complex_str', lower(f.hidden_complex_str), - 'user_count', f.user_count - ) from filters f; - SQL - -creators: filter_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_filters.filter_search_json (filter_id, object) select f.id, jsonb_build_object('creator', lower(u.name)) from filters f left join users u on f.user_id=u.id; - SQL - -filter_search_json: - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - drop schema if exists temp_filters cascade; - create schema temp_filters; - create unlogged table temp_filters.filter_search_json (filter_id bigint not null, object jsonb not null); - create or replace aggregate temp_filters.jsonb_object_agg(jsonb) (sfunc = 'jsonb_concat', stype = jsonb, initcond='{}'); - SQL diff --git a/index/galleries.mk b/index/galleries.mk deleted file mode 100644 index 0243b7e5..00000000 --- a/index/galleries.mk +++ /dev/null @@ -1,45 +0,0 @@ -DATABASE ?= philomena -OPENSEARCH_URL ?= http://localhost:9200/ -ELASTICDUMP ?= elasticdump -.ONESHELL: - -all: import_es - -import_es: dump_jsonl - $(ELASTICDUMP) --input=galleries.jsonl --output=$OPENSEARCH_URL --output-index=galleries 
--limit 10000 --retryAttempts=5 --type=data --transform="doc._source = Object.assign({},doc); doc._id = doc.id" - -dump_jsonl: metadata subscribers images - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'copy (select temp_galleries.jsonb_object_agg(object) from temp_galleries.gallery_search_json group by gallery_id) to stdout;' > galleries.jsonl - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'drop schema temp_galleries cascade;' - sed -i galleries.jsonl -e 's/\\\\/\\/g' - -metadata: gallery_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_galleries.gallery_search_json (gallery_id, object) select g.id, jsonb_build_object( - 'id', g.id, - 'image_count', g.image_count, - 'updated_at', g.updated_at, - 'created_at', g.created_at, - 'title', lower(g.title), - 'creator', lower(u.name), - 'description', g.description - ) from galleries g left join users u on g.creator_id=u.id; - SQL - -subscribers: gallery_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_galleries.gallery_search_json (gallery_id, object) select gallery_id, json_build_object('watcher_ids', jsonb_agg(user_id), 'watcher_count', count(*)) from gallery_subscriptions group by gallery_id; - SQL - -images: gallery_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_galleries.gallery_search_json (gallery_id, object) select gallery_id, json_build_object('image_ids', jsonb_agg(image_id)) from gallery_interactions group by gallery_id; - SQL - -gallery_search_json: - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - drop schema if exists temp_galleries cascade; - create schema temp_galleries; - create unlogged table temp_galleries.gallery_search_json (gallery_id bigint not null, object jsonb not null); - create or replace aggregate temp_galleries.jsonb_object_agg(jsonb) (sfunc = 'jsonb_concat', stype = jsonb, initcond='{}'); - SQL diff --git a/index/images.mk b/index/images.mk deleted file mode 100644 index a96f446a..00000000 --- a/index/images.mk +++ /dev/null @@ -1,156 +0,0 @@ -DATABASE ?= philomena -OPENSEARCH_URL ?= http://localhost:9200/ -ELASTICDUMP ?= elasticdump -.ONESHELL: - -all: import_es - -import_es: dump_jsonl - $(ELASTICDUMP) --input=images.jsonl --output=$OPENSEARCH_URL --output-index=images --limit 10000 --retryAttempts=5 --type=data --transform="doc._source = Object.assign({},doc); doc._id = doc.id" - -dump_jsonl: metadata true_uploaders uploaders deleters galleries tags sources hides upvotes downvotes faves tag_names - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'copy (select temp_images.jsonb_object_agg(object) from temp_images.image_search_json group by image_id) to stdout;' > images.jsonl - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'drop schema temp_images cascade;' - sed -i images.jsonl -e 's/\\\\/\\/g' - -metadata: image_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_images.image_search_json (image_id, object) select id, jsonb_build_object( - 'approved', approved, - 'animated', is_animated, - 'anonymous', anonymous, - 'aspect_ratio', nullif(image_aspect_ratio, 'NaN'::float8), - 'comment_count', comments_count, - 'created_at', created_at, - 'deletion_reason', deletion_reason, - 'description', description, - 'downvotes', downvotes_count, - 'duplicate_id', duplicate_id, - 'duration', (case when is_animated then image_duration else 0::float end), - 'faves', faves_count, - 'file_name', image_name, - 'fingerprint', fingerprint, - 'first_seen_at', first_seen_at, - 'height', image_height, - 'hidden_from_users', hidden_from_users, - 'id', id, - 
'ip', ip, - 'mime_type', image_mime_type, - 'orig_sha512_hash', image_orig_sha512_hash, - 'original_format', image_format, - 'pixels', cast(image_width as bigint)*cast(image_height as bigint), - 'processed', processed, - 'score', score, - 'size', image_size, - 'orig_size', image_orig_size, - 'sha512_hash', image_sha512_hash, - 'thumbnails_generated', thumbnails_generated, - 'updated_at', updated_at, - 'upvotes', upvotes_count, - 'width', image_width, - 'wilson_score', temp_images.wilson_995(upvotes_count, downvotes_count) - ) from images; - SQL - -true_uploaders: image_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_images.image_search_json (image_id, object) select i.id, jsonb_build_object('true_uploader_id', u.id, 'true_uploader', u.name) from images i left join users u on u.id = i.user_id; - SQL - -uploaders: image_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_images.image_search_json (image_id, object) select i.id, jsonb_build_object('uploader_id', (case when i.anonymous = 't' then null else u.id end), 'uploader', (case when i.anonymous = 't' then null else lower(u.name) end)) from images i left join users u on u.id = i.user_id; - SQL - -deleters: image_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_images.image_search_json (image_id, object) select i.id, jsonb_build_object('deleted_by_user_id', u.id, 'deleted_by_user', lower(u.name)) from images i left join users u on u.id = i.deleted_by_id; - SQL - -galleries: image_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_images.image_search_json (image_id, object) select gi.image_id, jsonb_build_object('gallery_interactions', jsonb_agg(jsonb_build_object('id', gi.gallery_id, 'position', gi.position))) from gallery_interactions gi group by image_id; - SQL - -tags: image_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_images.image_search_json (image_id, object) select it.image_id, jsonb_build_object( - 'tag_ids', jsonb_agg(it.tag_id), - 'tag_count', count(*), - 'error_tag_count', count(case when t.category = 'error' then t.category else null end), - 'rating_tag_count', count(case when t.category = 'rating' then t.category else null end), - 'origin_tag_count', count(case when t.category = 'origin' then t.category else null end), - 'character_tag_count', count(case when t.category = 'character' then t.category else null end), - 'oc_tag_count', count(case when t.category = 'oc' then t.category else null end), - 'species_tag_count', count(case when t.category = 'species' then t.category else null end), - 'body_type_tag_count', count(case when t.category = 'body-type' then t.category else null end), - 'content_fanmade_tag_count', count(case when t.category = 'content-fanmade' then t.category else null end), - 'content_official_tag_count', count(case when t.category = 'content-official' then t.category else null end), - 'spoiler_tag_count', count(case when t.category = 'spoiler' then t.category else null end) - ) from image_taggings it inner join tags t on t.id = it.tag_id group by image_id; - SQL - -sources: image_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_images.image_search_json (image_id, object) select s.image_id, jsonb_build_object('source_url', jsonb_agg(lower(s.source)), 'source_count', count(*)) from image_sources s group by image_id; - SQL - -hides: image_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_images.image_search_json (image_id, 
object) select ih.image_id, jsonb_build_object('hidden_by_user_ids', jsonb_agg(ih.user_id), 'hidden_by_users', jsonb_agg(lower(u.name))) from image_hides ih inner join users u on u.id = ih.user_id group by image_id; - SQL - -downvotes: image_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_images.image_search_json (image_id, object) select iv.image_id, jsonb_build_object('downvoter_ids', jsonb_agg(iv.user_id), 'downvoters', jsonb_agg(lower(u.name))) from image_votes iv inner join users u on u.id = iv.user_id where iv.up = false group by image_id; - SQL - -upvotes: image_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_images.image_search_json (image_id, object) select iv.image_id, jsonb_build_object('upvoter_ids', jsonb_agg(iv.user_id), 'upvoters', jsonb_agg(lower(u.name))) from image_votes iv inner join users u on u.id = iv.user_id where iv.up = true group by image_id; - SQL - -faves: image_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_images.image_search_json (image_id, object) select if.image_id, jsonb_build_object('favourited_by_user_ids', jsonb_agg(if.user_id), 'favourited_by_users', jsonb_agg(lower(u.name))) from image_faves if inner join users u on u.id = if.user_id group by image_id; - SQL - -tag_names: tags_with_aliases - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_images.image_search_json (image_id, object) select image_id, jsonb_build_object('namespaced_tags', jsonb_build_object('name', jsonb_agg(lower(tag_name)))) from temp_images.tags_with_aliases group by image_id; - SQL - -tags_with_aliases: image_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - create unlogged table if not exists temp_images.tags_with_aliases (image_id bigint not null, tag_name text not null); - truncate temp_images.tags_with_aliases; - insert into temp_images.tags_with_aliases (image_id, tag_name) select it.image_id, t.name from image_taggings it inner join tags t on t.id = it.tag_id; - insert into temp_images.tags_with_aliases (image_id, tag_name) select it.image_id, t.name from image_taggings it left outer join tags t on t.aliased_tag_id = it.tag_id where t.name is not null; - SQL - -image_search_json: - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - drop schema if exists temp_images cascade; - create schema temp_images; - create unlogged table temp_images.image_search_json (image_id bigint not null, object jsonb not null); - create function temp_images.wilson_995(succ bigint, fail bigint) returns double precision as ' - declare - n double precision; - p_hat double precision; - z double precision; - z2 double precision; - begin - if succ <= 0 then - return 0; - end if; - - n := succ + fail; - p_hat := succ / n; - z := 2.57583; - z2 := 6.634900189; - - return (p_hat + z2 / (2 * n) - z * sqrt((p_hat * (1 - p_hat) + z2 / (4 * n)) / n)) / (1 + z2 / n); - end - ' language plpgsql; - create aggregate temp_images.jsonb_object_agg(jsonb) (sfunc = 'jsonb_concat', stype = jsonb, initcond='{}'); - SQL diff --git a/index/posts.mk b/index/posts.mk deleted file mode 100644 index 8324d633..00000000 --- a/index/posts.mk +++ /dev/null @@ -1,51 +0,0 @@ -DATABASE ?= philomena -OPENSEARCH_URL ?= http://localhost:9200/ -ELASTICDUMP ?= elasticdump -.ONESHELL: - -all: import_es - -import_es: dump_jsonl - $(ELASTICDUMP) --input=posts.jsonl --output=$OPENSEARCH_URL --output-index=posts --limit 10000 --retryAttempts=5 --type=data --transform="doc._source = Object.assign({},doc); doc._id = doc.id" - -dump_jsonl: metadata 
authors - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'copy (select temp_posts.jsonb_object_agg(object) from temp_posts.post_search_json group by post_id) to stdout;' > posts.jsonl - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'drop schema temp_posts cascade;' - sed -i posts.jsonl -e 's/\\\\/\\/g' - -metadata: post_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_posts.post_search_json (post_id, object) select p.id, jsonb_build_object( - 'id', p.id, - 'topic_id', p.topic_id, - 'body', p.body, - 'subject', t.title, - 'ip', p.ip, - 'user_agent', '', - 'referrer', '', - 'fingerprint', p.fingerprint, - 'topic_position', p.topic_position, - 'forum', f.short_name, - 'forum_id', t.forum_id, - 'user_id', p.user_id, - 'anonymous', p.anonymous, - 'created_at', p.created_at, - 'updated_at', p.updated_at, - 'deleted', p.hidden_from_users, - 'destroyed_content', p.destroyed_content, - 'access_level', f.access_level - ) from posts p inner join topics t on t.id=p.topic_id inner join forums f on f.id=t.forum_id; - SQL - -authors: post_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_posts.post_search_json (post_id, object) select p.id, jsonb_build_object('author', (case when p.anonymous='t' then null else u.name end)) from posts p left join users u on p.user_id=u.id; - SQL - -post_search_json: - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - drop schema if exists temp_posts cascade; - create schema temp_posts; - create unlogged table temp_posts.post_search_json (post_id bigint not null, object jsonb not null); - create or replace aggregate temp_posts.jsonb_object_agg(jsonb) (sfunc = 'jsonb_concat', stype = jsonb, initcond='{}'); - SQL diff --git a/index/reports.mk b/index/reports.mk deleted file mode 100644 index 21b5189f..00000000 --- a/index/reports.mk +++ /dev/null @@ -1,51 +0,0 @@ -DATABASE ?= philomena -OPENSEARCH_URL ?= http://localhost:9200/ -ELASTICDUMP ?= elasticdump -.ONESHELL: - -all: import_es - -import_es: dump_jsonl - $(ELASTICDUMP) --input=reports.jsonl --output=$OPENSEARCH_URL --output-index=reports --limit 10000 --retryAttempts=5 --type=data --transform="doc._source = Object.assign({},doc); doc._id = doc.id" - -dump_jsonl: metadata image_ids comment_image_ids - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'copy (select temp_reports.jsonb_object_agg(object) from temp_reports.report_search_json group by report_id) to stdout;' > reports.jsonl - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'drop schema temp_reports cascade;' - sed -i reports.jsonl -e 's/\\\\/\\/g' - -metadata: report_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_reports.report_search_json (report_id, object) select r.id, jsonb_build_object( - 'id', r.id, - 'created_at', r.created_at, - 'ip', r.ip, - 'state', r.state, - 'user', lower(u.name), - 'user_id', r.user_id, - 'admin', lower(a.name), - 'admin_id', r.admin_id, - 'reportable_type', r.reportable_type, - 'reportable_id', r.reportable_id, - 'fingerprint', r.fingerprint, - 'open', r.open, - 'reason', r.reason - ) from reports r left join users u on r.user_id=u.id left join users a on r.admin_id=a.id; - SQL - -image_ids: report_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_reports.report_search_json (report_id, object) select r.id, jsonb_build_object('image_id', r.reportable_id) from reports r where r.reportable_type = 'Image'; - SQL - -comment_image_ids: report_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_reports.report_search_json (report_id, object) 
select r.id, jsonb_build_object('image_id', c.image_id) from reports r inner join comments c on c.id = r.reportable_id where r.reportable_type = 'Comment'; - SQL - -report_search_json: - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - drop schema if exists temp_reports cascade; - create schema temp_reports; - create unlogged table temp_reports.report_search_json (report_id bigint not null, object jsonb not null); - create or replace aggregate temp_reports.jsonb_object_agg(jsonb) (sfunc = 'jsonb_concat', stype = jsonb, initcond='{}'); - SQL diff --git a/index/tags.mk b/index/tags.mk deleted file mode 100644 index 49362f03..00000000 --- a/index/tags.mk +++ /dev/null @@ -1,54 +0,0 @@ -DATABASE ?= philomena -OPENSEARCH_URL ?= http://localhost:9200/ -ELASTICDUMP ?= elasticdump -.ONESHELL: - -all: import_es - -import_es: dump_jsonl - $(ELASTICDUMP) --input=tags.jsonl --output=$OPENSEARCH_URL --output-index=tags --limit 10000 --retryAttempts=5 --type=data --transform="doc._source = Object.assign({},doc); doc._id = doc.id" - -dump_jsonl: metadata aliases implied_tags implied_by_tags - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'copy (select temp_tags.jsonb_object_agg(object) from temp_tags.tag_search_json group by tag_id) to stdout;' > tags.jsonl - psql $(DATABASE) -v ON_ERROR_STOP=1 -c 'drop schema temp_tags cascade;' - sed -i tags.jsonl -e 's/\\\\/\\/g' - -metadata: tag_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_tags.tag_search_json (tag_id, object) select t.id, jsonb_build_object( - 'id', t.id, - 'slug', t.slug, - 'name', t.name, - 'name_in_namespace', t.name_in_namespace, - 'namespace', t.namespace, - 'analyzed_name', t.name, - 'aliased_tag', at.name, - 'category', t.category, - 'aliased', (t.aliased_tag_id is not null), - 'description', t.description, - 'short_description', t.short_description - ) from tags t left join tags at on t.aliased_tag_id=at.id; - SQL - -aliases: tag_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_tags.tag_search_json (tag_id, object) select t.aliased_tag_id, jsonb_build_object('aliases', jsonb_agg(t.name)) from tags t inner join tags at on t.aliased_tag_id=t.id group by t.aliased_tag_id; - SQL - -implied_tags: tag_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_tags.tag_search_json (tag_id, object) select it.tag_id, jsonb_build_object('implied_tag_ids', jsonb_agg(it.implied_tag_id), 'implied_tags', jsonb_agg(t.name)) from tags_implied_tags it inner join tags t on t.id=it.implied_tag_id group by it.tag_id; - SQL - -implied_by_tags: tag_search_json - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - insert into temp_tags.tag_search_json (tag_id, object) select it.implied_tag_id, jsonb_build_object('implied_by_tags', jsonb_agg(t.name)) from tags_implied_tags it inner join tags t on t.id=it.tag_id group by it.implied_tag_id; - SQL - -tag_search_json: - psql $(DATABASE) -v ON_ERROR_STOP=1 <<-SQL - drop schema if exists temp_tags cascade; - create schema temp_tags; - create unlogged table temp_tags.tag_search_json (tag_id bigint not null, object jsonb not null); - create or replace aggregate temp_tags.jsonb_object_agg(jsonb) (sfunc = 'jsonb_concat', stype = jsonb, initcond='{}'); - SQL diff --git a/lib/mix/tasks/reindex_all.ex b/lib/mix/tasks/reindex_all.ex index c1f24114..b1af4887 100644 --- a/lib/mix/tasks/reindex_all.ex +++ b/lib/mix/tasks/reindex_all.ex @@ -1,31 +1,7 @@ defmodule Mix.Tasks.ReindexAll do use Mix.Task - alias PhilomenaQuery.Search - - alias Philomena.{ - Comments.Comment, - 
Galleries.Gallery, - Posts.Post, - Images.Image, - Reports.Report, - Tags.Tag, - Filters.Filter - } - - alias Philomena.{Comments, Galleries, Posts, Images, Tags, Filters} - alias Philomena.Polymorphic - alias Philomena.Repo - import Ecto.Query - - @indices [ - {Images, Image}, - {Comments, Comment}, - {Galleries, Gallery}, - {Tags, Tag}, - {Posts, Post}, - {Filters, Filter} - ] + alias Philomena.SearchIndexer @shortdoc "Destroys and recreates all OpenSearch indices." @requirements ["app.start"] @@ -35,26 +11,6 @@ defmodule Mix.Tasks.ReindexAll do raise "do not run this task unless you know what you're doing" end - @indices - |> Enum.map(fn {context, schema} -> - Task.async(fn -> - Search.delete_index!(schema) - Search.create_index!(schema) - - Search.reindex(preload(schema, ^context.indexing_preloads()), schema) - end) - end) - |> Task.await_many(:infinity) - - # Reports are a bit special - - Search.delete_index!(Report) - Search.create_index!(Report) - - Report - |> preload([:user, :admin]) - |> Repo.all() - |> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type]) - |> Enum.map(&Search.index_document(&1, Report)) + SearchIndexer.recreate_reindex_all_destructive!() end end diff --git a/lib/philomena/search_indexer.ex b/lib/philomena/search_indexer.ex new file mode 100644 index 00000000..0e0a0c83 --- /dev/null +++ b/lib/philomena/search_indexer.ex @@ -0,0 +1,134 @@ +defmodule Philomena.SearchIndexer do + alias PhilomenaQuery.Batch + alias PhilomenaQuery.Search + + alias Philomena.Comments + alias Philomena.Comments.Comment + alias Philomena.Filters + alias Philomena.Filters.Filter + alias Philomena.Galleries + alias Philomena.Galleries.Gallery + alias Philomena.Images + alias Philomena.Images.Image + alias Philomena.Posts + alias Philomena.Posts.Post + alias Philomena.Reports + alias Philomena.Reports.Report + alias Philomena.Tags + alias Philomena.Tags.Tag + + alias Philomena.Polymorphic + import Ecto.Query + + @schemas [ + Comment, + Filter, + Gallery, + Image, + Post, + Report, + Tag + ] + + @contexts %{ + Comment => Comments, + Filter => Filters, + Gallery => Galleries, + Image => Images, + Post => Posts, + Report => Reports, + Tag => Tags + } + + @doc """ + Recreate the index corresponding to all schemas, and then reindex all of the + documents within. + + ## Example + + iex> SearchIndexer.recreate_reindex_all_destructive!() + :ok + + """ + @spec recreate_reindex_all_destructive! :: :ok + def recreate_reindex_all_destructive! do + @schemas + |> Task.async_stream( + &recreate_reindex_schema_destructive!/1, + ordered: false, + timeout: :infinity + ) + |> Stream.run() + end + + @doc """ + Recreate the index corresponding to a schema, and then reindex all of the + documents within the schema. + + ## Example + + iex> SearchIndexer.recreate_reindex_schema_destructive!(Report) + :ok + + """ + @spec recreate_reindex_schema_destructive!(schema :: module()) :: :ok + def recreate_reindex_schema_destructive!(schema) when schema in @schemas do + Search.delete_index!(schema) + Search.create_index!(schema) + + reindex_schema(schema) + end + + @doc """ + Reindex all of the documents within all schemas. + + ## Example + + iex> SearchIndexer.reindex_all() + :ok + + """ + @spec reindex_all :: :ok + def reindex_all do + @schemas + |> Task.async_stream( + &reindex_schema/1, + ordered: false, + timeout: :infinity + ) + |> Stream.run() + end + + @doc """ + Reindex all of the documents within a single schema. 
+ + ## Example + + iex> SearchIndexer.reindex_schema(Report) + :ok + + """ + @spec reindex_schema(schema :: module()) :: :ok + def reindex_schema(schema) + + def reindex_schema(Report) do + # Reports currently require handling for their polymorphic nature + Report + |> preload([:user, :admin]) + |> Batch.record_batches() + |> Enum.each(fn records -> + records + |> Polymorphic.load_polymorphic(reportable: [reportable_id: :reportable_type]) + |> Enum.map(&Search.index_document(&1, Report)) + end) + end + + def reindex_schema(schema) when schema in @schemas do + # Normal schemas can simply be reindexed with indexing_preloads + context = Map.fetch!(@contexts, schema) + + schema + |> preload(^context.indexing_preloads()) + |> Search.reindex(schema) + end +end diff --git a/lib/philomena_query/search.ex b/lib/philomena_query/search.ex index cd02137c..cb249d5e 100644 --- a/lib/philomena_query/search.ex +++ b/lib/philomena_query/search.ex @@ -203,21 +203,25 @@ defmodule PhilomenaQuery.Search do def reindex(queryable, module, opts \\ []) do index = @policy.index_for(module) + process = + fn records -> + lines = + Enum.flat_map(records, fn record -> + doc = index.as_json(record) + + [ + %{index: %{_index: index.index_name(), _id: doc.id}}, + doc + ] + end) + + Api.bulk(@policy.opensearch_url(), lines) + end + queryable |> Batch.record_batches(opts) - |> Enum.each(fn records -> - lines = - Enum.flat_map(records, fn record -> - doc = index.as_json(record) - - [ - %{index: %{_index: index.index_name(), _id: doc.id}}, - doc - ] - end) - - Api.bulk(@policy.opensearch_url(), lines) - end) + |> Task.async_stream(process, ordered: false, timeout: :infinity) + |> Stream.run() end @doc ~S"""
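
Notes:

With this patch a full destructive reindex is driven entirely from Elixir: mix reindex_all (the task above) delegates to Philomena.SearchIndexer.recreate_reindex_all_destructive!/0, which drops, recreates and repopulates every index, fanning the schemas out over Task.async_stream(ordered: false, timeout: :infinity) instead of over parallel make invocations. The task still refuses to run unless the (elided) safety check on its arguments passes. A minimal sketch of the same calls from an attached console, assuming the application and its Repo/OpenSearch configuration are already started:

    # Destructive: delete, recreate and repopulate every OpenSearch index.
    Philomena.SearchIndexer.recreate_reindex_all_destructive!()

    # Non-destructive: reindex all documents into the existing indices.
    Philomena.SearchIndexer.reindex_all()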
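
A single schema can also be reindexed in isolation, which is what the per-type makefiles (comments.mk, images.mk, and so on) used to provide. A sketch of the underlying call, assuming PhilomenaQuery.Batch.record_batches/2 accepts a :batch_size option for tuning batch sizes (that option name is an assumption and does not appear in this patch):

    import Ecto.Query

    alias Philomena.Images
    alias Philomena.Images.Image
    alias PhilomenaQuery.Search

    # Stream Image rows in batches, render each record with the index
    # module's as_json/1, and bulk-upload the documents to OpenSearch.
    Image
    |> preload(^Images.indexing_preloads())
    |> Search.reindex(Image, batch_size: 1024)

The queue_target/queue_interval bump in config/runtime.exs supports this path: the batch queries issued while reindexing can hold connections for a long time, and raising both values to 20 seconds keeps DBConnection from turning the resulting slow pool checkouts into queue timeout errors.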
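
The elasticdump import step is gone as well. PhilomenaQuery.Search.reindex/3 now builds the bulk payload itself and submits each batch through Api.bulk/2, with batches processed concurrently by Task.async_stream. Each batch expands into alternating action and document entries, roughly as below (the field values are illustrative only):

    [
      %{index: %{_index: "images", _id: 12345}},
      %{id: 12345, score: 10, tag_ids: [1, 2, 3]},
      %{index: %{_index: "images", _id: 12346}},
      %{id: 12346, score: -1, tag_ids: [4]}
    ]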
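
For reference, the temp_images.wilson_995 helper removed with images.mk computed the lower bound of the Wilson score interval at z = 2.57583 (the normal quantile for a one-sided 99.5% confidence level, z^2 ≈ 6.6349). In the notation of the deleted plpgsql, with n = succ + fail and \hat{p} = succ / n:

    w = \frac{\hat{p} + \frac{z^2}{2n} - z \sqrt{\frac{\hat{p}(1 - \hat{p}) + \frac{z^2}{4n}}{n}}}{1 + \frac{z^2}{n}}

Presumably the Elixir index module for images computes the equivalent wilson_score value in as_json/1; this note only records the formula the SQL version used.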