Przeglądaj źródła

Merge branch 'luojunhui-2025-01-13-updateMsgDailyV2_dev' of luojunhui/LongArticlesJob into master

luojunhui 3 miesięcy temu
rodzic
commit
843e848087

+ 29 - 2
applications/const.py

@@ -175,6 +175,33 @@ class AccountAssociationTaskConst:
     SEED_TITLE_LIMIT = 100
 
 
-
-
+# 从aigc获取文章
+class ArticleCollectorConst:
+    """
+    文章采集任务常量配置
+    """
+    # 发送方式
+
+    # 手动推送
+    MANUAL_PUSH = 1
+    # 自动群发
+    BULK_AUTO_PUSH = 2
+    # 无限流推送
+    UNLIMITED_PUSH = 3
+
+    # 文章状态
+    # 初始状态
+    INIT_STATUS = 0
+    # 成功状态
+    SUCCESS_STATUS = 1
+    # 失败状态
+    FAIL_STATUS = -1
+    # 发布状态
+    PUBLISHED_STATUS = 2
+
+    # 爬虫接口
+    ARTICLE_ILLEGAL_CODE = 25012
+    ARTICLE_DELETE_CODE = 25005
+    ARTICLE_SUCCESS_CODE = 0
+    ARTICLE_UNKNOWN_CODE = 10000
 

+ 29 - 0
applications/functions.py

@@ -12,6 +12,7 @@ import requests
 from uuid import uuid4
 from datetime import datetime, timezone
 from fake_useragent import FakeUserAgent
+from urllib.parse import urlparse, parse_qs
 
 
 class Functions(object):
@@ -267,3 +268,31 @@ class Functions(object):
         )
         bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
         return oss_video_key
+
+    @classmethod
+    def extract_path(cls, path: str):
+        """
+        提取path参数
+        :param path:
+        :return:
+        """
+        params = parse_qs(urlparse(path).query)
+        jump_page = params.get('jumpPage', [None])[0]
+        if jump_page:
+            params2 = parse_qs(jump_page)
+            res = {
+                "video_id": params2['pages/user-videos?id'][0],
+                "root_source_id": params2['rootSourceId'][0],
+            }
+            return res
+        else:
+            return {}
+
+    @classmethod
+    def extract_params_from_url(cls, url: str, key: str):
+        """
+        extract params from url
+        """
+        params = parse_qs(urlparse(url).query)
+        info = params.get(key, [])
+        return info[0] if info else None

+ 9 - 0
run_update_articles_from_aigc_system.py

@@ -0,0 +1,9 @@
+"""
+@author: luojunhui
+"""
+from tasks.update_article_info_from_aigc import UpdateArticleInfoFromAIGC
+
+
+if __name__ == "__main__":
+    update_article_info_from_aigc = UpdateArticleInfoFromAIGC()
+    update_article_info_from_aigc.deal()

+ 26 - 0
sh/run_update_article_info_from_aigc.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/update_article_info_from_aigc_task_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 run_update_articles_from_aigc_system.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - run_update_articles_from_aigc_system.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart run_update_articles_from_aigc_system.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 run_update_articles_from_aigc_system.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted run_update_articles_from_aigc_system.py"
+fi

+ 0 - 1
sh/run_update_published_articles_daily.sh

@@ -22,7 +22,6 @@ else
 
     if [[ "$current_time" < "$target_time" ]]; then
         nohup python3 updatePublishedMsgDaily.py --run_task update >> "${LOG_FILE}" 2>&1 &
-        nohup python3 updatePublishedMsgDaily.py --run_task detail >> "${LOG_FILE}" 2>&1 &
     else
         nohup python3 updatePublishedMsgDaily.py >> "${LOG_FILE}" 2>&1 &
     echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted updatePublishedMsgDaily.py"

+ 232 - 0
tasks/update_article_info_from_aigc.py

