Kaynağa Gözat

增加视频分析代码

zhangyong 6 ay önce
ebeveyn
işleme
77691ff5a1

+ 33 - 0
analyze_video.py

@@ -0,0 +1,33 @@
+from fastapi import FastAPI, HTTPException
+from pydantic import BaseModel
+import uvicorn
+from google_ai.generativeai_video import main
+
+app = FastAPI()
+
+class VideoRequest(BaseModel):
+    video_path: str
+
+@app.post("/process_video/")
+async def process_video(request: VideoRequest):
+    """处理视频请求"""
+    video_path = request.video_path
+
+    # 调用 main 函数进行视频处理
+    try:
+        print("来一个请求")
+        # 确保 main 函数是异步的
+        result = await main(video_path)
+        return {
+            "code": 0,
+            "message": "视频处理成功",
+            "result": result
+        }
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"视频处理失败: {str(e)}")
+
+if __name__ == "__main__":
+    # proxy_url = 'http://127.0.0.1:1081'
+    # os.environ["http_proxy"] = proxy_url
+    # os.environ["https_proxy"] = proxy_url
+    uvicorn.run(app, host="0.0.0.0", port=8080)

+ 0 - 0
common/__init__.py


+ 68 - 0
common/aliyun_log.py

@@ -0,0 +1,68 @@
+# -*- coding: utf-8 -*-
+"""
+公共方法,包含:生成log / 删除log
+"""
+import json
+from datetime import date, timedelta
+from datetime import datetime
+from typing import Optional
+
+from aliyun.log import PutLogsRequest, LogClient, LogItem
+
+
+proxies = {"http": None, "https": None}
+
+
+class AliyunLogger:
+    # 统一获取当前时间 <class 'datetime.datetime'>  2022-04-14 20:13:51.244472
+    now = datetime.now()
+    # 昨天 <class 'str'>  2022-04-13
+    yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d")
+    # 今天 <class 'datetime.date'>  2022-04-14
+    today = date.today()
+    # 明天 <class 'str'>  2022-04-15
+    tomorrow = (date.today() + timedelta(days=1)).strftime("%Y-%m-%d")
+
+    # 写入阿里云日志
+    @staticmethod
+    def logging(video_id: str,
+            title: str,
+            video_url: str,
+            data: Optional[str] = None):
+        """
+        写入阿里云日志
+        测试库: https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev
+        正式库: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod
+        """
+        accessKeyId = "LTAIWYUujJAm7CbH"
+        accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+
+        project = "crawler-log-prod"
+        logstore = "video_tag_info"
+        endpoint = "cn-hangzhou.log.aliyuncs.com"
+        try:
+            contents = [
+                ("video_id", video_id),
+                ("video_title", title),
+                ("video_url", video_url),
+                ("data", json.dumps(data, ensure_ascii=False) if data else ""),
+            ]
+            # 创建 LogClient 实例
+            client = LogClient(endpoint, accessKeyId, accessKey)
+            log_group = []
+            log_item = LogItem()
+            log_item.set_contents(contents)
+            log_group.append(log_item)
+            # 写入日志
+            request = PutLogsRequest(
+                project=project,
+                logstore=logstore,
+                topic="",
+                source="",
+                logitems=log_group,
+                compress=False,
+            )
+
+            client.put_logs(request)
+        except Exception as e:
+            print("写入日志失败")

+ 44 - 0
common/odps_data.py

@@ -0,0 +1,44 @@
+import requests
+import json
+import datetime
+from odps import ODPS
+
+# ODPS服务配置
+ODPS_CONFIG = {
+    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
+    'ACCESSID': 'LTAIWYUujJAm7CbH',
+    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+}
+class OdpsDataCount:
+    @classmethod
+    def get_data_count(cls, project, table):
+        odps = ODPS(
+            access_id=ODPS_CONFIG['ACCESSID'],
+            secret_access_key=ODPS_CONFIG['ACCESSKEY'],
+            project=project,
+            endpoint=ODPS_CONFIG['ENDPOINT']
+        )
+        data_values = []
+        try:
+            dt = "2024092717"
+            sql = f'SELECT videoid,title,video_path FROM {project}.{table} WHERE dt = "{dt}" '
+            with odps.execute_sql(sql).open_reader() as reader:
+                for row in reader:
+                    data_values.append(json.dumps( {"video_id": row[0], "title": row[1], "video_path": row[2]}, ensure_ascii=False ))
+
+        except Exception as e:
+            print(f"An error occurred: {e}")
+            return data_values
+        return data_values
+
+    @classmethod
+    def main(cls):
+        dt = datetime.datetime.now().strftime('%Y%m%d%H')
+        project = 'loghubods'
+        table = 'video_need_google_ai_studio'
+        data_count = cls.get_data_count(project=project, table=table)
+        print(len(data_count))
+        return data_count
+
+if __name__ == '__main__':
+    OdpsDataCount.main()

