get_off_videos.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import traceback
  7. from applications.log import logging
  8. from applications.const import server_const
  9. from applications.functions.forward_request import forward_requests
  10. class GetOffVideos(object):
  11. """
  12. 下架视频
  13. """
  14. def __init__(self, params, mysql_client, config):
  15. self.params = params
  16. self.mysql_client = mysql_client
  17. self.article_match_video_table = config.article_match_video_table
  18. self.get_off_videos = config.get_off_video_table
  19. self.trace_id = None
  20. self.push_type = None
  21. def check_params(self):
  22. """
  23. :return:
  24. """
  25. try:
  26. self.trace_id = self.params['traceId']
  27. self.push_type = self.params.get('pushType')
  28. logging(
  29. code="4121",
  30. info='自动下架视频接口参数校验成功',
  31. function="get_off_videos",
  32. data=self.params,
  33. trace_id=self.trace_id
  34. )
  35. return None
  36. except Exception as e:
  37. response = {
  38. "error": "params error",
  39. "info": str(e),
  40. "data": self.params
  41. }
  42. return response
  43. async def record_published_trace_id(self, gh_id):
  44. """
  45. 记录已发布的trace_id
  46. :return:
  47. """
  48. insert_sql = f"""
  49. INSERT IGNORE INTO long_articles_published_trace_id
  50. (trace_id, gh_id, push_type, create_timestamp)
  51. VALUES
  52. (%s, %s, %s, %s);
  53. """
  54. await self.mysql_client.async_insert(
  55. sql=insert_sql,
  56. params=(
  57. self.trace_id,
  58. gh_id,
  59. self.push_type,
  60. int(time.time())
  61. )
  62. )
  63. async def record_get_off_videos(self, video_list):
  64. """
  65. 存储视频id到getOffVideos表中
  66. """
  67. info_list = [
  68. (
  69. item['videoId'],
  70. int(time.time()),
  71. 1,
  72. self.trace_id
  73. )
  74. for item in video_list
  75. ]
  76. insert_sql = f"""
  77. INSERT INTO {self.get_off_videos}
  78. (video_id, publish_time, video_status, trace_id)
  79. values
  80. (%s, %s, %s, %s);
  81. """
  82. await self.mysql_client.async_insert_many(
  83. sql=insert_sql,
  84. params_list=info_list
  85. )
  86. async def push_video_into_queue(self):
  87. """
  88. 将视频id记录到待下架表中
  89. :return:
  90. """
  91. select_sql = f"""
  92. select gh_id, response from {self.article_match_video_table} where trace_id = '{self.trace_id}';
  93. """
  94. result = await self.mysql_client.async_select(sql=select_sql)
  95. if result:
  96. video_list = json.loads(result[0][1])
  97. gh_id = result[0][0]
  98. try:
  99. await self.record_get_off_videos(video_list)
  100. if self.push_type:
  101. await self.record_published_trace_id(gh_id)
  102. logging(
  103. code="4122",
  104. info='自动下架视频接口存储成功',
  105. function="get_off_videos",
  106. trace_id=self.trace_id
  107. )
  108. return {
  109. "status": "success",
  110. "traceId": self.trace_id
  111. }
  112. except Exception as e:
  113. logging(
  114. code="4123",
  115. info="自动下架视频处理失败: {}".format(e),
  116. data={
  117. "traceback": traceback.format_exc()
  118. },
  119. function="get_off_videos",
  120. trace_id=self.trace_id
  121. )
  122. return {
  123. "status": "fail",
  124. "traceId": self.trace_id,
  125. "msg": "insert fails, {}".format(e)
  126. }
  127. else:
  128. return {
  129. "status": "fail",
  130. "traceId": self.trace_id,
  131. "msg": "traceId error, can't find trace_id"
  132. }
  133. async def check_trace_id(self):
  134. """
  135. check trace id 是否存在与系统中
  136. """
  137. select_sql = f"""
  138. SELECT trace_id
  139. FROM {self.article_match_video_table}
  140. WHERE trace_id = '{self.trace_id}';
  141. """
  142. response = await self.mysql_client.async_select(select_sql)
  143. if response:
  144. return True
  145. else:
  146. return False
  147. async def check_trace_id_startswith(self):
  148. """
  149. trace_id是否以search开头
  150. """
  151. if self.trace_id.startswith('search'):
  152. return server_const.ARTICLES_VIDEO
  153. elif self.trace_id.startswith('video'):
  154. return server_const.SEARCH_VIDEO
  155. else:
  156. return False
  157. async def deal(self):
  158. """
  159. :return:
  160. """
  161. params_error = self.check_params()
  162. if params_error:
  163. return params_error
  164. else:
  165. trace_id_process_flag = await self.check_trace_id_startswith()
  166. if not trace_id_process_flag:
  167. return {
  168. "traceId": self.trace_id,
  169. "msg": "do not process trace id"
  170. }
  171. trace_id_exist_flag = await self.check_trace_id()
  172. if trace_id_exist_flag:
  173. return await self.push_video_into_queue()
  174. else:
  175. # 只需要传trace_id, 老系统接口不穿strategy参数默认strategy_v1
  176. response = await forward_requests(
  177. params={
  178. "traceId": self.trace_id
  179. },
  180. api="get_off_videos"
  181. )
  182. return response