luojunhui 2 weeks ago
parent
commit
e80918318c

+ 24 - 34
applications/const/__init__.py

@@ -8,6 +8,7 @@ class ColdStartTaskConst:
     """
     冷启动任务常量配置
     """
+
     PUBLISHED_STATUS = 2  # 文章已发布状态
     INIT_STATUS = 1  # 文章初始状态
     BAD_STATUS = 0  # 低质量文章状态
@@ -55,6 +56,7 @@ class updatePublishedMsgTaskConst:
     """
     更新已发布文章消息常量配置
     """
+
     # 爬虫详情接口返回code
     ARTICLE_ILLEGAL_CODE = 25012
     ARTICLE_DELETE_CODE = 25005
@@ -92,6 +94,7 @@ class UpdateAccountReadRateTaskConst:
     """
     更新账号阅读率常量配置
     """
+
     # 阅读率统计周期(秒)
     STATISTICS_PERIOD = 31 * 24 * 60 * 60
     # 一天的秒数
@@ -115,15 +118,16 @@ class UpdateAccountReadAvgTaskConst:
     """
     更新账号阅读均值常量配置
     """
+
     # 投流账号
     TOULIU_ACCOUNTS = {
-        'gh_93e00e187787',
-        'gh_ac43e43b253b',
-        'gh_68e7fdc09fe4',
-        'gh_77f36c109fb1',
-        'gh_b181786a6c8c',
-        'gh_1ee2e1b39ccf',
-        'gh_d3f039c9db2b'
+        "gh_93e00e187787",
+        "gh_ac43e43b253b",
+        "gh_68e7fdc09fe4",
+        "gh_77f36c109fb1",
+        "gh_b181786a6c8c",
+        "gh_1ee2e1b39ccf",
+        "gh_d3f039c9db2b",
     }
 
     # 发文模式
@@ -148,6 +152,7 @@ class WeixinVideoCrawlerConst:
     """
     微信视频抓取常量配置
     """
+
     # 账号抓取状态
     ACCOUNT_CRAWL_STATUS = 1
     ACCOUNT_DO_NOT_CRAWL_STATUS = 0
@@ -225,6 +230,7 @@ class AccountAssociationTaskConst:
     """
     账号联想任务常量配置
     """
+
     # 获取种子标题的统计周期
     STAT_PERIOD = 7 * 24 * 60 * 60
 
@@ -243,6 +249,7 @@ class ArticleCollectorConst:
     """
     文章采集任务常量配置
     """
+
     # 发送方式
 
     # 手动推送
@@ -273,6 +280,7 @@ class BaiduVideoCrawlerConst:
     """
     const for baidu video crawler
     """
+
     # account status
     BAIDU_ACCOUNT_GOOD_STATUS = 1
     BAIDU_ACCOUNT_BAD_STATUS = 0
@@ -294,6 +302,7 @@ class TitleRewriteTaskConst:
     """
     title rewrite task const
     """
+
     # title rewrite status
     TITLE_REWRITE_INIT_STATUS = 0
     TITLE_REWRITE_SUCCESS_STATUS = 1
@@ -318,6 +327,7 @@ class ChannelVideoCrawlerConst:
     """
     const for baidu video crawler
     """
+
     # account status
     CHANNEL_ACCOUNT_GOOD_STATUS = 1
     CHANNEL_ACCOUNT_BAD_STATUS = 0
@@ -345,6 +355,7 @@ class ToutiaoVideoCrawlerConst:
     """
     const for toutiao video crawler
     """
+
     # platform
     PLATFORM = "toutiao"
 
@@ -372,37 +383,17 @@ class SingleVideoPoolPublishTaskConst:
     """
     const for single video pool publish task
     """
+
     TRANSFORM_INIT_STATUS = 0
     TRANSFORM_SUCCESS_STATUS = 1
     TRANSFORM_FAIL_STATUS = 99
 
 
+class GoogleVideoUnderstandTaskConst:
+    # task batch size
+    BATCH_SIZE = 100
 
-
-
-
-
-# 视频转文本任务
-class VideoToTextConst:
-    """
-    视频转文本任务常量配置
-    """
-    # SUMMARY_STATUS
-    SUMMARY_INIT_STATUS = 0
-    SUMMARY_SUCCESS_STATUS = 1
-    SUMMARY_FAIL_STATUS = 99
-    SUMMARY_LOCK = 101
-
-    # SUMMARY_TASK_BATCH_SIZE
-    SUMMARY_BATCH_SIZE = 100
-
-    # bad_status 文章质量状态
-    ARTICLE_GOOD_STATUS = 0
-
-    # audit_status 文章审核状态
-    AUDIT_SUCCESS_STATUS = 1
-
-    # video understand status
+    # task status
     INIT_STATUS = 0
     PROCESSING_STATUS = 1
     SUCCESS_STATUS = 2
@@ -411,6 +402,5 @@ class VideoToTextConst:
     # sleep seconds
     SLEEP_SECONDS = 60
 
-    # time
+    # max processing time
     MAX_PROCESSING_TIME = 3600
-

+ 136 - 81
coldStartTasks/ai_pipeline/basic.py

@@ -1,9 +1,18 @@
 import os
+import re
+import json
 import time
 import datetime
 import requests
+from typing import Optional, Dict
+
+from requests.exceptions import RequestException
+from tenacity import retry
 
 from applications.db import DatabaseConnector
+from applications.utils import request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
 
 
 def get_status_field_by_task(task: str) -> tuple[str, str]:
@@ -27,14 +36,16 @@ def roll_back_lock_tasks(
     task: str,
     max_process_time: int,
     init_status: int,
-    processing_status: int
+    processing_status: int,
 ) -> int:
     """
     rollback tasks which have been locked for a long time
     """
     status, update_timestamp = get_status_field_by_task(task)
     now_timestamp = int(time.time())
