luojunhui 5 місяців тому
батько
коміт
1a2205d1f6

+ 54 - 1
applications/utils/common.py

@@ -60,6 +60,7 @@ def request_retry(retry_times, min_retry_delay, max_retry_delay):
     )
     return common_retry
 
+
 def yield_batch(data, batch_size):
     """
     生成批次数据
@@ -68,4 +69,56 @@ def yield_batch(data, batch_size):
     :return:
     """
     for i in range(0, len(data), batch_size):
-        yield data[i:i + batch_size]
+        yield data[i:i + batch_size]
+
+
+def show_desc_to_sta(show_desc):
+    """
+
+    :return:
+    """
+
+    def decode_show_v(show_v):
+        """
+
+        :param show_v:
+        :return:
+        """
+        foo = show_v.replace('千', 'e3').replace('万', 'e4').replace('亿', 'e8')
+        foo = eval(foo)
+        return int(foo)
+
+    def decode_show_k(show_k):
+        """
+
+        :param show_k:
+        :return:
+        """
+        this_dict = {
+            '阅读': 'show_view_count',  # 文章
+            '看过': 'show_view_count',  # 图文
+            '观看': 'show_view_count',  # 视频
+            '赞': 'show_like_count',
+            '付费': 'show_pay_count',
+            '赞赏': 'show_zs_count',
+        }
+        if show_k not in this_dict:
+            print(f'error from decode_show_k, show_k not found: {show_k}')
+        return this_dict.get(show_k, 'show_unknown')
+
+    show_desc = show_desc.replace('+', '')
+    sta = {}
+    for show_kv in show_desc.split('\u2004\u2005'):
+        if not show_kv:
+            continue
+        show_k, show_v = show_kv.split('\u2006')
+        k = decode_show_k(show_k)
+        v = decode_show_v(show_v)
+        sta[k] = v
+    res = {
+        'show_view_count': sta.get('show_view_count', 0),
+        'show_like_count': sta.get('show_like_count', 0),
+        'show_pay_count': sta.get('show_pay_count', 0),
+        'show_zs_count': sta.get('show_zs_count', 0),
+    }
+    return res

+ 384 - 0
tasks/data_tasks/update_published_articles_read_detail.py

