video_insight_consumption_work.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. import asyncio
  2. import json
  3. import os
  4. import sys
  5. import orjson
  6. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  7. from apscheduler.triggers.interval import IntervalTrigger
  8. from loguru import logger
  9. logger.add("/app/logs/consumption.log", rotation="10 MB")
  10. sys.path.append('/app')
  11. from utils.aliyun_log import AliyunLogger
  12. from utils.google_ai_studio import GoogleAI
  13. from utils.piaoquan import PQ
  14. from utils.redis import RedisHelper, content_video_data
  15. from utils.mysql_db import MysqlHelper
  class ConsumptionRecommend(object):
      """Consume video-insight tasks from Redis and store the Gemini analysis in MySQL.

      ``run`` pops tasks from the Redis list ``task:video_insight`` until it finds
      a video that has no rows in ``video_demand_analysis`` yet. It then resolves
      the video's OSS path via ``PQ.get_pq_oss`` and sends the video URL to
      ``GoogleAI.run``. The two returned result sets are written into
      ``video_demand_analysis`` and ``video_demand_score``. At most one video is
      analysed per ``run`` call.
      """

      # Redis list that tasks are popped from (RPOP).
      TASK_QUEUE = 'task:video_insight'
      # A GoogleAI.run result containing this marker is treated as a failed analysis.
      ERROR_MARKER = "[异常]"
      # Backslash escapes for MySQL string literals. This assumes the server does
      # not run with NO_BACKSLASH_ESCAPES (the MySQL default).
      _SQL_ESCAPES = {
          ord('\\'): '\\\\',
          ord("'"): "\\'",
          ord('"'): '\\"',
          ord('\0'): '\\0',
          ord('\n'): '\\n',
          ord('\r'): '\\r',
          ord('\x1a'): '\\Z',
      }

      @classmethod
      def _sql_str(cls, value):
          """Render *value* as a quoted, escaped MySQL string literal (``''`` for None)."""
          if value is None:
              value = ''
          return "'" + str(value).translate(cls._SQL_ESCAPES) + "'"

      @staticmethod
      def _sql_num(value):
          """Render *value* as a MySQL numeric literal, or ``NULL`` if it is not a finite number.

          Model output may carry numbers as ints, floats or numeric strings. Anything
          else (missing, empty, free text) becomes NULL instead of breaking the SQL.
          """
          import math

          if isinstance(value, int) and not isinstance(value, bool):
              return str(value)
          try:
              number = float(value)
          except (TypeError, ValueError):
              return 'NULL'
          if not math.isfinite(number):
              return 'NULL'
          return str(int(number)) if number.is_integer() else repr(number)

      @staticmethod
      def _mask_secret(secret):
          """Return *secret* with all but its last 4 characters hidden, for logging."""
          if not secret:
              return secret
          return '***' + secret[-4:] if len(secret) > 8 else '***'

      @classmethod
      def _is_failed(cls, payload):
          """Return True when a GoogleAI.run result is missing or contains ERROR_MARKER."""
          if payload is None:
              return True
          try:
              return cls.ERROR_MARKER in payload
          except TypeError:
              return False

      @staticmethod
      def _parse_rows(payload):
          """Normalise a GoogleAI.run result into a list of dict rows.

          JSON text (str/bytes) is decoded first. A single object becomes a one-row
          list, and non-dict items are dropped.

          Raises:
              ValueError: if a text payload is not valid JSON / UTF-8.
          """
          if isinstance(payload, (bytes, bytearray)):
              payload = payload.decode('utf-8')
          if isinstance(payload, str):
              payload = json.loads(payload)
          if isinstance(payload, dict):
              payload = [payload]
          if not isinstance(payload, list):
              return []
          return [row for row in payload if isinstance(row, dict)]

      @classmethod
      def _demand_analysis_sql(cls, video_id, link, video_title, rows):
          """Build one multi-row INSERT into video_demand_analysis, or None if *rows* is empty."""
          if not rows:
              return None
          s, n = cls._sql_str, cls._sql_num
          values = []
          for entry in rows:
              fields = [
                  str(video_id), s(link), s(video_title), 'NULL',
                  n(entry.get('需求排序序号')), n(entry.get('需求强烈程度分值')),
                  s(entry.get('用户具体的需求描述', '')), s(entry.get('需求分类', '')),
                  s(entry.get('推测出该点需求的原因', '')), s(entry.get('需求详细query', '')),
                  "''", s(entry.get('需求钩子话术', '')),
                  "''", s(entry.get('落地方案形态描述', '')), s(entry.get('落地方案类型', '')),
                  "''", "''",
              ]
              values.append('(' + ', '.join(fields) + ')')
          return (
              "INSERT INTO video_demand_analysis ("
              "video_id, video_link, video_title, content_type, "
              "demand_order, demand_score, user_demand, demand_category, "
              "demand_reason, product_hook, hook_time, hook_desc, "
              "hook_type, landing_desc, landing_type, platform_case, reasoning_process"
              ") VALUES\n" + ",\n".join(values) + ";"
          )

      @classmethod
      def _demand_score_sql(cls, video_id, link, video_title, rows):
          """Build one multi-row INSERT into video_demand_score, or None if *rows* is empty."""
          if not rows:
              return None
          s, n = cls._sql_str, cls._sql_num
          values = [
              '(' + ', '.join([
                  str(video_id), s(link), s(video_title),
                  n(entry.get('需求强烈程度分值')), s(entry.get('理由', '')),
              ]) + ')'
              for entry in rows
          ]
          return (
              "INSERT INTO video_demand_score ("
              "video_id, video_link, video_title, demand_score, reason"
              ") VALUES\n" + ",\n".join(values) + ";"
          )

      @classmethod
      def _pop_next_task(cls):
          """Pop tasks until one whose video is not yet in video_demand_analysis.

          Returns:
              ``(task, video_id)`` with ``video_id`` as an int, or None when the queue
              is empty. Tasks that are not valid JSON, or lack a numeric
              ``video_id``, are logged and skipped.
          """
          client = RedisHelper().get_client()
          while True:
              raw = client.rpop(name=cls.TASK_QUEUE)
              if not raw:
                  logger.info('[处理] 无待执行的扫描任务')
                  return None
              try:
                  task = orjson.loads(raw)
              except ValueError:
                  logger.warning(f"[处理] 任务数据无法解析,已跳过: {raw!r}")
                  continue
              logger.info(f"[处理] 获取redis数据{task}")
              try:
                  # int() guarantees only a number is interpolated into the SQL below.
                  video_id = int(task['video_id'])
              except (KeyError, TypeError, ValueError):
                  logger.warning(f"[处理] 任务缺少有效的video_id,已跳过: {task}")
                  continue
              count = MysqlHelper.get_values(
                  f"select count(1) from video_demand_analysis where video_id = {video_id}"
              )
              if not count or count[0][0] == 0:
                  logger.info(f"[处理] 视频ID {video_id} 可用")
                  return task, video_id
              logger.info(f"[处理] 视频ID {video_id} 重复过滤,继续获取下一个任务")

      @classmethod
      async def run(cls):
          """Analyse at most one pending video and persist both result sets.

          When the analysis output is flagged as failed or cannot be parsed, the
          task is passed to ``content_video_data`` and nothing is inserted.

          NOTE(review): none of the helper calls below are awaited. They run
          synchronously and block the event loop for the whole analysis.
          """
          logger.info(f"[处理] 开始获取redis数据")
          picked = cls._pop_next_task()
          if picked is None:
              return
          task, video_id = picked

          logger.info(f"[处理] 开始获取原视频OSS地址")
          # Pass the id exactly as it arrived in the task.
          video_title, video_path = PQ.get_pq_oss(task['video_id'])
          if not video_path:
              logger.warning(f"[处理] 视频ID {video_id} 未获取到OSS地址,跳过")
              return
          logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path}")
          video_url = f"http://rescdn.yishihui.com/{video_path}"

          logger.info(f"[处理] 开始分析视频")
          api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
          if not api_key:
              logger.warning("[处理] 环境变量 VIDEO_INSIGHT_GEMINI_API_KEY 未设置")
          # Never write the full key into the log file.
          logger.info(f"[处理] 使用的API_KEY:{cls._mask_secret(api_key)}")
          text, text1 = GoogleAI.run(api_key, video_url)

          if cls._is_failed(text) or cls._is_failed(text1):
              logger.warning(f"[处理] 视频ID {video_id} 分析异常,任务交由content_video_data处理")
              content_video_data(json.dumps(task))
              return  # the output is an error, not analysis rows
          try:
              demand_rows = cls._parse_rows(text)
              score_rows = cls._parse_rows(text1)
          except ValueError:
              logger.exception(f"[处理] 视频ID {video_id} 分析结果无法解析,任务交由content_video_data处理")
              content_video_data(json.dumps(task))
              return

          link = f"https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"

          sql = cls._demand_analysis_sql(video_id, link, video_title, demand_rows)
          if sql is None:
              logger.warning(f"[处理] 视频ID {video_id} text无可写入的数据")
          else:
              logger.info(sql)
              MysqlHelper.update_values(sql)
              logger.info(f"[处理] text写入数据库成功")

          sql = cls._demand_score_sql(video_id, link, video_title, score_rows)
          if sql is None:
              logger.warning(f"[处理] 视频ID {video_id} text1无可写入的数据")
          else:
              logger.info(sql)
              MysqlHelper.update_values(sql)
              logger.info(f"[处理] text1写入数据库成功")
  async def run():
      """Schedule ConsumptionRecommend.run every 2 minutes and wait forever.

      The coroutine awaits an Event that is never set, so it only ends on
      cancellation, Ctrl+C or a startup error. On exit the scheduler is shut down
      if it was actually started.
      """
      scheduler = AsyncIOScheduler()
      try:
          logger.info(f"[处理] 开始启动")
          scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=2))  # 每2分钟启动一次
          scheduler.start()
          await asyncio.Event().wait()
      except KeyboardInterrupt:
          pass
      except Exception as e:
          # logger.exception keeps the original message and adds the traceback.
          logger.exception(f"[处理] 启动异常,异常信息:{e}")
      finally:
          # shutdown() raises if start() never ran, which would hide the real error.
          if scheduler.running:
              scheduler.shutdown()
  if __name__ == '__main__':
      # asyncio.run() creates, runs and closes a fresh event loop. Calling
      # asyncio.get_event_loop() with no running loop is deprecated in recent Python.
      # On Ctrl+C the main task is cancelled, so run()'s finally still stops the scheduler.
      try:
          asyncio.run(run())
      except KeyboardInterrupt:
          # Quiet exit on Ctrl+C, matching run()'s own KeyboardInterrupt handling.
          pass