import json
import re
import sys
import time

import requests

from common.aliyun_log import AliyunLogger
from common.redis import get_video_data, install_video_data, in_video_data


class VideoProcessing:
    """Fetch a video task from Redis, run it through the AI service, log the result."""

    def get_ai_data(self, video_path, timeout=None):
        """Send ``video_path`` to the video-processing service and return its answer.

        The service is called with a JSON body ``{"video_path": video_path}``.
        The ``result`` field of the JSON response has any Markdown code-fence
        markers (```` ```json ```` / ```` ``` ````) removed and is stripped of
        surrounding whitespace.

        Args:
            video_path: Value forwarded to the service as ``video_path``.
            timeout: Optional timeout in seconds passed to ``requests``. The
                default ``None`` waits indefinitely, as before.

        Returns:
            A ``(cleaned_result, mark)`` tuple of strings on success, or
            ``None`` if the request, the JSON decoding, or the field lookup
            fails for any reason (the error is printed, not raised).
        """
        url = "http://8.219.186.16:8080/process_video/"
        payload = json.dumps({"video_path": video_path})
        headers = {'Content-Type': 'application/json'}
        try:
            response = requests.post(
                url, headers=headers, data=payload, timeout=timeout
            )
            body = response.json()
            result = body['result']
            mark = body['mark']
            cleaned_string = (
                result.replace("```json", '').replace("```", '').strip()
            )
            return cleaned_string, str(mark)
        except Exception as e:
            # Best-effort call: report the failure and let the caller decide.
            print(f"视频请求异常:{e}")
            return None

    def get_video(self, redis_task):
        """Process one video task read from Redis under ``redis_task``.

        When no task is available the method sleeps for 120 seconds and
        returns. Otherwise the task JSON is parsed, its ``video_path`` is sent
        to the AI service, and the outcome is written via ``AliyunLogger``.
        If the AI service call fails, nothing is logged for this task.

        Args:
            redis_task: Key handed to ``get_video_data`` to fetch the task.
        """
        video_data = get_video_data(redis_task)
        if not video_data:
            print("没有获取到视频内容")
            # Back off before the caller polls again.
            time.sleep(120)
            return

        # json.loads accepts both str and UTF-8 encoded bytes, so the payload
        # is parsed directly without an explicit decode step.
        data_json = json.loads(video_data)
        video_id = data_json['video_id']
        title = data_json['title']
        video_path = data_json['video_path']
        video_type = data_json['type']
        print(video_path)

        ai_result = self.get_ai_data(video_path)
        if ai_result is None:
            # get_ai_data returns None on failure; unpacking it would raise
            # TypeError, so stop here instead of logging an incomplete record.
            print("视频AI处理失败,跳过日志写入")
            return
        data, mark = ai_result

        AliyunLogger.logging(
            str(video_id), title, video_path, mark, video_type, data
        )
        print("写入日志成功")


if __name__ == '__main__':
    # get_video() needs the Redis task key, so it is taken from the command
    # line rather than calling the method without its required argument.
    if len(sys.argv) != 2:
        sys.exit(f"usage: {sys.argv[0]} <redis_task>")
    video_processor = VideoProcessing()
    video_processor.get_video(sys.argv[1])