@@ -0,0 +1,232 @@
+"""
+@author: luojunhui
+"""
+
+import json
+import time
+import traceback
+from typing import List, Dict
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import aiditApi
+from applications import bot
+from applications import log
+from applications import WeixinSpider
+from applications.const import ArticleCollectorConst
+from applications.db import DatabaseConnector
+from applications.functions import Functions
+from config import denet_config, long_articles_config
+
+empty_dict = {}
+const = ArticleCollectorConst()
+functions = Functions()
+spider = WeixinSpider()
+
+
+class UpdateArticleInfoFromAIGC(object):
+    """
+    从aigc获取文章信息
+    """
+
+    def __init__(self):
+        self.aigc_db_client = DatabaseConnector(db_config=denet_config)
+        self.long_articles_db_client = DatabaseConnector(db_config=long_articles_config)
+        self.aigc_db_client.connect()
+        self.long_articles_db_client.connect()
+
+    def get_published_articles(self) -> List[Dict]:
+        """
+        获取当天发布文章的List
+        """
+        sql = f"""
+            SELECT trace_id, push_type
+            FROM long_articles_published_trace_id
+            WHERE create_timestamp > UNIX_TIMESTAMP(DATE_SUB(CURDATE(), INTERVAL 1 DAY)) AND status = {const.INIT_STATUS};
+        """
+        article_list = self.long_articles_db_client.fetch(sql, cursor_type=DictCursor)
+        return article_list
+
+    def get_article_info_from_aigc(self, trace_id: str) -> Dict:
+        """
+        从aigc获取发布结果
+        """
+        sql = f"""
+            SELECT t2.crawler_channel_content_id, t2.publish_stage_url, t2.publish_timestamp, t1.result_data
+            from publish_content_miniprogram t1
+            join publish_content t2 on t1.publish_content_id = t2.id
+            where t1.trace_id = '{trace_id}' and t2.status = {const.PUBLISHED_STATUS};
+        """
+        article_info = self.aigc_db_client.fetch(sql, cursor_type=DictCursor)
+        if article_info:
+            return article_info[0]
+        else:
+            return empty_dict
+
+    def get_article_info_by_trace_id(self, trace_id: str) -> Dict:
+        """
+        通过trace_id来查询文章信息
+        """
+        select_sql = f"""
+            SELECT t1.gh_id, t1.account_name, t2.article_title
+            FROM long_articles_match_videos t1
+            JOIN long_articles_text t2
+            ON t1.content_id = t2.content_id
+            WHERE t1.trace_id = '{trace_id}';
+        """
+        article_info = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
+        if article_info:
+            return article_info[0]
+        else:
+            return empty_dict
+
+    def update_each_article(self, article: Dict):
+        """
+        更新每个文章的信息
+        """
+        trace_id = article["trace_id"]
+        push_type = article["push_type"]
+
+        article_info = self.get_article_info_from_aigc(trace_id)
+        if article_info:
+            channel_content_id = article_info["crawler_channel_content_id"]
+            published_url = article_info["publish_stage_url"]
+            publish_timestamp = int(article_info["publish_timestamp"] / 1000)
+            result_data = json.loads(article_info["result_data"])
+            root_source_id_list = [
+                functions.extract_path(item["productionPath"])["root_source_id"] for item in result_data
+            ]
+            wx_sn = None
+            if published_url:
+                response = spider.get_article_text(content_link=published_url)
+                code = response['code']
+                match code:
+                    case const.ARTICLE_SUCCESS_CODE:
+                        long_url = response['data']['data']['content_link']
+                        wx_sn = functions.extract_params_from_url(url=long_url, key="sn")
+                        status = const.SUCCESS_STATUS
+                    case const.ARTICLE_DELETE_CODE:
+                        log(
+                            task="update_article_info_from_aigc",
+                            function="update_each_article",
+                            status="fail",
+                            message=trace_id,
+                            data={
+                                "msg": "文章被删文",
+                                "publish_timestamp": publish_timestamp,
+                                "article_delete_timestamp": int(time.time()),
+                                "duration": int(time.time()) - publish_timestamp
+                            }
+                        )
+                        status = const.FAIL_STATUS
+
+                    case const.ARTICLE_ILLEGAL_CODE:
+                        log(
+                            task="update_article_info_from_aigc",
+                            function="update_each_article",
+                            status="fail",
+                            message=trace_id,
+                            data={
+                                "msg": "文章被判断违规",
+                                "publish_timestamp": publish_timestamp,
+                                "illegal_timestamp": int(time.time()),
+                                "duration": int(time.time()) - publish_timestamp
+                            }
+                        )
+                        article_info = self.get_article_info_by_trace_id(trace_id)
+                        if article_info:
+                            error_detail = response.get("msg")
+                            insert_sql = f"""
+                                INSERT IGNORE INTO illegal_articles 
+                                (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
+                                VALUES 
+                                (%s, %s, %s, %s, %s, %s);
+                            """
+
+                            affected_rows = self.long_articles_db_client.save(
+                                query=insert_sql,
+                                params=(
+                                    article_info['gh_id'],
+                                    article_info['account_name'],
+                                    article_info['article_title'],
+                                    wx_sn,
+                                    functions.timestamp_to_str(publish_timestamp),
+                                    error_detail
+                                )
+                            )
+                            if affected_rows:
+                                bot(
+                                    title="文章违规告警(new task)",
+                                    detail={
+                                        "account_name": article_info['account_name'],
+                                        "gh_id": article_info['gh_id'],
+                                        "title": article_info['article_title'],
+                                        "wx_sn": wx_sn,
+                                        "publish_date": functions.timestamp_to_str(publish_timestamp),
+                                        "error_detail": error_detail,
+                                    },
+                                    mention=False
+                                )
+                                aiditApi.delete_articles(
+                                    gh_id=article_info['gh_id'],
+                                    title=article_info['article_title']
+                                )
+                        status = const.FAIL_STATUS
+
+                    case _:
+                        status = const.FAIL_STATUS
+
+            else:
+                if push_type == const.BULK_AUTO_PUSH:
+                    status = const.INIT_STATUS
+                else:
+                    status = const.SUCCESS_STATUS
+
+            update_sql = f"""
+                UPDATE long_articles_published_trace_id
+                SET published_url = %s, status = %s, wx_sn = %s, publish_timestamp = %s, crawler_channel_content_id = %s, root_source_id_list = %s
+                WHERE trace_id = %s;
+            """
+            self.long_articles_db_client.save(
+                query=update_sql,
+                params=(published_url, status, wx_sn, publish_timestamp, channel_content_id, json.dumps(root_source_id_list), trace_id)
+            )
+        else:
+            update_sql = f"""
+                UPDATE long_articles_published_trace_id
+                SET status = %s 
+                WHERE trace_id = %s;
+            """
+            self.long_articles_db_client.save(
+                query=update_sql, params=(const.FAIL_STATUS, trace_id)
+            )
+
+    def deal(self):
+        """
+        main function
+        """
+        article_list = self.get_published_articles()
+        log(
+            task="update_article_info_from_aigc",
+            function="deal",
+            data=article_list,
+            message="total got {} articles this time".format(len(article_list))
+        )
+        for article in tqdm(article_list, desc="更新文章信息"):
+            try:
+                self.update_each_article(article)
+            except Exception as e:
+                log(
+                    task="update_article_info_from_aigc",
+                    function="update_each_article",
+                    status="fail",
+                    message="update_article_fail",
+                    data={
+                        "trace_id": article["trace_id"],
+                        "error": str(e),
+                        "traceback": traceback.format_exc()
+                    }
+                )
+
+

