""" @author: luojunhui @desc: 更新每日发布文章的小程序裂变信息 """ import json import time import traceback from argparse import ArgumentParser from datetime import datetime, timedelta from typing import List, Dict from tqdm import tqdm from pymysql.cursors import DictCursor from applications import bot from applications import Functions from applications import log from applications import WeixinSpider from applications.db import DatabaseConnector from applications.exception import SpiderError from config import long_articles_config, piaoquan_crawler_config basic_function = Functions() spider = WeixinSpider() TASK_NAME = "updateMinigramInfoDaily" SPIDER_SUCCESS_STATUS = 0 class UpdateMiniProgramInfoTask(object): """ 更新小程序裂变信息 """ def __init__(self): self.piaoquan_crawler_db_client = None self.long_articles_db_client = None def init_database(self) -> None: """ init database connector :return: """ # 初始化数据库连接 try: self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config) self.piaoquan_crawler_db_client.connect() self.long_articles_db_client = DatabaseConnector(long_articles_config) self.long_articles_db_client.connect() except Exception as e: error_msg = traceback.format_exc() bot( title="更新小程序裂变信息任务连接数据库失败", detail={ "error": e, "msg": error_msg } ) return def get_published_articles_yesterday(self, run_date: str) -> List[Dict]: """ get_published_articles_yesterday :param run_date: :return: """ sql = f""" SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title FROM official_articles_v2 WHERE FROM_UNIXTIME(publish_timestamp) BETWEEN DATE_SUB('{run_date}', INTERVAL 1 DAY) AND DATE_SUB('{run_date}', INTERVAL 1 SECOND); """ article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor) return article_list def update_each_article(self, article_info: Dict) -> None: """ update_each_article :param article_info: :return: """ url = article_info['ContentUrl'] wx_sn = article_info['wx_sn'].decode() publish_timestamp = article_info['publish_timestamp'] try: article_detail = spider.get_article_text(url) except Exception as e: raise SpiderError(error=e, spider="detail", url=url) response_code = article_detail['code'] if response_code == SPIDER_SUCCESS_STATUS: mini_info = article_detail['data']['data']['mini_program'] if mini_info: log( task=TASK_NAME, function="get_root_source_ids", message="获取文章链接对应的 rootSourceId 成功", data={ "ContentUrl": url, "wxSn": wx_sn, "publish_time_stamp": publish_timestamp, "miniInfo": mini_info } ) try: dt_object = datetime.fromtimestamp(publish_timestamp) publish_dt = dt_object.strftime('%Y-%m-%d') one_day = timedelta(days=1) two_day = timedelta(days=2) next_day = dt_object + one_day next_next_day = dt_object + two_day recall_dt_list = [dt_object, next_day, next_next_day] recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list] for dt_str in recall_dt_str_list: for index, item in enumerate(mini_info, 1): image_url = item['image_url'] nick_name = item['nike_name'] root_source_id = item['path'].split("rootSourceId%3D")[-1] video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0] kimi_title = item['title'] # print(image_url, nick_name, root_source_id, video_id, kimi_title) insert_sql = f""" INSERT INTO long_articles_detail_info (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt) values (%s, %s, %s, %s, %s, %s, %s, %s, %s); """ self.piaoquan_crawler_db_client.save( query=insert_sql, params=( wx_sn, kimi_title, nick_name, image_url, index, root_source_id, video_id, 
                                    publish_dt,
                                    dt_str
                                )
                            )
                            log(
                                task=TASK_NAME,
                                function="update_article_info",
                                message="insert success, video_id: {}".format(video_id)
                            )
                except Exception as e:
                    error_msg = traceback.format_exc()
                    log(
                        task=TASK_NAME,
                        function="update_article_info",
                        status="fail",
                        message="insert failed, reason: {}--{}".format(e, error_msg)
                    )

    def get_source_id_info(self, root_source_id: str) -> Dict:
        """
        compute first-level / fission statistics for a root_source_id
        :param root_source_id:
        :return:
        """
        sql = f"""
            SELECT type, machinecode, create_time, first_level_dt
            FROM changwen_data_base_v2
            WHERE rootsourceid = '{root_source_id}';
        """
        result_list = self.long_articles_db_client.fetch(sql)

        def summarize(values):
            """
            aggregate the raw rows into per-day first-level and fission counts
            :param values: rows of (type, machinecode, create_time, first_level_dt)
            :return: {date: [first_level_uv, fission_day_0, fission_day_1, fission_day_2]}
            """
            summary = {}
            first_level = {}    # date -> set of first-level machine codes
            fission_level = {}  # date -> list of (machine code, days after first level)
            for line in values:
                # '首层' ("first level") is the literal type value stored in the table
                if line[0] == '首层':
                    try:
                        dt = str(line[-1])
                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
                        if first_level.get(key_dt):
                            first_level[key_dt].add(line[1])
                        else:
                            first_level[key_dt] = {line[1]}
                    except Exception:
                        continue
                else:
                    try:
                        dt = str(line[-1])
                        first_level_dt = datetime.strptime(dt, '%Y%m%d')
                        create_level_dt = line[-2]
                        delta = create_level_dt - first_level_dt
                        days = int(delta.days)
                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
                        if fission_level.get(key_dt):
                            fission_level[key_dt].append((line[1], days))
                        else:
                            fission_level[key_dt] = [(line[1], days)]
                    except Exception:
                        # first_level_dt is NULL for these rows
                        continue

            # count distinct machine codes per (date, day offset)
            fission_stats = {}
            for key in fission_level:
                temp = {}
                for mid, days in fission_level[key]:
                    if temp.get(days):
                        temp[days].add(mid)
                    else:
                        temp[days] = {mid}
                fission_stats[key] = {sub_key: len(temp[sub_key]) for sub_key in temp}

            for key in first_level:
                summary[key] = [
                    len(first_level[key]),
                    fission_stats.get(key, {}).get(0, 0),
                    fission_stats.get(key, {}).get(1, 0),
                    fission_stats.get(key, {}).get(2, 0)
                ]
            return summary

        try:
            response = summarize(result_list)
            log(
                task=TASK_NAME,
                function="get_minigram_info",
                message="computed source_id info successfully",
                data=response
            )
            return response
        except Exception as e:
            log(
                task=TASK_NAME,
                function="get_minigram_info",
                message="failed to compute source_id info, error: {}".format(e),
                status="fail"
            )
            return {}
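

# A minimal driver sketch, and an assumption: the original entry point is not
# part of this section, but ArgumentParser and tqdm are imported above, which
# suggests a loop along these lines. The --run_date flag name is hypothetical.
if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument("--run_date", default=datetime.today().strftime('%Y-%m-%d'),
                        help="run date, e.g. 2024-01-02")
    args = parser.parse_args()

    task = UpdateMiniProgramInfoTask()
    task.init_database()
    # process yesterday's articles one by one, logging spider failures
    for article in tqdm(task.get_published_articles_yesterday(args.run_date)):
        try:
            task.update_each_article(article)
        except SpiderError as e:
            log(
                task=TASK_NAME,
                function="update_each_article",
                status="fail",
                message="spider failed: {}".format(e)
            )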