Parcourir la source

Merge branch 'feature_Trigger2' of Server/video-insight into master

jihuaqiang il y a 1 mois
Parent
commit
60e430eb74

+ 6 - 1
Dockerfile

@@ -9,6 +9,11 @@ ENV TZ=Asia/Shanghai
 RUN apt update && apt --no-install-recommends install -y curl jq \
     && apt-get clean && rm -rf /var/lib/apt/lists/* \
     && pip install -r requirements.txt --no-cache-dir \
-    && mkdir -p /app/cache
+    && mkdir -p /app/cache \
+    && apt-get update && apt-get install -y supervisor && \
+    mkdir -p /var/log/supervisor
+
+# 复制 supervisor 配置文件
+COPY supervisor.conf /app/supervisor.conf
 
 #ENTRYPOINT ["python", "/app/job.py"]

+ 7 - 2
start.sh

@@ -1,8 +1,13 @@
#!/bin/bash
# Entrypoint: derive this replica's index from its Docker container name,
# select the matching per-replica Gemini API key, then hand off to supervisord.

# Container metadata from the Docker socket (requires /var/run/docker.sock mount)
export CONTAINER_INFO="$(curl -s --unix-socket /var/run/docker.sock http://docker/containers/$HOSTNAME/json)"
# Replica index is the trailing "-N" segment of the container name
export CONTAINER_INDEX="$(echo "$CONTAINER_INFO" | jq '.Name' | sed 's/^"\(.*\)"$/\1/' | awk -F'-' '{print $NF}')"

# FIX: export the key directly instead of appending to /root/.bashrc and
# re-sourcing it — the old approach added a duplicate line on every restart
# and relied on bashrc side effects. supervisord and its children inherit
# this process's environment.
export VIDEO_INSIGHT_GEMINI_API_KEY="$(eval echo \$"VIDEO_INSIGHT_GEMINI_API_KEY_${CONTAINER_INDEX}")"

# supervisord's logfile directory must exist before it starts
mkdir -p /app/logs

# FIX: exec so supervisord replaces this shell as PID 1 and receives
# container stop signals (SIGTERM) directly.
exec /usr/bin/supervisord -c /app/supervisor.conf

+ 39 - 0
supervisor.conf

@@ -0,0 +1,39 @@
; supervisord stays in the foreground (nodaemon) so it can act as the
; container's main process started by start.sh; its own log rotates at
; 50MB with 10 backups.
[supervisord]
nodaemon=true
logfile=/app/logs/supervisord.log
logfile_maxbytes=50MB
logfile_backups=10
loglevel=info
pidfile=/var/run/supervisord.pid

; Consumption worker — presumably consumes the task:video_insight queue
; (worker source not in this view; verify against
; workers/video_insight_consumption_work.py). PYTHONUNBUFFERED=1 makes
; stdout/stderr reach the log files immediately instead of on flush.
[program:video_insight_consumption]
command=python /app/workers/video_insight_consumption_work.py
directory=/app
user=root
autostart=true
autorestart=true
startsecs=5
startretries=3
stderr_logfile=/app/logs/consumption.err.log
stdout_logfile=/app/logs/consumption.out.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=10
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=10
environment=PYTHONUNBUFFERED=1

; Trigger worker — consumes the task:video_trigger_insight queue
; (see workers/video_insight_trigger_work.py). Same restart and log
; rotation policy as the consumption worker.
[program:video_insight_trigger]
command=python /app/workers/video_insight_trigger_work.py
directory=/app
user=root
autostart=true
autorestart=true
startsecs=5
startretries=3
stderr_logfile=/app/logs/trigger.err.log
stdout_logfile=/app/logs/trigger.out.log
stdout_logfile_maxbytes=50MB
stdout_logfile_backups=10
stderr_logfile_maxbytes=50MB
stderr_logfile_backups=10
environment=PYTHONUNBUFFERED=1 

+ 133 - 0
utils/coze_hook.py

@@ -0,0 +1,133 @@
+import requests
+import json
+
class CozeHook(object):
    """Thin client for a Coze workflow that turns a video summary plus a
    timeline analysis into a list of user-demand entries.

    FIXES vs original:
      * extract_fields_from_response always returns a list ([] on any
        failure) — the original mixed ``("", "", "")`` tuple returns with
        list returns, which broke downstream code that iterates the result.
      * removed the unused ``try_unescape_json_string`` inner helper.
      * ``run`` no longer shadows the ``list`` builtin.
    """

    def __init__(self):
        self.url = "https://api.coze.cn/v1/workflow/run"
        self.headers = {
            "Content-Type": "application/json",
            # SECURITY: hard-coded personal access token committed to the
            # repo — rotate this credential and load it from an env var or
            # secret store instead.
            "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB"
        }
        self.hook_id = "7507245138873450535"

    def call_coze_api(self, summary, timeline):
        """POST a workflow run; raises requests.HTTPError on non-2xx,
        returns the parsed JSON body."""
        payload = {
            "workflow_id": self.hook_id,
            "parameters": {
                "summary": summary,
                "timeline": timeline
            }
        }
        # 600s timeout: the workflow runs a long LLM pipeline server-side.
        response = requests.post(self.url, json=payload, headers=self.headers, timeout=600)
        response.raise_for_status()
        return response.json()

    def extract_fields_from_response(self, resp):
        """Extract the demand list from a workflow response.

        ``resp["data"]`` is a JSON string whose ``output`` field may itself
        be a JSON string, possibly wrapped in a markdown code fence.
        Returns a list; [] on any parse failure.
        """
        import re

        JSON_PATTERNS = [
            r"```json\\n(.*?)```",  # fence with escaped newline
            r"```json\n(.*?)```",   # normal ```json fence
            r"```(.*?)```",         # fence without a language tag
            r"\{.*\}"               # bare JSON object
        ]

        def extract_json_from_string(s):
            """Try direct json.loads first, then each fenced/bare pattern;
            returns the parsed value or the original string."""
            if not isinstance(s, str):
                return s
            try:
                return json.loads(s)
            except json.JSONDecodeError:
                pass
            for pattern in JSON_PATTERNS:
                match = re.search(pattern, s, re.DOTALL)
                if match:
                    try:
                        return json.loads(match.group(1))
                    except Exception as e:
                        print(f"使用模式 {pattern} 解析失败: {str(e)}")
            return s

        try:
            data = resp.get("data")
            if not data:
                print("响应中没有data字段")
                return []

            # Outer layer: "data" is itself a JSON-encoded string.
            try:
                data = json.loads(data)
            except json.JSONDecodeError as e:
                print(f"解析外层data失败: {str(e)}")
                return []

            output = data.get("output")
            if not output:
                print("data中没有output字段")
                return []

            print(f"\n原始output字段: {output}")
            output = extract_json_from_string(output)
            if isinstance(output, str):
                print("output解析后仍为字符串")
                return []

            if isinstance(output, dict):
                # The workflow has used several field names over time;
                # check them in priority order.
                if "需求列表" in output:
                    demand_list = output["需求列表"]
                elif "questions" in output:
                    demand_list = output["questions"]
                elif "interactive_questions" in output:
                    demand_list = output["interactive_questions"]
                else:
                    print("output中没有找到需求列表、questions或interactive_questions字段")
                    return []
            else:
                demand_list = output

            if not isinstance(demand_list, list):
                print(f"需求列表无效: {demand_list}")
                return []
            return demand_list

        except Exception as e:
            print(f"解析返回数据出错: {str(e)}")
            print(f"原始响应: {json.dumps(resp, ensure_ascii=False, indent=2)}")
            return []

    def run(self, summary, timeline):
        """Call the workflow and return the extracted demand list."""
        resp = self.call_coze_api(summary, timeline)
        return self.extract_fields_from_response(resp)

+ 182 - 0
utils/google_ai_analyze.py

@@ -0,0 +1,182 @@
+import os
+import time
+import uuid
+from typing import  Optional
+
+import google.generativeai as genai
+import orjson
+import requests
+from google.generativeai.types import (HarmBlockThreshold, HarmCategory)
+from loguru import logger
+from utils.coze_hook import CozeHook
+
+from utils.google_ai_prompt import VIDEO_TOPIC_ANALYSIS_PROMPT, VIDEO_SEGMENT_ANALYSIS_PROMPT
+# from utils.feishu_data import Material
+
+CACHE_DIR = os.path.join(os.getcwd(), 'video_cache')
+
+# CACHE_DIR = '/Users/z/Downloads/'
+# PROXY_ADDR = 'http://localhost:1081'
+# os.environ['http_proxy'] = PROXY_ADDR
+# os.environ['https_proxy'] = PROXY_ADDR
def load_prompts():
    """Assemble the ordered analysis prompts defined in google_ai_prompt.

    Returns a list of ``{"name": ..., "content": ...}`` dicts; raises a
    generic Exception if anything goes wrong while building the list.
    """
    try:
        print("\n[初始化] 从prompt.py加载Prompt")
        prompt_specs = [
            ("视频选题与要点理解", VIDEO_TOPIC_ANALYSIS_PROMPT),
            ("视频分段与时间点分析", VIDEO_SEGMENT_ANALYSIS_PROMPT),
        ]
        prompts = [{"name": name, "content": content} for name, content in prompt_specs]
        print(f"[成功] 加载 {len(prompts)} 个Prompt")
        return prompts
    except Exception as e:
        raise Exception(f"加载Prompt失败: {str(e)}")
+    
class GoogleAI(object):
    """Gemini-based video analyzer.

    Pipeline (see :meth:`run`): download the video into the local cache,
    upload it to Gemini, run each prompt from ``load_prompts``, then feed
    the two analysis results to the Coze demand-extraction hook.
    """

    @classmethod
    def download_video(cls, video_link: str) -> Optional[str]:
        """Download ``video_link`` into CACHE_DIR.

        Returns the local file path on success, None on any failure
        (cache-dir creation, network after 3 attempts, or file write).
        """
        file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')

        # Make sure the cache directory exists before any download attempt.
        try:
            os.makedirs(CACHE_DIR, exist_ok=True)
        except Exception as e:
            error_info = {
                "error_type": type(e).__name__,
                "error_message": str(e),
                "cache_dir": CACHE_DIR,
                "current_dir": os.getcwd(),
                "dir_exists": os.path.exists(CACHE_DIR),
                "dir_permissions": oct(os.stat(os.path.dirname(CACHE_DIR)).st_mode)[-3:] if os.path.exists(os.path.dirname(CACHE_DIR)) else "N/A"
            }
            error_json = orjson.dumps(error_info, option=orjson.OPT_INDENT_2).decode('utf-8')
            logger.error(f'[内容分析] 创建缓存目录失败: {error_json}')
            return None

        # Up to 3 attempts; only a network exception triggers the 1s
        # back-off — a non-200 status retries immediately (as original).
        for _ in range(3):
            try:
                response = requests.get(url=video_link, timeout=60)
                print(f"response content: {file_path}")
                if response.status_code == 200:
                    try:
                        with open(file_path, 'wb') as f:
                            f.write(response.content)
                        logger.info(f'[内容分析] 视频链接: {video_link}, 存储地址: {file_path}')
                    except Exception as e:
                        error_info = {
                            "error_type": type(e).__name__,
                            "error_message": str(e),
                            "file_path": file_path,
                            "content_length": len(response.content) if response.content else 0
                        }
                        error_json = orjson.dumps(error_info, option=orjson.OPT_INDENT_2).decode('utf-8')
                        logger.error(f'[内容分析] 视频保存失败: {error_json}')
                        return None
                    return file_path
            except Exception:
                time.sleep(1)

        # FIX: the original fell off the loop returning None implicitly and
        # its outer handler logged a misleading "create cache dir" message.
        logger.error(f'[内容分析] 视频下载失败: {video_link}')
        return None

    @classmethod
    def _analyze_content(cls, video, prompt):
        """Run one prompt against the uploaded video.

        Returns the parsed dict response; raises Exception with a
        descriptive message on generation or parse failure so ``run`` can
        record the error per prompt.
        """
        model = genai.GenerativeModel(
            model_name='gemini-2.0-flash',
            generation_config=genai.GenerationConfig(
                response_mime_type='application/json',
                temperature=0.3,
                max_output_tokens=20480
            ),
            safety_settings={
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
                HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
            }
        )

        try:
            response = model.generate_content(
                contents=[video, prompt],
                request_options={'timeout': 300}
            )

            # NOTE(review): _error is a private SDK attribute — confirm it
            # still exists when upgrading google-generativeai.
            if hasattr(response, '_error') and response._error:
                raise Exception(f"生成错误: {response._error}")

            result = orjson.loads(response.text.strip())
            print(f"[视频分析] 响应: {result}")
            if not isinstance(result, dict):
                raise ValueError("响应格式错误:非字典结构")

            return result
        except orjson.JSONDecodeError:
            raise Exception("响应解析失败,非JSON格式")
        except Exception as e:
            raise Exception(f"分析失败: {str(e)}")

    @classmethod
    def run(cls, api_key, video_url):
        """Analyze ``video_url`` with the given Gemini ``api_key``.

        Returns ``(analysis_data, demand_list)`` on success; on failure the
        first element is an "[异常] ..." string and the second is "".
        """
        print(f"api_key:{api_key},video_url:{video_url}")
        video_path = None
        try:
            genai.configure(api_key=api_key)
            video_path = cls.download_video(video_link=video_url)
            if not video_path:
                # FIX: the original called os.remove(video_path) here with
                # video_path == None, raising TypeError instead of returning.
                logger.error(f'[内容分析] 视频下载失败, 跳过任务')
                return "[异常] 视频下载失败", ""

            video = genai.upload_file(path=video_path, mime_type='video/mp4')
            # Poll until Gemini finishes ingesting the uploaded file.
            while video.state.name == 'PROCESSING':
                time.sleep(1)
                video = genai.get_file(name=video.name)
            if video.state.name != 'ACTIVE':
                genai.delete_file(name=video.name)
                return "[异常] 上传视频失败", ""

            prompts = load_prompts()
            analysis_data = {}
            for prompt in prompts[:3]:
                print(f"[分析] 正在执行: {prompt['name']}")
                try:
                    # Record per-prompt failures instead of aborting the batch.
                    analysis_data[prompt['name']] = cls._analyze_content(video, prompt['content'])
                except Exception as e:
                    analysis_data[prompt['name']] = {
                        "error": str(e),
                        "error_type": type(e).__name__
                    }
            print(f"[分析] 所有分析完成, 结果: {analysis_data}")

            coze_hook = CozeHook()
            demand_list = coze_hook.run(analysis_data["视频选题与要点理解"], analysis_data["视频分段与时间点分析"])
            print(f"[分析] 所有分析完成, 结果: {demand_list}")

            genai.delete_file(name=video.name)
            return analysis_data, demand_list
        except Exception as e:
            logger.error(f"[内容分析] 处理异常,异常信息{e}")
            return f"[异常] {e}", ""
        finally:
            # FIX: centralized cleanup — the original's except branch could
            # call os.remove on a None or already-deleted path.
            if video_path and os.path.exists(video_path):
                os.remove(video_path)
                logger.info(f"[内容分析] 文件已删除: {video_path}")
+
+
if __name__ == '__main__':
    # SECURITY FIX: the Gemini API key was previously hard-coded here (a
    # leaked credential — rotate it). Read it from the environment like the
    # workers do. run() is a classmethod, so call it on the class directly.
    demo_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY", "")
    GoogleAI.run(demo_key,
                 "http://rescdn.yishihui.com/jq_oss/video/2025012215472528213")
+

+ 73 - 0
utils/google_ai_prompt.py

@@ -0,0 +1,73 @@
# Prompt 1: video topic & key-point analysis.
# Instructs Gemini to return a strict-JSON summary (topic, outline, key
# points) in Chinese; consumed by GoogleAI._analyze_content, which parses
# the response as JSON.

VIDEO_TOPIC_ANALYSIS_PROMPT = '''# 任务说明:
你是一位短视频结构化分析专家。请严格按以下要求处理输入视频:

# 分析规范:
1. 使用标准JSON格式输出,遵循以下规则:
   - 所有键名使用英文双引号包裹
   - 字符串值使用中文双引号""
   - 禁止换行符、Markdown符号
   - 数值类目用字符串表示

2. 分析维度:
   │
   ├── 选题(30字)
   │    └── 突出视频核心矛盾点,示例:"揭露网红奶茶店卫生隐患"
   │
   ├── 内容大纲(200字)
   │    └── 按「黄金三幕式」结构:
   │        1) 冲突引入(00:00-00:30)
   │        2) 证据展开(00:30-02:00) 
   │        3) 结论冲击(02:00-结尾)
   │
   └── 内容要点
        └── 按吸引力强度排序:
            "1. 现场实拍过期原料特写镜头(00:01:45)
             2. 员工偷拍工作流程(00:03:20)
             3. 专家访谈数据对比(00:04:10)"

# 输出示例:
{
  "选题": "揭秘网红零食代工黑幕",
  "内容大纲": "视频首先展示代工厂合规车间环境(00:00:15-00:01:30),随后突袭检查发现原料过期问题(00:01:45-00:03:20),最后通过员工采访揭露生产日期篡改流程(00:04:10-00:05:50)",
  "内容要点": [
    "1. 车间环境与原料仓库的视觉反差",
    "2. 特写镜头展示虫蛀原料袋(00:02:15)",
    "3. 隐蔽摄像头拍摄的灌装过程(00:04:30)"
  ]
}

请现在开始分析:'''

# Prompt 2: timeline — segment structure and key timestamps.
# Asks for a strict-JSON breakdown of the video into logical segments plus
# millisecond-precision key time points.
VIDEO_SEGMENT_ANALYSIS_PROMPT = '''# 任务说明:你是一位短视频分析专家。请根据输入视频内容,从整体结构出发,分析视频的分段逻辑与关键时间点。

# 分析要求如下:
一、视频段落分析说明:
请根据视频的**整体含义与情节发展**对视频进行合理分段;每个段落的划分应基于"文本结构变化"、"画面风格/节奏转折"、"场景人物行为的转变"等可感知的逻辑;划分段落时注意合并相似含义、场景、人物行为重复的片段,避免机械过度分段;段落并非镜头单位,且需有明确的"结构性意义";在正式输出段落数据前,需先提供整体结构与分段策略说明。

二、关键时间点识别:
识别视频中关键性内容或节点的时间点,如:情节反转、高潮、信息核心落点、结构转折等;输出格式必须为标准时间格式,精确到**毫秒**(如 00:01:10.234)。

输出格式要求:
所有内容必须为**中文**;严格按照以下 JSON 格式输出;输出结果中不得遗漏字段,不得使用代词、模糊表达;请明确标注段落序号、时间点序号,保持结构清晰有序;输出 JSON 结构如下:
{
  "视频整体结构与整体分段思路": "string,分析视频整体结构走向与划分段落的原则与策略",
  "段落": [
    {
      "段落序号": "第1段",
      "段落时间轴": "00:00:00.000 - 00:00:30.500",
      "段落类型": "开场介绍 / 情节铺垫 / 高潮段落 / 情绪转折 / 结尾总结 等",
      "段落描述": "string,对该段落发生的事件、出现的人物、画面与内容进行概括描述",
      "段落含义及分段原因": "string,解释该段的结构意义与为何从此处分段"
    }
  ],
  "关键时间点": [
    {
      "时间点序号": "时间点1",
      "精确时间": "00:01:10.234",
      "时间点描述": "string,描述该时间点对应的事件或结构意义"
    }
  ]
}'''

+ 7 - 0
utils/redis.py

@@ -34,4 +34,11 @@ def content_video_data(ret):
     task = f"task:video_insight"
     helper = RedisHelper()
     client = helper.get_client()
+    client.rpush(task, ret)
+
def content_video_trigger_data(ret):
    """Re-queue a failed trigger-analysis task: push ``ret`` back onto the
    task:video_trigger_insight Redis list for a later retry."""
    client = RedisHelper().get_client()
    client.rpush("task:video_trigger_insight", ret)

+ 2 - 0
workers/video_insight_select_work.py

@@ -19,11 +19,13 @@ def requirement_insight():
         dt = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
         logger.info(f"视频需求点洞察")
         redis_task = "task:video_insight"
+        redis_trigger_task = "task:video_trigger_insight"
         sql =f'select clickobjectid as video_id from loghubods.user_share_log where dt = {dt} and topic = "click" group by clickobjectid order by count(distinct machinecode) desc limit 1000'
         data = OdpsDataCount.main(sql)
         if not data:
             return
         RedisHelper().get_client().rpush(redis_task, *data)
+        RedisHelper().get_client().rpush(redis_trigger_task, *data)
         logger.info(f"[R] 写入Redis 成功 共写入 {len(data)} 条")
     except Exception as e:
         logger.error(f"[R] 写入Redis写入失败,失败信息{e}")

+ 147 - 0
workers/video_insight_trigger_work.py

@@ -0,0 +1,147 @@
+import asyncio
+import json
+import os
+import sys
+import orjson
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.interval import IntervalTrigger
+from loguru import logger
# FIX: this is the *trigger* worker — the original logged to the consumption
# worker's file (copy-paste), interleaving both workers' output in one file.
logger.add("/app/logs/trigger.log", rotation="10 MB")
+
+sys.path.append('/app')
+
+from utils.aliyun_log import AliyunLogger
+from utils.google_ai_analyze import GoogleAI
+from utils.piaoquan import PQ
+from utils.redis import RedisHelper, content_video_trigger_data
+from utils.mysql_db import MysqlHelper
+from utils.aliyun_security import Security
+
+
+
+
class ConsumptionRecommend(object):
    """Pops video ids from the task:video_trigger_insight queue, runs the
    Gemini + Coze analysis, and persists the results to MySQL."""

    @classmethod
    async def run(cls):
        """Process one not-yet-analysed video from the trigger queue."""
        logger.info(f"[处理] 开始获取redis数据")

        # Keep popping until we find a video id not already present in
        # video_triggers_analysis (duplicate filter).
        while True:
            task = RedisHelper().get_client().rpop(name='task:video_trigger_insight')

            if not task:
                logger.info('[处理] 无待执行的扫描任务')
                return

            task = orjson.loads(task)
            logger.info(f"[处理] 获取redis数据{task}")
            video_id = task['video_id']

            count_sql = f"""select count(1) from video_triggers_analysis where video_id = {video_id}"""
            count = MysqlHelper.get_values(count_sql)

            if not count or count[0][0] == 0:
                logger.info(f"[处理] 视频ID {video_id} 可用")
                break
            logger.info(f"[处理] 视频ID {video_id} 重复过滤,继续获取下一个任务")

        logger.info(f"[处理] 开始获取原视频OSS地址")
        video_title, video_path = PQ.get_pq_oss(video_id)
        if not video_path:
            return
        logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path}")
        video_url = f"http://rescdn.yishihui.com/{video_path}"

        logger.info(f"[处理] 开始分析视频")
        api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
        logger.info(f"[处理] 使用的API_KEY:{api_key}")
        analysis_data, demand_list = GoogleAI.run(api_key, video_url)
        if "[异常]" in analysis_data:
            # FIX: re-queue the failed task AND stop — the original fell
            # through and tried to build SQL rows from the error payload.
            content_video_trigger_data(json.dumps(task))
            return

        cls._save_demand_list(video_id, video_title, demand_list)
        cls._save_analysis_result(video_id, video_title, analysis_data)

    @classmethod
    def _save_demand_list(cls, video_id, video_title, demand_list):
        """Insert one row per extracted demand into video_triggers_analysis."""
        data = json.loads(orjson.dumps(demand_list).decode())
        if not data:
            # FIX: an empty demand list produced a malformed
            # "INSERT ... VALUES ;" statement in the original.
            logger.info(f"[处理] 需求列表为空, 跳过写入")
            return

        sql = """
        INSERT INTO video_triggers_analysis (
            video_id, video_link, video_title, content_type,
            demand_order, demand_score, user_demand, demand_category, demand_type, demand_show_time,
            demand_reason, product_hook, hook_time, hook_desc,
            hook_type, landing_desc, landing_type, platform_case, reasoning_process,audit_status,audit_desc
        ) VALUES
        """
        values = []
        link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
        for index, entry in enumerate(data, 1):
            # Security-audit the hook copy; 1 = passed, 2 = rejected.
            hook_desc = entry.get('需求钩子话术', '')
            result, msg = Security.security(hook_desc)
            audit_status = 1 if result else 2
            audit_desc = msg

            # NOTE(review): values are interpolated straight into the SQL
            # text — a single quote in model output breaks the statement and
            # this is injectable. Migrate MysqlHelper to parameterized
            # queries (or escape every field like _save_analysis_result does).
            value = f"""(
                {video_id}, '{link}', '{video_title}', NULL,
                '{index}', '{entry.get('需求强烈程度分值', '')}', '{entry.get('用户具体的需求描述', '')}', '{entry.get('需求分类', '')}', '{entry.get('需求类型', '')}', '{entry.get('需求钩子出现时间', '')}',
                '{entry.get('推测出该点需求的原因', '')}', '{entry.get('需求详细query', '')}', '', '{entry.get('需求钩子话术', '')}',
                '', '{entry.get('落地方案形态描述', '')}', '{entry.get('落地方案类型', '')}', '', '','{audit_status}','{audit_desc}'
            )"""
            values.append(value)

        sql += ",\n".join(values) + ";"
        logger.info(f"{sql}")
        MysqlHelper.update_values(sql)
        logger.info(f"[处理] 需求列表写入数据库成功")

    @classmethod
    def _save_analysis_result(cls, video_id, video_title, analysis_data):
        """Insert the summary/timeline analysis blobs into video_demand_score."""
        entry = json.loads(orjson.dumps(analysis_data).decode())
        link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
        sql = """
        INSERT INTO video_demand_score (
            video_id, video_link, video_title, analysis_summary, analysis_timeline
        ) VALUES
        """
        value = f"""(
            {video_id}, '{link}', '{video_title}', {MysqlHelper.escape_string(entry.get('视频选题与要点理解', ''))}, '{entry.get('视频分段与时间点分析', '')}'
        )"""
        sql += value + ";"
        logger.info(f"{sql}")
        MysqlHelper.update_values(sql)
        logger.info(f"[处理] 视频分析结果写入数据库成功")
+
+
async def run():
    """Schedule ConsumptionRecommend.run every 3 minutes and block forever."""
    scheduler = AsyncIOScheduler()
    try:
        logger.info(f"[处理] 开始启动")
        trigger = IntervalTrigger(minutes=3)  # fire every 3 minutes
        scheduler.add_job(ConsumptionRecommend.run, trigger=trigger)
        scheduler.start()
        # Park this coroutine indefinitely; jobs run on the same event loop.
        await asyncio.Event().wait()
    except KeyboardInterrupt:
        pass
    except Exception as exc:
        logger.error(f"[处理] 启动异常,异常信息:{exc}")
    finally:
        scheduler.shutdown()
+
+
if __name__ == '__main__':
    # asyncio.run creates, runs and closes the loop in one call;
    # get_event_loop()/run_until_complete is deprecated for this use
    # since Python 3.10.
    asyncio.run(run())