123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363 |
- """
- @author luojunhui
- @description Update Minigram Info Daily
- """
- import time
- import sys
- from tqdm import tqdm
- from datetime import datetime, timedelta
- import schedule
- from argparse import ArgumentParser
- from applications import longArticlesMySQL, PQMySQL, WeixinSpider, Functions, log, bot
# Task identifier passed as ``task=`` to every ``log`` call in this module.
TASK_NAME = "updateMinigramInfoDaily"
def get_yesterday():
    """
    Return the current local date-time shifted back by exactly one day.

    :return: naive ``datetime``; the time-of-day component is preserved.
    """
    one_day = timedelta(days=1)
    now = datetime.today()
    return now - one_day
class DailyDataManager(object):
    """
    Daily refresh of long-article mini-program data.

    * ``get_published_articles`` + ``update_article_info`` read the articles
      published on a business date from ``official_articles_v2``, crawl each
      one, and insert one ``long_articles_detail_info`` row per
      (recall date, mini-program card).
    * ``update_minigram_detail`` aggregates first-level / fission user counts
      from ``changwen_data_base_v2`` and writes them back onto those rows.

    NOTE(review): the four clients below are instantiated when the class body
    executes (i.e. at import time) and are shared by every caller.
    """
    long_articles_db = longArticlesMySQL()
    pq_db = PQMySQL()
    wx_spider = WeixinSpider()
    functions = Functions()

    # Number of recall dates (the publish date plus the following days) that
    # get one detail row per mini-program card.
    RECALL_DAY_COUNT = 3

    # Markers inside the URL-encoded mini-program ``path`` that delimit the ids.
    _ROOT_SOURCE_MARKER = "rootSourceId%3D"
    _VIDEO_ID_START = "videos%3Fid%3D"
    _VIDEO_ID_END = "%26su%3D"

    _INSERT_DETAIL_SQL = """
        INSERT INTO long_articles_detail_info
        (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
        values
        (%s, %s, %s, %s, %s, %s, %s, %s, %s);
    """

    _UPDATE_DETAIL_SQL = """
        UPDATE long_articles_detail_info
        set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
        where root_source_id = %s and recall_dt = %s;
    """

    @staticmethod
    def _to_text(value):
        """Decode ``bytes``/``bytearray`` as UTF-8; return any other value unchanged."""
        if isinstance(value, (bytes, bytearray)):
            return value.decode()
        return value

    @classmethod
    def get_published_articles(cls, biz_date):
        """
        Fetch the articles whose ``createTime`` falls on ``biz_date``.

        :param biz_date: date/datetime; only year, month and day are used. The
            day boundaries are naive datetimes, so ``timestamp()`` interprets
            them in the server's local time zone.
        :return: rows of ``(ContentUrl, wx_sn, createTime)``.
        """
        day_start = datetime(year=biz_date.year, month=biz_date.month, day=biz_date.day)
        next_day_start = day_start + timedelta(days=1)
        start_ts = int(day_start.timestamp())
        # ``between`` is inclusive on both ends: stop one second before the next midnight.
        end_ts = int(next_day_start.timestamp()) - 1
        sql = f"""
            select ContentUrl, wx_sn, createTime
            from official_articles_v2
            where createTime between {start_ts} and {end_ts};
        """
        result_list = cls.pq_db.select(sql) or []
        log(
            task=TASK_NAME,
            function="get_published_articles",
            message="一共获取 {} 篇文章数据".format(len(result_list))
        )
        return result_list

    @classmethod
    def _parse_mini_program_cards(cls, wx_sn, mini_info):
        """
        Turn the crawled ``mini_program`` list into insert-ready tuples.

        Cards whose fields cannot be extracted are logged and skipped. The
        remaining cards keep their original 1-based position as ``video_index``.

        :param wx_sn: article serial number, used only for logging.
        :param mini_info: the crawled ``mini_program`` list (``None`` is treated as empty).
        :return: list of ``(title, nick_name, image_url, index, root_source_id, video_id)``.
        """
        cards = []
        for index, item in enumerate(mini_info or [], 1):
            try:
                path = item['path']
                root_source_id = path.split(cls._ROOT_SOURCE_MARKER)[-1]
                video_id = path.split(cls._VIDEO_ID_START)[1].split(cls._VIDEO_ID_END)[0]
                # NOTE(review): key spelled "nike_name" exactly as the original
                # code reads it; presumably matches the spider payload.
                cards.append((
                    item['title'],
                    item['nike_name'],
                    item['image_url'],
                    index,
                    root_source_id,
                    video_id,
                ))
            except (KeyError, IndexError, TypeError, AttributeError) as e:
                log(
                    task=TASK_NAME,
                    function="update_article_info",
                    status="fail",
                    message="解析小程序信息失败, 报错信息是: {}".format(e),
                    data={"wxSn": wx_sn, "index": index}
                )
        return cards

    @classmethod
    def update_article_info(cls, line):
        """
        Insert the mini-program detail rows of one published article.

        One ``long_articles_detail_info`` row is inserted per (recall date,
        mini-program card). The recall dates are the publish date and the
        following ``RECALL_DAY_COUNT - 1`` days. Errors are logged, not raised,
        so that one bad article does not abort the daily batch.

        :param line: a row from ``get_published_articles``:
            ``(ContentUrl, wx_sn, createTime)``.
        :return: None
        """
        root_info = cls.get_root_source_ids(line)
        if root_info is None:
            # get_root_source_ids has already logged why the crawl failed.
            return
        wx_sn, mini_info, create_time = root_info
        try:
            publish_dt_object = datetime.fromtimestamp(create_time)
            publish_dt = publish_dt_object.strftime('%Y-%m-%d')
            recall_dt_str_list = [
                (publish_dt_object + timedelta(days=offset)).strftime('%Y-%m-%d')
                for offset in range(cls.RECALL_DAY_COUNT)
            ]
            cards = cls._parse_mini_program_cards(wx_sn, mini_info)
            if not cards:
                log(
                    task=TASK_NAME,
                    function="update_article_info",
                    message="文章没有可解析的小程序信息, wx_sn 是: {}".format(wx_sn)
                )
                return
            for recall_dt in recall_dt_str_list:
                for title, nick_name, image_url, index, root_source_id, video_id in cards:
                    cls.pq_db.update(
                        sql=cls._INSERT_DETAIL_SQL,
                        params=(
                            wx_sn,
                            title,
                            nick_name,
                            image_url,
                            index,
                            root_source_id,
                            video_id,
                            publish_dt,
                            recall_dt
                        )
                    )
            log(
                task=TASK_NAME,
                function="update_article_info",
                message="插入数据成功, video_id 是: {}".format(
                    ", ".join(card[5] for card in cards)
                )
            )
        except Exception as e:
            log(
                task=TASK_NAME,
                function="update_article_info",
                status="fail",
                message="插入数据失败, 失败原因是: {}".format(e)
            )

    @classmethod
    def get_root_source_ids(cls, data_info):
        """
        Crawl an article and return its mini-program list.

        :param data_info: ``(ContentUrl, wx_sn, createTime)``. ``wx_sn`` may be
            ``bytes`` (decoded as UTF-8) or ``str``.
        :return: ``(wx_sn, mini_info, createTime)`` on success, or ``None`` when
            the crawl or response parsing fails (the failure is logged).
        """
        url = data_info[0]
        try:
            article_detail = cls.wx_spider.get_article_text(url)
            mini_info = article_detail['data']['data']['mini_program']
            wx_sn = cls._to_text(data_info[1])
            log(
                task=TASK_NAME,
                function="get_root_source_ids",
                message="获取文章链接对应的 rootSourceId 成功",
                data={
                    "ContentUrl": url,
                    "wxSn": wx_sn,
                    "createTime": data_info[2],
                    "miniInfo": mini_info
                }
            )
            return wx_sn, mini_info, data_info[2]
        except Exception as e:
            log(
                task=TASK_NAME,
                function="get_root_source_ids",
                status="fail",
                message="获取文章链接对应的 rootSourceId失败, 报错信息是: {}".format(e),
                data={
                    "ContentUrl": url
                }
            )
            return None

    @staticmethod
    def _summarize_minigram_rows(values):
        """
        Aggregate ``changwen_data_base_v2`` rows of one rootSourceId.

        :param values: rows of ``(type, machinecode, create_time, first_level_dt)``.
            ``first_level_dt`` must stringify as ``%Y%m%d``. ``create_time`` must
            support subtraction of a ``datetime``. Rows violating either are skipped.
        :return: ``{'YYYY-MM-DD': [first_level, fission_0, fission_1, fission_2]}``
            keyed by first-level date. ``first_level`` counts distinct machine
            codes of type ``首层``. ``fission_N`` counts distinct other machine
            codes whose ``create_time`` is N whole days after ``first_level_dt``.
            Only dates that have first-level rows are returned.
        """
        first_level = {}
        fission_level = {}
        for row in values:
            user_type, machine_code = row[0], row[1]
            try:
                first_level_day = datetime.strptime(str(row[-1]), '%Y%m%d')
            except ValueError:
                continue
            key_dt = first_level_day.strftime('%Y-%m-%d')
            if user_type == '首层':
                first_level.setdefault(key_dt, set()).add(machine_code)
                continue
            try:
                days = int((row[-2] - first_level_day).days)
            except TypeError:
                # create_time missing or not a datetime.
                continue
            fission_level.setdefault(key_dt, {}).setdefault(days, set()).add(machine_code)

        summary = {}
        for key_dt, machine_codes in first_level.items():
            fission_by_day = fission_level.get(key_dt, {})
            # Fission is reported for day offsets 0, 1 and 2 only.
            summary[key_dt] = [len(machine_codes)] + [
                len(fission_by_day.get(day, ())) for day in range(3)
            ]
        return summary

    @classmethod
    def get_minigram_info(cls, rootSourceId):
        """
        Aggregate mini-program users for one rootSourceId.

        :param rootSourceId: id read from ``long_articles_detail_info``
            (``bytes`` are decoded as UTF-8).
        :return: the mapping built by ``_summarize_minigram_rows``, or ``None``
            when the id is unusable or the query/aggregation fails (logged).
        """
        try:
            root_source_id = cls._to_text(rootSourceId)
            # The id is interpolated into the SQL text (``select`` is only called
            # with a raw string here), and it originates from crawled article
            # URLs. Refuse anything that could escape the quoted literal.
            if not isinstance(root_source_id, str) or "'" in root_source_id or "\\" in root_source_id:
                raise ValueError("invalid rootSourceId: {!r}".format(rootSourceId))
            sql = f"""
                select type, machinecode, create_time, first_level_dt
                from changwen_data_base_v2
                where rootsourceid = '{root_source_id}';
            """
            result_list = cls.long_articles_db.select(sql)
            response = cls._summarize_minigram_rows(result_list or [])
        except Exception as e:
            log(
                task=TASK_NAME,
                function="get_minigram_info",
                message="获取 source_id信息失败, 报错信息是: {}".format(e),
                status="fail"
            )
            return None
        log(
            task=TASK_NAME,
            function="get_minigram_info",
            message="计算source_id信息成功",
            data=response
        )
        return response

    @classmethod
    def update_minigram_detail(cls, biz_date):
        """
        Write first-level / fission counts onto recently published detail rows.

        Every distinct ``root_source_id`` whose ``publish_dt`` lies between
        ``biz_date - 3 days`` and ``biz_date`` (both inclusive) is aggregated
        with ``get_minigram_info``. Each resulting date's counts are written to
        the row with the same ``root_source_id`` and ``recall_dt``. Per-id and
        per-row failures are logged and skipped.

        :param biz_date: date/datetime closing the publish-date window.
        :return: None
        """
        datestr_begin = (biz_date - timedelta(days=3)).strftime("%Y-%m-%d")
        datestr_end = biz_date.strftime("%Y-%m-%d")
        sql = f"""
            select distinct root_source_id
            from long_articles_detail_info
            where publish_dt between '{datestr_begin}' and '{datestr_end}';
        """
        source_id_list = cls.pq_db.select(sql) or []
        log(
            task=TASK_NAME,
            function="update_minigram_detail",
            message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(source_id_list))
        )
        for item in tqdm(source_id_list):
            s_id = item[0]
            try:
                # get_minigram_info logs its own failures and returns None.
                result = cls.get_minigram_info(s_id) or {}
                for recall_dt, counts in result.items():
                    first_level, fission_0, fission_1, fission_2 = counts
                    try:
                        cls.pq_db.update(
                            sql=cls._UPDATE_DETAIL_SQL,
                            params=(
                                first_level, fission_0, fission_1, fission_2, s_id, recall_dt
                            )
                        )
                    except Exception as e:
                        log(
                            task=TASK_NAME,
                            function="update_minigram_detail",
                            status="fail",
                            message="mysql 更新失败, 报错信息是 {}".format(e)
                        )
            except Exception as e:
                log(
                    task=TASK_NAME,
                    function="update_minigram_detail",
                    status="fail",
                    message="更新单条数据失败, 报错信息是 {}".format(e)
                )
