
Add daily published-article recycling for WeChat service accounts (服务号)

luojunhui · 4 weeks ago · commit 2949b33452

+ 2 - 0
applications/tasks/data_recycle_tasks/__init__.py

@@ -1,10 +1,12 @@
 from .recycle_daily_publish_articles import RecycleDailyPublishArticlesTask
 from .recycle_daily_publish_articles import CheckDailyPublishArticlesTask
 from .recycle_daily_publish_articles import UpdateRootSourceIdAndUpdateTimeTask
+from .recycle_daily_publish_articles import RecycleFwhDailyPublishArticlesTask
 
 
 __all__ = [
     "RecycleDailyPublishArticlesTask",
     "CheckDailyPublishArticlesTask",
     "UpdateRootSourceIdAndUpdateTimeTask",
+    "RecycleFwhDailyPublishArticlesTask",
 ]

+ 173 - 1
applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -1,15 +1,17 @@
+import asyncio
 import json
 import time
 import datetime
 import urllib.parse
 import traceback
 
-from tqdm import tqdm
+from tqdm.asyncio import tqdm
 
 from applications.api import feishu_robot
 from applications.crawler.wechat import get_article_list_from_account
 from applications.crawler.wechat import get_article_detail
 from applications.pipeline import insert_article_into_recycle_pool
+from applications.utils import str_to_md5
 
 
 class Const:
@@ -46,6 +48,13 @@ class Const:
     ARTICLE_SUCCESS_CODE = 0
     ARTICLE_UNKNOWN_CODE = 10000
 
+    STAT_PERIOD = 3 * 24 * 3600  # recycle window: look back three days
+
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAILED_STATUS = 99
+
 
 class RecycleDailyPublishArticlesTask(Const):
 
@@ -433,3 +442,166 @@ class UpdateRootSourceIdAndUpdateTimeTask(Const):
             current_hour = datetime.datetime.now().hour
             if current_hour >= 21:
                 await self.fallback_mechanism()
+
+
+class RecycleFwhDailyPublishArticlesTask(Const):
+    def __init__(self, pool, log_client):
+        self.pool = pool
+        self.log_client = log_client
+
+    @staticmethod
+    async def illegal_article_bot(
+        account_name: str,
+        gh_id: str,
+        group_id: str,
+        illegal_msg: str,
+        publish_date: str,
+    ):
+        await feishu_robot.bot(
+            title="服务号文章违规告警,请前往微信公众平台处理",  # "service-account article violation alert, please handle it in the WeChat Official Accounts Platform"
+            detail={
+                "account_name": account_name,
+                "gh_id": gh_id,
+                "group_id": group_id,
+                "illegal_msg": illegal_msg,
+                "publish_date": str(publish_date),
+            },
+            env="server_account_publish_monitor",
+        )
+
+    async def save_data_to_database(self, article):
+        """
+        insert one recycled article into official_articles_v2
+        """
+        insert_query = f"""
+            insert into official_articles_v2
+            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, ItemIndex, ContentUrl, show_view_count, 
+             wx_sn, title_md5, article_group, channel_content_id, root_source_id_list, publish_timestamp) 
+            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);      
+        """
+        return await self.pool.async_save(
+            query=insert_query, db_name="piaoquan_crawler", params=article
+        )
+
+    async def update_article_read_cnt(self, wx_sn, new_read_cnt):
+        if new_read_cnt <= 0:
+            return 0
+
+        update_query = """
+            update official_articles_v2
+            set show_view_count = %s
+            where wx_sn = %s;
+        """
+        return await self.pool.async_save(
+            query=update_query, db_name="piaoquan_crawler", params=(new_read_cnt, wx_sn)
+        )
+
+    async def get_group_server_accounts(self):
+        fetch_query = "select gzh_id from article_gzh_developer;"
+        fetch_response = await self.pool.async_fetch(
+            query=fetch_query, db_name="piaoquan_crawler"
+        )
+        gh_id_list = [i["gzh_id"] for i in fetch_response]
+        return gh_id_list
+
+    async def get_stat_published_articles(self, gh_id):
+        earliest_timestamp = int(time.time()) - self.STAT_PERIOD
+        fetch_query = """
+            select publish_date, account_name, gh_id, user_group_id, url, publish_timestamp
+            from long_articles_group_send_result
+            where gh_id = %s and recycle_status = %s and create_time > %s;
+        """
+        earliest_time = datetime.datetime.fromtimestamp(earliest_timestamp).strftime(
+            "%Y-%m-%d %H:%M:%S"
+        )
+        return await self.pool.async_fetch(
+            query=fetch_query,
+            params=(gh_id, self.SUCCESS_STATUS, earliest_time),
+        )
+
+    async def process_each_account_data(self, account_published_article_list):
+        if not account_published_article_list:
+            return
+
+        for article in account_published_article_list:
+            account_name = article["account_name"]
+            gh_id = article["gh_id"]
+            user_group_id = article["user_group_id"]
+            url = article["url"]
+            publish_date = article["publish_date"]
+            # get article detail info with spider
+
+            try:
+                article_detail_info = await get_article_detail(
+                    url, is_count=True, is_cache=False
+                )
+                response_code = article_detail_info["code"]
+                if response_code == self.ARTICLE_ILLEGAL_CODE:
+                    await self.illegal_article_bot(
+                        account_name=account_name,
+                        gh_id=gh_id,
+                        group_id=user_group_id,
+                        illegal_msg=article_detail_info["msg"],
+                        publish_date=publish_date,
+                    )
+                    continue  # illegal article carries no detail payload to parse
+
+                await asyncio.sleep(1)
+                content_url = article_detail_info["data"]["data"]["content_link"]
+                app_msg_id = content_url.split("mid=")[-1].split("&")[0]
+                wx_sn = content_url.split("sn=")[-1]
+                publish_timestamp = int(
+                    article_detail_info["data"]["data"]["publish_timestamp"] / 1000
+                )
+                create_time = publish_timestamp
+                update_time = publish_timestamp
+                item_index = article_detail_info["data"]["data"]["item_index"]
+                show_view_count = article_detail_info["data"]["data"]["view_count"]
+                title = article_detail_info["data"]["data"]["title"]
+                title_md5 = str_to_md5(title)
+                channel_content_id = article_detail_info["data"]["data"][
+                    "channel_content_id"
+                ]
+                mini_program_info = article_detail_info["data"]["data"]["mini_program"]
+                root_source_id_list = [
+                    urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
+                        "rootSourceId"
+                    ][0]
+                    for i in mini_program_info
+                ]
+                root_source_id_list = json.dumps(root_source_id_list)
+                try:
+                    await self.save_data_to_database(
+                        article=(
+                            gh_id,
+                            account_name,
+                            app_msg_id,
+                            title,
+                            "9",
+                            create_time,
+                            update_time,
+                            item_index,
+                            url,
+                            show_view_count,
+                            wx_sn,
+                            title_md5,
+                            user_group_id,
+                            channel_content_id,
+                            root_source_id_list,
+                            publish_timestamp,
+                        )
+                    )
+                except Exception:  # insert failed (row likely exists); refresh read count instead
+                    await self.update_article_read_cnt(wx_sn, show_view_count)
+            except Exception as e:
+                print(f"article {url} is not available, skip it")
+                print(e)
+
+    async def deal(self):
+        account_id_list = await self.get_group_server_accounts()
+        for account_id in account_id_list:
+            publish_articles = tqdm(
+                await self.get_stat_published_articles(account_id),
+                desc=f"<crawling> {account_id}",
+            )
+            await self.process_each_account_data(publish_articles)

+ 6 - 0
applications/tasks/task_handler.py

@@ -8,6 +8,7 @@ from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
 from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
+from applications.tasks.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
 from applications.tasks.llm_tasks import TitleRewrite
 from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
 from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
@@ -156,3 +157,8 @@ class TaskHandler(TaskMapper):
             case _:
                 raise ValueError(f"Unsupported crawl mode {crawl_mode}")
         return self.TASK_SUCCESS_STATUS
+
+    async def _recycle_fwh_article_handler(self) -> int:
+        task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
+        await task.deal()
+        return self.TASK_SUCCESS_STATUS

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -185,6 +185,8 @@ class TaskScheduler(TaskHandler):
             "crawler_account_manager": self._crawler_account_manager_handler,
             # crawl articles from WeChat official accounts (公众号)
             "crawler_gzh_articles": self._crawler_gzh_article_handler,
+            # recycle articles published by service accounts (服务号)
+            "fwh_daily_recycle": self._recycle_fwh_article_handler,
         }
 
         if task_name not in handlers:
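For reference, a minimal self-contained sketch of the name-to-handler dispatch this hunk extends (the real handlers are bound methods on the scheduler, and its entry point is outside this diff):

```python
import asyncio

# Stand-in for TaskHandler._recycle_fwh_article_handler: the real one builds
# RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client) and
# awaits task.deal(), then returns TASK_SUCCESS_STATUS.
async def recycle_fwh_article_handler() -> int:
    return 0  # stand-in for TASK_SUCCESS_STATUS

handlers = {
    "fwh_daily_recycle": recycle_fwh_article_handler,
}

async def run_task(task_name: str) -> int:
    if task_name not in handlers:
        raise ValueError(f"Unsupported task name: {task_name}")
    return await handlers[task_name]()

print(asyncio.run(run_task("fwh_daily_recycle")))  # 0
```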