video_insight_consumption_work.py 4.1 KB

import asyncio
import json
import os
import sys

import orjson
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger

sys.path.append('/app')
from utils.aliyun_log import AliyunLogger
from utils.google_ai_studio import GoogleAI
from utils.piaoquan import PQ
from utils.redis import RedisHelper, content_video_data
from utils.mysql_db import MysqlHelper
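
# Worker overview: every 2 minutes the scheduler pops one task from the Redis
# list 'task:video_insight', resolves the video's OSS address via PQ, sends the
# video to Gemini (GoogleAI.run) for demand analysis, and inserts the parsed
# results into the video_demand_analysis MySQL table.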

class ConsumptionRecommend(object):

    @classmethod
    async def run(cls):
        logger.info("[Processing] fetching task data from Redis")
        # Task payloads are JSON objects carrying at least a video_id.
        task = RedisHelper().get_client().rpop(name='task:video_insight')
        if not task:
            logger.info('[Processing] no pending scan task')
            return
        task = orjson.loads(task)
        logger.info(f"[Processing] got Redis data: {task}")
        video_id = task['video_id']
        # Deduplicate: skip videos that already have analysis rows.
        count_sql = f"""select count(1) from video_demand_analysis where video_id = {video_id}"""
        count = MysqlHelper.get_values(count_sql)
        if count and count[0][0] > 0:
            logger.info("[Processing] duplicate video, skipping")
            return
        logger.info("[Processing] fetching source video OSS address")
        video_title, video_path = PQ.get_pq_oss(video_id)
        if not video_path:
            return
        logger.info(f"[Processing] got OSS address, video path: {video_path}")
        video_url = f"http://rescdn.yishihui.com/{video_path}"
        logger.info("[Processing] starting video analysis")
        # The Gemini key comes from the environment; never hard-code real keys.
        api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
        logger.info(f"[Processing] using API key: {api_key}")
        text = GoogleAI.run(api_key, video_url)
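        # GoogleAI.run is assumed to return either the model's JSON answer as
        # text or an error string containing "[异常]" (as checked below).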
        if "[异常]" in text:
            # Analysis failed: push the task back onto the queue and stop.
            content_video_data(json.dumps(task))
            return
        # Parse the model's JSON output.
        data = orjson.loads(text)
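        # `data` is expected to be a list of dicts keyed by the Chinese field
        # names used below (one dict per detected demand).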
        # Generate the SQL insert statement.
        sql = """
        INSERT INTO video_demand_analysis (
            video_id, video_link, video_title, content_type,
            demand_order, demand_score, user_demand, demand_category,
            demand_reason, product_hook, hook_time, hook_desc,
            hook_type, landing_desc, landing_type, platform_case
        ) VALUES
        """
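        # Caution: entry values are interpolated directly into the SQL string,
        # so quotes in the model output can break or inject into the statement;
        # a parameterized insert would be safer if MysqlHelper supports one.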
        # Add a VALUES tuple for each entry; strings are quoted, and missing
        # numeric fields fall back to NULL.
        values = []
        link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
        for entry in data:
            value = f"""(
                {video_id}, '{link}', '{video_title}', NULL,
                {entry.get('需求排序序号', 'NULL')}, {entry.get('需求强烈程度分值', 'NULL')}, '{entry.get('用户具体的需求描述', '')}', '{entry.get('需求分类', '')}',
                '{entry.get('推测出该点需求的原因', '')}', '{entry.get('描述出与需求对应的产品钩子', '')}', '{entry.get('产品形式出现到消失的时间点', '')}', '{entry.get('钩子形式描述', '')}',
                '{entry.get('钩子形式类型', '')}', '{entry.get('点击钩子后的产品落地形态描述', '')}', '{entry.get('产品落地形态分类', '')}', '{entry.get('其他平台案例', '')}'
            )"""
            values.append(value)
        # Combine the statement and values, print for debugging, and execute.
        sql += ",\n".join(values) + ";"
        print(sql)
        MysqlHelper.update_values(sql)
        # AliyunLogger.logging(str(video_id), orjson.dumps(text).decode())
        logger.info("[Processing] results written successfully")
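
# A minimal producer sketch (hypothetical, not part of this worker): how a task
# could be enqueued so this consumer picks it up. Assumes the same RedisHelper
# util; rpop above consumes from the right, so a producer would lpush:
#
#     RedisHelper().get_client().lpush(
#         'task:video_insight',
#         orjson.dumps({'video_id': 12345678}).decode(),
#     )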

async def run():
    scheduler = AsyncIOScheduler()
    try:
        logger.info("[Processing] starting up")
        # Run the consumer once every 2 minutes.
        scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=2))
        scheduler.start()
        # Keep the event loop alive until interrupted.
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.error(f"[Processing] startup error: {e}")
    finally:
        scheduler.shutdown()

if __name__ == '__main__':
    # asyncio.run(ConsumptionRecommend.run())
    asyncio.run(run())
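
# Usage note (assuming the container layout implied by sys.path.append('/app')):
#     python video_insight_consumption_work.py
# with VIDEO_INSIGHT_GEMINI_API_KEY set in the environment.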