# response.py (11 KB)
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import uuid
  6. import time
  7. import random
  8. import hashlib
  9. import urllib.parse
  10. from applications.log import logging
  11. class Response(object):
  12. """
  13. Response
  14. """
  15. REQUEST_INIT_STATUS = 0
  16. REQUEST_SUCCESS_STATUS = 1
  17. REQUEST_PROCESSING_TASK = 101
  18. TASK_MAX_PROCESS_TIMES = 3
  19. def __init__(self, params, mysql_client, config):
  20. """
  21. Response 接口
  22. """
  23. self.trace_id = None
  24. self.mini_program_type = None
  25. self.mysql_client = mysql_client
  26. self.params = params
  27. self.article_match_video_table = config.article_match_video_table
  28. self.mini_program_map = json.loads(config.get_config_value("miniMap"))
  29. def check_params(self):
  30. """
  31. 请求参数校验
  32. :return:
  33. """
  34. try:
  35. self.mini_program_type = self.params['miniprogramUseType']
  36. self.trace_id = self.params['traceId']
  37. return None
  38. except Exception as e:
  39. return {
  40. "error": "params error",
  41. "message": str(e),
  42. "info": self.params
  43. }
  44. async def get_videos_result(self):
  45. """
  46. 获取结果
  47. :return:
  48. """
  49. select_sql = f"""
  50. SELECT gh_id, content_status, response, process_times
  51. FROM {self.article_match_video_table}
  52. WHERE trace_id = '{self.trace_id}';
  53. """
  54. info_tuple = await self.mysql_client.async_select(select_sql)
  55. gh_id, content_status, response, process_times = info_tuple[0]
  56. return {
  57. "gh_id": gh_id,
  58. "content_status": content_status,
  59. "response": response,
  60. "process_times": process_times
  61. }
  62. def create_gzh_path(self, video_id, shared_uid, gh_id):
  63. """
  64. :param gh_id: 公众号账号的gh_id
  65. :param video_id: 视频 id
  66. :param shared_uid: 分享 id
  67. """
  68. def generate_source_id():
  69. """
  70. generate_source_id
  71. :return:
  72. """
  73. timestamp = str(int(time.time() * 1000))
  74. random_str = str(random.randint(1000, 9999))
  75. hash_input = f"{timestamp}-{random_str}"
  76. return hashlib.md5(hash_input.encode()).hexdigest()
  77. root_share_id = str(uuid.uuid4())
  78. if self.mini_program_type == 2:
  79. source_id = (
  80. "touliu_tencentGzhArticle_{}_".format(gh_id) + generate_source_id()
  81. )
  82. elif self.mini_program_type == 1:
  83. source_id = "longArticles_" + generate_source_id()
  84. elif self.mini_program_type == 3:
  85. source_id = "WeCom_" + generate_source_id()
  86. else:
  87. source_id = "Error mini_program_type {}".format(self.mini_program_type)
  88. url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}"
  89. # 自动把 root_share_id 加入到白名单
  90. # auto_white(root_share_id)
  91. return (
  92. root_share_id,
  93. source_id,
  94. f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}",
  95. )
  96. async def generate_single_card(self, index, gh_id, mini_id, item):
  97. """
  98. 生成单个分享卡片
  99. :param item: 单个视频结果
  100. :param mini_id: 小程序 appType
  101. :param gh_id: 公众号 id
  102. :param index: 视频位置
  103. :return:
  104. """
  105. str_mini_id = str(mini_id)
  106. mini_info = self.mini_program_map[str_mini_id]
  107. avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name']
  108. root_share_id, root_source_id, production_path = self.create_gzh_path(
  109. video_id=item['videoId'],
  110. shared_uid=item['uid'],
  111. gh_id=gh_id
  112. )
  113. logging(
  114. code="1002",
  115. info="root_share_id --{}, productionPath -- {}".format(
  116. root_share_id, production_path
  117. ),
  118. function="process",
  119. trace_id=self.trace_id,
  120. )
  121. result = {
  122. "productionCover": item['videoCover'],
  123. "productionName": item['kimiTitle'],
  124. "programAvatar": avatar,
  125. "programId": app_id,
  126. "programName": app_name,
  127. "source": item['source'],
  128. "rootShareId": root_share_id,
  129. "productionPath": production_path,
  130. "videoUrl": item['videoPath'],
  131. "mini_id": mini_id,
  132. "paragraphPosition": index * 0.25
  133. }
  134. if index == 1:
  135. result['paragraphPosition'] = 0.01
  136. item['rootSourceId'] = root_source_id
  137. return result, item
  138. async def generate_cards(self, result):
  139. """
  140. 生成返回卡片
  141. :return:
  142. """
  143. gh_id = result['gh_id']
  144. response = json.loads(result['response'])
  145. long_articles_mini_program_id = 25
  146. touliu_mini_program_id = 33
  147. we_com_mini_program_id = 27
  148. match self.mini_program_type:
  149. case 1:
  150. L = []
  151. new_item_list = []
  152. for index, item in enumerate(response, 1):
  153. # random_num = random.randint(1, 10)
  154. # if random_num in [1, 2, 3, 4, 5, 6]:
  155. # long_articles_mini_program_id = 25
  156. # elif random_num in [7, 8]:
  157. # long_articles_mini_program_id = 29
  158. # else:
  159. # long_articles_mini_program_id = 31
  160. card, new_item = await self.generate_single_card(index, gh_id, long_articles_mini_program_id, item)
  161. L.append(card)
  162. new_item_list.append(new_item)
  163. return L, new_item_list
  164. case 2:
  165. L = []
  166. new_item_list = []
  167. for index, item in enumerate(response, 1):
  168. card, new_item = await self.generate_single_card(index, gh_id, touliu_mini_program_id, item)
  169. L.append(card)
  170. new_item_list.append(new_item)
  171. return L, new_item_list
  172. case 3:
  173. L = []
  174. new_item_list = []
  175. for index, item in enumerate(response, 1):
  176. card, new_item = await self.generate_single_card(index, gh_id, we_com_mini_program_id, item)
  177. L.append(card)
  178. new_item_list.append(card)
  179. return L, new_item_list
  180. async def job(self):
  181. """
  182. 执行方法
  183. :return:
  184. """
  185. response = await self.get_videos_result()
  186. status_code = response.get('content_status')
  187. process_times = response.get('process_times')
  188. match status_code:
  189. case 0:
  190. if process_times > self.TASK_MAX_PROCESS_TIMES:
  191. result = {
  192. "traceId": self.trace_id,
  193. "code": 0,
  194. "error": "匹配失败,处理超过 3 次"
  195. }
  196. else:
  197. result = {
  198. "traceId": self.trace_id,
  199. "code": 0,
  200. "message": "该请求还没处理"
  201. }
  202. return result
  203. case 1:
  204. return {
  205. "traceId": self.trace_id,
  206. "code": 1,
  207. "message": "已经执行完kimi"
  208. }
  209. case 2:
  210. return {
  211. "traceId": self.trace_id,
  212. "code": 2,
  213. "message": "已经执行完爬虫"
  214. }
  215. case 3:
  216. return {
  217. "traceId": self.trace_id,
  218. "code": 3,
  219. "message": "已经执行完 etl"
  220. }
  221. case 4:
  222. # 修改任务状态为处理中
  223. update_sql = f"""
  224. UPDATE {self.article_match_video_table}
  225. SET success_status = %s
  226. WHERE success_status = %s and trace_id = %s;
  227. """
  228. affected_rows = await self.mysql_client.async_insert(
  229. sql=update_sql,
  230. params=(
  231. self.REQUEST_PROCESSING_TASK,
  232. self.REQUEST_INIT_STATUS,
  233. self.trace_id
  234. )
  235. )
  236. if affected_rows == 0:
  237. return {
  238. "traceId": self.trace_id,
  239. "info": "并发任务抢占锁失败",
  240. "message": "该 trace_id 正在处理中或者已经处理完成"
  241. }
  242. card_list, new_items = await self.generate_cards(result=response)
  243. update_sql = f"""
  244. UPDATE {self.article_match_video_table}
  245. SET response = %s, success_status = %s
  246. WHERE trace_id = %s and success_status = %s;
  247. """
  248. await self.mysql_client.async_insert(
  249. sql=update_sql,
  250. params=(
  251. json.dumps(new_items, ensure_ascii=False),
  252. self.REQUEST_SUCCESS_STATUS,
  253. self.trace_id,
  254. self.REQUEST_PROCESSING_TASK
  255. )
  256. )
  257. return {
  258. "traceId": self.trace_id,
  259. "miniprogramList": card_list
  260. }
  261. case 95:
  262. return {
  263. "traceId": self.trace_id,
  264. "code": 95,
  265. "error": "该文章被kimi识别为高风险文章,不处理"
  266. }
  267. case 96:
  268. return {
  269. "traceId": self.trace_id,
  270. "code": 96,
  271. "error": "该文章品类不符合这个账号,不做冷启动处理"
  272. }
  273. case 97:
  274. return {
  275. "traceId": self.trace_id,
  276. "code": 97,
  277. "error": "该文章已经退场or晋级, 不再冷启处理"
  278. }
  279. case 99:
  280. return {
  281. "traceId": self.trace_id,
  282. "code": 99,
  283. "error": "该任务执行失败"
  284. }
  285. case 101:
  286. return {
  287. "traceId": self.trace_id,
  288. "code": 101,
  289. "message": "该任务正在执行中"
  290. }
  291. async def deal(self):
  292. """
  293. api process starts from here
  294. :return:
  295. """
  296. params_error = self.check_params()
  297. if params_error:
  298. return params_error
  299. else:
  300. return await self.job()