# select_work.py

  1. import asyncio
  2. import datetime
  3. import sys
  4. import time
  5. import orjson
  6. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  7. from apscheduler.triggers.cron import CronTrigger
  8. from loguru import logger
  9. sys.path.append('/app')
  10. from utils.feishu_utils import Feishu
  11. from utils.odps_data import OdpsDataCount
  12. from utils.redis import RedisHelper
  13. class StartGetRecommend(object):
  14. @classmethod
  15. async def run(cls):
  16. dt = (datetime.datetime.now() - datetime.timedelta(hours=1)).strftime('%Y%m%d%H') # 获取前一小时
  17. tasks = OdpsDataCount.get_data_count(dt)
  18. logger.info(f"[获取] {dt}时间,共获取到{len(tasks)} 条")
  19. if len(tasks) > 0:
  20. await RedisHelper().get_client().rpush('gong_ji_heng_ceng:scan_tasks', *tasks)
  21. logger.info(f"[获取] {dt}时间,共获取到{len(tasks)}条,写入redis成功")
  22. logger.info(f"[获取] 开始写入飞书表格")
  23. current_time = datetime.datetime.now()
  24. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  25. for task in tasks:
  26. time.sleep(0.5)
  27. task = orjson.loads(task)
  28. values = [[task['partition'], task['video_id'], task['channel'], task['time'], task["type"],formatted_time]]
  29. Feishu.insert_columns("JY4esfYvShLbTkthHEqcTw5qnsh", "f53916", "ROWS", 1, 2)
  30. time.sleep(0.5)
  31. Feishu.update_values("JY4esfYvShLbTkthHEqcTw5qnsh", "f53916", "A2:Z2", values)
  32. logger.info(f"[处理] 写入飞书表格一条成功")
  33. logger.info(f"[处理] 写入飞书表格全部成功")
  34. async def run():
  35. scheduler = AsyncIOScheduler()
  36. try:
  37. scheduler.add_job(StartGetRecommend.run, trigger=CronTrigger(minute=55, second=0)) # 每小时获取一次
  38. scheduler.start()
  39. await asyncio.Event().wait()
  40. except KeyboardInterrupt:
  41. pass
  42. except Exception as e:
  43. pass
  44. finally:
  45. scheduler.shutdown()
  46. if __name__ == '__main__':
  47. # asyncio.run(StartGetRecommend.run())
  48. loop = asyncio.get_event_loop()
  49. loop.run_until_complete(run())