
update mini info and crawler tasks

luojunhui 1 month ago
commit 0e329d3f42

+ 27 - 4
applications/utils/common.py

@@ -5,6 +5,7 @@
 import hashlib
 
 from requests import RequestException
+from urllib.parse import urlparse, parse_qs
 from tenacity import (
     stop_after_attempt,
     wait_exponential,
@@ -40,8 +41,10 @@ def proxy():
     username = "t14070979713487"
     password = "hqwanfvy"
     proxies = {
-        "http": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel},
-        "https": "http://%(user)s:%(pwd)s@%(proxy)s/" % {"user": username, "pwd": password, "proxy": tunnel}
+        "http": "http://%(user)s:%(pwd)s@%(proxy)s/"
+        % {"user": username, "pwd": password, "proxy": tunnel},
+        "https": "http://%(user)s:%(pwd)s@%(proxy)s/"
+        % {"user": username, "pwd": password, "proxy": tunnel},
     }
     return proxies
 
@@ -56,10 +59,11 @@ def request_retry(retry_times, min_retry_delay, max_retry_delay):
         stop=stop_after_attempt(retry_times),
         wait=wait_exponential(min=min_retry_delay, max=max_retry_delay),
         retry=retry_if_exception_type((RequestException, TimeoutError)),
-        reraise=True  # 重试耗尽后重新抛出异常
+        reraise=True,  # 重试耗尽后重新抛出异常
     )
     return common_retry
 
+
 def yield_batch(data, batch_size):
     """
     生成批次数据
@@ -68,4 +72,23 @@ def yield_batch(data, batch_size):
     :return:
     """
     for i in range(0, len(data), batch_size):
-        yield data[i:i + batch_size]
+        yield data[i : i + batch_size]
+
+
+def extract_root_source_id(path: str) -> dict:
+    """
+    提取path参数
+    :param path:
+    :return:
+    """
+    params = parse_qs(urlparse(path).query)
+    jump_page = params.get("jumpPage", [None])[0]
+    if jump_page:
+        params2 = parse_qs(jump_page)
+        res = {
+            "video_id": params2["pages/user-videos?id"][0],
+            "root_source_id": params2["rootSourceId"][0],
+        }
+        return res
+    else:
+        return {}

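For reference, a minimal sketch of how the two helpers added to applications/utils/common.py behave. This is not part of the commit; the sample path below is hypothetical, built only from the query keys the function reads (jumpPage, pages/user-videos?id, rootSourceId), and the import path assumes the module layout shown in the diff header.

    from applications.utils.common import extract_root_source_id, yield_batch

    # Hypothetical mini-program path: jumpPage carries a URL-encoded query string
    # that embeds both the video id and the rootSourceId.
    sample_path = (
        "pages/index?jumpPage="
        "pages%2Fuser-videos%3Fid%3D4567%26rootSourceId%3Dtouliu_abc123"
    )
    print(extract_root_source_id(sample_path))
    # -> {'video_id': '4567', 'root_source_id': 'touliu_abc123'}

    # yield_batch slices a sequence into fixed-size chunks, one batch per iteration.
    for batch in yield_batch(list(range(7)), batch_size=3):
        print(batch)
    # -> [0, 1, 2] / [3, 4, 5] / [6]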
+ 127 - 103
tasks/update_published_articles_minigram_detail.py → tasks/data_tasks/update_published_articles_minigram_detail.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+
 import json
 import traceback
 
@@ -8,21 +9,16 @@ from datetime import datetime, timedelta
 from typing import List, Dict
 
 from tqdm import tqdm
-from urllib.parse import urlparse, parse_qs
 from pymysql.cursors import DictCursor
 
-from applications import bot
-from applications import log
-from applications import Functions
-from applications import WeixinSpider
+from applications import bot, log
 from applications.db import DatabaseConnector
-from applications.const import UpdateMiniProgramDetailConst
 from applications.exception import SpiderError
+from applications.utils import extract_root_source_id
+
+from cold_start.crawler.wechat import get_article_detail
 from config import long_articles_config, piaoquan_crawler_config
 
-const = UpdateMiniProgramDetailConst()
-spider = WeixinSpider()
-functions = Functions()
 
 TASK_NAME = "updateMinigramInfoDaily"
 ARTICLE_TABLE = "official_articles_v2"
@@ -31,26 +27,21 @@ EMPTY_LIST = []
 EMPTY_DICT = {}
 
 
-def extract_path(path: str) -> Dict:
-    """
-    提取path参数
-    :param path:
-    :return:
-    """
-    params = parse_qs(urlparse(path).query)
-    jump_page = params.get('jumpPage', [None])[0]
-    if jump_page:
-        params2 = parse_qs(jump_page)
-        res = {
-            "video_id": params2['pages/user-videos?id'][0],
-            "root_source_id": params2['rootSourceId'][0],
-        }
-        return res
-    else:
-        return EMPTY_DICT
-
-
-class UpdatePublishedArticlesMinigramDetail(object):
+class Const:
+    ARTICLE_SUCCESS_CODE = 0
+    # 记录默认状态
+    DEFAULT_STATUS = 0
+    # 请求接口失败状态
+    REQUEST_FAIL_STATUS = -1
+    # 文章被删除状态
+    DELETE_STATUS = -2
+    # 未知原因无信息返回状态
+    UNKNOWN_STATUS = -3
+    # 文章违规状态
+    ILLEGAL_STATUS = -4
+
+
+class UpdatePublishedArticlesMinigramDetail(Const):
     """
     更新已发布文章数据
     """
@@ -74,10 +65,7 @@ class UpdatePublishedArticlesMinigramDetail(object):
             error_msg = traceback.format_exc()
             bot(
                 title="更新小程序裂变信息任务连接数据库失败",
-                detail={
-                    "error": e,
-                    "msg": error_msg
-                }
+                detail={"error": e, "msg": error_msg},
             )
             return
 
@@ -89,7 +77,7 @@ class UpdatePublishedArticlesMinigramDetail(object):
         sql = f"""
             SELECT ContentUrl, wx_sn 
             FROM {ARTICLE_TABLE}
-            WHERE publish_timestamp IN {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
+            WHERE publish_timestamp IN {(self.DEFAULT_STATUS, self.REQUEST_FAIL_STATUS)};
         """
 
         response = self.piaoquan_crawler_db_client.fetch(sql, cursor_type=DictCursor)
@@ -124,10 +112,23 @@ class UpdatePublishedArticlesMinigramDetail(object):
              WHERE FROM_UNIXTIME(publish_timestamp)
              BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
          """
-        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
+        article_list = self.piaoquan_crawler_db_client.fetch(
+            query=sql, cursor_type=DictCursor
+        )
         return article_list
 
-    def insert_each_root_source_id(self, wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt) -> int:
+    def insert_each_root_source_id(
+        self,
+        wx_sn,
+        mini_title,
+        mini_name,
+        cover_url,
+        video_index,
+        root_source_id,
+        video_id,
+        publish_dt,
+        recall_dt,
+    ) -> int:
         """
         :param recall_dt: 召回日期
         :param publish_dt: 文章发布日期
@@ -149,8 +150,16 @@ class UpdatePublishedArticlesMinigramDetail(object):
         affected_rows = self.piaoquan_crawler_db_client.save(
             query=insert_sql,
             params=(
-                wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt
-            )
+                wx_sn,
+                mini_title,
+                mini_name,
+                cover_url,
+                video_index,
+                root_source_id,
+                video_id,
+                publish_dt,
+                recall_dt,
+            ),
         )
         return affected_rows
 
@@ -161,13 +170,19 @@ class UpdatePublishedArticlesMinigramDetail(object):
         :param article_info:
         :return:
         """
-        url = article_info['ContentUrl']
-        publish_timestamp = article_info['publish_timestamp']
-        wx_sn = article_info['wx_sn'].decode()
-        root_source_id_list = json.loads(article_info['root_source_id_list'] if article_info['root_source_id_list'] else EMPTY_LIST)
+        url = article_info["ContentUrl"]
+        publish_timestamp = article_info["publish_timestamp"]
+        wx_sn = article_info["wx_sn"].decode()
+        root_source_id_list = json.loads(
+            article_info["root_source_id_list"]
+            if article_info["root_source_id_list"]
+            else EMPTY_LIST
+        )
 
         try:
-            article_mini_program_detail = self.get_article_mini_program_info(url, root_source_id_list)
+            article_mini_program_detail = self.get_article_mini_program_info(
+                url, root_source_id_list
+            )
         except Exception as e:
             return {}
 
@@ -180,30 +195,34 @@ class UpdatePublishedArticlesMinigramDetail(object):
                     "ContentUrl": url,
                     "wxSn": wx_sn,
                     "publish_timestamp": publish_timestamp,
-                    "miniInfo": article_mini_program_detail
-                }
+                    "miniInfo": article_mini_program_detail,
+                },
             )
             try:
                 publish_date = datetime.fromtimestamp(publish_timestamp)
                 # generate T+0, T+1, T+2 date string
                 recall_dt_str_list = [
-                    (publish_date + timedelta(days=i)).strftime('%Y-%m-%d')
+                    (publish_date + timedelta(days=i)).strftime("%Y-%m-%d")
                     for i in range(3)
                 ]
 
                 for date_str in recall_dt_str_list:
-                    for video_index, mini_item in enumerate(article_mini_program_detail, 1):
-                        image_url = mini_item['image_url']
-                        nick_name = mini_item['nike_name']
+                    for video_index, mini_item in enumerate(
+                        article_mini_program_detail, 1
+                    ):
+                        image_url = mini_item["image_url"]
+                        nick_name = mini_item["nike_name"]
                         # extract video id and root_source_id
-                        if mini_item.get("root_source_id") and mini_item.get("video_id"):
-                            root_source_id = mini_item['root_source_id']
-                            video_id = mini_item['video_id']
+                        if mini_item.get("root_source_id") and mini_item.get(
+                            "video_id"
+                        ):
+                            root_source_id = mini_item["root_source_id"]
+                            video_id = mini_item["video_id"]
                         else:
-                            id_info = extract_path(mini_item['path'])
-                            root_source_id = id_info['root_source_id']
-                            video_id = id_info['video_id']
-                        kimi_title = mini_item['title']
+                            id_info = extract_root_source_id(mini_item["path"])
+                            root_source_id = id_info["root_source_id"]
+                            video_id = id_info["video_id"]
+                        kimi_title = mini_item["title"]
                         self.insert_each_root_source_id(
                             wx_sn=wx_sn,
                             mini_title=kimi_title,
@@ -212,8 +231,8 @@ class UpdatePublishedArticlesMinigramDetail(object):
                             video_index=video_index,
                             root_source_id=root_source_id,
                             video_id=video_id,
-                            publish_dt=publish_date.strftime('%Y-%m-%d'),
-                            recall_dt=date_str
+                            publish_dt=publish_date.strftime("%Y-%m-%d"),
+                            recall_dt=date_str,
                         )
                 return EMPTY_DICT
             except Exception as e:
@@ -222,14 +241,16 @@ class UpdatePublishedArticlesMinigramDetail(object):
                     task=TASK_NAME,
                     function="record_each_article",
                     status="fail",
-                    message="插入数据失败, 失败原因是{}--{}".format(e, error_msg)
+                    message="插入数据失败, 失败原因是{}--{}".format(e, error_msg),
                 )
                 return article_info
 
         else:
             return EMPTY_DICT
 
-    def get_article_mini_program_info(self, content_url: str, root_source_id_list: list) -> List[Dict]:
+    def get_article_mini_program_info(
+        self, content_url: str, root_source_id_list: list
+    ) -> List[Dict]:
         """
         获取文章的小程序信息
         :return:
@@ -242,7 +263,7 @@ class UpdatePublishedArticlesMinigramDetail(object):
             fetch_response = self.long_articles_db_client.fetch(
                 query=fetch_sql,
                 params=(tuple(root_source_id_list),),
-                cursor_type=DictCursor
+                cursor_type=DictCursor,
             )
             mini_info = []
             if fetch_response:
@@ -254,23 +275,23 @@ class UpdatePublishedArticlesMinigramDetail(object):
                             "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                             "image_url": "",
                             "nike_name": "票圈 l 3亿人喜欢的视频平台",
-                            "root_source_id": item['root_source_id'],
-                            "video_id": item['video_id'],
+                            "root_source_id": item["root_source_id"],
+                            "video_id": item["video_id"],
                             "service_type": "0",
                             "title": "",
-                            "type": "card"
+                            "type": "card",
                         }
                     )
                 return mini_info
 
         try:
-            article_detail = spider.get_article_text(content_url)
+            article_detail = get_article_detail(content_url)
         except Exception as e:
             raise SpiderError(error=e, spider="detail", url=content_url)
 
-        response_code = article_detail['code']
-        if response_code == const.ARTICLE_SUCCESS_CODE:
-            mini_info = article_detail['data']['data']['mini_program']
+        response_code = article_detail["code"]
+        if response_code == self.ARTICLE_SUCCESS_CODE:
+            mini_info = article_detail["data"]["data"]["mini_program"]
             return mini_info
         else:
             return EMPTY_LIST
@@ -287,7 +308,9 @@ class UpdatePublishedArticlesMinigramDetail(object):
              WHERE publish_dt
              BETWEEN DATE_SUB('{biz_date}', INTERVAL 3 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
         """
-        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
+        article_list = self.piaoquan_crawler_db_client.fetch(
+            query=sql, cursor_type=DictCursor
+        )
         return article_list
 
     def update_each_root_source_id(self, recall_dt: str, root_source_id: str) -> None:
@@ -296,7 +319,9 @@ class UpdatePublishedArticlesMinigramDetail(object):
         :param root_source_id:
         :return:
         """
-        mini_program_detail = self.get_root_source_id_result(root_source_id=root_source_id, dt=recall_dt)
+        mini_program_detail = self.get_root_source_id_result(
+            root_source_id=root_source_id, dt=recall_dt
+        )
         if mini_program_detail:
             # do update job
             update_sql = f"""
@@ -310,19 +335,19 @@ class UpdatePublishedArticlesMinigramDetail(object):
             self.piaoquan_crawler_db_client.save(
                 query=update_sql,
                 params=(
-                    mini_program_detail['first_uv'],
-                    mini_program_detail['split0'],
-                    mini_program_detail['split0_head'],
-                    mini_program_detail['split0_recommend'],
-                    mini_program_detail['split1'],
-                    mini_program_detail['split1_head'],
-                    mini_program_detail['split1_recommend'],
-                    mini_program_detail['split2'],
-                    mini_program_detail['split2_head'],
-                    mini_program_detail['split2_recommend'],
+                    mini_program_detail["first_uv"],
+                    mini_program_detail["split0"],
+                    mini_program_detail["split0_head"],
+                    mini_program_detail["split0_recommend"],
+                    mini_program_detail["split1"],
+                    mini_program_detail["split1_head"],
+                    mini_program_detail["split1_recommend"],
+                    mini_program_detail["split2"],
+                    mini_program_detail["split2_head"],
+                    mini_program_detail["split2_recommend"],
                     root_source_id,
-                    recall_dt
-                )
+                    recall_dt,
+                ),
             )
         else:
             return
@@ -334,11 +359,13 @@ class UpdatePublishedArticlesMinigramDetail(object):
         :return:
         """
         if not biz_date:
-            biz_date = datetime.today().strftime('%Y-%m-%d')
+            biz_date = datetime.today().strftime("%Y-%m-%d")
 
         published_article_list = self.get_articles_published_yesterday(biz_date)
         failed_article_list = []
-        for article_info in tqdm(published_article_list, desc="update_published_articles_job"):
+        for article_info in tqdm(
+            published_article_list, desc="update_published_articles_job"
+        ):
             failed_article = self.record_each_article(article_info)
             if failed_article:
                 failed_article_list.append(failed_article)
@@ -353,22 +380,20 @@ class UpdatePublishedArticlesMinigramDetail(object):
 
         bot(
             title="更新文章任务完成",
-            detail={
-                "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-            },
-            mention=False
+            detail={"finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
+            mention=False,
         )
         if second_try_fail_article_list:
             bot(
                 title="更新文章任务存在文章抓取失败",
                 detail=[
                     {
-                        "account": line['accountName'],
-                        "title": line['title'],
-                        "url": line['ContentUrl']
+                        "account": line["accountName"],
+                        "title": line["title"],
+                        "url": line["ContentUrl"],
                     }
                     for line in second_try_fail_article_list
-                ]
+                ],
             )
 
     def update_mini_program_detail_job(self, biz_date=None):
@@ -378,21 +403,24 @@ class UpdatePublishedArticlesMinigramDetail(object):
         :return:
         """
         if not biz_date:
-            biz_date = datetime.today().strftime('%Y-%m-%d')
+            biz_date = datetime.today().strftime("%Y-%m-%d")
 
         # get root_source_id_list
         root_source_id_obj_list = self.get_root_source_id_for_three_days(biz_date)
         log(
             task=TASK_NAME,
             function="update_minigram_detail",
-            message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(root_source_id_obj_list))
+            message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(
+                len(root_source_id_obj_list)
+            ),
         )
         fail_count = 0
-        for item in tqdm(root_source_id_obj_list, desc="update_mini_program_detail_job"):
+        for item in tqdm(
+            root_source_id_obj_list, desc="update_mini_program_detail_job"
+        ):
             try:
                 self.update_each_root_source_id(
-                    root_source_id=item['root_source_id'],
-                    recall_dt=item['recall_dt']
+                    root_source_id=item["root_source_id"], recall_dt=item["recall_dt"]
                 )
             except Exception as e:
                 log(
@@ -400,16 +428,12 @@ class UpdatePublishedArticlesMinigramDetail(object):
                     function="update_minigram_detail",
                     status="fail",
                     message="更新单条数据失败, 报错信息是 {}".format(e),
-                    data={"error_msg": traceback.format_exc()}
+                    data={"error_msg": traceback.format_exc()},
                 )
                 fail_count += 1
 
         if fail_count:
             bot(
                 title="{} fail because of lam db error".format(TASK_NAME),
-                detail={
-                    "fail_count": fail_count
-                }
+                detail={"fail_count": fail_count},
             )
-
-

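For orientation, a sketch of how the relocated task might be driven end to end. The class name, the new module path, and update_mini_program_detail_job are taken from the diff above; init_database and the published-articles job name are assumptions (carried over from the deleted script and the tqdm description), not confirmed by this commit.

    from tasks.data_tasks.update_published_articles_minigram_detail import (
        UpdatePublishedArticlesMinigramDetail,
    )

    task = UpdatePublishedArticlesMinigramDetail()
    task.init_database()  # assumed: opens the piaoquan_crawler and long_articles connections

    # Assumed method name, inferred from the tqdm description "update_published_articles_job";
    # records per-video root_source_id rows for articles published the previous day.
    task.update_published_articles_job()

    # Defined in the diff (biz_date defaults to today): backfills first_uv and the
    # split0/split1/split2 metrics for root_source_ids seen over the last three days.
    task.update_mini_program_detail_job()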
+ 0 - 452
tasks/not_used_tasks/updateMinigramInfoDaily.py

@@ -1,452 +0,0 @@
-"""
-@author luojunhui
-@description Update Minigram Info Daily
-"""
-import time
-import traceback
-
-from tqdm import tqdm
-from datetime import datetime, timedelta
-import schedule
-from argparse import ArgumentParser
-
-from applications import WeixinSpider, Functions, log, bot
-from applications.db import DatabaseConnector
-from config import long_articles_config, piaoquan_crawler_config
-
-TASK_NAME = "updateMinigramInfoDaily"
-SPIDER_SUCCESS_STATUS = 0
-
-
-def get_yesterday():
-    """
-    get yesterday date
-    :return:
-    """
-    yesterday = datetime.today() - timedelta(1)
-    return yesterday
-
-
-class DailyDataManager(object):
-    """
-    daily 数据每日更新
-    """
-
-    def __init__(self):
-        self.piaoquan_crawler_db_client = None
-        self.long_articles_db_client = None
-        self.spider = WeixinSpider()
-
-    def init_database(self) -> None:
-        """
-        init database connector
-        :return:
-        """
-        # 初始化数据库连接
-        try:
-            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
-            self.piaoquan_crawler_db_client.connect()
-            self.long_articles_db_client = DatabaseConnector(long_articles_config)
-            self.long_articles_db_client.connect()
-        except Exception as e:
-            error_msg = traceback.format_exc()
-            bot(
-                title="更新小程序裂变信息任务连接数据库失败",
-                detail={
-                    "error": e,
-                    "msg": error_msg
-                }
-            )
-            return
-
-    def get_published_articles(self, biz_date):
-        """
-        获取已经发布的文章的信息, updateTime 选择为前一天的 0 点并且转化为时间戳
-        :return:
-        """
-        biz_date_midnight = datetime(year=biz_date.year, month=biz_date.month, day=biz_date.day)
-        biz_date_ts = biz_date_midnight.timestamp()
-        biz_date_end_ts = biz_date_ts + 24 * 60 * 60 - 1
-        sql2 = f"""
-        select ContentUrl, wx_sn, publish_timestamp, accountName, title
-        from official_articles_v2
-        where publish_timestamp between {biz_date_ts} and {biz_date_end_ts};
-        """
-        result_list = self.piaoquan_crawler_db_client.fetch(sql2)
-        log(
-            task=TASK_NAME,
-            function="get_published_articles",
-            message="一共获取 {} 篇文章数据".format(len(result_list))
-        )
-        return result_list
-
-    def update_article_info(self, line):
-        """
-        update info into mysql
-        :return:
-        """
-        url = line[0]
-        update_time = line[2]
-        wx_sn = line[1].decode()
-        article_detail = self.get_root_source_ids(line)
-        if article_detail:
-            response_code = article_detail['code']
-            if response_code == SPIDER_SUCCESS_STATUS:
-                mini_info = article_detail['data']['data']['mini_program']
-                if mini_info:
-                    log(
-                        task=TASK_NAME,
-                        function="get_root_source_ids",
-                        message="获取文章链接对应的 rootSourceId 成功",
-                        data={
-                            "ContentUrl": url,
-                            "wxSn": wx_sn,
-                            "updateTime": update_time,
-                            "miniInfo": mini_info
-                        }
-                    )
-                    try:
-                        dt_object = datetime.fromtimestamp(update_time)
-                        publish_dt = dt_object.strftime('%Y-%m-%d')
-                        one_day = timedelta(days=1)
-                        two_day = timedelta(days=2)
-                        next_day = dt_object + one_day
-                        next_next_day = dt_object + two_day
-                        recall_dt_list = [dt_object, next_day, next_next_day]
-                        recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
-                        for dt_str in recall_dt_str_list:
-                            for index, item in enumerate(mini_info, 1):
-                                image_url = item['image_url']
-                                nick_name = item['nike_name']
-                                root_source_id = item['path'].split("rootSourceId%3D")[-1]
-                                video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
-                                kimi_title = item['title']
-                                # print(image_url, nick_name, root_source_id, video_id, kimi_title)
-                                insert_sql = f"""
-                                        INSERT INTO long_articles_detail_info
-                                        (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
-                                        values
-                                        (%s, %s, %s, %s, %s, %s, %s, %s, %s);
-                                    """
-                                self.piaoquan_crawler_db_client.save(
-                                    query=insert_sql,
-                                    params=(
-                                        wx_sn,
-                                        kimi_title,
-                                        nick_name,
-                                        image_url,
-                                        index,
-                                        root_source_id,
-                                        video_id,
-                                        publish_dt,
-                                        dt_str
-                                    )
-                                )
-                                log(
-                                    task=TASK_NAME,
-                                    function="update_article_info",
-                                    message="插入数据成功, video_id 是: {}".format(video_id)
-                                )
-                    except Exception as e:
-                        error_msg = traceback.format_exc()
-                        log(
-                            task=TASK_NAME,
-                            function="update_article_info",
-                            status="fail",
-                            message="插入数据失败, 失败原因是{}--{}".format(e, error_msg)
-                        )
-
-                return None
-            else:
-                return line
-
-        else:
-            return line
-
-    def get_root_source_ids(self, data_info):
-        """
-        通过抓取接口获取 data_info
-        :return:
-        """
-        url = data_info[0]
-        try:
-            article_detail = self.spider.get_article_text(url)
-            return article_detail
-        except Exception as e:
-            log(
-                task=TASK_NAME,
-                function="get_root_source_ids",
-                status="fail",
-                message="获取文章链接对应的 rootSourceId失败, 报错信息是: {}".format(e),
-                data={
-                    "ContentUrl": url
-                }
-            )
-            return False
-
-    def get_minigram_info(self, rootSourceId):
-        """
-
-        :param rootSourceId:
-        :return:
-        """
-        sql = f"""
-        select type, machinecode, create_time, first_level_dt
-        from changwen_data_base_v2
-        where rootsourceid = '{rootSourceId}';
-        """
-        result_list = self.long_articles_db_client.fetch(sql)
-
-        def summarize(values):
-            """
-            :param values:
-            :return:
-            """
-            L = {}
-            first_level = {}
-            fission_level = {}
-            for line in values:
-                # 先统计首层
-                if line[0] == '首层':
-                    try:
-                        dt = str(line[-1])
-                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
-                        if first_level.get(key_dt):
-                            first_level[key_dt].add(line[1])
-                        else:
-                            first_level[key_dt] = {line[1]}
-                    except Exception as e:
-                        continue
-                else:
-                    try:
-                        dt = str(line[-1])
-                        first_level_dt = datetime.strptime(dt, '%Y%m%d')
-                        create_level_dt = line[-2]
-                        delta = create_level_dt - first_level_dt
-                        days = int(delta.days)
-                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
-                        if fission_level.get(key_dt):
-                            fission_level[key_dt].append((line[1], days))
-                        else:
-                            fission_level[key_dt] = [(line[1], days)]
-                    except Exception as e:
-                        continue
-                        # print("first level dt is NULL")
-
-            tt = {}
-            for key in fission_level:
-                detail_list = fission_level[key]
-                temp = {}
-                for item in detail_list:
-                    mid, days = item
-                    if temp.get(days):
-                        temp[days].add(mid)
-                    else:
-                        temp[days] = {mid}
-                final = {}
-                for sub_key in temp:
-                    length = len(temp[sub_key])
-                    final[sub_key] = length
-                tt[key] = final
-            for key in first_level:
-                temp = [len(first_level[key]), tt.get(key, {}).get(0, 0), tt.get(key, {}).get(1, 0),
-                        tt.get(key, {}).get(2, 0)]
-                L[key] = temp
-            return L
-
-        try:
-            response = summarize(result_list)
-            log(
-                task=TASK_NAME,
-                function="get_minigram_info",
-                message="计算source_id信息成功",
-                data=response
-            )
-            return response
-        except Exception as e:
-            log(
-                task=TASK_NAME,
-                function="get_minigram_info",
-                message="获取 source_id信息失败, 报错信息是: {}".format(e),
-                status="fail"
-            )
-            return None
-
-    def update_minigram_detail(self, biz_date):
-        """
-        :return:
-        """
-        # 获取三天前的日期
-        date_begin = biz_date - timedelta(days=3)
-        datestr_begin = date_begin.strftime("%Y-%m-%d")
-        datestr_end = biz_date.strftime("%Y-%m-%d")
-        sql = f"""
-            select distinct root_source_id
-            from long_articles_detail_info
-            where publish_dt between '{datestr_begin}' and '{datestr_end}';
-        """
-        source_id_list = self.piaoquan_crawler_db_client.fetch(sql)
-        log(
-            task=TASK_NAME,
-            function="update_minigram_detail",
-            message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(source_id_list))
-        )
-        fail_count = 0
-        for item in tqdm(source_id_list):
-            s_id = item[0]
-            try:
-                result = self.get_minigram_info(s_id)
-                for key in result:
-                    recall_dt = key
-                    first_level = result[key][0]
-                    fission_0 = result[key][1]
-                    fission_1 = result[key][2]
-                    fission_2 = result[key][3]
-                    # print(s_id, recall_dt, first_level, fission_0, fission_1, fission_2)
-                    update_sql = f"""
-                        UPDATE long_articles_detail_info
-                        set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
-                        where root_source_id = %s and recall_dt = %s;
-                    """
-                    try:
-                        self.piaoquan_crawler_db_client.save(
-                            query=update_sql,
-                            params=(
-                                first_level, fission_0, fission_1, fission_2, s_id, recall_dt
-                            )
-                        )
-                    except Exception as e:
-                        log(
-                            task=TASK_NAME,
-                            function="update_minigram_detail",
-                            status="fail",
-                            message="mysql 更新失败, 报错信息是 {}".format(e)
-                        )
-            except Exception as e:
-                log(
-                    task=TASK_NAME,
-                    function="update_minigram_detail",
-                    status="fail",
-                    message="更新单条数据失败, 报错信息是 {}".format(e),
-                    data={"error_msg": traceback.format_exc()}
-                )
-                fail_count += 1
-        if fail_count:
-            bot(
-                title="{} fail because of lam db error".format(TASK_NAME),
-                detail={
-                    "fail_count": fail_count
-                }
-            )
-
-
-def updateArticlesJob(biz_date=None):
-    """
-    更新文章数据
-    :return:
-    """
-    if not biz_date:
-        biz_date = get_yesterday()
-    data_manager = DailyDataManager()
-    data_manager.init_database()
-    article_list = data_manager.get_published_articles(biz_date)
-    failed_article_list = []
-    for article in tqdm(article_list):
-        failed_article = data_manager.update_article_info(article)
-        if failed_article:
-            failed_article_list.append(failed_article)
-
-    # 重试
-    second_try_fail_article_list = []
-    if failed_article_list:
-        for article in tqdm(failed_article_list):
-            second_failed_article = data_manager.update_article_info(article)
-            if second_failed_article:
-                second_try_fail_article_list.append(second_failed_article)
-
-    log(
-        task=TASK_NAME,
-        function="updateArticlesJob",
-        message="文章更新完成---{}".format(biz_date.__str__())
-    )
-    bot(
-        title="更新文章任务完成",
-        detail={
-            "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-        },
-        mention=False
-    )
-    if second_try_fail_article_list:
-        bot(
-            title="更新文章任务存在文章抓取失败",
-            detail=[
-                {
-                    "account": line[3],
-                    "title": line[4],
-                    "url": line[0]
-                }
-                for line in second_try_fail_article_list
-            ]
-        )
-
-
-def updateMinigramInfoJob(biz_date=None):
-    """
-    更新前三天小程序数据
-    :return:
-    """
-    if not biz_date:
-        biz_date = get_yesterday()
-    data_manager = DailyDataManager()
-    data_manager.init_database()
-    try:
-        data_manager.update_minigram_detail(biz_date)
-        log(
-            task=TASK_NAME,
-            function="updateMinigramInfoJob",
-            message="小程序更新完成---{}".format(biz_date.__str__())
-        )
-    except Exception as e:
-        log(
-            task=TASK_NAME,
-            function="updateMinigramInfoJob",
-            status="fail",
-            message="小程序更新失败---{}, 报错信息是: {}".format(biz_date.__str__(), e)
-        )
-    bot(
-        title="更新小程序信息任务完成",
-        detail={
-            "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-        },
-        mention=False
-    )
-
-
-def main():
-    """
-    main function
-    :return:
-    """
-    parser = ArgumentParser()
-    parser.add_argument("--run-date",
-                        help="Run only once for date in format of %Y%m%d. \
-                        If no specified, run as daily jobs.")
-    args = parser.parse_args()
-
-    if args.run_date:
-        biz_date = datetime.strptime(args.run_date, "%Y%m%d")
-        print("Run in manual mode. Date: {}".format(args.run_date))
-        updateArticlesJob(biz_date)
-        updateMinigramInfoJob(biz_date)
-        return
-    else:
-        print("Run in daily mode.")
-        schedule.every().day.at("01:30").do(Functions().job_with_thread, updateArticlesJob)
-        schedule.every().day.at("03:30").do(Functions().job_with_thread, updateMinigramInfoJob)
-        while True:
-            schedule.run_pending()
-            time.sleep(1)
-
-
-if __name__ == '__main__':
-    main()

+ 1 - 1
update_mini_info_v2.py

@@ -4,7 +4,7 @@
 
 from argparse import ArgumentParser
 
-from tasks.update_published_articles_minigram_detail import UpdatePublishedArticlesMinigramDetail
+from tasks.data_tasks.update_published_articles_minigram_detail import UpdatePublishedArticlesMinigramDetail
 
 
 def main():