""" @author luojunhui @description Update Minigram Info Daily """ import time import traceback from tqdm import tqdm from datetime import datetime, timedelta import schedule from argparse import ArgumentParser from applications import WeixinSpider, Functions, log, bot from applications.db import DatabaseConnector from config import long_articles_config, piaoquan_crawler_config TASK_NAME = "updateMinigramInfoDaily" SPIDER_SUCCESS_STATUS = 0 def get_yesterday(): """ get yesterday date :return: """ yesterday = datetime.today() - timedelta(1) return yesterday class DailyDataManager(object): """ daily 数据每日更新 """ def __init__(self): self.piaoquan_crawler_db_client = None self.long_articles_db_client = None self.spider = WeixinSpider() def init_database(self) -> None: """ init database connector :return: """ # 初始化数据库连接 try: self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config) self.piaoquan_crawler_db_client.connect() self.long_articles_db_client = DatabaseConnector(long_articles_config) self.long_articles_db_client.connect() except Exception as e: error_msg = traceback.format_exc() bot( title="更新小程序裂变信息任务连接数据库失败", detail={ "error": e, "msg": error_msg } ) return def get_published_articles(self, biz_date): """ 获取已经发布的文章的信息, updateTime 选择为前一天的 0 点并且转化为时间戳 :return: """ biz_date_midnight = datetime(year=biz_date.year, month=biz_date.month, day=biz_date.day) biz_date_ts = biz_date_midnight.timestamp() biz_date_end_ts = biz_date_ts + 24 * 60 * 60 - 1 sql2 = f""" select ContentUrl, wx_sn, publish_timestamp, accountName, title from official_articles_v2 where publish_timestamp between {biz_date_ts} and {biz_date_end_ts}; """ result_list = self.piaoquan_crawler_db_client.fetch(sql2) log( task=TASK_NAME, function="get_published_articles", message="一共获取 {} 篇文章数据".format(len(result_list)) ) return result_list def update_article_info(self, line): """ update info into mysql :return: """ url = line[0] update_time = line[2] wx_sn = line[1].decode() article_detail = self.get_root_source_ids(line) if article_detail: response_code = article_detail['code'] if response_code == SPIDER_SUCCESS_STATUS: mini_info = article_detail['data']['data']['mini_program'] if mini_info: log( task=TASK_NAME, function="get_root_source_ids", message="获取文章链接对应的 rootSourceId 成功", data={ "ContentUrl": url, "wxSn": wx_sn, "updateTime": update_time, "miniInfo": mini_info } ) try: dt_object = datetime.fromtimestamp(update_time) publish_dt = dt_object.strftime('%Y-%m-%d') one_day = timedelta(days=1) two_day = timedelta(days=2) next_day = dt_object + one_day next_next_day = dt_object + two_day recall_dt_list = [dt_object, next_day, next_next_day] recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list] for dt_str in recall_dt_str_list: for index, item in enumerate(mini_info, 1): image_url = item['image_url'] nick_name = item['nike_name'] root_source_id = item['path'].split("rootSourceId%3D")[-1] video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0] kimi_title = item['title'] # print(image_url, nick_name, root_source_id, video_id, kimi_title) insert_sql = f""" INSERT INTO long_articles_detail_info (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt) values (%s, %s, %s, %s, %s, %s, %s, %s, %s); """ self.piaoquan_crawler_db_client.save( query=insert_sql, params=( wx_sn, kimi_title, nick_name, image_url, index, root_source_id, video_id, publish_dt, dt_str ) ) log( task=TASK_NAME, function="update_article_info", message="插入数据成功, video_id 是: 
{}".format(video_id) ) except Exception as e: error_msg = traceback.format_exc() log( task=TASK_NAME, function="update_article_info", status="fail", message="插入数据失败, 失败原因是{}--{}".format(e, error_msg) ) return None else: return line else: return line def get_root_source_ids(self, data_info): """ 通过抓取接口获取 data_info :return: """ url = data_info[0] try: article_detail = self.spider.get_article_text(url) return article_detail except Exception as e: log( task=TASK_NAME, function="get_root_source_ids", status="fail", message="获取文章链接对应的 rootSourceId失败, 报错信息是: {}".format(e), data={ "ContentUrl": url } ) return False def get_minigram_info(self, rootSourceId): """ :param rootSourceId: :return: """ sql = f""" select type, machinecode, create_time, first_level_dt from changwen_data_base_v2 where rootsourceid = '{rootSourceId}'; """ result_list = self.long_articles_db_client.fetch(sql) def summarize(values): """ :param values: :return: """ L = {} first_level = {} fission_level = {} for line in values: # 先统计首层 if line[0] == '首层': try: dt = str(line[-1]) key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d') if first_level.get(key_dt): first_level[key_dt].add(line[1]) else: first_level[key_dt] = {line[1]} except Exception as e: continue else: try: dt = str(line[-1]) first_level_dt = datetime.strptime(dt, '%Y%m%d') create_level_dt = line[-2] delta = create_level_dt - first_level_dt days = int(delta.days) key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d') if fission_level.get(key_dt): fission_level[key_dt].append((line[1], days)) else: fission_level[key_dt] = [(line[1], days)] except Exception as e: continue # print("first level dt is NULL") tt = {} for key in fission_level: detail_list = fission_level[key] temp = {} for item in detail_list: mid, days = item if temp.get(days): temp[days].add(mid) else: temp[days] = {mid} final = {} for sub_key in temp: length = len(temp[sub_key]) final[sub_key] = length tt[key] = final for key in first_level: temp = [len(first_level[key]), tt.get(key, {}).get(0, 0), tt.get(key, {}).get(1, 0), tt.get(key, {}).get(2, 0)] L[key] = temp return L try: response = summarize(result_list) log( task=TASK_NAME, function="get_minigram_info", message="计算source_id信息成功", data=response ) return response except Exception as e: log( task=TASK_NAME, function="get_minigram_info", message="获取 source_id信息失败, 报错信息是: {}".format(e), status="fail" ) return None def update_minigram_detail(self, biz_date): """ :return: """ # 获取三天前的日期 date_begin = biz_date - timedelta(days=3) datestr_begin = date_begin.strftime("%Y-%m-%d") datestr_end = biz_date.strftime("%Y-%m-%d") sql = f""" select distinct root_source_id from long_articles_detail_info where publish_dt between '{datestr_begin}' and '{datestr_end}'; """ source_id_list = self.piaoquan_crawler_db_client.fetch(sql) log( task=TASK_NAME, function="update_minigram_detail", message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(source_id_list)) ) fail_count = 0 for item in tqdm(source_id_list): s_id = item[0] try: result = self.get_minigram_info(s_id) for key in result: recall_dt = key first_level = result[key][0] fission_0 = result[key][1] fission_1 = result[key][2] fission_2 = result[key][3] # print(s_id, recall_dt, first_level, fission_0, fission_1, fission_2) update_sql = f""" UPDATE long_articles_detail_info set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s where root_source_id = %s and recall_dt = %s; """ try: self.piaoquan_crawler_db_client.save( query=update_sql, params=( first_level, fission_0, fission_1, 
                    except Exception as e:
                        log(
                            task=TASK_NAME,
                            function="update_minigram_detail",
                            status="fail",
                            message="mysql 更新失败, 报错信息是 {}".format(e)
                        )
            except Exception as e:
                log(
                    task=TASK_NAME,
                    function="update_minigram_detail",
                    status="fail",
                    message="更新单条数据失败, 报错信息是 {}".format(e),
                    data={"error_msg": traceback.format_exc()}
                )
                fail_count += 1

        if fail_count:
            bot(
                title="{} fail because of lam db error".format(TASK_NAME),
                detail={
                    "fail_count": fail_count
                }
            )


def updateArticlesJob(biz_date=None):
    """
    Update article data; failed articles get one retry pass.
    :return:
    """
    if not biz_date:
        biz_date = get_yesterday()
    data_manager = DailyDataManager()
    data_manager.init_database()
    article_list = data_manager.get_published_articles(biz_date)
    failed_article_list = []
    for article in tqdm(article_list):
        failed_article = data_manager.update_article_info(article)
        if failed_article:
            failed_article_list.append(failed_article)

    # retry the articles that failed on the first pass
    second_try_fail_article_list = []
    if failed_article_list:
        for article in tqdm(failed_article_list):
            second_failed_article = data_manager.update_article_info(article)
            if second_failed_article:
                second_try_fail_article_list.append(second_failed_article)

    log(
        task=TASK_NAME,
        function="updateArticlesJob",
        message="文章更新完成---{}".format(str(biz_date))
    )
    bot(
        title="更新文章任务完成",
        detail={
            "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        },
        mention=False
    )
    if second_try_fail_article_list:
        bot(
            title="更新文章任务存在文章抓取失败",
            detail=[
                {
                    "account": line[3],
                    "title": line[4],
                    "url": line[0]
                }
                for line in second_try_fail_article_list
            ]
        )


def updateMinigramInfoJob(biz_date=None):
    """
    Update mini-program data for the last three days.
    :return:
    """
    if not biz_date:
        biz_date = get_yesterday()
    data_manager = DailyDataManager()
    data_manager.init_database()
    try:
        data_manager.update_minigram_detail(biz_date)
        log(
            task=TASK_NAME,
            function="updateMinigramInfoJob",
            message="小程序更新完成---{}".format(str(biz_date))
        )
    except Exception as e:
        log(
            task=TASK_NAME,
            function="updateMinigramInfoJob",
            status="fail",
            message="小程序更新失败---{}, 报错信息是: {}".format(str(biz_date), e)
        )

    bot(
        title="更新小程序信息任务完成",
        detail={
            "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        },
        mention=False
    )


def main():
    """
    main function
    :return:
    """
    parser = ArgumentParser()
    # '%' must be doubled here: argparse interpolates '%' into help strings,
    # so a bare '%Y' would raise ValueError when --help is rendered
    parser.add_argument("--run-date",
                        help="Run only once for date in format of %%Y%%m%%d. "
                             "If not specified, run as daily jobs.")
    args = parser.parse_args()

    if args.run_date:
        biz_date = datetime.strptime(args.run_date, "%Y%m%d")
        print("Run in manual mode. Date: {}".format(args.run_date))
        updateArticlesJob(biz_date)
        updateMinigramInfoJob(biz_date)
        return
    else:
        print("Run in daily mode.")
        schedule.every().day.at("01:30").do(Functions().job_with_thread, updateArticlesJob)
        schedule.every().day.at("03:30").do(Functions().job_with_thread, updateMinigramInfoJob)
        while True:
            schedule.run_pending()
            time.sleep(1)


if __name__ == '__main__':
    main()
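
# Usage sketch (assumption: this module is saved as updateMinigramInfoDaily.py and the
# `applications` and `config` packages are importable from the working directory):
#
#   python updateMinigramInfoDaily.py --run-date 20240101   # backfill a single day, then exit
#   python updateMinigramInfoDaily.py                       # run as the long-lived daily scheduler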