Przeglądaj źródła

优化小程序信息更新代码dev

luojunhui 4 miesięcy temu
rodzic
commit
2e6dd8eec6

+ 1 - 0
applications/exception/__init__.py

@@ -2,4 +2,5 @@
 @author: luojunhui
 """
 from .query_error import QueryError
+from .spider_error import SpiderError
 from .transaction_error import TransactionError

+ 28 - 0
applications/exception/spider_error.py

@@ -0,0 +1,28 @@
+"""
+@author: luojunhui
+"""
+import json
+
+from applications import log
+
+
+class SpiderError(Exception):
+    """数据库查询异常"""
+
+    def __init__(self, error=None, spider=None, url=None):
+        """
+        :param error: 异常对象,可选,用于提供更详细的错误信息。
+        :param spider: 爬虫任务
+        """
+        error_obj = {
+            "error": str(error),
+            "spider": spider,
+            "message": "爬虫接口请求失败",
+            "url": url
+        }
+        log(
+            task="spider_task",
+            function="log_spider_error",
+            data=error_obj
+        )
+        super().__init__(json.dumps(error_obj, ensure_ascii=False, indent=4))

+ 243 - 0
daily_tasks/update_mini_program_info_task.py

@@ -0,0 +1,243 @@
+"""
+@author: luojunhui
+@desc: 更新每日发布文章的小程序裂变信息
+"""
+import json
+import time
+import traceback
+from argparse import ArgumentParser
+from datetime import datetime, timedelta
+from typing import List, Dict
+
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from applications import bot
+from applications import Functions
+from applications import log
+from applications import WeixinSpider
+from applications.db import DatabaseConnector
+from applications.exception import SpiderError
+from config import long_articles_config, piaoquan_crawler_config
+
+basic_function = Functions()
+spider = WeixinSpider()
+
+TASK_NAME = "updateMinigramInfoDaily"
+SPIDER_SUCCESS_STATUS = 0
+
+
+class UpdateMiniProgramInfoTask(object):
+    """
+    更新小程序裂变信息
+    """
+
+    def __init__(self):
+        self.piaoquan_crawler_db_client = None
+        self.long_articles_db_client = None
+
+    def init_database(self) -> None:
+        """
+        init database connector
+        :return:
+        """
+        # 初始化数据库连接
+        try:
+            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
+            self.piaoquan_crawler_db_client.connect()
+            self.long_articles_db_client = DatabaseConnector(long_articles_config)
+            self.long_articles_db_client.connect()
+        except Exception as e:
+            error_msg = traceback.format_exc()
+            bot(
+                title="更新小程序裂变信息任务连接数据库失败",
+                detail={
+                    "error": e,
+                    "msg": error_msg
+                }
+            )
+            return
+
+    def get_published_articles_yesterday(self, run_date: str) -> List[Dict]:
+        """
+        get_published_articles_yesterday
+        :param run_date:
+        :return:
+        """
+        sql = f"""
+            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title
+            FROM official_articles_v2
+            WHERE FROM_UNIXTIME(publish_timestamp)
+            BETWEEN DATE_SUB('{run_date}', INTERVAL 1 DAY) AND DATE_SUB('{run_date}', INTERVAL 1 SECOND);
+        """
+        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
+        return article_list
+
+    def update_each_article(self, article_info: Dict) -> None:
+        """
+        update_each_article
+        :param article_info:
+        :return:
+        """
+        url = article_info['ContentUrl']
+        wx_sn = article_info['wx_sn'].decode()
+        publish_timestamp = article_info['publish_timestamp']
+
+        try:
+            article_detail = spider.get_article_text(url)
+        except Exception as e:
+            raise SpiderError(error=e, spider="detail", url=url)
+
+        response_code = article_detail['code']
+        if response_code == SPIDER_SUCCESS_STATUS:
+            mini_info = article_detail['data']['data']['mini_program']
+            if mini_info:
+                log(
+                    task=TASK_NAME,
+                    function="get_root_source_ids",
+                    message="获取文章链接对应的 rootSourceId 成功",
+                    data={
+                        "ContentUrl": url,
+                        "wxSn": wx_sn,
+                        "publish_time_stamp": publish_timestamp,
+                        "miniInfo": mini_info
+                    }
+                )
+                try:
+                    dt_object = datetime.fromtimestamp(publish_timestamp)
+                    publish_dt = dt_object.strftime('%Y-%m-%d')
+                    one_day = timedelta(days=1)
+                    two_day = timedelta(days=2)
+                    next_day = dt_object + one_day
+                    next_next_day = dt_object + two_day
+                    recall_dt_list = [dt_object, next_day, next_next_day]
+                    recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
+                    for dt_str in recall_dt_str_list:
+                        for index, item in enumerate(mini_info, 1):
+                            image_url = item['image_url']
+                            nick_name = item['nike_name']
+                            root_source_id = item['path'].split("rootSourceId%3D")[-1]
+                            video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
+                            kimi_title = item['title']
+                            # print(image_url, nick_name, root_source_id, video_id, kimi_title)
+                            insert_sql = f"""
+                                                INSERT INTO long_articles_detail_info
+                                                (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
+                                                values
+                                                (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+                                            """
+                            self.piaoquan_crawler_db_client.save(
+                                query=insert_sql,
+                                params=(
+                                    wx_sn,
+                                    kimi_title,
+                                    nick_name,
+                                    image_url,
+                                    index,
+                                    root_source_id,
+                                    video_id,
+                                    publish_dt,
+                                    dt_str
+                                )
+                            )
+                            log(
+                                task=TASK_NAME,
+                                function="update_article_info",
+                                message="插入数据成功, video_id 是: {}".format(video_id)
+                            )
+                except Exception as e:
+                    error_msg = traceback.format_exc()
+                    log(
+                        task=TASK_NAME,
+                        function="update_article_info",
+                        status="fail",
+                        message="插入数据失败, 失败原因是{}--{}".format(e, error_msg)
+                    )
+
+    def get_source_id_info(self, root_source_id: str) -> Dict:
+        """
+        计算root_source_id
+        :param root_source_id:
+        :return:
+        """
+        sql = f"""
+        select type, machinecode, create_time, first_level_dt
+        from changwen_data_base_v2
+        where rootsourceid = '{root_source_id}';
+        """
+        result_list = self.long_articles_db_client.fetch(sql)
+
+        def summarize(values):
+            """
+            :param values:
+            :return:
+            """
+            L = {}
+            first_level = {}
+            fission_level = {}
+            for line in values:
+                # 先统计首层
+                if line[0] == '首层':
+                    try:
+                        dt = str(line[-1])
+                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
+                        if first_level.get(key_dt):
+                            first_level[key_dt].add(line[1])
+                        else:
+                            first_level[key_dt] = {line[1]}
+                    except Exception as e:
+                        continue
+                else:
+                    try:
+                        dt = str(line[-1])
+                        first_level_dt = datetime.strptime(dt, '%Y%m%d')
+                        create_level_dt = line[-2]
+                        delta = create_level_dt - first_level_dt
+                        days = int(delta.days)
+                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
+                        if fission_level.get(key_dt):
+                            fission_level[key_dt].append((line[1], days))
+                        else:
+                            fission_level[key_dt] = [(line[1], days)]
+                    except Exception as e:
+                        continue
+                        # print("first level dt is NULL")
+
+            tt = {}
+            for key in fission_level:
+                detail_list = fission_level[key]
+                temp = {}
+                for item in detail_list:
+                    mid, days = item
+                    if temp.get(days):
+                        temp[days].add(mid)
+                    else:
+                        temp[days] = {mid}
+                final = {}
+                for sub_key in temp:
+                    length = len(temp[sub_key])
+                    final[sub_key] = length
+                tt[key] = final
+            for key in first_level:
+                temp = [len(first_level[key]), tt.get(key, {}).get(0, 0), tt.get(key, {}).get(1, 0),
+                        tt.get(key, {}).get(2, 0)]
+                L[key] = temp
+            return L
+
+        try:
+            response = summarize(result_list)
+            log(
+                task=TASK_NAME,
+                function="get_minigram_info",
+                message="计算source_id信息成功",
+                data=response
+            )
+            return response
+        except Exception as e:
+            log(
+                task=TASK_NAME,
+                function="get_minigram_info",
+                message="获取 source_id信息失败, 报错信息是: {}".format(e),
+                status="fail"
+            )
+            return {}