Przeglądaj źródła

Merge branch 'feature/reload-by-day' of luojunhui/LongArticlesJob into master

luojunhui 8 miesięcy temu
rodzic
commit
9af9537b4f
1 zmienionych plików z 98 dodań i 74 usunięć
  1. 98 74
      updateMinigramInfoDaily.py

+ 98 - 74
updateMinigramInfoDaily.py

@@ -3,58 +3,64 @@
 @description Update Minigram Info Daily
 """
 import time
+import sys
 
 from tqdm import tqdm
 from datetime import datetime, timedelta
 import schedule
+from argparse import ArgumentParser
 
 from applications import longArticlesMySQL, PQMySQL, WeixinSpider, Functions, log, bot
 
+TASK_NAME = "updateMinigramInfoDaily"
+
+def get_yesterday():  # Default business date: the current local datetime minus one day.
+    yesterday = datetime.today() - timedelta(1)  # timedelta(1) is one day; the time-of-day part is kept
+    return yesterday
+
 
 class DailyDataManager(object):
     """
     daily 数据每日更新
     """
-    laMysql = longArticlesMySQL()
-    pqMysql = PQMySQL()
-    wxSpider = WeixinSpider()
+    long_articles_db = longArticlesMySQL()
+    pq_db = PQMySQL()
+    wx_spider = WeixinSpider()
     functions = Functions()
 
     @classmethod
-    def getPublishedArticles(cls):
+    def get_published_articles(cls, biz_date):
         """
         Fetch rows from official_articles_v2 whose createTime falls on biz_date (midnight through the last second of that day, as timestamps).
         :return: the selected rows (ContentUrl, wx_sn, createTime) as returned by pq_db.select
         """
-        today = datetime.today()
-        # 获取昨天的日期
-        yesterday = today - timedelta(days=1)
-        yesterday_midnight = datetime(year=yesterday.year, month=yesterday.month, day=yesterday.day)
-        yesterday_timestamp = yesterday_midnight.timestamp()
+        biz_date_midnight = datetime(year=biz_date.year, month=biz_date.month, day=biz_date.day)  # drop any time-of-day part
+        biz_date_ts = biz_date_midnight.timestamp()  # naive datetime, so this is midnight in local time
+        biz_date_end_ts = biz_date_ts + 24 * 60 * 60 - 1  # one second before the next midnight
         sql2 = f"""
         select ContentUrl, wx_sn, createTime
         from official_articles_v2
-        where createTime >= {yesterday_timestamp};
+        where createTime between {biz_date_ts} and {biz_date_end_ts};
 --         and accountName in (
 --                         select distinct account_name from account_avg_info_v2
 --                         );
         """
-        result_list = cls.pqMysql.select(sql2)
+        result_list = cls.pq_db.select(sql2)
         log(
-            task="updateMinigramInfoDaily",
-            function="getPublishedArticles",
+            task=TASK_NAME,
+            function="get_published_articles",
             message="一共获取 {} 篇文章数据".format(len(result_list))
         )
         return result_list
 
     @classmethod
-    def updateInfo(cls, line):
+    def update_article_info(cls, line):
         """
         update info into mysql
         :return:
         """
         try:
-            wx_sn, mini_info, create_time = cls.getRootSourceIds(line)
+            wx_sn, mini_info, create_time = cls.get_root_source_ids(line)
             dt_object = datetime.fromtimestamp(create_time)
             publish_dt = dt_object.strftime('%Y-%m-%d')
             one_day = timedelta(days=1)
@@ -77,7 +83,7 @@ class DailyDataManager(object):
                             values
                             (%s, %s, %s, %s, %s, %s, %s, %s, %s);
                         """
