Ver código fonte

优化小程序信息更新代码dev

luojunhui 3 meses atrás
pai
commit
d1964747e7
2 arquivos alterados com 288 adições e 30 exclusões
  1. 277 30
      tasks/update_published_articles_minigram_detail.py
  2. 11 0
      update_mini_info_v2.py

+ 277 - 30
tasks/update_published_articles_minigram_detail.py

@@ -1,14 +1,13 @@
 """
 @author: luojunhui
 """
-import json
-import time
 import traceback
 
-from argparse import ArgumentParser
+from datetime import datetime, timedelta
 from typing import List, Dict
 
 from tqdm import tqdm
+from urllib.parse import urlparse, parse_qs
 from pymysql.cursors import DictCursor
 
 from applications import bot
@@ -23,15 +22,39 @@ from config import long_articles_config, piaoquan_crawler_config
 const = UpdateMiniProgramDetailConst()
 spider = WeixinSpider()
 functions = Functions()
-# 数据库配置
+
+TASK_NAME = "updateMinigramInfoDaily"
 ARTICLE_TABLE = "official_articles_v2"
+DETAIL_TABLE = "long_articles_detail_info_dev"
+EMPTY_LIST = []
 
 
-def check_root_source_id_list(content_url: str):
def get_root_source_id_list(mini_program: List[Dict]) -> List[str]:
    """
    Extract every rootSourceId carried by a list of mini-program card items.

    Each item's 'path' is a URL whose 'jumpPage' query parameter is itself a
    URL-encoded query string that may contain a 'rootSourceId' parameter.

    :param mini_program: mini-program card dicts, each holding a 'path' key
    :return: all rootSourceId values found, in input order
    """
    extracted = []
    for card in mini_program:
        # outer URL -> pull the 'jumpPage' parameter (parse_qs percent-decodes it)
        outer_params = parse_qs(urlparse(card['path']).query)
        jump_page = outer_params.get('jumpPage', [None])[0]
        if not jump_page:
            continue
        # jumpPage is itself a query string; look for rootSourceId inside it
        inner_params = parse_qs(jump_page)
        source_id = inner_params.get('rootSourceId', [None])[0]
        if source_id:
            extracted.append(source_id)
    return extracted
+
+
+def get_article_mini_program_info(content_url: str) -> List[Dict]:
+    """
+    获取文章的小程序信息
+    :return:
+    """
     try:
         article_detail = spider.get_article_text(content_url)
     except Exception as e:
@@ -42,7 +65,7 @@ def check_root_source_id_list(content_url: str):
         mini_info = article_detail['data']['data']['mini_program']
         return mini_info
     else:
-        return None
+        return EMPTY_LIST
 
 
 class UpdatePublishedArticlesMinigramDetail(object):
@@ -108,11 +131,11 @@ class UpdatePublishedArticlesMinigramDetail(object):
 
     def get_articles_published_yesterday(self, biz_date: str) -> List[Dict]:
         """
-        获取昨天发布的文章
+        获取发布时间在biz_date前一天0点-23:59:59的文章
         :return:
         """
         sql = f"""
-             SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
+             SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title
              FROM official_articles_v2
              WHERE FROM_UNIXTIME(publish_timestamp)
              BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
@@ -120,14 +143,21 @@ class UpdatePublishedArticlesMinigramDetail(object):
         article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
         return article_list
 
-    def insert_each_root_source_id(self, root_source_id: str, article_info: Dict) -> int:
+    def insert_each_root_source_id(self, wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt) -> int:
         """
+        :param recall_dt: 召回日期
+        :param publish_dt: 文章发布日期
+        :param video_id: 视频id
+        :param video_index: 视频位置
+        :param cover_url: 视频封面
+        :param mini_name: 小程序名称
+        :param mini_title: 小程序标题
+        :param wx_sn:
         :param root_source_id:
-        :param article_info:
         :return:
         """
         insert_sql = f"""
