
Merge branch '2024-12-24-luojunhui-update-minigram-info-v2' of luojunhui/LongArticlesJob into master

luojunhui, 6 months ago
parent commit d310da6226

+ 6 - 0
applications/const.py

@@ -151,6 +151,12 @@ class WeixinVideoCrawlerConst:
     TITLE_MIN_LENGTH = 15
 
 
+class UpdateMiniProgramDetailConst(updatePublishedMsgTaskConst):
+    """
+    Constants for the mini-program detail update task.
+    """
+
+
 
 
 
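The subclass body is intentionally empty: it exists so the new task can read shared constants like DEFAULT_STATUS, REQUEST_FAIL_STATUS and ARTICLE_SUCCESS_CODE under a task-specific name. A minimal sketch of the assumed inheritance (the values are illustrative; only the attribute names appear in this commit, the real values live on updatePublishedMsgTaskConst):

    # hypothetical values -- only the attribute names appear in this commit
    class updatePublishedMsgTaskConst:
        DEFAULT_STATUS = 0          # publish_timestamp not yet resolved
        REQUEST_FAIL_STATUS = -1    # detail request failed
        ARTICLE_SUCCESS_CODE = 0    # spider response code for success

    class UpdateMiniProgramDetailConst(updatePublishedMsgTaskConst):
        """Constants for the mini-program detail update task."""

    const = UpdateMiniProgramDetailConst()
    assert const.REQUEST_FAIL_STATUS == -1  # inherited unchanged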

+ 1 - 0
applications/exception/__init__.py

@@ -2,4 +2,5 @@
 @author: luojunhui
 """
 from .query_error import QueryError
+from .spider_error import SpiderError
 from .transaction_error import TransactionError

+ 28 - 0
applications/exception/spider_error.py

@@ -0,0 +1,28 @@
+"""
+@author: luojunhui
+"""
+import json
+
+from applications import log
+
+
+class SpiderError(Exception):
+    """数据库查询异常"""
+
+    def __init__(self, error=None, spider=None, url=None):
+        """
+        :param error: optional exception object carrying the underlying error
+        :param spider: name of the spider task
+        :param url: the URL whose request failed
+        """
+        error_obj = {
+            "error": str(error),
+            "spider": spider,
+            "message": "爬虫接口请求失败",
+            "url": url
+        }
+        log(
+            task="spider_task",
+            function="log_spider_error",
+            data=error_obj
+        )
+        super().__init__(json.dumps(error_obj, ensure_ascii=False, indent=4))
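
A usage sketch of the new exception (the URL and inner error are made up): constructing a SpiderError logs the structured payload via log() inside __init__, then raises with the same payload serialized as JSON.

    try:
        raise SpiderError(
            error=TimeoutError("read timed out"),
            spider="detail",
            url="https://mp.weixin.qq.com/s/example",
        )
    except SpiderError as err:
        print(err)
        # {
        #     "error": "read timed out",
        #     "spider": "detail",
        #     "message": "spider API request failed",
        #     "url": "https://mp.weixin.qq.com/s/example"
        # }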

+ 22 - 0
sh/run_update_minigram_info_daily.sh

@@ -0,0 +1,22 @@
+#!/bin/bash
+
+# Current date, formatted as YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# Log file path, including the date
+LOG_FILE="/root/luojunhui/logs/update_mini_program_info_task_log_$CURRENT_DATE.txt"
+
+# Redirect the whole script's output to the dated log file
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 update_mini_info_v2.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - update_mini_info_v2.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart update_mini_info_v2.py"
+    cd /root/luojunhui/LongArticlesJob
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+    nohup python3 update_mini_info_v2.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted update_mini_info_v2.py"
+fi

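The script is a keep-alive guard: pgrep -f matches against the full command line, and if no update_mini_info_v2.py process is found the job is relaunched under nohup. The same check, sketched in Python purely for illustration (command pattern assumed from the script above):

    import subprocess

    def is_running(pattern: str) -> bool:
        """Return True if any process command line matches pattern (pgrep -f)."""
        result = subprocess.run(["pgrep", "-f", pattern], stdout=subprocess.DEVNULL)
        return result.returncode == 0

    if not is_running("python3 update_mini_info_v2.py"):
        # equivalent of the nohup restart branch
        subprocess.Popen(["python3", "update_mini_info_v2.py"])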
+ 370 - 0
tasks/update_published_articles_minigram_detail.py

@@ -0,0 +1,370 @@
+"""
+@author: luojunhui
+"""
+import traceback
+
+from datetime import datetime, timedelta
+from typing import List, Dict
+
+from tqdm import tqdm
+from urllib.parse import urlparse, parse_qs
+from pymysql.cursors import DictCursor
+
+from applications import bot
+from applications import log
+from applications import Functions
+from applications import WeixinSpider
+from applications.db import DatabaseConnector
+from applications.const import UpdateMiniProgramDetailConst
+from applications.exception import SpiderError
+from config import long_articles_config, piaoquan_crawler_config
+
+const = UpdateMiniProgramDetailConst()
+spider = WeixinSpider()
+functions = Functions()
+
+TASK_NAME = "updateMinigramInfoDaily"
+ARTICLE_TABLE = "official_articles_v2"
+DETAIL_TABLE = "long_articles_detail_info"
+EMPTY_LIST = []
+EMPTY_DICT = {}
+
+
+def extract_path(path: str) -> Dict:
+    """
+    Extract video_id and root_source_id from a mini-program path.
+    :param path:
+    :return:
+    """
+    params = parse_qs(urlparse(path).query)
+    jump_page = params.get('jumpPage', [None])[0]
+    if jump_page:
+        params2 = parse_qs(jump_page)
+        res = {
+            "video_id": params2['pages/user-videos?id'][0],
+            "root_source_id": params2['rootSourceId'][0],
+        }
+        return res
+    else:
+        return EMPTY_DICT
+
+
+def get_article_mini_program_info(content_url: str) -> List[Dict]:
+    """
+    Fetch the mini-program card info embedded in an article.
+    :param content_url: article URL to crawl
+    :return:
+    """
+    try:
+        article_detail = spider.get_article_text(content_url)
+    except Exception as e:
+        raise SpiderError(error=e, spider="detail", url=content_url)
+
+    response_code = article_detail['code']
+    if response_code == const.ARTICLE_SUCCESS_CODE:
+        mini_info = article_detail['data']['data']['mini_program']
+        return mini_info
+    else:
+        return EMPTY_LIST
+
+
+class UpdatePublishedArticlesMinigramDetail(object):
+    """
+    Update mini-program detail data for published articles.
+    """
+
+    def __init__(self):
+        self.piaoquan_crawler_db_client = None
+        self.long_articles_db_client = None
+
+    def init_database(self) -> None:
+        """
+        init database connector
+        :return:
+        """
+        # initialize database connections
+        try:
+            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
+            self.piaoquan_crawler_db_client.connect()
+            self.long_articles_db_client = DatabaseConnector(long_articles_config)
+            self.long_articles_db_client.connect()
+        except Exception as e:
+            error_msg = traceback.format_exc()
+            bot(
+                title="更新小程序裂变信息任务连接数据库失败",
+                detail={
+                    "error": e,
+                    "msg": error_msg
+                }
+            )
+            return
+
+    def check_articles(self) -> List[Dict]:
+        """
+        Check for articles whose publish_timestamp has not been resolved yet.
+        :return:
+        """
+        sql = f"""
+            SELECT ContentUrl, wx_sn 
+            FROM {ARTICLE_TABLE}
+            WHERE publish_timestamp IN ({const.DEFAULT_STATUS}, {const.REQUEST_FAIL_STATUS});
+        """
+
+        response = self.piaoquan_crawler_db_client.fetch(sql, cursor_type=DictCursor)
+        return response
+
+    def get_root_source_id_result(self, root_source_id: str, dt: str) -> Dict:
+        """
+        Fetch first_uv and split stats for a root_source_id on a given dt.
+        :param dt:
+        :param root_source_id:
+        :return:
+        """
+        select_sql = f"""
+            SELECT first_uv, split0, split1, split2
+            FROM changwen_data_rootsourceid
+            WHERE root_source_id = '{root_source_id}' AND dt = '{dt}';
+        """
+        result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
+        if result:
+            return result[0]
+        else:
+            return EMPTY_DICT
+
+    def get_articles_published_yesterday(self, biz_date: str) -> List[Dict]:
+        """
+        Fetch articles published between 00:00:00 and 23:59:59 on the day before biz_date.
+        :return:
+        """
+        sql = f"""
+             SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title
+             FROM {ARTICLE_TABLE}
+             WHERE FROM_UNIXTIME(publish_timestamp)
+             BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
+         """
+        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
+        return article_list
+
+    def insert_each_root_source_id(self, wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt) -> int:
+        """
+        :param recall_dt: recall date
+        :param publish_dt: article publish date
+        :param video_id: video id
+        :param video_index: position of the video within the article
+        :param cover_url: video cover image URL
+        :param mini_name: mini-program name
+        :param mini_title: mini-program title
+        :param wx_sn:
+        :param root_source_id:
+        :return:
+        """
+        insert_sql = f"""
+            INSERT IGNORE INTO {DETAIL_TABLE}
+            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
+            VALUES
+            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+        """
+        affected_rows = self.piaoquan_crawler_db_client.save(
+            query=insert_sql,
+            params=(
+                wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt
+            )
+        )
+        return affected_rows
+
+    def record_each_article(self, article_info: Dict) -> Dict:
+        """
+        Record every root_source_id for one article.
+        Row count: article_count * mini_program_count * days_count
+        :param article_info:
+        :return:
+        """
+        url = article_info['ContentUrl']
+        publish_timestamp = article_info['publish_timestamp']
+        wx_sn = article_info['wx_sn'].decode()
+
+        article_mini_program_detail = get_article_mini_program_info(url)
+        if article_mini_program_detail:
+            log(
+                task=TASK_NAME,
+                function="record_each_article",
+                message="获取文章链接对应的 rootSourceId 成功",
+                data={
+                    "ContentUrl": url,
+                    "wxSn": wx_sn,
+                    "publish_timestamp": publish_timestamp,
+                    "miniInfo": article_mini_program_detail
+                }
+            )
+            try:
+                publish_date = datetime.fromtimestamp(publish_timestamp)
+                # generate T+0, T+1, T+2 date string
+                recall_dt_str_list = [
+                    (publish_date + timedelta(days=i)).strftime('%Y-%m-%d')
+                    for i in range(3)
+                ]
+
+                for date_str in recall_dt_str_list:
+                    for video_index, mini_item in enumerate(article_mini_program_detail, 1):
+                        image_url = mini_item['image_url']
+                        nick_name = mini_item['nike_name']
+                        # extract video id and root_source_id
+                        id_info = extract_path(mini_item['path'])
+                        root_source_id = id_info['root_source_id']
+                        video_id = id_info['video_id']
+                        kimi_title = mini_item['title']
+                        self.insert_each_root_source_id(
+                            wx_sn=wx_sn,
+                            mini_title=kimi_title,
+                            mini_name=nick_name,
+                            cover_url=image_url,
+                            video_index=video_index,
+                            root_source_id=root_source_id,
+                            video_id=video_id,
+                            publish_dt=publish_date.strftime('%Y-%m-%d'),
+                            recall_dt=date_str
+                        )
+                return EMPTY_DICT
+            except Exception as e:
+                error_msg = traceback.format_exc()
+                log(
+                    task=TASK_NAME,
+                    function="record_each_article",
+                    status="fail",
+                    message="插入数据失败, 失败原因是{}--{}".format(e, error_msg)
+                )
+                return article_info
+
+        else:
+            return article_info
+
+    def get_root_source_id_for_three_days(self, biz_date: str) -> List[Dict]:
+        """
+        Fetch root_source_ids whose publish_dt falls within the three days before biz_date.
+        :param biz_date:
+        :return:
+        """
+        sql = f"""
+             SELECT recall_dt, root_source_id
+             FROM {DETAIL_TABLE}
+             WHERE publish_dt
+             BETWEEN DATE_SUB('{biz_date}', INTERVAL 3 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
+        """
+        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
+        return article_list
+
+    def update_each_root_source_id(self, recall_dt: str, root_source_id: str) -> None:
+        """
+        :param recall_dt:
+        :param root_source_id:
+        :return:
+        """
+        mini_program_detail = self.get_root_source_id_result(root_source_id=root_source_id, dt=recall_dt)
+        if mini_program_detail:
+            # do update job
+            update_sql = f"""
+                UPDATE {DETAIL_TABLE}
+                SET first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
+                WHERE root_source_id = %s AND recall_dt = %s;
+            """
+            self.piaoquan_crawler_db_client.save(
+                query=update_sql,
+                params=(
+                    mini_program_detail['first_uv'],
+                    mini_program_detail['split0'],
+                    mini_program_detail['split1'],
+                    mini_program_detail['split2'],
+                    root_source_id,
+                    recall_dt
+                )
+            )
+        else:
+            return
+
+    def update_published_articles_job(self, biz_date=None):
+        """
+        Store per-article mini-program info into the fission detail table.
+        :param biz_date:
+        :return:
+        """
+        if not biz_date:
+            biz_date = datetime.today().strftime('%Y-%m-%d')
+
+        published_article_list = self.get_articles_published_yesterday(biz_date)
+        failed_article_list = []
+        for article_info in tqdm(published_article_list, desc="update_published_articles_job"):
+            failed_article = self.record_each_article(article_info)
+            if failed_article:
+                failed_article_list.append(failed_article)
+
+        # retry
+        second_try_fail_article_list = []
+        if failed_article_list:
+            for failed_article in failed_article_list:
+                second_fail = self.record_each_article(failed_article)
+                if second_fail:
+                    second_try_fail_article_list.append(second_fail)
+
+        bot(
+            title="更新文章任务完成",
+            detail={
+                "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            },
+            mention=False
+        )
+        if second_try_fail_article_list:
+            bot(
+                title="更新文章任务存在文章抓取失败",
+                detail=[
+                    {
+                        "account": line['accountName'],
+                        "title": line['title'],
+                        "url": line['ContentUrl']
+                    }
+                    for line in second_try_fail_article_list
+                ]
+            )
+
+    def update_mini_program_detail_job(self, biz_date=None):
+        """
+        update mini program detail info
+        :param biz_date:
+        :return:
+        """
+        if not biz_date:
+            biz_date = datetime.today().strftime('%Y-%m-%d')
+
+        # get root_source_id_list
+        root_source_id_obj_list = self.get_root_source_id_for_three_days(biz_date)
+        log(
+            task=TASK_NAME,
+            function="update_minigram_detail",
+            message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(root_source_id_obj_list))
+        )
+        fail_count = 0
+        for item in tqdm(root_source_id_obj_list, desc="update_mini_program_detail_job"):
+            try:
+                self.update_each_root_source_id(
+                    root_source_id=item['root_source_id'],
+                    recall_dt=item['recall_dt']
+                )
+            except Exception as e:
+                log(
+                    task=TASK_NAME,
+                    function="update_minigram_detail",
+                    status="fail",
+                    message="更新单条数据失败, 报错信息是 {}".format(e),
+                    data={"error_msg": traceback.format_exc()}
+                )
+                fail_count += 1
+
+        if fail_count:
+            bot(
+                title="{} fail because of lam db error".format(TASK_NAME),
+                detail={
+                    "fail_count": fail_count
+                }
+            )
+
+
+
+

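The doubly nested parse_qs in extract_path is the subtlest piece of this file: the mini-program path carries a URL-encoded jumpPage parameter whose decoded value is itself query-string-shaped, so the literal dictionary key 'pages/user-videos?id' is intentional. A worked sketch with a made-up path:

    from urllib.parse import parse_qs, urlparse

    # hypothetical path, shaped like the ones the spider returns
    path = "pages/home?jumpPage=pages%2Fuser-videos%3Fid%3D12345%26rootSourceId%3Dabcde"

    params = parse_qs(urlparse(path).query)
    jump_page = params.get("jumpPage", [None])[0]
    # parse_qs percent-decodes the value:
    # jump_page == "pages/user-videos?id=12345&rootSourceId=abcde"

    params2 = parse_qs(jump_page)
    # everything before the first '=' becomes the key, '?' and all:
    # {'pages/user-videos?id': ['12345'], 'rootSourceId': ['abcde']}
    assert params2["pages/user-videos?id"][0] == "12345"
    assert params2["rootSourceId"][0] == "abcde"

Note also that record_each_article writes each root_source_id three times, once per recall_dt in the T+0..T+2 window, so update_mini_program_detail_job can later fill in stats for each day independently.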
+ 33 - 0
update_mini_info_v2.py

@@ -0,0 +1,33 @@
+"""
+@author: luojunhui
+"""
+
+from argparse import ArgumentParser
+
+from tasks.update_published_articles_minigram_detail import UpdatePublishedArticlesMinigramDetail
+
+
+def main():
+    """
+    update mini program detail main
+    :return:
+    """
+    parser = ArgumentParser()
+    parser.add_argument("--run-date",
+                        help="Run only once for date in format of %Y-%m-%d. \
+                            If no specified, run as daily jobs.")
+    args = parser.parse_args()
+
+    update_minigram_detail_task = UpdatePublishedArticlesMinigramDetail()
+    update_minigram_detail_task.init_database()
+
+    if args.run_date:
+        update_minigram_detail_task.update_published_articles_job(biz_date=args.run_date)
+        update_minigram_detail_task.update_mini_program_detail_job(biz_date=args.run_date)
+    else:
+        update_minigram_detail_task.update_published_articles_job()
+        update_minigram_detail_task.update_mini_program_detail_job()
+
+
+if __name__ == '__main__':
+    main()
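
A hedged backfill sketch showing the biz_date semantics end to end (the date is an example; module layout as in this commit): update_published_articles_job records articles published the day before biz_date, and update_mini_program_detail_job refreshes stats for root_source_ids whose publish_dt falls within the three days before it.

    from tasks.update_published_articles_minigram_detail import (
        UpdatePublishedArticlesMinigramDetail,
    )

    task = UpdatePublishedArticlesMinigramDetail()
    task.init_database()
    # biz_date="2024-12-24": records articles published on 2024-12-23,
    # then updates stats for rows with publish_dt in 2024-12-21..2024-12-23
    task.update_published_articles_job(biz_date="2024-12-24")
    task.update_mini_program_detail_job(biz_date="2024-12-24")

This is what passing --run-date 2024-12-24 to update_mini_info_v2.py does.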