response.py 12 KB


  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import uuid
  6. import time
  7. import random
  8. import hashlib
  9. import urllib.parse
  10. from applications.log import logging
  11. from applications.const import server_const
  12. from applications.functions.forward_request import forward_requests
  13. class Response(object):
  14. """
  15. Response
  16. """
  17. def __init__(self, params, mysql_client, config):
  18. """
  19. Response 接口
  20. """
  21. self.trace_id = None
  22. self.mini_program_type = None
  23. self.mysql_client = mysql_client
  24. self.params = params
  25. self.article_match_video_table = config.article_match_video_table
  26. self.mini_program_map = json.loads(config.get_config_value("miniMap"))
  27. def check_params(self):
  28. """
  29. 请求参数校验
  30. :return:
  31. """
  32. try:
  33. self.mini_program_type = self.params['miniprogramUseType']
  34. self.trace_id = self.params['traceId']
  35. if self.mini_program_type in [
  36. server_const.DAILY_CODE,
  37. server_const.TOULIU_CODE,
  38. server_const.WECOM_CODE,
  39. server_const.DAITOU_CODE,
  40. server_const.COOPERATION_CODE
  41. ]:
  42. return None
  43. else:
  44. return {
  45. "error": "params error",
  46. "message": "mini_program_type error",
  47. "info": self.params
  48. }
  49. except Exception as e:
  50. return {
  51. "error": "params error",
  52. "message": str(e),
  53. "info": self.params
  54. }
  55. async def get_videos_result(self):
  56. """
  57. 获取结果
  58. :return:
  59. """
  60. select_sql = f"""
  61. SELECT gh_id, content_status, response, process_times
  62. FROM {self.article_match_video_table}
  63. WHERE trace_id = '{self.trace_id}';
  64. """
  65. info_tuple = await self.mysql_client.async_select(select_sql)
  66. gh_id, content_status, response, process_times = info_tuple[0]
  67. return {
  68. "gh_id": gh_id,
  69. "content_status": content_status,
  70. "response": response,
  71. "process_times": process_times
  72. }
  73. def create_gzh_path(self, video_id, shared_uid, gh_id):
  74. """
  75. :param gh_id: 公众号账号的gh_id
  76. :param video_id: 视频 id
  77. :param shared_uid: 分享 id
  78. """
  79. def generate_source_id():
  80. """
  81. generate_source_id
  82. :return:
  83. """
  84. timestamp = str(int(time.time() * 1000))
  85. random_str = str(random.randint(1000, 9999))
  86. hash_input = f"{timestamp}-{random_str}"
  87. return hashlib.md5(hash_input.encode()).hexdigest()
  88. root_share_id = str(uuid.uuid4())
  89. match self.mini_program_type:
  90. case server_const.DAILY_CODE:
  91. source_id = "{}_{}".format(server_const.DAILY_PREFIX, generate_source_id())
  92. case server_const.TOULIU_CODE:
  93. source_id = "{}_{}_{}".format(server_const.TOULIU_PREFIX, gh_id, generate_source_id())
  94. case server_const.WECOM_CODE:
  95. source_id = "{}_{}".format(server_const.WECOM_PREFIX, generate_source_id())
  96. case server_const.DAITOU_CODE:
  97. source_id = "{}_{}_{}".format(server_const.DAITOU_PREFIX, gh_id, generate_source_id())
  98. case server_const.COOPERATION_CODE:
  99. source_id = "{}_{}".format(server_const.COOPERATION_PREFIX, generate_source_id())
  100. url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
  101. return (
  102. root_share_id,
  103. source_id,
  104. f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
  105. )
  106. async def generate_single_card(self, index, gh_id, mini_id, item):
  107. """
  108. 生成单个分享卡片
  109. :param item: 单个视频结果
  110. :param mini_id: 小程序 appType
  111. :param gh_id: 公众号 id
  112. :param index: 视频位置
  113. :return:
  114. """
  115. str_mini_id = str(mini_id)
  116. mini_info = self.mini_program_map[str_mini_id]
  117. avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name']
  118. root_share_id, root_source_id, production_path = self.create_gzh_path(
  119. video_id=item['videoId'],
  120. shared_uid=item['uid'],
  121. gh_id=gh_id
  122. )
  123. logging(
  124. code="1002",
  125. info="root_share_id --{}, productionPath -- {}".format(
  126. root_share_id, production_path
  127. ),
  128. function="process",
  129. trace_id=self.trace_id,
  130. )
  131. result = {
  132. "productionCover": item['videoCover'],
  133. "productionName": item['kimiTitle'],
  134. "programAvatar": avatar,
  135. "programId": app_id,
  136. "programName": app_name,
  137. "source": item['source'],
  138. "rootShareId": root_share_id,
  139. "productionPath": production_path,
  140. "videoUrl": item['videoPath'],
  141. "mini_id": mini_id
  142. }
  143. if index == 1:
  144. result['paragraphPosition'] = 0.01
  145. else:
  146. position = (index - 1) * 0.25
  147. result['paragraphPosition'] = position
  148. item['rootSourceId'] = root_source_id
  149. return result, item
  150. async def generate_cards(self, result):
  151. """
  152. 生成返回卡片
  153. :return:
  154. """
  155. gh_id = result['gh_id']
  156. response = json.loads(result['response'])
  157. card_list = []
  158. new_item_list = []
  159. for index, item in enumerate(response, 1):
  160. card, new_item = await self.generate_single_card(index, gh_id, server_const.DEFAULT_APP_ID, item)
  161. card_list.append(card)
  162. new_item_list.append(new_item)
  163. return card_list, new_item_list
  164. async def job(self):
  165. """
  166. 执行方法
  167. :return:
  168. """
  169. video_result = await self.get_videos_result()
  170. status_code = video_result.get('content_status')
  171. process_times = video_result.get('process_times')
  172. match status_code:
  173. case server_const.TASK_INIT_CODE:
  174. if process_times > server_const.TASK_MAX_PROCESS_TIMES:
  175. result = {
  176. "traceId": self.trace_id,
  177. "code": server_const.TASK_INIT_CODE,
  178. "error": "匹配失败,处理超过{}次".format(server_const.TASK_MAX_PROCESS_TIMES)
  179. }
  180. else:
  181. result = {
  182. "traceId": self.trace_id,
  183. "code": server_const.TASK_INIT_CODE,
  184. "message": "该请求还没处理"
  185. }
  186. return result
  187. case server_const.TASK_KIMI_FINISHED_CODE:
  188. return {
  189. "traceId": self.trace_id,
  190. "code": server_const.TASK_KIMI_FINISHED_CODE,
  191. "message": "已经执行完kimi"
  192. }
  193. case server_const.TASK_SPIDER_FINISHED_CODE:
  194. return {
  195. "traceId": self.trace_id,
  196. "code": server_const.TASK_SPIDER_FINISHED_CODE,
  197. "message": "已经执行完爬虫"
  198. }
  199. case server_const.TASK_ETL_FINISHED_CODE:
  200. return {
  201. "traceId": self.trace_id,
  202. "code": server_const.TASK_ETL_FINISHED_CODE,
  203. "message": "已经执行完 etl"
  204. }
  205. case server_const.TASK_PUBLISHED_CODE:
  206. # 修改任务状态为处理中
  207. update_sql = f"""
  208. UPDATE {self.article_match_video_table}
  209. SET success_status = %s
  210. WHERE success_status = %s and trace_id = %s;
  211. """
  212. affected_rows = await self.mysql_client.async_insert(
  213. sql=update_sql,
  214. params=(
  215. server_const.REQUEST_PROCESSING_TASK,
  216. server_const.REQUEST_INIT_STATUS,
  217. self.trace_id
  218. )
  219. )
  220. if affected_rows == 0:
  221. return {
  222. "traceId": self.trace_id,
  223. "info": "并发任务抢占锁失败",
  224. "message": "该 trace_id 正在处理中或者已经处理完成"
  225. }
  226. card_list, new_items = await self.generate_cards(result=video_result)
  227. update_sql = f"""
  228. UPDATE {self.article_match_video_table}
  229. SET response = %s, success_status = %s
  230. WHERE trace_id = %s and success_status = %s;
  231. """
  232. await self.mysql_client.async_insert(
  233. sql=update_sql,
  234. params=(
  235. json.dumps(new_items, ensure_ascii=False),
  236. server_const.REQUEST_SUCCESS_STATUS,
  237. self.trace_id,
  238. server_const.REQUEST_PROCESSING_TASK
  239. )
  240. )
  241. return {
  242. "traceId": self.trace_id,
  243. "miniprogramList": card_list
  244. }
  245. case server_const.TASK_ILLEGAL_CODE:
  246. return {
  247. "traceId": self.trace_id,
  248. "code": server_const.TASK_ILLEGAL_CODE,
  249. "error": "该文章被kimi识别为高风险文章,不处理"
  250. }
  251. case server_const.TASK_BAD_CATEGORY_CODE:
  252. return {
  253. "traceId": self.trace_id,
  254. "code": server_const.TASK_BAD_CATEGORY_CODE,
  255. "error": "该文章品类不符合这个账号,不做冷启动处理"
  256. }
  257. case server_const.TASK_EXIT_CODE:
  258. return {
  259. "traceId": self.trace_id,
  260. "code": server_const.TASK_EXIT_CODE,
  261. "error": "该文章已经退场or晋级, 不再冷启处理"
  262. }
  263. case server_const.TASK_FAIL_CODE:
  264. return {
  265. "traceId": self.trace_id,
  266. "code": server_const.TASK_FAIL_CODE,
  267. "error": "该任务执行失败"
  268. }
  269. case server_const.TASK_PROCESSING_CODE:
  270. return {
  271. "traceId": self.trace_id,
  272. "code": server_const.TASK_PROCESSING_CODE,
  273. "message": "该任务正在执行中"
  274. }
  275. async def check_trace_id(self):
  276. """
  277. check trace id 是否存在与系统中
  278. """
  279. select_sql = f"""
  280. SELECT trace_id
  281. FROM {self.article_match_video_table}
  282. WHERE trace_id = '{self.trace_id}';
  283. """
  284. response = await self.mysql_client.async_select(select_sql)
  285. if response:
  286. return True
  287. else:
  288. return False
  289. async def deal(self):
  290. """
  291. api process starts from here
  292. :return:
  293. """
  294. params_error = self.check_params()
  295. if params_error:
  296. return params_error
  297. else:
  298. trace_id_exist_flag = await self.check_trace_id()
  299. if trace_id_exist_flag:
  300. return await self.job()
  301. else:
  302. response = await forward_requests(
  303. params={
  304. "traceId": self.trace_id,
  305. "miniprogramUseType": self.mini_program_type
  306. },
  307. api="recall_videos"
  308. )
  309. return response