# video_insight_consumption_work.py
  1. import asyncio
  2. import json
  3. import os
  4. import sys
  5. import orjson
  6. from apscheduler.schedulers.asyncio import AsyncIOScheduler
  7. from apscheduler.triggers.interval import IntervalTrigger
  8. from loguru import logger
  9. logger.add("/app/logs/consumption.log", rotation="10 MB")
  10. sys.path.append('/app')
  11. from utils.aliyun_log import AliyunLogger
  12. from utils.google_ai_studio import GoogleAI
  13. from utils.piaoquan import PQ
  14. from utils.redis import RedisHelper, content_video_data
  15. from utils.mysql_db import MysqlHelper
  16. from utils.aliyun_security import Security
  17. class ConsumptionRecommend(object):
  18. @classmethod
  19. async def run(cls):
  20. logger.info(f"[处理] 开始获取redis数据")
  21. while True:
  22. task = RedisHelper().get_client().rpop(name='task:video_insight')
  23. if not task:
  24. logger.info('[处理] 无待执行的扫描任务')
  25. return
  26. task = orjson.loads(task)
  27. logger.info(f"[处理] 获取redis数据{task}")
  28. video_id = task['video_id']
  29. count_sql = f"""select count(1) from video_demand_analysis where video_id = {video_id}"""
  30. count = MysqlHelper.get_values(count_sql)
  31. if not count or count[0][0] == 0:
  32. logger.info(f"[处理] 视频ID {video_id} 可用")
  33. # 这里可以继续处理 video_id
  34. break
  35. else:
  36. logger.info(f"[处理] 视频ID {video_id} 重复过滤,继续获取下一个任务")
  37. logger.info(f"[处理] 开始获取原视频OSS地址")
  38. video_title, video_path = PQ.get_pq_oss(video_id)
  39. if not video_path:
  40. return
  41. logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path}")
  42. video_url = f"http://rescdn.yishihui.com/{video_path}"
  43. logger.info(f"[处理] 开始分析视频")
  44. api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
  45. # api_key = 'AIzaSyBFLCKMLX-Pf1iXoC2e_rMDLbNhNG23vTk'
  46. logger.info(f"[处理] 使用的API_KEY:{api_key}")
  47. text,text1 = GoogleAI.run(api_key, video_url)
  48. if "[异常]" in text:
  49. content_video_data(json.dumps(task))
  50. # Parse JSON data
  51. data = json.loads(orjson.dumps(text).decode())
  52. # Generate SQL insert statement
  53. sql = """
  54. INSERT INTO video_demand_analysis (
  55. video_id, video_link, video_title, content_type,
  56. demand_order, demand_score, user_demand, demand_category,
  57. demand_reason, product_hook, hook_time, hook_desc,
  58. hook_type, landing_desc, landing_type, platform_case, reasoning_process,audit_status,audit_desc
  59. ) VALUES
  60. """
  61. # Add values for each entry
  62. values = []
  63. link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
  64. for entry in data:
  65. hook_desc = entry.get('需求钩子话术', '')
  66. result,msg = Security.security(hook_desc)
  67. audit_status = 0
  68. if result :
  69. audit_status = 1
  70. else :
  71. audit_status = 2
  72. audit_desc = msg
  73. value = f"""(
  74. {video_id}, '{link}', '{video_title}', NULL,
  75. '{entry.get('需求排序序号', '')}', '{entry.get('需求强烈程度分值', '')}', '{entry.get('用户具体的需求描述', '')}', '{entry.get('需求分类', '')}',
  76. '{entry.get('推测出该点需求的原因', '')}', '{entry.get('需求详细query', '')}', '', '{entry.get('需求钩子话术', '')}',
  77. '', '{entry.get('落地方案形态描述', '')}', '{entry.get('落地方案类型', '')}', '', '','{audit_status}','{audit_desc}'
  78. )"""
  79. values.append(value)
  80. # Combine SQL statement and values
  81. sql += ",\n".join(values) + ";"
  82. # Print SQL statement
  83. logger.info(f"{sql}")
  84. MysqlHelper.update_values(sql)
  85. logger.info(f"[处理] text写入数据库成功")
  86. # Parse JSON data
  87. data = json.loads(orjson.dumps(text1).decode())
  88. # Generate SQL insert statement
  89. sql = """
  90. INSERT INTO video_demand_score (
  91. video_id, video_link, video_title, demand_score,reason
  92. ) VALUES
  93. """
  94. # Add values for each entry
  95. values = []
  96. link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
  97. entry = data
  98. value = f"""(
  99. {video_id}, '{link}', '{video_title}', '{entry.get('需求强烈程度', )}', '{entry.get('理由', '')}'
  100. )"""
  101. values.append(value)
  102. # Combine SQL statement and values
  103. sql += ",\n".join(values) + ";"
  104. # Print SQL statement
  105. logger.info(f"{sql}")
  106. MysqlHelper.update_values(sql)
  107. logger.info(f"[处理] text1写入数据库成功")
  108. async def run():
  109. scheduler = AsyncIOScheduler()
  110. try:
  111. logger.info(f"[处理] 开始启动")
  112. scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=2)) # 每2分钟启动一次
  113. scheduler.start()
  114. await asyncio.Event().wait()
  115. except KeyboardInterrupt:
  116. pass
  117. except Exception as e:
  118. logger.error(f"[处理] 启动异常,异常信息:{e}")
  119. pass
  120. finally:
  121. scheduler.shutdown()
  122. if __name__ == '__main__':
  123. # asyncio.run(ConsumptionRecommend.run())
  124. loop = asyncio.get_event_loop()
  125. loop.run_until_complete(run())