@@ -0,0 +1,384 @@
+"""
+@author: luojunhui
+@desc: 更新文章的阅读详情
+"""
+
+import json
+import time
+import traceback
+import urllib.parse
+from datetime import datetime
+from typing import Dict, List
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+
+from applications import log
+from applications.api import FeishuBotApi
+from applications.db import DatabaseConnector
+from applications.utils import str_to_md5, show_desc_to_sta
+from cold_start.crawler.wechat import get_article_list_from_account
+from config import denet_config, long_articles_config, piaoquan_crawler_config
+
+
+class UpdatePublishedArticlesTaskConst:
+    """
+    更新已发布文章消息常量配置
+    """
+
+    SUCCESS_CODE = 0
+
+    # 爬虫详情接口返回code
+    ARTICLE_ILLEGAL_CODE = 25012
+    ARTICLE_DELETE_CODE = 25005
+    ARTICLE_UNKNOWN_CODE = 10000
+
+    # 账号违规状态码
+    ACCOUNT_ILLEGAL_CODE = 25013
+    UNKNOWN_SPIDER_ERROR_CODE = 20000
+
+    # 记录默认状态
+    DEFAULT_STATUS = 0
+    # 请求接口失败状态
+    REQUEST_FAIL_STATUS = -1
+    # 文章被删除状态
+    DELETE_STATUS = -2
+    # 未知原因无信息返回状态
+    UNKNOWN_STATUS = -3
+    # 文章违规状态
+    ILLEGAL_STATUS = -4
+
+    # 公众号类型(订阅号 or 服务号)
+    # 订阅号
+    SUBSCRIBE_TYPE_SET = {0, 1}
+    # 服务号
+    SERVICE_TYPE = 2
+    # 监测周期(秒)
+    MONITOR_PERIOD = 60 * 60 * 24 * 3
+
+    # 新号抓文章周期
+    NEW_ACCOUNT_CRAWL_PERIOD = 60 * 60 * 24 * 30
+
+    # 订阅号,抓取失败失败率报警阈值
+    TASK_FAIL_RATE_THRESHOLD = 0.3
+
+    # ARTICLE TABLE
+    ARTICLE_TABLE_NAME = "official_articles_v2"
+
+
+class UpdatePublishedArticlesTaskBase(UpdatePublishedArticlesTaskConst):
+
+    def __init__(self):
+        self.crawler_client = DatabaseConnector(piaoquan_crawler_config)
+        self.long_articles_client = DatabaseConnector(long_articles_config)
+        self.denet_client = DatabaseConnector(denet_config)
+        self.crawler_client.connect()
+        self.long_articles_client.connect()
+        self.denet_client.connect()
+        self.feishu_bot_api = FeishuBotApi()
+
+    def fetch_published_accounts(self) -> List[Dict]:
+        """
+        get published articles from aigc
+        """
+        fetch_query = f"""
+            SELECT DISTINCT
+                t3.`name` as account_name,
+                t3.gh_id,
+                t3.follower_count as fans,
+                t3.create_timestamp as account_init_timestamp,
+                t4.service_type_info as account_type,
+                t4.verify_type_info as account_auth,
+                t3.id as account_id,
+                group_concat(distinct t5.remark) as account_remark
+            FROM
+                publish_plan t1
+                JOIN publish_plan_account t2 ON t1.id = t2.plan_id
+                JOIN publish_account t3 ON t2.account_id = t3.id
+                LEFT JOIN publish_account_wx_type t4 on t3.id = t4.account_id
+                LEFT JOIN publish_account_remark t5 on t3.id = t5.publish_account_id
+            WHERE
+                t1.plan_status = 1
+                AND t3.channel = 5
+                GROUP BY t3.id;
+        """
+        fetch_response = self.denet_client.fetch(
+            query=fetch_query, cursor_type=DictCursor
+        )
+        account_list = [
+            i for i in fetch_response if "自动回复" not in str(i["account_remark"])
+        ]
+        return account_list
+
+    def fetch_account_experiment_status(self) -> Dict[str, str]:
+        fetch_query = f"""  
+            SELECT t1.account_id, t2.status
+            FROM wx_statistics_group_source_account t1
+            JOIN wx_statistics_group_source t2
+            ON t1.group_source_name = t2.account_source_name;
+        """
+        account_status_list = self.denet_client.fetch(
+            query=fetch_query, cursor_type=DictCursor
+        )
+        account_status_dict = {
+            account["account_id"]: account["status"] for account in account_status_list
+        }
+        return account_status_dict
+
+    def get_account_list(self) -> List[Dict]:
+        account_without_status = self.fetch_published_accounts()
+        account_status_dict = self.fetch_account_experiment_status()
+        account_list = [
+            {
+                **item,
+                "using_status": (
+                    0 if account_status_dict.get(item["account_id"]) == "实验" else 1
+                ),
+            }
+            for item in account_without_status
+        ]
+        return account_list
+
+    def fetch_account_max_publish_timestamp(self, gh_id: str) -> int:
+        # get max published timestamp for this account
+        fetch_query = f"""
+            SELECT MAX(publish_timestamp) AS max_publish_timestamp
+            FROM {self.ARTICLE_TABLE_NAME}
+            WHERE ghId = %s;
+        """
+        fetch_response = self.crawler_client.fetch(
+            fetch_query, cursor_type=DictCursor, params=(gh_id,)
+        )
+        if fetch_response:
+            max_publish_timestamp = fetch_response[0]["max_publish_timestamp"]
+        else:
+            max_publish_timestamp = int(time.time()) - self.NEW_ACCOUNT_CRAWL_PERIOD
+        return max_publish_timestamp
+
+    def crawl_account_published_articles(self, account: Dict[str, str]):
+        # max_publish_timestamp = self.fetch_account_max_publish_timestamp(
+        #     account["gh_id"]
+        # )
+        cursor = None
+        while True:
+            crawl_response = get_article_list_from_account(
+                account["gh_id"], index=cursor
+            )
+            crawl_response_code = crawl_response["code"]
+            match crawl_response_code:
+
+                # 请求成功
+                case self.SUCCESS_CODE:
+                    print("success")
+                    break
+                    # msg_list = crawl_response.get("data", {}).get("data", [])
+                    # if not msg_list:
+                    #     break
+                    #
+                    # self.record_each_msg(account, msg_list)
+                    # earliest_msg = msg_list[-1]
+                    # earliest_update_timestamp = earliest_msg["AppMsg"]["BaseInfo"][
+                    #     "UpdateTime"
+                    # ]
+                    # if earliest_update_timestamp > max_publish_timestamp:
+                    #     cursor = crawl_response["data"]["next_cursor"]
+                    # else:
+                    #     break
+
+                # 账号违规
+                case self.ACCOUNT_ILLEGAL_CODE:
+                    log(
+                        task="update_published_articles",
+                        function="crawl_account_published_articles",
+                        message="账号违规",
+                        data=account,
+                    )
+                    self.feishu_bot_api.bot(
+                        title="公众号账号违规报警",
+                        detail={
+                            "账号名称": account["account_name"],
+                            "账号ID": account["gh_id"],
+                            "违规原因": crawl_response["msg"],
+                            "粉丝数": account["fans"],
+                            "利用状态": account["using_status"]
+                        },
+                        env="dev",
+                        mention=False
+                    )
+                    break
+
+                case self.UNKNOWN_SPIDER_ERROR_CODE:
+                    log(
+                        task="update_published_articles",
+                        function="crawl_account_published_articles",
+                        message=f"未知错误",
+                        data=account,
+                    )
+                    self.feishu_bot_api.bot(
+                        title="接口请求失败报警",
+                        detail={
+                            "账号名称": account["account_name"],
+                            "账号ID": account["gh_id"],
+                            "违规原因": crawl_response["msg"],
+                            "粉丝数": account["fans"],
+                            "利用状态": account["using_status"]
+                        },
+                        env="dev",
+                        mention=False
+                    )
+                    break
+
+                # 其他 code
+                case _:
+                    print("unknown code:", crawl_response_code)
+                    break
+
+    def record_each_msg(self, account, msg_list):
+        for msg in msg_list:
+            base_info = msg.get("BaseInfo", {})
+            app_msg_id = msg.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
+            create_timestamp = (
+                msg.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
+            )
+            update_timestamp = (
+                msg.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
+            )
+            publish_type = msg.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
+            detail_article_list = msg.get("AppMsg", {}).get("DetailInfo", [])
+            if detail_article_list:
+                for article in detail_article_list:
+                    title = article.get("Title", None)
+                    digest = article.get("Digest", None)
+                    item_index = article.get("ItemIndex", None)
+                    content_url = article.get("ContentUrl", None)
+                    source_url = article.get("SourceUrl", None)
+                    cover_img_url = article.get("CoverImgUrl", None)
+                    cover_img_url_1_1 = article.get("CoverImgUrl_1_1", None)
+                    cover_img_url_235_1 = article.get("CoverImgUrl_235_1", None)
+                    item_show_type = article.get("ItemShowType", None)
+                    is_original = article.get("IsOriginal", None)
+                    show_desc = article.get("ShowDesc", None)
+                    show_stat = show_desc_to_sta(show_desc)
+                    ori_content = article.get("ori_content", None)
+                    show_view_count = show_stat.get("show_view_count", 0)
+                    show_like_count = show_stat.get("show_like_count", 0)
+                    show_zs_count = show_stat.get("show_zs_count", 0)
+                    show_pay_count = show_stat.get("show_pay_count", 0)
+                    wx_sn = (
+                        content_url.split("&sn=")[1].split("&")[0]
+                        if content_url
+                        else None
+                    )
+                    status = account["using_status"]
+                    info_tuple = (
+                        account["gh_id"],
+                        account["account_name"],
+                        app_msg_id,
+                        title,
+                        publish_type,
+                        create_timestamp,
+                        update_timestamp,
+                        digest,
+                        item_index,
+                        content_url,
+                        source_url,
+                        cover_img_url,
+                        cover_img_url_1_1,
+                        cover_img_url_235_1,
+                        item_show_type,
+                        is_original,
+                        show_desc,
+                        ori_content,
+                        show_view_count,
+                        show_like_count,
+                        show_zs_count,
+                        show_pay_count,
+                        wx_sn,
+                        json.dumps(base_info, ensure_ascii=False),
+                        str_to_md5(title),
+                        status,
+                    )
+                    try:
+                        insert_sql = f"""
+                            INSERT INTO {self.ARTICLE_TABLE_NAME}
+                            (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
+                            VALUES
+                            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+                        """
+                        self.crawler_client.save(query=insert_sql, params=info_tuple)
+                        log(
+                            task="updatePublishedMsgDaily",
+                            function="insert_each_msg",
+                            message="插入文章数据成功",
+                            data={"info": info_tuple},
+                        )
+                    except Exception as e:
+                        try:
+                            update_sql = f"""
+                                UPDATE {self.ARTICLE_TABLE_NAME}
+                                SET show_view_count = %s, show_like_count=%s
+                                WHERE wx_sn = %s;
+                            """
+                            self.crawler_client.save(
+                                query=update_sql,
+                                params=(show_view_count, show_like_count, wx_sn),
+                            )
+                            log(
+                                task="updatePublishedMsgDaily",
+                                function="insert_each_msg",
+                                message="更新文章数据成功",
+                                data={
+                                    "wxSn": wx_sn,
+                                    "likeCount": show_like_count,
+                                    "viewCount": show_view_count,
+                                },
+                            )
+                        except Exception as e:
+                            log(
+                                task="updatePublishedMsgDaily",
+                                function="insert_each_msg",
+                                message="更新文章失败, 报错原因是: {}".format(e),
+                                status="fail",
+                            )
+                            continue
+
+
+class UpdatePublishedArticlesTaskCollector(UpdatePublishedArticlesTaskBase):
+
+    def deal(self):
+        account_list = self.get_account_list()
+        for account in tqdm(account_list, desc="抓取每个账号的文章信息"):
+            try:
+                self.crawl_account_published_articles(account)
+
+            except Exception as e:
+                log(
+                    task="update_published_articles_collector",
+                    function="crawl_account_published_articles",
+                    message=f"抓取账号文章信息失败, 报错原因是: {e}",
+                    status="fail",
+                    data=account
+
+                )
+
+        # self.feishu_bot_api.bot(
+        #     title='更新每日发布文章任务完成通知',
+        #     detail={
+        #         "msg": "账号更新完成",
+        #         "finish_time": datetime.today().__str__()
+        #     },
+        #     mention=False
+        # )
+
+
+
+class UpdatePublishedArticlesTaskChecker(UpdatePublishedArticlesTaskBase):
+
+    def deal(self):
+        pass
+
+
+class UpdatePublishedArticlesTaskArticlesMonitor(UpdatePublishedArticlesTaskBase):
+    pass
+

