# encoding: utf-8
"""
@author: luojunhui
"""
import json
import re
import time
from uuid import uuid4

import requests

from spider.baidu_imgs import get_img_list
from applications.config import db_config
from applications.functions import whisper
from applications.pipeline import question_fission, search_materials, summary_articles, generate_text

# Characters removed from a generated question before it is used as a key of
# the materials JSON object (newline, '*', '#', ':', '"' and "'").
_QUESTION_STRIP_TABLE = str.maketrans("", "", "\n*#:\"'")

# Task ids created by MatchArticlesV1.record look like "Article_<uuid4>_<ts>",
# i.e. only letters, digits, '_' and '-'. Anything else is rejected before the
# value is interpolated into SQL.
_TASK_ID_PATTERN = re.compile(r"[A-Za-z0-9_-]+")

# Messages returned by MatchArticlesV2 while a task has not reached status 3.
_IN_PROGRESS_MESSAGES = {
    0: "未处理",  # not processed yet
    1: "处理中, 已经用whisper生成视频文本",  # whisper has produced the video text
    2: "处理中, 已经用AI搜索生成资料",  # AI search has produced the materials
}


def _params_error(detail):
    """Build the "Params Error" response returned by the ``check_params`` methods."""
    return {
        "code": 0,
        "error": "Params Error",
        "msg": "Params: {} is not correct".format(detail)
    }


class MatchArticlesTask:
    """
    Video-to-article matching pipeline.

    Steps described by the original author:
        1. Match an account using the video id, title and similar information.
        2. Once the account is matched, use AI Search to collect source
           material for the article.
        3. Refine the article with AI tools (GPT-4, Tencent Yuanbao, ...).
        4. Produce an article with title, text, cover, illustrations and the
           matched mini-program.

    As implemented here, each task method picks the oldest row (``limit 1``)
    in a given ``status_code`` and advances it by one:
    ``0 -> 1`` (whisper_task), ``1 -> 2`` (materials_task), ``2 -> 3`` (ai_task).
    """

    def __init__(self, mysql_client):
        """
        :param mysql_client: MySQL client/pool exposing ``select`` and
            ``async_insert`` coroutines.
        """
        self.mysql_client = mysql_client

    async def whisper_task(self):
        """
        Scheduled job: transcribe the oldest video whose ``status_code`` is 0.

        :return: None
        """
        select_sql = (
            f"SELECT video_id FROM {db_config} "
            "WHERE status_code = 0 ORDER BY id ASC limit 1;"
        )
        video_list = await self.mysql_client.select(select_sql)
        for row in video_list:
            await self._whisper_and_update(video_id=row[0])

    async def _whisper_and_update(self, video_id):
        """
        Run whisper on one video and store the text with ``status_code = 1``.

        :param video_id: id of the video to transcribe
        :return: None
        """
        try:
            w_response = whisper(video_id)
        except Exception:
            # Best effort: a failed transcription still advances the task,
            # storing a placeholder text instead.
            w_response = {"text": "whisper failed"}
        print(w_response)
        text = w_response['text'].replace("'", "")
        update_sql = (
            f" UPDATE {db_config} "
            "SET video_text = %s, status_code = %s "
            "WHERE video_id = %s; "
        )
        print(update_sql)
        await self.mysql_client.async_insert(sql=update_sql, params=(text, 1, video_id))

    async def materials_task(self):
        """
        Collect materials for the oldest task whose ``status_code`` is 1.

        :return: None
        """
        select_sql = (
            f"SELECT task_id, video_title, video_text FROM {db_config} "
            "WHERE status_code = 1 ORDER BY id ASC limit 1;"
        )
        task_list = await self.mysql_client.select(select_sql)
        for task in task_list:
            await self._find_material(task)

    async def _find_material(self, task_tuple):
        """
        Build the materials JSON for one task and store it with ``status_code = 2``.

        :param task_tuple: ``(task_id, video_title, video_text)`` row
        :return: None
        """
        task_id, title, text = task_tuple
        # The video title is used as the query for now; to be refined later.
        try:
            question_dict = question_fission(title)
            material_dict = {}
            for question in question_dict.values():
                material = generate_text(question)
                cleaned_question = question.translate(_QUESTION_STRIP_TABLE)
                material_dict[cleaned_question] = material
            material_result = json.dumps(material_dict, ensure_ascii=False)
        except Exception:
            # Best effort: fall back to the raw title/text pair as material.
            material_result = json.dumps({title: text}, ensure_ascii=False)
        update_sql = (
            f" UPDATE {db_config} "
            "SET materials = %s, status_code = %s "
            "WHERE task_id = %s; "
        )
        print(update_sql)
        await self.mysql_client.async_insert(sql=update_sql, params=(material_result, 2, task_id))

    async def ai_task(self):
        """
        Generate the article for the oldest task whose ``status_code`` is 2.

        :return: None
        """
        select_sql = (
            f"SELECT task_id, video_title, materials FROM {db_config} "
            "WHERE status_code = 2 ORDER BY id ASC limit 1;"
        )
        task_list = await self.mysql_client.select(sql=select_sql)
        for task in task_list:
            await self._ai_generate_text(task)

    async def _ai_generate_text(self, task_tuple):
        """
        Summarise the materials into an article and store it with ``status_code = 3``.

        :param task_tuple: ``(task_id, video_title, materials)`` row
        :return: None
        """
        task_id, video_title, materials = task_tuple
        try:
            ai_title, ai_text = summary_articles(materials)
        except Exception:
            # Best effort: keep the video title and store a failure marker.
            ai_title, ai_text = video_title, "文章生成失败"
        # An empty image list must not raise: with "limit 1" ordering a task
        # that keeps failing here would block every task queued behind it.
        imgs = get_img_list(video_title) or []
        cover = imgs[0] if imgs else ""
        update_sql = (
            f" UPDATE {db_config} "
            "SET ai_text = %s, ai_title = %s, cover = %s, img_list = %s, status_code = %s "
            "WHERE task_id = %s; "
        )
        print(update_sql)
        await self.mysql_client.async_insert(
            sql=update_sql,
            params=(
                ai_text,
                ai_title,
                cover,
                json.dumps(imgs, ensure_ascii=False),
                3,
                task_id
            )
        )


