123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299 |
- # encoding: utf-8
- """
- @author: luojunhui
- """
- import json
- import time
- import requests
- from uuid import uuid4
- from spider.baidu_imgs import get_img_list
- from applications.config import db_config
- from applications.functions import whisper
- from applications.pipeline import question_fission, search_materials, summary_articles, generate_text
class MatchArticlesTask(object):
    """
    Video-to-article matching pipeline.

    Steps, as described by the original author:
        1. Use the video id, title and related info to match an account.
        2. Once the account is matched, use AI Search to collect source
           material for the article.
        3. Refine the article with AI tools such as GPT-4 and Tencent Yuanbao.
        4. Produce an article containing a title, text, cover, illustrations
           and the matched mini-program.

    Each ``*_task`` coroutine picks at most one row (``limit 1``, oldest
    ``id`` first) from the table named by ``db_config`` and moves it one
    ``status_code`` step forward: 0 -> 1 (whisper text), 1 -> 2 (materials),
    2 -> 3 (generated article).

    NOTE(review): ``whisper``, ``question_fission``, ``generate_text``,
    ``summary_articles`` and ``get_img_list`` are called synchronously from
    coroutines; if they block, they block the event loop - confirm this is
    acceptable for the scheduler that drives these tasks.
    """

    # Characters removed from a generated question before it is used as a
    # key of the materials JSON object.
    _QUESTION_STRIP_TABLE = str.maketrans("", "", "\n*#:\"'")

    def __init__(self, mysql_client):
        """
        :param mysql_client: MySQL client pool; its ``select`` and
            ``async_insert`` coroutines are awaited by this class
        """
        self.mysql_client = mysql_client

    async def _whisper_and_update(self, video_id):
        """
        Run whisper on one video and store the resulting text in MySQL.

        On any whisper failure (including a response without a ``text``
        entry) the placeholder text ``"whisper failed"`` is stored instead,
        and the row still advances to ``status_code = 1``.

        :param video_id: id of the video to transcribe
        :return: None
        """
        try:
            w_response = whisper(video_id)
            # Apostrophes are stripped before storing. The UPDATE below is
            # parameterized, so this is not needed for SQL safety; it is kept
            # so the stored text format does not change.
            text = w_response['text'].replace("'", "")
        except Exception as e:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit still propagate.
            print("whisper failed for video {}: {}".format(video_id, e))
            w_response = {"text": "whisper failed"}
            text = w_response['text']
        print(w_response)
        update_sql = f"""
            UPDATE {db_config}
            SET
                video_text = %s,
                status_code = %s
            WHERE video_id = %s;
        """
        print(update_sql)
        await self.mysql_client.async_insert(sql=update_sql, params=(text, 1, video_id))

    async def whisper_task(self):
        """
        Scheduled task: convert the oldest unprocessed video (status 0) to text.

        :return: None
        """
        select_sql = f"""SELECT video_id FROM {db_config} WHERE status_code = 0 ORDER BY id ASC limit 1;"""
        video_list = await self.mysql_client.select(select_sql)
        for vid in video_list:
            await self._whisper_and_update(video_id=vid[0])

    async def _find_material(self, task_tuple):
        """
        Build the materials JSON for one task and store it in MySQL.

        The video title is split into questions by ``question_fission``; each
        question is answered by ``generate_text``. If any of that fails, the
        materials fall back to ``{video_title: video_text}``.

        :param task_tuple: ``(task_id, video_title, video_text)`` row
        :return: None
        """
        task_id, title, text = task_tuple
        # The video title is used as the query for now; to be refined later.
        try:
            question_dict = question_fission(title)
            material_dict = {}
            for question in question_dict.values():
                material = generate_text(question)
                cleaned_question = question.translate(self._QUESTION_STRIP_TABLE)
                material_dict[cleaned_question] = material
            material_result = json.dumps(material_dict, ensure_ascii=False)
        except Exception as e:
            print("material search failed for task {}: {}".format(task_id, e))
            material_result = json.dumps({title: text}, ensure_ascii=False)
        update_sql = f"""
            UPDATE {db_config}
            SET materials = %s, status_code = %s
            WHERE task_id = %s;
        """
        print(update_sql)
        await self.mysql_client.async_insert(sql=update_sql, params=(material_result, 2, task_id))

    async def materials_task(self):
        """
        Collect the materials for the oldest task whose video text is ready (status 1).

        :return: None
        """
        select_sql = f"""SELECT task_id, video_title, video_text FROM {db_config} WHERE status_code = 1 ORDER BY id ASC limit 1;"""
        task_list = await self.mysql_client.select(select_sql)
        for task in task_list:
            await self._find_material(task)

    async def _ai_generate_text(self, task_tuple):
        """
        Generate the article for one task and store it in MySQL.

        If ``summary_articles`` fails, the video title and a fixed failure
        text are stored instead. Images are looked up by video title; the
        first image is used as the cover.

        :param task_tuple: ``(task_id, video_title, materials)`` row
        :return: None
        """
        task_id, video_title, materials = task_tuple
        try:
            ai_title, ai_text = summary_articles(materials)
        except Exception as e:
            print("article generation failed for task {}: {}".format(task_id, e))
            ai_title, ai_text = video_title, "文章生成失败"
        # An empty image search result must not raise IndexError: with
        # ``limit 1`` ordering the same row would be retried on every run.
        imgs = get_img_list(video_title) or []
        cover = imgs[0] if imgs else None
        update_sql = f"""
            UPDATE {db_config}
            SET ai_text = %s, ai_title = %s, cover = %s, img_list = %s, status_code = %s
            WHERE task_id = %s;
        """
        print(update_sql)
        await self.mysql_client.async_insert(
            sql=update_sql,
            params=(
                ai_text,
                ai_title,
                cover,
                json.dumps(imgs, ensure_ascii=False),
                3,
                task_id
            )
        )

    async def ai_task(self):
        """
        Generate an article with the AI tools for the oldest task whose materials are ready (status 2).

        :return: None
        """
        select_sql = f"""SELECT task_id, video_title, materials FROM {db_config} WHERE status_code = 2 ORDER BY id ASC limit 1;"""
        task_list = await self.mysql_client.select(sql=select_sql)
        for task in task_list:
            await self._ai_generate_text(task)
