# video_insight_trigger_work.py

import asyncio
import json
import os
import sys

import orjson
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from loguru import logger

logger.add("/app/logs/consumption.log", rotation="10 MB")

# Make the project root importable before pulling in the project-local utils.
sys.path.append('/app')

from utils.aliyun_log import AliyunLogger
from utils.google_ai_analyze import GoogleAI
from utils.piaoquan import PQ
from utils.redis import RedisHelper, content_video_trigger_data
from utils.mysql_db import MysqlHelper
from utils.aliyun_security import Security

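# Worker overview: every 4 minutes the scheduler below pops one video task off
# the Redis list 'task:video_trigger_insight', skips videos already recorded in
# video_triggers_analysis, runs a Gemini analysis on the video URL, and writes
# the extracted trigger rows plus a per-video summary into MySQL. Failure paths
# hand the serialized task to content_video_trigger_data, which is assumed to
# requeue it for a later tick.
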
class ConsumptionRecommend(object):

    @classmethod
    async def run(cls):
        logger.info("[Processing - trigger] start fetching tasks from redis")
        while True:
            task = RedisHelper().get_client().rpop(name='task:video_trigger_insight')
            if not task:
                logger.info('[Processing] no pending scan tasks')
                return
            task = orjson.loads(task)
            logger.info(f"[Processing - trigger] got redis task {task}")
            # Cast to int up front so the interpolated SQL below cannot be injected.
            video_id = int(task['video_id'])
            count_sql = f"""select count(1) from video_triggers_analysis where video_id = {video_id}"""
            count = MysqlHelper.get_values(count_sql)
            if not count or count[0][0] == 0:
                logger.info(f"[Processing - trigger] video ID {video_id} is available")
                # Safe to go on processing this video_id.
                video_title, video_path, transed_video_path = PQ.get_pq_oss(video_id)
                # Prefer transed_video_path; fall back to video_path if it is missing.
                if not transed_video_path:
                    transed_video_path = video_path
                    if not video_path:
                        return
                # If transed_video_path ends in .m3u8, drop this task and read the next one.
                if transed_video_path.endswith('.m3u8'):
                    logger.info(f"[Processing - trigger] video ID {video_id} is m3u8, skipping")
                    continue
                break
            else:
                logger.info(f"[Processing - trigger] video ID {video_id} filtered as a duplicate, fetching the next task")
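        # A sketch of the expected task payload, assuming the producer enqueues
        # JSON objects carrying at least a numeric video_id, e.g.:
        #   RedisHelper().get_client().lpush(
        #       'task:video_trigger_insight',
        #       orjson.dumps({'video_id': 12345}),
        #   )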
        logger.info(f"[Processing - trigger] building the source video OSS URL, path: {transed_video_path}")
        video_url = f"http://rescdn.yishihui.com/{transed_video_path}"
        logger.info("[Processing - trigger] start analyzing the video")
        api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
        # Avoid leaking the full secret into the log; record only its suffix.
        logger.info(f"[Processing - trigger] using API key ending in ...{api_key[-4:] if api_key else 'MISSING'}")
        analysis_data, demand_list = GoogleAI.run(api_key, video_url)
        # Check whether the API analysis succeeded; "[异常]" is the error
        # sentinel embedded in GoogleAI.run's string output, so it must stay as-is.
        if isinstance(analysis_data, str) and "[异常]" in analysis_data:
            logger.error(f"[Processing - trigger] API analysis failed: {analysis_data}")
            content_video_trigger_data(json.dumps(task))
            return
        if not analysis_data or not demand_list:
            logger.error("[Processing - trigger] API analysis returned empty results")
            content_video_trigger_data(json.dumps(task))
            return
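        # NOTE: content_video_trigger_data (from utils.redis) is assumed to push
        # the serialized task back onto the queue for a retry on a later tick;
        # every failure path here funnels through it before returning.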
        # Parse JSON data for the demand list.
        try:
            # The orjson round-trip normalizes the structure into plain JSON types.
            data = json.loads(orjson.dumps(demand_list).decode())
            if not data:
                logger.error("[Processing - trigger] demand list is empty")
                content_video_trigger_data(json.dumps(task))
                return
            # Generate the SQL insert statement for the triggers analysis
            # (see the parameterized sketch after this class for a safer variant).
            sql = """
            INSERT INTO video_triggers_analysis (
                video_id, video_link, video_title,
                trigger_order, trigger_score, trigger_reason, trigger_text, trigger_question, trigger_category, trigger_time,
                audit_status, audit_desc
            ) VALUES
            """
            # Add values for each entry.
            values = []
            link = f"https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"
            for index, entry in enumerate(data, 1):  # start from 1 to match the original numbering
                try:
                    # Dict keys stay in Chinese: they mirror the field names in the model's JSON output.
                    hook_desc = entry.get('钩子文案', '')
                    result, msg = Security.security(hook_desc)
                    # 1 = passed the security audit, 2 = rejected.
                    audit_status = 1 if result else 2
                    audit_desc = msg
                    value = f"""(
                        {video_id}, '{link}', '{video_title.replace("'", "''")}',
                        '{index}', '{entry.get('评分', '').replace("'", "''")}', '{entry.get('推测出该点需求的原因', '').replace("'", "''")}',
                        '{hook_desc.replace("'", "''")}', '{entry.get('钩子到AI大模型的问题', '').replace("'", "''")}',
                        '{entry.get('钩子类型', '').replace("'", "''")}', '{entry.get('钩子出现时间', '').replace("'", "''")}',
                        '{audit_status}', '{audit_desc.replace("'", "''")}'
                    )"""
                    values.append(value)
                except Exception as e:
                    logger.error(f"[Processing - trigger] error while handling a demand entry: {e}")
                    continue
            if not values:
                logger.error("[Processing - trigger] no valid demand entries")
                content_video_trigger_data(json.dumps(task))
                return
            # Combine the SQL statement and values.
            sql += ",\n".join(values) + ";"
            # Log the full SQL statement for debugging.
            logger.info(f"{sql}")
            MysqlHelper.update_values(sql)
            logger.info("[Processing - trigger] demand list written to the database")
            # Parse JSON data for the analysis summary.
            try:
                # Generate the SQL insert statement for the demand score.
                sql = """
                INSERT INTO video_demand_score (
                    video_id, video_link, video_title, analysis_summary, analysis_timeline
                ) VALUES
                """
                # Add values for each entry.
                values = []
                link = f"https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"
                # Serialize the topic/key-point analysis and the timeline analysis.
                analysis_summary = json.dumps(analysis_data.get('视频选题与要点理解', {}), ensure_ascii=False)
                analysis_timeline = json.dumps(analysis_data.get('视频分段与时间点分析', {}), ensure_ascii=False)
                # NOTE: values are inlined with manual quote-escaping; a
                # parameterized query would avoid SQL injection and escaping
                # issues entirely.
                value = f"""(
                    {video_id},
                    '{link}',
                    '{video_title.replace("'", "''")}',
                    '{analysis_summary.replace("'", "''")}',
                    '{analysis_timeline.replace("'", "''")}'
                )"""
                values.append(value)
                # Combine the SQL statement and values; upsert on duplicate key.
                sql += ",\n".join(values) + """
                ON DUPLICATE KEY UPDATE
                    video_link = VALUES(video_link),
                    video_title = VALUES(video_title),
                    analysis_summary = VALUES(analysis_summary),
                    analysis_timeline = VALUES(analysis_timeline)
                ;"""
                # logger.info(f"{sql}")
                MysqlHelper.update_values(sql)
                # logger.info("[Processing - trigger] video analysis written to the database")
            except Exception as e:
                logger.error(f"[Processing - trigger] error while writing the video analysis: {e}")
                content_video_trigger_data(json.dumps(task))
                return
        except Exception as e:
            logger.error(f"[Processing - trigger] error while handling the demand list: {e}")
            content_video_trigger_data(json.dumps(task))
            return
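
# The INSERT statements above are assembled by string interpolation with manual
# quote-escaping. A parameterized query is the safer pattern; the sketch below
# shows the idea against a plain DB-API cursor (pymysql-style %s placeholders).
# It is illustrative only: MysqlHelper's real interface is not visible in this
# file, the helper name is hypothetical, and nothing in the worker calls it.
def _insert_triggers_parameterized(cursor, video_id, link, video_title, entries):
    """Hypothetical parameterized variant of the trigger-row insert above."""
    sql = (
        "INSERT INTO video_triggers_analysis ("
        " video_id, video_link, video_title,"
        " trigger_order, trigger_score, trigger_reason, trigger_text,"
        " trigger_question, trigger_category, trigger_time,"
        " audit_status, audit_desc"
        ") VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
    )
    rows = [
        (
            video_id, link, video_title,
            index, entry.get('评分', ''), entry.get('推测出该点需求的原因', ''),
            entry.get('钩子文案', ''), entry.get('钩子到AI大模型的问题', ''),
            entry.get('钩子类型', ''), entry.get('钩子出现时间', ''),
            # The audit fields would come from Security.security() in the real flow.
            entry.get('audit_status', 0), entry.get('audit_desc', ''),
        )
        for index, entry in enumerate(entries, 1)
    ]
    # executemany lets the driver handle quoting and escaping for every row.
    cursor.executemany(sql, rows)
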
async def run():
    scheduler = AsyncIOScheduler()
    try:
        logger.info("[Processing - trigger] starting up")
        # Fire every 4 minutes; each tick drains one task from the queue.
        scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=4))
        scheduler.start()
        # Block forever so the event loop (and the scheduler on it) keeps running.
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        logger.error(f"[Processing] startup failed, error: {e}")
    finally:
        scheduler.shutdown()
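
# Scheduling note: ConsumptionRecommend.run processes at most one video per
# tick (the while-loop breaks on the first usable task), so throughput is
# bounded at one video per interval. APScheduler's default max_instances of 1
# already prevents overlapping runs when an analysis outlasts the 4 minutes.
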
if __name__ == '__main__':
    # asyncio.run(ConsumptionRecommend.run())
    # asyncio.get_event_loop() is deprecated for this use; asyncio.run creates
    # and tears down the loop itself.
    asyncio.run(run())