+ 742 - 0
tasks/update_published_articles_read_detail.py

@@ -0,0 +1,742 @@
+"""
+@author: luojunhui
+@desc: 更新文章的阅读详情
+"""
+import json
+import time
+import traceback
+import urllib.parse
+from datetime import datetime
+from typing import Dict, List
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import aiditApi
+from applications import bot
+from applications import create_feishu_columns_sheet
+from applications import Functions
+from applications import log
+from applications import WeixinSpider
+from applications.const import updatePublishedMsgTaskConst
+from applications.db import DatabaseConnector
+from config import denet_config, long_articles_config, piaoquan_crawler_config
+
+ARTICLE_TABLE = "official_articles"
+const = updatePublishedMsgTaskConst()
+spider = WeixinSpider()
+functions = Functions()
+empty_dict = {}
+
+
+def generate_bot_columns():
+    """
+    生成列
+    :return:
+    """
+    columns = [
+        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="name", display_name="公众号名称"),
+        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="ghId", display_name="ghId"),
+        create_feishu_columns_sheet(sheet_type="number", sheet_name="follower_count", display_name="粉丝数"),
+        create_feishu_columns_sheet(sheet_type="date", sheet_name="account_init_timestamp",
+                                    display_name="账号接入系统时间"),
+        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="using_status", display_name="利用状态")
+    ]
+    return columns
+
+
+class UpdatePublishedArticlesReadDetail(object):
+    """
+    更新每日发布文章的阅读详情
+    """
+
+    def __init__(self):
+        self.aigc_db_client = None
+        self.piaoquan_crawler_db_client = None
+        self.long_articles_db_client = None
+
+    def get_account_list(self) -> List[Dict]:
+        """
+        从 aigc 数据库中获取目前处于发布状态的账号
+        :return:
+        "name": line[0],
+        "ghId": line[1],
+        "follower_count": line[2],
+        "account_init_time": int(line[3] / 1000),
+        "account_type": line[4], # 订阅号 or 服务号
+        "account_auth": line[5]
+        """
+
+        def get_account_status() -> Dict:
+            """
+            获取账号的实验状态
+            :return:
+            """
+            sql = f"""  
+                SELECT t1.account_id, t2.status
+                FROM wx_statistics_group_source_account t1
+                JOIN wx_statistics_group_source t2
+                ON t1.group_source_name = t2.account_source_name;
+            """
+            account_status_list = self.aigc_db_client.fetch(sql, cursor_type=DictCursor)
+            account_status = {account['account_id']: account['status'] for account in account_status_list}
+            return account_status
+
+        account_list_with_out_using_status = aiditApi.get_publish_account_from_aigc()
+        account_status_dict = get_account_status()
+        account_list = [
+            {
+                **item,
+                'using_status': 0 if account_status_dict.get(item['account_id']) == '实验' else 1
+            }
+            for item in account_list_with_out_using_status
+        ]
+        return account_list
+
+    def get_article_info_by_trace_id(self, trace_id: str) -> Dict:
+        """
+        通过trace_id来查询文章信息
+        """
+        select_sql = f"""
+            SELECT t1.gh_id, t1.account_name, t2.article_title
+            FROM long_articles_match_videos t1
+            JOIN long_articles_text t2
+            ON t1.content_id = t2.content_id
+            WHERE t1.trace_id = '{trace_id}';
+        """
+        article_info = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
+        if article_info:
+            return article_info[0]
+        else:
+            return empty_dict
+
+    def init_database(self):
+        """
+        初始化数据库连接
+        """
+        # 初始化数据库连接
+        try:
+            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
+            self.piaoquan_crawler_db_client.connect()
+            self.aigc_db_client = DatabaseConnector(denet_config)
+            self.aigc_db_client.connect()
+            self.long_articles_db_client = DatabaseConnector(long_articles_config)
+            self.long_articles_db_client.connect()
+        except Exception as e:
+            error_msg = traceback.format_exc()
+            bot(
+                title="更新文章任务连接数据库失败",
+                detail={
+                    "error": e,
+                    "msg": error_msg
+                }
+            )
+            return
+
+    def insert_each_msg(self, account_info: Dict, msg_list: List[Dict]) -> None:
+        """
+        把消息数据更新到数据库中
+        :param account_info:
+        :param msg_list:
+        :return:
+        """
+        gh_id = account_info['ghId']
+        account_name = account_info['name']
+        for info in msg_list:
+            baseInfo = info.get("BaseInfo", {})
+            appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
+            createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
+            updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
+            Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
+            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
+            if detail_article_list:
+                for article in detail_article_list:
+                    title = article.get("Title", None)
+                    Digest = article.get("Digest", None)
+                    ItemIndex = article.get("ItemIndex", None)
+                    ContentUrl = article.get("ContentUrl", None)
+                    SourceUrl = article.get("SourceUrl", None)
+                    CoverImgUrl = article.get("CoverImgUrl", None)
+                    CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
+                    CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
+                    ItemShowType = article.get("ItemShowType", None)
+                    IsOriginal = article.get("IsOriginal", None)
+                    ShowDesc = article.get("ShowDesc", None)
+                    show_stat = functions.show_desc_to_sta(ShowDesc)
+                    ori_content = article.get("ori_content", None)
+                    show_view_count = show_stat.get("show_view_count", 0)
+                    show_like_count = show_stat.get("show_like_count", 0)
+                    show_zs_count = show_stat.get("show_zs_count", 0)
+                    show_pay_count = show_stat.get("show_pay_count", 0)
+                    wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
+                    status = account_info['using_status']
+                    info_tuple = (
+                        gh_id,
+                        account_name,
+                        appMsgId,
+                        title,
+                        Type,
+                        createTime,
+                        updateTime,
+                        Digest,
+                        ItemIndex,
+                        ContentUrl,
+                        SourceUrl,
+                        CoverImgUrl,
+                        CoverImgUrl_1_1,
+                        CoverImgUrl_235_1,
+                        ItemShowType,
+                        IsOriginal,
+                        ShowDesc,
+                        ori_content,
+                        show_view_count,
+                        show_like_count,
+                        show_zs_count,
+                        show_pay_count,
+                        wx_sn,
+                        json.dumps(baseInfo, ensure_ascii=False),
+                        functions.str_to_md5(title),
+                        status
+                    )
+                    self.insert_each_article(
+                        info_tuple=info_tuple,
+                        show_view_count=show_view_count,
+                        show_like_count=show_like_count,
+                        wx_sn=wx_sn
+                    )
+
+    def insert_each_article(self, info_tuple, show_view_count, show_like_count, wx_sn):
+        """
+        插入每一篇文章
+        """
+        try:
+            insert_sql = f"""
+                    INSERT INTO {ARTICLE_TABLE}
+                    (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
+                    values
+                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+            """
+            self.piaoquan_crawler_db_client.save(query=insert_sql, params=info_tuple)
+            log(
+                task="updatePublishedMsgDaily",
+                function="insert_each_msg",
+                message="插入文章数据成功",
+                data={
+                    "info": info_tuple
+                }
+            )
+        except Exception as e:
+            try:
+                update_sql = f"""
+                    UPDATE {ARTICLE_TABLE}
+                    SET show_view_count = %s, show_like_count=%s
+                    WHERE wx_sn = %s;
+                """
+                self.piaoquan_crawler_db_client.save(query=update_sql,
+                                                     params=(show_view_count, show_like_count, wx_sn))
+                log(
+                    task="updatePublishedMsgDaily",
+                    function="insert_each_msg",
+                    message="更新文章数据成功",
+                    data={
+                        "wxSn": wx_sn,
+                        "likeCount": show_like_count,
+                        "viewCount": show_view_count
+                    }
+
+                )
+            except Exception as e:
+                log(
+                    task="updatePublishedMsgDaily",
+                    function="insert_each_msg",
+                    message="更新文章失败, 报错原因是: {}".format(e),
+                    status="fail"
+                )
+
+    def update_account_by_spider(self, account_info: Dict, cursor=None):
+        """
+        更新每一个账号信息
+        :param account_info:
+        :param cursor:
+        :return: None
+        """
+        gh_id = account_info['ghId']
+        latest_update_time = self.get_account_info(gh_id)
+        response = spider.update_msg_list(ghId=gh_id, index=cursor)
+        if not response:
+            log(
+                task="updatePublishedMsgDaily",
+                function="update_account_by_spider",
+                status="fail",
+                message="账号更新请求爬虫接口失败",
+                data=account_info
+            )
+            return
+        msg_list = response.get("data", {}).get("data", [])
+        if msg_list:
+            # do
+            last_article_in_this_msg = msg_list[-1]
+            last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
+            last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
+            resdata = spider.get_account_by_url(last_url)
+            check_id = resdata['data'].get('data', {}).get('wx_gh')
+            if check_id == gh_id:
+                self.insert_each_msg(
+                    account_info=account_info,
+                    msg_list=msg_list
+                )
+                # if last_time_stamp_in_this_msg > latest_update_time:
+                #     next_cursor = response['data']['next_cursor']
+                #     return self.update_account_by_spider(
+                #         account_info=account_info,
+                #         cursor=next_cursor
+                #     )
+                log(
+                    task="updatePublishedMsgDaily",
+                    function="update_each_account",
+                    message="账号文章更新成功",
+                    data=response
+                )
+        else:
+            log(
+                task="updatePublishedMsgDaily",
+                function="update_each_account",
+                message="账号文章更新失败",
+                status="fail",
+                data=response
+            )
+            return
+
+    def update_account_by_aigc(self, account_info: Dict, run_date: str):
+        """
+        更新单个账号的文章
+        """
+        gh_id = account_info['ghId']
+        select_sql = f"""
+            SELECT trace_id, wx_sn, published_url, publish_timestamp, root_source_id_list, create_timestamp
+            FROM long_articles_published_trace_id
+            WHERE gh_id = '{gh_id}' AND publish_timestamp > UNIX_TIMESTAMP(DATE_SUB('{run_date}', INTERVAL 3 DAY)) AND delete_status = 0;
+        """
+        result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
+        for article in result:
+            trace_id = article['trace_id']
+            wx_sn = article['wx_sn']
+            published_url = article['published_url']
+            publish_timestamp = article['publish_timestamp']
+            article_info = spider.get_article_text(content_link=published_url, is_cache=False, is_count=True)
+            response_code = article_info['code']
+            match response_code:
+                case const.ARTICLE_SUCCESS_CODE:
+                    response_data = article_info['data']['data']
+                    title = response_data['title']
+                    article_url = response_data['content_link']
+                    show_view_count = response_data['view_count']
+                    show_like_count = response_data['like_count']
+                    show_zs_count = 0
+                    show_pay_count = 0
+                    wx_sn = article_url.split("&sn=")[1].split("&")[0] if article_url else None
+                    app_msg_id = article_url.split("&mid=")[1].split("&")[0] if article_url else None
+                    status = account_info['using_status']
+                    info_tuple = (
+                        gh_id,
+                        account_info['name'],
+                        app_msg_id,
+                        title,
+                        "9",
+                        article['create_timestamp'],
+                        response_data['update_timestamp'],
+                        None,
+                        response_data['item_index'],
+                        response_data['content_link'],
+                        None,
+                        None,
+                        None,
+                        None,
+                        None,
+                        response_data.get("is_original", None),
+                        None,
+                        None,
+                        show_view_count,
+                        show_like_count,
+                        show_zs_count,
+                        show_pay_count,
+                        wx_sn,
+                        None,
+                        functions.str_to_md5(title),
+                        status
+                    )
+                    self.insert_each_article(
+                        info_tuple=info_tuple,
+                        show_view_count=show_view_count,
+                        show_like_count=show_like_count,
+                        wx_sn=wx_sn
+                    )
+
+                case const.ARTICLE_DELETE_CODE:
+                    log(
+                        task="updatePublishedMsgDaily",
+                        function="update_account_by_aigc",
+                        message="文章被删除",
+                        data={
+                            "ghId": gh_id,
+                            "publishedUrl": published_url
+                        }
+                    )
+
+                case const.ARTICLE_ILLEGAL_CODE:
+                    article_detail = self.get_article_info_by_trace_id(trace_id)
+                    if article_detail:
+                        error_detail = article_info.get("msg")
+                        insert_sql = f"""
+                                INSERT IGNORE INTO illegal_articles 
+                                (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
+                                VALUES 
+                                (%s, %s, %s, %s, %s, %s);
+                            """
+
+                        affected_rows = self.long_articles_db_client.save(
+                            query=insert_sql,
+                            params=(
+                                article_info['gh_id'],
+                                article_info['account_name'],
+                                article_info['article_title'],
+                                wx_sn,
+                                functions.timestamp_to_str(publish_timestamp),
+                                error_detail
+                            )
+                        )
+                        if affected_rows:
+                            bot(
+                                title="文章违规告警(new task)",
+                                detail={
+                                    "account_name": article_info['account_name'],
+                                    "gh_id": article_info['gh_id'],
+                                    "title": article_info['article_title'],
+                                    "wx_sn": wx_sn,
+                                    "publish_date": functions.timestamp_to_str(publish_timestamp),
+                                    "error_detail": error_detail,
+                                },
+                                mention=False
+                            )
+                            aiditApi.delete_articles(
+                                gh_id=article_info['gh_id'],
+                                title=article_info['article_title']
+                            )
+
+    def get_account_info(self, gh_id: str) -> int:
+        """
+        通过 gh_id查询账号信息的最新发布时间
+        :param gh_id:
+        :return:
+        """
+        sql = f"""
+            SELECT MAX(publish_timestamp)
+            FROM {ARTICLE_TABLE}
+            WHERE ghId = '{gh_id}';
+            """
+        result = self.piaoquan_crawler_db_client.fetch(sql)
+        if result:
+            return result[0][0]
+        else:
+            # 新号,抓取周期定位抓取时刻往前推30天
+            return int(time.time()) - const.NEW_ACCOUNT_CRAWL_PERIOD
+
+    def check_single_account(self, account_item: Dict) -> bool:
+        """
+        校验每个账号是否更新
+        :param account_item:
+        :return: True / False
+        """
+        gh_id = account_item['ghId']
+        account_type = account_item['account_type']
+        today_str = datetime.today().strftime("%Y-%m-%d")
+        today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
+        today_timestamp = today_date_time.timestamp()
+        sql = f"""
+                SELECT max(updateTime)
+                FROM {ARTICLE_TABLE}
+                WHERE ghId = '{gh_id}';
+                """
+        try:
+            latest_update_time = self.piaoquan_crawler_db_client.fetch(sql)[0][0]
+            # 判断该账号当天发布的文章是否被收集
+            if account_type in const.SUBSCRIBE_TYPE_SET:
+                if int(latest_update_time) > int(today_timestamp):
+                    return True
+                else:
+                    return False
+            else:
+                if int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600:
+                    return True
+                else:
+                    return False
+        except Exception as e:
+            print(e)
+            return False
+
+    def process_single_account(self, account_info: Dict, run_date: str):
+        """
+        处理单个账号
+        """
+        gh_id = account_info['ghId']
+        # 判断该账号当天是否有自动群发且没有无限流发表
+        select_sql = f"""
+            SELECT push_type
+            FROM long_articles_published_trace_id
+            WHERE gh_id = '{gh_id}' AND publish_timestamp > UNIX_TIMESTAMP('{run_date}');
+        """
+        response = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
+        UNLIMITED_PUSH = 3
+        if response:
+            unlimited_push_list = [item for item in response if item['push_type'] == UNLIMITED_PUSH]
+            if unlimited_push_list:
+                self.update_account_by_spider(account_info=account_info)
+            else:
+                print("By AIGC", account_info)
+                self.update_account_by_aigc(account_info=account_info, run_date=run_date)
+        else:
+            self.update_account_by_spider(account_info=account_info)
+
+    def update_publish_timestamp(self, article_info: Dict):
+        """
+        更新发布时间戳 && minigram 信息
+        :param article_info:
+        :return:
+        """
+        url = article_info['ContentUrl']
+        wx_sn = article_info['wx_sn']
+        try:
+            response = spider.get_article_text(url)
+            response_code = response['code']
+
+            if response_code == const.ARTICLE_DELETE_CODE:
+                publish_timestamp_s = const.DELETE_STATUS
+                root_source_id_list = []
+            elif response_code == const.ARTICLE_ILLEGAL_CODE:
+                publish_timestamp_s = const.ILLEGAL_STATUS
+                root_source_id_list = []
+            elif response_code == const.ARTICLE_SUCCESS_CODE:
+                data = response['data']['data']
+                publish_timestamp_ms = data['publish_timestamp']
+                publish_timestamp_s = int(publish_timestamp_ms / 1000)
+                mini_program = data.get('mini_program', [])
+                if mini_program:
+                    root_source_id_list = [
+                        urllib.parse.parse_qs(
+                            urllib.parse.unquote(i['path'])
+                        )['rootSourceId'][0]
+                        for i in mini_program
+                    ]
+                else:
+                    root_source_id_list = []
+            else:
+                publish_timestamp_s = const.UNKNOWN_STATUS
+                root_source_id_list = []
+        except Exception as e:
+            publish_timestamp_s = const.REQUEST_FAIL_STATUS
+            root_source_id_list = None
+            error_msg = traceback.format_exc()
+            print(e, error_msg)
+
+        update_sql = f"""
+                UPDATE {ARTICLE_TABLE}
+                SET publish_timestamp = %s, root_source_id_list = %s
+                WHERE wx_sn = %s;
+            """
+        self.piaoquan_crawler_db_client.save(
+            query=update_sql,
+            params=(
+                publish_timestamp_s,
+                json.dumps(root_source_id_list, ensure_ascii=False),
+                wx_sn
+            ))
+        if publish_timestamp_s == const.REQUEST_FAIL_STATUS:
+            return article_info
+        else:
+            return None
+
+    def update_job(self, biz_date: str = None):
+        """
+        执行更新任务
+        """
+        account_list = self.get_account_list()
+        if not biz_date:
+            biz_date = datetime.today().strftime('%Y-%m-%d')
+
+        # 处理订阅号
+        subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
+        success_count = 0
+        fail_count = 0
+        for account in tqdm(subscription_accounts[:10]):
+            try:
+                self.process_single_account(account_info=account, run_date=biz_date)
+                success_count += 1
+                time.sleep(3)
+            except Exception as e:
+                fail_count += 1
+                log(
+                    task="updatePublishedMsgDaily",
+                    function="update_job",
+                    message="单个账号文章更新失败, 报错信息是: {}".format(e),
+                    status="fail",
+                )
+        log(
+            task="updatePublishedMsgDaily",
+            function="update_job",
+            message="订阅号更新完成",
+            data={
+                "success": success_count,
+                "fail": fail_count
+            }
+        )
+        if fail_count / (success_count + fail_count) > const.SUBSCRIBE_FAIL_RATE_THRESHOLD:
+            bot(
+                title="订阅号超过 {}% 的账号更新失败".format(int(const.SUBSCRIBE_FAIL_RATE_THRESHOLD * 100)),
+                detail={
+                    "success": success_count,
+                    "fail": fail_count,
+                    "failRate": fail_count / (success_count + fail_count)
+                }
+            )
+        bot(
+            title="更新每日发布文章任务完成通知(new)",
+            detail={
+                "msg": "订阅号更新完成",
+                "finish_time": datetime.today().__str__()
+            },
+            mention=False
+        )
+
+        # 服务号
+        server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
+        for account in tqdm(server_accounts):
+            try:
+                self.process_single_account(account_info=account, run_date=biz_date)
+                time.sleep(1)
+            except Exception as e:
+                print(e)
+        bot(
+            title="更新每日发布文章任务完成通知(new)",
+            detail={
+                "msg": "服务号更新完成",
+                "finish_time": datetime.today().__str__()
+            },
+            mention=False
+        )
+
+    def check_job(self, biz_date: str = None):
+        """
+        执行检查任务,check each account
+        """
+        if not biz_date:
+            biz_date = datetime.today().strftime('%Y-%m-%d')
+
+        account_list = self.get_account_list()
+        subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
+        fail_list = []
+        # check and rework if fail
+        for sub_item in tqdm(subscription_accounts[:10]):
+            res = self.check_single_account(sub_item)
+            if not res:
+                self.process_single_account(sub_item, biz_date)
+
+        # check whether success and bot if fails
+        for sub_item in tqdm(subscription_accounts[:10]):
+            res = self.check_single_account(sub_item)
+            if not res:
+                # 去掉三个不需要查看的字段
+                sub_item.pop('account_type', None)
+                sub_item.pop('account_auth', None)
+                sub_item.pop('account_id', None)
+                fail_list.append(sub_item)
+        if fail_list:
+            try:
+                bot(
+                    title="更新当天发布文章,存在未更新的账号(new)",
+                    detail={
+                        "columns": generate_bot_columns(),
+                        "rows": fail_list
+                    },
+                    table=True
+                )
+            except Exception as e:
+                print("Timeout Error: {}".format(e))
+        else:
+            bot(
+                title="更新当天发布文章,所有账号均更新成功(new)",
+                mention=False,
+                detail={
+                    "msg": "校验任务完成",
+                    "finish_time": datetime.today().__str__()
+                }
+            )
+
+    def get_article_detail_job(self):
+        """
+        获取发布文章详情
+        :return:
+        """
+        select_sql = f"""
+            SELECT ContentUrl, wx_sn 
+            FROM {ARTICLE_TABLE}
+            WHERE publish_timestamp in {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
+        """
+        article_list = self.piaoquan_crawler_db_client.fetch(select_sql, cursor_type=DictCursor)
+        for article in tqdm(article_list):
+            try:
+                self.update_publish_timestamp(article)
+            except Exception as e:
+                print(e)
+                error_msg = traceback.format_exc()
+                print(error_msg)
+        # check 一遍存在请求失败-1 && 0 的文章
+        select_sql = f"""
+                    SELECT ContentUrl, wx_sn 
+                    FROM {ARTICLE_TABLE}
+                    WHERE publish_timestamp in {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
+                """
+        process_failed_articles = self.piaoquan_crawler_db_client.fetch(select_sql, cursor_type=DictCursor)
+        fail_list = []
+        if process_failed_articles:
+            for article in tqdm(process_failed_articles):
+                try:
+                    res = self.update_publish_timestamp(article)
+                    fail_list.append(res)
+                except Exception as e:
+                    print(e)
+                    error_msg = traceback.format_exc()
+                    print(error_msg)
+
+        # 通过msgId 来修改publish_timestamp
+        update_sql = f"""
+            UPDATE {ARTICLE_TABLE} oav 
+            JOIN (
+                SELECT ghId, appMsgId, MAX(publish_timestamp) AS publish_timestamp 
+                FROM {ARTICLE_TABLE} 
+                WHERE publish_timestamp > %s 
+                GROUP BY ghId, appMsgId
+                ) vv
+                ON oav.appMsgId = vv.appMsgId and oav.ghId = vv.ghId
+            SET oav.publish_timestamp = vv.publish_timestamp
+            WHERE oav.publish_timestamp <= %s;
+        """
+        self.piaoquan_crawler_db_client.save(
+            query=update_sql,
+            params=(0, 0)
+        )
+
+        # 若还是无 publish_timestamp,用update_time当作 publish_timestamp
+        update_sql_2 = f"""
+            UPDATE {ARTICLE_TABLE}
+            SET publish_timestamp = updateTime
+            WHERE publish_timestamp < %s;
+        """
+        self.piaoquan_crawler_db_client.save(
+            query=update_sql_2,
+            params=0
+        )
+        if fail_list:
+            bot(
+                title="更新文章任务,请求detail失败",
+                detail=fail_list
+            )

