"""
Daily job that records the mini-program cards (root_source_id / video_id)
embedded in published articles and refreshes their fission metrics.

@author: luojunhui
"""
import json
import traceback

from datetime import datetime, timedelta
from typing import List, Dict

from tqdm import tqdm
from urllib.parse import urlparse, parse_qs
from pymysql.cursors import DictCursor

from applications import bot
from applications import log
from applications import Functions
from applications import WeixinSpider
from applications.db import DatabaseConnector
from applications.const import UpdateMiniProgramDetailConst
from applications.exception import SpiderError
from config import long_articles_config, piaoquan_crawler_config

const = UpdateMiniProgramDetailConst()
spider = WeixinSpider()
functions = Functions()

TASK_NAME = "updateMinigramInfoDaily"
ARTICLE_TABLE = "official_articles_v2"
DETAIL_TABLE = "long_articles_detail_info"
# Kept for backward compatibility with any external importer. The functions
# below return fresh containers instead, so a caller mutating a result can
# never corrupt these shared module-level objects.
EMPTY_LIST = []
EMPTY_DICT = {}


def extract_path(path: str) -> Dict:
    """
    Extract the video id and root source id from a mini-program path.

    The path's query string is expected to carry a ``jumpPage`` parameter
    whose (decoded) value is itself query-string shaped.

    :param path: mini-program path containing a query string
    :return: ``{"video_id": ..., "root_source_id": ...}``, or an empty dict
        when the path has no ``jumpPage`` parameter
    :raises KeyError: when ``jumpPage`` is present but lacks either the
        ``pages/user-videos?id`` or the ``rootSourceId`` key
    """
    params = parse_qs(urlparse(path).query)
    jump_page = params.get('jumpPage', [None])[0]
    if not jump_page:
        return {}

    jump_params = parse_qs(jump_page)
    return {
        "video_id": jump_params['pages/user-videos?id'][0],
        "root_source_id": jump_params['rootSourceId'][0],
    }


