import re import time import requests import json from common.aliyun_log import AliyunLogger from common.redis import get_video_data from common.feishu_data import Material class VideoProcessing: def get_ai_data(self, video_path): mark, prompt = Material.feishu_list() sample_data = { "一、基础信息": { "视觉/音乐/文字": "", "内容选题": "", "视频主题": "" }, "二、主体和场景": { "视频主体": "", "视频场景": [] }, "三、情感与风格": {}, "四、视频传播性与观众": { "片尾引导": {}, "传播性判断": "", "观众画像": {} }, "五、音画细节": { "音频细节": {}, "视频水印": {}, "视频字幕": {}, "视频口播": "" }, "六、人物与场景": { "知名人物": {}, "人物年龄段": "", "场景描述": [] }, "七、时效性与分类": { "时效性": {}, "视频一级分类": "", "二级分类": ["品类- 、分数-", "品类- 、分数-", "品类- 、分数-"] } } url = "http://8.219.186.16:8080/process_video/" payload = json.dumps( { "video_path": video_path, "prompt": prompt, "mark": str(mark), "sample_data": str(sample_data) } ) headers = { 'Content-Type': 'application/json' } try: response = requests.request( "POST", url, headers=headers, data=payload ) response = response.json() result = response['result'] mark = response['mark'] cleaned_string = result.replace( "```json", '' ).replace( "```", '' ).strip() return cleaned_string, str(mark) except Exception as e: print(f"视频请求异常:{e}") return None def get_video(self, redis_task): video_data = get_video_data(redis_task) if not video_data: print("没有获取到视频内容") time.sleep(5) return # 解码为字符串 data_str = video_data.decode( 'utf-8' ) # 解析为 JSON 对象 data_json = json.loads( data_str ) video_id = data_json['video_id'] title = data_json['title'] video_path = data_json['video_path'] type = data_json['type'] partition = data_json['partition'] print(video_path) data, mark = self.get_ai_data(video_path) AliyunLogger.logging(str(video_id), title, video_path, mark, type, partition, data) print("写入日志成功") if __name__ == '__main__': video_processor = VideoProcessing() video_processor.get_video()