# video_insight_trigger_work.py
  1. import asyncio
  2. import json
  3. import os
  4. import sys
  5. import orjson
  6. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  7. from apscheduler.triggers.interval import IntervalTrigger
  8. from loguru import logger
  9. logger.add("/app/logs/consumption.log", rotation="10 MB")
  10. sys.path.append('/app')
  11. from utils.aliyun_log import AliyunLogger
  12. from utils.google_ai_analyze import GoogleAI
  13. from utils.piaoquan import PQ
  14. from utils.redis import RedisHelper, content_video_trigger_data
  15. from utils.mysql_db import MysqlHelper
  16. from utils.aliyun_security import Security
class ConsumptionRecommend(object):
    """Consumes video-trigger analysis tasks from Redis.

    For each task: de-duplicates against MySQL, fetches the video's OSS
    path, runs the Google AI analysis, and inserts the trigger list plus
    the overall analysis summary into MySQL. On failure the task is
    re-queued via content_video_trigger_data() for a later retry.
    """

    @classmethod
    async def run(cls):
        """Pop one unprocessed task from Redis and process it end-to-end."""
        logger.info(f"[处理 - trigger] 开始获取redis数据")
        # Keep popping until we find a video_id that has not been analyzed
        # yet, or the queue runs dry.
        while True:
            task = RedisHelper().get_client().rpop(name='task:video_trigger_insight')
            if not task:
                logger.info('[处理] 无待执行的扫描任务')
                return
            task = orjson.loads(task)
            logger.info(f"[处理 - trigger] 获取redis数据{task}")
            video_id = task['video_id']
            # NOTE(review): video_id is interpolated into SQL unescaped —
            # assumes it is a trusted integer produced by our own queue;
            # confirm upstream, or switch to a parameterized query.
            count_sql = f"""select count(1) from video_triggers_analysis where video_id = {video_id}"""
            count = MysqlHelper.get_values(count_sql)
            if not count or count[0][0] == 0:
                logger.info(f"[处理 - trigger] 视频ID {video_id} 可用")
                # This video_id has not been analyzed yet — proceed with it.
                break
            else:
                logger.info(f"[处理 - trigger] 视频ID {video_id} 重复过滤,继续获取下一个任务")
        logger.info(f"[处理 - trigger] 开始获取原视频OSS地址")
        video_title, video_path = PQ.get_pq_oss(video_id)
        if not video_path:
            # No OSS path available; task is dropped (not re-queued).
            return
        logger.info(f"[处理 - trigger] 获取原视频OSS地址,视频链接:{video_path}")
        video_url = f"http://rescdn.yishihui.com/{video_path}"
        logger.info(f"[处理 - trigger] 开始分析视频")
        # api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
        # logger.info(f"[处理 - trigger] 使用的API_KEY:{api_key}")
        analysis_data, demand_list = GoogleAI._analyze_content_with_api(video_url)
        # Check whether the API analysis succeeded; a string result carrying
        # the "[异常]" marker signals an upstream failure.
        if isinstance(analysis_data, str) and "[异常]" in analysis_data:
            logger.error(f"[处理 - trigger] API分析失败: {analysis_data}")
            content_video_trigger_data(json.dumps(task))
            return
        if not analysis_data or not demand_list:
            logger.error(f"[处理 - trigger] API分析结果为空")
            content_video_trigger_data(json.dumps(task))
            return
        # Parse JSON data for the demand (trigger) list.
        try:
            # Round-trip through orjson/json to normalize the structure into
            # plain dicts/lists.
            data = json.loads(orjson.dumps(demand_list).decode())
            if not data:
                logger.error(f"[处理 - trigger] 需求列表为空")
                content_video_trigger_data(json.dumps(task))
                return
            # Generate the SQL insert statement for triggers analysis.
            sql = """
            INSERT INTO video_triggers_analysis (
                video_id, video_link, video_title,
                trigger_order, trigger_score, trigger_reason, trigger_text, trigger_question, trigger_category, trigger_time,
                audit_status,audit_desc
            ) VALUES
            """
            # Add values for each entry.
            values = []
            link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
            for index, entry in enumerate(data, 1):  # start from 1 to match the original numbering
                try:
                    # Run the hook copy through the security/audit check.
                    hook_desc = entry.get('钩子文案', '')
                    result, msg = Security.security(hook_desc)
                    audit_status = 0
                    if result:
                        audit_status = 1  # passed audit
                    else:
                        audit_status = 2  # rejected by audit
                    audit_desc = msg
                    # NOTE(review): values are embedded via string formatting
                    # with single quotes doubled — not a parameterized query.
                    value = f"""(
                        {video_id}, '{link}', '{video_title.replace("'", "''")}',
                        '{index}', '{entry.get('评分', '').replace("'", "''")}', '{entry.get('推测出该点需求的原因', '').replace("'", "''")}',
                        '{entry.get('钩子文案', '').replace("'", "''")}', '{entry.get('钩子到AI大模型的问题', '').replace("'", "''")}',
                        '{entry.get('钩子类型', '').replace("'", "''")}', '{entry.get('钩子出现时间', '').replace("'", "''")}',
                        '{audit_status}','{audit_desc.replace("'", "''")}'
                    )"""
                    values.append(value)
                except Exception as e:
                    # A malformed entry is skipped; remaining entries are
                    # still inserted.
                    logger.error(f"[处理 - trigger] 处理需求条目时出错: {e}")
                    continue
            if not values:
                logger.error(f"[处理 - trigger] 没有有效的需求条目")
                content_video_trigger_data(json.dumps(task))
                return
            # Combine the SQL statement and values.
            sql += ",\n".join(values) + ";"
            # Log the final SQL statement.
            logger.info(f"{sql}")
            MysqlHelper.update_values(sql)
            logger.info(f"[处理 - trigger] 需求列表写入数据库成功")
            # Persist the overall analysis summary.
            try:
                # Generate the SQL insert statement for demand score.
                sql = """
                INSERT INTO video_demand_score (
                    video_id, video_link, video_title, analysis_summary, analysis_timeline
                ) VALUES
                """
                # Add values for each entry.
                values = []
                link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
                # Serialize the topic/key-point understanding and the
                # segment/timeline analysis as JSON strings.
                analysis_summary = json.dumps(analysis_data.get('视频选题与要点理解', {}), ensure_ascii=False)
                analysis_timeline = json.dumps(analysis_data.get('视频分段与时间点分析', {}), ensure_ascii=False)
                # NOTE(review): the original comment claimed a parameterized
                # query, but this is plain string interpolation with quote
                # doubling — consider real bind parameters.
                value = f"""(
                    {video_id},
                    '{link}',
                    '{video_title.replace("'", "''")}',
                    '{analysis_summary.replace("'", "''")}',
                    '{analysis_timeline.replace("'", "''")}'
                )"""
                values.append(value)
                # Combine the SQL statement and values.
                sql += ",\n".join(values) + ";"
                # Log the final SQL statement.
                logger.info(f"{sql}")
                MysqlHelper.update_values(sql)
                logger.info(f"[处理 - trigger] 视频分析结果写入数据库成功")
            except Exception as e:
                # Summary write failed — re-queue the task for retry.
                logger.error(f"[处理 - trigger] 写入视频分析结果时出错: {e}")
                content_video_trigger_data(json.dumps(task))
                return
        except Exception as e:
            # Demand-list handling failed — re-queue the task for retry.
            logger.error(f"[处理 - trigger] 处理需求列表时出错: {e}")
            content_video_trigger_data(json.dumps(task))
            return
  142. async def run():
  143. scheduler = AsyncIOScheduler()
  144. try:
  145. logger.info(f"[处理 - trigger] 开始启动")
  146. scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=10)) # 每5分钟启动一次
  147. scheduler.start()
  148. await asyncio.Event().wait()
  149. except KeyboardInterrupt:
  150. pass
  151. except Exception as e:
  152. logger.error(f"[处理] 启动异常,异常信息:{e}")
  153. pass
  154. finally:
  155. scheduler.shutdown()
  156. if __name__ == '__main__':
  157. # asyncio.run(ConsumptionRecommend.run())
  158. loop = asyncio.get_event_loop()
  159. loop.run_until_complete(run())