class UpdatePublishedArticlesMinigramDetail(object):
    """
    Record mini-program details of published articles and update their
    fission statistics.
    """

    def __init__(self):
        # Both clients are created lazily by init_database().
        self.piaoquan_crawler_db_client = None
        self.long_articles_db_client = None

    def init_database(self) -> None:
        """
        Create and connect both database clients.

        On any failure a bot alert is sent and the method returns normally;
        in that case one or both clients may still be ``None``.

        :return: None
        """
        try:
            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
            self.piaoquan_crawler_db_client.connect()
            self.long_articles_db_client = DatabaseConnector(long_articles_config)
            self.long_articles_db_client.connect()
        except Exception as e:
            error_msg = traceback.format_exc()
            bot(
                title="更新小程序裂变信息任务连接数据库失败",
                detail={
                    "error": e,
                    "msg": error_msg
                }
            )

    def check_articles(self) -> List[Dict]:
        """
        Fetch articles whose publish_timestamp still holds one of the
        placeholder statuses (default / request-failed).

        :return: rows with ``ContentUrl`` and ``wx_sn``
        """
        # The status tuple is built from project constants only, so inlining
        # its repr into the statement is safe.
        sql = f"""
            SELECT ContentUrl, wx_sn
            FROM {ARTICLE_TABLE}
            WHERE publish_timestamp IN {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
        """
        response = self.piaoquan_crawler_db_client.fetch(sql, cursor_type=DictCursor)
        return response

    def get_root_source_id_result(self, root_source_id: str, dt: str) -> Dict:
        """
        Fetch the fission metrics of one root_source_id for one date.

        :param root_source_id: root source id to look up
        :param dt: date of the metrics row
        :return: the first matching row, or an empty dict when none exists
        """
        # Parameterized: root_source_id ultimately originates from crawled
        # article data and must not be spliced into the SQL text.
        select_sql = """
            SELECT first_uv, split0, split0_head, split0_recommend,
                   split1, split1_head, split1_recommend,
                   split2, split2_head, split2_recommend
            FROM changwen_data_rootsourceid
            WHERE root_source_id = %s AND dt = %s;
        """
        result = self.long_articles_db_client.fetch(
            query=select_sql,
            params=(root_source_id, dt),
            cursor_type=DictCursor
        )
        if result:
            return result[0]
        return {}

    def get_articles_published_yesterday(self, biz_date: str) -> List[Dict]:
        """
        Fetch articles published between ``biz_date`` minus one day and
        ``biz_date`` minus one second.

        :param biz_date: reference date string
        :return: matching article rows
        """
        sql = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM {ARTICLE_TABLE}
            WHERE FROM_UNIXTIME(publish_timestamp)
            BETWEEN DATE_SUB(%s, INTERVAL 1 DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(
            query=sql,
            params=(biz_date, biz_date),
            cursor_type=DictCursor
        )
        return article_list

    def insert_each_root_source_id(self, wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt) -> int:
        """
        Insert one detail row; rows that already exist are ignored
        (``INSERT IGNORE``).

        :param wx_sn: article identifier
        :param mini_title: mini-program card title
        :param mini_name: mini-program name
        :param cover_url: video cover url
        :param video_index: position of the video within the article
        :param root_source_id: root source id of the card
        :param video_id: video id of the card
        :param publish_dt: article publish date
        :param recall_dt: recall date
        :return: value returned by the database client's ``save``
        """
        insert_sql = f"""
            INSERT IGNORE INTO {DETAIL_TABLE}
            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        affected_rows = self.piaoquan_crawler_db_client.save(
            query=insert_sql,
            params=(
                wx_sn, mini_title, mini_name, cover_url, video_index,
                root_source_id, video_id, publish_dt, recall_dt
            )
        )
        return affected_rows

    def record_each_article(self, article_info: Dict) -> Dict:
        """
        Record the root_source_ids of one article.

        One row is inserted per (mini-program card, recall date) pair, where
        the recall dates are the publish date and the two following days.

        :param article_info: article row as returned by
            ``get_articles_published_yesterday``
        :return: an empty dict on success or when the article has no
            mini-program info; ``article_info`` itself when inserting failed
            (so the caller can retry)
        """
        url = article_info['ContentUrl']
        publish_timestamp = article_info['publish_timestamp']

        # Accept both bytes and str for wx_sn.
        raw_wx_sn = article_info['wx_sn']
        wx_sn = raw_wx_sn.decode() if isinstance(raw_wx_sn, (bytes, bytearray)) else raw_wx_sn

        # Only parse when a value is stored: json.loads() rejects a list, so
        # falling back to a list *inside* the call raised TypeError.
        raw_id_list = article_info['root_source_id_list']
        root_source_id_list = json.loads(raw_id_list) if raw_id_list else []

        article_mini_program_detail = self.get_article_mini_program_info(url, root_source_id_list)
        if not article_mini_program_detail:
            return {}

        log(
            task=TASK_NAME,
            function="record_each_article",
            message="获取文章链接对应的 rootSourceId 成功",
            data={
                "ContentUrl": url,
                "wxSn": wx_sn,
                "publish_timestamp": publish_timestamp,
                "miniInfo": article_mini_program_detail
            }
        )
        try:
            publish_date = datetime.fromtimestamp(publish_timestamp)
            publish_dt_str = publish_date.strftime('%Y-%m-%d')
            # generate T+0, T+1, T+2 date strings
            recall_dt_str_list = [
                (publish_date + timedelta(days=i)).strftime('%Y-%m-%d')
                for i in range(3)
            ]

            for date_str in recall_dt_str_list:
                for video_index, mini_item in enumerate(article_mini_program_detail, 1):
                    image_url = mini_item['image_url']
                    nick_name = mini_item['nike_name']
                    # Prefer ids already present on the item; otherwise parse
                    # them out of the card's path.
                    if mini_item.get("root_source_id") and mini_item.get("video_id"):
                        root_source_id = mini_item['root_source_id']
                        video_id = mini_item['video_id']
                    else:
                        id_info = extract_path(mini_item['path'])
                        root_source_id = id_info['root_source_id']
                        video_id = id_info['video_id']
                    kimi_title = mini_item['title']
                    self.insert_each_root_source_id(
                        wx_sn=wx_sn,
                        mini_title=kimi_title,
                        mini_name=nick_name,
                        cover_url=image_url,
                        video_index=video_index,
                        root_source_id=root_source_id,
                        video_id=video_id,
                        publish_dt=publish_dt_str,
                        recall_dt=date_str
                    )
            return {}
        except Exception as e:
            # Best effort: log and hand the article back so the caller retries.
            error_msg = traceback.format_exc()
            log(
                task=TASK_NAME,
                function="record_each_article",
                status="fail",
                message="插入数据失败, 失败原因是{}--{}".format(e, error_msg)
            )
            return article_info

    def get_article_mini_program_info(self, content_url: str, root_source_id_list: list) -> List[Dict]:
        """
        Get the mini-program cards of an article.

        When ``root_source_id_list`` is non-empty the cards are rebuilt from
        the ``long_articles_root_source_id`` table; otherwise the article
        detail is fetched through the spider.

        :param content_url: article url
        :param root_source_id_list: already known root source ids (may be empty)
        :return: list of mini-program info dicts (possibly empty)
        :raises SpiderError: when fetching the article detail fails
        """
        if root_source_id_list:
            # root_source_ids are already known, look their video ids up
            fetch_sql = """
                select video_id, root_source_id
                from long_articles_root_source_id
                where root_source_id in %s;
            """
            fetch_response = self.long_articles_db_client.fetch(
                query=fetch_sql,
                params=(tuple(root_source_id_list),),
                cursor_type=DictCursor
            )
            # build items in the same format as the spider's mini_program list
            return [
                {
                    "app_id": "wx89e7eb06478361d7",
                    "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                    "image_url": "",
                    "nike_name": "票圈 l 3亿人喜欢的视频平台",
                    "root_source_id": item['root_source_id'],
                    "video_id": item['video_id'],
                    "service_type": "0",
                    "title": "",
                    "type": "card"
                }
                for item in (fetch_response or [])
            ]

        try:
            article_detail = spider.get_article_text(content_url)
        except Exception as e:
            raise SpiderError(error=e, spider="detail", url=content_url) from e

        response_code = article_detail['code']
        if response_code == const.ARTICLE_SUCCESS_CODE:
            return article_detail['data']['data']['mini_program']
        return []

    def get_root_source_id_for_three_days(self, biz_date: str) -> List[Dict]:
        """
        Fetch (recall_dt, root_source_id) pairs whose publish_dt lies between
        ``biz_date`` minus three days and ``biz_date`` minus one second.

        :param biz_date: reference date string
        :return: matching rows
        """
        sql = f"""
            SELECT recall_dt, root_source_id
            FROM {DETAIL_TABLE}
            WHERE publish_dt
            BETWEEN DATE_SUB(%s, INTERVAL 3 DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(
            query=sql,
            params=(biz_date, biz_date),
            cursor_type=DictCursor
        )
        return article_list

    def update_each_root_source_id(self, recall_dt: str, root_source_id: str) -> None:
        """
        Copy the fission metrics of one (root_source_id, recall_dt) pair into
        the detail table. Does nothing when no metrics row exists.

        :param recall_dt: recall date
        :param root_source_id: root source id
        :return: None
        """
        mini_program_detail = self.get_root_source_id_result(
            root_source_id=root_source_id,
            dt=recall_dt
        )
        if not mini_program_detail:
            return

        update_sql = f"""
            UPDATE {DETAIL_TABLE}
            SET
                first_level = %s,
                fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s,
                fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s,
                fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
            WHERE root_source_id = %s and recall_dt = %s;
        """
        self.piaoquan_crawler_db_client.save(
            query=update_sql,
            params=(
                mini_program_detail['first_uv'],
                mini_program_detail['split0'],
                mini_program_detail['split0_head'],
                mini_program_detail['split0_recommend'],
                mini_program_detail['split1'],
                mini_program_detail['split1_head'],
                mini_program_detail['split1_recommend'],
                mini_program_detail['split2'],
                mini_program_detail['split2_head'],
                mini_program_detail['split2_recommend'],
                root_source_id,
                recall_dt
            )
        )

    def update_published_articles_job(self, biz_date=None):
        """
        Store the mini-program info of recently published articles into the
        fission detail table, retrying failed articles once.

        :param biz_date: reference date string; defaults to today
        :return: None
        """
        if not biz_date:
            biz_date = datetime.today().strftime('%Y-%m-%d')

        published_article_list = self.get_articles_published_yesterday(biz_date)
        failed_article_list = []
        for article_info in tqdm(published_article_list, desc="update_published_articles_job"):
            failed_article = self.record_each_article(article_info)
            if failed_article:
                failed_article_list.append(failed_article)

        # retry each failed article once
        second_try_fail_article_list = []
        for failed_article in failed_article_list:
            second_fail = self.record_each_article(failed_article)
            if second_fail:
                second_try_fail_article_list.append(second_fail)

        bot(
            title="更新文章任务完成",
            detail={
                "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },
            mention=False
        )
        if second_try_fail_article_list:
            bot(
                title="更新文章任务存在文章抓取失败",
                detail=[
                    {
                        "account": line['accountName'],
                        "title": line['title'],
                        "url": line['ContentUrl']
                    }
                    for line in second_try_fail_article_list
                ]
            )

    def update_mini_program_detail_job(self, biz_date=None):
        """
        Refresh the fission metrics of every root_source_id published in the
        three days before ``biz_date``.

        :param biz_date: reference date string; defaults to today
        :return: None
        """
        if not biz_date:
            biz_date = datetime.today().strftime('%Y-%m-%d')

        # get root_source_id_list
        root_source_id_obj_list = self.get_root_source_id_for_three_days(biz_date)
        log(
            task=TASK_NAME,
            function="update_minigram_detail",
            message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(root_source_id_obj_list))
        )
        fail_count = 0
        for item in tqdm(root_source_id_obj_list, desc="update_mini_program_detail_job"):
            try:
                self.update_each_root_source_id(
                    root_source_id=item['root_source_id'],
                    recall_dt=item['recall_dt']
                )
            except Exception as e:
                # One bad row must not stop the remaining updates.
                log(
                    task=TASK_NAME,
                    function="update_minigram_detail",
                    status="fail",
                    message="更新单条数据失败, 报错信息是 {}".format(e),
                    data={"error_msg": traceback.format_exc()}
                )
                fail_count += 1

        if fail_count:
            bot(
                title="{} fail because of lam db error".format(TASK_NAME),
                detail={
                    "fail_count": fail_count
                }
            )