response.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import uuid
  6. import time
  7. import random
  8. import hashlib
  9. import urllib.parse
  10. from applications.log import logging
  11. class Response(object):
  12. """
  13. Response
  14. """
  15. REQUEST_INIT_STATUS = 0
  16. REQUEST_SUCCESS_STATUS = 1
  17. REQUEST_PROCESSING_TASK = 101
  18. TASK_MAX_PROCESS_TIMES = 3
  19. def __init__(self, params, mysql_client, config):
  20. """
  21. Response 接口
  22. """
  23. self.trace_id = None
  24. self.mini_program_type = None
  25. self.mysql_client = mysql_client
  26. self.params = params
  27. self.article_match_video_table = config.article_match_video_table
  28. self.mini_program_map = json.loads(config.get_config_value("miniMap"))
  29. def check_params(self):
  30. """
  31. 请求参数校验
  32. :return:
  33. """
  34. try:
  35. self.mini_program_type = self.params['miniprogramUseType']
  36. self.trace_id = self.params['traceId']
  37. return None
  38. except Exception as e:
  39. return {
  40. "error": "params error",
  41. "message": str(e),
  42. "info": self.params
  43. }
  44. async def get_videos_result(self):
  45. """
  46. 获取结果
  47. :return:
  48. """
  49. select_sql = f"""
  50. SELECT gh_id, content_status, response, process_times
  51. FROM {self.article_match_video_table}
  52. WHERE trace_id = '{self.trace_id}';
  53. """
  54. info_tuple = await self.mysql_client.async_select(select_sql)
  55. gh_id, content_status, response, process_times = info_tuple[0]
  56. return {
  57. "gh_id": gh_id,
  58. "content_status": content_status,
  59. "response": response,
  60. "process_times": process_times
  61. }
  62. def create_gzh_path(self, video_id, shared_uid, gh_id):
  63. """
  64. :param gh_id: 公众号账号的gh_id
  65. :param video_id: 视频 id
  66. :param shared_uid: 分享 id
  67. """
  68. def generate_source_id():
  69. """
  70. generate_source_id
  71. :return:
  72. """
  73. timestamp = str(int(time.time() * 1000))
  74. random_str = str(random.randint(1000, 9999))
  75. hash_input = f"{timestamp}-{random_str}"
  76. return hashlib.md5(hash_input.encode()).hexdigest()
  77. root_share_id = str(uuid.uuid4())
  78. if self.mini_program_type == 2:
  79. source_id = (
  80. "touliu_tencentGzhArticle_{}_".format(gh_id) + generate_source_id()
  81. )
  82. elif self.mini_program_type == 1:
  83. source_id = "longArticles_" + generate_source_id()
  84. elif self.mini_program_type == 3:
  85. source_id = "WeCom_" + generate_source_id()
  86. else:
  87. source_id = "Error mini_program_type {}".format(self.mini_program_type)
  88. url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
  89. # 自动把 root_share_id 加入到白名单
  90. # auto_white(root_share_id)
  91. return (
  92. root_share_id,
  93. source_id,
  94. f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
  95. )
  96. async def generate_single_card(self, index, gh_id, mini_id, item):
  97. """
  98. 生成单个分享卡片
  99. :param item: 单个视频结果
  100. :param mini_id: 小程序 appType
  101. :param gh_id: 公众号 id
  102. :param index: 视频位置
  103. :return:
  104. """
  105. str_mini_id = str(mini_id)
  106. mini_info = self.mini_program_map[str_mini_id]
  107. avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name']
  108. root_share_id, root_source_id, production_path = self.create_gzh_path(
  109. video_id=item['videoId'],
  110. shared_uid=item['uid'],
  111. gh_id=gh_id
  112. )
  113. logging(
  114. code="1002",
  115. info="root_share_id --{}, productionPath -- {}".format(
  116. root_share_id, production_path
  117. ),
  118. function="process",
  119. trace_id=self.trace_id,
  120. )
  121. result = {
  122. "productionCover": item['videoCover'],
  123. "productionName": item['kimiTitle'],
  124. "programAvatar": avatar,
  125. "programId": app_id,
  126. "programName": app_name,
  127. "source": item['source'],
  128. "rootShareId": root_share_id,
  129. "productionPath": production_path,
  130. "videoUrl": item['videoPath'],
  131. "mini_id": mini_id,
  132. "paragraphPosition": index * 0.25
  133. }
  134. if index == 1:
  135. result['paragraphPosition'] = 0.01
  136. item['rootSourceId'] = root_source_id
  137. return result, item
  138. async def generate_cards(self, result):
  139. """
  140. 生成返回卡片
  141. :return:
  142. """
  143. gh_id = result['gh_id']
  144. response = json.loads(result['response'])
  145. touliu_mini_program_id = 33
  146. we_com_mini_program_id = 27
  147. match self.mini_program_type:
  148. case 1:
  149. L = []
  150. new_item_list = []
  151. for index, item in enumerate(response, 1):
  152. random_num = random.randint(1, 10)
  153. if random_num in [1, 2, 3, 4, 5, 6]:
  154. long_articles_mini_program_id = 25
  155. elif random_num in [7, 8]:
  156. long_articles_mini_program_id = 29
  157. else:
  158. long_articles_mini_program_id = 31
  159. card, new_item = await self.generate_single_card(index, gh_id, long_articles_mini_program_id, item)
  160. L.append(card)
  161. new_item_list.append(new_item)
  162. return L, new_item_list
  163. case 2:
  164. L = []
  165. new_item_list = []
  166. for index, item in enumerate(response, 1):
  167. card, new_item = await self.generate_single_card(index, gh_id, touliu_mini_program_id, item)
  168. L.append(card)
  169. new_item_list.append(new_item)
  170. return L, new_item_list
  171. case 3:
  172. L = []
  173. new_item_list = []
  174. for index, item in enumerate(response, 1):
  175. card, new_item = await self.generate_single_card(index, gh_id, we_com_mini_program_id, item)
  176. L.append(card)
  177. new_item_list.append(card)
  178. return L, new_item_list
  179. async def job(self):
  180. """
  181. 执行方法
  182. :return:
  183. """
  184. response = await self.get_videos_result()
  185. status_code = response.get('content_status')
  186. process_times = response.get('process_times')
  187. match status_code:
  188. case 0:
  189. if process_times > self.TASK_MAX_PROCESS_TIMES:
  190. result = {
  191. "traceId": self.trace_id,
  192. "code": 0,
  193. "error": "匹配失败,处理超过 3 次"
  194. }
  195. else:
  196. result = {
  197. "traceId": self.trace_id,
  198. "code": 0,
  199. "message": "该请求还没处理"
  200. }
  201. return result
  202. case 1:
  203. return {
  204. "traceId": self.trace_id,
  205. "code": 1,
  206. "message": "已经执行完kimi"
  207. }
  208. case 2:
  209. return {
  210. "traceId": self.trace_id,
  211. "code": 2,
  212. "message": "已经执行完爬虫"
  213. }
  214. case 3:
  215. return {
  216. "traceId": self.trace_id,
  217. "code": 3,
  218. "message": "已经执行完 etl"
  219. }
  220. case 4:
  221. # 修改任务状态为处理中
  222. update_sql = f"""
  223. UPDATE {self.article_match_video_table}
  224. SET success_status = %s
  225. WHERE success_status = %s and trace_id = %s;
  226. """
  227. affected_rows = await self.mysql_client.async_insert(
  228. sql=update_sql,
  229. params=(
  230. self.REQUEST_PROCESSING_TASK,
  231. self.REQUEST_INIT_STATUS,
  232. self.trace_id
  233. )
  234. )
  235. if affected_rows == 0:
  236. return {
  237. "traceId": self.trace_id,
  238. "info": "并发任务抢占锁失败",
  239. "message": "该 trace_id 正在处理中或者已经处理完成"
  240. }
  241. card_list, new_items = await self.generate_cards(result=response)
  242. update_sql = f"""
  243. UPDATE {self.article_match_video_table}
  244. SET response = %s, success_status = %s
  245. WHERE trace_id = %s and success_status = %s;
  246. """
  247. await self.mysql_client.async_insert(
  248. sql=update_sql,
  249. params=(
  250. json.dumps(new_items, ensure_ascii=False),
  251. self.REQUEST_SUCCESS_STATUS,
  252. self.trace_id,
  253. self.REQUEST_PROCESSING_TASK
  254. )
  255. )
  256. return {
  257. "traceId": self.trace_id,
  258. "miniprogramList": card_list
  259. }
  260. case 99:
  261. return {
  262. "traceId": self.trace_id,
  263. "code": 99,
  264. "error": "该任务执行失败"
  265. }
  266. case 101:
  267. return {
  268. "traceId": self.trace_id,
  269. "code": 101,
  270. "message": "该任务正在执行中"
  271. }
  272. async def deal(self):
  273. """
  274. api process starts from here
  275. :return:
  276. """
  277. params_error = self.check_params()
  278. if params_error:
  279. return params_error
  280. else:
  281. return await self.job()