"""
@author: luojunhui
Read data from ODPS daily and save file to static folder
"""
import json
import os
import time

import schedule
from datetime import datetime, timedelta
from concurrent.futures.thread import ThreadPoolExecutor

from applications.functions.odps import PyODPS
from applications.functions.ask_kimi import ask_kimi


def read_data_from_odps_daily(dt):
    """
    Fetch up to 2000 rows from loghubods.lastday_return for one partition.

    :param dt: partition date string, e.g. "20240321" (produced by strftime upstream)
    :return: list of dicts with keys video_id / title / uid / dt
    """
    # NOTE(review): dt is interpolated directly into the SQL text. It is safe
    # here because it always comes from strftime in run(), but never pass
    # untrusted input into this function.
    sql = f"""select * from loghubods.lastday_return where dt = '{dt}' limit 2000;"""
    data_list = PyODPS().select(sql)
    return [
        {
            "video_id": obj["videoid"],
            "title": obj["title"],
            "uid": obj["uid"],
            "dt": dt,
        }
        for obj in data_list
    ]


def save_file_to_local(obj):
    """
    Use kimi to mine data info for one record and persist it as a JSON file.

    The file lands at applications/static/<dt>/<uid>_<video_id>.json relative
    to the current working directory. Existing files are left untouched so the
    job is safe to re-run for the same day.

    :param obj: dict with keys video_id / title / uid / dt
    :return: None
    """
    video_id = obj["video_id"]
    title = obj["title"]
    uid = obj["uid"]
    dt = obj["dt"]
    save_path = os.path.join(
        os.getcwd(), "applications", "static", dt, "{}_{}.json".format(uid, video_id)
    )
    print(save_path)
    # Guard clause: skip records already processed on a previous run.
    if os.path.exists(save_path):
        return
    os.makedirs(os.path.dirname(save_path), exist_ok=True)
    # Empty titles would waste a kimi call; store an empty result instead.
    result = ask_kimi(title) if title else {}
    print(result)
    with open(save_path, "w", encoding="utf-8") as f:
        f.write(json.dumps(result, ensure_ascii=False))


def run():
    """
    Read yesterday's partition from ODPS and save each record to the static
    folder using a thread pool (the work is I/O-bound: kimi calls + file writes).

    :return: None
    """
    today = datetime.today()
    yesterday = today - timedelta(days=1)
    yesterday_str = yesterday.strftime("%Y%m%d")
    data_list = read_data_from_odps_daily(yesterday_str)
    with ThreadPoolExecutor(max_workers=10) as executor:
        # BUGFIX: executor.map is lazy about results — worker exceptions are
        # stored in futures and were silently discarded before. Consuming the
        # iterator re-raises the first failure instead of hiding it.
        for _ in executor.map(save_file_to_local, data_list):
            pass


if __name__ == "__main__":
    run()
    # 设置任务每天的 9:00 执行
    # schedule.every().day.at("09:00").do(run)
    # while True:
    #     schedule.run_pending()
    #     time.sleep(1)