def updateArticlesJob(biz_date=None):
    """
    Job: insert mini-program detail rows for the articles published on a date.

    :param biz_date: business date to process; yesterday is used when it is falsy.
    :return: None
    """
    target_date = biz_date if biz_date else get_yesterday()
    manager = DailyDataManager()
    articles = manager.get_published_articles(target_date)
    for article_row in tqdm(articles):
        manager.update_article_info(article_row)
    log(
        task=TASK_NAME,
        function="updateArticlesJob",
        message="文章更新完成---{}".format(str(target_date))
    )
def updateMinigramInfoJob(biz_date=None):
    """
    Job: refresh mini-program statistics for articles published in the days
    up to ``biz_date``.

    :param biz_date: end of the publish-date window; yesterday is used when it is falsy.
    :return: None. Any exception is logged rather than raised.
    """
    target_date = biz_date if biz_date else get_yesterday()
    manager = DailyDataManager()
    try:
        manager.update_minigram_detail(target_date)
        log(
            task=TASK_NAME,
            function="updateMinigramInfoJob",
            message="小程序更新完成---{}".format(str(target_date))
        )
    except Exception as err:
        log(
            task=TASK_NAME,
            function="updateMinigramInfoJob",
            status="fail",
            message="小程序更新失败---{}, 报错信息是: {}".format(str(target_date), err)
        )
