
Optimize the mini program info update code (dev)

luojunhui committed 6 months ago · commit 6a9ed6d28f

+ 6 - 0
applications/const.py

@@ -151,6 +151,12 @@ class WeixinVideoCrawlerConst:
     TITLE_MIN_LENGTH = 15
 
 
+class UpdateMiniProgramDetailConst(updatePublishedMsgTaskConst):
+    """
+    Constants for the mini program detail update task
+    """
+
+
 
 
 

+ 1 - 0
applications/exception/__init__.py

@@ -2,4 +2,5 @@
 @author: luojunhui
 """
 from .query_error import QueryError
+from .spider_error import SpiderError
 from .transaction_error import TransactionError

+ 28 - 0
applications/exception/spider_error.py

@@ -0,0 +1,28 @@
+"""
+@author: luojunhui
+"""
+import json
+
+from applications import log
+
+
+class SpiderError(Exception):
+    """数据库查询异常"""
+
+    def __init__(self, error=None, spider=None, url=None):
+        """
+        :param error: underlying exception, optional, provides more detailed error information
+        :param spider: the spider task that failed
+        :param url: the URL that was being requested
+        """
+        error_obj = {
+            "error": str(error),
+            "spider": spider,
+            "message": "爬虫接口请求失败",
+            "url": url
+        }
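+        # log the structured error before raising so failed spider requests stay traceable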
+        log(
+            task="spider_task",
+            function="log_spider_error",
+            data=error_obj
+        )
+        super().__init__(json.dumps(error_obj, ensure_ascii=False, indent=4))

+ 168 - 0
tasks/update_published_articles_minigram_detail.py

@@ -0,0 +1,168 @@
+"""
+@author: luojunhui
+"""
+import json
+import time
+import traceback
+
+from argparse import ArgumentParser
+from typing import List, Dict
+
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from applications import bot
+from applications import log
+from applications import Functions
+from applications import WeixinSpider
+from applications.db import DatabaseConnector
+from applications.const import UpdateMiniProgramDetailConst
+from applications.exception import SpiderError
+from config import long_articles_config, piaoquan_crawler_config
+
+const = UpdateMiniProgramDetailConst()
+spider = WeixinSpider()
+functions = Functions()
+# database table configuration
+ARTICLE_TABLE = "official_articles_v2"
+
+
+def check_root_source_id_list(content_url: str):
+    """
+    Fetch the article detail page and return its embedded mini program info (used to recover a missing root_source_id)
+    :param content_url: article ContentUrl
+    :return: mini program info list on success, otherwise None
+    """
+    try:
+        article_detail = spider.get_article_text(content_url)
+    except Exception as e:
+        raise SpiderError(error=e, spider="detail", url=content_url)
+
+    response_code = article_detail['code']
+    if response_code == const.ARTICLE_SUCCESS_CODE:
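+        # the detail payload nests the embedded mini program cards under data.data.mini_program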
+        mini_info = article_detail['data']['data']['mini_program']
+        return mini_info
+    else:
+        return None
+
+
+class UpdatePublishedArticlesMinigramDetail(object):
+    """
+    Update mini program detail data for published articles
+    """
+
+    def __init__(self):
+        self.piaoquan_crawler_db_client = None
+        self.long_articles_db_client = None
+
+    def init_database(self) -> None:
+        """
+        init database connector
+        :return:
+        """
+        # initialize the database connections
+        try:
+            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
+            self.piaoquan_crawler_db_client.connect()
+            self.long_articles_db_client = DatabaseConnector(long_articles_config)
+            self.long_articles_db_client.connect()
+        except Exception as e:
+            error_msg = traceback.format_exc()
+            bot(
+                title="更新小程序裂变信息任务连接数据库失败",
+                detail={
+                    "error": e,
+                    "msg": error_msg
+                }
+            )
+            return
+
+    def check_articles(self) -> List[Dict]:
+        """
+        Find published articles whose publish_timestamp still holds a placeholder status (not yet resolved)
+        :return: rows with ContentUrl and wx_sn
+        """
+        sql = f"""
+        SELECT ContentUrl, wx_sn 
+        FROM {ARTICLE_TABLE}
+        WHERE publish_timestamp IN {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};"""
+        response = self.piaoquan_crawler_db_client.fetch(sql, cursor_type=DictCursor)
+        return response
+
+    def get_root_source_id_result(self, root_source_id: str, dt: str) -> Dict:
+        """
+        Fetch the uv / split fission stats recorded for a root_source_id on a given date
+        :param dt: stats date
+        :param root_source_id:
+        :return: the stats row, or an empty dict if nothing was recorded
+        """
+        select_sql = f"""
+            SELECT first_uv, split0, split1, split2
+            FROM changwen_data_rootsourceid
+            WHERE root_source_id = '{root_source_id}' AND dt = '{dt}';
+        """
+        result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
+        if result:
+            return result[0]
+        else:
+            return {}
+
+    def get_articles_published_yesterday(self, biz_date: str) -> List[Dict]:
+        """
+        Fetch articles published during the day before biz_date
+        :param biz_date: business date string
+        :return:
+        """
+        sql = f"""
+             SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
+             FROM {ARTICLE_TABLE}
+             WHERE FROM_UNIXTIME(publish_timestamp)
+             BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
+         """
+        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
+        return article_list
+
+    def insert_each_root_source_id(self, root_source_id: str, article_info: Dict) -> int:
+        """
+        Insert one mini program detail row for the given article and root_source_id
+        :param root_source_id:
+        :param article_info:
+        :return: number of affected rows
+        """
+        insert_sql = f"""
+            INSERT INTO long_articles_detail_info
+            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
+            values
+            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+        """
+        affected_rows = self.piaoquan_crawler_db_client.save(
+            query=insert_sql,
+            params=(
+                article_info['wx_sn'],
+                article_info['title'],
+                article_info['mini_name'],
+                article_info['cover_url'],
+                article_info['video_index'],
+                root_source_id,
+                article_info['video_id'],
+                article_info['publish_dt'],
+                # the INSERT above lists nine columns; recall_dt is assumed to be supplied by the caller
+                article_info['recall_dt']
+            )
+        )
+        return affected_rows
+
+    def record_each_article(self, article_info: Dict) -> None:
+        """
+        Record the mini program details for one article (one row per root_source_id)
+        Row volume: article_count * mini_program_count * days_count
+        :param article_info:
+        :return:
+        """
+        url = article_info['ContentUrl']
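+        # root_source_id_list is stored as a JSON-encoded array on the article row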
+        raw_list = article_info['root_source_id_list']
+        # guard against a NULL column value before json.loads
+        root_source_id_list = json.loads(raw_list) if raw_list else []
+        if not root_source_id_list:
+            root_source_id_response = check_root_source_id_list(url)
+            if root_source_id_response:
+                # assumption: each mini program card exposes its root_source_id; adjust the key if the spider schema differs
+                root_source_id_list = [mini.get('root_source_id') for mini in root_source_id_response]
+            else:
+                return
+
+        for root_source_id in root_source_id_list:
+            self.insert_each_root_source_id(root_source_id, article_info)
+