
Merge branch '2024-10-30-luojunhui-add-bots' of luojunhui/LongArticlesJob into master

luojunhui 7 months ago
Parent
Commit
b273ce1e4e
1 changed file with 115 additions and 71 deletions
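What this commit does: get_root_source_ids now returns the raw spider response (or False when the request raises) instead of unpacking it, update_article_info returns None once a row is fully handled and returns the original row when the spider call fails, and updateArticlesJob collects those returned rows, retries each of them once, and reports anything that fails twice through bot. Below is a minimal, self-contained sketch of that control flow; process and notify are hypothetical stand-ins for data_manager.update_article_info and bot:

    def run_job(article_list, process, notify):
        # first pass: process() returns None on success,
        # or the original row when the spider call failed
        failed = [row for row in article_list if process(row) is not None]
        # single retry pass over the failed rows
        still_failed = [row for row in failed if process(row) is not None]
        # rows that failed twice are surfaced for manual follow-up,
        # mirroring the bot() alert added at the end of updateArticlesJob
        if still_failed:
            notify(
                title="articles still failing after one retry",
                detail=[{"account": row[3], "title": row[4], "url": row[0]}
                        for row in still_failed],
            )
        return still_failed

    # toy usage: each row fails on its first attempt, so the retry pass clears it
    if __name__ == "__main__":
        seen = set()

        def process(row):
            if row[0] in seen:       # second attempt succeeds
                return None
            seen.add(row[0])         # first attempt "fails": hand the row back
            return row

        rows = [("https://example.invalid/a", b"sn", 0, "account", "title")]
        assert run_job(rows, process, print) == []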

+ 115 - 71
updateMinigramInfoDaily.py

@@ -4,6 +4,7 @@
 """
 import time
 import sys
+import traceback
 
 from tqdm import tqdm
 from datetime import datetime, timedelta
@@ -13,9 +14,14 @@ from argparse import ArgumentParser
 from applications import longArticlesMySQL, PQMySQL, WeixinSpider, Functions, log, bot
 
 TASK_NAME = "updateMinigramInfoDaily"
+SPIDER_SUCCESS_STATUS = 0
 
 
 def get_yesterday():
+    """
+    get yesterday's date
+    :return:
+    """
     yesterday = datetime.today() - timedelta(1)
     return yesterday
 
@@ -39,9 +45,9 @@ class DailyDataManager(object):
         biz_date_ts = biz_date_midnight.timestamp()
         biz_date_end_ts = biz_date_ts + 24 * 60 * 60 - 1
         sql2 = f"""
-        select ContentUrl, wx_sn, updateTime
+        select ContentUrl, wx_sn, publish_timestamp, accountName, title
         from official_articles_v2
-        where updateTime between {biz_date_ts} and {biz_date_end_ts};
+        where publish_timestamp between {biz_date_ts} and {biz_date_end_ts};
 --         and accountName in (
 --                         select distinct account_name from account_avg_info_v2
 --                         );
@@ -60,56 +66,83 @@ class DailyDataManager(object):
         update info into mysql
         :return:
         """
-        try:
-            wx_sn, mini_info, update_time = cls.get_root_source_ids(line)
-            dt_object = datetime.fromtimestamp(update_time)
-            publish_dt = dt_object.strftime('%Y-%m-%d')
-            one_day = timedelta(days=1)
-            two_day = timedelta(days=2)
-            next_day = dt_object + one_day
-            next_next_day = dt_object + two_day
-            recall_dt_list = [dt_object, next_day, next_next_day]
-            recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
-            for dt_str in recall_dt_str_list:
-                for index, item in enumerate(mini_info, 1):
-                    image_url = item['image_url']
-                    nick_name = item['nike_name']
-                    root_source_id = item['path'].split("rootSourceId%3D")[-1]
-                    video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
-                    kimi_title = item['title']
-                    # print(image_url, nick_name, root_source_id, video_id, kimi_title)
-                    insert_sql = f"""
-                            INSERT INTO long_articles_detail_info
-                            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
-                            values
-                            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
-                        """
-                    cls.pq_db.update(
-                        sql=insert_sql,
-                        params=(
-                            wx_sn,
-                            kimi_title,
-                            nick_name,
-                            image_url,
-                            index,
-                            root_source_id,
-                            video_id,
-                            publish_dt,
-                            dt_str
-                        )
-                    )
+        url = line[0]
+        update_time = line[2]
+        wx_sn = line[1].decode()
+        article_detail = cls.get_root_source_ids(line)
+        if article_detail:
+            response_code = article_detail['code']
+            if response_code == SPIDER_SUCCESS_STATUS:
+                mini_info = article_detail['data']['data']['mini_program']
+                if mini_info:
                     log(
                         task=TASK_NAME,
-                        function="update_article_info",
-                        message="插入数据成功, video_id 是: {}".format(video_id)
+                        function="get_root_source_ids",
+                        message="获取文章链接对应的 rootSourceId 成功",
+                        data={
+                            "ContentUrl": url,
+                            "wxSn": wx_sn,
+                            "updateTime": update_time,
+                            "miniInfo": mini_info
+                        }
                     )
-        except Exception as e:
-            log(
-                task=TASK_NAME,
-                function="update_article_info",
-                status="fail",
-                message="插入数据失败, 失败原因是".format(e)
-            )
+                    try:
+                        dt_object = datetime.fromtimestamp(update_time)
+                        publish_dt = dt_object.strftime('%Y-%m-%d')
+                        one_day = timedelta(days=1)
+                        two_day = timedelta(days=2)
+                        next_day = dt_object + one_day
+                        next_next_day = dt_object + two_day
+                        recall_dt_list = [dt_object, next_day, next_next_day]
+                        recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
+                        for dt_str in recall_dt_str_list:
+                            for index, item in enumerate(mini_info, 1):
+                                image_url = item['image_url']
+                                nick_name = item['nike_name']
+                                root_source_id = item['path'].split("rootSourceId%3D")[-1]
+                                video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
+                                kimi_title = item['title']
+                                # print(image_url, nick_name, root_source_id, video_id, kimi_title)
+                                insert_sql = f"""
+                                        INSERT INTO long_articles_detail_info
+                                        (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
+                                        values
+                                        (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+                                    """
+                                cls.pq_db.update(
+                                    sql=insert_sql,
+                                    params=(
+                                        wx_sn,
+                                        kimi_title,
+                                        nick_name,
+                                        image_url,
+                                        index,
+                                        root_source_id,
+                                        video_id,
+                                        publish_dt,
+                                        dt_str
+                                    )
+                                )
+                                log(
+                                    task=TASK_NAME,
+                                    function="update_article_info",
+                                    message="插入数据成功, video_id 是: {}".format(video_id)
+                                )
+                    except Exception as e:
+                        error_msg = traceback.format_exc()
+                        log(
+                            task=TASK_NAME,
+                            function="update_article_info",
+                            status="fail",
+                            message="插入数据失败, 失败原因是{}--{}".format(e, error_msg)
+                        )
+
+                return None
+            else:
+                return line
+
+        else:
+            return line
 
     @classmethod
     def get_root_source_ids(cls, data_info):
@@ -120,19 +153,7 @@ class DailyDataManager(object):
         url = data_info[0]
         try:
             article_detail = cls.wx_spider.get_article_text(url)
-            mini_info = article_detail['data']['data']['mini_program']
-            log(
-                task=TASK_NAME,
-                function="get_root_source_ids",
-                message="获取文章链接对应的 rootSourceId 成功",
-                data={
-                    "ContentUrl": url,
-                    "wxSn": data_info[1].decode(),
-                    "createTime": data_info[2],
-                    "miniInfo": mini_info
-                }
-            )
-            return data_info[1].decode(), mini_info, data_info[2]
+            return article_detail
         except Exception as e:
             log(
                 task=TASK_NAME,
@@ -143,7 +164,7 @@ class DailyDataManager(object):
                     "ContentUrl": url
                 }
             )
-            return
+            return False
 
     @classmethod
     def get_minigram_info(cls, rootSourceId):
@@ -302,8 +323,20 @@ def updateArticlesJob(biz_date=None):
         biz_date = get_yesterday()
     data_manager = DailyDataManager()
     article_list = data_manager.get_published_articles(biz_date)
+    failed_article_list = []
     for article in tqdm(article_list):
-        data_manager.update_article_info(article)
+        failed_article = data_manager.update_article_info(article)
+        if failed_article:
+            failed_article_list.append(failed_article)
+
+    # retry the failed articles once
+    second_try_fail_article_list = []
+    if failed_article_list:
+        for article in tqdm(failed_article_list):
+            second_failed_article = data_manager.update_article_info(article)
+            if second_failed_article:
+                second_try_fail_article_list.append(second_failed_article)
+
     log(
         task=TASK_NAME,
         function="updateArticlesJob",
@@ -312,10 +345,22 @@ def updateArticlesJob(biz_date=None):
     bot(
         title="更新文章任务完成",
         detail={
-            "finish_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
         },
         mention=False
     )
+    if second_try_fail_article_list:
+        bot(
+            title="更新文章任务存在文章抓取失败",
+            detail=[
+                {
+                    "account": line[3],
+                    "title": line[4],
+                    "url": line[0]
+                }
+                for line in second_try_fail_article_list
+            ]
+        )
 
 
 def updateMinigramInfoJob(biz_date=None):
@@ -343,13 +388,17 @@ def updateMinigramInfoJob(biz_date=None):
     bot(
         title="更新小程序信息任务完成",
         detail={
-            "finish_time": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+            "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
         },
         mention=False
     )
 
 
 def main():
+    """
+    main function
+    :return:
+    """
     parser = ArgumentParser()
     parser.add_argument("--run-date",
                         help="Run only once for date in format of %Y%m%d. \
@@ -369,11 +418,6 @@ def main():
         while True:
             schedule.run_pending()
             time.sleep(1)
-            # log(
-            #     task=TASK_NAME,
-            #     function="main",
-            #     message="更新文章小程序信息任务正常执行"
-            # )
 
 
 if __name__ == '__main__':
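
A note on the path parsing in update_article_info: the split()-based extraction works because the mini-program path arrives URL-encoded ("%3F" is "?", "%3D" is "=", "%26" is "&"), so "rootSourceId%3D" marks the rootSourceId value and "videos%3Fid%3D" / "%26su%3D" bracket the video id. An equivalent sketch using only the standard library, assuming a path shaped like the one the diff expects:

    from urllib.parse import parse_qs, unquote, urlparse

    def parse_mini_path(path):
        # decode "%3F"/"%3D"/"%26" back to "?", "=", "&" and parse the query string
        decoded = unquote(path)
        query = parse_qs(urlparse(decoded).query)
        return query["rootSourceId"][0], query["id"][0]

    # hypothetical encoded path of the shape the job expects
    encoded = "pages%2Fvideos%3Fid%3D123%26su%3Dxyz%26rootSourceId%3Dabc"
    assert parse_mini_path(encoded) == ("abc", "123")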