select_work.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. import asyncio
  2. import datetime
  3. import time
  4. import orjson
  5. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  6. from apscheduler.triggers.cron import CronTrigger
  7. from loguru import logger
  8. from utils.feishu_utils import Feishu
  9. from utils.odps_data import OdpsDataCount
  10. from utils.redis import RedisHelper
  11. class StartGetRecommend(object):
  12. @classmethod
  13. async def run(cls):
  14. dt = (datetime.datetime.now() - datetime.timedelta(hours=2)).strftime('%Y%m%d%H') # 获取前一小时
  15. tasks = OdpsDataCount.get_data_count(dt)
  16. logger.info(f"[获取] {dt}时间,共获取到{len(tasks)} 条")
  17. if len(tasks) > 0:
  18. await RedisHelper().get_client().rpush('gong_ji_heng_ceng:scan_tasks', *tasks)
  19. logger.info(f"[获取] {dt}时间,共获取到{len(tasks)}条,写入redis成功")
  20. logger.info(f"[获取] 开始写入飞书表格")
  21. current_time = datetime.datetime.now()
  22. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  23. for task in tasks:
  24. time.sleep(0.5)
  25. task = orjson.loads(task)
  26. values = [[task['partition'], task['video_id'], task['channel'], task['time'], task["type"],formatted_time]]
  27. Feishu.insert_columns("JY4esfYvShLbTkthHEqcTw5qnsh", "f53916", "ROWS", 1, 2)
  28. time.sleep(0.5)
  29. Feishu.update_values("JY4esfYvShLbTkthHEqcTw5qnsh", "f53916", "A2:Z2", values)
  30. logger.info(f"[处理] 写入飞书表格一条成功")
  31. logger.info(f"[处理] 写入飞书表格全部成功")
  32. async def run():
  33. scheduler = AsyncIOScheduler()
  34. try:
  35. scheduler.add_job(StartGetRecommend.run, trigger=CronTrigger(hour=1, second=0)) # 每小时获取一次
  36. scheduler.start()
  37. await asyncio.Event().wait()
  38. except KeyboardInterrupt:
  39. pass
  40. except Exception as e:
  41. pass
  42. finally:
  43. scheduler.shutdown()
  44. if __name__ == '__main__':
  45. # asyncio.run(StartGetRecommend.run())
  46. loop = asyncio.get_event_loop()
  47. loop.run_until_complete(run())