response.py 11 KB


  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import uuid
  6. import time
  7. import random
  8. import hashlib
  9. import urllib.parse
  10. from applications.log import logging
  11. class Response(object):
  12. """
  13. Response
  14. """
  15. REQUEST_INIT_STATUS = 0
  16. REQUEST_SUCCESS_STATUS = 1
  17. REQUEST_PROCESSING_TASK = 101
  18. TASK_MAX_PROCESS_TIMES = 3
  19. def __init__(self, params, mysql_client, config):
  20. """
  21. Response 接口
  22. """
  23. self.trace_id = None
  24. self.mini_program_type = None
  25. self.mysql_client = mysql_client
  26. self.params = params
  27. self.article_match_video_table = config.article_match_video_table
  28. self.mini_program_map = json.loads(config.get_config_value("miniMap"))
  29. def check_params(self):
  30. """
  31. 请求参数校验
  32. :return:
  33. """
  34. try:
  35. self.mini_program_type = self.params['miniprogramUseType']
  36. self.trace_id = self.params['traceId']
  37. return None
  38. except Exception as e:
  39. return {
  40. "error": "params error",
  41. "message": str(e),
  42. "info": self.params
  43. }
  44. async def get_videos_result(self):
  45. """
  46. 获取结果
  47. :return:
  48. """
  49. select_sql = f"""
  50. SELECT gh_id, content_status, response, process_times
  51. FROM {self.article_match_video_table}
  52. WHERE trace_id = '{self.trace_id}';
  53. """
  54. info_tuple = await self.mysql_client.async_select(select_sql)
  55. gh_id, content_status, response, process_times = info_tuple[0]
  56. return {
  57. "gh_id": gh_id,
  58. "content_status": content_status,
  59. "response": response,
  60. "process_times": process_times
  61. }
  62. def create_gzh_path(self, video_id, shared_uid, gh_id):
  63. """
  64. :param gh_id: 公众号账号的gh_id
  65. :param video_id: 视频 id
  66. :param shared_uid: 分享 id
  67. """
  68. def generate_source_id():
  69. """
  70. generate_source_id
  71. :return:
  72. """
  73. timestamp = str(int(time.time() * 1000))
  74. random_str = str(random.randint(1000, 9999))
  75. hash_input = f"{timestamp}-{random_str}"
  76. return hashlib.md5(hash_input.encode()).hexdigest()
  77. root_share_id = str(uuid.uuid4())
  78. if self.mini_program_type == 2:
  79. source_id = (
  80. "touliu_tencentGzhArticle_{}_".format(gh_id) + generate_source_id()
  81. )
  82. elif self.mini_program_type == 1:
  83. source_id = "longArticles_" + generate_source_id()
  84. elif self.mini_program_type == 3:
  85. source_id = "WeCom_" + generate_source_id()
  86. else:
  87. source_id = "Error mini_program_type {}".format(self.mini_program_type)
  88. url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
  89. # 自动把 root_share_id 加入到白名单
  90. # auto_white(root_share_id)
  91. return (
  92. root_share_id,
  93. source_id,
  94. f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
  95. )
  96. async def generate_single_card(self, index, gh_id, mini_id, item):
  97. """
  98. 生成单个分享卡片
  99. :param item: 单个视频结果
  100. :param mini_id: 小程序 appType
  101. :param gh_id: 公众号 id
  102. :param index: 视频位置
  103. :return:
  104. """
  105. str_mini_id = str(mini_id)
  106. mini_info = self.mini_program_map[str_mini_id]
  107. avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name']
  108. root_share_id, root_source_id, production_path = self.create_gzh_path(
  109. video_id=item['videoId'],
  110. shared_uid=item['uid'],
  111. gh_id=gh_id
  112. )
  113. logging(
  114. code="1002",
  115. info="root_share_id --{}, productionPath -- {}".format(
  116. root_share_id, production_path
  117. ),
  118. function="process",
  119. trace_id=self.trace_id,
  120. )
  121. result = {
  122. "productionCover": item['videoCover'],
  123. "productionName": item['kimiTitle'],
  124. "programAvatar": avatar,
  125. "programId": app_id,
  126. "programName": app_name,
  127. "source": item['source'],
  128. "rootShareId": root_share_id,
  129. "productionPath": production_path,
  130. "videoUrl": item['videoPath'],
  131. "mini_id": mini_id
  132. }
  133. if index == 1:
  134. result['paragraphPosition'] = 0.01
  135. else:
  136. position = (index - 1) * 0.25
  137. result['paragraphPosition'] = position
  138. item['rootSourceId'] = root_source_id
  139. return result, item
  140. async def generate_cards(self, result):
  141. """
  142. 生成返回卡片
  143. :return:
  144. """
  145. gh_id = result['gh_id']
  146. response = json.loads(result['response'])
  147. long_articles_mini_program_id = 25
  148. touliu_mini_program_id = 33
  149. we_com_mini_program_id = 27
  150. match self.mini_program_type:
  151. case 1:
  152. L = []
  153. new_item_list = []
  154. for index, item in enumerate(response, 1):
  155. # random_num = random.randint(1, 10)
  156. # if random_num in [1, 2, 3, 4, 5, 6]:
  157. # long_articles_mini_program_id = 25
  158. # elif random_num in [7, 8]:
  159. # long_articles_mini_program_id = 29
  160. # else:
  161. # long_articles_mini_program_id = 31
  162. card, new_item = await self.generate_single_card(index, gh_id, long_articles_mini_program_id, item)
  163. L.append(card)
  164. new_item_list.append(new_item)
  165. return L, new_item_list
  166. case 2:
  167. L = []
  168. new_item_list = []
  169. for index, item in enumerate(response, 1):
  170. card, new_item = await self.generate_single_card(index, gh_id, touliu_mini_program_id, item)
  171. L.append(card)
  172. new_item_list.append(new_item)
  173. return L, new_item_list
  174. case 3:
  175. L = []
  176. new_item_list = []
  177. for index, item in enumerate(response, 1):
  178. card, new_item = await self.generate_single_card(index, gh_id, we_com_mini_program_id, item)
  179. L.append(card)
  180. new_item_list.append(card)
  181. return L, new_item_list
  182. async def job(self):
  183. """
  184. 执行方法
  185. :return:
  186. """
  187. response = await self.get_videos_result()
  188. status_code = response.get('content_status')
  189. process_times = response.get('process_times')
  190. match status_code:
  191. case 0:
  192. if process_times > self.TASK_MAX_PROCESS_TIMES:
  193. result = {
  194. "traceId": self.trace_id,
  195. "code": 0,
  196. "error": "匹配失败,处理超过 3 次"
  197. }
  198. else:
  199. result = {
  200. "traceId": self.trace_id,
  201. "code": 0,
  202. "message": "该请求还没处理"
  203. }
  204. return result
  205. case 1:
  206. return {
  207. "traceId": self.trace_id,
  208. "code": 1,
  209. "message": "已经执行完kimi"
  210. }
  211. case 2:
  212. return {
  213. "traceId": self.trace_id,
  214. "code": 2,
  215. "message": "已经执行完爬虫"
  216. }
  217. case 3:
  218. return {
  219. "traceId": self.trace_id,
  220. "code": 3,
  221. "message": "已经执行完 etl"
  222. }
  223. case 4:
  224. # 修改任务状态为处理中
  225. update_sql = f"""
  226. UPDATE {self.article_match_video_table}
  227. SET success_status = %s
  228. WHERE success_status = %s and trace_id = %s;
  229. """
  230. affected_rows = await self.mysql_client.async_insert(
  231. sql=update_sql,
  232. params=(
  233. self.REQUEST_PROCESSING_TASK,
  234. self.REQUEST_INIT_STATUS,
  235. self.trace_id
  236. )
  237. )
  238. if affected_rows == 0:
  239. return {
  240. "traceId": self.trace_id,
  241. "info": "并发任务抢占锁失败",
  242. "message": "该 trace_id 正在处理中或者已经处理完成"
  243. }
  244. card_list, new_items = await self.generate_cards(result=response)
  245. update_sql = f"""
  246. UPDATE {self.article_match_video_table}
  247. SET response = %s, success_status = %s
  248. WHERE trace_id = %s and success_status = %s;
  249. """
  250. await self.mysql_client.async_insert(
  251. sql=update_sql,
  252. params=(
  253. json.dumps(new_items, ensure_ascii=False),
  254. self.REQUEST_SUCCESS_STATUS,
  255. self.trace_id,
  256. self.REQUEST_PROCESSING_TASK
  257. )
  258. )
  259. return {
  260. "traceId": self.trace_id,
  261. "miniprogramList": card_list
  262. }
  263. case 95:
  264. return {
  265. "traceId": self.trace_id,
  266. "code": 95,
  267. "error": "该文章被kimi识别为高风险文章,不处理"
  268. }
  269. case 96:
  270. return {
  271. "traceId": self.trace_id,
  272. "code": 96,
  273. "error": "该文章品类不符合这个账号,不做冷启动处理"
  274. }
  275. case 97:
  276. return {
  277. "traceId": self.trace_id,
  278. "code": 97,
  279. "error": "该文章已经退场or晋级, 不再冷启处理"
  280. }
  281. case 99:
  282. return {
  283. "traceId": self.trace_id,
  284. "code": 99,
  285. "error": "该任务执行失败"
  286. }
  287. case 101:
  288. return {
  289. "traceId": self.trace_id,
  290. "code": 101,
  291. "message": "该任务正在执行中"
  292. }
  293. async def deal(self):
  294. """
  295. api process starts from here
  296. :return:
  297. """
  298. params_error = self.check_params()
  299. if params_error:
  300. return params_error
  301. else:
  302. return await self.job()