class MatchArticlesV1(object):
    """
    Accept a request and store its data in the MySQL server.
    """

    def __init__(self, params, mysql_client):
        """
        :param params: request parameters; ``videoId`` and ``title`` are read from it
        :param mysql_client: MySQL client; its ``async_insert`` coroutine is awaited
        """
        self.title = None
        self.video_id = None
        self.params = params
        self.mysql_client = mysql_client

    def check_params(self):
        """
        Validate the request parameters and copy them onto the instance.

        :return: None when ``videoId`` and ``title`` are both present,
            otherwise a "Params Error" response dict
        """
        try:
            self.video_id = self.params['videoId']
            self.title = self.params['title']
            return None
        except (KeyError, TypeError, AttributeError) as e:
            # Subscripting raises KeyError for a missing key and TypeError
            # when params is not subscriptable (e.g. None); AttributeError
            # alone never matched either case.
            response = {
                "code": 0,
                "error": "Params Error",
                "msg": "Params: {} is not correct".format(e)
            }
            return response

    async def record(self):
        """
        Insert the request as a new row and return the generated task id.

        :return: task id of the form ``Article_<uuid4>_<unix seconds>``
        """
        # Read the clock once so the id suffix and request_time always agree.
        request_time = int(time.time())
        request_id = "Article_{}_{}".format(uuid4(), request_time)
        insert_sql = f"""
            INSERT INTO {db_config}
                (video_id, task_id, video_title, request_time)
            VALUES
                (%s, %s, %s, %s)
        """
        await self.mysql_client.async_insert(
            sql=insert_sql,
            params=(
                self.video_id,
                request_id,
                self.title,
                request_time
            )
        )
        return request_id

    async def deal(self):
        """
        Handle the request: validate the parameters, then record the task.

        :return: the params-error response, or ``{"status": "success", "task_id": ...}``
        """
        params_error = self.check_params()
        if params_error:
            return params_error
        task_id = await self.record()
        return {
            "status": "success",
            "task_id": task_id
        }
- class MatchArticlesV2(object):
- """
- 获取视频信息
- """
- def __init__(self, params, mysql_client):
- self.task_id = None
- self.params = params
- self.mysql_client = mysql_client
- def check_params(self):
- """
- params check
- """
- try:
- self.task_id = self.params['taskId']
- return None
- except AttributeError as e:
- response = {
- "code": 0,
- "error": "Params Error",
- "msg": "Params: {} is not correct".format(e)
- }
- return response
- @classmethod
- def get_basic_video_info(cls, video_id):
- """
- 获取视频信息
- :return:
- """
- url = "http://localhost:8888/singleVideo"
- body = {
- "videoId": video_id
- }
- headers = {
- "Content-Type": "application/json"
- }
- response = requests.post(url=url, json=body, headers=headers)
- return response.json()
- async def recall_articles(self):
- """
- 从表中召回视频
- :return:
- """
- select_sql = f"""
- SELECT video_id, cover, img_list, ai_text, video_title, status_code
- FROM {db_config}
- WHERE task_id = '{self.task_id}';
- """
- result = await self.mysql_client.select(select_sql)
- video_id, cover, images, ai_text, ai_title, status_code = result[0]
- match status_code:
- case 0:
- return {
- "task_id": self.task_id,
- "code": 0,
- "msg": "未处理"
- }
- case 1:
- return {
- "task_id": self.task_id,
- "code": 1,
- "msg": "处理中, 已经用whisper生成视频文本"
- }
- case 2:
- return {
- "task_id": self.task_id,
- "code": 2,
- "msg": "处理中, 已经用AI搜索生成资料"
- }
- case 3:
- result = {
- "title": ai_title,
- "cover": cover,
- "content": ai_text,
- "images": json.loads(images),
- "videos": [
- self.get_basic_video_info(video_id)
- ]
- }
- response = {
- "status": "success",
- "article": result
- }
- return response
- async def deal(self):
- params_error = self.check_params()
- if params_error:
- return params_error
- else:
- return await self.recall_articles()
|