get_off_videos.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import traceback
  7. from applications.log import logging
  8. from applications.functions.forward_request import forward_requests
  9. class GetOffVideos(object):
  10. """
  11. 下架视频
  12. """
  13. def __init__(self, params, mysql_client, config):
  14. self.params = params
  15. self.mysql_client = mysql_client
  16. self.article_match_video_table = config.article_match_video_table
  17. self.get_off_videos = config.get_off_video_table
  18. self.trace_id = None
  19. self.push_type = None
  20. def check_params(self):
  21. """
  22. :return:
  23. """
  24. try:
  25. self.trace_id = self.params['traceId']
  26. self.push_type = self.params.get('pushType')
  27. logging(
  28. code="4121",
  29. info='自动下架视频接口参数校验成功',
  30. function="get_off_videos",
  31. data=self.params,
  32. trace_id=self.trace_id
  33. )
  34. return None
  35. except Exception as e:
  36. response = {
  37. "error": "params error",
  38. "info": str(e),
  39. "data": self.params
  40. }
  41. return response
  42. async def record_published_trace_id(self, gh_id):
  43. """
  44. 记录已发布的trace_id
  45. :return:
  46. """
  47. insert_sql = f"""
  48. INSERT IGNORE INTO long_articles_published_trace_id
  49. (trace_id, gh_id, push_type, create_timestamp)
  50. VALUES
  51. (%s, %s, %s, %s);
  52. """
  53. await self.mysql_client.async_insert(
  54. sql=insert_sql,
  55. params=(
  56. self.trace_id,
  57. gh_id,
  58. self.push_type,
  59. int(time.time())
  60. )
  61. )
  62. async def record_get_off_videos(self, video_list):
  63. """
  64. 存储视频id到getOffVideos表中
  65. """
  66. info_list = [
  67. (
  68. item['videoId'],
  69. int(time.time()),
  70. 1,
  71. self.trace_id
  72. )
  73. for item in video_list
  74. ]
  75. insert_sql = f"""
  76. INSERT INTO {self.get_off_videos}
  77. (video_id, publish_time, video_status, trace_id)
  78. values
  79. (%s, %s, %s, %s);
  80. """
  81. await self.mysql_client.async_insert_many(
  82. sql=insert_sql,
  83. params_list=info_list
  84. )
  85. async def push_video_into_queue(self):
  86. """
  87. 将视频id记录到待下架表中
  88. :return:
  89. """
  90. select_sql = f"""
  91. select gh_id, response from {self.article_match_video_table} where trace_id = '{self.trace_id}';
  92. """
  93. result = await self.mysql_client.async_select(sql=select_sql)
  94. if result:
  95. video_list = json.loads(result[0][1])
  96. gh_id = result[0][0]
  97. try:
  98. await self.record_get_off_videos(video_list)
  99. if self.push_type:
  100. await self.record_published_trace_id(gh_id)
  101. logging(
  102. code="4122",
  103. info='自动下架视频接口存储成功',
  104. function="get_off_videos",
  105. trace_id=self.trace_id
  106. )
  107. return {
  108. "status": "success",
  109. "traceId": self.trace_id
  110. }
  111. except Exception as e:
  112. logging(
  113. code="4123",
  114. info="自动下架视频处理失败: {}".format(e),
  115. data={
  116. "traceback": traceback.format_exc()
  117. },
  118. function="get_off_videos",
  119. trace_id=self.trace_id
  120. )
  121. return {
  122. "status": "fail",
  123. "traceId": self.trace_id,
  124. "msg": "insert fails, {}".format(e)
  125. }
  126. else:
  127. return {
  128. "status": "fail",
  129. "traceId": self.trace_id,
  130. "msg": "traceId error, can't find trace_id"
  131. }
  132. async def check_trace_id(self):
  133. """
  134. check trace id 是否存在与系统中
  135. """
  136. select_sql = f"""
  137. SELECT trace_id
  138. FROM {self.article_match_video_table}
  139. WHERE trace_id = '{self.trace_id}';
  140. """
  141. response = await self.mysql_client.async_select(select_sql)
  142. if response:
  143. return True
  144. else:
  145. return False
  146. async def check_trace_id_startswith(self):
  147. """
  148. trace_id是否以search开头
  149. """
  150. if self.trace_id.startswith('search'):
  151. return True
  152. else:
  153. return False
  154. async def deal(self):
  155. """
  156. :return:
  157. """
  158. params_error = self.check_params()
  159. if params_error:
  160. return params_error
  161. else:
  162. trace_id_process_flag = await self.check_trace_id_startswith()
  163. if not trace_id_process_flag:
  164. return {
  165. "traceId": self.trace_id,
  166. "msg": "do not process trace id"
  167. }
  168. trace_id_exist_flag = await self.check_trace_id()
  169. if trace_id_exist_flag:
  170. return await self.push_video_into_queue()
  171. else:
  172. # 只需要传trace_id, 老系统接口不穿strategy参数默认strategy_v1
  173. response = await forward_requests(
  174. params={
  175. "traceId": self.trace_id
  176. },
  177. api="get_off_videos"
  178. )
  179. return response