luojunhui 2 days ago
commit bee44016ee

+ 5 - 3
applications/api/async_feishu_api.py

@@ -1,4 +1,7 @@
 import json
+
+import requests
+
 from applications.utils import AsyncHttPClient
 
 
@@ -224,8 +227,7 @@ class FeishuBotApi(Feishu):
                 title=title, mention=mention, detail=detail
             )
 
-        payload = {"msg_type": "interactive", "card": card}
+        data = {"msg_type": "interactive", "card": card}
         async with AsyncHttPClient() as client:
-            res = await client.post(url=url, headers=headers, data=json.dumps(payload))
-
+            res = await client.post(url=url, headers=headers, json=data)
         return res

+ 1 - 1
applications/pipeline/__init__.py

@@ -1 +1 @@
-from .data_recycle_pipeline import insert_article_into_recycle_pool
+from .data_recycle_pipeline import insert_article_into_recycle_pool

+ 1 - 3
applications/pipeline/crawler_pipeline.py

@@ -1,5 +1,3 @@
-
-
 class CrawlerPipeline:
 
-    pass
+    pass

+ 111 - 97
applications/pipeline/data_recycle_pipeline.py

@@ -1,114 +1,128 @@
-
 import json
 from typing import List, Dict
 
 from applications.utils import show_desc_to_sta, str_to_md5
 
 
-async def insert_article_into_recycle_pool(pool, log_client, msg_list: List[Dict], account_info: Dict):
-        """insert article into recycle pool"""
-        table_name = 'official_articles_v2'
-        for info in msg_list:
-            base_info = info.get("BaseInfo", {})
-            app_msg_id = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
-            create_timestamp = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
-            update_timestamp = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
-            publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
-            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
-            if detail_article_list:
-                for article in detail_article_list:
-                    title = article.get("Title", None)
-                    digest = article.get("Digest", None)
-                    item_index = article.get("ItemIndex", None)
-                    content_url = article.get("ContentUrl", None)
-                    source_url = article.get("SourceUrl", None)
-                    cover_img_url = article.get("CoverImgUrl", None)
-                    cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
-                    cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
-                    item_show_type = article.get("ItemShowType", None)
-                    is_original = article.get("IsOriginal", None)
-                    show_desc = article.get("ShowDesc", None)
-                    show_stat = show_desc_to_sta(show_desc)
-                    ori_content = article.get("ori_content", None)
-                    show_view_count = show_stat.get("show_view_count", 0)
-                    show_like_count = show_stat.get("show_like_count", 0)
-                    show_zs_count = show_stat.get("show_zs_count", 0)
-                    show_pay_count = show_stat.get("show_pay_count", 0)
-                    wx_sn = content_url.split("&sn=")[1].split("&")[0] if content_url else None
-                    status = account_info['using_status']
-                    info_tuple = (
-                        account_info['gh_id'],
-                        account_info['account_name'],
-                        app_msg_id,
-                        title,
-                        publish_type,
-                        create_timestamp,
-                        update_timestamp,
-                        digest,
-                        item_index,
-                        content_url,
-                        source_url,
-                        cover_img_url,
-                        cover_img_url_1_1,
-                        cover_img_url_235_1,
-                        item_show_type,
-                        is_original,
-                        show_desc,
-                        ori_content,
-                        show_view_count,
-                        show_like_count,
-                        show_zs_count,
-                        show_pay_count,
-                        wx_sn,
-                        json.dumps(base_info, ensure_ascii=False),
-                        str_to_md5(title),
-                        status
-                    )
-                    try:
-                        insert_query = f"""
+async def insert_article_into_recycle_pool(
+    pool, log_client, msg_list: List[Dict], account_info: Dict
+):
+    """insert article into recycle pool"""
+    table_name = "official_articles_v2"
+    for info in msg_list:
+        base_info = info.get("BaseInfo", {})
+        app_msg_id = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
+        create_timestamp = (
+            info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
+        )
+        update_timestamp = (
+            info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
+        )
+        publish_type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
+        detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
+        if detail_article_list:
+            for article in detail_article_list:
+                title = article.get("Title", None)
+                digest = article.get("Digest", None)
+                item_index = article.get("ItemIndex", None)
+                content_url = article.get("ContentUrl", None)
+                source_url = article.get("SourceUrl", None)
+                cover_img_url = article.get("CoverImgUrl", None)
+                cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
+                cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
+                item_show_type = article.get("ItemShowType", None)
+                is_original = article.get("IsOriginal", None)
+                show_desc = article.get("ShowDesc", None)
+                show_stat = show_desc_to_sta(show_desc)
+                ori_content = article.get("ori_content", None)
+                show_view_count = show_stat.get("show_view_count", 0)
+                show_like_count = show_stat.get("show_like_count", 0)
+                show_zs_count = show_stat.get("show_zs_count", 0)
+                show_pay_count = show_stat.get("show_pay_count", 0)
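+                # wx_sn is parsed from the &sn= query param of content_url and used as the article key for updates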
+                wx_sn = (
+                    content_url.split("&sn=")[1].split("&")[0] if content_url else None
+                )
+                status = account_info["using_status"]
+                info_tuple = (
+                    account_info["gh_id"],
+                    account_info["name"],
+                    app_msg_id,
+                    title,
+                    publish_type,
+                    create_timestamp,
+                    update_timestamp,
+                    digest,
+                    item_index,
+                    content_url,
+                    source_url,
+                    cover_img_url,
+                    cover_img_url_1_1,
+                    cover_img_url_235_1,
+                    item_show_type,
+                    is_original,
+                    show_desc,
+                    ori_content,
+                    show_view_count,
+                    show_like_count,
+                    show_zs_count,
+                    show_pay_count,
+                    wx_sn,
+                    json.dumps(base_info, ensure_ascii=False),
+                    str_to_md5(title),
+                    status,
+                )
+                try:
+                    insert_query = f"""
                             insert into {table_name}
                             (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
                             values
                             (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                             """
-                        await pool.async_save(query=insert_query, params=info_tuple, db_name="piaoquan_crawler")
+                    await pool.async_save(
+                        query=insert_query,
+                        params=info_tuple,
+                        db_name="piaoquan_crawler",
+                    )
+                    await log_client.log(
+                        contents={
+                            "function": "insert_article_into_recycle_pool",
+                            "status": "success",
+                            "data": info_tuple,
+                        }
+                    )
+                    print("insert_article_into_recycle_pool success")
+
+                except Exception as e:
+                    try:
+                        update_sql = f"""update {table_name} set show_view_count = %s, show_like_count=%s where wx_sn = %s;"""
+                        await pool.async_save(
+                            query=update_sql,
+                            params=(show_view_count, show_like_count, wx_sn),
+                            db_name="piaoquan_crawler",
+                        )
+                        print("update_article_into_recycle_pool success")
+
+                    except Exception as e:
                         await log_client.log(
                             contents={
                                 "function": "insert_article_into_recycle_pool",
-                                "status": "success",
-                                "data": info_tuple
+                                "status": "fail",
+                                "message": "更新文章失败",
+                                "data": {
+                                    "error": str(e),
+                                    "content_link": content_url,
+                                    "account_name": account_info["name"],
+                                },
                             }
                         )
+                        continue
 
-                    except Exception as e:
-                        try:
-                            update_sql = f"""update {table_name} set show_view_count = %s, show_like_count=%s where wx_sn = %s;"""
-                            await pool.async_save(query=update_sql, params=(show_view_count, show_like_count, wx_sn), db_name="piaoquan_crawler")
-                        except Exception as e:
-                            await log_client.log(
-                                contents={
-                                    "function": "insert_article_into_recycle_pool",
-                                    "status": "fail",
-                                    "message": "更新文章失败",
-                                    "data": {
-                                        "error": str(e),
-                                        "content_link": content_url,
-                                        "account_name": account_info["account_name"]
-                                    }
-                                }
-                            )
-                            continue
-
-            else:
-                await log_client.log(
-                    contents={
-                        "function": "insert_article_into_recycle_pool",
-                        "status": "fail",
-                        "message": "account has no articles",
-                        "data": {
-                            "account_name": account_info["account_name"]
-                        }
-                    }
-                )
-
-
+        else:
+            await log_client.log(
+                contents={
+                    "function": "insert_article_into_recycle_pool",
+                    "status": "fail",
+                    "message": "account has no articles",
+                    "data": {"account_name": account_info["name"]},
+                }
+            )

+ 3 - 0
applications/tasks/data_recycle_tasks/__init__.py

@@ -0,0 +1,3 @@
+from .recycle_daily_publish_articles import RecycleDailyPublishArticlesTask
+from .recycle_daily_publish_articles import CheckDailyPublishArticlesTask
+from .recycle_daily_publish_articles import UpdateRootSourceIdAndUpdateTimeTask

+ 328 - 19
applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -1,22 +1,58 @@
+import json
 import time
-from unittest import case
+import datetime
+import urllib.parse
+import traceback
 
 from tqdm import tqdm
 
 from applications.api import feishu_robot
 from applications.crawler.wechat import get_article_list_from_account
+from applications.crawler.wechat import get_article_detail
 from applications.pipeline import insert_article_into_recycle_pool
 
+
 class Const:
     # 订阅号
     SUBSCRIBE_TYPE_SET = {0, 1}
 
     NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30
 
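+    # gh_ids of banned accounts, filtered out when building the recycle task list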
+    FORBIDDEN_GH_IDS = [
+        "gh_4c058673c07e",
+        "gh_de9f9ebc976b",
+        "gh_7b4a5f86d68c",
+        "gh_f902cea89e48",
+        "gh_789a40fe7935",
+        "gh_cd041ed721e6",
+        "gh_62d7f423f382",
+        "gh_043223059726",
+    ]
+
+    # article status values
+    # default status of a record
+    DEFAULT_STATUS = 0
+    # article-detail request failed
+    REQUEST_FAIL_STATUS = -1
+    # article has been deleted
+    DELETE_STATUS = -2
+    # no data returned, reason unknown
+    UNKNOWN_STATUS = -3
+    # article flagged for policy violation
+    ILLEGAL_STATUS = -4
+
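+    # response codes returned by get_article_detail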
+    ARTICLE_ILLEGAL_CODE = 25012
+    ARTICLE_DELETE_CODE = 25005
+    ARTICLE_SUCCESS_CODE = 0
+    ARTICLE_UNKNOWN_CODE = 10000
+
+
 class RecycleDailyPublishArticlesTask(Const):
-    def __init__(self, pool, log_client):
+
+    def __init__(self, pool, log_client, date_string):
         self.pool = pool
         self.log_client = log_client
+        self.date_string = date_string
 
     async def get_publish_accounts(self):
         """
@@ -35,8 +71,8 @@ class RecycleDailyPublishArticlesTask(Const):
             where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
             group by t3.id;
         """
-        account_list = await self.pool.async_fetch(query, db_name='aigc')
-        return [i for i in account_list if '自动回复' not in str(i['account_remark'])]
+        account_list, error = await self.pool.async_fetch(query, db_name="aigc")
+        return [i for i in account_list if "自动回复" not in str(i["account_remark"])]
 
     async def get_account_status(self):
         """get account experiment status"""
@@ -45,8 +81,10 @@ class RecycleDailyPublishArticlesTask(Const):
             from wx_statistics_group_source_account t1
             join wx_statistics_group_source t2 on t1.group_source_name = t2.account_source_name;
         """
-        account_status_list = await self.pool.async_fetch(sql, db_name='aigc')
-        account_status_dict = {account['account_id']: account['status'] for account in account_status_list}
+        account_status_list, error = await self.pool.async_fetch(sql, db_name="aigc")
+        account_status_dict = {
+            account["account_id"]: account["status"] for account in account_status_list
+        }
         return account_status_dict
 
     async def recycle_single_account(self, account):
@@ -54,23 +92,27 @@ class RecycleDailyPublishArticlesTask(Const):
         query = f"""
             select max(publish_timestamp) as publish_timestamp from official_articles_v2 where ghId = %s;
         """
-        response = await self.pool.async_fetch(query, params=(account['gh_id'],), db_name='aigc')
+        response, error = await self.pool.async_fetch(
+            query, params=(account["gh_id"],), db_name="piaoquan_crawler"
+        )
         if response:
-            max_publish_timestamp = response[0]['publish_timestamp']
+            max_publish_timestamp = response[0]["publish_timestamp"]
         else:
             max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD
 
         cursor = None
         while True:
-            response = get_article_list_from_account(account_id=account['account_id'], cursor=cursor)
-            response_code = response['code']
+            response = get_article_list_from_account(
+                account_id=account["gh_id"], index=cursor
+            )
+            response_code = response["code"]
             match response_code:
                 case 25013:
                     await feishu_robot.bot(
                         title="发布账号封禁",
                         detail={
-                            "账号名称": account['name'],
-                            "账号id": account['account_id']
+                            "账号名称": account["name"],
+                            "账号id": account["gh_id"],
                         },
                     )
                     return
@@ -79,46 +121,313 @@ class RecycleDailyPublishArticlesTask(Const):
                     if not msg_list:
                         return
 
-                    await insert_article_into_recycle_pool(self.pool, self.log_client, msg_list, account)
+                    await insert_article_into_recycle_pool(
+                        self.pool, self.log_client, msg_list, account
+                    )
 
                     # check last article
                     last_article = msg_list[-1]
-                    last_publish_timestamp = last_article['AppMsg']['BaseInfo']['UpdateTime']
+                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"][
+                        "UpdateTime"
+                    ]
                     if last_publish_timestamp <= max_publish_timestamp:
                         return
 
-                    cursor = response['data'].get('next_cursor')
+                    cursor = response["data"].get("next_cursor")
                     if not cursor:
                         return
                 case _:
                     return
 
-    async def deal(self):
+    async def get_task_list(self):
         """recycle all publish accounts articles"""
         binding_accounts = await self.get_publish_accounts()
+        # filter out banned accounts
+        binding_accounts = [
+            i for i in binding_accounts if i["gh_id"] not in self.FORBIDDEN_GH_IDS
+        ]
+
         account_status = await self.get_account_status()
         account_list = [
             {
                 **item,
-                'using_status': 0 if account_status.get(item['account_id']) == '实验' else 1
+                "using_status": (
+                    0 if account_status.get(item["account_id"]) == "实验" else 1
+                ),
             }
             for item in binding_accounts
         ]
 
         # 订阅号
-        subscription_accounts = [i for i in account_list if i['account_type'] in self.SUBSCRIBE_TYPE_SET]
+        subscription_accounts = [
+            i for i in account_list if i["account_type"] in self.SUBSCRIBE_TYPE_SET
+        ]
+        return subscription_accounts
 
+    async def deal(self):
+        subscription_accounts = await self.get_task_list()
         for account in tqdm(subscription_accounts, desc="recycle each account"):
             try:
                 await self.recycle_single_account(account)
 
             except Exception as e:
-                print("recycle account error:", e)
+                print(
+                    f"{account['name']}\t{account['gh_id']}: recycle account error:", e
+                )
+
+
+class CheckDailyPublishArticlesTask(RecycleDailyPublishArticlesTask):
+
+    async def check_account(self, account: dict, date_string: str) -> bool:
+        """check account data"""
+        query = f"""
+            select accountName, count(1) as publish_count 
+            from official_articles_v2 where ghId = %s and from_unixtime(createTime) > %s;
+        """
+        response, error = await self.pool.async_fetch(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(account["gh_id"], date_string),
+        )
+        if error:
+            await feishu_robot.bot(
+                title="sql错误",
+                detail={
+                    "task": "CheckDailyPublishArticlesTask",
+                    "function": "check_account",
+                    "account": account,
+                    "date_string": date_string,
+                },
+                mention=False,
+            )
+            return False
+        else:
+            today_publish_count = response[0]["publish_count"]
+            return today_publish_count > 0
+
+    async def deal(self):
+        task_list = await self.get_task_list()
+        for task in tqdm(task_list, desc="check each account step1: "):
+            if await self.check_account(task, self.date_string):
+                continue
+            else:
+                await self.recycle_single_account(task)
+
+        # check again
+        fail_list = []
+        for second_task in tqdm(task_list, desc="check each account step2: "):
+            if await self.check_account(second_task, self.date_string):
+                continue
+            else:
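+                # drop fields that are not displayed in the Feishu alert table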
+                second_task.pop("account_type", None)
+                second_task.pop("account_auth", None)
+                second_task.pop("account_id", None)
+                second_task.pop("account_remark", None)
+                fail_list.append(second_task)
 
+        if fail_list:
+            columns = [
+                feishu_robot.create_feishu_columns_sheet(
+                    sheet_type="plain_text",
+                    sheet_name="name",
+                    display_name="公众号名称",
+                ),
+                feishu_robot.create_feishu_columns_sheet(
+                    sheet_type="plain_text", sheet_name="gh_id", display_name="gh_id"
+                ),
+                feishu_robot.create_feishu_columns_sheet(
+                    sheet_type="number",
+                    sheet_name="follower_count",
+                    display_name="粉丝数",
+                ),
+                feishu_robot.create_feishu_columns_sheet(
+                    sheet_type="date",
+                    sheet_name="account_init_timestamp",
+                    display_name="账号接入系统时间",
+                ),
+                feishu_robot.create_feishu_columns_sheet(
+                    sheet_type="plain_text",
+                    sheet_name="using_status",
+                    display_name="利用状态",
+                ),
+            ]
+            await feishu_robot.bot(
+                title=f"{self.date_string} 发布文章,存在未更新的账号",
+                detail={"columns": columns, "rows": fail_list},
+                table=True,
+                mention=False,
+            )
+        else:
+            await feishu_robot.bot(
+                title=f"{self.date_string} 发布文章,所有文章更新成功",
+                detail={
+                    "date_string": self.date_string,
+                    "finish_time": datetime.datetime.now().__str__(),
+                },
+                mention=False,
+            )
+
+
+class UpdateRootSourceIdAndUpdateTimeTask(Const):
+    """
+    update publish_timestamp && root_source_id
+    """
+
+    def __init__(self, pool, log_client):
+        self.pool = pool
+        self.log_client = log_client
+
+    async def get_article_list(self):
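+        # fetch articles whose publish_timestamp is still 0 (default) or -1 (request failed)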
+        query = f"""select ContentUrl, wx_sn from official_articles_v2 where publish_timestamp in %s;"""
+        article_list, error = await self.pool.async_fetch(
+            query=query, db_name="piaoquan_crawler", params=(tuple([0, -1]),)
+        )
+        return article_list
+
+    async def check_each_article(self, article: dict):
+        url = article["ContentUrl"]
+        wx_sn = article["wx_sn"].decode("utf-8")
+        try:
+            response = get_article_detail(url)
+            response_code = response["code"]
+
+            if response_code == self.ARTICLE_DELETE_CODE:
+                publish_timestamp_s = self.DELETE_STATUS
+                root_source_id_list = []
+            elif response_code == self.ARTICLE_ILLEGAL_CODE:
+                publish_timestamp_s = self.ILLEGAL_STATUS
+                root_source_id_list = []
+            elif response_code == self.ARTICLE_SUCCESS_CODE:
+                data = response["data"]["data"]
+                publish_timestamp_ms = data["publish_timestamp"]
+                publish_timestamp_s = int(publish_timestamp_ms / 1000)
+                mini_program = data.get("mini_program", [])
+                if mini_program:
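+                    # rootSourceId is extracted from each mini-program path's query string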
+                    root_source_id_list = [
+                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"]))[
+                            "rootSourceId"
+                        ][0]
+                        for i in mini_program
+                    ]
+                else:
+                    root_source_id_list = []
+            else:
+                publish_timestamp_s = self.UNKNOWN_STATUS
+                root_source_id_list = []
+        except Exception as e:
+            publish_timestamp_s = self.REQUEST_FAIL_STATUS
+            root_source_id_list = None
+            error_msg = traceback.format_exc()
+            await self.log_client.log(
+                contents={
+                    "task": "get_official_article_detail",
+                    "data": {
+                        "url": url,
+                        "wx_sn": wx_sn,
+                        "error_msg": error_msg,
+                        "error": str(e),
+                    },
+                    "function": "check_each_article",
+                    "status": "fail",
+                }
+            )
+        query = f"""
+            update official_articles_v2 set publish_timestamp = %s, root_source_id_list = %s
+            where wx_sn = %s;
+        """
+        await self.pool.async_save(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(
+                publish_timestamp_s,
+                json.dumps(root_source_id_list, ensure_ascii=False),
+                wx_sn,
+            ),
+        )
+        if publish_timestamp_s == self.REQUEST_FAIL_STATUS:
+            return article
+        else:
+            return None
 
+    async def fallback_mechanism(self):
+        # backfill publish_timestamp from sibling articles that share the same appMsgId
+        update_sql = f"""
+            update official_articles_v2 oav 
+            join (
+                select ghId, appMsgId, max(publish_timestamp) as publish_timestamp 
+                from official_articles_v2
+                where publish_timestamp > %s 
+                group by ghId, appMsgId
+                ) vv 
+                on oav.appMsgId = vv.appMsgId and oav.ghId = vv.ghId
+            set oav.publish_timestamp = vv.publish_timestamp
+            where oav.publish_timestamp <= %s;
+        """
+        await self.pool.async_save(
+            query=update_sql, params=(0, 0), db_name="piaoquan_crawler"
+        )
 
+        # if publish_timestamp is still missing, use updateTime as publish_timestamp
+        update_sql_2 = f"""
+            update official_articles_v2
+            set publish_timestamp = updateTime
+            where publish_timestamp < %s;
+        """
+        await self.pool.async_save(
+            query=update_sql_2, params=(0,), db_name="piaoquan_crawler"
+        )
 
+    async def deal(self):
+        task_list = await self.get_article_list()
+        for task in tqdm(task_list, desc="get article detail step1: "):
+            try:
+                await self.check_each_article(task)
+            except Exception as e:
+                try:
+                    await self.log_client.log(
+                        contents={
+                            "task": "get_official_article_detail_step1",
+                            "data": {
+                                "detail": {
+                                    "url": task["ContentUrl"],
+                                    "wx_sn": task["wx_sn"].decode("utf-8"),
+                                },
+                                "error_msg": traceback.format_exc(),
+                                "error": str(e),
+                            },
+                            "function": "check_each_article",
+                            "status": "fail",
+                        }
+                    )
+                except Exception as e:
+                    print(e)
+                    print(traceback.format_exc())
 
+        # reprocess tasks that failed in step 1
+        fail_tasks = await self.get_article_list()
+        fail_list = []
+        for fail_task in tqdm(fail_tasks, desc="get article detail step2: "):
+            try:
+                res = await self.check_each_article(fail_task)
+                if res:
+                    fail_list.append(res)
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "task": "get_official_article_detail_step2",
+                        "data": {
+                            "detail": {
+                                "url": fail_task["ContentUrl"],
+                                "wx_sn": fail_task["wx_sn"].decode("utf-8"),
+                            },
+                            "error_msg": traceback.format_exc(),
+                            "error": str(e),
+                        },
+                        "function": "check_each_article",
+                        "status": "fail",
+                    }
+                )
+        if fail_list:
+            await feishu_robot.bot(title="更新文章,获取detail失败", detail=fail_list)
 
+            current_hour = datetime.datetime.now().hour
+            if current_hour >= 21:
+                await self.fallback_mechanism()
 
 

+ 1 - 1
applications/tasks/llm_tasks/__init__.py

@@ -1 +1 @@
-from .process_title import TitleRewrite
+from .process_title import TitleRewrite

+ 18 - 13
applications/tasks/llm_tasks/process_title.py

@@ -5,6 +5,7 @@ from tqdm import tqdm
 
 from applications.api import fetch_deepseek_completion
 
+
 class Const:
     # title rewrite status
     TITLE_REWRITE_INIT_STATUS = 0
@@ -126,15 +127,15 @@ class TitleRewrite(TitleProcess):
             from publish_single_video_source
             where title_rewrite_status = {self.TITLE_REWRITE_LOCK_STATUS};
         """
-        article_list = await self.pool.async_fetch(query=query, db_name="long_articles", )
+        article_list = await self.pool.async_fetch(
+            query=query,
+            db_name="long_articles",
+        )
         if article_list:
             blocked_id_list = [
                 i["id"]
                 for i in article_list
-                if (
-                    int(time.time())
-                    - i["title_rewrite_status_update_timestamp"]
-                )
+                if (int(time.time()) - i["title_rewrite_status_update_timestamp"])
                 > self.TITLE_REWRITE_LOCK_TIME
             ]
             if blocked_id_list:
@@ -149,7 +150,7 @@ class TitleRewrite(TitleProcess):
                         self.TITLE_REWRITE_INIT_STATUS,
                         tuple(blocked_id_list),
                         self.TITLE_REWRITE_LOCK_STATUS,
-                    )
+                    ),
                 )
 
     async def get_articles_batch(self, batch_size=1000):
@@ -164,14 +165,17 @@ class TitleRewrite(TitleProcess):
         """
         return await self.pool.async_fetch(query=query, db_name="long_articles")
 
-    async def update_title_rewrite_status(self, content_trace_id, ori_status, new_status):
+    async def update_title_rewrite_status(
+        self, content_trace_id, ori_status, new_status
+    ):
         query = f"""
             update publish_single_video_source
             set title_rewrite_status = %s, title_rewrite_status_update_timestamp = %s
             where content_trace_id = %s and title_rewrite_status= %s;
         """
         affected_rows = await self.pool.async_save(
-            query=query, params=(new_status, int(time.time()), content_trace_id, ori_status)
+            query=query,
+            params=(new_status, int(time.time()), content_trace_id, ori_status),
         )
         return affected_rows
 
@@ -190,7 +194,7 @@ class TitleRewrite(TitleProcess):
                 content_trace_id,
                 new_title,
                 self.TITLE_USEFUL_STATUS,
-                self.PROMPT_VERSION
+                self.PROMPT_VERSION,
             ),
         )
 
@@ -233,10 +237,10 @@ class TitleRewrite(TitleProcess):
                     "message": content_trace_id,
                     "status": "fail",
                     "data": {
-                    "error_message": str(e),
-                    "error_type": type(e).__name__,
-                    "traceback": traceback.format_exc(),
-                }
+                        "error_message": str(e),
+                        "error_type": type(e).__name__,
+                        "traceback": traceback.format_exc(),
+                    },
                 }
             )
             await self.update_title_rewrite_status(
@@ -260,5 +264,6 @@ class TitleRewrite(TitleProcess):
 class VideoPoolCategoryGeneration:
     pass
 
+
 class ArticlePoolCategoryGeneration:
     pass

+ 2 - 3
applications/tasks/monitor_tasks/gzh_article_monitor.py

@@ -249,7 +249,6 @@ class InnerGzhArticlesMonitor(MonitorConst):
         response, error = await self.pool.async_fetch(query=query)
         return True if response else False
 
-
     async def fetch_article_list_to_check(self, run_date: str = None) -> Optional[List]:
         """
         :param run_date: 执行日期,格式为“%Y-%m-%d”, default None
@@ -318,12 +317,12 @@ class InnerGzhArticlesMonitor(MonitorConst):
                             "error_detail": error_detail,
                         },
                         mention=False,
-                        env="prod"
+                        env="prod",
                     )
                     await delete_illegal_gzh_articles(gh_id, title)
 
         except Exception as e:
-            print(f"crawler failed: {article['account_name']}, error: {e}")
+            print(f"crawler failed: {account_name}, error: {e}")
 
     async def deal(self):
         article_list = await self.fetch_article_list_to_check()

+ 52 - 8
applications/tasks/task_scheduler.py

@@ -3,9 +3,11 @@ import time
 from datetime import datetime
 
 from applications.api import feishu_robot
-from applications.tasks.llm_tasks import TitleRewrite
 from applications.utils import task_schedule_response
 
+from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
 from applications.tasks.llm_tasks import TitleRewrite
 from applications.tasks.monitor_tasks import check_kimi_balance
 from applications.tasks.monitor_tasks import GetOffVideos
@@ -187,22 +189,24 @@ class TaskScheduler:
                 async def background_inner_article_monitor():
                     task = InnerGzhArticlesMonitor(self.db_client)
                     final_status = await task.deal()
-                    await self.release_task(task_name, date_string, final_status=final_status)
+                    await self.release_task(
+                        task_name, date_string, final_status=final_status
+                    )
 
                 asyncio.create_task(background_inner_article_monitor())
                 return await task_schedule_response.success_response(
                     task_name=task_name,
-                    data={
-                        "code": 0,
-                        "message": "inner_article_monitor started background",
-                    }
+                    data={"code": 0, "message": "task started background"},
                 )
 
             case "title_rewrite":
+
                 async def background_title_rewrite():
                     sub_task = TitleRewrite(self.db_client, self.log_client)
                     await sub_task.deal()
-                    await self.release_task(task_name=task_name, date_string=date_string, final_status=2)
+                    await self.release_task(
+                        task_name=task_name, date_string=date_string, final_status=2
+                    )
 
                 asyncio.create_task(background_title_rewrite())
                 return await task_schedule_response.success_response(
@@ -210,7 +214,47 @@ class TaskScheduler:
                     data={
                         "code": 0,
                         "message": "inner_article_monitor started background",
-                    }
+                    },
+                )
+
+            case "daily_publish_articles_recycle":
+
+                async def background_daily_publish_articles_recycle():
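+                    # run the recycle pass first, then re-check every account for the same day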
+                    sub_task = RecycleDailyPublishArticlesTask(
+                        self.db_client, self.log_client, date_string
+                    )
+                    await sub_task.deal()
+
+                    task = CheckDailyPublishArticlesTask(
+                        self.db_client, self.log_client, date_string
+                    )
+                    await task.deal()
+
+                    await self.release_task(
+                        task_name=task_name, date_string=date_string, final_status=2
+                    )
+
+                asyncio.create_task(background_daily_publish_articles_recycle())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={"code": 0, "message": "task started background"},
+                )
+
+            case "update_root_source_id":
+
+                async def background_update_root_source_id():
+                    sub_task = UpdateRootSourceIdAndUpdateTimeTask(
+                        self.db_client, self.log_client
+                    )
+                    await sub_task.deal()
+                    await self.release_task(
+                        task_name=task_name, date_string=date_string, final_status=2
+                    )
+
+                asyncio.create_task(background_update_root_source_id())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={"code": 0, "message": "task started background"},
                 )
 
             case _: