|
- """
- @author: luojunhui
- """
- import json
- import uuid
- import time
- import random
- import hashlib
- import urllib.parse
- from applications.log import logging
- from applications.const import server_const
- from applications.functions.forward_request import forward_requests
- class Response(object):
- """
- Response
- """
- def __init__(self, params, mysql_client, config):
- """
- Response 接口
- """
- self.trace_id = None
- self.mini_program_type = None
- self.mysql_client = mysql_client
- self.params = params
- self.article_match_video_table = config.article_match_video_table
- self.mini_program_map = json.loads(config.get_config_value("miniMap"))
- def check_params(self):
- """
- 请求参数校验
- :return:
- """
- try:
- self.mini_program_type = self.params['miniprogramUseType']
- self.trace_id = self.params['traceId']
- if self.mini_program_type in [
- server_const.DAILY_CODE,
- server_const.TOULIU_CODE,
- server_const.WECOM_CODE,
- server_const.DAITOU_CODE,
- server_const.COOPERATION_CODE,
- server_const.SERVICE_COOPERATION_CODE
- ]:
- return None
- else:
- return {
- "error": "params error",
- "message": "mini_program_type error",
- "info": self.params
- }
- except Exception as e:
- return {
- "error": "params error",
- "message": str(e),
- "info": self.params
- }
- async def get_videos_result(self):
- """
- 获取结果
- :return:
- """
- select_sql = f"""
- SELECT gh_id, content_status, response, process_times
- FROM {self.article_match_video_table}
- WHERE trace_id = '{self.trace_id}';
- """
- info_tuple = await self.mysql_client.async_select(select_sql)
- gh_id, content_status, response, process_times = info_tuple[0]
- return {
- "gh_id": gh_id,
- "content_status": content_status,
- "response": response,
- "process_times": process_times
- }
- def create_gzh_path(self, video_id, shared_uid, gh_id):
- """
- :param gh_id: 公众号账号的gh_id
- :param video_id: 视频 id
- :param shared_uid: 分享 id
- """
- def generate_source_id():
- """
- generate_source_id
- :return:
- """
- timestamp = str(int(time.time() * 1000))
- random_str = str(random.randint(1000, 9999))
- hash_input = f"{timestamp}-{random_str}"
- return hashlib.md5(hash_input.encode()).hexdigest()
- root_share_id = str(uuid.uuid4())
- match self.mini_program_type:
- case server_const.DAILY_CODE:
- source_id = "{}_{}".format(server_const.DAILY_PREFIX, generate_source_id())
- case server_const.TOULIU_CODE:
- source_id = "{}_{}_{}".format(server_const.TOULIU_PREFIX, gh_id, generate_source_id())
- case server_const.WECOM_CODE:
- source_id = "{}_{}".format(server_const.WECOM_PREFIX, generate_source_id())
- case server_const.DAITOU_CODE:
- source_id = "{}_{}_{}".format(server_const.DAITOU_PREFIX, gh_id, generate_source_id())
- case server_const.COOPERATION_CODE:
- source_id = "{}_{}".format(server_const.COOPERATION_PREFIX, generate_source_id())
- case server_const.SERVICE_COOPERATION_CODE:
- source_id = "{}_{}".format(server_const.SERVICE_COOPERATION_PREFIX, generate_source_id())
- url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
- return (
- root_share_id,
- source_id,
- f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
- )
- async def generate_single_card(self, index, gh_id, mini_id, item):
- """
- 生成单个分享卡片
- :param item: 单个视频结果
- :param mini_id: 小程序 appType
- :param gh_id: 公众号 id
- :param index: 视频位置
- :return:
- """
- str_mini_id = str(mini_id)
- mini_info = self.mini_program_map[str_mini_id]
- avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name']
- root_share_id, root_source_id, production_path = self.create_gzh_path(
- video_id=item['videoId'],
- shared_uid=item['uid'],
- gh_id=gh_id
- )
- logging(
- code="1002",
- info="root_share_id --{}, productionPath -- {}".format(
- root_share_id, production_path
- ),
- function="process",
- trace_id=self.trace_id,
- )
- result = {
- "productionCover": item['videoCover'],
- "productionName": item['kimiTitle'],
- "programAvatar": avatar,
- "programId": app_id,
- "programName": app_name,
- "source": item['source'],
- "rootShareId": root_share_id,
- "productionPath": production_path,
- "videoUrl": item['videoPath'],
- "mini_id": mini_id
- }
- if index == 1:
- result['paragraphPosition'] = 0.01
- else:
- position = (index - 1) * 0.25
- result['paragraphPosition'] = position
- item['rootSourceId'] = root_source_id
- return result, item
- async def generate_cards(self, result):
- """
- 生成返回卡片
- :return:
- """
- gh_id = result['gh_id']
- response = json.loads(result['response'])
- card_list = []
- new_item_list = []
- for index, item in enumerate(response, 1):
- card, new_item = await self.generate_single_card(index, gh_id, server_const.DEFAULT_APP_ID, item)
- card_list.append(card)
- new_item_list.append(new_item)
- return card_list, new_item_list
- async def job(self):
- """
- 执行方法
- :return:
- """
- video_result = await self.get_videos_result()
- status_code = video_result.get('content_status')
- process_times = video_result.get('process_times')
- match status_code:
- case server_const.TASK_INIT_CODE:
- if process_times > server_const.TASK_MAX_PROCESS_TIMES:
- result = {
- "traceId": self.trace_id,
- "code": server_const.TASK_INIT_CODE,
- "error": "匹配失败,处理超过{}次".format(server_const.TASK_MAX_PROCESS_TIMES)
- }
- else:
- result = {
- "traceId": self.trace_id,
- "code": server_const.TASK_INIT_CODE,
- "message": "该请求还没处理"
- }
- return result
- case server_const.TASK_KIMI_FINISHED_CODE:
- return {
- "traceId": self.trace_id,
- "code": server_const.TASK_KIMI_FINISHED_CODE,
- "message": "已经执行完kimi"
- }
- case server_const.TASK_SPIDER_FINISHED_CODE:
- return {
- "traceId": self.trace_id,
- "code": server_const.TASK_SPIDER_FINISHED_CODE,
- "message": "已经执行完爬虫"
- }
- case server_const.TASK_ETL_FINISHED_CODE:
- return {
- "traceId": self.trace_id,
- "code": server_const.TASK_ETL_FINISHED_CODE,
- "message": "已经执行完 etl"
- }
- case server_const.TASK_PUBLISHED_CODE:
- # 修改任务状态为处理中
- update_sql = f"""
- UPDATE {self.article_match_video_table}
- SET success_status = %s
- WHERE success_status = %s and trace_id = %s;
- """
- affected_rows = await self.mysql_client.async_insert(
- sql=update_sql,
- params=(
- server_const.REQUEST_PROCESSING_TASK,
- server_const.REQUEST_INIT_STATUS,
- self.trace_id
- )
- )
- if affected_rows == 0:
- return {
- "traceId": self.trace_id,
- "info": "并发任务抢占锁失败",
- "message": "该 trace_id 正在处理中或者已经处理完成"
- }
- card_list, new_items = await self.generate_cards(result=video_result)
- update_sql = f"""
- UPDATE {self.article_match_video_table}
- SET response = %s, success_status = %s
- WHERE trace_id = %s and success_status = %s;
- """
- await self.mysql_client.async_insert(
- sql=update_sql,
- params=(
- json.dumps(new_items, ensure_ascii=False),
- server_const.REQUEST_SUCCESS_STATUS,
- self.trace_id,
- server_const.REQUEST_PROCESSING_TASK
- )
- )
- return {
- "traceId": self.trace_id,
- "miniprogramList": card_list
- }
- case server_const.TASK_ILLEGAL_CODE:
- return {
- "traceId": self.trace_id,
- "code": server_const.TASK_ILLEGAL_CODE,
- "error": "该文章被kimi识别为高风险文章,不处理"
- }
- case server_const.TASK_BAD_CATEGORY_CODE:
- return {
- "traceId": self.trace_id,
- "code": server_const.TASK_BAD_CATEGORY_CODE,
- "error": "该文章品类不符合这个账号,不做冷启动处理"
- }
- case server_const.TASK_EXIT_CODE:
- return {
- "traceId": self.trace_id,
- "code": server_const.TASK_EXIT_CODE,
- "error": "该文章已经退场or晋级, 不再冷启处理"
- }
- case server_const.TASK_FAIL_CODE:
- return {
- "traceId": self.trace_id,
- "code": server_const.TASK_FAIL_CODE,
- "error": "该任务执行失败"
- }
- case server_const.TASK_PROCESSING_CODE:
- return {
- "traceId": self.trace_id,
- "code": server_const.TASK_PROCESSING_CODE,
- "message": "该任务正在执行中"
- }
- async def check_trace_id(self):
- """
- check trace id 是否存在与系统中
- """
- select_sql = f"""
- SELECT trace_id
- FROM {self.article_match_video_table}
- WHERE trace_id = '{self.trace_id}';
- """
- response = await self.mysql_client.async_select(select_sql)
- if response:
- return True
- else:
- return False
- async def deal(self):
- """
- api process starts from here
- :return:
- """
- params_error = self.check_params()
- if params_error:
- return params_error
- else:
- trace_id_exist_flag = await self.check_trace_id()
- if trace_id_exist_flag:
- return await self.job()
- else:
- response = await forward_requests(
- params={
- "traceId": self.trace_id,
- "miniprogramUseType": self.mini_program_type
- },
- api="recall_videos"
- )
- return response
|