import json

import pandas as pd
from tqdm import tqdm
from datetime import datetime, timedelta

from applications import AdMySQL, PQMySQL, WeixinSpider


class DailyDataManager(object):
    """
    Daily data update job.
    """
    ad_mysql = AdMySQL()
    pq_mysql = PQMySQL()
    wx_spider = WeixinSpider()

    @classmethod
    def getPublishedArticles(cls):
        """
        Fetch info about articles that have already been published.
        :return: list of (ContentUrl, wx_sn, createTime) rows
        """
        # sql = f"""
        #     SELECT ContentUrl, wx_sn, createTime
        #     FROM official_articles_v2
        #     WHERE createTime > 1719763200;
        # """
        sql2 = """
            select ContentUrl, wx_sn, createTime
            from official_articles_v2
            where createTime > 1719763200
            and accountName in (
                select distinct account_name from account_avg_info_v2
            );
        """
        result_list = cls.pq_mysql.select(sql2)
        return result_list

    @classmethod
    def getRootSourceIds(cls, data_info):
        """
        Fetch the article detail via the crawl API and extract its
        mini-program info.
        :param data_info: (ContentUrl, wx_sn, createTime) row
        :return: (wx_sn, mini-program info list, createTime)
        """
        url = data_info[0]
        article_detail = cls.wx_spider.get_article_text(url)
        print(url)
        print(article_detail)
        # print(json.dumps(article_detail, ensure_ascii=False, indent=4))
        mini_info = article_detail['data']['data']['mini_program']
        return data_info[1].decode(), mini_info, data_info[2]

    @classmethod
    def getMinigramInfo(cls, rootSourceId):
        """
        Aggregate mini-program reads per recall date for one root source id.
        :param rootSourceId:
        :return: {recall_dt: [first_level, fission_0, fission_1, fission_2]}
        """
        sql = f"""
            select type, machinecode, create_time, first_level_dt
            from changwen_data_base
            where rootsourceid = '{rootSourceId}';
        """
        result_list = cls.ad_mysql.select(sql)

        def summarize(values):
            """
            Group rows into first-level reads and fission reads per date.
            :param values: rows of (type, machinecode, create_time, first_level_dt)
            :return: {date: [first_level, fission day-0, day-1, day-2]}
            """
            daily_stats = {}
            first_level = {}
            fission_level = {}
            for line in values:
                # Count first-level ('首层') reads first
                if line[0] == '首层':
                    c_time = str(line[-2]).split(" ")[0]
                    if first_level.get(c_time):
                        first_level[c_time].add(line[1])
                    else:
                        first_level[c_time] = {line[1]}
                else:
                    # Fission reads: bucket by first-level date and by the
                    # day offset between the read and that date
                    dt = str(line[-1])
                    first_level_dt = datetime.strptime(dt, '%Y%m%d')
                    create_level_dt = line[-2]
                    delta = create_level_dt - first_level_dt
                    days = int(delta.days)
                    key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
                    if fission_level.get(key_dt):
                        fission_level[key_dt].append((line[1], days))
                    else:
                        fission_level[key_dt] = [(line[1], days)]

            # Count distinct machine codes per (date, day offset)
            fission_counts = {}
            for key in fission_level:
                detail_list = fission_level[key]
                temp = {}
                for mid, days in detail_list:
                    if temp.get(days):
                        temp[days].add(mid)
                    else:
                        temp[days] = {mid}
                final = {}
                for sub_key in temp:
                    final[sub_key] = len(temp[sub_key])
                fission_counts[key] = final

            for key in first_level:
                daily_stats[key] = [
                    len(first_level[key]),
                    fission_counts.get(key, {}).get(0, 0),
                    fission_counts.get(key, {}).get(1, 0),
                    fission_counts.get(key, {}).get(2, 0)
                ]
            return daily_stats

        return summarize(result_list)

    @classmethod
    def getArticleInfo(cls, trace_id):
        """
        Fetch article info by trace_id.
        :param trace_id:
        :return: (account_name, article_title)
        """
        sql = f"""
            SELECT account_name, article_title
            FROM long_articles_video
            WHERE trace_id = '{trace_id}';
        """
        info = cls.pq_mysql.select(sql)
        return info[0]

    @classmethod
    def updateDetail(cls):
        """
        Update first-level and fission counts for every known root source id.
        :return:
        """
        sql = """
            select distinct root_source_id
            from long_articles_detail_info;
        """
        source_id_list = cls.pq_mysql.select(sql)
        for item in tqdm(source_id_list):
            s_id = item[0]
            try:
                result = cls.getMinigramInfo(s_id)
                for key in result:
                    recall_dt = key
                    first_level, fission_0, fission_1, fission_2 = result[key]
                    print(key, first_level, fission_0, fission_1, fission_2)
                    update_sql = """
                        UPDATE long_articles_detail_info
                        SET first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
                        WHERE root_source_id = %s AND recall_dt = %s;
                    """
                    try:
                        cls.pq_mysql.update(
                            sql=update_sql,
                            params=(
                                first_level,
                                fission_0,
                                fission_1,
                                fission_2,
                                s_id,
                                recall_dt
                            )
                        )
                    except Exception as e:
                        print("update error", e)
            except Exception as e:
                print(e)

if __name__ == '__main__':
    DM = DailyDataManager()
    # DM.updateDetail()
    publishArticles = DM.getPublishedArticles()
    print(len(publishArticles))
    for line in tqdm(publishArticles):
        try:
            wx_sn, mini_info, create_time = DM.getRootSourceIds(line)
            dt_object = datetime.fromtimestamp(create_time)
            publish_dt = dt_object.strftime('%Y-%m-%d')
            # Recall window: publish day plus the two following days
            one_day = timedelta(days=1)
            two_day = timedelta(days=2)
            next_day = dt_object + one_day
            next_next_day = dt_object + two_day
            recall_dt_list = [dt_object, next_day, next_next_day]
            recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
            for dt_str in recall_dt_str_list:
                for index, item in enumerate(mini_info, 1):
                    image_url = item['image_url']
                    nick_name = item['nike_name']
                    # root_source_id and video_id are URL-encoded inside the
                    # mini-program path
                    root_source_id = item['path'].split("rootSourceId%3D")[-1]
                    video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
                    kimi_title = item['title']
                    insert_sql = """
                        INSERT INTO long_articles_detail_info
                        (wx_sn, mini_title, mini_name, cover_url, video_index,
                         root_source_id, video_id, publish_dt, recall_dt)
                        VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    DM.pq_mysql.update(
                        sql=insert_sql,
                        params=(
                            wx_sn,
                            kimi_title,
                            nick_name,
                            image_url,
                            index,
                            root_source_id,
                            video_id,
                            publish_dt,
                            dt_str
                        )
                    )
        except Exception as e:
            print(e)

    # Earlier exploratory runs, kept for reference; json / pandas above are
    # only used by this commented-out analysis.
    # for line in DailyIds:
    #     try:
    #         source_id_tuple = DM.getRootSourceIds(trace_id=line)
    #         result = DM.getMinigramInfo(source_id_tuple)
    #         print(line)
    #         print(result)
    #         print("\n")
    #     except Exception as e:
    #         print(e)
    #         print(line)

    # L = {}
    # trace_id = "search-a9bb246a-57fa-49f4-88d7-eec575813130-1723608633"
    # source_id_tuple = DM.getRootSourceIds(trace_id=trace_id)
    # result = DM.getMinigramInfo(source_id_tuple)
    # print(result)

    # for t_id in tqdm(DailyIds):
    #     s_tuple = DM.getRootSourceIds(trace_id=t_id)
    #     first_, fission_ = DM.getMinigramInfo(s_tuple)
    #     obj = {
    #         "first_": first_,
    #         "fission_": fission_,
    #         "rate": fission_ / first_ if first_ > 0 else 0
    #     }
    #     L[t_id] = obj

    # Df = []
    # with open("t.json", encoding="utf-8") as f:
    #     L = json.loads(f.read())
    # for key in L:
    #     print(key)
    #     value = L[key]
    #     result = DM.getArticleInfo(trace_id=key)
    #     account_name, article_title = result
    #     temp = [
    #         account_name,
    #         article_title,
    #         value['first_'],
    #         value['fission_'],
    #         value['rate']
    #     ]
    #     Df.append(temp)
    # df = pd.DataFrame(Df, columns=['account_name', 'article_title', 'first_', 'fission_', 'rate0'])
    # df.to_excel("0825.xlsx", index=False)
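
    # Sketch: getMinigramInfo and getArticleInfo interpolate ids straight into
    # SQL strings. If AdMySQL.select / PQMySQL.select accept a params argument
    # the way PQMySQL.update does above (an assumption -- verify against the
    # applications module before relying on it), the queries could be bound
    # safely instead of formatted:
    #
    #     sql = """
    #         select type, machinecode, create_time, first_level_dt
    #         from changwen_data_base
    #         where rootsourceid = %s;
    #     """
    #     result_list = cls.ad_mysql.select(sql, params=(rootSourceId,))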