-    timestamp_threshold = now_timestamp - max_process_time
+    timestamp_threshold = datetime.datetime.fromtimestamp(
+        now_timestamp - max_process_time
+    )
     update_query = f"""
         update long_articles_new_video_cover
         set {status} = %s
@@ -62,27 +73,13 @@ def download_file(task_id, oss_path):
     return file_name
 
 
-def generate_summary_prompt(text):
-    prompt = f"""
-        你是1个优秀的公众号文章写作大师,我对你有以下要求
-        视频总结:{text}
-
-        第一个要求:请仔细阅读以上视频总结,挑选其中最吸引人的情节或话题,总结为100字左右文章精彩总结(字数计算包括标点符号),这部分内容为段落1。
-        句子段落之间以悬念承接,可以吸引读者往下读第二句。
-
-        第二个要求:在这100字内容的结尾处,增加1-2句话的引导,引导大家去观看上面的视频了解详情,可以加一些emoji表情。注意是点击上面的视频,不是下面的视频。这部分内容为段落2。
-
-        你最终输出一段总结内容,将第一段和第二段之间空格一行。不用加标题或者主题,也不用写第几段、多少字这样的话。整体的语言风格要口语化、直接点,要让60岁以上的老年人能看懂、能共情。人的名字尽量用全名,不用简称。
-        """
-    return prompt
-
-
 def update_task_queue_status(
-        db_client: DatabaseConnector,
-        task_id: int,
-        task: str,
-        ori_status: int,
-        new_status: int) -> int:
+    db_client: DatabaseConnector,
+    task_id: int,
+    task: str,
+    ori_status: int,
+    new_status: int,
+) -> int:
     # update task queue status
     status, update_timestamp = get_status_field_by_task(task)
     update_query = f"""
@@ -101,62 +98,120 @@ def update_task_queue_status(
     )
     return update_rows
 