class MatchArticlesV1:
    """
    Accept a request and store its data in the MySQL server.
    """

    def __init__(self, params, mysql_client):
        """
        :param params: request payload; must contain ``videoId`` and ``title``
        :param mysql_client: MySQL client exposing an ``async_insert`` coroutine
        """
        self.title = None
        self.video_id = None
        self.params = params
        self.mysql_client = mysql_client

    def check_params(self):
        """
        Validate the request payload and copy its fields onto the instance.

        :return: None when the payload is valid, otherwise an error response dict
        """
        try:
            self.video_id = self.params['videoId']
            self.title = self.params['title']
        except (KeyError, TypeError, AttributeError) as e:
            # A missing key raises KeyError (TypeError when params is not
            # subscriptable); catching only AttributeError let both escape.
            return _params_error(e)
        return None

    async def record(self):
        """
        Insert a new task row for this request.

        :return: the generated task id
        """
        # Read the clock once so the id suffix and request_time always agree.
        request_time = int(time.time())
        request_id = "Article_{}_{}".format(uuid4(), request_time)
        insert_sql = (
            f" INSERT INTO {db_config} "
            "(video_id, task_id, video_title, request_time) "
            "VALUES (%s, %s, %s, %s) "
        )
        await self.mysql_client.async_insert(
            sql=insert_sql,
            params=(
                self.video_id,
                request_id,
                self.title,
                request_time
            )
        )
        return request_id

    async def deal(self):
        """
        Handle the request: validate it, then record it.

        :return: an error response dict, or ``{"status": "success", "task_id": ...}``
        """
        params_error = self.check_params()
        if params_error:
            return params_error
        task_id = await self.record()
        return {
            "status": "success",
            "task_id": task_id
        }


class MatchArticlesV2:
    """
    Look up a task and return its progress or the finished article.
    """

    def __init__(self, params, mysql_client):
        """
        :param params: request payload; must contain ``taskId``
        :param mysql_client: MySQL client exposing a ``select`` coroutine
        """
        self.task_id = None
        self.params = params
        self.mysql_client = mysql_client

    def check_params(self):
        """
        Validate the request payload and store the task id on the instance.

        :return: None when the payload is valid, otherwise an error response dict
        """
        try:
            task_id = self.params['taskId']
        except (KeyError, TypeError, AttributeError) as e:
            # A missing key raises KeyError, not AttributeError.
            return _params_error(e)
        # The task id is interpolated into SQL by recall_articles, so only
        # the characters a generated task id can contain are accepted.
        if not isinstance(task_id, str) or not _TASK_ID_PATTERN.fullmatch(task_id):
            return _params_error("taskId")
        self.task_id = task_id
        return None

    @classmethod
    def get_basic_video_info(cls, video_id, timeout=None):
        """
        Fetch video information from the local video service.

        :param video_id: id of the video to look up
        :param timeout: optional timeout in seconds passed to ``requests.post``;
            the default ``None`` waits indefinitely, as before
        :return: the decoded JSON body of the response
        """
        url = "http://localhost:8888/singleVideo"
        body = {"videoId": video_id}
        headers = {"Content-Type": "application/json"}
        response = requests.post(url=url, json=body, headers=headers, timeout=timeout)
        return response.json()

    async def recall_articles(self):
        """
        Load the task row and translate it into a response.

        :return: a progress dict for status 0-2, the article for status 3, or
            an explanatory dict when the task is missing or its status unknown
        """
        # NOTE(review): prefer a parameterized query if mysql_client.select
        # supports params; until then check_params whitelists task_id.
        select_sql = (
            " SELECT video_id, cover, img_list, ai_text, video_title, status_code "
            f"FROM {db_config} "
            f"WHERE task_id = '{self.task_id}'; "
        )
        result = await self.mysql_client.select(select_sql)
        if not result:
            # Unknown task id: answer explicitly instead of raising IndexError.
            return {
                "task_id": self.task_id,
                "code": -1,
                "msg": "任务不存在"  # task does not exist
            }
        # NOTE(review): the query selects video_title, so the article title is
        # the original video title rather than the stored ai_title - confirm.
        video_id, cover, images, ai_text, title, status_code = result[0]
        if status_code in _IN_PROGRESS_MESSAGES:
            return {
                "task_id": self.task_id,
                "code": status_code,
                "msg": _IN_PROGRESS_MESSAGES[status_code]
            }
        if status_code == 3:
            article = {
                "title": title,
                "cover": cover,
                "content": ai_text,
                "images": json.loads(images),
                "videos": [
                    self.get_basic_video_info(video_id)
                ]
            }
            return {
                "status": "success",
                "article": article
            }
        # Any other status previously fell through and returned None.
        return {
            "task_id": self.task_id,
            "code": status_code,
            "msg": "未知状态"  # unknown status
        }

    async def deal(self):
        """
        Handle the request: validate it, then look up the task.

        :return: an error response dict, or the result of ``recall_articles``
        """
        params_error = self.check_params()
        if params_error:
            return params_error
        return await self.recall_articles()