-                    cls.pqMysql.update(
+                    cls.pq_db.update(
                         sql=insert_sql,
                         params=(
                             wx_sn,
@@ -92,31 +98,31 @@ class DailyDataManager(object):
                         )
                     )
                     log(
-                        task="updateMinigramInfoDaily",
-                        function="updateInfo",
+                        task=TASK_NAME,
+                        function="update_article_info",
                         message="插入数据成功, video_id 是: {}".format(video_id)
                     )
         except Exception as e:  # best-effort: log the failure (including the exception text) instead of raising
             log(
-                task="updateMinigramInfoDaily",
-                function="updateInfo",
+                task=TASK_NAME,
+                function="update_article_info",
                 status="fail",
                 message="插入数据失败, 失败原因是: {}".format(e)
             )
 
     @classmethod
-    def getRootSourceIds(cls, data_info):
+    def get_root_source_ids(cls, data_info):
         """
         通过抓取接口获取 data_info
         :return:
         """
         url = data_info[0]
         try:
-            article_detail = cls.wxSpider.get_article_text(url)
+            article_detail = cls.wx_spider.get_article_text(url)
             mini_info = article_detail['data']['data']['mini_program']
             log(
-                task="updateMinigramInfoDaily",
-                function="getRootSourceIds",
+                task=TASK_NAME,
+                function="get_root_source_ids",
                 message="获取文章链接对应的 rootSourceId 成功",
                 data={
                     "ContentUrl": url,
@@ -128,8 +134,8 @@ class DailyDataManager(object):
             return data_info[1].decode(), mini_info, data_info[2]
         except Exception as e:
             log(
-                task="updateMinigramInfoDaily",
-                function="getRootSourceIds",
+                task=TASK_NAME,
+                function="get_root_source_ids",
                 status="fail",
                 message="获取文章链接对应的 rootSourceId失败, 报错信息是: {}".format(e),
                 data={
@@ -139,18 +145,18 @@ class DailyDataManager(object):
             return
 
     @classmethod
-    def getMinigramInfo(cls, rootSourceId):
+    def get_minigram_info(cls, rootSourceId):
         """
 
         :param rootSourceId:
         :return:
         """
         sql = f"""
-        select type, machinecode, create_time, first_level_dt 
+        select type, machinecode, create_time, first_level_dt
         from changwen_data_base_v2
         where rootsourceid = '{rootSourceId}';
         """
-        result_list = cls.laMysql.select(sql)
+        result_list = cls.long_articles_db.select(sql)
 
         def summarize(values):
             """
@@ -211,45 +217,45 @@ class DailyDataManager(object):
         try:
             response = summarize(result_list)
             log(
-                task="updateMinigramInfoDaily",
-                function="getMinigramInfo",
+                task=TASK_NAME,
+                function="get_minigram_info",
                 message="计算source_id信息成功",
                 data=response
             )
             return response
         except Exception as e:
             log(
-                task="updateMinigramInfoDaily",
-                function="getMinigramInfo",
+                task=TASK_NAME,
+                function="get_minigram_info",
                 message="获取 source_id信息失败, 报错信息是: {}".format(e),
                 status="fail"
             )
             return None
 
     @classmethod
-    def updateDetail(cls):
+    def update_minigram_detail(cls, biz_date):
         """
         :return:
         """
-        today = datetime.today()
         # 获取三天前的日期
-        yesterday = today - timedelta(days=3)
-        yesterday_str = yesterday.__str__().split(" ")[0]
+        date_begin = biz_date - timedelta(days=3)
+        datestr_begin = date_begin.strftime("%Y-%m-%d")
+        datestr_end = biz_date.strftime("%Y-%m-%d")
         sql = f"""
             select distinct root_source_id
             from long_articles_detail_info
-            where publish_dt >= '{yesterday_str}';
+            where publish_dt between '{datestr_begin}' and '{datestr_end}';
         """
-        source_id_list = cls.pqMysql.select(sql)
+        source_id_list = cls.pq_db.select(sql)
         log(
-            task="updateMinigramInfoDaily",
-            function="updateDetail",
+            task=TASK_NAME,
+            function="update_minigram_detail",
             message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(source_id_list))
         )
         for item in tqdm(source_id_list):
             s_id = item[0]
             try:
-                result = cls.getMinigramInfo(s_id)
+                result = cls.get_minigram_info(s_id)
                 for key in result:
                     recall_dt = key
                     first_level = result[key][0]
@@ -263,7 +269,7 @@ class DailyDataManager(object):
                         where root_source_id = %s and recall_dt = %s;
                     """
                     try:
-                        cls.pqMysql.update(
+                        cls.pq_db.update(
                             sql=update_sql,
                             params=(
                                 first_level, fission_0, fission_1, fission_2, s_id, recall_dt
@@ -271,69 +277,87 @@ class DailyDataManager(object):
                         )
                     except Exception as e:
                         log(
-                            task="updateMinigramInfoDaily",
-                            function="updateDetail",
+                            task=TASK_NAME,
+                            function="update_minigram_detail",
                             status="fail",
                             message="mysql 更新失败, 报错信息是 {}".format(e)
                         )
             except Exception as e:
                 log(
-                    task="updateMinigramInfoDaily",
-                    function="updateDetail",
+                    task=TASK_NAME,
+                    function="update_minigram_detail",
                     status="fail",
                     message="更新单条数据失败, 报错信息是 {}".format(e)
                 )
 
 
-def updateArticlesJob():
+def updateArticlesJob(biz_date=None):
     """
     Update article data for the articles published on biz_date (defaults to yesterday when omitted).
     :return: None
     """
-    DDM = DailyDataManager()
-    article_list = DDM.getPublishedArticles()
+    if not biz_date:  # no date supplied: fall back to yesterday
+        biz_date = get_yesterday()
+    data_manager = DailyDataManager()
+    article_list = data_manager.get_published_articles(biz_date)
     for article in tqdm(article_list):
-        DDM.updateInfo(article)
+        data_manager.update_article_info(article)
     log(
-        task="updateMinigramInfoDaily",
+        task=TASK_NAME,
         function="updateArticlesJob",
-        message="文章更新完成---{}".format(datetime.today().__str__())
+        message="文章更新完成---{}".format(biz_date.__str__())
     )
 
 
-def updateMinigramInfoJob():
+def updateMinigramInfoJob(biz_date=None):
     """
     Update the mini-program data of the previous three days relative to biz_date (defaults to yesterday when omitted); failures are logged, not raised.
     :return: None
     """
-    DDM = DailyDataManager()
+    if not biz_date:  # no date supplied: fall back to yesterday
+        biz_date = get_yesterday()
+    data_manager = DailyDataManager()
     try:
-        DDM.updateDetail()
+        data_manager.update_minigram_detail(biz_date)
         log(
-            task="updateMinigramInfoDaily",
-            function="updateArticlesJob",
-            message="小程序更新完成---{}".format(datetime.today().__str__())
+            task=TASK_NAME,
+            function="updateMinigramInfoJob",
+            message="小程序更新完成---{}".format(biz_date.__str__())
         )
     except Exception as e:
         log(
-            task="updateMinigramInfoDaily",
-            function="updateArticlesJob",
+            task=TASK_NAME,
+            function="updateMinigramInfoJob",
             status="fail",
-            message="小程序更新失败---{}, 报错信息是: {}".format(datetime.today().__str__(), e)
+            message="小程序更新失败---{}, 报错信息是: {}".format(biz_date.__str__(), e)
         )
 
+def main():  # CLI entry point: with --run-date run both jobs once for that date and return; otherwise schedule them daily and loop forever.
+    parser = ArgumentParser()
+    parser.add_argument("--run-date",  # argparse %-formats help text, so a literal '%' must be written as '%%'
+                        help="Run only once for date in format of %%Y%%m%%d. \
+                        If no specified, run as daily jobs.")
+    args = parser.parse_args()
 
-if __name__ == '__main__':
-    # updateMinigramInfoJob()
-    schedule.every().day.at("01:30").do(Functions().job_with_thread, updateArticlesJob)
+    if args.run_date:
+        biz_date = datetime.strptime(args.run_date, "%Y%m%d")
+        print("Run in manual mode. Date: {}".format(args.run_date))
+        updateArticlesJob(biz_date)
+        updateMinigramInfoJob(biz_date)
+        return
+    else:
+        print("Run in daily mode.")
+        schedule.every().day.at("01:30").do(Functions().job_with_thread, updateArticlesJob)
+        schedule.every().day.at("03:30").do(Functions().job_with_thread, updateMinigramInfoJob)
 
-    schedule.every().day.at("03:30").do(Functions().job_with_thread, updateMinigramInfoJob)
+        while True:
+            schedule.run_pending()
+            time.sleep(1)
+            # log(
+            #     task=TASK_NAME,
+            #     function="main",
+            #     message="更新文章小程序信息任务正常执行"
+            # )
 
-    while True:
-        schedule.run_pending()
-        time.sleep(1)
-        # log(
-        #     task="updateMinigramInfoDaily",
-        #     function="main",
-        #     message="更新文章小程序信息任务正常执行"
-        # )
+if __name__ == '__main__':  # run main() only when executed as a script, not on import
+    main()