-extract_prompt = """
-以下为视频爆款封面的评分体系,请根据视频内容,逐帧分析视频画面,根据以下标准进行打分,最终先把分数最高的画面出现的时间输出给我,并精确到几时几分几秒几毫秒,格式:hh:mm:ss.xxx,要求是确切时间而不是时间段。
-评分维度:
-一、现实关切度 (满分15分):
-高度相关(13-15分): 封面主题直接涉及老年人的生活、健康、财产等切身利益,例如养老、退休、健康、食品安全、子女教育、财产安全等。
-中度相关(8-12分): 封面主题与老年人的生活有一定的关联,例如家长里短、邻里关系、社会热点、生活窍门等。
-低度相关(4-7分): 封面主题与老年人的生活关系较弱,例如娱乐八卦、时尚潮流等。
-无关(0-3分): 封面主题与老年人的生活基本无关。
-二、社会认同感与亲切感 (满分15分):
-高度认同(13-15分): 封面人物形象亲切、接地气,场景贴近老年人的生活,有强烈的代入感和归属感。
-中度认同(8-12分): 封面人物形象尚可接受,场景有一定的熟悉感。
-低度认同(4-7分): 封面人物形象陌生,场景较为遥远。
-无认同感(0-3分): 封面人物形象令人反感,场景与老年人生活无关。
-三、信息传达效率 (满分15分):
-高效传达(13-15分): 封面文字简洁直白、字体较大、重点突出,视觉元素聚焦,能让老年人快速理解视频内容,色彩明快,对比强烈,视觉冲击力强。
-中效传达(8-12分): 封面文字尚可,视觉元素有一定吸引力,但略显复杂。
-低效传达(4-7分): 封面文字晦涩难懂,视觉元素杂乱无章。色彩暗淡,对比度弱。
-无效传达(0-3分): 封面信息表达不清,无法理解视频内容。
-四、情感共鸣与回忆杀 (满分20分):
-高度共鸣(17-20分): 封面主题能够引发老年人对过去岁月的回忆,勾起他们对老朋友、老时光的思念,产生强烈的情感共鸣,引发对晚年生活的思考。例如同学情、怀旧主题等。
-中度共鸣(11-16分): 封面主题有一定怀旧元素,能引发部分老年人的回忆,并进行一定程度的思考。
-低度共鸣(6-10分): 封面主题怀旧元素较少,共鸣感不强。
-无共鸣(0-5分): 封面主题与回忆无关。
-五、正能量与精神寄托 (满分15分):
-高度正能量(13-15分): 封面内容积极向上,能够给予老年人希望和力量,或者包含宗教、祈福等元素,满足他们的精神寄托。
-中度正能量(8-12分): 封面内容有一定的积极意义,但不够突出。
-低度正能量(4-7分): 封面内容较为平淡,缺乏精神寄托。
-负能量 (0-3分): 封面内容消极悲观,或者与老年人的信仰不符。
-六、节日/时事关联性 (满分10分):
-高度关联(8-10分): 封面与节日、时事热点紧密相关,能激发老年人的分享欲和参与感。
-中度关联(5-7分): 封面与节日或时事有一定关联,但并非核心。
-低度关联(2-4分): 封面与节日或时事关联较弱。
-无关联(0-1分): 封面与节日、时事无关。
-七、传播动机 (满分10分):
-强传播动机(8-10分): 封面内容能激发老年人强烈的情感,例如激动、感动、惊讶等,让他们想要分享给家人朋友,或者认为视频内容对他人有帮助,有分享的价值。
-中等传播动机(5-7分): 封面内容有一定分享价值,但分享意愿不强烈。
-低传播动机(2-4分): 封面内容平淡,缺乏分享动力。
-无传播动机(0-1分): 封面内容无分享价值,或者会引起不适,降低传播意愿。
-八、附加分(满分60分)
-1.包含老年人为画面主体(0-5分)
-2.有超过3人为画面主体(0-5分)
-3.充斥画面的密集人群为画面主体(0-5分)
-4.存在知名历史、近代人物(0-5分)
-5.存在人物脸部、头部未完整出现在画面的情况(0-5分)
-6.是不以人为主体的鲜花、美景、知名建筑或风景(0-5分)
-7.是老照片、怀旧风格(0-5分)
-8.是农村、军事、综艺演出、历史画面(0-5分)
-9.有趣味、惊奇的形象或画面为主体(0-5分)
-10.以大号文字或密集文字为主体并且不包含人物(0-5分)
-11.是不包含人物的纯色画面(0-5分)
-12.是模糊的或清晰度、像素较低的(0-5分)
-
-总分评估:110-160分: 高度吸引老年人群体,有成为爆款的潜力。
-80-109分: 具有一定吸引力,值得尝试。
-65-79分: 吸引力一般,需要优化。
-65分以下: 吸引力不足,不建议使用。
-
-注意:输出只需要返回 'hh:mm:ss.xxx' 格式的时间, 无需返回任何其他东西
-"""
+
+def extract_best_frame_prompt():
+    """
+    Return the fixed prompt used to pick the best cover frame of a video.
+
+    The prompt (written in Chinese) lists a scoring rubric for candidate
+    cover frames and asks for the timestamp of the highest scoring frame,
+    answered only as a time in 'hh:mm:ss.xxx' format.
+    """
+    return """
+    以下为视频爆款封面的评分体系,请根据视频内容,逐帧分析视频画面,根据以下标准进行打分,最终先把分数最高的画面出现的时间输出给我,并精确到几时几分几秒几毫秒,格式:hh:mm:ss.xxx,要求是确切时间而不是时间段。
+    评分维度:
+    一、现实关切度 (满分15分):
+    高度相关(13-15分): 封面主题直接涉及老年人的生活、健康、财产等切身利益,例如养老、退休、健康、食品安全、子女教育、财产安全等。
+    中度相关(8-12分): 封面主题与老年人的生活有一定的关联,例如家长里短、邻里关系、社会热点、生活窍门等。
+    低度相关(4-7分): 封面主题与老年人的生活关系较弱,例如娱乐八卦、时尚潮流等。
+    无关(0-3分): 封面主题与老年人的生活基本无关。
+    二、社会认同感与亲切感 (满分15分):
+    高度认同(13-15分): 封面人物形象亲切、接地气,场景贴近老年人的生活,有强烈的代入感和归属感。
+    中度认同(8-12分): 封面人物形象尚可接受,场景有一定的熟悉感。
+    低度认同(4-7分): 封面人物形象陌生,场景较为遥远。
+    无认同感(0-3分): 封面人物形象令人反感,场景与老年人生活无关。
+    三、信息传达效率 (满分15分):
+    高效传达(13-15分): 封面文字简洁直白、字体较大、重点突出,视觉元素聚焦,能让老年人快速理解视频内容,色彩明快,对比强烈,视觉冲击力强。
+    中效传达(8-12分): 封面文字尚可,视觉元素有一定吸引力,但略显复杂。
+    低效传达(4-7分): 封面文字晦涩难懂,视觉元素杂乱无章。色彩暗淡,对比度弱。
+    无效传达(0-3分): 封面信息表达不清,无法理解视频内容。
+    四、情感共鸣与回忆杀 (满分20分):
+    高度共鸣(17-20分): 封面主题能够引发老年人对过去岁月的回忆,勾起他们对老朋友、老时光的思念,产生强烈的情感共鸣,引发对晚年生活的思考。例如同学情、怀旧主题等。
+    中度共鸣(11-16分): 封面主题有一定怀旧元素,能引发部分老年人的回忆,并进行一定程度的思考。
+    低度共鸣(6-10分): 封面主题怀旧元素较少,共鸣感不强。
+    无共鸣(0-5分): 封面主题与回忆无关。
+    五、正能量与精神寄托 (满分15分):
+    高度正能量(13-15分): 封面内容积极向上,能够给予老年人希望和力量,或者包含宗教、祈福等元素,满足他们的精神寄托。
+    中度正能量(8-12分): 封面内容有一定的积极意义,但不够突出。
+    低度正能量(4-7分): 封面内容较为平淡,缺乏精神寄托。
+    负能量 (0-3分): 封面内容消极悲观,或者与老年人的信仰不符。
+    六、节日/时事关联性 (满分10分):
+    高度关联(8-10分): 封面与节日、时事热点紧密相关,能激发老年人的分享欲和参与感。
+    中度关联(5-7分): 封面与节日或时事有一定关联,但并非核心。
+    低度关联(2-4分): 封面与节日或时事关联较弱。
+    无关联(0-1分): 封面与节日、时事无关。
+    七、传播动机 (满分10分):
+    强传播动机(8-10分): 封面内容能激发老年人强烈的情感,例如激动、感动、惊讶等,让他们想要分享给家人朋友,或者认为视频内容对他人有帮助,有分享的价值。
+    中等传播动机(5-7分): 封面内容有一定分享价值,但分享意愿不强烈。
+    低传播动机(2-4分): 封面内容平淡,缺乏分享动力。
+    无传播动机(0-1分): 封面内容无分享价值,或者会引起不适,降低传播意愿。
+    八、附加分(满分60分)
+    1.包含老年人为画面主体(0-5分)
+    2.有超过3人为画面主体(0-5分)
+    3.充斥画面的密集人群为画面主体(0-5分)
+    4.存在知名历史、近代人物(0-5分)
+    5.存在人物脸部、头部未完整出现在画面的情况(0-5分)
+    6.是不以人为主体的鲜花、美景、知名建筑或风景(0-5分)
+    7.是老照片、怀旧风格(0-5分)
+    8.是农村、军事、综艺演出、历史画面(0-5分)
+    9.有趣味、惊奇的形象或画面为主体(0-5分)
+    10.以大号文字或密集文字为主体并且不包含人物(0-5分)
+    11.是不包含人物的纯色画面(0-5分)
+    12.是模糊的或清晰度、像素较低的(0-5分)
+    
+    总分评估:110-160分: 高度吸引老年人群体,有成为爆款的潜力。
+    80-109分: 具有一定吸引力,值得尝试。
+    65-79分: 吸引力一般,需要优化。
+    65分以下: 吸引力不足,不建议使用。
+    
+    注意:输出只需要返回 'hh:mm:ss.xxx' 格式的时间, 无需返回任何其他东西
+    """
+
+
+@retry(**retry_desc)
+def get_video_cover(video_oss_path: str, time_millisecond_str: str) -> Optional[Dict]:
+    """
+    Ask the key-frame service for the cover image of a video at a given time.
+
+    :param video_oss_path: oss path of the video, appended to the CDN host
+    :param time_millisecond_str: frame time, sent as the "timestamp" field
+    :return: decoded JSON body of the response, or None when the request
+        fails or the body cannot be decoded
+
+    NOTE(review): request errors are caught and turned into None inside this
+    function, so the retry decorator presumably never sees an exception to
+    retry on -- confirm against how request_retry is configured.
+    """
+    endpoint = "http://192.168.205.80:8080/ffmpeg/fetchKeyFrames"
+    payload = {
+        "url": "https://rescdn.yishihui.com/" + video_oss_path,
+        "timestamp": time_millisecond_str,
+    }
+    try:
+        resp = requests.post(
+            endpoint,
+            headers={"content-type": "application/json"},
+            json=payload,
+            timeout=60,
+        )
+        resp.raise_for_status()
+        return resp.json()
+    except RequestException as e:
+        print(f"API请求失败: {e}")
+    except json.JSONDecodeError as e:
+        print(f"响应解析失败: {e}")
+    return None
+
+
+def normalize_time_str(time_string: str) -> str | None:
+    """
+    Normalize a loosely formatted time string to 'hh:mm:ss.xxx'.
+
+    Accepted inputs (after cleaning): hh:mm:ss.xxx, h:mm:ss.xxx, mm:ss.xxx,
+    m:ss.xxx and dd:dd:ddd (read as mm:ss:xxx, the last 3 digits being
+    milliseconds).
+
+    :param time_string: raw time text
+    :return: the time as 'hh:mm:ss.xxx', or None when the input is not a
+        string or matches none of the accepted formats
+    """
+    if not isinstance(time_string, str):
+        return None
+
+    # Pre-processing: turn full-width colons (U+FF1A) into ASCII colons,
+    # drop list numbering / '**' markers, and trim whitespace.
+    time_str = time_string.replace("\uff1a", ":")
+    time_str = re.sub(r"\s*(\d+\.\s*)?\*+\s*", "", time_str)
+    time_str = time_str.strip()
+
+    # hh:mm:ss.xxx or h:mm:ss.xxx (hour is left-padded to two digits)
+    if m := re.fullmatch(r"(\d{1,2}):(\d{2}):(\d{2})\.(\d{3})", time_str):
+        return f"{m[1].zfill(2)}:{m[2]}:{m[3]}.{m[4]}"
+
+    # mm:ss.xxx or m:ss.xxx (hour is missing, minute is left-padded)
+    if m := re.fullmatch(r"(\d{1,2}):(\d{2})\.(\d{3})", time_str):
+        return f"00:{m[1].zfill(2)}:{m[2]}.{m[3]}"
+
+    # dd:dd:ddd, assuming the last 3 digits are milliseconds
+    if m := re.fullmatch(r"(\d{2}:\d{2}):(\d{3})", time_str):
+        return f"00:{m[1]}.{m[2]}"
+
+    return None