+ 56 - 0
common/redis.py

@@ -0,0 +1,56 @@
+import json
+
+import redis
+from common.odps_data import OdpsDataCount
+
+
+class SyncRedisHelper:
+    _pool: redis.ConnectionPool = None
+    _instance = None
+
+    def __init__(self):
+        if not self._instance:
+            self._pool = self._get_pool()
+            self._instance = self
+
+    def _get_pool(self) -> redis.ConnectionPool:
+        if self._pool is None:
+            self._pool = redis.ConnectionPool(
+                # host="r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com",  # 外网地址
+                host="r-bp1mb0v08fqi4hjffu.redis.rds.aliyuncs.com",  # 内网地址
+                port=6379,
+                db=0,
+                password="Wqsd@2019",
+                # password="Qingqu2019",
+
+            )
+        return self._pool
+
+    def get_client(self) -> redis.Redis:
+        pool = self._get_pool()
+        client = redis.Redis(connection_pool=pool)
+        return client
+
+    def close(self):
+        if self._pool:
+            self._pool.disconnect(inuse_connections=True)
+
+def install_video_data():
+    """写入redis需要打标签的视频"""
+    data = OdpsDataCount.main()
+
+    if not data:
+        return
+
+    task = f"task:video_ai"
+    helper = SyncRedisHelper()
+    client = helper.get_client()
+    client.rpush(task, *data)
+
+def get_video_data():
+    """获取一条需要打标签的视频"""
+    task = f"task:video_ai"
+    helper = SyncRedisHelper()
+    client = helper.get_client()
+    ret = client.lpop(task)
+    return ret

+ 0 - 0
google_ai/__init__.py


+ 326 - 0
google_ai/generativeai_video.py

