123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- import json
- from concurrent.futures.thread import ThreadPoolExecutor
- from tqdm import tqdm
- from datetime import datetime, timedelta
- from applications import AdMySQL, PQMySQL, WeixinSpider
class DailyDataManager(object):
    """
    Daily refresh of long-article statistics.

    Every method is a classmethod working on the shared class-level clients
    below, so the class can be used without creating an instance.
    """

    # Shared clients, created once when the class body is executed.
    ad_mysql = AdMySQL()        # queried for changwen_data_base_v2
    pq_mysql = PQMySQL()        # queried/updated for the article tables
    wx_spider = WeixinSpider()  # used to fetch article details by URL

    @classmethod
    def getPublishedArticles(cls):
        """
        Fetch the published articles of all tracked accounts.

        :return: whatever ``pq_mysql.select`` returns for the query; each row
            holds (ContentUrl, wx_sn, createTime).
        """
        # 1719763200 is a fixed lower bound on createTime
        # (presumably 2024-07-01 00:00 UTC+8 -- confirm with the data owner).
        sql2 = """
            select ContentUrl, wx_sn, createTime from official_articles_v2 where createTime > 1719763200 and accountName in (
            select distinct account_name from account_avg_info_v2);
        """
        return cls.pq_mysql.select(sql2)

    @classmethod
    def getRootSourceIds(cls, data_info):
        """
        Fetch the mini-program info of one article through the spider.

        :param data_info: indexable record; item 0 is the article URL, item 1
            is a bytes value that gets decoded, item 2 is passed through
            (presumably one row of ``getPublishedArticles`` -- confirm).
        :return: tuple ``(data_info[1].decode(), mini_program, data_info[2])``
        :raises KeyError: if the spider response lacks
            ``['data']['data']['mini_program']``.
        """
        url = data_info[0]
        article_detail = cls.wx_spider.get_article_text(url)
        print(url)
        print(article_detail)
        mini_info = article_detail['data']['data']['mini_program']
        return data_info[1].decode(), mini_info, data_info[2]

    @staticmethod
    def _summarize(values):
        """
        Aggregate raw rows into per-day distinct machine-code counts.

        :param values: iterable of rows laid out as
            (type, machinecode, create_time, first_level_dt), where
            first_level_dt renders as ``%Y%m%d`` through ``str()``.
        :return: dict mapping ``'YYYY-MM-DD'`` (the first-level date) to
            ``[first_level, fission_0, fission_1, fission_2]``: the number of
            distinct machine codes among the '首层' rows, followed by the
            number of distinct machine codes among the other rows created
            0, 1 and 2 days after the first-level date. Only dates that have
            at least one '首层' row appear in the result.
        """
        first_level = {}    # date key -> {machine codes}
        fission_level = {}  # date key -> {day offset -> {machine codes}}
        for line in values:
            is_first_level = line[0] == '首层'
            try:
                machine_code = line[1]
                first_level_dt = datetime.strptime(str(line[-1]), '%Y%m%d')
                key_dt = first_level_dt.strftime('%Y-%m-%d')
                if is_first_level:
                    first_level.setdefault(key_dt, set()).add(machine_code)
                else:
                    # Whole days between the first-level date and the row's
                    # creation time.
                    days = int((line[-2] - first_level_dt).days)
                    fission_level.setdefault(key_dt, {}).setdefault(
                        days, set()
                    ).add(machine_code)
            except (AttributeError, IndexError, OverflowError,
                    TypeError, ValueError):
                # Best effort: rows with a missing or malformed date (for
                # example a NULL first_level_dt) are skipped.
                continue

        summary = {}
        for key_dt, machine_codes in first_level.items():
            per_day = fission_level.get(key_dt, {})
            summary[key_dt] = [
                len(machine_codes),
                len(per_day.get(0, ())),
                len(per_day.get(1, ())),
                len(per_day.get(2, ())),
            ]
        return summary

    @classmethod
    def getMinigramInfo(cls, rootSourceId):
        """
        Load the rows recorded for one root source id and summarise them.

        :param rootSourceId: value matched against ``rootsourceid``.
        :return: dict produced by ``_summarize``:
            ``{'YYYY-MM-DD': [first_level, fission_0, fission_1, fission_2]}``
        """
        # NOTE(review): the id is interpolated straight into the statement.
        # Switch to a parameterized query if ad_mysql.select supports
        # parameters (its signature is not visible here).
        sql = f"""
            select type, machinecode, create_time, first_level_dt
            from changwen_data_base_v2
            where rootsourceid = '{rootSourceId}';
        """
        result_list = cls.ad_mysql.select(sql)
        return cls._summarize(result_list)

    @classmethod
    def getArticleInfo(cls, trace_id):
        """
        Look up an article by its trace id.

        :param trace_id: value matched against ``trace_id``.
        :return: first row of the result, holding
            (account_name, article_title).
        :raises IndexError: presumably when nothing matches (assumes select
            returns an empty sequence in that case -- confirm).
        """
        # NOTE(review): same string interpolation concern as in
        # getMinigramInfo.
        sql = f"""
            SELECT account_name, article_title
            FROM long_articles_video
            WHERE trace_id = '{trace_id}';
        """
        info = cls.pq_mysql.select(sql)
        return info[0]

    @classmethod
    def updateDetail(cls):
        """
        Refresh first-level / fission counters in long_articles_detail_info.

        For every distinct root_source_id the summary is recomputed with
        ``getMinigramInfo`` and written back per recall date. Failures are
        printed and the loop moves on to the next item.

        :return: None
        """
        sql = """
            select distinct root_source_id
            from long_articles_detail_info;
        """
        # The statement is the same for every row, so build it once.
        update_sql = """
            UPDATE long_articles_detail_info
            set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
            where root_source_id = %s and recall_dt = %s;
        """
        source_id_list = cls.pq_mysql.select(sql)
        for item in tqdm(source_id_list):
            s_id = item[0]
            try:
                result = cls.getMinigramInfo(s_id)
            except Exception as e:
                # One failing id must not abort the whole run.
                print(e)
                continue
            for recall_dt, counters in result.items():
                first_level, fission_0, fission_1, fission_2 = counters
                print(recall_dt, first_level, fission_0, fission_1, fission_2)
                try:
                    cls.pq_mysql.update(
                        sql=update_sql,
                        params=(
                            first_level, fission_0, fission_1, fission_2,
                            s_id, recall_dt
                        )
                    )
                except Exception as e:
                    print("insert error", e)
if __name__ == "__main__":
    # Script entry point: only the manager object is created here.
    DM = DailyDataManager()
    # Example calls, kept disabled:
    # result = DM.getMinigramInfo("longArticles_d409f27d9d64501d6811b47a3779d2d7")
    # print(result)
    # DM.updateDetail()
|