+ 113 - 37
coldStartTasks/ai_pipeline/extract_video_best_frame.py

@@ -9,14 +9,19 @@ from tqdm import tqdm
 from pymysql.cursors import DictCursor
 
 from applications.api import GoogleAIAPI
+from applications.const import GoogleVideoUnderstandTaskConst
 from applications.db import DatabaseConnector
 from config import long_articles_config
 from coldStartTasks.ai_pipeline.basic import download_file
 from coldStartTasks.ai_pipeline.basic import update_task_queue_status
 from coldStartTasks.ai_pipeline.basic import roll_back_lock_tasks
-from coldStartTasks.ai_pipeline.basic import extract_prompt
+from coldStartTasks.ai_pipeline.basic import extract_best_frame_prompt
+from coldStartTasks.ai_pipeline.basic import get_video_cover
+from coldStartTasks.ai_pipeline.basic import normalize_time_str
 
+const = GoogleVideoUnderstandTaskConst()
 table_name = "long_articles_new_video_cover"
+dir_name = "static"
 POOL_SIZE = 10
 google_ai = GoogleAIAPI()
 
@@ -30,13 +35,13 @@ class ExtractVideoBestFrame:
         self.db_client = DatabaseConnector(db_config=long_articles_config)
         self.db_client.connect()
 
-    def get_upload_task_list(self, task_num: int = 10) -> list[dict]:
+    def get_upload_task_list(self, task_num: int = POOL_SIZE) -> list[dict]:
         """
         get upload task list
         """
         fetch_query = f"""
             select id, video_oss_path from {table_name} 
-            where upload_status = 0 and priority = 1
+            where upload_status = {const.INIT_STATUS} and priority = 1
             limit {task_num};
         """
         upload_task_list = self.db_client.fetch(
@@ -44,13 +49,13 @@ class ExtractVideoBestFrame:
         )
         return upload_task_list
 
-    def get_extract_task_list(self, task_num: int = 10) -> list[dict]:
+    def get_extract_task_list(self, task_num: int = POOL_SIZE) -> list[dict]:
         """
         get extract task list
         """
         fetch_query = f"""
             select id, file_name from {table_name}
-            where upload_status = 2 and extract_status = 0
+            where upload_status = {const.SUCCESS_STATUS} and extract_status = {const.INIT_STATUS}
             order by file_expire_time
             limit {task_num};
         """
@@ -59,13 +64,26 @@ class ExtractVideoBestFrame:
         )
         return extract_task_list
 
+    def get_cover_task_list(self) -> list[dict]:
+        """
+        Fetch the tasks that are waiting for a cover image: rows whose
+        extract step succeeded and whose get_cover step is still in its
+        initial status.
+        """
+        sql = f"""
+            select id, video_oss_path, best_frame_time_ms from {table_name}
+            where extract_status = {const.SUCCESS_STATUS} and get_cover_status = {const.INIT_STATUS};
+            """
+        return self.db_client.fetch(query=sql, cursor_type=DictCursor)
+
     def get_processing_task_pool_size(self) -> int:
         """
         get processing task pool size
         """
         fetch_query = f"""
             select count(1) as pool_size from {table_name}
-            where upload_status = 2 and file_state = 'PROCESSING' and extract_status = 0;
+            where upload_status = {const.SUCCESS_STATUS} and file_state = 'PROCESSING' and extract_status = {const.INIT_STATUS};
         """
         fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
         processing_task_pool_size = (
@@ -85,13 +103,13 @@ class ExtractVideoBestFrame:
         update_rows = self.db_client.save(
             query=update_query,
             params=(
-                2,
+                const.SUCCESS_STATUS,
                 datetime.datetime.now(),
                 file_name,
                 file_state,
                 file_expire_time,
                 task_id,
-                1,
+                const.PROCESSING_STATUS,
             ),
         )
         return update_rows
@@ -108,12 +126,12 @@ class ExtractVideoBestFrame:
         update_rows = self.db_client.save(
             query=update_query,
             params=(
-                2,
+                const.SUCCESS_STATUS,
                 datetime.datetime.now(),
                 file_state,
                 best_frame_tims_ms,
                 task_id,
-                1,
+                const.PROCESSING_STATUS,
             ),
         )
         return update_rows
@@ -125,9 +143,9 @@ class ExtractVideoBestFrame:
         roll_back_lock_tasks_count = roll_back_lock_tasks(
             db_client=self.db_client,
             task="upload",
-            init_status=0,
-            processing_status=1,
-            max_process_time=3600,
+            init_status=const.INIT_STATUS,
+            processing_status=const.PROCESSING_STATUS,
+            max_process_time=const.MAX_PROCESSING_TIME,
         )
         print("roll_back_lock_tasks_count", roll_back_lock_tasks_count)
 
@@ -140,8 +158,8 @@ class ExtractVideoBestFrame:
                     db_client=self.db_client,
                     task_id=task["id"],
                     task="upload",
-                    ori_status=0,
-                    new_status=1,
+                    ori_status=const.INIT_STATUS,
+                    new_status=const.PROCESSING_STATUS,
                 )
                 if not lock_status:
                     continue
