""" @author luojunhui @description Update Minigram Info Daily """ import time from tqdm import tqdm from datetime import datetime, timedelta import schedule from applications import longArticlesMySQL, PQMySQL, WeixinSpider, Functions, log, bot class DailyDataManager(object): """ daily 数据每日更新 """ laMysql = longArticlesMySQL() pqMysql = PQMySQL() wxSpider = WeixinSpider() functions = Functions() @classmethod def getPublishedArticles(cls): """ 获取已经发布的文章的信息, createTime 选择为前一天的 0 点并且转化为时间戳 :return: """ today = datetime.today() # 获取昨天的日期 yesterday = today - timedelta(days=1) yesterday_midnight = datetime(year=yesterday.year, month=yesterday.month, day=yesterday.day) yesterday_timestamp = yesterday_midnight.timestamp() sql2 = f""" select ContentUrl, wx_sn, createTime from official_articles_v2 where createTime >= {yesterday_timestamp}; -- and accountName in ( -- select distinct account_name from account_avg_info_v2 -- ); """ result_list = cls.pqMysql.select(sql2) log( task="updateMinigramInfoDaily", function="getPublishedArticles", message="一共获取 {} 篇文章数据".format(len(result_list)) ) return result_list @classmethod def updateInfo(cls, line): """ update info into mysql :return: """ try: wx_sn, mini_info, create_time = cls.getRootSourceIds(line) dt_object = datetime.fromtimestamp(create_time) publish_dt = dt_object.strftime('%Y-%m-%d') one_day = timedelta(days=1) two_day = timedelta(days=2) next_day = dt_object + one_day next_next_day = dt_object + two_day recall_dt_list = [dt_object, next_day, next_next_day] recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list] for dt_str in recall_dt_str_list: for index, item in enumerate(mini_info, 1): image_url = item['image_url'] nick_name = item['nike_name'] root_source_id = item['path'].split("rootSourceId%3D")[-1] video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0] kimi_title = item['title'] # print(image_url, nick_name, root_source_id, video_id, kimi_title) insert_sql = f""" INSERT INTO long_articles_detail_info (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt) values (%s, %s, %s, %s, %s, %s, %s, %s, %s); """ cls.pqMysql.update( sql=insert_sql, params=( wx_sn, kimi_title, nick_name, image_url, index, root_source_id, video_id, publish_dt, dt_str ) ) log( task="updateMinigramInfoDaily", function="updateInfo", message="插入数据成功, video_id 是: {}".format(video_id) ) except Exception as e: log( task="updateMinigramInfoDaily", function="updateInfo", status="fail", message="插入数据失败, 失败原因是".format(e) ) @classmethod def getRootSourceIds(cls, data_info): """ 通过抓取接口获取 data_info :return: """ url = data_info[0] try: article_detail = cls.wxSpider.get_article_text(url) mini_info = article_detail['data']['data']['mini_program'] log( task="updateMinigramInfoDaily", function="getRootSourceIds", message="获取文章链接对应的 rootSourceId 成功", data={ "ContentUrl": url, "wxSn": data_info[1].decode(), "createTime": data_info[2], "miniInfo": mini_info } ) return data_info[1].decode(), mini_info, data_info[2] except Exception as e: log( task="updateMinigramInfoDaily", function="getRootSourceIds", status="fail", message="获取文章链接对应的 rootSourceId失败, 报错信息是: {}".format(e), data={ "ContentUrl": url } ) return @classmethod def getMinigramInfo(cls, rootSourceId): """ :param rootSourceId: :return: """ sql = f""" select type, machinecode, create_time, first_level_dt from changwen_data_base_v2 where rootsourceid = '{rootSourceId}'; """ result_list = cls.laMysql.select(sql) def summarize(values): """ :param values: :return: """ L = {} first_level = {} fission_level = {} for line in values: # 先统计首层 if line[0] == '首层': try: dt = str(line[-1]) key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d') if first_level.get(key_dt): first_level[key_dt].add(line[1]) else: first_level[key_dt] = {line[1]} except Exception as e: continue else: try: dt = str(line[-1]) first_level_dt = datetime.strptime(dt, '%Y%m%d') create_level_dt = line[-2] delta = create_level_dt - first_level_dt days = int(delta.days) key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d') if fission_level.get(key_dt): fission_level[key_dt].append((line[1], days)) else: fission_level[key_dt] = [(line[1], days)] except Exception as e: continue # print("first level dt is NULL") tt = {} for key in fission_level: detail_list = fission_level[key] temp = {} for item in detail_list: mid, days = item if temp.get(days): temp[days].add(mid) else: temp[days] = {mid} final = {} for sub_key in temp: length = len(temp[sub_key]) final[sub_key] = length tt[key] = final for key in first_level: temp = [len(first_level[key]), tt.get(key, {}).get(0, 0), tt.get(key, {}).get(1, 0), tt.get(key, {}).get(2, 0)] L[key] = temp return L try: response = summarize(result_list) log( task="updateMinigramInfoDaily", function="getMinigramInfo", message="计算source_id信息成功", data=response ) return response except Exception as e: log( task="updateMinigramInfoDaily", function="getMinigramInfo", message="获取 source_id信息失败, 报错信息是: {}".format(e), status="fail" ) return None @classmethod def updateDetail(cls): """ :return: """ today = datetime.today() # 获取三天前的日期 yesterday = today - timedelta(days=3) yesterday_str = yesterday.__str__().split(" ")[0] sql = f""" select distinct root_source_id from long_articles_detail_info where publish_dt >= '{yesterday_str}'; """ source_id_list = cls.pqMysql.select(sql) log( task="updateMinigramInfoDaily", function="updateDetail", message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(source_id_list)) ) for item in tqdm(source_id_list): s_id = item[0] try: result = cls.getMinigramInfo(s_id) for key in result: recall_dt = key first_level = result[key][0] fission_0 = result[key][1] fission_1 = result[key][2] fission_2 = result[key][3] # print(s_id, recall_dt, first_level, fission_0, fission_1, fission_2) update_sql = f""" UPDATE long_articles_detail_info set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s where root_source_id = %s and recall_dt = %s; """ try: cls.pqMysql.update( sql=update_sql, params=( first_level, fission_0, fission_1, fission_2, s_id, recall_dt ) ) except Exception as e: log( task="updateMinigramInfoDaily", function="updateDetail", status="fail", message="mysql 更新失败, 报错信息是 {}".format(e) ) except Exception as e: log( task="updateMinigramInfoDaily", function="updateDetail", status="fail", message="更新单条数据失败, 报错信息是 {}".format(e) ) def updateArticlesJob(): """ 更新文章数据 :return: """ DDM = DailyDataManager() article_list = DDM.getPublishedArticles() for article in tqdm(article_list): DDM.updateInfo(article) log( task="updateMinigramInfoDaily", function="updateArticlesJob", message="文章更新完成---{}".format(datetime.today().__str__()) ) def updateMinigramInfoJob(): """ 更新前三天小程序数据 :return: """ DDM = DailyDataManager() try: DDM.updateDetail() log( task="updateMinigramInfoDaily", function="updateArticlesJob", message="小程序更新完成---{}".format(datetime.today().__str__()) ) except Exception as e: log( task="updateMinigramInfoDaily", function="updateArticlesJob", status="fail", message="小程序更新失败---{}, 报错信息是: {}".format(datetime.today().__str__(), e) ) if __name__ == '__main__': # updateMinigramInfoJob() schedule.every().day.at("01:30").do(Functions().job_with_thread, updateArticlesJob) schedule.every().day.at("03:30").do(Functions().job_with_thread, updateMinigramInfoJob) while True: schedule.run_pending() time.sleep(1) # log( # task="updateMinigramInfoDaily", # function="main", # message="更新文章小程序信息任务正常执行" # )