123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081 |
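"""
@author: luojunhui

Daily job: read yesterday's records from the ODPS table
``loghubods.lastday_return``, ask Kimi to mine information from each
video title, and save each result as a JSON file under
``applications/static/<dt>/``.
"""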
- import json
- import os
- import time
- import schedule
- from datetime import datetime, timedelta
- from concurrent.futures.thread import ThreadPoolExecutor
- from applications.functions.odps import PyODPS
- from applications.functions.ask_kimi import ask_kimi
def read_data_from_odps_daily(dt, limit=20):
    """
    Read one day's partition of ``loghubods.lastday_return`` from ODPS.

    :param dt: partition date as an 8-digit ``YYYYMMDD`` string,
        e.g. ``"20240101"``.
    :param limit: maximum number of rows to fetch. Defaults to 20 (the value
        the query always used); pass ``None`` to fetch the whole partition.
    :return: list of dicts with keys ``video_id``, ``title``, ``uid`` and
        ``dt``, one per row returned by the query.
    :raises ValueError: if ``dt`` is not an 8-digit date string or ``limit``
        is negative.
    """
    # dt and limit are interpolated directly into the SQL text, so validate
    # them strictly to keep the query well-formed (and free of injected SQL).
    if not (
        isinstance(dt, str)
        and len(dt) == 8
        and all("0" <= ch <= "9" for ch in dt)
    ):
        raise ValueError(
            "dt must be an 8-digit YYYYMMDD string, got {!r}".format(dt)
        )
    sql = f"select * from loghubods.lastday_return where dt = '{dt}'"
    if limit is not None:
        limit = int(limit)
        if limit < 0:
            raise ValueError("limit must be >= 0, got {}".format(limit))
        sql += f" limit {limit}"
    sql += ";"
    data_list = PyODPS().select(sql)
    return [
        {
            # The ODPS column is "videoid"; callers use the key "video_id".
            "video_id": row["videoid"],
            "title": row["title"],
            "uid": row["uid"],
            "dt": dt,
        }
        for row in data_list
    ]
def save_file_to_local(obj):
    """
    Use Kimi to mine information from one video's title and cache it as JSON.

    The result is written to
    ``<cwd>/applications/static/<dt>/<uid>_<video_id>.json``. If that file
    already exists the record is skipped, so re-running the job only
    processes records that have not been saved yet.

    :param obj: dict with keys ``video_id``, ``title``, ``uid`` and ``dt``
        (the shape produced by ``read_data_from_odps_daily``).
    :return: None
    """
    video_id = obj["video_id"]
    title = obj["title"]
    uid = obj["uid"]
    dt = obj["dt"]
    save_path = os.path.join(
        os.getcwd(), "applications", "static", dt, "{}_{}.json".format(uid, video_id)
    )
    print(save_path)
    if os.path.exists(save_path):
        # Already saved by an earlier run. The file's existence is the only
        # "done" marker, so it must never be left half-written (see below).
        return
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    # Records with an empty title are saved as an empty object without
    # calling Kimi.
    result = ask_kimi(title) if title else {}
    print(result)
    # Serialize before touching the filesystem, then write to a unique temp
    # file and atomically rename it into place. Writing straight to save_path
    # would truncate it first, so a serialization error or a crash mid-write
    # would leave an empty/partial file that the existence check above then
    # treats as done.
    payload = json.dumps(result, ensure_ascii=False)
    tmp_path = "{}.{}.tmp".format(save_path, os.urandom(8).hex())
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, save_path)
    except BaseException:
        # Best-effort cleanup of the temp file; the original error is re-raised.
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise
def run(dt=None):
    """
    Fetch one day's records from ODPS and save a Kimi-mined JSON file for each.

    Records are processed sequentially, one ``save_file_to_local`` call per
    record; that function skips records whose output file already exists.

    :param dt: partition date as a ``YYYYMMDD`` string. Defaults to yesterday
        (local time), which is what the daily job processes; pass an explicit
        date to backfill a missed day.
    :return: None
    """
    if dt is None:
        dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
    # Each record is handled independently, so this loop could be fanned out
    # with ThreadPoolExecutor if needed (check Kimi-side limits first).
    for obj in read_data_from_odps_daily(dt):
        save_file_to_local(obj)
if __name__ == "__main__":
    # Process yesterday's partition once and exit.
    run()
    # To run as a long-lived job every day at 09:00 instead, replace the
    # call above with:
    #     schedule.every().day.at("09:00").do(run)
    #     while True:
    #         schedule.run_pending()
    #         time.sleep(1)
|