response.py 12 KB


  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import uuid
  6. import time
  7. import random
  8. import hashlib
  9. import urllib.parse
  10. from applications.log import logging
  11. from applications.const import server_const
  12. from applications.functions.forward_request import forward_requests
  13. class Response(object):
  14. """
  15. Response
  16. """
  17. def __init__(self, params, mysql_client, config):
  18. """
  19. Response 接口
  20. """
  21. self.trace_id = None
  22. self.mini_program_type = None
  23. self.mysql_client = mysql_client
  24. self.params = params
  25. self.article_match_video_table = config.article_match_video_table
  26. self.mini_program_map = json.loads(config.get_config_value("miniMap"))
  27. def check_params(self):
  28. """
  29. 请求参数校验
  30. :return:
  31. """
  32. try:
  33. self.mini_program_type = self.params['miniprogramUseType']
  34. self.trace_id = self.params['traceId']
  35. if self.mini_program_type in [
  36. server_const.DAILY_CODE,
  37. server_const.TOULIU_CODE,
  38. server_const.WECOM_CODE,
  39. server_const.DAITOU_CODE,
  40. server_const.COOPERATION_CODE,
  41. server_const.SERVICE_COOPERATION_CODE
  42. ]:
  43. return None
  44. else:
  45. return {
  46. "error": "params error",
  47. "message": "mini_program_type error",
  48. "info": self.params
  49. }
  50. except Exception as e:
  51. return {
  52. "error": "params error",
  53. "message": str(e),
  54. "info": self.params
  55. }
  56. async def get_videos_result(self):
  57. """
  58. 获取结果
  59. :return:
  60. """
  61. select_sql = f"""
  62. SELECT gh_id, content_status, response, process_times
  63. FROM {self.article_match_video_table}
  64. WHERE trace_id = '{self.trace_id}';
  65. """
  66. info_tuple = await self.mysql_client.async_select(select_sql)
  67. gh_id, content_status, response, process_times = info_tuple[0]
  68. return {
  69. "gh_id": gh_id,
  70. "content_status": content_status,
  71. "response": response,
  72. "process_times": process_times
  73. }
  74. def create_gzh_path(self, video_id, shared_uid, gh_id):
  75. """
  76. :param gh_id: 公众号账号的gh_id
  77. :param video_id: 视频 id
  78. :param shared_uid: 分享 id
  79. """
  80. def generate_source_id():
  81. """
  82. generate_source_id
  83. :return:
  84. """
  85. timestamp = str(int(time.time() * 1000))
  86. random_str = str(random.randint(1000, 9999))
  87. hash_input = f"{timestamp}-{random_str}"
  88. return hashlib.md5(hash_input.encode()).hexdigest()
  89. root_share_id = str(uuid.uuid4())
  90. match self.mini_program_type:
  91. case server_const.DAILY_CODE:
  92. source_id = "{}_{}".format(server_const.DAILY_PREFIX, generate_source_id())
  93. case server_const.TOULIU_CODE:
  94. source_id = "{}_{}_{}".format(server_const.TOULIU_PREFIX, gh_id, generate_source_id())
  95. case server_const.WECOM_CODE:
  96. source_id = "{}_{}".format(server_const.WECOM_PREFIX, generate_source_id())
  97. case server_const.DAITOU_CODE:
  98. source_id = "{}_{}_{}".format(server_const.DAITOU_PREFIX, gh_id, generate_source_id())
  99. case server_const.COOPERATION_CODE:
  100. source_id = "{}_{}".format(server_const.COOPERATION_PREFIX, generate_source_id())
  101. case server_const.SERVICE_COOPERATION_CODE:
  102. source_id = "{}_{}".format(server_const.SERVICE_COOPERATION_PREFIX, generate_source_id())
  103. url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
  104. return (
  105. root_share_id,
  106. source_id,
  107. f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
  108. )
  109. async def generate_single_card(self, index, gh_id, mini_id, item):
  110. """
  111. 生成单个分享卡片
  112. :param item: 单个视频结果
  113. :param mini_id: 小程序 appType
  114. :param gh_id: 公众号 id
  115. :param index: 视频位置
  116. :return:
  117. """
  118. str_mini_id = str(mini_id)
  119. mini_info = self.mini_program_map[str_mini_id]
  120. avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name']
  121. root_share_id, root_source_id, production_path = self.create_gzh_path(
  122. video_id=item['videoId'],
  123. shared_uid=item['uid'],
  124. gh_id=gh_id
  125. )
  126. logging(
  127. code="1002",
  128. info="root_share_id --{}, productionPath -- {}".format(
  129. root_share_id, production_path
  130. ),
  131. function="process",
  132. trace_id=self.trace_id,
  133. )
  134. result = {
  135. "productionCover": item['videoCover'],
  136. "productionName": item['kimiTitle'],
  137. "programAvatar": avatar,
  138. "programId": app_id,
  139. "programName": app_name,
  140. "source": item['source'],
  141. "rootShareId": root_share_id,
  142. "productionPath": production_path,
  143. "videoUrl": item['videoPath'],
  144. "mini_id": mini_id
  145. }
  146. if index == 1:
  147. result['paragraphPosition'] = 0.01
  148. else:
  149. position = (index - 1) * 0.25
  150. result['paragraphPosition'] = position
  151. item['rootSourceId'] = root_source_id
  152. return result, item
  153. async def generate_cards(self, result):
  154. """
  155. 生成返回卡片
  156. :return:
  157. """
  158. gh_id = result['gh_id']
  159. response = json.loads(result['response'])
  160. card_list = []
  161. new_item_list = []
  162. for index, item in enumerate(response, 1):
  163. card, new_item = await self.generate_single_card(index, gh_id, server_const.DEFAULT_APP_ID, item)
  164. card_list.append(card)
  165. new_item_list.append(new_item)
  166. return card_list, new_item_list
  167. async def job(self):
  168. """
  169. 执行方法
  170. :return:
  171. """
  172. video_result = await self.get_videos_result()
  173. status_code = video_result.get('content_status')
  174. process_times = video_result.get('process_times')
  175. match status_code:
  176. case server_const.TASK_INIT_CODE:
  177. if process_times > server_const.TASK_MAX_PROCESS_TIMES:
  178. result = {
  179. "traceId": self.trace_id,
  180. "code": server_const.TASK_INIT_CODE,
  181. "error": "匹配失败,处理超过{}次".format(server_const.TASK_MAX_PROCESS_TIMES)
  182. }
  183. else:
  184. result = {
  185. "traceId": self.trace_id,
  186. "code": server_const.TASK_INIT_CODE,
  187. "message": "该请求还没处理"
  188. }
  189. return result
  190. case server_const.TASK_KIMI_FINISHED_CODE:
  191. return {
  192. "traceId": self.trace_id,
  193. "code": server_const.TASK_KIMI_FINISHED_CODE,
  194. "message": "已经执行完kimi"
  195. }
  196. case server_const.TASK_SPIDER_FINISHED_CODE:
  197. return {
  198. "traceId": self.trace_id,
  199. "code": server_const.TASK_SPIDER_FINISHED_CODE,
  200. "message": "已经执行完爬虫"
  201. }
  202. case server_const.TASK_ETL_FINISHED_CODE:
  203. return {
  204. "traceId": self.trace_id,
  205. "code": server_const.TASK_ETL_FINISHED_CODE,
  206. "message": "已经执行完 etl"
  207. }
  208. case server_const.TASK_PUBLISHED_CODE:
  209. # 修改任务状态为处理中
  210. update_sql = f"""
  211. UPDATE {self.article_match_video_table}
  212. SET success_status = %s
  213. WHERE success_status = %s and trace_id = %s;
  214. """
  215. affected_rows = await self.mysql_client.async_insert(
  216. sql=update_sql,
  217. params=(
  218. server_const.REQUEST_PROCESSING_TASK,
  219. server_const.REQUEST_INIT_STATUS,
  220. self.trace_id
  221. )
  222. )
  223. if affected_rows == 0:
  224. return {
  225. "traceId": self.trace_id,
  226. "info": "并发任务抢占锁失败",
  227. "message": "该 trace_id 正在处理中或者已经处理完成"
  228. }
  229. card_list, new_items = await self.generate_cards(result=video_result)
  230. update_sql = f"""
  231. UPDATE {self.article_match_video_table}
  232. SET response = %s, success_status = %s
  233. WHERE trace_id = %s and success_status = %s;
  234. """
  235. await self.mysql_client.async_insert(
  236. sql=update_sql,
  237. params=(
  238. json.dumps(new_items, ensure_ascii=False),
  239. server_const.REQUEST_SUCCESS_STATUS,
  240. self.trace_id,
  241. server_const.REQUEST_PROCESSING_TASK
  242. )
  243. )
  244. return {
  245. "traceId": self.trace_id,
  246. "miniprogramList": card_list
  247. }
  248. case server_const.TASK_ILLEGAL_CODE:
  249. return {
  250. "traceId": self.trace_id,
  251. "code": server_const.TASK_ILLEGAL_CODE,
  252. "error": "该文章被kimi识别为高风险文章,不处理"
  253. }
  254. case server_const.TASK_BAD_CATEGORY_CODE:
  255. return {
  256. "traceId": self.trace_id,
  257. "code": server_const.TASK_BAD_CATEGORY_CODE,
  258. "error": "该文章品类不符合这个账号,不做冷启动处理"
  259. }
  260. case server_const.TASK_EXIT_CODE:
  261. return {
  262. "traceId": self.trace_id,
  263. "code": server_const.TASK_EXIT_CODE,
  264. "error": "该文章已经退场or晋级, 不再冷启处理"
  265. }
  266. case server_const.TASK_FAIL_CODE:
  267. return {
  268. "traceId": self.trace_id,
  269. "code": server_const.TASK_FAIL_CODE,
  270. "error": "该任务执行失败"
  271. }
  272. case server_const.TASK_PROCESSING_CODE:
  273. return {
  274. "traceId": self.trace_id,
  275. "code": server_const.TASK_PROCESSING_CODE,
  276. "message": "该任务正在执行中"
  277. }
  278. async def check_trace_id(self):
  279. """
  280. check trace id 是否存在与系统中
  281. """
  282. select_sql = f"""
  283. SELECT trace_id
  284. FROM {self.article_match_video_table}
  285. WHERE trace_id = '{self.trace_id}';
  286. """
  287. response = await self.mysql_client.async_select(select_sql)
  288. if response:
  289. return True
  290. else:
  291. return False
  292. async def deal(self):
  293. """
  294. api process starts from here
  295. :return:
  296. """
  297. params_error = self.check_params()
  298. if params_error:
  299. return params_error
  300. else:
  301. trace_id_exist_flag = await self.check_trace_id()
  302. if trace_id_exist_flag:
  303. return await self.job()
  304. else:
  305. response = await forward_requests(
  306. params={
  307. "traceId": self.trace_id,
  308. "miniprogramUseType": self.mini_program_type
  309. },
  310. api="recall_videos"
  311. )
  312. return response