Procházet zdrojové kódy

Merge branch '2025-04-07-luojunhui-video-best-frame' of luojunhui/LongArticlesJob into master

luojunhui před 6 měsíci
rodič
revize
bafc4a1299

+ 2 - 1
applications/api/__init__.py

@@ -5,4 +5,5 @@ from .aigc_system_api import AigcSystemApi
 from .deep_seek_api_by_volcanoengine import fetch_deepseek_response
 from .moon_shot_api import fetch_moon_shot_response
 from .nlp_api import similarity_between_title_list
-from .gewe_api import WechatChannelAPI
+from .gewe_api import WechatChannelAPI
+from .google_ai_api import GoogleAIAPI

+ 90 - 0
applications/api/google_ai_api.py

@@ -0,0 +1,90 @@
+"""
+@author: luojunhui
+"""
+
+from google import genai
+from tqdm import tqdm
+
+
+class GoogleAIAPI(object):
+    """
+    Google 视频内容理解API
+    """
+    def __init__(self):
+        self.api_key = 'AIzaSyBAFFL6yHa4kUK-YcuAw8tbiSHQB6oJG34'
+        self.client = genai.Client(api_key=self.api_key)
+
+    def upload_file(self, file_path: str):
+        """
+        file_path: 文件路径
+        """
+        tqdm.write("start uploading file: {}".format(file_path))
+        try:
+            video_file = self.client.files.upload(file=file_path)
+            file_name = video_file.name
+            file_state = video_file.state.name
+            expire_time = video_file.expiration_time
+            tqdm.write("success uploaded file: {}".format(file_path))
+            return file_name, file_state, expire_time
+        except Exception as e:
+            tqdm.write("fail to upload file: {} because {}".format(file_path, e))
+            return None
+
+    def get_file_status(self, file_name: str,):
+        """
+        获取文件的状态
+        file_name: 文件名称
+        """
+        try:
+            video_file = self.client.files.get(name=file_name)
+            state = video_file.state.name
+            return state
+        except Exception as e:
+            print(e)
+            return None
+
+    def get_google_file(self, file_name: str):
+        """
+        获取文件
+        file_name: 文件名称
+        """
+        try:
+            video_file = self.client.files.get(name=file_name)
+            return video_file
+        except Exception as e:
+            print(e)
+            return None
+
+    def fetch_info_from_google_ai(self, prompt, video_file):
+        """
+        获取视频文本
+        prompt: 提示词
+        video_file: <class 'google.genai.types.File'>
+        """
+        response = self.client.models.generate_content(
+            model='gemini-2.0-flash',
+            contents=[
+                video_file,
+                prompt
+            ]
+        )
+        return response.text
+
+    def delete_video(self, file_name: str):
+        """
+        删除视频
+        """
+        self.client.files.delete(name=file_name)
+
+    def get_file_list(self):
+        """
+        获取文件列表
+        """
+        file_list = self.client.files.list()
+        return file_list
+
+
+
+
+
+

+ 36 - 7
applications/const/__init__.py

@@ -8,6 +8,7 @@ class ColdStartTaskConst:
     """
     冷启动任务常量配置
     """
+
     PUBLISHED_STATUS = 2  # 文章已发布状态
     INIT_STATUS = 1  # 文章初始状态
     BAD_STATUS = 0  # 低质量文章状态
@@ -55,6 +56,7 @@ class updatePublishedMsgTaskConst:
     """
     更新已发布文章消息常量配置
     """
+
     # 爬虫详情接口返回code
     ARTICLE_ILLEGAL_CODE = 25012
     ARTICLE_DELETE_CODE = 25005
@@ -92,6 +94,7 @@ class UpdateAccountReadRateTaskConst:
     """
     更新账号阅读率常量配置
     """
+
     # 阅读率统计周期(秒)
     STATISTICS_PERIOD = 31 * 24 * 60 * 60
     # 一天的秒数
@@ -115,15 +118,16 @@ class UpdateAccountReadAvgTaskConst:
     """
     更新账号阅读均值常量配置
     """
+
     # 投流账号
     TOULIU_ACCOUNTS = {
-        'gh_93e00e187787',
-        'gh_ac43e43b253b',
-        'gh_68e7fdc09fe4',
-        'gh_77f36c109fb1',
-        'gh_b181786a6c8c',
-        'gh_1ee2e1b39ccf',
-        'gh_d3f039c9db2b'
+        "gh_93e00e187787",
+        "gh_ac43e43b253b",
+        "gh_68e7fdc09fe4",
+        "gh_77f36c109fb1",
+        "gh_b181786a6c8c",
+        "gh_1ee2e1b39ccf",
+        "gh_d3f039c9db2b",
     }
 
     # 发文模式
