123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246 |
- """
- @author: luojunhui
- """
- import json
- import uuid
- import time
- import random
- import hashlib
- import urllib.parse
- from applications.log import logging
- class Response(object):
- """
- Response
- """
- def __init__(self, params, mysql_client, config):
- """
- Response 接口
- """
- self.trace_id = None
- self.mini_program_type = None
- self.mysql_client = mysql_client
- self.params = params
- self.article_match_video_table = config.article_match_video_table
- self.mini_program_map = json.loads(config.getConfigValue("miniMap"))
- def check_params(self):
- """
- 请求参数校验
- :return:
- """
- try:
- self.mini_program_type = self.params['miniprogramUseType']
- self.trace_id = self.params['traceId']
- return None
- except Exception as e:
- return {
- "error": "params error",
- "msg": str(e),
- "info": self.params
- }
- async def get_videos_result(self):
- """
- 获取结果
- :return:
- """
- select_sql = f"""
- SELECT gh_id, content_status, response, process_times
- FROM {self.article_match_video_table}
- WHERE trace_id = '{self.trace_id}';
- """
- info_tuple = await self.mysql_client.async_select(select_sql)
- gh_id, content_status, response, process_times = info_tuple[0]
- return {
- "ghId": gh_id,
- "contentStatus": content_status,
- "response": response,
- "processTimes": process_times
- }
- def create_gzh_path(self, video_id, shared_uid, gh_id):
- """
- :param gh_id: 公众号账号的gh_id
- :param video_id: 视频 id
- :param shared_uid: 分享 id
- """
- def generate_source_id():
- """
- generate_source_id
- :return:
- """
- timestamp = str(int(time.time() * 1000))
- random_str = str(random.randint(1000, 9999))
- hash_input = f"{timestamp}-{random_str}"
- return hashlib.md5(hash_input.encode()).hexdigest()
- root_share_id = str(uuid.uuid4())
- if self.mini_program_type == 2:
- source_id = (
- "touliu_tencentGzhArticle_{}_".format(gh_id) + generate_source_id()
- )
- elif self.mini_program_type == 1:
- source_id = "longArticles_" + generate_source_id()
- elif self.mini_program_type == 3:
- source_id = "WeCom_" + generate_source_id()
- else:
- source_id = "Error mini_program_type {}".format(self.mini_program_type)
- url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
- # 自动把 root_share_id 加入到白名单
- # auto_white(root_share_id)
- return (
- root_share_id,
- source_id,
- f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
- )
- async def generate_single_card(self, index, gh_id, mini_id, item):
- """
- 生成单个分享卡片
- :param item: 单个视频结果
- :param mini_id: 小程序 appType
- :param gh_id: 公众号 id
- :param index: 视频位置
- :return:
- """
- str_mini_id = str(mini_id)
- mini_info = self.mini_program_map[str_mini_id]
- avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name']
- root_share_id, root_source_id, production_path = self.create_gzh_path(
- video_id=item['videoId'],
- shared_uid=item['uid'],
- gh_id=gh_id
- )
- logging(
- code="1002",
- info="root_share_id --{}, productionPath -- {}".format(
- root_share_id, production_path
- ),
- function="process",
- trace_id=self.trace_id,
- )
- result = {
- "productionCover": item['videoCover'],
- "productionName": item['kimiTitle'],
- "programAvatar": avatar,
- "programId": app_id,
- "programName": app_name,
- "source": item['source'],
- "rootShareId": root_share_id,
- "productionPath": production_path,
- "videoUrl": item['videoPath'],
- "mini_id": mini_id,
- "paragraphPosition": index * 0.25
- }
- if index == 1:
- result['paragraphPosition'] = 0.01
- item['rootSourceId'] = root_source_id
- return result, item
- async def generate_cards(self, result):
- """
- 生成返回卡片
- :return:
- """
- gh_id = result['ghId']
- response = json.loads(result['response'])
- touliu_mini_program_id = 33
- we_com_mini_program_id = 27
- match self.mini_program_type:
- case 1:
- L = []
- new_item_list = []
- for index, item in enumerate(response, 1):
- random_num = random.randint(1, 10)
- if random_num in [1, 2, 3, 4, 5, 6]:
- long_articles_mini_program_id = 25
- elif random_num in [7, 8]:
- long_articles_mini_program_id = 29
- else:
- long_articles_mini_program_id = 31
- card, new_item = await self.generate_single_card(index, gh_id, long_articles_mini_program_id, item)
- L.append(card)
- new_item_list.append(new_item)
- return L, new_item_list
- case 2:
- L = []
- new_item_list = []
- for index, item in enumerate(response, 1):
- card, new_item = await self.generate_single_card(index, gh_id, touliu_mini_program_id, item)
- L.append(card)
- new_item_list.append(new_item)
- return L, new_item_list
- case 3:
- L = []
- new_item_list = []
- for index, item in enumerate(response, 1):
- card, new_item = await self.generate_single_card(index, gh_id, we_com_mini_program_id, item)
- L.append(card)
- new_item_list.append(card)
- return L, new_item_list
- async def job(self):
- """
- 执行方法
- :return:
- """
- response = await self.get_videos_result()
- status_code = response.get('contentStatus')
- process_times = response.get('processTimes')
- match status_code:
- case 0:
- if process_times > 3:
- result = {
- "traceId": self.trace_id,
- "code": 0,
- "error": "匹配失败,处理超过 3 次"
- }
- else:
- result = {
- "traceId": self.trace_id,
- "code": 0,
- "Message": "该请求还没处理"
- }
- return result
- case 1:
- return {
- "traceId": self.trace_id,
- "code": 1,
- "Message": "该请求正在处理中"
- }
- case 2:
- card_list, new_items = await self.generate_cards(result=response)
- update_sql = f"""
- UPDATE {self.article_match_video_table}
- SET response = %s, success_status = %s
- WHERE trace_id = %s;
- """
- await self.mysql_client.async_insert(
- sql=update_sql,
- params=(json.dumps(new_items, ensure_ascii=False), 1, self.trace_id)
- )
- return {"traceId": self.trace_id, "miniprogramList": card_list}
- case 3:
- return {
- "traceId": self.trace_id,
- "code": 3,
- "error": "匹配失败,超过三次"
- }
- case 4:
- return {}
- async def deal(self):
- """
- api process starts from here
- :return:
- """
- params_error = self.check_params()
- if params_error:
- return params_error
- else:
- return await self.job()
|