@@ -163,8 +181,8 @@ class ExtractVideoBestFrame:
                             db_client=self.db_client,
                             task_id=task["id"],
                             task="upload",
-                            ori_status=1,
-                            new_status=99,
+                            ori_status=const.PROCESSING_STATUS,
+                            new_status=const.FAIL_STATUS,
                         )
                 except Exception as e:
                     print(f"download_file error: {e}")
@@ -172,8 +190,8 @@ class ExtractVideoBestFrame:
                         db_client=self.db_client,
                         task_id=task["id"],
                         task="upload",
-                        ori_status=1,
-                        new_status=99,
+                        ori_status=const.PROCESSING_STATUS,
+                        new_status=const.FAIL_STATUS,
                     )
                     continue
 
@@ -185,9 +203,9 @@ class ExtractVideoBestFrame:
         roll_back_lock_tasks_count = roll_back_lock_tasks(
             db_client=self.db_client,
             task="extract",
-            init_status=0,
-            processing_status=1,
-            max_process_time=3600,
+            init_status=const.INIT_STATUS,
+            processing_status=const.PROCESSING_STATUS,
+            max_process_time=const.MAX_PROCESSING_TIME,
         )
         print("roll_back_lock_tasks_count", roll_back_lock_tasks_count)
 
@@ -199,18 +217,17 @@ class ExtractVideoBestFrame:
                 db_client=self.db_client,
                 task_id=task["id"],
                 task="extract",
-                ori_status=0,
-                new_status=1,
+                ori_status=const.INIT_STATUS,
+                new_status=const.PROCESSING_STATUS,
             )
             if not lock_status:
                 continue
 
             file_name = task["file_name"]
-            video_local_path = "static/{}.mp4".format(task["id"])
+            video_local_path = os.path.join(dir_name, "{}.mp4".format(task["id"]))
             try:
                 google_file = google_ai.get_google_file(file_name)
                 state = google_file.state.name
-
                 match state:
                     case "PROCESSING":
                         # google is still processing this video
@@ -218,8 +235,8 @@ class ExtractVideoBestFrame:
                             db_client=self.db_client,
                             task_id=task["id"],
                             task="extract",
-                            ori_status=1,
-                            new_status=0,
+                            ori_status=const.PROCESSING_STATUS,
+                            new_status=const.INIT_STATUS,
                         )
                         print("this video is still processing")
 
@@ -234,10 +251,10 @@ class ExtractVideoBestFrame:
                             query=update_query,
                             params=(
                                 "FAILED",
-                                99,
+                                const.FAIL_STATUS,
                                 datetime.datetime.now(),
                                 task["id"],
-                                1,
+                                const.PROCESSING_STATUS,
                             ),
                         )
 
@@ -245,7 +262,8 @@ class ExtractVideoBestFrame:
                         # video process successfully
                         try:
                             best_frame_tims_ms = google_ai.fetch_info_from_google_ai(
-                                prompt=extract_prompt, video_file=google_file
+                                prompt=extract_best_frame_prompt(),
+                                video_file=google_file,
                             )
                             if best_frame_tims_ms:
                                 self.set_extract_result(
@@ -258,8 +276,8 @@ class ExtractVideoBestFrame:
                                     db_client=self.db_client,
                                     task_id=task["id"],
                                     task="extract",
-                                    ori_status=1,
-                                    new_status=99,
+                                    ori_status=const.PROCESSING_STATUS,
+                                    new_status=const.FAIL_STATUS,
                                 )
                             # delete local file and google file
                             if os.path.exists(video_local_path):
@@ -272,8 +290,8 @@ class ExtractVideoBestFrame:
                                 db_client=self.db_client,
                                 task_id=task["id"],
                                 task="extract",
-                                ori_status=1,
-                                new_status=99,
+                                ori_status=const.PROCESSING_STATUS,
+                                new_status=const.FAIL_STATUS,
                             )
 
             except Exception as e:
@@ -282,6 +300,64 @@ class ExtractVideoBestFrame:
                     db_client=self.db_client,
                     task_id=task["id"],
                     task="extract",
-                    ori_status=1,
-                    new_status=99,
+                    ori_status=const.PROCESSING_STATUS,
+                    new_status=const.FAIL_STATUS,
+                )
+
+    def get_cover_with_best_frame(self):
+        """
+        Produce a cover image for every task returned by get_cover_task_list.
+
+        Each task is locked (INIT -> PROCESSING) first; tasks that cannot be
+        locked are skipped. The stored best frame time is normalized and sent
+        to the key-frame service. On success the cover oss path is saved and
+        the task is marked SUCCESS; in every other case (missing or
+        unparsable time, failed request, unsuccessful or empty response) the
+        task is marked FAIL so that it does not stay locked.
+        """
+        task_list = self.get_cover_task_list()
+        for task in tqdm(task_list, desc="extract_cover_with_ffmpeg"):
+            # lock task
+            lock_status = update_task_queue_status(
+                db_client=self.db_client,
+                task_id=task["id"],
+                task="get_cover",
+                ori_status=const.INIT_STATUS,
+                new_status=const.PROCESSING_STATUS,
+            )
+            if not lock_status:
+                continue
+
+            cover_oss_path = None
+            raw_time = task["best_frame_time_ms"]
+            # a NULL / non-text frame time cannot be normalized
+            time_str = normalize_time_str(raw_time) if isinstance(raw_time, str) else None
+            if time_str:
+                try:
+                    response = get_video_cover(
+                        video_oss_path=task["video_oss_path"],
+                        time_millisecond_str=time_str,
+                    )
+                except Exception as e:
+                    # any error from the cover request must not leave the task locked
+                    print(f"get_video_cover error: {e}")
+                    response = None
+                print(response)
+                # get_video_cover returns None on failure, so guard before reading keys
+                if isinstance(response, dict) and response.get("success"):
+                    cover_oss_path = response.get("data")
+
+            if cover_oss_path:
+                update_query = f"""
+                    update {table_name}
+                    set cover_oss_path = %s, get_cover_status = %s, get_cover_status_ts = %s
+                    where id = %s and get_cover_status = %s;
+                """
+                self.db_client.save(
+                    query=update_query,
+                    params=(
+                        cover_oss_path,
+                        const.SUCCESS_STATUS,
+                        datetime.datetime.now(),
+                        task["id"],
+                        const.PROCESSING_STATUS,
+                    ),
+                )
+            else:
+                # single failure path for every case without a cover path
+                update_task_queue_status(
+                    db_client=self.db_client,
+                    task_id=task["id"],
+                    task="get_cover",
+                    ori_status=const.PROCESSING_STATUS,
+                    new_status=const.FAIL_STATUS,
                 )