@@ -148,6 +152,7 @@ class WeixinVideoCrawlerConst:
     """
     微信视频抓取常量配置
     """
+
     # 账号抓取状态
     ACCOUNT_CRAWL_STATUS = 1
     ACCOUNT_DO_NOT_CRAWL_STATUS = 0
@@ -225,6 +230,7 @@ class AccountAssociationTaskConst:
     """
     账号联想任务常量配置
     """
+
     # 获取种子标题的统计周期
     STAT_PERIOD = 7 * 24 * 60 * 60
 
@@ -243,6 +249,7 @@ class ArticleCollectorConst:
     """
     文章采集任务常量配置
     """
+
     # 发送方式
 
     # 手动推送
@@ -273,6 +280,7 @@ class BaiduVideoCrawlerConst:
     """
     const for baidu video crawler
     """
+
     # account status
     BAIDU_ACCOUNT_GOOD_STATUS = 1
     BAIDU_ACCOUNT_BAD_STATUS = 0
@@ -294,6 +302,7 @@ class TitleRewriteTaskConst:
     """
     title rewrite task const
     """
+
     # title rewrite status
     TITLE_REWRITE_INIT_STATUS = 0
     TITLE_REWRITE_SUCCESS_STATUS = 1
@@ -318,6 +327,7 @@ class ChannelVideoCrawlerConst:
     """
     const for baidu video crawler
     """
+
     # account status
     CHANNEL_ACCOUNT_GOOD_STATUS = 1
     CHANNEL_ACCOUNT_BAD_STATUS = 0
@@ -345,6 +355,7 @@ class ToutiaoVideoCrawlerConst:
     """
     const for toutiao video crawler
     """
+
     # platform
     PLATFORM = "toutiao"
 
@@ -372,12 +383,30 @@ class SingleVideoPoolPublishTaskConst:
     """
     const for single video pool publish task
     """
+
     TRANSFORM_INIT_STATUS = 0
     TRANSFORM_SUCCESS_STATUS = 1
     TRANSFORM_FAIL_STATUS = 99
 
 
+class GoogleVideoUnderstandTaskConst:
+    # task batch size
+    BATCH_SIZE = 100
 
+    # task status
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAIL_STATUS = 99
 
+    # sleep seconds
+    SLEEP_SECONDS = 60
 
+    # max processing time
+    MAX_PROCESSING_TIME = 3600
 
+    # task info
+    TABLE_NAME = "long_articles_new_video_cover"
+    TASK_NAME = "extract_video_best_frame_as_cover"
+    DIR_NAME = "static"
+    POOL_SIZE = 15

+ 5 - 0
coldStartTasks/ai_pipeline/__init__.py

@@ -0,0 +1,5 @@
+"""
+@author: luojunhui
+"""
+
+from .extract_video_best_frame import ExtractVideoBestFrame

+ 217 - 0
coldStartTasks/ai_pipeline/basic.py

