response.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import uuid
  6. import time
  7. import random
  8. import hashlib
  9. import urllib.parse
  10. from applications.log import logging
  11. from applications.const import server_const
  12. from applications.functions.forward_request import forward_requests
  13. class Response(object):
  14. """
  15. Response
  16. """
  17. def __init__(self, params, mysql_client, config):
  18. """
  19. Response 接口
  20. """
  21. self.trace_id = None
  22. self.mini_program_type = None
  23. self.mysql_client = mysql_client
  24. self.params = params
  25. self.article_match_video_table = config.article_match_video_table
  26. self.mini_program_map = json.loads(config.get_config_value("miniMap"))
  27. def check_params(self):
  28. """
  29. 请求参数校验
  30. :return:
  31. """
  32. try:
  33. self.mini_program_type = self.params['miniprogramUseType']
  34. self.trace_id = self.params['traceId']
  35. return None
  36. except Exception as e:
  37. return {
  38. "error": "params error",
  39. "message": str(e),
  40. "info": self.params
  41. }
  42. async def get_videos_result(self):
  43. """
  44. 获取结果
  45. :return:
  46. """
  47. select_sql = f"""
  48. SELECT gh_id, content_status, response, process_times
  49. FROM {self.article_match_video_table}
  50. WHERE trace_id = '{self.trace_id}';
  51. """
  52. info_tuple = await self.mysql_client.async_select(select_sql)
  53. gh_id, content_status, response, process_times = info_tuple[0]
  54. return {
  55. "gh_id": gh_id,
  56. "content_status": content_status,
  57. "response": response,
  58. "process_times": process_times
  59. }
  60. def create_gzh_path(self, video_id, shared_uid, gh_id):
  61. """
  62. :param gh_id: 公众号账号的gh_id
  63. :param video_id: 视频 id
  64. :param shared_uid: 分享 id
  65. """
  66. def generate_source_id():
  67. """
  68. generate_source_id
  69. :return:
  70. """
  71. timestamp = str(int(time.time() * 1000))
  72. random_str = str(random.randint(1000, 9999))
  73. hash_input = f"{timestamp}-{random_str}"
  74. return hashlib.md5(hash_input.encode()).hexdigest()
  75. root_share_id = str(uuid.uuid4())
  76. if self.mini_program_type == 2:
  77. source_id = (
  78. "touliu_tencentGzhArticle_{}_".format(gh_id) + generate_source_id()
  79. )
  80. elif self.mini_program_type == 1:
  81. source_id = "longArticles_" + generate_source_id()
  82. elif self.mini_program_type == 3:
  83. source_id = "WeCom_" + generate_source_id()
  84. else:
  85. source_id = "Error mini_program_type {}".format(self.mini_program_type)
  86. url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
  87. # 自动把 root_share_id 加入到白名单
  88. # auto_white(root_share_id)
  89. return (
  90. root_share_id,
  91. source_id,
  92. f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
  93. )
  94. async def generate_single_card(self, index, gh_id, mini_id, item):
  95. """
  96. 生成单个分享卡片
  97. :param item: 单个视频结果
  98. :param mini_id: 小程序 appType
  99. :param gh_id: 公众号 id
  100. :param index: 视频位置
  101. :return:
  102. """
  103. str_mini_id = str(mini_id)
  104. mini_info = self.mini_program_map[str_mini_id]
  105. avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name']
  106. root_share_id, root_source_id, production_path = self.create_gzh_path(
  107. video_id=item['videoId'],
  108. shared_uid=item['uid'],
  109. gh_id=gh_id
  110. )
  111. logging(
  112. code="1002",
  113. info="root_share_id --{}, productionPath -- {}".format(
  114. root_share_id, production_path
  115. ),
  116. function="process",
  117. trace_id=self.trace_id,
  118. )
  119. result = {
  120. "productionCover": item['videoCover'],
  121. "productionName": item['kimiTitle'],
  122. "programAvatar": avatar,
  123. "programId": app_id,
  124. "programName": app_name,
  125. "source": item['source'],
  126. "rootShareId": root_share_id,
  127. "productionPath": production_path,
  128. "videoUrl": item['videoPath'],
  129. "mini_id": mini_id
  130. }
  131. if index == 1:
  132. result['paragraphPosition'] = 0.01
  133. else:
  134. position = (index - 1) * 0.25
  135. result['paragraphPosition'] = position
  136. item['rootSourceId'] = root_source_id
  137. return result, item
  138. async def generate_cards(self, result):
  139. """
  140. 生成返回卡片
  141. :return:
  142. """
  143. gh_id = result['gh_id']
  144. response = json.loads(result['response'])
  145. long_articles_mini_program_id = 25
  146. touliu_mini_program_id = 33
  147. we_com_mini_program_id = 27
  148. match self.mini_program_type:
  149. case 1:
  150. L = []
  151. new_item_list = []
  152. for index, item in enumerate(response, 1):
  153. card, new_item = await self.generate_single_card(index, gh_id, long_articles_mini_program_id, item)
  154. L.append(card)
  155. new_item_list.append(new_item)
  156. return L, new_item_list
  157. case 2:
  158. L = []
  159. new_item_list = []
  160. for index, item in enumerate(response, 1):
  161. card, new_item = await self.generate_single_card(index, gh_id, touliu_mini_program_id, item)
  162. L.append(card)
  163. new_item_list.append(new_item)
  164. return L, new_item_list
  165. case 3:
  166. L = []
  167. new_item_list = []
  168. for index, item in enumerate(response, 1):
  169. card, new_item = await self.generate_single_card(index, gh_id, we_com_mini_program_id, item)
  170. L.append(card)
  171. new_item_list.append(card)
  172. return L, new_item_list
  173. async def job(self):
  174. """
  175. 执行方法
  176. :return:
  177. """
  178. response = await self.get_videos_result()
  179. status_code = response.get('content_status')
  180. process_times = response.get('process_times')
  181. match status_code:
  182. case 0:
  183. if process_times > server_const.TASK_MAX_PROCESS_TIMES:
  184. result = {
  185. "traceId": self.trace_id,
  186. "code": 0,
  187. "error": "匹配失败,处理超过 3 次"
  188. }
  189. else:
  190. result = {
  191. "traceId": self.trace_id,
  192. "code": 0,
  193. "message": "该请求还没处理"
  194. }
  195. return result
  196. case 1:
  197. return {
  198. "traceId": self.trace_id,
  199. "code": 1,
  200. "message": "已经执行完kimi"
  201. }
  202. case 2:
  203. return {
  204. "traceId": self.trace_id,
  205. "code": 2,
  206. "message": "已经执行完爬虫"
  207. }
  208. case 3:
  209. return {
  210. "traceId": self.trace_id,
  211. "code": 3,
  212. "message": "已经执行完 etl"
  213. }
  214. case 4:
  215. # 修改任务状态为处理中
  216. update_sql = f"""
  217. UPDATE {self.article_match_video_table}
  218. SET success_status = %s
  219. WHERE success_status = %s and trace_id = %s;
  220. """
  221. affected_rows = await self.mysql_client.async_insert(
  222. sql=update_sql,
  223. params=(
  224. server_const.REQUEST_PROCESSING_TASK,
  225. server_const.REQUEST_INIT_STATUS,
  226. self.trace_id
  227. )
  228. )
  229. if affected_rows == 0:
  230. return {
  231. "traceId": self.trace_id,
  232. "info": "并发任务抢占锁失败",
  233. "message": "该 trace_id 正在处理中或者已经处理完成"
  234. }
  235. card_list, new_items = await self.generate_cards(result=response)
  236. update_sql = f"""
  237. UPDATE {self.article_match_video_table}
  238. SET response = %s, success_status = %s
  239. WHERE trace_id = %s and success_status = %s;
  240. """
  241. await self.mysql_client.async_insert(
  242. sql=update_sql,
  243. params=(
  244. json.dumps(new_items, ensure_ascii=False),
  245. server_const.REQUEST_SUCCESS_STATUS,
  246. self.trace_id,
  247. server_const.REQUEST_PROCESSING_TASK
  248. )
  249. )
  250. return {
  251. "traceId": self.trace_id,
  252. "miniprogramList": card_list
  253. }
  254. case 95:
  255. return {
  256. "traceId": self.trace_id,
  257. "code": 95,
  258. "error": "该文章被kimi识别为高风险文章,不处理"
  259. }
  260. case 96:
  261. return {
  262. "traceId": self.trace_id,
  263. "code": 96,
  264. "error": "该文章品类不符合这个账号,不做冷启动处理"
  265. }
  266. case 97:
  267. return {
  268. "traceId": self.trace_id,
  269. "code": 97,
  270. "error": "该文章已经退场or晋级, 不再冷启处理"
  271. }
  272. case 99:
  273. return {
  274. "traceId": self.trace_id,
  275. "code": 99,
  276. "error": "该任务执行失败"
  277. }
  278. case 101:
  279. return {
  280. "traceId": self.trace_id,
  281. "code": 101,
  282. "message": "该任务正在执行中"
  283. }
  284. async def check_trace_id(self):
  285. """
  286. check trace id 是否存在与系统中
  287. """
  288. select_sql = f"""
  289. SELECT trace_id
  290. FROM {self.article_match_video_table}
  291. WHERE trace_id = '{self.trace_id}';
  292. """
  293. response = await self.mysql_client.async_select(select_sql)
  294. if response:
  295. return True
  296. else:
  297. return False
  298. async def deal(self):
  299. """
  300. api process starts from here
  301. :return:
  302. """
  303. params_error = self.check_params()
  304. if params_error:
  305. return params_error
  306. else:
  307. trace_id_exist_flag = await self.check_trace_id()
  308. if trace_id_exist_flag:
  309. return await self.job()
  310. else:
  311. response = await forward_requests(
  312. params={
  313. "traceId": self.trace_id,
  314. "miniprogramUseType": self.mini_program_type
  315. },
  316. api="recall_videos"
  317. )
  318. return response