# video_insight_trigger_work.py

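"""Video-insight trigger worker.

Pops video tasks off the Redis queue task:video_trigger_insight, skips
videos that already have analysis rows, runs a Gemini-based trigger/demand
analysis on each video via the GoogleAI helper, and writes the results to
the video_triggers_analysis and video_demand_score MySQL tables. Failed
tasks are handed back to content_video_trigger_data, which appears to
re-queue them for a later retry.
"""
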
import asyncio
import json
import os
import sys

import orjson
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger

logger.add("/app/logs/consumption.log", rotation="10 MB")

sys.path.append('/app')

from utils.aliyun_log import AliyunLogger
from utils.google_ai_analyze import GoogleAI
from utils.piaoquan import PQ
from utils.redis import RedisHelper, content_video_trigger_data
from utils.mysql_db import MysqlHelper
from utils.aliyun_security import Security
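
# The utils.* modules above are project-internal; their behavior is inferred
# from usage in this file: PQ.get_pq_oss(video_id) -> (title, path, transed_path),
# Security.security(text) -> (passed, message), and
# content_video_trigger_data(payload) re-queues a failed task payload.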


class ConsumptionRecommend(object):

    @classmethod
    async def run(cls):
        logger.info("[处理 - trigger] 开始获取redis数据")
        while True:
            task = RedisHelper().get_client().rpop(name='task:video_trigger_insight')
            if not task:
                logger.info('[处理] 无待执行的扫描任务')
                return
            task = orjson.loads(task)
            logger.info(f"[处理 - trigger] 获取redis数据{task}")
            video_id = task['video_id']
            # Skip videos that already have rows in video_triggers_analysis (dedupe).
            count_sql = f"""select count(1) from video_triggers_analysis where video_id = {video_id}"""
            count = MysqlHelper.get_values(count_sql)
            if not count or count[0][0] == 0:
                logger.info(f"[处理 - trigger] 视频ID {video_id} 可用")
                # Fresh video: exit the loop and process this video_id.
                break
            logger.info(f"[处理 - trigger] 视频ID {video_id} 重复过滤,继续获取下一个任务")

        logger.info("[处理 - trigger] 开始获取原视频OSS地址")
        video_title, video_path, transed_video_path = PQ.get_pq_oss(video_id)
        # Prefer transed_video_path, falling back to video_path.
        if not transed_video_path:
            transed_video_path = video_path
        # If neither OSS path is available there is nothing to analyze.
        if not transed_video_path:
            return
        logger.info(f"[处理 - trigger] 获取原视频OSS地址,视频链接:{transed_video_path}")
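        # Assumption: rescdn.yishihui.com is the CDN host fronting the OSS bucket,
        # so prefixing it to the stored path yields an HTTP URL the analyzer can fetch.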
        video_url = f"http://rescdn.yishihui.com/{transed_video_path}"

        logger.info("[处理 - trigger] 开始分析视频")
        api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
        # Avoid logging the raw credential; record only whether it is configured.
        logger.info(f"[处理 - trigger] API_KEY已配置: {bool(api_key)}")
        analysis_data, demand_list = GoogleAI.run(api_key, video_url)
        # GoogleAI.run reports failure as an error string containing "[异常]".
        if isinstance(analysis_data, str) and "[异常]" in analysis_data:
            logger.error(f"[处理 - trigger] API分析失败: {analysis_data}")
            content_video_trigger_data(json.dumps(task))
            return
        if not analysis_data or not demand_list:
            logger.error("[处理 - trigger] API分析结果为空")
            content_video_trigger_data(json.dumps(task))
            return
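
        # Each failure path above and below follows the same pattern: log the
        # error, hand the original task payload back to content_video_trigger_data
        # (assumed to re-queue it for a later retry), and stop.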
        # Parse JSON data for demand list
        try:
            # Round-trip through orjson to normalize the entries to plain JSON types.
            data = json.loads(orjson.dumps(demand_list).decode())
            if not data:
                logger.error("[处理 - trigger] 需求列表为空")
                content_video_trigger_data(json.dumps(task))
                return
            # Generate SQL insert statement for triggers analysis
            sql = """
            INSERT INTO video_triggers_analysis (
                video_id, video_link, video_title,
                trigger_order, trigger_score, trigger_reason, trigger_text, trigger_question, trigger_category, trigger_time,
                audit_status, audit_desc
            ) VALUES
            """
            # Add values for each entry
            values = []
            link = f"https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"
            for index, entry in enumerate(data, 1):  # Start from 1 to match the original numbering
                try:
                    hook_desc = entry.get('钩子文案', '')
                    # Run the hook copy through Aliyun content security review.
                    result, msg = Security.security(hook_desc)
                    audit_status = 1 if result else 2
                    audit_desc = msg or ''
                    # Values are inlined with single quotes doubled; this is quote
                    # escaping, not a parameterized query. str() guards against
                    # non-string fields (e.g. a numeric score) in the model output.
                    value = f"""(
                    {video_id}, '{link}', '{video_title.replace("'", "''")}',
                    '{index}', '{str(entry.get('评分', '')).replace("'", "''")}', '{str(entry.get('推测出该点需求的原因', '')).replace("'", "''")}',
                    '{str(hook_desc).replace("'", "''")}', '{str(entry.get('钩子到AI大模型的问题', '')).replace("'", "''")}',
                    '{str(entry.get('钩子类型', '')).replace("'", "''")}', '{str(entry.get('钩子出现时间', '')).replace("'", "''")}',
                    '{audit_status}', '{audit_desc.replace("'", "''")}'
                    )"""
                    values.append(value)
                except Exception as e:
                    logger.error(f"[处理 - trigger] 处理需求条目时出错: {e}")
                    continue
            if not values:
                logger.error("[处理 - trigger] 没有有效的需求条目")
                content_video_trigger_data(json.dumps(task))
                return
            # Combine SQL statement and values into one multi-row INSERT
            sql += ",\n".join(values) + ";"
            # Log the full statement for debugging
            logger.info(f"{sql}")
            MysqlHelper.update_values(sql)
            logger.info("[处理 - trigger] 需求列表写入数据库成功")
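
            # Note: the two table writes are not transactional. If the summary
            # insert below fails, the task is re-queued, but the dedupe check at
            # the top will then skip this video_id because video_triggers_analysis
            # already has rows for it.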
            # Parse JSON data for analysis summary
            try:
                # Generate SQL insert statement for demand score
                sql = """
                INSERT INTO video_demand_score (
                    video_id, video_link, video_title, analysis_summary, analysis_timeline
                ) VALUES
                """
                values = []
                # Serialize the topic/key-point analysis and the timeline analysis.
                analysis_summary = json.dumps(analysis_data.get('视频选题与要点理解', {}), ensure_ascii=False)
                analysis_timeline = json.dumps(analysis_data.get('视频分段与时间点分析', {}), ensure_ascii=False)
                # Single quotes are doubled for escaping; note this is plain string
                # building, not a parameterized query, so it only prevents quote
                # breakage rather than all injection and escaping issues.
                value = f"""(
                {video_id},
                '{link}',
                '{video_title.replace("'", "''")}',
                '{analysis_summary.replace("'", "''")}',
                '{analysis_timeline.replace("'", "''")}'
                )"""
                values.append(value)
                # Combine SQL statement and values; the upsert clause refreshes the
                # existing row when this video was already scored.
                sql += ",\n".join(values) + """
                ON DUPLICATE KEY UPDATE
                video_link = VALUES(video_link),
                video_title = VALUES(video_title),
                analysis_summary = VALUES(analysis_summary),
                analysis_timeline = VALUES(analysis_timeline)
                ;"""
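                # Note: ON DUPLICATE KEY UPDATE only behaves as an upsert if
                # video_demand_score has a PRIMARY or UNIQUE key on video_id;
                # that constraint is assumed here, not visible in this file.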
                # Print SQL statement
                # logger.info(f"{sql}")
                MysqlHelper.update_values(sql)
                # logger.info(f"[处理 - trigger] 视频分析结果写入数据库成功")
            except Exception as e:
                logger.error(f"[处理 - trigger] 写入视频分析结果时出错: {e}")
                content_video_trigger_data(json.dumps(task))
                return
        except Exception as e:
            logger.error(f"[处理 - trigger] 处理需求列表时出错: {e}")
            content_video_trigger_data(json.dumps(task))
            return


async def run():
    scheduler = AsyncIOScheduler()
    try:
        logger.info("[处理 - trigger] 开始启动")
        # Schedule a queue scan every 5 minutes.
        scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=5))
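        # APScheduler's default max_instances is 1, so if a scan is still
        # running when the next 5-minute tick fires, that run is skipped
        # with a warning instead of executing concurrently.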
        scheduler.start()
        # Block forever; scheduled jobs run on this event loop.
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.error(f"[处理] 启动异常,异常信息:{e}")
    finally:
        scheduler.shutdown()


if __name__ == '__main__':
    # asyncio.run(ConsumptionRecommend.run())
    asyncio.run(run())