+ 0 - 165
coldStartTasks/ai_pipeline/summary_text.py

@@ -1,165 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-import time
-import datetime
-import traceback
-
-from pymysql.cursors import DictCursor
-from tqdm import tqdm
-
-from applications import log
-from applications.api import fetch_deepseek_response
-from applications.const import VideoToTextConst
-from applications.db import DatabaseConnector
-from config import long_articles_config
-from coldStartTasks.ai_pipeline.basic import generate_summary_prompt
-from coldStartTasks.ai_pipeline.basic import update_video_pool_status
-from coldStartTasks.ai_pipeline.basic import update_task_queue_status
-
-const = VideoToTextConst()
-
-
-class ArticleSummaryTask(object):
-    """
-    文章总结任务
-    """
-
-    def __init__(self):
-        self.db_client = DatabaseConnector(db_config=long_articles_config)
-        self.db_client.connect()
-
-    def get_summary_task_list(self) -> list[dict]:
-        """
-        获取任务列表
-        """
-        fetch_query = f"""
-            select id, content_trace_id, video_text
-            from video_content_understanding
-            where summary_status = {const.INIT_STATUS} and understanding_status = {const.SUCCESS_STATUS}
-            limit {const.SUMMARY_BATCH_SIZE};
-        """
-        task_list = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
-        return task_list
-
-    def rollback_lock_tasks(self) -> int:
-        """
-        rollback tasks which have been locked for a long time
-        """
-        now_timestamp = int(time.time())
-        timestamp_threshold = now_timestamp - const.MAX_PROCESSING_TIME
-        update_sql = f"""
-            update video_content_understanding
-            set summary_status = %s
-            where summary_status = %s and summary_status_ts < %s;
-        """
-        rollback_rows = self.db_client.save(
-            query=update_sql,
-            params=(const.INIT_STATUS, const.PROCESSING_STATUS, timestamp_threshold),
-        )
-
-        return rollback_rows
-
-    def handle_task_execution(self, task):
-        """
-        :param task: keys: [id, video_text]
-        """
-        task_id = task["id"]
-        content_trace_id = task["content_trace_id"]
-        video_text = task["video_text"]
-
-        # Lock Task
-        affected_rows = update_task_queue_status(
-            db_client=self.db_client,
-            task_id=task_id,
-            process="summary",
-            ori_status=const.INIT_STATUS,
-            new_status=const.PROCESSING_STATUS,
-        )
-        if not affected_rows:
-            return
-
-        # fetch summary text from AI
-        try:
-            # generate prompt
-            prompt = generate_summary_prompt(video_text)
-            # get result from deep seek AI
-            result = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
-            if result:
-                # set as success and update summary text
-                self.set_summary_text_for_task(task_id, result.strip())
-                task_status = const.SUCCESS_STATUS
-            else:
-                # set as fail
-                update_task_queue_status(
-                    db_client=self.db_client,
-                    task_id=task_id,
-                    process="summary",
-                    ori_status=const.PROCESSING_STATUS,
-                    new_status=const.FAIL_STATUS,
-                )
-                task_status = const.FAIL_STATUS
-        except Exception as e:
-            # set as fail
-            update_task_queue_status(
-                db_client=self.db_client,
-                task_id=task_id,
-                process="summary",
-                ori_status=const.PROCESSING_STATUS,
-                new_status=const.FAIL_STATUS,
-            )
-            task_status = const.FAIL_STATUS
-            log(
-                task="article_summary_task",
-                function="fetch_deepseek_response",
-                message="fetch_deepseek_response failed",
-                data={"error": str(e), "trace_back": traceback.format_exc()},
-            )
-
-        # update video pool status
-        update_video_pool_status(
-            self.db_client, content_trace_id, const.PROCESSING_STATUS, task_status
-        )
-
-    def set_summary_text_for_task(self, task_id, text):
-        """
-        successfully get summary text and update summary text to database
-        """
-        update_sql = f"""
-            update video_content_understanding
-            set summary_status = %s, summary_text = %s, understanding_status_ts = %s
-            where id = %s and summary_status = %s;
-        """
-        affected_rows = self.db_client.save(
-            query=update_sql,
-            params=(
-                const.SUCCESS_STATUS,
-                text,
-                datetime.datetime.now(),
-                task_id,
-                const.PROCESSING_STATUS,
-            ),
-        )
-        return affected_rows
-
-    def deal(self):
-        """
-        entrance function for this class
-        """
-        # first of all rollback tasks which have been locked for a long time
-        rollback_rows = self.rollback_lock_tasks()
-        tqdm.write("rollback_lock_tasks: {}".format(rollback_rows))
-
-        # get task list
-        task_list = self.get_summary_task_list()
-        for task in tqdm(task_list, desc="handle each task"):
-            try:
-                self.handle_task_execution(task=task)
-            except Exception as e:
-                log(
-                    task="article_summary_task",
-                    function="deal",
-                    message="fetch_deepseek_response",
-                    data={"error": str(e), "trace_back": traceback.format_exc()},
-                )

+ 0 - 354
coldStartTasks/ai_pipeline/video_to_text.py