-            INSERT INTO long_articles_detail_info
+            INSERT INTO {DETAIL_TABLE}
             (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
             values
             (%s, %s, %s, %s, %s, %s, %s, %s, %s);
@@ -135,19 +165,12 @@ class UpdatePublishedArticlesMinigramDetail(object):
         affected_rows = self.piaoquan_crawler_db_client.save(
             query=insert_sql,
             params=(
-                article_info['wx_sn'],
-                article_info['title'],
-                article_info['mini_name'],
-                article_info['cover_url'],
-                article_info['video_index'],
-                root_source_id,
-                article_info['video_id'],
-                article_info['publish_dt']
+                wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt
             )
         )
         return affected_rows
 
-    def record_each_article(self, article_info: Dict) -> None:
+    def record_each_article(self, article_info: Dict) -> Dict:
         """
         记录每篇文章的root_source_id
         数量集: article_count * mini_program_count * days_count
@@ -155,14 +178,238 @@ class UpdatePublishedArticlesMinigramDetail(object):
         :return:
         """
         url = article_info['ContentUrl']
-        root_source_id_list = json.loads(article_info['root_source_id_list'])
-        if not root_source_id_list:
-            root_source_id_response = check_root_source_id_list(url)
-            if root_source_id_response:
-                root_source_id_list = []
-            else:
-                return
-
-        for root_source_id in root_source_id_list:
-            self.record_each_article(root_source_id, article_info)
+        publish_timestamp = article_info['publish_timestamp']
+        wx_sn = article_info['wx_sn'].decode()
+
+        article_mini_program_detail = get_article_mini_program_info(url)
+        if article_mini_program_detail:
+            log(
+                task=TASK_NAME,
+                function="record_each_article",
+                message="获取文章链接对应的 rootSourceId 成功",
+                data={
+                    "ContentUrl": url,
+                    "wxSn": wx_sn,
+                    "publish_timestamp": publish_timestamp,
+                    "miniInfo": article_mini_program_detail
+                }
+            )
+            try:
+                publish_date = datetime.fromtimestamp(publish_timestamp)
+                # generate T+0, T+1, T+2 date string
+                recall_dt_str_list = [
+                    (publish_date + timedelta(days=i)).strftime('%Y-%m-%d')
+                    for i in range(3)
+                ]
+
+                for date_str in recall_dt_str_list:
+                    for video_index, mini_item in enumerate(article_mini_program_detail, 1):
+                        image_url = mini_item['image_url']
+                        nick_name = mini_item['nike_name']
+                        root_source_id = mini_item['path'].split("rootSourceId%3D")[-1]
+                        video_id = mini_item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
+                        kimi_title = mini_item['title']
+                        self.insert_each_root_source_id(
+                            wx_sn=wx_sn,
+                            mini_title=kimi_title,
+                            mini_name=nick_name,
+                            cover_url=image_url,
+                            video_index=video_index,
+                            root_source_id=root_source_id,
+                            video_id=video_id,
+                            publish_dt=publish_date.strftime('%Y-%m-%d'),
+                            recall_dt=date_str
+                        )
+                return {}
+            except Exception as e:
+                error_msg = traceback.format_exc()
+                log(
+                    task=TASK_NAME,
+                    function="record_each_article",
+                    status="fail",
+                    message="插入数据失败, 失败原因是{}--{}".format(e, error_msg)
+                )
+                return article_info
+
+        else:
+            return article_info
+
+        # # check whether root_source_id_list is null
+        # if root_source_id_list:
+        #     root_source_id_list = json.loads(article_info['root_source_id_list'])
+        #
+        #     # check whether root_source_id is empty, if not, try to get
+        #     if not root_source_id_list:
+        #         root_source_id_response = check_root_source_id_list(url)
+        #         if root_source_id_response:
+        #             root_source_id_list = get_root_source_id_list(root_source_id_response)
+        #         else:
+        #             return
+        #
+        #     for root_source_id in root_source_id_list:
+        #
+        #         self.insert_each_root_source_id(root_source_id, article_info)
+        #
+        # else:
+        #     print('todo: root_source_id_list is None')
+
+    def get_root_source_id_for_three_days(self, biz_date: str) -> List[Dict]:
+        """
+        获取publish_dt在 biz_date前三天的root_source_id
+        :param biz_date:
+        :return:
+        """
+        sql = f"""
+             SELECT recall_dt, root_source_id
+             FROM {DETAIL_TABLE}
+             WHERE publish_dt
+             BETWEEN DATE_SUB('{biz_date}', INTERVAL 3 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
+        """
+        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
+        return article_list
+
+    def update_each_root_source_id(self, recall_dt: str, root_source_id: str) -> None:
+        """
+        :param recall_dt:
+        :param root_source_id:
+        :return:
+        """
+        mini_program_detail = self.get_root_source_id_result(root_source_id=root_source_id, dt=recall_dt)
+        if mini_program_detail:
+            # do update job
+            update_sql = f"""
+                UPDATE {DETAIL_TABLE}
+                SET first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
+                WHERE root_source_id = %s and recall_dt = %s;
+            """
+            self.long_articles_db_client.save(
+                query=update_sql,
+                params=(
+                    mini_program_detail['first_uv'],
+                    mini_program_detail['split0'],
+                    mini_program_detail['split1'],
+                    mini_program_detail['split2'],
+                    root_source_id,
+                    recall_dt
+                )
+            )
+        else:
+            return
+
+    def update_published_articles_job(self, biz_date=None):
+        """
+        将文章信息存入裂变表中
+        :param biz_date:
+        :return:
+        """
+        if not biz_date:
+            biz_date = datetime.today().strftime('%Y-%m-%d')
+
+        published_article_list = self.get_articles_published_yesterday(biz_date)
+        failed_article_list = []
+        for article_info in tqdm(published_article_list, desc="update_published_articles_job"):
+            failed_article = self.record_each_article(article_info)
+            if failed_article:
+                failed_article_list.append(failed_article)
+
+        # retry
+        second_try_fail_article_list = []
+        if failed_article_list:
+            for failed_article in failed_article_list:
+                second_fail = self.record_each_article(failed_article)
+                if second_fail:
+                    second_try_fail_article_list.append(second_fail)
+
+        bot(
+            title="更新文章任务完成",
+            detail={
+                "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            },
+            mention=False
+        )
+        if second_try_fail_article_list:
+            bot(
+                title="更新文章任务存在文章抓取失败",
+                detail=[
+                    {
+                        "account": line['accountName'],
+                        "title": line['title'],
+                        "url": line['ContentUrl']
+                    }
+                    for line in second_try_fail_article_list
+                ]
+            )
+
+    def update_mini_program_detail_job(self, biz_date=None):
+        """
+        更新裂变信息
+        :param biz_date:
+        :return:
+        """
+        if not biz_date:
+            biz_date = datetime.today().strftime('%Y-%m-%d')
+
+        # get root_source_id_list
+        root_source_id_obj_list = self.get_root_source_id_for_three_days(biz_date)
+        log(
+            task=TASK_NAME,
+            function="update_minigram_detail",
+            message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(root_source_id_obj_list))
+        )
+        fail_count = 0
+        for item in tqdm(root_source_id_obj_list, desc="update_mini_program_detail_job"):
+            try:
+                self.update_each_root_source_id(
+                    root_source_id=item['root_source_id'],
+                    recall_dt=item['recall_dt']
+                )
+            except Exception as e:
+                log(
+                    task=TASK_NAME,
+                    function="update_minigram_detail",
+                    status="fail",
+                    message="更新单条数据失败, 报错信息是 {}".format(e),
+                    data={"error_msg": traceback.format_exc()}
+                )
+                fail_count += 1
+        log(
+            task=TASK_NAME,
+            function="update_minigram_detail",
+            message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(root_source_id_obj_list))
+        )
+        fail_count = 0
+        for item in tqdm(root_source_id_obj_list, desc="update_mini_program_detail_job"):
+            try:
+                self.update_each_root_source_id(
+                    root_source_id=item['root_source_id'],
+                    recall_dt=item['recall_dt']
+                )
+            except Exception as e:
+                log(
+                    task=TASK_NAME,
+                    function="update_minigram_detail",
+                    status="fail",
+                    message="更新单条数据失败, 报错信息是 {}".format(e),
+                    data={"error_msg": traceback.format_exc()}
+                )
+                fail_count += 1
+
+        if fail_count:
+            bot(
+                title="{} fail because of lam db error".format(TASK_NAME),
+                detail={
+                    "fail_count": fail_count
+                }
+            )
+
+        if fail_count:
+            bot(
+                title="{} fail because of lam db error".format(TASK_NAME),
+                detail={
+                    "fail_count": fail_count
+                }
+            )
+
+
+
 

+ 11 - 0
update_mini_info_v2.py

@@ -0,0 +1,11 @@
"""
@author: luojunhui
"""
from tasks.update_published_articles_minigram_detail import UpdatePublishedArticlesMinigramDetail


def main():
    """Entry point: refresh mini-program info for yesterday's published articles."""
    task = UpdatePublishedArticlesMinigramDetail()
    task.init_database()

    task.update_published_articles_job()
    # task.update_mini_program_detail_job()


# FIX: guard the side effects so importing this module no longer triggers a
# full job run; behavior when executed as a script is unchanged.
if __name__ == '__main__':
    main()