def main():
    """
    Command-line entry point.

    With ``--run-date YYYYMMDD`` both jobs run once for that date and the
    function returns. Without it, the jobs are registered with ``schedule``
    (articles at 01:30, mini-program statistics at 03:30) and the process
    polls for due jobs once per second, forever.
    """
    parser = ArgumentParser()
    # argparse %-formats help strings, so literal percent signs must be doubled;
    # a bare "%Y" makes ``--help`` crash with ValueError.
    parser.add_argument(
        "--run-date",
        help="Run only once for date in format of %%Y%%m%%d. "
             "If not specified, run as daily jobs."
    )
    args = parser.parse_args()
    if args.run_date:
        try:
            biz_date = datetime.strptime(args.run_date, "%Y%m%d")
        except ValueError:
            # Report a usage error (exit status 2) instead of a raw traceback.
            parser.error("--run-date must be in format of %Y%m%d, got: {}".format(args.run_date))
        print("Run in manual mode. Date: {}".format(args.run_date))
        updateArticlesJob(biz_date)
        updateMinigramInfoJob(biz_date)
        return

    print("Run in daily mode.")
    schedule.every().day.at("01:30").do(Functions().job_with_thread, updateArticlesJob)
    schedule.every().day.at("03:30").do(Functions().job_with_thread, updateMinigramInfoJob)
    while True:
        schedule.run_pending()
        time.sleep(1)
# Start the CLI only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
|