- """
- @author: luojunhui
- """
- import json
- import traceback
- from datetime import datetime, timedelta
- from typing import List, Dict
- from tqdm import tqdm
- from urllib.parse import urlparse, parse_qs
- from pymysql.cursors import DictCursor
- from applications import bot
- from applications import log
- from applications import Functions
- from applications import WeixinSpider
- from applications.db import DatabaseConnector
- from applications.const import UpdateMiniProgramDetailConst
- from applications.exception import SpiderError
- from config import long_articles_config, piaoquan_crawler_config
- const = UpdateMiniProgramDetailConst()
- spider = WeixinSpider()
- functions = Functions()
- TASK_NAME = "updateMinigramInfoDaily"
- ARTICLE_TABLE = "official_articles_v2"
- DETAIL_TABLE = "long_articles_detail_info"
- EMPTY_LIST = []
- EMPTY_DICT = {}


def extract_path(path: str) -> Dict:
    """
    Extract video_id and root_source_id from the jumpPage parameter of a mini program path
    :param path:
    :return: dict with video_id and root_source_id, or an empty dict
    """
    params = parse_qs(urlparse(path).query)
    jump_page = params.get('jumpPage', [None])[0]
    if jump_page:
        # jump_page looks like "pages/user-videos?id=<video_id>&rootSourceId=<root_source_id>",
        # so parse_qs yields the keys "pages/user-videos?id" and "rootSourceId"
        params2 = parse_qs(jump_page)
        res = {
            "video_id": params2['pages/user-videos?id'][0],
            "root_source_id": params2['rootSourceId'][0],
        }
        return res
    else:
        return EMPTY_DICT
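
# Illustrative example (hypothetical percent-encoded path matching the
# "jumpPage=pages/user-videos?id=...&rootSourceId=..." format the parser expects):
#   extract_path("pages/home?jumpPage=pages%2Fuser-videos%3Fid%3D123%26rootSourceId%3Dabc")
#   returns {"video_id": "123", "root_source_id": "abc"}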


class UpdatePublishedArticlesMinigramDetail(object):
    """
    Update mini program detail data for published articles
    """

    def __init__(self):
        self.piaoquan_crawler_db_client = None
        self.long_articles_db_client = None

    def init_database(self) -> None:
        """
        init database connector
        :return:
        """
        try:
            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
            self.piaoquan_crawler_db_client.connect()
            self.long_articles_db_client = DatabaseConnector(long_articles_config)
            self.long_articles_db_client.connect()
        except Exception as e:
            error_msg = traceback.format_exc()
            bot(
                title="Mini program fission info update task: failed to connect to database",
                detail={
                    "error": str(e),
                    "msg": error_msg
                }
            )
            return

    def check_articles(self) -> List[Dict]:
        """
        Check for articles whose publish time has not been resolved yet
        :return:
        """
        # the tuple's repr renders as a SQL value list, i.e. IN (<DEFAULT_STATUS>, <REQUEST_FAIL_STATUS>)
        sql = f"""
            SELECT ContentUrl, wx_sn
            FROM {ARTICLE_TABLE}
            WHERE publish_timestamp IN {(const.DEFAULT_STATUS, const.REQUEST_FAIL_STATUS)};
        """
        response = self.piaoquan_crawler_db_client.fetch(sql, cursor_type=DictCursor)
        return response

    def get_root_source_id_result(self, root_source_id: str, dt: str) -> Dict:
        """
        Fetch the fission stats of a root_source_id on a given date
        :param dt: date string
        :param root_source_id:
        :return:
        """
        select_sql = """
            SELECT first_uv, split0, split0_head, split0_recommend, split1, split1_head, split1_recommend, split2, split2_head, split2_recommend
            FROM changwen_data_rootsourceid
            WHERE root_source_id = %s AND dt = %s;
        """
        result = self.long_articles_db_client.fetch(
            query=select_sql,
            params=(root_source_id, dt),
            cursor_type=DictCursor
        )
        if result:
            return result[0]
        else:
            return EMPTY_DICT
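
    # Illustrative return shape (keys mirror the SELECT above; values are hypothetical):
    #   {"first_uv": 1000, "split0": 300, "split0_head": 200, ..., "split2_recommend": 50}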

    def get_articles_published_yesterday(self, biz_date: str) -> List[Dict]:
        """
        Fetch articles published between 00:00:00 and 23:59:59 on the day before biz_date
        :return:
        """
        sql = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM {ARTICLE_TABLE}
            WHERE FROM_UNIXTIME(publish_timestamp)
            BETWEEN DATE_SUB(%s, INTERVAL 1 DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(
            query=sql, params=(biz_date, biz_date), cursor_type=DictCursor
        )
        return article_list

    def insert_each_root_source_id(self, wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt) -> int:
        """
        Insert one (root_source_id, recall_dt) detail row
        :param recall_dt: recall date (T+0 / T+1 / T+2)
        :param publish_dt: article publish date
        :param video_id: video id
        :param video_index: position of the video within the article
        :param cover_url: video cover url
        :param mini_name: mini program name
        :param mini_title: mini program title
        :param wx_sn:
        :param root_source_id:
        :return: number of affected rows
        """
        # INSERT IGNORE keeps retries idempotent, assuming the table has a suitable unique key
        insert_sql = f"""
            INSERT IGNORE INTO {DETAIL_TABLE}
            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
            VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        affected_rows = self.piaoquan_crawler_db_client.save(
            query=insert_sql,
            params=(
                wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt
            )
        )
        return affected_rows

    def record_each_article(self, article_info: Dict) -> Dict:
        """
        Record the root_source_id details of one article.
        Data volume: article_count * mini_program_count * days_count
        :param article_info:
        :return: empty dict on success, the original article_info on failure (so it can be retried)
        """
        url = article_info['ContentUrl']
        publish_timestamp = article_info['publish_timestamp']
        wx_sn = article_info['wx_sn'].decode()
        # root_source_id_list is stored as a JSON string and may be NULL
        root_source_id_list = json.loads(article_info['root_source_id_list']) if article_info['root_source_id_list'] else EMPTY_LIST
        article_mini_program_detail = self.get_article_mini_program_info(url, root_source_id_list)
        if article_mini_program_detail:
            log(
                task=TASK_NAME,
                function="record_each_article",
                message="fetched the rootSourceId for the article url successfully",
                data={
                    "ContentUrl": url,
                    "wxSn": wx_sn,
                    "publish_timestamp": publish_timestamp,
                    "miniInfo": article_mini_program_detail
                }
            )
            try:
                publish_date = datetime.fromtimestamp(publish_timestamp)
                # generate T+0, T+1, T+2 date strings
                recall_dt_str_list = [
                    (publish_date + timedelta(days=i)).strftime('%Y-%m-%d')
                    for i in range(3)
                ]
                for date_str in recall_dt_str_list:
                    for video_index, mini_item in enumerate(article_mini_program_detail, 1):
                        image_url = mini_item['image_url']
                        nick_name = mini_item['nike_name']
                        # prefer the ids carried on the item; fall back to parsing the path
                        if mini_item.get("root_source_id") and mini_item.get("video_id"):
                            root_source_id = mini_item['root_source_id']
                            video_id = mini_item['video_id']
                        else:
                            id_info = extract_path(mini_item['path'])
                            root_source_id = id_info['root_source_id']
                            video_id = id_info['video_id']
                        kimi_title = mini_item['title']
                        self.insert_each_root_source_id(
                            wx_sn=wx_sn,
                            mini_title=kimi_title,
                            mini_name=nick_name,
                            cover_url=image_url,
                            video_index=video_index,
                            root_source_id=root_source_id,
                            video_id=video_id,
                            publish_dt=publish_date.strftime('%Y-%m-%d'),
                            recall_dt=date_str
                        )
                return EMPTY_DICT
            except Exception as e:
                error_msg = traceback.format_exc()
                log(
                    task=TASK_NAME,
                    function="record_each_article",
                    status="fail",
                    message="insert failed, reason: {}--{}".format(e, error_msg)
                )
                return article_info
        else:
            return EMPTY_DICT

    def get_article_mini_program_info(self, content_url: str, root_source_id_list: list) -> List[Dict]:
        """
        Get the mini program card info of an article
        :return:
        """
        if root_source_id_list:
            # root_source_id is already known, so look it up locally instead of re-crawling
            fetch_sql = """
                SELECT video_id, root_source_id
                FROM long_articles_root_source_id
                WHERE root_source_id IN %s;
            """
            fetch_response = self.long_articles_db_client.fetch(
                query=fetch_sql,
                params=(tuple(root_source_id_list),),
                cursor_type=DictCursor
            )
            mini_info = []
            if fetch_response:
                # build records in the same shape the spider returns
                for item in fetch_response:
                    mini_info.append(
                        {
                            "app_id": "wx89e7eb06478361d7",
                            "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
                            "image_url": "",
                            "nike_name": "票圈 l 3亿人喜欢的视频平台",  # sic: the upstream key is "nike_name"
                            "root_source_id": item['root_source_id'],
                            "video_id": item['video_id'],
                            "service_type": "0",
                            "title": "",
                            "type": "card"
                        }
                    )
            return mini_info
        try:
            article_detail = spider.get_article_text(content_url)
        except Exception as e:
            raise SpiderError(error=e, spider="detail", url=content_url)
        response_code = article_detail['code']
        if response_code == const.ARTICLE_SUCCESS_CODE:
            mini_info = article_detail['data']['data']['mini_program']
            return mini_info
        else:
            return EMPTY_LIST

    def get_root_source_id_for_three_days(self, biz_date: str) -> List[Dict]:
        """
        Fetch root_source_id records whose publish_dt falls within the three days before biz_date
        :param biz_date:
        :return:
        """
        sql = f"""
            SELECT recall_dt, root_source_id
            FROM {DETAIL_TABLE}
            WHERE publish_dt
            BETWEEN DATE_SUB(%s, INTERVAL 3 DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(
            query=sql, params=(biz_date, biz_date), cursor_type=DictCursor
        )
        return article_list

    def update_each_root_source_id(self, recall_dt: str, root_source_id: str) -> None:
        """
        Update the fission stats of one (root_source_id, recall_dt) pair
        :param recall_dt:
        :param root_source_id:
        :return:
        """
        mini_program_detail = self.get_root_source_id_result(root_source_id=root_source_id, dt=recall_dt)
        if mini_program_detail:
            update_sql = f"""
                UPDATE {DETAIL_TABLE}
                SET first_level = %s,
                    fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s,
                    fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s,
                    fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
                WHERE root_source_id = %s AND recall_dt = %s;
            """
            self.piaoquan_crawler_db_client.save(
                query=update_sql,
                params=(
                    mini_program_detail['first_uv'],
                    mini_program_detail['split0'],
                    mini_program_detail['split0_head'],
                    mini_program_detail['split0_recommend'],
                    mini_program_detail['split1'],
                    mini_program_detail['split1_head'],
                    mini_program_detail['split1_recommend'],
                    mini_program_detail['split2'],
                    mini_program_detail['split2_head'],
                    mini_program_detail['split2_recommend'],
                    root_source_id,
                    recall_dt
                )
            )

    def update_published_articles_job(self, biz_date=None):
        """
        Write article info into the fission detail table
        :param biz_date:
        :return:
        """
        if not biz_date:
            biz_date = datetime.today().strftime('%Y-%m-%d')
        published_article_list = self.get_articles_published_yesterday(biz_date)
        failed_article_list = []
        for article_info in tqdm(published_article_list, desc="update_published_articles_job"):
            failed_article = self.record_each_article(article_info)
            if failed_article:
                failed_article_list.append(failed_article)
        # retry failed articles once
        second_try_fail_article_list = []
        if failed_article_list:
            for failed_article in failed_article_list:
                second_fail = self.record_each_article(failed_article)
                if second_fail:
                    second_try_fail_article_list.append(second_fail)
        bot(
            title="Article update task finished",
            detail={
                "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            },
            mention=False
        )
        if second_try_fail_article_list:
            bot(
                title="Article update task: some articles failed to crawl",
                detail=[
                    {
                        "account": line['accountName'],
                        "title": line['title'],
                        "url": line['ContentUrl']
                    }
                    for line in second_try_fail_article_list
                ]
            )

    def update_mini_program_detail_job(self, biz_date=None):
        """
        update mini program detail info
        :param biz_date:
        :return:
        """
        if not biz_date:
            biz_date = datetime.today().strftime('%Y-%m-%d')
        # get the root_source_id list for the three days before biz_date
        root_source_id_obj_list = self.get_root_source_id_for_three_days(biz_date)
        log(
            task=TASK_NAME,
            function="update_minigram_detail",
            message="fetched rootSourceId for the last three days, {} records in total".format(len(root_source_id_obj_list))
        )
        fail_count = 0
        for item in tqdm(root_source_id_obj_list, desc="update_mini_program_detail_job"):
            try:
                self.update_each_root_source_id(
                    root_source_id=item['root_source_id'],
                    recall_dt=item['recall_dt']
                )
            except Exception as e:
                log(
                    task=TASK_NAME,
                    function="update_minigram_detail",
                    status="fail",
                    message="failed to update a single record, error: {}".format(e),
                    data={"error_msg": traceback.format_exc()}
                )
                fail_count += 1
        if fail_count:
            bot(
                title="{} failed on some records because of database errors".format(TASK_NAME),
                detail={
                    "fail_count": fail_count
                }
            )
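

# A minimal usage sketch (not part of the original module): how the two jobs might
# be wired together by a daily scheduler. The __main__ entrypoint and invocation
# order are assumptions; only init_database() and the two *_job methods are real.
if __name__ == '__main__':
    task = UpdatePublishedArticlesMinigramDetail()
    task.init_database()
    # record yesterday's articles first, then refresh stats for the T+0/T+1/T+2 window
    task.update_published_articles_job()
    task.update_mini_program_detail_job()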