@@ -0,0 +1,217 @@
+import os
+import re
+import json
+import time
+import datetime
+import requests
+from typing import Optional, Dict
+
+from requests.exceptions import RequestException
+from tenacity import retry
+
+from applications.db import DatabaseConnector
+from applications.utils import request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
+
+def get_status_field_by_task(task: str) -> tuple[str, str]:
+    match task:
+        case "upload":
+            status = "upload_status"
+            update_timestamp = "upload_status_ts"
+        case "extract":
+            status = "extract_status"
+            update_timestamp = "extract_status_ts"
+        case "get_cover":
+            status = "get_cover_status"
+            update_timestamp = "get_cover_status_ts"
+        case _:
+            raise ValueError(f"Unexpected task: {task}")
+    return status, update_timestamp
+
+
+def roll_back_lock_tasks(
+    db_client: DatabaseConnector,
+    task: str,
+    max_process_time: int,
+    init_status: int,
+    processing_status: int,
+) -> int:
+    """
+    rollback tasks which have been locked for a long time
+    """
+    status, update_timestamp = get_status_field_by_task(task)
+    now_timestamp = int(time.time())
+    timestamp_threshold = datetime.datetime.fromtimestamp(
+        now_timestamp - max_process_time
+    )
+    update_query = f"""
+        update long_articles_new_video_cover
+        set {status} = %s
+        where {status} = %s and {update_timestamp} < %s;
+    """
+    rollback_rows = db_client.save(
+        query=update_query, params=(init_status, processing_status, timestamp_threshold)
+    )
+    return rollback_rows
+
+
+def download_file(task_id, oss_path):
+    """
+    下载视频文件
+    """
+    video_url = "https://rescdn.yishihui.com/" + oss_path
+    file_name = "static/{}.mp4".format(task_id)
+    if os.path.exists(file_name):
+        return file_name
+
+    proxies = {"http": None, "https": None}
+    with open(file_name, "wb") as f:
+        response = requests.get(video_url, proxies=proxies)
+        f.write(response.content)
+    return file_name
+
+
+def update_task_queue_status(
+    db_client: DatabaseConnector,
+    task_id: int,
+    task: str,
+    ori_status: int,
+    new_status: int,
+) -> int:
+    # update task queue status
+    status, update_timestamp = get_status_field_by_task(task)
+    update_query = f"""
+        update long_articles_new_video_cover
+        set {status} = %s, {update_timestamp} = %s
+        where {status} = %s and id = %s;
+    """
+    update_rows = db_client.save(
+        query=update_query,
+        params=(
+            new_status,
+            datetime.datetime.now(),
+            ori_status,
+            task_id,
+        ),
+    )
+    return update_rows
+
+
+def extract_best_frame_prompt():
+    extract_prompt = """
+    以下为视频爆款封面的评分体系,请根据视频内容,逐帧分析视频画面,根据以下标准进行打分,最终先把分数最高的画面出现的时间输出给我,并精确到几时几分几秒几毫秒,格式:hh:mm:ss.xxx,要求是确切时间而不是时间段。
+    评分维度:
+    一、现实关切度 (满分15分):
+    高度相关(13-15分): 封面主题直接涉及老年人的生活、健康、财产等切身利益,例如养老、退休、健康、食品安全、子女教育、财产安全等。
+    中度相关(8-12分): 封面主题与老年人的生活有一定的关联,例如家长里短、邻里关系、社会热点、生活窍门等。
+    低度相关(4-7分): 封面主题与老年人的生活关系较弱,例如娱乐八卦、时尚潮流等。
+    无关(0-3分): 封面主题与老年人的生活基本无关。
+    二、社会认同感与亲切感 (满分15分):
+    高度认同(13-15分): 封面人物形象亲切、接地气,场景贴近老年人的生活,有强烈的代入感和归属感。
+    中度认同(8-12分): 封面人物形象尚可接受,场景有一定的熟悉感。
+    低度认同(4-7分): 封面人物形象陌生,场景较为遥远。
+    无认同感(0-3分): 封面人物形象令人反感,场景与老年人生活无关。
+    三、信息传达效率 (满分15分):
+    高效传达(13-15分): 封面文字简洁直白、字体较大、重点突出,视觉元素聚焦,能让老年人快速理解视频内容,色彩明快,对比强烈,视觉冲击力强。
+    中效传达(8-12分): 封面文字尚可,视觉元素有一定吸引力,但略显复杂。
+    低效传达(4-7分): 封面文字晦涩难懂,视觉元素杂乱无章。色彩暗淡,对比度弱。
+    无效传达(0-3分): 封面信息表达不清,无法理解视频内容。
+    四、情感共鸣与回忆杀 (满分20分):
+    高度共鸣(17-20分): 封面主题能够引发老年人对过去岁月的回忆,勾起他们对老朋友、老时光的思念,产生强烈的情感共鸣,引发对晚年生活的思考。例如同学情、怀旧主题等。
+    中度共鸣(11-16分): 封面主题有一定怀旧元素,能引发部分老年人的回忆,并进行一定程度的思考。
+    低度共鸣(6-10分): 封面主题怀旧元素较少,共鸣感不强。
+    无共鸣(0-5分): 封面主题与回忆无关。
+    五、正能量与精神寄托 (满分15分):
+    高度正能量(13-15分): 封面内容积极向上,能够给予老年人希望和力量,或者包含宗教、祈福等元素,满足他们的精神寄托。
+    中度正能量(8-12分): 封面内容有一定的积极意义,但不够突出。
+    低度正能量(4-7分): 封面内容较为平淡,缺乏精神寄托。
+    负能量 (0-3分): 封面内容消极悲观,或者与老年人的信仰不符。
+    六、节日/时事关联性 (满分10分):
+    高度关联(8-10分): 封面与节日、时事热点紧密相关,能激发老年人的分享欲和参与感。
+    中度关联(5-7分): 封面与节日或时事有一定关联,但并非核心。
+    低度关联(2-4分): 封面与节日或时事关联较弱。
+    无关联(0-1分): 封面与节日、时事无关。
+    七、传播动机 (满分10分):
+    强传播动机(8-10分): 封面内容能激发老年人强烈的情感,例如激动、感动、惊讶等,让他们想要分享给家人朋友,或者认为视频内容对他人有帮助,有分享的价值。
+    中等传播动机(5-7分): 封面内容有一定分享价值,但分享意愿不强烈。
+    低传播动机(2-4分): 封面内容平淡,缺乏分享动力。
+    无传播动机(0-1分): 封面内容无分享价值,或者会引起不适,降低传播意愿。
+    八、附加分(满分60分)
+    1.包含老年人为画面主体(0-5分)
+    2.有超过3人为画面主体(0-5分)
+    3.充斥画面的密集人群为画面主体(0-5分)
+    4.存在知名历史、近代人物(0-5分)
+    5.存在人物脸部、头部未完整出现在画面的情况(0-5分)
+    6.是不以人为主体的鲜花、美景、知名建筑或风景(0-5分)
+    7.是老照片、怀旧风格(0-5分)
+    8.是农村、军事、综艺演出、历史画面(0-5分)
+    9.有趣味、惊奇的形象或画面为主体(0-5分)
+    10.以大号文字或密集文字为主体并且不包含人物(0-5分)
+    11.是不包含人物的纯色画面(0-5分)
+    12.是模糊的或清晰度、像素较低的(0-5分)
+    
+    总分评估:110-160分: 高度吸引老年人群体,有成为爆款的潜力。
+    80-109分: 具有一定吸引力,值得尝试。
+    65-79分: 吸引力一般,需要优化。
+    65分以下: 吸引力不足,不建议使用。
+    
+    注意:输出只需要返回 'hh:mm:ss.xxx' 格式的时间, 无需返回任何其他东西
+    """
+    return extract_prompt
+
+
+@retry(**retry_desc)
+def get_video_cover(video_oss_path: str, time_millisecond_str: str) -> Optional[Dict]:
+    """
+    input video oss path and time millisecond
+    output video cover image oss path
+    """
+    video_url = "https://rescdn.yishihui.com/" + video_oss_path
+    url = "http://192.168.205.80:8080/ffmpeg/fetchKeyFrames"
+    data = {"url": video_url, "timestamp": time_millisecond_str}
+    headers = {
+        "content-type": "application/json",
+    }
+    try:
+        response = requests.post(url, headers=headers, json=data, timeout=60)
+        response.raise_for_status()
+        return response.json()
+    except RequestException as e:
+        print(f"API请求失败: {e}")
+    except json.JSONDecodeError as e:
+        print(f"响应解析失败: {e}")
+    return None
+
+
+def normalize_time_str(time_string: str) -> str | None:
+    # 预处理:替换中文冒号、去除特殊标记、清理空格
+    time_str = re.sub(r":", ":", time_string)  # 中文冒号转英文
+    time_str = re.sub(r"\s*(\d+\.\s*)?\*+\s*", "", time_str)  # 去除数字编号和**标记
+    time_str = time_str.strip()
+
+    # 组合式正则匹配(按优先级排序)
+    patterns = [
+        # hh:mm:ss.xxx
+        (r"^(\d{2}):(\d{2}):(\d{2})\.(\d{3})$", None),
+        # h:mm:ss.xxx
+        (
+            r"^(\d{1}):(\d{2}):(\d{2})\.(\d{3})$",
+            lambda m: f"0{m[1]}:{m[2]}:{m[3]}.{m[4]}",
+        ),
+        # mm:ss.xxx
+        (r"^(\d{2}):(\d{2})\.(\d{3})$", lambda m: f"00:{m[1]}:{m[2]}.{m[3]}"),
+        # m:ss.xxx
+        (r"^(\d{1}):(\d{2})\.(\d{3})$", lambda m: f"00:0{m[1]}:{m[2]}.{m[3]}"),
+    ]
+
+    for pattern, processor in patterns:
+        if match := re.fullmatch(pattern, time_str):
+            return processor(match) if processor else time_str
+
+    # 特殊处理 dd:dd:ddd 格式(假设最后3位为毫秒)
+    if m := re.fullmatch(r"(\d{2}:\d{2}):(\d{3})", time_str):
+        return f"00:{m[1]}.{m[2]}"
+
+    return None

