- """
- @author: luojunhui
- @desc: 更新每日发布文章的小程序裂变信息
- """
- import json
- import time
- import traceback
- from argparse import ArgumentParser
- from collections import defaultdict
- from datetime import datetime, timedelta
- from typing import List, Dict
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from applications import bot
- from applications import Functions
- from applications import log
- from applications import WeixinSpider
- from applications.db import DatabaseConnector
- from applications.exception import SpiderError
- from config import long_articles_config, piaoquan_crawler_config
- basic_function = Functions()
- spider = WeixinSpider()
- TASK_NAME = "updateMinigramInfoDaily"
- SPIDER_SUCCESS_STATUS = 0


class UpdateMiniProgramInfoTask(object):
    """
    update the mini-program fission info of published articles
    """

    def __init__(self):
        self.piaoquan_crawler_db_client = None
        self.long_articles_db_client = None

    def init_database(self) -> None:
        """
        init database connector
        :return:
        """
        # initialize the two database connections this task depends on
        try:
            self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
            self.piaoquan_crawler_db_client.connect()
            self.long_articles_db_client = DatabaseConnector(long_articles_config)
            self.long_articles_db_client.connect()
        except Exception as e:
            error_msg = traceback.format_exc()
            bot(
                title="mini-program fission info task: failed to connect to database",
                detail={
                    "error": str(e),
                    "msg": error_msg
                }
            )
            return

    def get_published_articles_yesterday(self, run_date: str) -> List[Dict]:
        """
        fetch the articles published on the day before run_date
        :param run_date:
        :return:
        """
- sql = f"""
- SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title
- FROM official_articles_v2
- WHERE FROM_UNIXTIME(publish_timestamp)
- BETWEEN DATE_SUB('{run_date}', INTERVAL 1 DAY) AND DATE_SUB('{run_date}', INTERVAL 1 SECOND);
- """
- article_list = self.piaoquan_crawler_db_client.fetch(query=sql, cursor_type=DictCursor)
- return article_list

    def update_each_article(self, article_info: Dict) -> None:
        """
        update the mini-program info of a single article
        :param article_info:
        :return:
        """
        url = article_info['ContentUrl']
        wx_sn = article_info['wx_sn'].decode()
        publish_timestamp = article_info['publish_timestamp']

        try:
            article_detail = spider.get_article_text(url)
        except Exception as e:
            raise SpiderError(error=e, spider="detail", url=url)
        response_code = article_detail['code']
        if response_code == SPIDER_SUCCESS_STATUS:
            mini_info = article_detail['data']['data']['mini_program']
            if mini_info:
                log(
                    task=TASK_NAME,
                    function="get_root_source_ids",
                    message="fetched the rootSourceId of the article url successfully",
                    data={
                        "ContentUrl": url,
                        "wxSn": wx_sn,
                        "publish_time_stamp": publish_timestamp,
                        "miniInfo": mini_info
                    }
                )
                try:
                    dt_object = datetime.fromtimestamp(publish_timestamp)
                    publish_dt = dt_object.strftime('%Y-%m-%d')
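                    # fission stats are recalled on the publish day and the two days after it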
                    one_day = timedelta(days=1)
                    two_day = timedelta(days=2)
                    next_day = dt_object + one_day
                    next_next_day = dt_object + two_day
                    recall_dt_list = [dt_object, next_day, next_next_day]
                    recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]

                    for dt_str in recall_dt_str_list:
                        for index, item in enumerate(mini_info, 1):
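                            # each item is one mini-program card; the ids are URL-encoded
                            # inside its path ('nike_name' is the upstream payload's key as-is)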
                            image_url = item['image_url']
                            nick_name = item['nike_name']
                            root_source_id = item['path'].split("rootSourceId%3D")[-1]
                            video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
                            kimi_title = item['title']
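                            # insert one row per (video card, recall date)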
                            insert_sql = f"""
                                INSERT INTO long_articles_detail_info
                                (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
                                values
                                (%s, %s, %s, %s, %s, %s, %s, %s, %s);
                            """
                            self.piaoquan_crawler_db_client.save(
                                query=insert_sql,
                                params=(
                                    wx_sn,
                                    kimi_title,
                                    nick_name,
                                    image_url,
                                    index,
                                    root_source_id,
                                    video_id,
                                    publish_dt,
                                    dt_str
                                )
                            )
                            log(
                                task=TASK_NAME,
                                function="update_article_info",
                                message="insert succeeded, video_id: {}".format(video_id)
                            )
                except Exception as e:
                    error_msg = traceback.format_exc()
                    log(
                        task=TASK_NAME,
                        function="update_article_info",
                        status="fail",
                        message="insert failed, reason: {}--{}".format(e, error_msg)
                    )

    def get_source_id_info(self, root_source_id: str) -> Dict:
        """
        compute the stats of a root_source_id
        :param root_source_id:
        :return:
        """
- sql = f"""
- select type, machinecode, create_time, first_level_dt
- from changwen_data_base_v2
- where rootsourceid = '{root_source_id}';
- """
- result_list = self.long_articles_db_client.fetch(sql)

        def summarize(values):
            """
            aggregate the rows into {dt: [first_level_uv, day0_fission, day1_fission, day2_fission]}
            :param values:
            :return:
            """
            result = {}
            first_level = {}
            fission_level = {}
            for line in values:
                # count the first-level ('首层') visits first
                if line[0] == '首层':
                    try:
                        dt = str(line[-1])
                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
                        if first_level.get(key_dt):
                            first_level[key_dt].add(line[1])
                        else:
                            first_level[key_dt] = {line[1]}
                    except Exception:
                        continue
                else:
                    try:
                        dt = str(line[-1])
                        first_level_dt = datetime.strptime(dt, '%Y%m%d')
                        create_level_dt = line[-2]
                        delta = create_level_dt - first_level_dt
                        days = int(delta.days)
                        key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
                        if fission_level.get(key_dt):
                            fission_level[key_dt].append((line[1], days))
                        else:
                            fission_level[key_dt] = [(line[1], days)]
                    except Exception:
                        # first_level_dt may be NULL, which makes strptime raise
                        continue
            tt = {}
            for key in fission_level:
                detail_list = fission_level[key]
                temp = {}
                for item in detail_list:
                    mid, days = item
                    if temp.get(days):
                        temp[days].add(mid)
                    else:
                        temp[days] = {mid}
                final = {}
                for sub_key in temp:
                    length = len(temp[sub_key])
                    final[sub_key] = length
                tt[key] = final
            for key in first_level:
                temp = [
                    len(first_level[key]),
                    tt.get(key, {}).get(0, 0),
                    tt.get(key, {}).get(1, 0),
                    tt.get(key, {}).get(2, 0)
                ]
                result[key] = temp
            return result

        def summarize_v2(value):
            """
            defaultdict-based rewrite of summarize
            :param value:
            :return:
            """
            first_level = defaultdict(set)
            fission_level = defaultdict(list)
            for record in value:
                type_, machinecode, create_time, first_level_dt = record
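                # the original body stopped at the unpack above; what follows is a
                # hedged completion that mirrors summarize's logic
                try:
                    key_dt = datetime.strptime(str(first_level_dt), '%Y%m%d').strftime('%Y-%m-%d')
                    if type_ == '首层':
                        first_level[key_dt].add(machinecode)
                    else:
                        days = int((create_time - datetime.strptime(str(first_level_dt), '%Y%m%d')).days)
                        fission_level[key_dt].append((machinecode, days))
                except Exception:
                    # first_level_dt may be NULL
                    continue
            # count unique machinecodes per (dt, day-offset) fission bucket
            fission_count = {}
            for key, items in fission_level.items():
                per_day = defaultdict(set)
                for mid, days in items:
                    per_day[days].add(mid)
                fission_count[key] = {days: len(mids) for days, mids in per_day.items()}
            return {
                key: [
                    len(mids),
                    fission_count.get(key, {}).get(0, 0),
                    fission_count.get(key, {}).get(1, 0),
                    fission_count.get(key, {}).get(2, 0)
                ]
                for key, mids in first_level.items()
            }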

        try:
            response = summarize(result_list)
            log(
                task=TASK_NAME,
                function="get_minigram_info",
                message="computed the source_id info successfully",
                data=response
            )
            return response
        except Exception as e:
            log(
                task=TASK_NAME,
                function="get_minigram_info",
                message="failed to get the source_id info, error: {}".format(e),
                status="fail"
            )
            return {}
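

# a minimal driver sketch (hypothetical, not from the original section): it wires the
# methods above together for one run_date; the project's real entrypoint may differ
if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument("--run_date", help="run date, e.g. 2024-01-01",
                        default=datetime.today().strftime('%Y-%m-%d'))
    args = parser.parse_args()

    update_task = UpdateMiniProgramInfoTask()
    update_task.init_database()
    for article in tqdm(update_task.get_published_articles_yesterday(args.run_date)):
        try:
            update_task.update_each_article(article)
        except SpiderError as error:
            log(
                task=TASK_NAME,
                function="update_each_article",
                status="fail",
                message="spider failed: {}".format(error)
            )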