diff --git a/apps/merge-media-sources/.idea/.gitignore b/apps/merge-media-sources/.idea/.gitignore new file mode 100644 index 0000000000..13566b81b0 --- /dev/null +++ b/apps/merge-media-sources/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/apps/merge-media-sources/.idea/inspectionProfiles/profiles_settings.xml b/apps/merge-media-sources/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000000..105ce2da2d --- /dev/null +++ b/apps/merge-media-sources/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/.idea/merge-media-sources.iml b/apps/merge-media-sources/.idea/merge-media-sources.iml new file mode 100644 index 0000000000..5fdd65ba2a --- /dev/null +++ b/apps/merge-media-sources/.idea/merge-media-sources.iml @@ -0,0 +1,15 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/.idea/misc.xml b/apps/merge-media-sources/.idea/misc.xml new file mode 100644 index 0000000000..927f93c5cb --- /dev/null +++ b/apps/merge-media-sources/.idea/misc.xml @@ -0,0 +1,4 @@ + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/.idea/modules.xml b/apps/merge-media-sources/.idea/modules.xml new file mode 100644 index 0000000000..63b5104821 --- /dev/null +++ b/apps/merge-media-sources/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/.idea/sqlDataSources.xml b/apps/merge-media-sources/.idea/sqlDataSources.xml new file mode 100644 index 0000000000..90ffac82a3 --- /dev/null +++ b/apps/merge-media-sources/.idea/sqlDataSources.xml @@ -0,0 +1,23 @@ + + + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/.idea/sqldialects.xml b/apps/merge-media-sources/.idea/sqldialects.xml new file mode 100644 index 0000000000..533abce227 --- /dev/null +++ b/apps/merge-media-sources/.idea/sqldialects.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/.idea/vcs.xml b/apps/merge-media-sources/.idea/vcs.xml new file mode 100644 index 0000000000..a4647a1c0e --- /dev/null +++ b/apps/merge-media-sources/.idea/vcs.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/apps/merge-media-sources/Dockerfile b/apps/merge-media-sources/Dockerfile new file mode 100644 index 0000000000..426000071d --- /dev/null +++ b/apps/merge-media-sources/Dockerfile @@ -0,0 +1,21 @@ +# +# Identify duplicate feeds or media sources and merge them into one entry, updating +# the feed and media source references to point to the new entry. +# + +FROM gcr.io/mcback/common:latest + +# Copy sources +COPY src/ /opt/mediacloud/src/merge-media-sources/ +ENV PERL5LIB="/opt/mediacloud/src/merge-media-sources/perl:${PERL5LIB}" \ + PYTHONPATH="/opt/mediacloud/src/merge-media-sources/python:${PYTHONPATH}" + +# Copy worker scripts +COPY bin /opt/mediacloud/bin + +USER mediacloud + +# Set a failing CMD because we'll be using the same image to run feeds merge + media merge, +# so the user is expected to set "command" in docker-compose.yml to run a specific worker. + +CMD ["SET_CONTAINER_COMMAND_TO_ONE_OF_THE_WORKERS"] diff --git a/apps/merge-media-sources/README.md b/apps/merge-media-sources/README.md new file mode 100644 index 0000000000..c09329192a --- /dev/null +++ b/apps/merge-media-sources/README.md @@ -0,0 +1,11 @@ +# Merging media sources + +## TODO + +* Create sample database with fake data +* Test running the same activity multiple times +* If an activity throws an exception, its message should get printed out to the console as well (in addition to + Temporal's log) +* Track failed workflows / activities in Munin +* Instead (in addition to) of setting `workflow_run_timeout` in `test_workflow.py`, limit retries of the individual + activities too so that when they fail, we'd get a nice error message printed to the test log diff --git a/apps/merge-media-sources/bin/feeds_to_merge.csv b/apps/merge-media-sources/bin/feeds_to_merge.csv new file mode 100644 index 0000000000..aeaa28d413 --- /dev/null +++ b/apps/merge-media-sources/bin/feeds_to_merge.csv @@ -0,0 +1,480 @@ +feed_id,parent_feed_id +18541,347569 +55406,865683 +58299,58354 +105195,1980261 +107128,176026 +107300,357899 +109082,118879 +109195,113506 +109925,107729 +110990,286407 +111237,43458 +112900,123734 +113085,153952 +113606,286407 +114041,877303 +114248,6697 +115643,106985 +117790,113435 +117860,118879 +118929,109971 +119899,6697 +120158,124153 +120373,173701 +120806,113309 +120939,109641 +123589,113056 +123619,107998 +123770,424573 +130033,81595 +130587,585863 +130626,873639 +132132,528155 +174779,348081 +174855,313933 +175080,347400 +175198,17986 +176024,107126 +176596,880526 +176876,504717 +177002,400534 +240986,379546 +314789,123443 +315209,315436 +315360,504145 +315469,315436 +348199,348190 +348293,177238 +348925,893363 +349026,348966 +349042,410833 +355629,176769 +355664,176769 +356679,356962 +358669,883977 +361018,400406 +361829,89720 +362786,810172 +362787,804684 +364984,69673 +366067,6697 +366191,429047 +367188,254373 +368816,379546 +369101,362079 +369718,379546 +374031,429320 +374396,374994 +374851,113056 +376910,857959 +380136,504100 +383036,402668 +383074,419887 +387001,118650 +392929,406284 +393012,414974 +393100,379546 +393616,520585 +393898,358638 +395809,514941 +396085,109948 +396182,396290 +399323,28546 +404955,429623 +406046,174650 +406303,502596 +416090,441469 +417558,406284 +423295,418169 +425512,535154 +427290,78177 +429637,880365 +438914,609853 +439628,905644 +462818,406284 +470172,494956 +502769,398131 +503201,1455534 +503698,2328935 +504133,448117 +504540,405795 +507787,609853 +509358,509376 +509366,509376 +514870,514869 +514925,514924 +535334,867266 +540465,442211 +553866,884015 +553867,427071 +558097,367781 +577416,842077 +583719,1771240 +626989,515582 +635639,525610 +695537,746931 +695757,903083 +695763,804555 +695899,811445 +696002,118650 +696170,811339 +725070,37 +738460,781361 +744967,710706 +782181,782180 +782218,782362 +782329,782362 +810326,132461 +842185,1516409 +848641,538809 +855639,725111 +855851,1201871 +860400,860787 +861032,861033 +861896,866211 +865089,348081 +865977,535154 +867228,866211 +867248,865733 +869831,1209649 +873677,1753176 +877135,439660 +882235,855434 +884056,403004 +884331,884415 +884711,884882 +886641,886642 +892835,892930 +895089,894937 +895191,894937 +895437,1201871 +895461,896504 +895806,895850 +895846,895850 +896482,895439 +898442,898479 +898471,898434 +898617,889774 +898623,889774 +898719,889774 +898928,889774 +899334,898954 +900986,1535268 +906798,583735 +912257,881149 +932480,875688 +946914,695690 +960028,725558 +961478,961477 +979456,979457 +1030511,1032213 +1031490,1015443 +1036937,881079 +1040504,1040496 +1040517,1040496 +1040705,1634140 +1040742,1634140 +1041374,1654294 +1043439,502594 +1055036,1055048 +1055044,1055048 +1055816,904106 +1056090,897261 +1061616,1061613 +1081063,695937 +1094176,970915 +1114943,1114932 +1115364,78114 +1139908,503397 +1151957,373697 +1153916,946645 +1162565,1452649 +1166245,1166343 +1166342,1166244 +1170658,1166244 +1171238,1166343 +1172666,78454 +1172836,172182 +1173492,1166244 +1173518,1650515 +1173551,1092182 +1173692,1455039 +1174165,1166244 +1175496,1175497 +1177289,1166343 +1177363,1166343 +1177625,1166343 +1177751,1173973 +1178658,129982 +1178861,78454 +1179046,1166343 +1179053,1166244 +1179067,78454 +1179268,7567 +1179353,1189129 +1180137,1182564 +1180272,1173867 +1180679,78454 +1180701,1178836 +1185354,399462 +1185388,399462 +1185527,1178874 +1185544,7683 +1185617,78454 +1185684,899976 +1186100,1189129 +1189096,1208405 +1204545,1187557 +1208142,524207 +1208240,1031285 +1208337,1208361 +1208413,1208345 +1208503,1208345 +1209789,2293489 +1210109,1210208 +1217945,2309770 +1223693,972200 +1227620,132750 +1227633,1185526 +1227901,1185526 +1227906,505592 +1227921,129982 +1227932,1719971 +1227944,698220 +1228907,1456650 +1229630,2283308 +1229862,1456600 +1262756,1185526 +1277071,1031367 +1308670,875584 +1372276,640836 +1372352,1372353 +1417817,1262381 +1421453,2125493 +1452657,2161108 +1454243,106462 +1454516,28546 +1455569,503220 +1455924,860721 +1456654,1729512 +1456759,1189129 +1456871,1032213 +1457205,124422 +1457223,1457215 +1457326,1459165 +1457566,1166343 +1457585,1167049 +1457632,1166343 +1457994,1178874 +1458356,1166244 +1458430,1166343 +1458585,1229045 +1458804,1703176 +1458970,1166244 +1459040,1166343 +1459209,884521 +1459683,1021426 +1459764,1081082 +1459767,129985 +1459777,129981 +1459800,129982 +1459817,1081082 +1459823,129982 +1460353,1701174 +1460774,1708647 +1460963,1704198 +1461715,1189129 +1461882,75252 +1472956,509348 +1488496,364128 +1489416,1225231 +1489605,794874 +1513852,1513932 +1513877,124422 +1513935,1513822 +1513967,1185526 +1514051,124422 +1514187,1567287 +1514343,609853 +1516124,1014349 +1516171,804540 +1516543,1185526 +1540074,1648401 +1549925,794874 +1577692,521172 +1583917,1262381 +1591908,1066831 +1591917,1066831 +1604948,1608304 +1608771,2013571 +1614000,1093722 +1618802,362188 +1619433,1616668 +1619601,1616667 +1619650,1093722 +1620087,1209654 +1620092,1567287 +1624141,1706328 +1633998,1633985 +1634269,912217 +1634818,1081082 +1643621,1185526 +1648391,1648394 +1648928,1092182 +1650316,1721675 +1651697,1653000 +1653182,1653131 +1653235,863097 +1670870,1719941 +1686997,1262381 +1688630,2123407 +1693605,1704851 +1700920,1208208 +1701120,1703030 +1702617,1172806 +1702639,1702900 +1702668,902832 +1702770,1172657 +1702892,1702631 +1702940,1719941 +1703031,1703030 +1703033,1703030 +1703175,1703030 +1703679,892617 +1704587,1700624 +1704903,1703539 +1706534,1703030 +1706979,1703030 +1707344,1745227 +1707353,1702627 +1707567,1172806 +1707710,811412 +1708202,1185526 +1708316,1703534 +1708722,1700629 +1709947,1697103 +1709953,1703030 +1710066,1458604 +1710405,1697039 +1710666,1184969 +1710700,1719941 +1710731,1719941 +1711426,1697613 +1711554,1719941 +1711748,1719941 +1712522,1719941 +1712533,1719941 +1712708,1712604 +1712710,1703030 +1712713,1703030 +1712730,1703030 +1712933,1721355 +1719605,1719569 +1719861,2125607 +1720068,1696959 +1721643,1743831 +1721750,1710083 +1722052,1721498 +1723622,1185526 +1723795,1743831 +1727283,1706464 +1728464,1227260 +1730464,2125500 +1730484,994791 +1737106,1178874 +1740709,1456900 +1740934,1604680 +1741067,860615 +1741108,1805734 +1741226,1741225 +1742692,2091704 +1743650,875584 +1743832,1721644 +1744772,1743831 +1762119,1762110 +1762158,1460202 +1801315,1951679 +1805782,875258 +1812605,873408 +1838969,1690444 +1841804,1840208 +1841826,2123324 +1848209,1152262 +1930894,356621 +1945930,2092688 +1945935,1744876 +1946321,1804134 +1946525,2326411 +1946956,1744876 +1946966,1744876 +1947123,1744876 +1947201,1744876 +1951678,1801314 +1974931,2153840 +1975818,1744876 +1976607,1800245 +1979940,1636131 +1980122,1461248 +1982216,379546 +2019859,877236 +2028140,1262381 +2034695,379546 +2035032,1634841 +2070887,971212 +2073610,2073588 +2073612,2073588 +2089720,2134537 +2091486,712212 +2091712,1742700 +2091919,1980233 +2093765,2067549 +2099414,736099 +2121411,2122600 +2123346,2123348 +2125204,2089518 +2125205,2089518 +2125392,1208089 +2125406,1208176 +2125455,1208089 +2125632,1979787 +2131628,2131625 +2134006,1209654 +2134534,122110 +2135029,1787254 +2135104,2135165 +2135392,2125524 +2141690,488131 +2161628,1228115 +2161629,1228115 +2161632,1228115 +2211112,2259916 +2211137,7683 +2211212,2211356 +2220068,2220067 +2225790,1605408 +2246063,2125603 +2288505,1809385 +2289396,736099 +2293008,1578794 +2299252,122108 +2299509,1178874 +2315110,808947 +2315606,2244905 +2316483,379546 +2321272,2321275 +2321274,2321275 +2321276,2321275 +2321280,2321129 +2321281,2321283 +2325617,1945550 +2326172,2326244 +2327393,1842987 +2327861,1801314 +2328757,1968840 +2329535,2210932 +2329656,395033 +2330284,2330256 +2331025,2331026 +2332463,1653000 +2336962,1744876 diff --git a/apps/merge-media-sources/bin/feeds_workflow_worker.py b/apps/merge-media-sources/bin/feeds_workflow_worker.py new file mode 100755 index 0000000000..547bcf098a --- /dev/null +++ b/apps/merge-media-sources/bin/feeds_workflow_worker.py @@ -0,0 +1,34 @@ +#!/usr/bin/env python3 + +import asyncio + +# noinspection PyPackageRequirements +from temporal.workerfactory import WorkerFactory + +from mediawords.util.log import create_logger +from mediawords.workflow.client import workflow_client + +from merge_media_sources.feeds_workflow import FeedsMergeWorkflowImpl, FeedsMergeActivitiesImpl +from merge_media_sources.feeds_workflow_interface import TASK_QUEUE, FeedsMergeActivities +from load_feeds_workflows import submit_feed_workflows + +log = create_logger(__name__) + + +async def _start_worker(): + client = workflow_client() + factory = WorkerFactory(client=client, namespace=client.namespace) + worker = factory.new_worker(task_queue=TASK_QUEUE) + worker.register_activities_implementation( + activities_instance=FeedsMergeActivitiesImpl(), + activities_cls_name=FeedsMergeActivities.__name__, + ) + worker.register_workflow_implementation_type(impl_cls=FeedsMergeWorkflowImpl) + factory.start() + + +if __name__ == '__main__': + loop = asyncio.get_event_loop() + asyncio.ensure_future(_start_worker()) + submit_feed_workflows() + loop.run_forever() diff --git a/apps/merge-media-sources/bin/load_feeds_workflows.py b/apps/merge-media-sources/bin/load_feeds_workflows.py new file mode 100644 index 0000000000..cee0dac55a --- /dev/null +++ b/apps/merge-media-sources/bin/load_feeds_workflows.py @@ -0,0 +1,38 @@ +#!/usr/bin/env python3 + +import asyncio + +from mediawords.util.log import create_logger +from mediawords.workflow.client import workflow_client + +# noinspection PyPackageRequirements +from temporal.workflow import WorkflowClient, WorkflowOptions + +from merge_media_sources.feeds_workflow_interface import FeedsMergeWorkflow + +log = create_logger(__name__) + + +async def _start_workflow(client: WorkflowClient, child_feed_id: int, parent_feed_id: int) -> None: + + log.info(f"Starting a workflow to merge feed {child_feed_id} into {parent_feed_id}...") + + workflow: FeedsMergeWorkflow = client.new_workflow_stub( + cls=FeedsMergeWorkflow, + workflow_options=WorkflowOptions(workflow_id=str(child_feed_id) + '_to_' + str(parent_feed_id)), + ) + + # Fire and forget as the workflow will do everything (including adding a extraction job) itself + await WorkflowClient.start(workflow.merge_feeds, child_feed_id, parent_feed_id) + + log.info(f"Started a workflow to merge feed {child_feed_id} into {parent_feed_id}...") + + +def submit_feed_workflows(): + client = workflow_client() + with open('feeds_to_merge.csv') as f: + feeds_to_merge = [{k: int(v) for k, v in row.items()} for row in csv.DictReader(f, skipinitialspace=True)] + for feed_pair in feeds_to_merge: + child_feed = feed_pair['feed_id'] + parent_feed = feed_pair['parent_feed_id'] + _start_workflow(client, child_feed, parent_feed) diff --git a/apps/merge-media-sources/docker-compose.tests.yml b/apps/merge-media-sources/docker-compose.tests.yml new file mode 100644 index 0000000000..c52e805d9f --- /dev/null +++ b/apps/merge-media-sources/docker-compose.tests.yml @@ -0,0 +1,135 @@ +version: "3.7" + +services: + + merge-media-sources: + image: gcr.io/mcback/merge-media-sources:latest + init: true + stop_signal: SIGKILL + volumes: + - type: bind + source: ./bin/ + target: /opt/mediacloud/bin/ + - type: bind + source: ./src/ + target: /opt/mediacloud/src/merge-media-sources/ + - type: bind + source: ./tests/ + target: /opt/mediacloud/tests/ + - type: bind + source: ./../common/src/ + target: /opt/mediacloud/src/common/ + depends_on: + - postgresql-pgbouncer + - rabbitmq-server + - temporal-server + + # Not needed for running the test but useful for debugging, demos + # and such + # - temporal-webapp + + postgresql-pgbouncer: + image: gcr.io/mcback/postgresql-pgbouncer:latest + init: true + stop_signal: SIGKILL + expose: + - 6432 + volumes: + - type: bind + source: ./../postgresql-pgbouncer/conf/ + target: /etc/pgbouncer/ + depends_on: + - postgresql-server + + postgresql-server: + image: gcr.io/mcback/postgresql-server:latest + init: true + stop_signal: SIGKILL + expose: + - 5432 + volumes: + - type: bind + source: ./../postgresql-server/bin/ + target: /opt/mediacloud/bin/ + - type: bind + source: ./../postgresql-server/pgmigrate/ + target: /opt/postgresql-server/pgmigrate/ + - type: bind + source: ./../postgresql-base/etc/postgresql/ + target: /etc/postgresql/ + + rabbitmq-server: + image: gcr.io/mcback/rabbitmq-server:latest + init: true + stop_signal: SIGKILL + expose: + - 5672 + - 15672 + volumes: + - type: bind + source: ./../rabbitmq-server/conf/ + target: /etc/rabbitmq/ + + temporal-server: + image: gcr.io/mcback/temporal-server:latest + init: true + stop_signal: SIGKILL + depends_on: + - temporal-postgresql + - temporal-elasticsearch + expose: + - 6933 + - 6934 + - 6935 + - 6939 + - 7233 + - 7234 + - 7235 + - 7239 + volumes: + - type: bind + source: ./../temporal-server/bin/ + target: /opt/temporal-server/bin/ + - type: bind + source: ./../temporal-server/config/dynamicconfig.yaml + target: /opt/temporal-server/config/dynamicconfig.yaml + - type: bind + source: ./../temporal-server/config/mediacloud_template.yaml + target: /opt/temporal-server/config/mediacloud_template.yaml + + temporal-postgresql: + image: gcr.io/mcback/temporal-postgresql:latest + init: true + stop_signal: SIGKILL + expose: + - 5432 + volumes: + - type: bind + source: ./../temporal-postgresql/bin/ + target: /opt/temporal-postgresql/bin/ + - type: bind + source: ./../postgresql-base/etc/postgresql/ + target: /etc/postgresql/ + + temporal-elasticsearch: + image: gcr.io/mcback/temporal-elasticsearch:latest + init: true + stop_signal: SIGKILL + expose: + - "9200" + - "9300" + volumes: + - type: bind + source: ./../elasticsearch-base/bin/elasticsearch.sh + target: /opt/elasticsearch/bin/elasticsearch.sh + # Not mounting config as it gets concatenated into a single file + + # temporal-webapp: + # image: gcr.io/mcback/temporal-webapp:latest + # init: true + # stop_signal: SIGKILL + # expose: + # - "8088" + # ports: + # # Expose to host for debugging + # - "8088:8088" diff --git a/apps/merge-media-sources/src/__init__.py b/apps/merge-media-sources/src/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/apps/merge-media-sources/src/python/__init__.py b/apps/merge-media-sources/src/python/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/apps/merge-media-sources/src/python/merge_media_sources/__init__.py b/apps/merge-media-sources/src/python/merge_media_sources/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow.py b/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow.py new file mode 100644 index 0000000000..b2c4e33a36 --- /dev/null +++ b/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow.py @@ -0,0 +1,125 @@ +# noinspection PyPackageRequirements +from temporal.workflow import Workflow + +from mediawords.db import connect_to_db_or_raise +from mediawords.db.handler import DatabaseHandler +from mediawords.util.log import create_logger +from mediawords.workflow.exceptions import McProgrammingError, McTransientError, McPermanentError + +from .feeds_workflow_interface import FeedsMergeWorkflow, FeedsMergeActivities + +log = create_logger(__name__) + + +class FeedsMergeActivitiesImpl(FeedsMergeActivities): + + #TODO: when implementing media merge, consider breaking helper functions below into separate module + + @staticmethod + def chunk_results(results: list) -> list: + """Break results of a query into chunks of 1000 (returns list of lists).""" + return [results[i:i + 1000] for i in range(0, len(results), 1000)] + + @staticmethod + def get_child_feed_entries(db: DatabaseHandler, table: str, table_id_field: str, child_feed_id: int) -> list: + log.info(f"Getting entries in {table} table associated with feed {str(child_feed_id)}") + + child_feed_id = db.find_by_id(table='feeds', object_id=child_feed_id) + if not child_feed_id: + raise McPermanentError(f"Feed {child_feed_id} was not found.") + + get_child_feed_entries_query = f""" + SELECT {table_id_field} + FROM {table} + WHERE feeds_id = {child_feed_id}; + """ + + child_feed_entries = db.query(get_child_feed_entries_query) + + log.info(f"Got all entries in downloads table for feed {str(child_feed_id)}") + + return child_feed_entries + + async def migrate_child_entries(self, table: str, table_id_field: str, id_list: list, child_feed_id: int, + parent_feed_id: int) -> None: + log.info(f"Updating {table} table to migrate {len(id_list)} entries associated with {child_feed_id} to " + f"parent {parent_feed_id}") + + db = connect_to_db_or_raise() + update_query = f""" + UPDATE {table} + SET feeds_id = {parent_feed_id} + WHERE {table_id_field} IN {id_list}; + """ + + db.query(update_query) + + log.info(f"Migrated {len(id_list)} entries in {table} for feed {child_feed_id} to parent {parent_feed_id}") + + async def delete_child_entries(self, child_feed_id: int, table: str) -> None: + log.info(f"Deleting entries in {table} table associated with feed {str(child_feed_id)}") + + db = connect_to_db_or_raise() + + delete_query = f""" + DELETE FROM {table} + WHERE feeds_id = {child_feed_id}; + """ + + db.query(delete_query) + + log.info(f"Deleted entries in {table} table associated with feed {str(child_feed_id)}") + + +class FeedsMergeWorkflowImpl(FeedsMergeWorkflow): + """Workflow implementation.""" + + def __init__(self): + self.activities: FeedsMergeActivities = Workflow.new_activity_stub( + activities_cls=FeedsMergeWorkflow, + # No retry_parameters here as they get set individually in @activity_method() + ) + + async def merge_feeds(self, child_feed_id: int, parent_feed_id: int) -> None: + + child_feed_downloads = self.activities.get_child_feed_entries('downloads', 'downloads_id', child_feed_id) + + for chunk in self.activities.chunk_results(child_feed_downloads): + await self.activities.migrate_child_entries('downloads', 'downloads_id', chunk, child_feed_id, + parent_feed_id) + + child_feed_stories_map = self.activities.get_child_feed_entries('feeds_stories_map_p', 'feeds_stories_map_p_id', + child_feed_id) + + for chunk in self.activities.chunk_results(child_feed_stories_map): + await self.activities.migrate_child_entries('feeds_stories_map_p', 'feeds_stories_map_p_id', chunk, + child_feed_id) + + child_scraped_feeds = self.activities.get_child_feed_entries('scraped_feeds', 'scraped_feeds_id', child_feed_id) + + await self.activities.migrate_child_entries('scraped_feeds', 'feed_scrapes_id', child_scraped_feeds, + child_feed_id, parent_feed_id) + + child_feeds_from_yesterday = self.activities.get_child_feed_entries('feeds_from_yesterday', 'feeds_id', + child_feed_id) + + await self.activities.migrate_child_entries('feeds_from_yesterday', 'feeds_id', child_feeds_from_yesterday, + child_feed_id, parent_feed_id) + + child_feeds_tags_map = self.activities.get_child_feed_entries('feeds_tags_map', 'feeds_tags_map_id', + child_feed_id) + + await self.activities.migrate_child_entries('feeds_tags_map', 'feeds_tags_map_id', child_feeds_tags_map, + child_feed_id, parent_feed_id) + + await self.activities.delete_child_entries(child_feed_id, 'downloads') + + await self.activities.delete_child_entries(child_feed_id, 'feeds_stories_map') + + await self.activities.delete_child_entries(child_feed_id, 'scraped_feeds') + + await self.activities.delete_child_entries(child_feed_id, 'feeds_from_yesterday') + + await self.activities.delete_child_entries(child_feed_id, 'feeds_tags_map') + + await self.activities.delete_child_entries(child_feed_id, 'feeds') diff --git a/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow_interface.py b/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow_interface.py new file mode 100644 index 0000000000..d4ad85526c --- /dev/null +++ b/apps/merge-media-sources/src/python/merge_media_sources/feeds_workflow_interface.py @@ -0,0 +1,51 @@ +from datetime import timedelta + +# noinspection PyPackageRequirements +from temporal.activity_method import activity_method, RetryParameters +# noinspection PyPackageRequirements +from temporal.workflow import workflow_method + +from mediawords.workflow.exceptions import McPermanentError + + +TASK_QUEUE = "merge-feeds" +"""Temporal task queue.""" + +DEFAULT_RETRY_PARAMETERS = RetryParameters( + initial_interval=timedelta(seconds=1), + backoff_coefficient=2, + maximum_interval=timedelta(hours=2), + maximum_attempts=1000, + non_retryable_error_types=[ + McPermanentError.__name__, + ], +) + + +class FeedsMergeActivities(object): + """Activities interface.""" + + @activity_method( + task_queue=TASK_QUEUE, + start_to_close_timeout=timedelta(seconds=60), + retry_parameters=DEFAULT_RETRY_PARAMETERS, + ) + async def migrate_child_entries(self, table: str, table_id_field: str, id_list: list, child_feed_id: int, + parent_feed_id: int) -> None: + raise NotImplementedError + + @activity_method( + task_queue=TASK_QUEUE, + start_to_close_timeout=timedelta(seconds=60), + retry_parameters=DEFAULT_RETRY_PARAMETERS, + ) + async def delete_child_entries(self, child_feed_id: int, table: str) -> None: + raise NotImplementedError + + +class FeedsMergeWorkflow(object): + """Workflow interface.""" + + @workflow_method(task_queue=TASK_QUEUE) + async def merge_feeds(self) -> None: + raise NotImplementedError diff --git a/apps/merge-media-sources/tests/python/__init__.py b/apps/merge-media-sources/tests/python/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/apps/merge-media-sources/tests/python/test_feeds_merge_workflow.py b/apps/merge-media-sources/tests/python/test_feeds_merge_workflow.py new file mode 100644 index 0000000000..5936b7528c --- /dev/null +++ b/apps/merge-media-sources/tests/python/test_feeds_merge_workflow.py @@ -0,0 +1,104 @@ +import csv +from datetime import timedelta + +# noinspection PyPackageRequirements +import pytest +# noinspection PyPackageRequirements +from temporal.workerfactory import WorkerFactory +# noinspection PyPackageRequirements +from temporal.workflow import WorkflowOptions + +from mediawords.db import connect_to_db +from mediawords.db.handler import DatabaseHandler +from mediawords.test.db.create import create_test_feed, create_test_medium, create_test_story, create_download_for_feed +from mediawords.util.log import create_logger +from mediawords.workflow.client import workflow_client +from mediawords.workflow.worker import stop_worker_faster + +from merge_media_sources import FeedsMergeWorkflow, FeedsMergeWorkflowImpl, FeedsMergeActivities, FeedsMergeActivitiesImpl + +log = create_logger(__name__) + + +def check_successful_feed_migration(db: DatabaseHandler, table: str, parent_feed_id: int) -> None: + results = db.select(table=f'{table}', what_to_select='*').hashes() + assert len(results) == 1 + assert results[0]['feeds_id'] == parent_feed_id + + +@pytest.mark.asyncio +async def test_feeds_merge_workflow() -> None: + db = connect_to_db() + test_medium = create_test_medium(db=db, label='test') + child_feed = create_test_feed(db=db, label='child_feed', medium=test_medium) + parent_feed = create_test_feed(db=db, label='parent_feed', medium=test_medium) + + create_download_for_feed(db=db, feed=child_feed) + + db.insert(table='feeds_stories_map_p', insert_hash={ + 'feeds_id': child_feed['feeds_id'], + 'stories_id': 1 + }) + db.insert(table='scraped_feeds', insert_hash={ + 'feeds_id': child_feed['feed_id'], + 'url': child_feed['url'], + 'scrape_date': 'NOW()', + 'import_module': 'mediawords' + }) + db.insert(table='feeds_from_yesterday', insert_hash={ + 'feeds_id': child_feed['feeds_id'], + 'media_id': test_medium['media_id'], + 'name': F"feed_from_yesterday_{child_feed['name']}", + 'url': child_feed['url'], + 'type': 'test', + 'active': True + }) + db.insert(table='feeds_tags_map', insert_hash={ + 'feeds_id': child_feed['feeds_id'], + 'tags_id': test_medium['media_id'], + }) + + client = workflow_client() + + # Start worker + factory = WorkerFactory(client=client, namespace=client.namespace) + worker = factory.new_worker(task_queue="merge-feeds") + + activities = FeedsMergeActivities() + + worker.register_activities_implementation( + activities_instance=activities, + activities_cls_name=FeedsMergeActivities.__name__, + ) + worker.register_workflow_implementation_type(impl_cls=FeedsMergeWorkflowImpl) + factory.start() + + # Initialize workflow instance + + workflow: FeedsMergeWorkflow = client.new_workflow_stub( + cls=FeedsMergeWorkflow, + workflow_options=WorkflowOptions(workflow_id='test'), + ) + + # Fire and forget as the workflow will do everything (including adding a extraction job) itself + await client.start(workflow.merge_feeds, child_feed['feeds_id'], parent_feed['feeds_id']) + # Wait for the workflow to complete + await workflow.merge_feeds( child_feed['feeds_id'], parent_feed['feeds_id']) + + downloads = db.select(table='downloads', what_to_select='*').hashes() + assert len(downloads) == 1 + first_download = downloads[0] + assert first_download['feeds_id'] == parent_feed['feeds_id'] + + tables = ['downloads', 'scraped_feeds', 'feeds_from_yesterday', 'feeds_tags_map', 'feeds_stories_map_p'] + for table in tables: + check_successful_feed_migration(db, table, parent_feed['feeds_id']) + + results = db.select(table='feeds', what_to_select='*', condition_hash={'id': child_feed['feeds_id']}).hashes() + assert len(results) == 0 + + await worker.stop(background=True) + + log.info("Stopping workers...") + await stop_worker_faster(worker) + log.info("Stopped workers") diff --git a/apps/merge-media-sources/tests/python/test_media_merge_workflow.py b/apps/merge-media-sources/tests/python/test_media_merge_workflow.py new file mode 100644 index 0000000000..e69de29bb2