+ 459 - 0
coldStartTasks/ai_pipeline/extract_video_best_frame.py

@@ -0,0 +1,459 @@
+"""
+@author luojunhui
+@desc find best frame from each video
+"""
+
+import os
+import datetime
+import traceback
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from applications import log
+from applications.api import GoogleAIAPI
+from applications.const import GoogleVideoUnderstandTaskConst
+from applications.db import DatabaseConnector
+from config import long_articles_config
+from coldStartTasks.ai_pipeline.basic import download_file
+from coldStartTasks.ai_pipeline.basic import update_task_queue_status
+from coldStartTasks.ai_pipeline.basic import roll_back_lock_tasks
+from coldStartTasks.ai_pipeline.basic import extract_best_frame_prompt
+from coldStartTasks.ai_pipeline.basic import get_video_cover
+from coldStartTasks.ai_pipeline.basic import normalize_time_str
+
+const = GoogleVideoUnderstandTaskConst()
+google_ai = GoogleAIAPI()
+
+
+class ExtractVideoBestFrame:
+    """
+    extract video best frame from each video by GeminiAI
+    """
+
+    def __init__(self):
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
+
+    def _roll_back_lock_tasks(self, task: str) -> int:
+        return roll_back_lock_tasks(
+            db_client=self.db_client,
+            task=task,
+            init_status=const.INIT_STATUS,
+            processing_status=const.PROCESSING_STATUS,
+            max_process_time=const.MAX_PROCESSING_TIME,
+        )
+
+    def _lock_task(self, task_id: int, task_name) -> int:
+        return update_task_queue_status(
+            db_client=self.db_client,
+            task_id=task_id,
+            task=task_name,
+            ori_status=const.INIT_STATUS,
+            new_status=const.PROCESSING_STATUS,
+        )
+
+    def get_upload_task_list(self, task_num: int = const.POOL_SIZE) -> list[dict]:
+        """
+        get upload task list
+        """
+        fetch_query = f"""
+            select id, video_oss_path from {const.TABLE_NAME} 
+            where upload_status = {const.INIT_STATUS}
+            order by priority desc
+            limit {task_num};
+        """
+        upload_task_list = self.db_client.fetch(
+            query=fetch_query, cursor_type=DictCursor
+        )
+        return upload_task_list
+
+    def get_extract_task_list(self, task_num: int = const.POOL_SIZE) -> list[dict]:
+        """
+        get extract task list
+        """
+        fetch_query = f"""
+            select id, file_name from {const.TABLE_NAME} 
+            where upload_status = {const.SUCCESS_STATUS} and extract_status = {const.INIT_STATUS}
+            order by file_expire_time
+            limit {task_num};
+        """
+        extract_task_list = self.db_client.fetch(
+            query=fetch_query, cursor_type=DictCursor
+        )
+        return extract_task_list
+
+    def get_cover_task_list(self) -> list[dict]:
+        """
+        get cover task list
+        """
+        fetch_query = f"""
+            select id, video_oss_path, best_frame_time_ms from {const.TABLE_NAME}
+            where extract_status = {const.SUCCESS_STATUS} and get_cover_status = {const.INIT_STATUS};
+            """
+        extract_task_list = self.db_client.fetch(
+            query=fetch_query, cursor_type=DictCursor
+        )
+        return extract_task_list
+
+    def get_processing_task_pool_size(self) -> int:
+        """
+        get processing task pool size
+        """
+        fetch_query = f"""
+            select count(1) as pool_size from {const.TABLE_NAME}
+            where upload_status = {const.SUCCESS_STATUS} and file_state = 'PROCESSING' and extract_status = {const.INIT_STATUS};
+        """
+        fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
+        processing_task_pool_size = (
+            fetch_response[0]["pool_size"] if fetch_response else 0
+        )
+        return processing_task_pool_size
+
+    def set_upload_result(
+        self, task_id: int, file_name: str, file_state: str, file_expire_time: str
+    ) -> int:
+        update_query = f"""
+            update {const.TABLE_NAME} 
+            set upload_status = %s, upload_status_ts = %s,
+                file_name = %s, file_state = %s, file_expire_time = %s
+            where id = %s and upload_status = %s;
+        """
+        update_rows = self.db_client.save(
+            query=update_query,
+            params=(
+                const.SUCCESS_STATUS,
+                datetime.datetime.now(),
+                file_name,
+                file_state,
+                file_expire_time,
+                task_id,
+                const.PROCESSING_STATUS,
+            ),
+        )
+        return update_rows
+
+    def set_extract_result(
+        self, task_id: int, file_state: str, best_frame_time_ms: str
+    ) -> int:
+        update_query = f"""
+            update {const.TABLE_NAME} 
+            set extract_status = %s, extract_status_ts = %s,
+                file_state = %s, best_frame_time_ms = %s
+            where id = %s and extract_status = %s;
+        """
+        update_rows = self.db_client.save(
+            query=update_query,
+            params=(
+                const.SUCCESS_STATUS,
+                datetime.datetime.now(),
+                file_state,
+                best_frame_time_ms,
+                task_id,
+                const.PROCESSING_STATUS,
+            ),
+        )
+        return update_rows
+
+    def set_cover_result(self, task_id: int, cover_oss_path: str) -> int:
+        update_query = f"""
+            update {const.TABLE_NAME}
+            set cover_oss_path = %s, get_cover_status = %s, get_cover_status_ts = %s
+            where id = %s and get_cover_status = %s;
+        """
+        update_rows = self.db_client.save(
+            query=update_query,
+            params=(
+                cover_oss_path,
+                const.SUCCESS_STATUS,
+                datetime.datetime.now(),
+                task_id,
+                const.PROCESSING_STATUS,
+            ),
+        )
+        return update_rows
+
+    def upload_each_video(self, task: dict) -> None:
+        lock_status = self._lock_task(task_id=task["id"], task_name="upload")
+        if not lock_status:
+            return None
+
+        try:
+            file_path = download_file(task["id"], task["video_oss_path"])
+            upload_response = google_ai.upload_file(file_path)
+            if upload_response:
+                file_name, file_state, expire_time = upload_response
+                self.set_upload_result(
+                    task_id=task["id"],
+                    file_name=file_name,
+                    file_state=file_state,
+                    file_expire_time=expire_time,
+                )
+                return None
+            else:
+                # set status as fail
+                update_task_queue_status(
+                    db_client=self.db_client,
+                    task_id=task["id"],
+                    task="upload",
+                    ori_status=const.PROCESSING_STATUS,
+                    new_status=const.FAIL_STATUS,
+                )
+                return None
+        except Exception as e:
+            log(
+                task=const.TASK_NAME,
+                function="upload_video_to_gemini_ai",
+                message="task_failed",
+                data={
+                    "task_id": task["id"],
+                    "track_back": traceback.format_exc(),
+                    "error": str(e),
+                },
+            )
+            update_task_queue_status(
+                db_client=self.db_client,
+                task_id=task["id"],
+                task="upload",
+                ori_status=const.PROCESSING_STATUS,
+                new_status=const.FAIL_STATUS,
+            )
+            return None
+
+    def upload_video_to_gemini_ai(
+        self, max_processing_pool_size: int = const.POOL_SIZE
+    ) -> None:
+        # upload video to gemini ai
+        roll_back_lock_tasks_count = self._roll_back_lock_tasks(task="upload")
+        log(
+            task=const.TASK_NAME,
+            function="upload_video_to_gemini_ai",
+            message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}",
+        )
+        processing_task_num = self.get_processing_task_pool_size()
+        res_task_num = max_processing_pool_size - processing_task_num
+        if res_task_num:
+            upload_task_list = self.get_upload_task_list(task_num=res_task_num)
+            for task in tqdm(upload_task_list, desc="upload_video_to_gemini_ai"):
+                self.upload_each_video(task=task)
+
+        else:
+            log(
+                task=const.TASK_NAME,
+                function="upload_video_to_gemini_ai",
+                message="reach pool size, no more space for task to upload",
+            )
+
+    def extract_each_video(self, task: dict) -> None:
+        # lock task
+        lock_status = self._lock_task(task_id=task["id"], task_name="extract")
+        if not lock_status:
+            return None
+
+        file_name = task["file_name"]
+        video_local_path = os.path.join(const.DIR_NAME, "{}.mp4".format(task["id"]))
+        try:
+            google_file = google_ai.get_google_file(file_name)
+            state = google_file.state.name
+            match state:
+                case "PROCESSING":
+                    # google is still processing this video
+                    update_task_queue_status(
+                        db_client=self.db_client,
+                        task_id=task["id"],
+                        task="extract",
+                        ori_status=const.PROCESSING_STATUS,
+                        new_status=const.INIT_STATUS,
+                    )
+                    log(
+                        task=const.TASK_NAME,
+                        function="extract_best_frame_with_gemini_ai",
+                        message="google is still processing this video",
+                        data={
+                            "task_id": task["id"],
+                            "file_name": file_name,
+                            "state": state,
+                        },
+                    )
+
+                case "FAILED":
+                    # google process this video failed
+                    update_query = f"""
+                        update {const.TABLE_NAME}
+                        set file_state = %s, extract_status = %s, extract_status_ts = %s
+                        where id = %s and extract_status = %s;
+                    """
+                    self.db_client.save(
+                        query=update_query,
+                        params=(
+                            "FAILED",
+                            const.FAIL_STATUS,
+                            datetime.datetime.now(),
+                            task["id"],
+                            const.PROCESSING_STATUS,
+                        ),
+                    )
+                    log(
+                        task=const.TASK_NAME,
+                        function="extract_best_frame_with_gemini_ai",
+                        message="google process this video failed",
+                        data={
+                            "task_id": task["id"],
+                            "file_name": file_name,
+                            "state": state,
+                        },
+                    )
+
+                case "ACTIVE":
+                    # video process successfully
+                    try:
+                        best_frame_time_ms = google_ai.fetch_info_from_google_ai(
+                            prompt=extract_best_frame_prompt(),
+                            video_file=google_file,
+                        )
+                        if best_frame_time_ms:
+                            self.set_extract_result(
+                                task_id=task["id"],
+                                file_state="ACTIVE",
+                                best_frame_time_ms=best_frame_time_ms.strip(),
+                            )
+                        else:
+                            update_task_queue_status(
+                                db_client=self.db_client,
+                                task_id=task["id"],
+                                task="extract",
+                                ori_status=const.PROCESSING_STATUS,
+                                new_status=const.FAIL_STATUS,
+                            )
+                        # delete local file and google file
+                        if os.path.exists(video_local_path):
+                            os.remove(video_local_path)
+
+                        google_ai.delete_video(file_name)
+                        log(
+                            task=const.TASK_NAME,
+                            function="extract_best_frame_with_gemini_ai",
+                            message="video process successfully",
+                            data={
+                                "task_id": task["id"],
+                                "file_name": file_name,
+                                "state": state,
+                                "best_frame_time_ms": best_frame_time_ms,
+                            },
+                        )
+                    except Exception as e:
+                        log(
+                            task=const.TASK_NAME,
+                            function="extract_best_frame_with_gemini_ai",
+                            message="task_failed_inside_cycle",
+                            data={
+                                "task_id": task["id"],
+                                "track_back": traceback.format_exc(),
+                                "error": str(e),
+                            },
+                        )
+                        update_task_queue_status(
+                            db_client=self.db_client,
+                            task_id=task["id"],
+                            task="extract",
+                            ori_status=const.PROCESSING_STATUS,
+                            new_status=const.FAIL_STATUS,
+                        )
+
+        except Exception as e:
+            log(
+                task=const.TASK_NAME,
+                function="extract_best_frame_with_gemini_ai",
+                message="task_failed_outside_cycle",
+                data={
+                    "task_id": task["id"],
+                    "track_back": traceback.format_exc(),
+                    "error": str(e),
+                },
+            )
+            update_task_queue_status(
+                db_client=self.db_client,
+                task_id=task["id"],
+                task="extract",
+                ori_status=const.PROCESSING_STATUS,
+                new_status=const.FAIL_STATUS,
+            )
+
+    def extract_best_frame_with_gemini_ai(self):
+        # roll back lock tasks
+        roll_back_lock_tasks_count = self._roll_back_lock_tasks(task="extract")
+        log(
+            task=const.TASK_NAME,
+            function="extract_best_frame_with_gemini_ai",
+            message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}",
+        )
+        # do extract frame task
+        task_list = self.get_extract_task_list()
+        for task in tqdm(task_list, desc="extract_best_frame_with_gemini_ai"):
+            self.extract_each_video(task=task)
+
+    def get_each_cover(self, task: dict) -> None:
+        lock_status = self._lock_task(task_id=task["id"], task_name="get_cover")
+        if not lock_status:
+            return None
+
+        time_str = normalize_time_str(task["best_frame_time_ms"])
+        if time_str:
+            response = get_video_cover(
+                video_oss_path=task["video_oss_path"], time_millisecond_str=time_str
+            )
+            log(
+                task=const.TASK_NAME,
+                function="extract_cover_with_ffmpeg",
+                message="get_video_cover_with_ffmpeg",
+                data={
+                    "task_id": task["id"],
+                    "video_oss_path": task["video_oss_path"],
+                    "time_millisecond_str": time_str,
+                    "response": response,
+                },
+            )
+            if response["success"] and response["data"]:
+                cover_oss_path = response["data"]
+                self.set_cover_result(task_id=task["id"], cover_oss_path=cover_oss_path)
+            else:
+                update_task_queue_status(
+                    db_client=self.db_client,
+                    task_id=task["id"],
+                    task="get_cover",
+                    ori_status=const.PROCESSING_STATUS,
+                    new_status=const.FAIL_STATUS,
+                )
+        else:
+            log(
+                task=const.TASK_NAME,
+                function="extract_cover_with_ffmpeg",
+                message="time_str format is not correct",
+                data={
+                    "task_id": task["id"],
+                    "video_oss_path": task["video_oss_path"],
+                    "time_millisecond_str": time_str,
+                },
+            )
+            update_task_queue_status(
+                db_client=self.db_client,
+                task_id=task["id"],
+                task="get_cover",
+                ori_status=const.PROCESSING_STATUS,
+                new_status=const.FAIL_STATUS,
+            )
+
+    def get_cover_with_best_frame(self):
+        """
+        get cover with best frame
+        """
+        # roll back lock tasks
+        roll_back_lock_tasks_count = self._roll_back_lock_tasks(task="get_cover")
+        log(
+            task=const.TASK_NAME,
+            function="extract_cover_with_ffmpeg",
+            message=f"roll_back_lock_tasks_count: {roll_back_lock_tasks_count}",
+        )
+        # get task list
+        task_list = self.get_cover_task_list()
+        for task in tqdm(task_list, desc="extract_cover_with_ffmpeg"):
+            self.get_each_cover(task=task)