@@ -0,0 +1,326 @@
+import asyncio
+import os
+
+import aiohttp
+import requests
+import google.generativeai as genai
+from google.generativeai import caching
+import datetime
+import time
+import uuid
+
+
+
+class VideoAnalyzer:
+    def __init__(self, api_key):
+        """初始化类,配置 API 密钥和视频路径"""
+        genai.configure(api_key=api_key)
+        self.video_file = None
+
+    async def process_and_delete_file(self, file_path):
+        """删除文件"""
+        try:
+            print(f"正在处理文件: {file_path}" )
+            os.remove( file_path )
+            print( f"文件已删除: {file_path}" )
+        except Exception as e:
+            print( f"处理或删除文件时发生错误: {str( e )}" )
+
+    async def download_video(self, video_url,  save_directory='/root/google_ai_studio/path'):
+    # async def download_video(self, video_url, save_directory='/Users/tzld/Desktop/google_ai_studio/path'):
+
+        """从给定的视频链接下载视频并保存到指定路径"""
+        try:
+            # 发送 GET 请求获取视频内容
+            random_filename = f"{uuid.uuid4()}.mp4"
+            save_path = os.path.join(save_directory, random_filename)
+            async with aiohttp.ClientSession() as session:
+                async with session.get( video_url ) as response:
+                    response.raise_for_status()  # 检查请求是否成功
+                    with open( save_path, 'wb' ) as video_file:
+                        while True:
+                            chunk = await response.content.read( 1024 * 1024 )  # 每次读取 1MB
+                            if not chunk:
+                                break
+                            video_file.write( chunk )
+
+                print( f"视频已成功下载并保存到: {save_path}" )
+                return save_path
+        except requests.exceptions.RequestException as e:
+            print( f"下载视频时出现错误: {e}" )
+            return None
+
+    async def upload_video(self, save_path):
+        """上传视频文件并获取视频文件对象"""
+        self.video_file = genai.upload_file(path=save_path)
+        await self._wait_for_processing()
+
+    async def _wait_for_processing(self):
+        """等待视频文件处理完成"""
+        while self.video_file.state.name == 'PROCESSING':
+            print( '等待视频处理完成...' )
+            time.sleep(2)  # 每两秒检查一次状态
+            self.video_file = genai.get_file( self.video_file.name )
+        print( f'视频处理完成: {self.video_file.uri}' )
+
+    async def create_cache(self):
+        """创建缓存内容,并返回生成模型"""
+        cache = caching.CachedContent.create(
+            model='models/gemini-1.5-flash-001',
+            display_name='AIzaSyDs4FWRuwrEnQzu1M_Skio6NII6Mp4whAw',
+            system_instruction=(
+                '你是一个专业的视频分析师,负责根据访问的视频文件回答用户的所有问题。'
+            ),
+            contents=[self.video_file],
+            ttl=datetime.timedelta( minutes=5 ),
+        )
+        return genai.GenerativeModel.from_cached_content( cached_content=cache )
+
+    async def analyze_video(self, model, questions):
+        """根据缓存模型生成视频分析结果"""
+        response = model.generate_content([(questions)])
+        return response
+
+async def main(video_path):
+    """主函数,执行视频上传、缓存创建、问题生成"""
+
+    # video_path = '/root/3333.mp4'
+    api_key = 'AIzaSyDs4FWRuwrEnQzu1M_Skio6NII6Mp4whAw'
+
+    # 初始化视频分析类
+    analyzer = VideoAnalyzer(api_key )
+    save_path = await analyzer.download_video(video_path)
+    if not save_path:
+        if os.path.exists( save_path ):
+            os.remove( save_path )
+            print( f"文件已删除: {save_path}" )
+        print("视频下载失败")
+    # 上传并处理视频
+    await analyzer.upload_video(save_path)
+
+    # 创建缓存模型
+    model =await analyzer.create_cache()
+
+    # 视频分析问题
+    video_analysis_questions = """
+                    一、基础信息:
+                    1.视觉/音乐/文字: 请从视频中的视觉、音乐、文字这三个维度信息做分析比较,哪个维度的信息是该视频中最重要的,可能成为该视频的要点驱动力?你只要回答 视觉/音乐/文字 三者其一即可。
+                    2.内容选题: 如果需要从视频中提取一个内容选题,你觉得应该是什么?请注意:选题应该体现视频的关键点,亮点,爆点,选题不能超过8个字。
+                    3.视频主题:描述视频的整体主题。
+                    二、主体和场景:
+                    1.视频主体:视频中的核心人物或物体,有几个?分别是什么?
+                    2.视频场景:视频属于什么场景?场景可以有多个,每个不超过6个字
+                    三、情感与风格:
+                    1.情感倾向:视频传递的情感是积极、消极还是中立或其他?
+                    2.视频风格:判断视频的风格类型(如严肃、轻松、幽默等)。
+                    四、视频传播性与观众:
+                    1.片尾引导
+                    视频片尾是否有引导观众分享?
+                    引导时长?
+                    引导强度如何?
+                    2.传播性判断:基于中国中老年微信用户,判断该视频的传播性并说明依据。
+                    3.观众画像:
+                    推测观众的年龄
+                    推测观众的性别
+                    推测观众的地域
+                    五、音画细节:
+                    1.音频细节
+                    视频中的音频信息,是否有歌曲?
+                    视频中的音频信息,歌曲名是什么?
+                    视频中的音色音色是怎样的?
+                    2.视频水印
+                    是否有产品名相关的水印?
+                    水印是否涉及产品名称什么?
+                    3.视频字幕
+                    是否有字幕?
+                    字幕的颜色?
+                    字幕的字号?
+                    字幕的位置如何?
+                    4. 视频口播:提供视频中出现的准确且完整的口播内容。
+                    六、人物与场景:
+                    1.知名人物
+                    视频或音频中是否出现知名人物?
+                    视频或音频中是否出现知名人物是谁?
+                    2.人物年龄段:视频中人物的年龄段(如中青年男、中青年女等)。
+                    3.场景描述:视频和声音中涉及的场景描述。
+                    七、时效性与分类:
+                    1.时效性:
+                    适用时效日
+                    适用时效早中晚
+                    2.视频一级分类:判断视频分别属于下面的哪种一级分类,并输出准确完整的一级品类_分类名称:
+                    一级分类范围为:
+                      一级品类_音乐
+                      一级品类_剧情 / 剧情演绎
+                      一级品类_二次元
+                      一级品类_游戏
+                      一级品类_公益
+                      一级品类_随拍 / 颜值
+                      一级品类_舞蹈
+                      一级品类_动物 / 萌宠
+                      一级品类_三农
+                      一级品类_科技 / 科技数码
+                      一级品类_财经
+                      一级品类_母婴 / 母婴亲子
+                      一级品类_法律 / 人文社科
+                      一级品类_科普 / 人文社科
+                      一级品类_情感 / 情感心理
+                      一级品类_职场 / 人文社科
+                      一级品类_教育 / 教育培训
+                      一级品类_摄影摄像
+                      一级品类_艺术 / 才艺技能
+                      一级品类_美食
+                      一级品类_旅行 / 旅游
+                      一级品类_地域本地
+                      一级品类_时尚 / 时尚 / 美妆
+                      一级品类_文化 / 人文社科
+                      一级品类_搞笑 / 休闲娱乐
+                      一级品类_明星 / 名人
+                      一级品类_综艺
+                      一级品类_影视综艺
+                      一级品类_电影
+                      一级品类_影视综艺
+                      一级品类_电视剧
+                      一级品类_影视综艺
+                      一级品类_汽车
+                      一级品类_体育 / 运动
+                      一级品类_医疗健康 / 长寿 / 健身
+                      一级品类_生活记录 / 生活
+                      一级品类_生活家居 / 家居家装
+                      一级品类_时政社会
+                      一级品类_奇人异象
+                      一级品类_历史
+                      一级品类_军事
+                      一级品类_宗教
+                      一级品类_短剧
+                      一级品类_收藏品
+                      3.二级分类: 判断视频分别属于下面的哪种二级分类,并输出准确完整的品类-分类名称
+                      二级分类范围为:
+                      品类-祝福音乐
+                      品类-人生感悟音乐
+                      品类-民族异域音乐
+                      品类-亲情音乐
+                      品类-红歌老歌
+                      品类-音乐知识
+                      品类-正能量剧情
+                      品类-对口型表演
+                      品类-快闪
+                      品类-拟真游戏
+                      品类-麻将
+                      品类-棋牌
+                      品类-老年审美美女
+                      品类-老年审美帅哥
+                      品类-红歌老歌舞蹈
+                      品类-广场舞
+                      品类-舞蹈教程
+                      品类-宠物日常
+                      品类-动物表演
+                      品类-生动物
+                      品类-农村生活
+                      品类-农业技术
+                      品类-老年相关科技
+                      品类-未来科幻
+                      品类-国家科技力量
+                      品类-保险
+                      品类-理财
+                      品类-亲子日常
+                      品类-K12教育
+                      品类-老年相关法律科普
+                      品类-知识科普
+                      品类-生活技巧科普
+                      品类-怀念时光
+                      品类-人生忠告
+                      品类-迷信祝福
+                      品类-节日祝福
+                      品类-早中晚好
+                      品类-退休前
+                      品类-退休后
+                      品类-益智解密
+                      品类-老年教育
+                      品类-风景实拍
+                      品类-动植物实拍
+                      品类-人像模特实拍
+                      品类-摄影教学
+                      品类-名画赏析
+                      品类-杂技柔术
+                      品类-魔术
+                      品类-魔术特效
+                      品类-书法
+                      品类-绘画
+                      品类-木工
+                      品类-口技
+                      品类-大型集体艺术
+                      品类-戏曲戏剧
+                      品类-二人转
+                      品类-其他才艺
+                      品类-美食测评
+                      品类-美食教程
+                      品类-吃播探店
+                      品类-旅行记录
+                      品类-旅行攻略
+                      品类-省份城市亮点
+                      品类-本地新闻
+                      品类-本地生活
+                      品类-老年时尚
+                      品类-美妆护肤穿搭
+                      品类-传统文化
+                      品类-国际文化
+                      品类-搞笑瞬间合集
+                      品类-搞笑段子
+                      品类-历史名人
+                      品类-当代正能量人物
+                      品类-老明星
+                      品类-老年人上综艺
+                      品类-老年关心纪录片
+                      品类-老综艺影像
+                      品类-电影切片
+                      品类-电影解说
+                      品类-电视剧切片
+                      品类-电视剧解说
+                      品类-中国队比赛
+                      品类-老年运动
+                      品类-健康知识
+                      品类-长寿知识
+                      品类-饮食健康
+                      品类-健身操
+                      品类-老年生活
+                      品类-生活小妙招
+                      品类-园艺花艺
+                      品类-民生政策
+                      品类-流行病疫情
+                      品类-社会风气
+                      品类-食品安全
+                      品类-贪污腐败
+                      品类-人财诈骗
+                      品类-核污染
+                      品类-惠民新闻
+                      品类-天气变化
+                      品类-国家力量
+                      品类-国际时政
+                      品类-他国政策
+                      品类-惊奇事件
+                      品类-罕见画面
+                      品类-中国战争史
+                      品类-中国党史
+                      品类-中国历史影像
+                      品类-国际军事
+                      品类-国内军事
+                      品类-国家统一
+                    输出格式:要求输出格式为符合RFC8259标准的JSON格式的字符串
+                """
+
+    # 分析视频并打印结果
+    response =await analyzer.analyze_video( model, video_analysis_questions )
+    print( response.usage_metadata )
+    print(response.text)
+    if os.path.exists( save_path ):
+        os.remove( save_path )
+        print( f"文件已删除: {save_path}" )
+    return response.text
+
+
+if __name__ == "__main__":
+    proxy_url = 'http://127.0.0.1:1081'
+    os.environ["http_proxy"] = proxy_url
+    os.environ["https_proxy"] = proxy_url
+    video_path = 'http://rescdn.yishihui.com/jq_oss/video/2024092315510095449.mp4'
+    asyncio.run(main(video_path))