@@ -1,354 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-import os
-import time
-import datetime
-import traceback
-
-from pymysql.cursors import DictCursor
-from tqdm import tqdm
-
-from applications import log
-from applications.api import GoogleAIAPI
-from applications.const import VideoToTextConst
-from applications.db import DatabaseConnector
-from config import long_articles_config
-from config import apolloConfig
-from coldStartTasks.ai_pipeline.basic import download_file
-from coldStartTasks.ai_pipeline.basic import update_task_queue_status
-from coldStartTasks.ai_pipeline.basic import roll_back_lock_tasks
-
-# 办公室网络调试需要打开代理
-# os.environ["HTTP_PROXY"] = "http://192.168.100.20:1087"
-# os.environ["HTTPS_PROXY"] = "http://192.168.100.20:1087"
-
-const = VideoToTextConst()
-config = apolloConfig(env="prod")
-
-# pool_size
-POOL_SIZE = int(config.getConfigValue("video_extract_pool_size"))
-# batch_size
-BATCH_SIZE = int(config.getConfigValue("video_extract_batch_size"))
-
-
-class GenerateTextFromVideo(object):
-    """
-    从视频中生成文本
-    """
-
-    def __init__(self):
-        self.google_ai_api = GoogleAIAPI()
-        self.db = DatabaseConnector(db_config=long_articles_config)
-        self.db.connect()
-
-    def get_upload_task_list(self, task_length: int) -> list[dict]:
-        """
-        获取上传视频任务,优先处理高流量池视频内容
-        """
-        fetch_query = f"""
-            select t1.id, t1.video_oss_path
-            from video_content_understanding t1
-            join publish_single_video_source t2 on t1.content_trace_id = t2.content_trace_id
-            where t1.upload_status = {const.INIT_STATUS} 
-                and t2.video_pool_audit_status = {const.AUDIT_SUCCESS_STATUS} 
-                and t2.bad_status = {const.ARTICLE_GOOD_STATUS}
-            order by t2.flow_pool_level
-            limit {task_length};
-            """
-        task_list = self.db.fetch(query=fetch_query, cursor_type=DictCursor)
-        return task_list
-
-    def get_extract_task_list(self) -> list[dict]:
-        """
-        获取处理视频转文本任务
-        """
-        fetch_query = f"""
-            select id, file_name, video_ori_title
-            from video_content_understanding 
-            where upload_status = {const.SUCCESS_STATUS} and understanding_status = {const.INIT_STATUS} 
-            order by file_expire_time
-            limit {BATCH_SIZE};
-        """
-        task_list = self.db.fetch(query=fetch_query, cursor_type=DictCursor)
-        return task_list
-
-    def get_processing_task_num(self) -> int:
-        """
-        get the number of processing task
-        """
-        select_query = f"""
-            select count(1) as processing_count 
-            from video_content_understanding 
-            where file_state = 'PROCESSING' and upload_status = {const.SUCCESS_STATUS};
-        """
-        fetch_response = self.db.fetch(query=select_query, cursor_type=DictCursor)
-        processing_task_num = (
-            fetch_response[0]["processing_count"] if fetch_response else 0
-        )
-        return processing_task_num
-
-    def set_upload_result_for_task(
-        self, task_id: str, file_name: str, file_state: str, expire_time: str
-    ) -> int:
-        """
-        set upload result for task
-        """
-        update_query = f"""
-            update video_content_understanding
-            set upload_status = %s, upload_status_ts = %s, 
-                file_name = %s, file_state = %s, file_expire_time = %s
-            where id = %s and upload_status = %s;
-        """
-        affected_rows = self.db.save(
-            query=update_query,
-            params=(
-                const.SUCCESS_STATUS,
-                datetime.datetime.now(),
-                file_name,
-                file_state,
-                expire_time,
-                task_id,
-                const.PROCESSING_STATUS,
-            ),
-        )
-        return affected_rows
-
-    def set_understanding_result_for_task(
-        self, task_id: str, state: str, text: str
-    ) -> int:
-        update_query = f"""
-            update video_content_understanding
-            set understanding_status = %s, video_text = %s, file_state = %s
-            where id = %s and understanding_status = %s;
-        """
-        affected_rows = self.db.save(
-            query=update_query,
-            params=(
-                const.SUCCESS_STATUS,
-                text,
-                state,
-                task_id,
-                const.PROCESSING_STATUS,
-            ),
-        )
-        return affected_rows
-
-    def upload_video_to_google_ai_task(
-        self, max_processing_video_count: int = POOL_SIZE
-    ):
-        """
-        upload video to google AI and wait for processing
-        """
-        # rollback lock tasks
-        rollback_rows = roll_back_lock_tasks(
-            db_client=self.db,
-            process="upload",
-            init_status=const.INIT_STATUS,
-            processing_status=const.PROCESSING_STATUS,
-            max_process_time=const.MAX_PROCESSING_TIME,
-        )
-        tqdm.write("upload rollback_lock_tasks: {}".format(rollback_rows))
-
-        processing_task_num = self.get_processing_task_num()
-        rest_video_count = max_processing_video_count - processing_task_num
-        if rest_video_count:
-            task_list = self.get_upload_task_list(rest_video_count)
-            for task in tqdm(task_list, desc="upload_video_task"):
-                lock_rows = update_task_queue_status(
-                    db_client=self.db,
-                    task_id=task["id"],
-                    process="upload",
-                    ori_status=const.INIT_STATUS,
-                    new_status=const.PROCESSING_STATUS,
-                )
-                if not lock_rows:
-                    continue
-                try:
-                    file_path = download_file(task["id"], task["video_oss_path"])
-                    google_upload_result = self.google_ai_api.upload_file(file_path)
-                    if google_upload_result:
-                        file_name, file_state, expire_time = google_upload_result
-                        self.set_upload_result_for_task(
-                            task_id=task["id"],
-                            file_name=file_name,
-                            file_state=file_state,
-                            expire_time=expire_time,
-                        )
-                    else:
-                        # roll back status
-                        update_task_queue_status(
-                            db_client=self.db,
-                            task_id=task["id"],
-                            process="upload",
-                            ori_status=const.PROCESSING_STATUS,
-                            new_status=const.FAIL_STATUS,
-                        )
-                        log(
-                            task="video_to_text",
-                            function="upload_video_to_google_ai_task",
-                            message="upload_video_to_google_ai_task failed",
-                            data={
-                                "task_id": task["id"],
-                            },
-                        )
-                except Exception as e:
-                    log(
-                        task="video_to_text",
-                        function="upload_video_to_google_ai_task",
-                        message="upload_video_to_google_ai_task failed",
-                        data={
-                            "error": str(e),
-                            "traceback": traceback.format_exc(),
-                            "task_id": task["id"],
-                        },
-                    )
-                    # roll back status
-                    update_task_queue_status(
-                        db_client=self.db,
-                        task_id=task["id"],
-                        process="upload",
-                        ori_status=const.PROCESSING_STATUS,
-                        new_status=const.FAIL_STATUS,
-                    )
-        else:
-            log(
-                task="video_to_text",
-                function="upload_video_to_google_ai_task",
-                message="task pool is full",
-            )
-
-    def convert_video_to_text_with_google_ai_task(self):
-        """
-        处理视频转文本任务
-        """
-        rollback_rows = roll_back_lock_tasks(
-            db_client=self.db,
-            process="understanding",
-            init_status=const.INIT_STATUS,
-            processing_status=const.PROCESSING_STATUS,
-            max_process_time=const.MAX_PROCESSING_TIME,
-        )
-        tqdm.write("extract rollback_lock_tasks: {}".format(rollback_rows))
-
-        task_list = self.get_extract_task_list()
-        for task in tqdm(task_list, desc="convert video to text"):
-            # LOCK TASK
-            lock_row = update_task_queue_status(
-                db_client=self.db,
-                task_id=task["id"],
-                process="understanding",
-                ori_status=const.INIT_STATUS,
-                new_status=const.PROCESSING_STATUS,
-            )
-            if not lock_row:
-                print("Task has benn locked by other process")
-                continue
-            file_name = task["file_name"]
-            video_local_path = "static/{}.mp4".format(task["id"])
-            try:
-                google_file = self.google_ai_api.get_google_file(file_name)
-                state = google_file.state.name
-                match state:
-                    case "ACTIVE":
-                        try:
-                            video_text = self.google_ai_api.get_video_text(
-                                prompt="分析我上传的视频的画面和音频,用叙述故事的风格将视频所描述的事件进行总结,需要保证视频内容的完整性,并且用中文进行输出,直接返回生成的文本",
-                                video_file=google_file,
-                            )
-                            if video_text:
-                                self.set_understanding_result_for_task(
-                                    task_id=task["id"], state=state, text=video_text
-                                )
-
-                                # delete local file and google file
-                                if os.path.exists(video_local_path):
-                                    os.remove(video_local_path)
-
-                                tqdm.write(
-                                    "video transform to text success, delete local file"
-                                )
-                                task_list.remove(task)
-
-                                self.google_ai_api.delete_video(file_name)
-                                tqdm.write(
-                                    "delete video from google success: {}".format(
-                                        file_name
-                                    )
-                                )
-                            else:
-                                # roll back status and wait for next process
-                                update_task_queue_status(
-                                    db_client=self.db,
-                                    task_id=task["id"],
-                                    process="understanding",
-                                    ori_status=const.PROCESSING_STATUS,
-                                    new_status=const.INIT_STATUS,
-                                )
-
-                        except Exception as e:
-                            # roll back status
-                            update_task_queue_status(
-                                db_client=self.db,
-                                task_id=task["id"],
-                                process="understanding",
-                                ori_status=const.PROCESSING_STATUS,
-                                new_status=const.FAIL_STATUS,
-                            )
-                            tqdm.write(str(e))
-                            continue
-
-                    case "PROCESSING":
-                        update_task_queue_status(
-                            db_client=self.db,
-                            task_id=task["id"],
-                            process="understanding",
-                            ori_status=const.PROCESSING_STATUS,
-                            new_status=const.INIT_STATUS,
-                        )
-                        tqdm.write("video is still processing")
-
-                    case "FAILED":
-                        update_sql = f"""
-                            update video_content_understanding
-                            set file_state = %s, understanding_status = %s, understanding_status_ts = %s
-                            where id = %s and understanding_status = %s;
-                        """
-                        self.db.save(
-                            query=update_sql,
-                            params=(
-                                state,
-                                const.FAIL_STATUS,
-                                datetime.datetime.now(),
-                                task["id"],
-                                const.PROCESSING_STATUS,
-                            ),
-                        )
-                        # delete local file and google file
-                        if os.path.exists(video_local_path):
-                            os.remove(video_local_path)
-
-                        self.google_ai_api.delete_video(file_name)
-                        task_list.remove(task)
-                        tqdm.write("video process failed, delete local file")
-
-                time.sleep(const.SLEEP_SECONDS)
-            except Exception as e:
-                log(
-                    task="video_to_text",
-                    function="extract_video_to_text_task",
-                    message="extract video to text task failed",
-                    data={
-                        "error": str(e),
-                        "traceback": traceback.format_exc(),
-                        "task_id": task["id"],
-                    },
-                )
-                update_task_queue_status(
-                    db_client=self.db,
-                    task_id=task["id"],
-                    process="understanding",
-                    ori_status=const.PROCESSING_STATUS,
-                    new_status=const.FAIL_STATUS,
-                )