+ 57 - 0
run_video_understanding_with_google.py

@@ -0,0 +1,57 @@
+"""
+@author: luojunhui
+"""
+
+import datetime
+import multiprocessing
+
+from applications import log
+from coldStartTasks.ai_pipeline import ExtractVideoBestFrame
+
+PROCESS_EXIT_TIMEOUT = 10 * 60
+
+
+def start_task():
+    task = ExtractVideoBestFrame()
+
+    # 查询有多少任务正在处理中
+    processing_tasks = task.get_processing_task_pool_size()
+
+    if processing_tasks:
+        print(
+            f"{datetime.datetime.now()} 当前有 {processing_tasks} 个任务正在等待 google 处理..."
+        )
+        task.extract_best_frame_with_gemini_ai()
+    else:
+        print(f"{datetime.datetime.now()} 没有任务正在处理中...")
+        # upload video to google ai
+        task.upload_video_to_gemini_ai()
+        log(
+            task="video_understanding_with_google",
+            function="main",
+            message="upload_video_to_google_ai_task",
+        )
+        task.extract_best_frame_with_gemini_ai()
+
+    # 调用接口,使用 ffmpeg 获取视频的最佳帧作为封面
+    task.get_cover_with_best_frame()
+
+
+def main():
+    # create sub process
+    process = multiprocessing.Process(target=start_task)
+    process.start()
+
+    # wait for sub process to finish
+    process.join(PROCESS_EXIT_TIMEOUT)
+
+    if process.is_alive():
+        print(
+            f"Process {process.pid} did not finish within {PROCESS_EXIT_TIMEOUT} seconds. Terminating..."
+        )
+        process.terminate()
+        process.join()
+
+
+if __name__ == "__main__":
+    main()

+ 27 - 0
sh/run_upload_video_to_google.sh

@@ -0,0 +1,27 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/video_upload_to_google_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+
+if pgrep -f "python3 run_video_understanding_with_google.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - run_video_understanding_with_google.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart run_video_understanding_with_google.py"
+    # 切换到指定目录
+    cd /root/luojunhui/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 run_video_understanding_with_google.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted run_video_understanding_with_google.py"
+fi