|
@@ -1,81 +0,0 @@
|
|
|
-"""
|
|
|
-@author: luojunhui
|
|
|
-Read data from ODPS daily and save file to static folder
|
|
|
-"""
|
|
|
-import json
|
|
|
-import os
|
|
|
-import time
|
|
|
-import schedule
|
|
|
-from datetime import datetime, timedelta
|
|
|
-from concurrent.futures.thread import ThreadPoolExecutor
|
|
|
-
|
|
|
-from applications.functions.odps import PyODPS
|
|
|
-from applications.functions.ask_kimi import ask_kimi
|
|
|
-
|
|
|
-
|
|
|
-def read_data_from_odps_daily(dt):
|
|
|
- """
|
|
|
- Read data from ODPS daily and save file to static folder
|
|
|
- :return:
|
|
|
- """
|
|
|
- sql = f"""select * from loghubods.lastday_return where dt = '{dt}' limit 2000;"""
|
|
|
- data_list = PyODPS().select(sql)
|
|
|
- obj_list = [
|
|
|
- {
|
|
|
- "video_id": obj['videoid'],
|
|
|
- "title": obj['title'],
|
|
|
- "uid": obj['uid'],
|
|
|
- "dt": dt
|
|
|
- } for obj in data_list
|
|
|
- ]
|
|
|
- return obj_list
|
|
|
-
|
|
|
-
|
|
|
-def save_file_to_local(obj):
|
|
|
- """
|
|
|
- use kimi to mine data info and save to local file
|
|
|
- :param obj:
|
|
|
- :return:
|
|
|
- """
|
|
|
- video_id = obj['video_id']
|
|
|
- title = obj['title']
|
|
|
- uid = obj['uid']
|
|
|
- dt = obj["dt"]
|
|
|
- save_path = os.path.join(os.getcwd(), 'applications', 'static', dt, "{}_{}.json".format(uid, video_id))
|
|
|
- print(save_path)
|
|
|
- if os.path.exists(save_path):
|
|
|
- return
|
|
|
- else:
|
|
|
- os.makedirs(os.path.dirname(save_path), exist_ok=True)
|
|
|
- if not title:
|
|
|
- result = {}
|
|
|
- else:
|
|
|
- result = ask_kimi(title)
|
|
|
- print(result)
|
|
|
- with open(save_path, "w", encoding="utf-8") as f:
|
|
|
- f.write(json.dumps(result, ensure_ascii=False))
|
|
|
-
|
|
|
-
|
|
|
-def run():
|
|
|
- """
|
|
|
- Read data from ODPS daily and save file to static folder with thread pool
|
|
|
- :return:
|
|
|
- """
|
|
|
- today = datetime.today()
|
|
|
- yesterday = today - timedelta(days=1)
|
|
|
- yesterday_str = yesterday.strftime("%Y%m%d")
|
|
|
- data_list = read_data_from_odps_daily(yesterday_str)
|
|
|
- # print(data_list)
|
|
|
- # for obj in data_list:
|
|
|
- # save_file_to_local(obj)
|
|
|
- with ThreadPoolExecutor(max_workers=10) as Pool:
|
|
|
- Pool.map(save_file_to_local, data_list)
|
|
|
-
|
|
|
-
|
|
|
-if __name__ == '__main__':
|
|
|
- # run()
|
|
|
- # 设置任务每天的 9:00 执行
|
|
|
- schedule.every().day.at("09:00").do(run)
|
|
|
- while True:
|
|
|
- schedule.run_pending()
|
|
|
- time.sleep(1)
|