""" @author: luojunhui """ import json import time import traceback from argparse import ArgumentParser from typing import List, Dict from tqdm import tqdm from pymysql.cursors import DictCursor from applications import bot from applications import log from applications import Functions from applications import WeixinSpider from applications.db import DatabaseConnector from applications.const import UpdateMiniProgramDetailConst from applications.exception import SpiderError from config import long_articles_config, piaoquan_crawler_config const = UpdateMiniProgramDetailConst() spider = WeixinSpider() functions = Functions() # 数据库配置 ARTICLE_TABLE = "official_articles_v2" def check_root_source_id_list(content_url: str): """ 校验是否存在文章是否存在root_source_id :return: """ try: article_detail = spider.get_article_text(content_url) except Exception as e: raise SpiderError(error=e, spider="detail", url=content_url) response_code = article_detail['code'] if response_code == const.ARTICLE_SUCCESS_CODE: mini_info = article_detail['data']['data']['mini_program'] return mini_info else: return None class UpdatePublishedArticlesMinigramDetail(object): """ 更新已发布文章数据 """ def __init__(self): self.piaoquan_crawler_db_client = None self.long_articles_db_client = None def init_database(self) -> None: """ init database connector :return: """ # 初始化数据库连接 try: self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config) self.piaoquan_crawler_db_client.connect() self.long_articles_db_client = DatabaseConnector(long_articles_config) self.long_articles_db_client.connect() except Exception as e: error_msg = traceback.format_exc() bot( title="更新小程序裂变信息任务连接数据库失败", detail={ "error": e, "msg": error_msg } ) return def check_articles(self) -> List[Dict]: """ 校验是否存在文章未更新得到发布时间 :return: """ sql = f""" SELECT ContentUrl, wx_sn FROM {ARTICLE_TABLE} WHERE publish_timestamp IN {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};""" response = self.piaoquan_crawler_db_client.fetch(sql, cursor_type=DictCursor) return response def get_root_source_id_result(self, root_source_id: str, dt: str) -> Dict: """ 获取文章的root_source_id :param dt: :param root_source_id: :return: """ select_sql = f""" SELECT first_uv, split0, split1, split2 FROM changwen_data_rootsourceid WHERE root_source_id = '{root_source_id}' AND dt = '{dt}'; """ result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor) if result: return result[0] else: return {} def get_articles_published_yesterday(self, biz_date: str) -> List[Dict]: """ 获取昨天发布的文章 :return: """ sql = f""" SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list FROM official_articles_v2 WHERE FROM_UNIXTIME(publish_timestamp) BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND); """ article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor) return article_list def insert_each_root_source_id(self, root_source_id: str, article_info: Dict) -> int: """ :param root_source_id: :param article_info: :return: """ insert_sql = f""" INSERT INTO long_articles_detail_info (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt) values (%s, %s, %s, %s, %s, %s, %s, %s, %s); """ affected_rows = self.piaoquan_crawler_db_client.save( query=insert_sql, params=( article_info['wx_sn'], article_info['title'], article_info['mini_name'], article_info['cover_url'], article_info['video_index'], root_source_id, article_info['video_id'], article_info['publish_dt'] 
) ) return affected_rows def record_each_article(self, article_info: Dict) -> None: """ 记录每篇文章的root_source_id 数量集: article_count * mini_program_count * days_count :param article_info: :return: """ url = article_info['ContentUrl'] root_source_id_list = json.loads(article_info['root_source_id_list']) if not root_source_id_list: root_source_id_response = check_root_source_id_list(url) if root_source_id_response: root_source_id_list = [] else: return for root_source_id in root_source_id_list: self.record_each_article(root_source_id, article_info)
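

# --- hypothetical usage sketch (an illustration, not part of the original module) ---
# A minimal driver showing how the class above might be wired together: connect to
# both databases, pull the articles published the day before a run date taken from
# the command line, and record each article's mini-program details. The function name
# `update_mini_program_detail` and the `--run_date` flag are assumptions; the module's
# real entry point is not shown in this section. Note that insert_each_root_source_id
# also expects mini-program fields (mini_name, cover_url, video_index, video_id,
# publish_dt, recall_dt), which the pipeline presumably fills in before recording.
def update_mini_program_detail(biz_date: str) -> None:
    task = UpdatePublishedArticlesMinigramDetail()
    task.init_database()
    article_list = task.get_articles_published_yesterday(biz_date)
    for article_info in tqdm(article_list, desc="record mini program detail"):
        task.record_each_article(article_info)


if __name__ == "__main__":
    parser = ArgumentParser()
    parser.add_argument("--run_date", help="run date in YYYY-MM-DD format")
    args = parser.parse_args()
    biz_date = args.run_date if args.run_date else time.strftime("%Y-%m-%d")
    update_mini_program_detail(biz_date)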