+ 1 - 0
updatePublishedMsgDaily.py

@@ -686,6 +686,7 @@ def main():
         match run_task:
             case "update":
                 update_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
+                get_article_detail_job(db_client=piaoquan_crawler_db_client)
             case "check":
                 check_job(piaoquan_crawler_db_client=piaoquan_crawler_db_client, aigc_db_client=aigc_db_client)
             case "detail":

+ 34 - 0
update_published_articles_v2.py

@@ -0,0 +1,34 @@
+"""
+@author: luojunhui
+"""
+from argparse import ArgumentParser
+
+from tasks.update_published_articles_read_detail import UpdatePublishedArticlesReadDetail
+
+
+def main():
+    """
+    update mini program detail main
+    :return:
+    """
+    parser = ArgumentParser()
+    parser.add_argument("--run-date",
+                        help="Run only once for date in format of %Y-%m-%d. \
+                            If no specified, run as daily jobs.")
+    args = parser.parse_args()
+
+    update_publish_articles_task = UpdatePublishedArticlesReadDetail()
+    update_publish_articles_task.init_database()
+
+    if args.run_date:
+        update_publish_articles_task.update_job(args.run_date)
+        update_publish_articles_task.check_job(args.run_date)
+    else:
+        update_publish_articles_task.update_job()
+        update_publish_articles_task.check_job()
+
+    update_publish_articles_task.get_article_detail_job()
+
+
+if __name__ == '__main__':
+    main()