read_data_from_odps_daily.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. """
  2. @author: luojunhui
  3. Read data from ODPS daily and save file to static folder
  4. """
  5. import json
  6. import os
  7. import time
  8. import schedule
  9. from datetime import datetime, timedelta
  10. from concurrent.futures.thread import ThreadPoolExecutor
  11. from applications.functions.odps import PyODPS
  12. from applications.functions.ask_kimi import ask_kimi
  13. def read_data_from_odps_daily(dt):
  14. """
  15. Read data from ODPS daily and save file to static folder
  16. :return:
  17. """
  18. sql = f"""select * from loghubods.lastday_return where dt = '{dt}' limit 20;"""
  19. data_list = PyODPS().select(sql)
  20. obj_list = [
  21. {
  22. "video_id": obj['videoid'],
  23. "title": obj['title'],
  24. "uid": obj['uid'],
  25. "dt": dt
  26. } for obj in data_list
  27. ]
  28. return obj_list
  29. def save_file_to_local(obj):
  30. """
  31. use kimi to mine data info and save to local file
  32. :param obj:
  33. :return:
  34. """
  35. video_id = obj['video_id']
  36. title = obj['title']
  37. uid = obj['uid']
  38. dt = obj["dt"]
  39. save_path = os.path.join(os.getcwd(), 'applications', 'static', dt, "{}_{}.json".format(uid, video_id))
  40. print(save_path)
  41. if os.path.exists(save_path):
  42. return
  43. else:
  44. os.makedirs(os.path.dirname(save_path), exist_ok=True)
  45. if not title:
  46. result = {}
  47. else:
  48. result = ask_kimi(title)
  49. print(result)
  50. with open(save_path, "w", encoding="utf-8") as f:
  51. f.write(json.dumps(result, ensure_ascii=False))
  52. def run():
  53. """
  54. Read data from ODPS daily and save file to static folder with thread pool
  55. :return:
  56. """
  57. today = datetime.today()
  58. yesterday = today - timedelta(days=1)
  59. yesterday_str = yesterday.strftime("%Y%m%d")
  60. data_list = read_data_from_odps_daily(yesterday_str)
  61. # print(data_list)
  62. for obj in data_list:
  63. save_file_to_local(obj)
  64. # with ThreadPoolExecutor(max_workers=10) as Pool:
  65. # Pool.map(save_file_to_local, data_list)
  66. if __name__ == '__main__':
  67. run()
  68. # # 设置任务每天的 9:00 执行
  69. # schedule.every().day.at("09:00").do(run)
  70. # while True:
  71. # schedule.run_pending()
  72. # time.sleep(1)