+ 0 - 9
run_article_summary.py

@@ -1,9 +0,0 @@
-"""
-@author: luojunhui
-"""
-
-from coldStartTasks.ai_pipeline.summary_text import ArticleSummaryTask
-
-if __name__ == "__main__":
-    task = ArticleSummaryTask()
-    task.deal()

+ 3 - 0
run_video_understanding_with_google.py

@@ -30,6 +30,9 @@ def main():
         )
         task.extract_best_frame_with_gemini_ai()
 
+    # 调用接口,使用 ffmpeg 获取视频的最佳帧作为封面
+    task.get_cover_with_best_frame()
+
 
 if __name__ == "__main__":
     main()

+ 0 - 26
sh/run_article_summary.sh

@@ -1,26 +0,0 @@
-#!/bin/bash
-
-# 获取当前日期,格式为 YYYY-MM-DD
-CURRENT_DATE=$(date +%F)
-
-# 日志文件路径,包含日期
-LOG_FILE="/root/luojunhui/logs/article_summary_log_$CURRENT_DATE.txt"
-
-# 重定向整个脚本的输出到带日期的日志文件
-exec >> "$LOG_FILE" 2>&1
-if pgrep -f "python3 run_article_summary.py" > /dev/null
-then
-    echo "$(date '+%Y-%m-%d %H:%M:%S') - run_article_summary.py is running"
-else
-    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart run_article_summary.py"
-    # 切换到指定目录
-    cd /root/luojunhui/LongArticlesJob
-
-    # 激活 Conda 环境
-    source /root/miniconda3/etc/profile.d/conda.sh
-    conda activate tasks
-
-    # 在后台运行 Python 脚本并重定向日志输出
-    nohup python3 run_article_summary.py >> "${LOG_FILE}" 2>&1 &
-    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted run_article_summary.py"
-fi