+ 19 - 0
job_video_processing.py

@@ -0,0 +1,19 @@
+import time
+
+from video_processing.video_processing import VideoProcessing
+
+
+def video_task_start():
+    while True:
+        try:
+            print("开始执行任务")
+            video_processor = VideoProcessing()
+            video_processor.get_video()
+            print("执行完成")
+            time.sleep(5)
+        except Exception as e:
+            print("处理任务时出现异常:", e)
+            time.sleep(5)
+            continue
+if __name__ == '__main__':
+    video_task_start()

+ 0 - 0
path/__init__.py


+ 0 - 0
video_processing/__init__.py


+ 51 - 0
video_processing/video_processing.py

@@ -0,0 +1,51 @@
+import requests
+import json
+
+from common.aliyun_log import AliyunLogger
+from common.redis import get_video_data, install_video_data
+
+
+class VideoProcessing:
+    def get_ai_data(self, video_path):
+        url = "http://8.219.186.16:8080/process_video/"
+
+        payload = json.dumps( {
+            "video_path": video_path
+        } )
+        headers = {
+            'Content-Type': 'application/json'
+        }
+
+        response = requests.request( "POST", url, headers=headers, data=payload )
+        try:
+            response = response.json()
+            result = response['result']
+            cleaned_string = result.replace( "```json", "" ).replace( "```", "" ).strip()
+            json_data = json.loads( cleaned_string )
+            print( json_data )
+            return json_data
+        except Exception as e:
+            print(f"视频请求异常:{e}")
+            return None
+
+
+    def get_video(self):
+        video_data = get_video_data()
+        # 解码为字符串
+        data_str = video_data.decode( 'utf-8' )
+
+        # 解析为 JSON 对象
+        data_json = json.loads( data_str )
+        video_id = data_json['video_id']
+        title = data_json['title']
+        video_path = data_json['video_path']
+        data = self.get_ai_data(video_path)
+        AliyunLogger.logging(str(video_id), title, video_path, data)
+        print("写入日志成功")
+
+
+
+
+if __name__ == '__main__':
+    video_processor = VideoProcessing()
+    video_processor.get_video()