
feat: add trigger script

jihuaqiang, 1 month ago
parent commit f503e8f37f

+ 6 - 1
Dockerfile

@@ -9,6 +9,11 @@ ENV TZ=Asia/Shanghai
 RUN apt update && apt --no-install-recommends install -y curl jq \
     && apt-get clean && rm -rf /var/lib/apt/lists/* \
     && pip install -r requirements.txt --no-cache-dir \
-    && mkdir -p /app/cache
+    && mkdir -p /app/cache \
+    && apt-get update && apt-get install -y --no-install-recommends supervisor \
+    && rm -rf /var/lib/apt/lists/* && mkdir -p /var/log/supervisor
+
+# Copy the supervisor config file into the image
+COPY supervisor.conf /app/supervisor.conf
 
 #ENTRYPOINT ["python", "/app/job.py"]

+ 7 - 2
start.sh

@@ -1,8 +1,13 @@
-#!/bin/sh
+#!/bin/bash
 
+# Set environment variables derived from the container name
 export CONTAINER_INFO="$(curl -s --unix-socket /var/run/docker.sock http://docker/containers/$HOSTNAME/json)"
 export CONTAINER_INDEX="$(echo "$CONTAINER_INFO" | jq '.Name' | sed 's/^"\(.*\)"$/\1/' | awk -F'-' '{print $NF}')"
 echo "export VIDEO_INSIGHT_GEMINI_API_KEY=$(eval echo \$"VIDEO_INSIGHT_GEMINI_API_KEY_${CONTAINER_INDEX}")" >> /root/.bashrc
 . /root/.bashrc
 
-python /app/workers/video_insight_consumption_work.py
+# Make sure the log directory exists
+mkdir -p /app/logs
+
+# Start supervisor (runs in the foreground because of nodaemon=true)
+/usr/bin/supervisord -c /app/supervisor.conf
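
For reference, the pipeline in start.sh derives a per-replica index from the container name (e.g. "/video-insight-worker-3" -> "3") and uses it to pick one of several numbered API keys. A minimal Python sketch of the same parsing, assuming only the usual shape of the Docker inspect payload (the sample name below is hypothetical):

import json
import os

def container_index(inspect_json: str) -> str:
    # Docker inspect reports the name with a leading slash, e.g. "/video-insight-worker-3";
    # the replica index is the last dash-separated field, as in awk -F'-' '{print $NF}'.
    name = json.loads(inspect_json)["Name"]
    return name.rsplit("-", 1)[-1]

info = '{"Name": "/video-insight-worker-3"}'  # hypothetical inspect payload
idx = container_index(info)  # -> "3"
# Same lookup start.sh performs via eval: one numbered key per replica.
api_key = os.getenv(f"VIDEO_INSIGHT_GEMINI_API_KEY_{idx}")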

+ 39 - 0
supervisor.conf

@@ -0,0 +1,39 @@
+[supervisord]
+nodaemon=true
+logfile=/app/logs/supervisord.log
+logfile_maxbytes=50MB
+logfile_backups=10
+loglevel=info
+pidfile=/var/run/supervisord.pid
+
+[program:video_insight_consumption]
+command=python /app/workers/video_insight_consumption_work.py
+directory=/app
+user=root
+autostart=true
+autorestart=true
+startsecs=5
+startretries=3
+stderr_logfile=/app/logs/consumption.err.log
+stdout_logfile=/app/logs/consumption.out.log
+stdout_logfile_maxbytes=50MB
+stdout_logfile_backups=10
+stderr_logfile_maxbytes=50MB
+stderr_logfile_backups=10
+environment=PYTHONUNBUFFERED=1
+
+[program:video_insight_trigger]
+command=python /app/workers/video_insight_trigger_work.py
+directory=/app
+user=root
+autostart=true
+autorestart=true
+startsecs=5
+startretries=3
+stderr_logfile=/app/logs/trigger.err.log
+stdout_logfile=/app/logs/trigger.out.log
+stdout_logfile_maxbytes=50MB
+stdout_logfile_backups=10
+stderr_logfile_maxbytes=50MB
+stderr_logfile_backups=10
+environment=PYTHONUNBUFFERED=1
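
supervisor.conf is plain INI, so both worker definitions can be sanity-checked before the image is built. A small sketch using only the standard library, assuming the file keeps avoiding supervisor's %(here)s-style expansions (which ConfigParser would misread as interpolation):

from configparser import ConfigParser

cfg = ConfigParser()
cfg.read("supervisor.conf")

# Each [program:x] section should name a runnable command and enable restarts.
for section in cfg.sections():
    if section.startswith("program:"):
        print(section, "->", cfg.get(section, "command"),
              "| autorestart =", cfg.get(section, "autorestart"))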

+ 2 - 1
utils/google_ai_analyze.py

@@ -90,7 +90,8 @@ class GoogleAI(object):
             logger.error(f'[内容分析] 创建缓存目录失败')
             return None
       
-    def _analyze_content(self, video, prompt):
+    @classmethod
+    def _analyze_content(cls, video, prompt):
         """增强版内容分析"""
         model = genai.GenerativeModel(
             model_name='gemini-2.0-flash',

+ 7 - 0
utils/redis.py

@@ -34,4 +34,11 @@ def content_video_data(ret):
     task = f"task:video_insight"
     helper = RedisHelper()
     client = helper.get_client()
+    client.rpush(task, ret)
+
+def content_video_trigger_data(ret):
+    """分析失败视频重新写入redis"""
+    task = f"task:video_trigger_insight"
+    helper = RedisHelper()
+    client = helper.get_client()
     client.rpush(task, ret)
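
content_video_trigger_data mirrors content_video_data but targets the new task:video_trigger_insight list; the trigger worker rpops from that list and pushes failed tasks back through this helper. A standalone sketch of that round trip with redis-py (connection details and the handle step are hypothetical):

import json
import redis

r = redis.Redis(host="localhost", port=6379)  # hypothetical connection details
TASK_KEY = "task:video_trigger_insight"

def handle(task: dict) -> None:
    # Stand-in for the real analysis; raise to exercise the requeue path.
    raise RuntimeError("analysis failed")

raw = r.rpop(TASK_KEY)
if raw is not None:
    task = json.loads(raw)
    try:
        handle(task)
    except Exception:
        # Same move as content_video_trigger_data: requeue the task for a retry.
        r.rpush(TASK_KEY, json.dumps(task))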

+ 12 - 12
workers/video_insight_consumption_work.py

@@ -11,7 +11,7 @@ logger.add("/app/logs/consumption.log", rotation="10 MB")
 sys.path.append('/app')
 
 from utils.aliyun_log import AliyunLogger
-from utils.google_ai_analyze import GoogleAI
+from utils.google_ai_studio import GoogleAI
 from utils.piaoquan import PQ
 from utils.redis import RedisHelper, content_video_data
 from utils.mysql_db import MysqlHelper
@@ -56,16 +56,16 @@ class ConsumptionRecommend(object):
         api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
         # api_key = 'AIzaSyBFLCKMLX-Pf1iXoC2e_rMDLbNhNG23vTk'
         logger.info(f"[处理] 使用的API_KEY:{api_key}")
-        analysis_data, demand_list = GoogleAI.run(api_key, video_url)
-        if "[异常]" in analysis_data:
+        text, text1 = GoogleAI.run(api_key, video_url)
+        if "[异常]" in text:
             content_video_data(json.dumps(task))
         # Parse JSON data
-        data = json.loads(orjson.dumps(demand_list).decode())
+        data = json.loads(orjson.dumps(text).decode())
         # Generate SQL insert statement
         sql = """
         INSERT INTO video_demand_analysis (
             video_id, video_link, video_title, content_type,
-            demand_order, demand_score, user_demand, demand_category, demand_show_time,
+            demand_order, demand_score, user_demand, demand_category,
             demand_reason, product_hook, hook_time, hook_desc,
             hook_type, landing_desc, landing_type, platform_case, reasoning_process,audit_status,audit_desc
         ) VALUES
@@ -73,7 +73,7 @@ class ConsumptionRecommend(object):
         # Add values for each entry
         values = []
         link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
-        for index, entry in enumerate(data, 1):  # Start from 1 to match the original numbering
+        for entry in data:
 
             hook_desc = entry.get('需求钩子话术', '')
             result,msg = Security.security(hook_desc)
@@ -86,7 +86,7 @@ class ConsumptionRecommend(object):
 
             value = f"""(
                 {video_id}, '{link}', '{video_title}', NULL,
-                '{index}', '{entry.get('需求强烈程度分值', '')}', '{entry.get('用户具体的需求描述', '')}', '{entry.get('需求分类', '')}', '{entry.get('需求钩子出现时间', '')}',
+                '{entry.get('需求排序序号', '')}', '{entry.get('需求强烈程度分值', '')}', '{entry.get('用户具体的需求描述', '')}', '{entry.get('需求分类', '')}',
                 '{entry.get('推测出该点需求的原因', '')}', '{entry.get('需求详细query', '')}', '', '{entry.get('需求钩子话术', '')}',
                 '', '{entry.get('落地方案形态描述', '')}', '{entry.get('落地方案类型', '')}', '', '','{audit_status}','{audit_desc}'
             )"""
@@ -97,15 +97,15 @@ class ConsumptionRecommend(object):
         logger.info(f"{sql}")
         MysqlHelper.update_values(sql)
 
-        logger.info(f"[处理] 需求列表写入数据库成功")
+        logger.info(f"[处理] text写入数据库成功")
 
 
         # Parse JSON data
-        data = json.loads(orjson.dumps(analysis_data).decode())
+        data = json.loads(orjson.dumps(text1).decode())
         # Generate SQL insert statement
         sql = """
         INSERT INTO video_demand_score (
-            video_id, video_link, video_title, analysis_summary, analysis_timeline
+            video_id, video_link, video_title, demand_score,reason
         ) VALUES
         """
         # Add values for each entry
@@ -113,7 +113,7 @@ class ConsumptionRecommend(object):
         link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
         entry = data
         value = f"""(
-            {video_id}, '{link}', '{video_title}', '{entry.get('视频选题与要点理解', )}', '{entry.get('视频分段与时间点分析', '')}'
+            {video_id}, '{link}', '{video_title}', '{entry.get('需求强烈程度', '')}', '{entry.get('理由', '')}'
         )"""
         values.append(value)
         # Combine SQL statement and values
@@ -122,7 +122,7 @@ class ConsumptionRecommend(object):
         logger.info(f"{sql}")
         MysqlHelper.update_values(sql)
 
-        logger.info(f"[处理] 视频分析结果写入数据库成功")
+        logger.info(f"[处理] text1写入数据库成功")
 
 
 async def run():

+ 2 - 0
workers/video_insight_select_work.py

@@ -19,11 +19,13 @@ def requirement_insight():
         dt = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
         logger.info(f"视频需求点洞察")
         redis_task = "task:video_insight"
+        redis_trigger_task = "task:video_trigger_insight"
         sql =f'select clickobjectid as video_id from loghubods.user_share_log where dt = {dt} and topic = "click" group by clickobjectid order by count(distinct machinecode) desc limit 1000'
         data = OdpsDataCount.main(sql)
         if not data:
             return
         RedisHelper().get_client().rpush(redis_task, *data)
+        RedisHelper().get_client().rpush(redis_trigger_task, *data)
         logger.info(f"[R] 写入Redis 成功 共写入 {len(data)} 条")
     except Exception as e:
         logger.error(f"[R] 写入Redis写入失败,失败信息{e}")

+ 147 - 0
workers/video_insight_trigger_work.py

@@ -0,0 +1,147 @@
+import asyncio
+import json
+import os
+import sys
+import orjson
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.interval import IntervalTrigger
+from loguru import logger
+logger.add("/app/logs/trigger.log", rotation="10 MB")
+
+sys.path.append('/app')
+
+from utils.aliyun_log import AliyunLogger
+from utils.google_ai_analyze import GoogleAI
+from utils.piaoquan import PQ
+from utils.redis import RedisHelper, content_video_trigger_data
+from utils.mysql_db import MysqlHelper
+from utils.aliyun_security import Security
+
+
+
+
+class ConsumptionRecommend(object):
+    @classmethod
+    async def run(cls):
+        logger.info(f"[处理] 开始获取redis数据")
+
+        while True:
+            task = RedisHelper().get_client().rpop(name='task:video_trigger_insight')
+
+            if not task:
+                logger.info('[处理] 无待执行的扫描任务')
+                return
+
+            task = orjson.loads(task)
+            logger.info(f"[处理] 获取redis数据{task}")
+            video_id = task['video_id']
+
+            count_sql = f"""select count(1) from video_triggers_analysis where video_id = {video_id}"""
+            count = MysqlHelper.get_values(count_sql)
+
+            if not count or count[0][0] == 0:
+                logger.info(f"[处理] 视频ID {video_id} 可用")
+                # video_id has no prior analysis rows; continue processing it
+                break
+            else:
+                logger.info(f"[处理] 视频ID {video_id} 重复过滤,继续获取下一个任务")
+
+        logger.info(f"[处理] 开始获取原视频OSS地址")
+        video_title, video_path = PQ.get_pq_oss(video_id)
+        if not video_path:
+            return
+        logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path}")
+        video_url = f"http://rescdn.yishihui.com/{video_path}"
+        logger.info(f"[处理] 开始分析视频")
+        api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
+        # api_key = 'AIzaSyBFLCKMLX-Pf1iXoC2e_rMDLbNhNG23vTk'
+        logger.info(f"[处理] 使用的API_KEY:{api_key}")
+        analysis_data, demand_list = GoogleAI.run(api_key, video_url)
+        if "[异常]" in analysis_data:
+            content_video_trigger_data(json.dumps(task))
+        # Parse JSON data
+        data = json.loads(orjson.dumps(demand_list).decode())
+        # Generate SQL insert statement
+        sql = """
+        INSERT INTO video_triggers_analysis (
+            video_id, video_link, video_title, content_type,
+            demand_order, demand_score, user_demand, demand_category, demand_type, demand_show_time,
+            demand_reason, product_hook, hook_time, hook_desc,
+            hook_type, landing_desc, landing_type, platform_case, reasoning_process,audit_status,audit_desc
+        ) VALUES
+        """
+        # Add values for each entry
+        values = []
+        link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
+        for index, entry in enumerate(data, 1):  # Start from 1 to match the original numbering
+
+            hook_desc = entry.get('需求钩子话术', '')
+            result,msg = Security.security(hook_desc)
+            audit_status = 0
+            if result :
+                audit_status = 1
+            else :
+                audit_status = 2
+            audit_desc = msg
+
+            value = f"""(
+                {video_id}, '{link}', '{video_title}', NULL,
+                '{index}', '{entry.get('需求强烈程度分值', '')}', '{entry.get('用户具体的需求描述', '')}', '{entry.get('需求分类', '')}', '{entry.get('需求类型', '')}', '{entry.get('需求钩子出现时间', '')}',
+                '{entry.get('推测出该点需求的原因', '')}', '{entry.get('需求详细query', '')}', '', '{entry.get('需求钩子话术', '')}',
+                '', '{entry.get('落地方案形态描述', '')}', '{entry.get('落地方案类型', '')}', '', '','{audit_status}','{audit_desc}'
+            )"""
+            values.append(value)
+        # Combine SQL statement and values
+        sql += ",\n".join(values) + ";"
+        # Print SQL statement
+        logger.info(f"{sql}")
+        MysqlHelper.update_values(sql)
+
+        logger.info(f"[处理] 需求列表写入数据库成功")
+
+
+        # Parse JSON data
+        data = json.loads(orjson.dumps(analysis_data).decode())
+        # Generate SQL insert statement
+        sql = """
+        INSERT INTO video_demand_score (
+            video_id, video_link, video_title, analysis_summary, analysis_timeline
+        ) VALUES
+        """
+        # Add values for each entry
+        values = []
+        link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
+        entry = data
+        value = f"""(
+            {video_id}, '{link}', '{video_title}', {MysqlHelper.escape_string(entry.get('视频选题与要点理解', ''))}, '{entry.get('视频分段与时间点分析', '')}'
+        )"""
+        values.append(value)
+        # Combine SQL statement and values
+        sql += ",\n".join(values) + ";"
+        # Print SQL statement
+        logger.info(f"{sql}")
+        MysqlHelper.update_values(sql)
+
+        logger.info(f"[处理] 视频分析结果写入数据库成功")
+
+
+async def run():
+    scheduler = AsyncIOScheduler()
+    try:
+        logger.info(f"[处理] 开始启动")
+        scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=2))  # fire every 2 minutes
+        scheduler.start()
+        await asyncio.Event().wait()
+    except KeyboardInterrupt:
+        pass
+    except Exception as e:
+        logger.error(f"[处理] 启动异常,异常信息:{e}")
+        pass
+    finally:
+        scheduler.shutdown()
+
+
+if __name__ == '__main__':
+    # asyncio.run manages event-loop creation and teardown itself.
+    asyncio.run(run())
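
Both workers assemble their INSERT statements by f-string interpolation, which is why quoting problems need Security checks and escape_string patches. A parameterized variant avoids that class of bug entirely; this is a minimal sketch assuming a PyMySQL-style connection rather than the repo's MysqlHelper API, trimmed to a few columns for brevity:

import pymysql

def insert_triggers(conn, video_id, link, title, entries):
    # %s placeholders let the driver do the quoting, so hook text containing
    # quotes or newlines cannot break the statement the way f-string SQL can.
    sql = (
        "INSERT INTO video_triggers_analysis "
        "(video_id, video_link, video_title, demand_order, demand_score, user_demand) "
        "VALUES (%s, %s, %s, %s, %s, %s)"
    )
    rows = [
        (video_id, link, title, i,
         e.get('需求强烈程度分值', ''), e.get('用户具体的需求描述', ''))
        for i, e in enumerate(entries, 1)
    ]
    with conn.cursor() as cur:
        cur.executemany(sql, rows)
    conn.commit()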