+ 0 - 742
tasks/update_published_articles_read_detail.py

@@ -1,742 +0,0 @@
-"""
-@author: luojunhui
-@desc: 更新文章的阅读详情
-"""
-import json
-import time
-import traceback
-import urllib.parse
-from datetime import datetime
-from typing import Dict, List
-
-from pymysql.cursors import DictCursor
-from tqdm import tqdm
-
-from applications import aiditApi
-from applications import bot
-from applications import create_feishu_columns_sheet
-from applications import Functions
-from applications import log
-from applications import WeixinSpider
-from applications.const import updatePublishedMsgTaskConst
-from applications.db import DatabaseConnector
-from config import denet_config, long_articles_config, piaoquan_crawler_config
-
-ARTICLE_TABLE = "official_articles"
-const = updatePublishedMsgTaskConst()
-spider = WeixinSpider()
-functions = Functions()
-empty_dict = {}
-
-
-def generate_bot_columns():
-    """
-    生成列
-    :return:
-    """
-    columns = [
-        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="name", display_name="公众号名称"),
-        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="ghId", display_name="ghId"),
-        create_feishu_columns_sheet(sheet_type="number", sheet_name="follower_count", display_name="粉丝数"),
-        create_feishu_columns_sheet(sheet_type="date", sheet_name="account_init_timestamp",
-                                    display_name="账号接入系统时间"),
-        create_feishu_columns_sheet(sheet_type="plain_text", sheet_name="using_status", display_name="利用状态")
-    ]
-    return columns
-
-
-class UpdatePublishedArticlesReadDetail(object):
-    """
-    更新每日发布文章的阅读详情
-    """
-
-    def __init__(self):
-        self.aigc_db_client = None
-        self.piaoquan_crawler_db_client = None
-        self.long_articles_db_client = None
-
-    def get_account_list(self) -> List[Dict]:
-        """
-        从 aigc 数据库中获取目前处于发布状态的账号
-        :return:
-        "name": line[0],
-        "ghId": line[1],
-        "follower_count": line[2],
-        "account_init_time": int(line[3] / 1000),
-        "account_type": line[4], # 订阅号 or 服务号
-        "account_auth": line[5]
-        """
-
-        def get_account_status() -> Dict:
-            """
-            获取账号的实验状态
-            :return:
-            """
-            sql = f"""  
-                SELECT t1.account_id, t2.status
-                FROM wx_statistics_group_source_account t1
-                JOIN wx_statistics_group_source t2
-                ON t1.group_source_name = t2.account_source_name;
-            """
-            account_status_list = self.aigc_db_client.fetch(sql, cursor_type=DictCursor)
-            account_status = {account['account_id']: account['status'] for account in account_status_list}
-            return account_status
-
-        account_list_with_out_using_status = aiditApi.get_publish_account_from_aigc()
-        account_status_dict = get_account_status()
-        account_list = [
-            {
-                **item,
-                'using_status': 0 if account_status_dict.get(item['account_id']) == '实验' else 1
-            }
-            for item in account_list_with_out_using_status
-        ]
-        return account_list
-
-    def get_article_info_by_trace_id(self, trace_id: str) -> Dict:
-        """
-        通过trace_id来查询文章信息
-        """
-        select_sql = f"""
-            SELECT t1.gh_id, t1.account_name, t2.article_title
-            FROM long_articles_match_videos t1
-            JOIN long_articles_text t2
-            ON t1.content_id = t2.content_id
-            WHERE t1.trace_id = '{trace_id}';
-        """
-        article_info = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
-        if article_info:
-            return article_info[0]
-        else:
-            return empty_dict
-
-    def init_database(self):
-        """
-        初始化数据库连接
-        """
-        # 初始化数据库连接
-        try:
-            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
-            self.piaoquan_crawler_db_client.connect()
-            self.aigc_db_client = DatabaseConnector(denet_config)
-            self.aigc_db_client.connect()
-            self.long_articles_db_client = DatabaseConnector(long_articles_config)
-            self.long_articles_db_client.connect()
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            bot(
-                title="更新文章任务连接数据库失败",
-                detail={
-                    "error": e,
-                    "msg": error_msg
-                }
-            )
-            return
-
-    def insert_each_msg(self, account_info: Dict, msg_list: List[Dict]) -> None:
-        """
-        把消息数据更新到数据库中
-        :param account_info:
-        :param msg_list:
-        :return:
-        """
-        gh_id = account_info['ghId']
-        account_name = account_info['name']
-        for info in msg_list:
-            baseInfo = info.get("BaseInfo", {})
-            appMsgId = info.get("AppMsg", {}).get("BaseInfo", {}).get("AppMsgId", None)
-            createTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("CreateTime", None)
-            updateTime = info.get("AppMsg", {}).get("BaseInfo", {}).get("UpdateTime", None)
-            Type = info.get("AppMsg", {}).get("BaseInfo", {}).get("Type", None)
-            detail_article_list = info.get("AppMsg", {}).get("DetailInfo", [])
-            if detail_article_list:
-                for article in detail_article_list:
-                    title = article.get("Title", None)
-                    Digest = article.get("Digest", None)
-                    ItemIndex = article.get("ItemIndex", None)
-                    ContentUrl = article.get("ContentUrl", None)
-                    SourceUrl = article.get("SourceUrl", None)
-                    CoverImgUrl = article.get("CoverImgUrl", None)
-                    CoverImgUrl_1_1 = article.get("CoverImgUrl_1_1", None)
-                    CoverImgUrl_235_1 = article.get("CoverImgUrl_235_1", None)
-                    ItemShowType = article.get("ItemShowType", None)
-                    IsOriginal = article.get("IsOriginal", None)
-                    ShowDesc = article.get("ShowDesc", None)
-                    show_stat = functions.show_desc_to_sta(ShowDesc)
-                    ori_content = article.get("ori_content", None)
-                    show_view_count = show_stat.get("show_view_count", 0)
-                    show_like_count = show_stat.get("show_like_count", 0)
-                    show_zs_count = show_stat.get("show_zs_count", 0)
-                    show_pay_count = show_stat.get("show_pay_count", 0)
-                    wx_sn = ContentUrl.split("&sn=")[1].split("&")[0] if ContentUrl else None
-                    status = account_info['using_status']
-                    info_tuple = (
-                        gh_id,
-                        account_name,
-                        appMsgId,
-                        title,
-                        Type,
-                        createTime,
-                        updateTime,
-                        Digest,
-                        ItemIndex,
-                        ContentUrl,
-                        SourceUrl,
-                        CoverImgUrl,
-                        CoverImgUrl_1_1,
-                        CoverImgUrl_235_1,
-                        ItemShowType,
-                        IsOriginal,
-                        ShowDesc,
-                        ori_content,
-                        show_view_count,
-                        show_like_count,
-                        show_zs_count,
-                        show_pay_count,
-                        wx_sn,
-                        json.dumps(baseInfo, ensure_ascii=False),
-                        functions.str_to_md5(title),
-                        status
-                    )
-                    self.insert_each_article(
-                        info_tuple=info_tuple,
-                        show_view_count=show_view_count,
-                        show_like_count=show_like_count,
-                        wx_sn=wx_sn
-                    )
-
-    def insert_each_article(self, info_tuple, show_view_count, show_like_count, wx_sn):
-        """
-        插入每一篇文章
-        """
-        try:
-            insert_sql = f"""
-                    INSERT INTO {ARTICLE_TABLE}
-                    (ghId, accountName, appMsgId, title, Type, createTime, updateTime, Digest, ItemIndex, ContentUrl, SourceUrl, CoverImgUrl, CoverImgUrl_1_1, CoverImgUrl_255_1, ItemShowType, IsOriginal, ShowDesc, ori_content, show_view_count, show_like_count, show_zs_count, show_pay_count, wx_sn, baseInfo, title_md5, status)
-                    values
-                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
-            """
-            self.piaoquan_crawler_db_client.save(query=insert_sql, params=info_tuple)
-            log(
-                task="updatePublishedMsgDaily",
-                function="insert_each_msg",
-                message="插入文章数据成功",
-                data={
-                    "info": info_tuple
-                }
-            )
-        except Exception as e:
-            try:
-                update_sql = f"""
-                    UPDATE {ARTICLE_TABLE}
-                    SET show_view_count = %s, show_like_count=%s
-                    WHERE wx_sn = %s;
-                """
-                self.piaoquan_crawler_db_client.save(query=update_sql,
-                                                     params=(show_view_count, show_like_count, wx_sn))
-                log(
-                    task="updatePublishedMsgDaily",
-                    function="insert_each_msg",
-                    message="更新文章数据成功",
-                    data={
-                        "wxSn": wx_sn,
-                        "likeCount": show_like_count,
-                        "viewCount": show_view_count
-                    }
-
-                )
-            except Exception as e:
-                log(
-                    task="updatePublishedMsgDaily",
-                    function="insert_each_msg",
-                    message="更新文章失败, 报错原因是: {}".format(e),
-                    status="fail"
-                )
-
-    def update_account_by_spider(self, account_info: Dict, cursor=None):
-        """
-        更新每一个账号信息
-        :param account_info:
-        :param cursor:
-        :return: None
-        """
-        gh_id = account_info['ghId']
-        latest_update_time = self.get_account_info(gh_id)
-        response = spider.update_msg_list(ghId=gh_id, index=cursor)
-        if not response:
-            log(
-                task="updatePublishedMsgDaily",
-                function="update_account_by_spider",
-                status="fail",
-                message="账号更新请求爬虫接口失败",
-                data=account_info
-            )
-            return
-        msg_list = response.get("data", {}).get("data", [])
-        if msg_list:
-            # do
-            last_article_in_this_msg = msg_list[-1]
-            last_time_stamp_in_this_msg = last_article_in_this_msg['AppMsg']['BaseInfo']['UpdateTime']
-            last_url = last_article_in_this_msg['AppMsg']['DetailInfo'][0]['ContentUrl']
-            resdata = spider.get_account_by_url(last_url)
-            check_id = resdata['data'].get('data', {}).get('wx_gh')
-            if check_id == gh_id:
-                self.insert_each_msg(
-                    account_info=account_info,
-                    msg_list=msg_list
-                )
-                # if last_time_stamp_in_this_msg > latest_update_time:
-                #     next_cursor = response['data']['next_cursor']
-                #     return self.update_account_by_spider(
-                #         account_info=account_info,
-                #         cursor=next_cursor
-                #     )
-                log(
-                    task="updatePublishedMsgDaily",
-                    function="update_each_account",
-                    message="账号文章更新成功",
-                    data=response
-                )
-        else:
-            log(
-                task="updatePublishedMsgDaily",
-                function="update_each_account",
-                message="账号文章更新失败",
-                status="fail",
-                data=response
-            )
-            return
-
-    def update_account_by_aigc(self, account_info: Dict, run_date: str):
-        """
-        更新单个账号的文章
-        """
-        gh_id = account_info['ghId']
-        select_sql = f"""
-            SELECT trace_id, wx_sn, published_url, publish_timestamp, root_source_id_list, create_timestamp
-            FROM long_articles_published_trace_id
-            WHERE gh_id = '{gh_id}' AND publish_timestamp > UNIX_TIMESTAMP(DATE_SUB('{run_date}', INTERVAL 3 DAY)) AND delete_status = 0;
-        """
-        result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
-        for article in result:
-            trace_id = article['trace_id']
-            wx_sn = article['wx_sn']
-            published_url = article['published_url']
-            publish_timestamp = article['publish_timestamp']
-            article_info = spider.get_article_text(content_link=published_url, is_cache=False, is_count=True)
-            response_code = article_info['code']
-            match response_code:
-                case const.ARTICLE_SUCCESS_CODE:
-                    response_data = article_info['data']['data']
-                    title = response_data['title']
-                    article_url = response_data['content_link']
-                    show_view_count = response_data['view_count']
-                    show_like_count = response_data['like_count']
-                    show_zs_count = 0
-                    show_pay_count = 0
-                    wx_sn = article_url.split("&sn=")[1].split("&")[0] if article_url else None
-                    app_msg_id = article_url.split("&mid=")[1].split("&")[0] if article_url else None
-                    status = account_info['using_status']
-                    info_tuple = (
-                        gh_id,
-                        account_info['name'],
-                        app_msg_id,
-                        title,
-                        "9",
-                        article['create_timestamp'],
-                        response_data['update_timestamp'],
-                        None,
-                        response_data['item_index'],
-                        response_data['content_link'],
-                        None,
-                        None,
-                        None,
-                        None,
-                        None,
-                        response_data.get("is_original", None),
-                        None,
-                        None,
-                        show_view_count,
-                        show_like_count,
-                        show_zs_count,
-                        show_pay_count,
-                        wx_sn,
-                        None,
-                        functions.str_to_md5(title),
-                        status
-                    )
-                    self.insert_each_article(
-                        info_tuple=info_tuple,
-                        show_view_count=show_view_count,
-                        show_like_count=show_like_count,
-                        wx_sn=wx_sn
-                    )
-
-                case const.ARTICLE_DELETE_CODE:
-                    log(
-                        task="updatePublishedMsgDaily",
-                        function="update_account_by_aigc",
-                        message="文章被删除",
-                        data={
-                            "ghId": gh_id,
-                            "publishedUrl": published_url
-                        }
-                    )
-
-                case const.ARTICLE_ILLEGAL_CODE:
-                    article_detail = self.get_article_info_by_trace_id(trace_id)
-                    if article_detail:
-                        error_detail = article_info.get("msg")
-                        insert_sql = f"""
-                                INSERT IGNORE INTO illegal_articles 
-                                (gh_id, account_name, title, wx_sn, publish_date, illegal_reason)
-                                VALUES 
-                                (%s, %s, %s, %s, %s, %s);
-                            """
-
-                        affected_rows = self.long_articles_db_client.save(
-                            query=insert_sql,
-                            params=(
-                                article_info['gh_id'],
-                                article_info['account_name'],
-                                article_info['article_title'],
-                                wx_sn,
-                                functions.timestamp_to_str(publish_timestamp),
-                                error_detail
-                            )
-                        )
-                        if affected_rows:
-                            bot(
-                                title="文章违规告警(new task)",
-                                detail={
-                                    "account_name": article_info['account_name'],
-                                    "gh_id": article_info['gh_id'],
-                                    "title": article_info['article_title'],
-                                    "wx_sn": wx_sn,
-                                    "publish_date": functions.timestamp_to_str(publish_timestamp),
-                                    "error_detail": error_detail,
-                                },
-                                mention=False
-                            )
-                            aiditApi.delete_articles(
-                                gh_id=article_info['gh_id'],
-                                title=article_info['article_title']
-                            )
-
-    def get_account_info(self, gh_id: str) -> int:
-        """
-        通过 gh_id查询账号信息的最新发布时间
-        :param gh_id:
-        :return:
-        """
-        sql = f"""
-            SELECT MAX(publish_timestamp)
-            FROM {ARTICLE_TABLE}
-            WHERE ghId = '{gh_id}';
-            """
-        result = self.piaoquan_crawler_db_client.fetch(sql)
-        if result:
-            return result[0][0]
-        else:
-            # 新号,抓取周期定位抓取时刻往前推30天
-            return int(time.time()) - const.NEW_ACCOUNT_CRAWL_PERIOD
-
-    def check_single_account(self, account_item: Dict) -> bool:
-        """
-        校验每个账号是否更新
-        :param account_item:
-        :return: True / False
-        """
-        gh_id = account_item['ghId']
-        account_type = account_item['account_type']
-        today_str = datetime.today().strftime("%Y-%m-%d")
-        today_date_time = datetime.strptime(today_str, "%Y-%m-%d")
-        today_timestamp = today_date_time.timestamp()
-        sql = f"""
-                SELECT max(updateTime)
-                FROM {ARTICLE_TABLE}
-                WHERE ghId = '{gh_id}';
-                """
-        try:
-            latest_update_time = self.piaoquan_crawler_db_client.fetch(sql)[0][0]
-            # 判断该账号当天发布的文章是否被收集
-            if account_type in const.SUBSCRIBE_TYPE_SET:
-                if int(latest_update_time) > int(today_timestamp):
-                    return True
-                else:
-                    return False
-            else:
-                if int(latest_update_time) > int(today_timestamp) - 7 * 24 * 3600:
-                    return True
-                else:
-                    return False
-        except Exception as e:
-            print(e)
-            return False
-
-    def process_single_account(self, account_info: Dict, run_date: str):
-        """
-        处理单个账号
-        """
-        gh_id = account_info['ghId']
-        # 判断该账号当天是否有自动群发且没有无限流发表
-        select_sql = f"""
-            SELECT push_type
-            FROM long_articles_published_trace_id
-            WHERE gh_id = '{gh_id}' AND publish_timestamp > UNIX_TIMESTAMP('{run_date}');
-        """
-        response = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
-        UNLIMITED_PUSH = 3
-        if response:
-            unlimited_push_list = [item for item in response if item['push_type'] == UNLIMITED_PUSH]
-            if unlimited_push_list:
-                self.update_account_by_spider(account_info=account_info)
-            else:
-                print("By AIGC", account_info)
-                self.update_account_by_aigc(account_info=account_info, run_date=run_date)
-        else:
-            self.update_account_by_spider(account_info=account_info)
-
-    def update_publish_timestamp(self, article_info: Dict):
-        """
-        更新发布时间戳 && minigram 信息
-        :param article_info:
-        :return:
-        """
-        url = article_info['ContentUrl']
-        wx_sn = article_info['wx_sn']
-        try:
-            response = spider.get_article_text(url)
-            response_code = response['code']
-
-            if response_code == const.ARTICLE_DELETE_CODE:
-                publish_timestamp_s = const.DELETE_STATUS
-                root_source_id_list = []
-            elif response_code == const.ARTICLE_ILLEGAL_CODE:
-                publish_timestamp_s = const.ILLEGAL_STATUS
-                root_source_id_list = []
-            elif response_code == const.ARTICLE_SUCCESS_CODE:
-                data = response['data']['data']
-                publish_timestamp_ms = data['publish_timestamp']
-                publish_timestamp_s = int(publish_timestamp_ms / 1000)
-                mini_program = data.get('mini_program', [])
-                if mini_program:
-                    root_source_id_list = [
-                        urllib.parse.parse_qs(
-                            urllib.parse.unquote(i['path'])
-                        )['rootSourceId'][0]
-                        for i in mini_program
-                    ]
-                else:
-                    root_source_id_list = []
-            else:
-                publish_timestamp_s = const.UNKNOWN_STATUS
-                root_source_id_list = []
-        except Exception as e:
-            publish_timestamp_s = const.REQUEST_FAIL_STATUS
-            root_source_id_list = None
-            error_msg = traceback.format_exc()
-            print(e, error_msg)
-
-        update_sql = f"""
-                UPDATE {ARTICLE_TABLE}
-                SET publish_timestamp = %s, root_source_id_list = %s
-                WHERE wx_sn = %s;
-            """
-        self.piaoquan_crawler_db_client.save(
-            query=update_sql,
-            params=(
-                publish_timestamp_s,
-                json.dumps(root_source_id_list, ensure_ascii=False),
-                wx_sn
-            ))
-        if publish_timestamp_s == const.REQUEST_FAIL_STATUS:
-            return article_info
-        else:
-            return None
-
-    def update_job(self, biz_date: str = None):
-        """
-        执行更新任务
-        """
-        account_list = self.get_account_list()
-        if not biz_date:
-            biz_date = datetime.today().strftime('%Y-%m-%d')
-
-        # 处理订阅号
-        subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
-        success_count = 0
-        fail_count = 0
-        for account in tqdm(subscription_accounts):
-            try:
-                self.process_single_account(account_info=account, run_date=biz_date)
-                success_count += 1
-                time.sleep(3)
-            except Exception as e:
-                fail_count += 1
-                log(
-                    task="updatePublishedMsgDaily",
-                    function="update_job",
-                    message="单个账号文章更新失败, 报错信息是: {}".format(e),
-                    status="fail",
-                )
-        log(
-            task="updatePublishedMsgDaily",
-            function="update_job",
-            message="订阅号更新完成",
-            data={
-                "success": success_count,
-                "fail": fail_count
-            }
-        )
-        if fail_count / (success_count + fail_count) > const.SUBSCRIBE_FAIL_RATE_THRESHOLD:
-            bot(
-                title="订阅号超过 {}% 的账号更新失败".format(int(const.SUBSCRIBE_FAIL_RATE_THRESHOLD * 100)),
-                detail={
-                    "success": success_count,
-                    "fail": fail_count,
-                    "failRate": fail_count / (success_count + fail_count)
-                }
-            )
-        bot(
-            title="更新每日发布文章任务完成通知(new)",
-            detail={
-                "msg": "订阅号更新完成",
-                "finish_time": datetime.today().__str__()
-            },
-            mention=False
-        )
-
-        # 服务号
-        server_accounts = [i for i in account_list if i['account_type'] == const.SERVICE_TYPE]
-        for account in tqdm(server_accounts):
-            try:
-                self.process_single_account(account_info=account, run_date=biz_date)
-                time.sleep(1)
-            except Exception as e:
-                print(e)
-        bot(
-            title="更新每日发布文章任务完成通知(new)",
-            detail={
-                "msg": "服务号更新完成",
-                "finish_time": datetime.today().__str__()
-            },
-            mention=False
-        )
-
-    def check_job(self, biz_date: str = None):
-        """
-        执行检查任务,check each account
-        """
-        if not biz_date:
-            biz_date = datetime.today().strftime('%Y-%m-%d')
-
-        account_list = self.get_account_list()
-        subscription_accounts = [i for i in account_list if i['account_type'] in const.SUBSCRIBE_TYPE_SET]
-        fail_list = []
-        # check and rework if fail
-        for sub_item in tqdm(subscription_accounts):
-            res = self.check_single_account(sub_item)
-            if not res:
-                self.process_single_account(sub_item, biz_date)
-
-        # check whether success and bot if fails
-        for sub_item in tqdm(subscription_accounts):
-            res = self.check_single_account(sub_item)
-            if not res:
-                # 去掉三个不需要查看的字段
-                sub_item.pop('account_type', None)
-                sub_item.pop('account_auth', None)
-                sub_item.pop('account_id', None)
-                fail_list.append(sub_item)
-        if fail_list:
-            try:
-                bot(
-                    title="更新当天发布文章,存在未更新的账号(new)",
-                    detail={
-                        "columns": generate_bot_columns(),
-                        "rows": fail_list
-                    },
-                    table=True
-                )
-            except Exception as e:
-                print("Timeout Error: {}".format(e))
-        else:
-            bot(
-                title="更新当天发布文章,所有账号均更新成功(new)",
-                mention=False,
-                detail={
-                    "msg": "校验任务完成",
-                    "finish_time": datetime.today().__str__()
-                }
-            )
-
-    def get_article_detail_job(self):
-        """
-        获取发布文章详情
-        :return:
-        """
-        select_sql = f"""
-            SELECT ContentUrl, wx_sn 
-            FROM {ARTICLE_TABLE}
-            WHERE publish_timestamp in {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
-        """
-        article_list = self.piaoquan_crawler_db_client.fetch(select_sql, cursor_type=DictCursor)
-        for article in tqdm(article_list):
-            try:
-                self.update_publish_timestamp(article)
-            except Exception as e:
-                print(e)
-                error_msg = traceback.format_exc()
-                print(error_msg)
-        # check 一遍存在请求失败-1 && 0 的文章
-        select_sql = f"""
-                    SELECT ContentUrl, wx_sn 
-                    FROM {ARTICLE_TABLE}
-                    WHERE publish_timestamp in {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
-                """
-        process_failed_articles = self.piaoquan_crawler_db_client.fetch(select_sql, cursor_type=DictCursor)
-        fail_list = []
-        if process_failed_articles:
-            for article in tqdm(process_failed_articles):
-                try:
-                    res = self.update_publish_timestamp(article)
-                    fail_list.append(res)
-                except Exception as e:
-                    print(e)
-                    error_msg = traceback.format_exc()
-                    print(error_msg)
-
-        # 通过msgId 来修改publish_timestamp
-        update_sql = f"""
-            UPDATE {ARTICLE_TABLE} oav 
-            JOIN (
-                SELECT ghId, appMsgId, MAX(publish_timestamp) AS publish_timestamp 
-                FROM {ARTICLE_TABLE} 
-                WHERE publish_timestamp > %s 
-                GROUP BY ghId, appMsgId
-                ) vv
-                ON oav.appMsgId = vv.appMsgId and oav.ghId = vv.ghId
-            SET oav.publish_timestamp = vv.publish_timestamp
-            WHERE oav.publish_timestamp <= %s;
-        """
-        self.piaoquan_crawler_db_client.save(
-            query=update_sql,
-            params=(0, 0)
-        )
-
-        # 若还是无 publish_timestamp,用update_time当作 publish_timestamp
-        update_sql_2 = f"""
-            UPDATE {ARTICLE_TABLE}
-            SET publish_timestamp = updateTime
-            WHERE publish_timestamp < %s;
-        """
-        self.piaoquan_crawler_db_client.save(
-            query=update_sql_2,
-            params=0
-        )
-        if fail_list:
-            bot(
-                title="更新文章任务,请求detail失败",
-                detail=fail_list
-            )

+ 6 - 0
temp_task.py

@@ -0,0 +1,6 @@
+from tasks.data_tasks.update_published_articles_read_detail import UpdatePublishedArticlesTaskCollector
+
+
+if __name__ == '__main__':
+    collector = UpdatePublishedArticlesTaskCollector()
+    collector.deal()