import requests import json class Ffmpeg: def get_oss_link(self, oss_key): url = "http://61.48.133.26:5555/api/v1/oss/get_object_link" payload = json.dumps({ "oss_object_key": oss_key }) headers = { 'Authorization': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiNGNhMTI4ZGYtYWMzMy00NWQ2LTg3MmEtMDAzOTk4MGVhM2ViIiwibmFtZSI6Inp5IiwiZXhwIjoyMDUwOTI3MjExfQ.k_rvuESjA62RgPDiLniVgJyLJn3Q8C1Y_AGq3CPRuKI', 'Content-Type': 'application/json' } response = requests.request("POST", url, headers=headers, data=payload) response = response.json() data = response['data'] return data def merge_m3u8(self,url_link): url = "http://101.37.24.17:5555/api/v1/ffmpeg/merge_m3u8" data = { "url": url_link, "referer": "" } headers = { 'Authorization': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiNGNhMTI4ZGYtYWMzMy00NWQ2LTg3MmEtMDAzOTk4MGVhM2ViIiwibmFtZSI6Inp5IiwiZXhwIjoyMDUwOTI3MjExfQ.k_rvuESjA62RgPDiLniVgJyLJn3Q8C1Y_AGq3CPRuKI', 'Content-Type': 'application/json' } response = requests.request("POST", url, headers=headers, json=data, stream=True) for item in response.content.split(b'\r\n\r\n'): try: item = json.loads(item[6:].decode()) if item['event'] == 'message': continue elif item['event'] == 'ffmpeg code': code = int(item['data']) if code != 0: # ffmpeg处理异常 return elif item['event'] == 'result': oss_object_key = item['data']['oss_object_key'] if oss_object_key: oss_url = self.get_oss_link(oss_object_key) return oss_url except json.decoder.JSONDecodeError: continue def webp2_jpg(self,webp2_url): url = "http://101.37.24.17:5555/api/v1/ffmpeg/webp2jpg" payload = json.dumps({ "url": webp2_url, "referer": "" }) headers = { 'Authorization': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VyX2lkIjoiNGNhMTI4ZGYtYWMzMy00NWQ2LTg3MmEtMDAzOTk4MGVhM2ViIiwibmFtZSI6Inp5IiwiZXhwIjoyMDUwOTI3MjExfQ.k_rvuESjA62RgPDiLniVgJyLJn3Q8C1Y_AGq3CPRuKI', 'Content-Type': 'application/json' } response = requests.request("POST", url, headers=headers, data=payload) response = response.json() oss_object_key = response['data']['oss_object_key'] if oss_object_key: oss_url = self.get_oss_link(oss_object_key) return oss_url else: return None if __name__ == '__main__': ffmpeg = Ffmpeg() print(ffmpeg.get_oss_link("jq_oss/video/20250103135417425230.mp4"))