- """
- @author: luojunhui
- """
- import json
- import time
- import traceback
- from argparse import ArgumentParser
- from typing import List, Dict
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from applications import bot
- from applications import log
- from applications import Functions
- from applications import WeixinSpider
- from applications.db import DatabaseConnector
- from applications.const import UpdateMiniProgramDetailConst
- from applications.exception import SpiderError
- from config import long_articles_config, piaoquan_crawler_config
- const = UpdateMiniProgramDetailConst()
- spider = WeixinSpider()
- functions = Functions()
- # 数据库配置
- ARTICLE_TABLE = "official_articles_v2"


def check_root_source_id_list(content_url: str):
    """
    Check whether the article detail page carries mini-program (root_source_id) info
    :param content_url: article URL to inspect
    :return: the mini-program info on success, otherwise None
    """
    try:
        article_detail = spider.get_article_text(content_url)
    except Exception as e:
        raise SpiderError(error=e, spider="detail", url=content_url)

    response_code = article_detail['code']
    if response_code == const.ARTICLE_SUCCESS_CODE:
        mini_info = article_detail['data']['data']['mini_program']
        return mini_info
    else:
        return None
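
# A minimal sketch of the payload shape the helper above relies on ('code' and
# the nested 'mini_program' path appear in the code; the item fields are
# assumptions, not confirmed by this file):
#   {"code": 0, "data": {"data": {"mini_program": [{"path": "...", ...}]}}}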


class UpdatePublishedArticlesMinigramDetail(object):
    """
    Update mini-program detail data for published articles
    """

    def __init__(self):
        self.piaoquan_crawler_db_client = None
        self.long_articles_db_client = None

    def init_database(self) -> None:
        """
        init database connectors
        :return:
        """
        try:
            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
            self.piaoquan_crawler_db_client.connect()
            self.long_articles_db_client = DatabaseConnector(long_articles_config)
            self.long_articles_db_client.connect()
        except Exception as e:
            error_msg = traceback.format_exc()
            bot(
                title="mini-program fission info update task failed to connect to database",
                detail={
                    "error": str(e),
                    "msg": error_msg
                }
            )
            return

    def check_articles(self) -> List[Dict]:
        """
        Find articles whose publish_timestamp has not been resolved yet
        :return: rows still holding the default or request-failed sentinel value
        """
        sql = f"""
            SELECT ContentUrl, wx_sn
            FROM {ARTICLE_TABLE}
            WHERE publish_timestamp IN ({const.DEFAULT_STATUS}, {const.REQUEST_FAIL_STATUS});"""
        response = self.piaoquan_crawler_db_client.fetch(sql, cursor_type=DictCursor)
        return response

    def get_root_source_id_result(self, root_source_id: str, dt: str) -> Dict:
        """
        Fetch the daily stats row recorded for a root_source_id
        :param dt: date partition string
        :param root_source_id:
        :return: the stats row as a dict, or an empty dict if none exists
        """
        select_sql = f"""
            SELECT first_uv, split0, split1, split2
            FROM changwen_data_rootsourceid
            WHERE root_source_id = '{root_source_id}' AND dt = '{dt}';
        """
        result = self.long_articles_db_client.fetch(select_sql, cursor_type=DictCursor)
        if result:
            return result[0]
        else:
            return {}
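
    # Illustrative call (values are made up; assumes one row per
    # (root_source_id, dt) pair in changwen_data_rootsourceid):
    #   self.get_root_source_id_result('rs_abc123', '20240101')
    #   -> {'first_uv': 1000, 'split0': 200, 'split1': 300, 'split2': 500}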

    def get_articles_published_yesterday(self, biz_date: str) -> List[Dict]:
        """
        Fetch articles published on the day before biz_date
        :param biz_date: business date string, e.g. '2024-01-02'
        :return:
        """
        sql = f"""
            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
            FROM {ARTICLE_TABLE}
            WHERE FROM_UNIXTIME(publish_timestamp)
                BETWEEN DATE_SUB('{biz_date}', INTERVAL 1 DAY) AND DATE_SUB('{biz_date}', INTERVAL 1 SECOND);
        """
        article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
        return article_list
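
    # With biz_date = '2024-01-02', the BETWEEN clause spans
    # '2024-01-01 00:00:00' through '2024-01-01 23:59:59', i.e. all of the
    # previous calendar day.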

    def insert_each_root_source_id(self, root_source_id: str, article_info: Dict) -> int:
        """
        Insert one detail row for an (article, root_source_id) pair
        :param root_source_id:
        :param article_info:
        :return: number of affected rows
        """
        insert_sql = f"""
            INSERT INTO long_articles_detail_info
            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        # params must line up with the nine columns above, in order
        affected_rows = self.piaoquan_crawler_db_client.save(
            query=insert_sql,
            params=(
                article_info['wx_sn'],
                article_info['title'],
                article_info['mini_name'],
                article_info['cover_url'],
                article_info['video_index'],
                root_source_id,
                article_info['video_id'],
                article_info['publish_dt'],
                article_info['recall_dt']
            )
        )
        return affected_rows
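
    # Note: each call adds a fresh row; deduplicating per (wx_sn, root_source_id,
    # recall_dt) would require a unique key on the table, which this snippet
    # does not show.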

    def record_each_article(self, article_info: Dict) -> None:
        """
        Record the root_source_id details for one article
        Row volume: article_count * mini_program_count * days_count
        :param article_info:
        :return:
        """
        url = article_info['ContentUrl']
        root_source_id_list = json.loads(article_info['root_source_id_list'])
        if not root_source_id_list:
            root_source_id_response = check_root_source_id_list(url)
            if root_source_id_response:
                # rebuild the id list from the spider response
                # (the 'rootSourceId' field name is an assumption)
                root_source_id_list = [
                    mini_item['rootSourceId'] for mini_item in root_source_id_response
                ]
            else:
                return

        for root_source_id in root_source_id_list:
            self.insert_each_root_source_id(root_source_id, article_info)
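

# A minimal driver sketch, assuming the CLI entry point implied by the
# ArgumentParser import; the flag name and overall flow are illustrative,
# not confirmed by this snippet.
if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument('--run_date', help="business date, e.g. '2024-01-02'")
    args = parser.parse_args()

    task = UpdatePublishedArticlesMinigramDetail()
    task.init_database()
    for article in tqdm(task.get_articles_published_yesterday(args.run_date)):
        task.record_each_article(article)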
|