get_off_videos.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import traceback
  7. from applications.log import logging
  8. from applications.functions.forward_request import forward_requests
  9. class GetOffVideos(object):
  10. """
  11. 下架视频
  12. """
  13. def __init__(self, params, mysql_client, config):
  14. self.params = params
  15. self.mysql_client = mysql_client
  16. self.article_match_video_table = config.article_match_video_table
  17. self.get_off_videos = config.get_off_video_table
  18. self.trace_id = None
  19. self.push_type = None
  20. def check_params(self):
  21. """
  22. :return:
  23. """
  24. try:
  25. self.trace_id = self.params['traceId']
  26. self.push_type = self.params.get('pushType')
  27. logging(
  28. code="4121",
  29. info='自动下架视频接口参数校验成功',
  30. function="get_off_videos",
  31. data=self.params,
  32. trace_id=self.trace_id
  33. )
  34. return None
  35. except Exception as e:
  36. response = {
  37. "error": "params error",
  38. "info": str(e),
  39. "data": self.params
  40. }
  41. return response
  42. async def record_published_trace_id(self, gh_id):
  43. """
  44. 记录已发布的trace_id
  45. :return:
  46. """
  47. insert_sql = f"""
  48. INSERT IGNORE INTO long_articles_published_trace_id
  49. (trace_id, gh_id, push_type)
  50. VALUES
  51. (%s, %s, %s);
  52. """
  53. await self.mysql_client.async_insert(
  54. sql=insert_sql,
  55. params=(
  56. self.trace_id,
  57. gh_id,
  58. self.push_type
  59. )
  60. )
  61. async def record_get_off_videos(self, video_list):
  62. """
  63. 存储视频id到getOffVideos表中
  64. """
  65. info_list = [
  66. (
  67. item['videoId'],
  68. int(time.time()),
  69. 1,
  70. self.trace_id
  71. )
  72. for item in video_list
  73. ]
  74. insert_sql = f"""
  75. INSERT INTO {self.get_off_videos}
  76. (video_id, publish_time, video_status, trace_id)
  77. values
  78. (%s, %s, %s, %s);
  79. """
  80. await self.mysql_client.async_insert_many(
  81. sql=insert_sql,
  82. params_list=info_list
  83. )
  84. async def push_video_into_queue(self):
  85. """
  86. 将视频id记录到待下架表中
  87. :return:
  88. """
  89. select_sql = f"""
  90. select gh_id, response from {self.article_match_video_table} where trace_id = '{self.trace_id}';
  91. """
  92. result = await self.mysql_client.async_select(sql=select_sql)
  93. if result:
  94. video_list = json.loads(result[0][1])
  95. gh_id = result[0][0]
  96. try:
  97. await self.record_get_off_videos(video_list)
  98. if self.push_type:
  99. await self.record_published_trace_id(gh_id)
  100. logging(
  101. code="4122",
  102. info='自动下架视频接口存储成功',
  103. function="get_off_videos",
  104. trace_id=self.trace_id
  105. )
  106. return {
  107. "status": "success",
  108. "traceId": self.trace_id
  109. }
  110. except Exception as e:
  111. logging(
  112. code="4123",
  113. info="自动下架视频处理失败: {}".format(e),
  114. data={
  115. "traceback": traceback.format_exc()
  116. },
  117. function="get_off_videos",
  118. trace_id=self.trace_id
  119. )
  120. return {
  121. "status": "fail",
  122. "traceId": self.trace_id,
  123. "msg": "insert fails, {}".format(e)
  124. }
  125. else:
  126. return {
  127. "status": "fail",
  128. "traceId": self.trace_id,
  129. "msg": "traceId error, can't find trace_id"
  130. }
  131. async def check_trace_id(self):
  132. """
  133. check trace id 是否存在与系统中
  134. """
  135. select_sql = f"""
  136. SELECT trace_id
  137. FROM {self.article_match_video_table}
  138. WHERE trace_id = '{self.trace_id}';
  139. """
  140. response = await self.mysql_client.async_select(select_sql)
  141. if response:
  142. return True
  143. else:
  144. return False
  145. async def check_trace_id_startswith(self):
  146. """
  147. trace_id是否以search开头
  148. """
  149. if self.trace_id.startswith('search'):
  150. return True
  151. else:
  152. return False
  153. async def deal(self):
  154. """
  155. :return:
  156. """
  157. params_error = self.check_params()
  158. if params_error:
  159. return params_error
  160. else:
  161. trace_id_process_flag = await self.check_trace_id_startswith()
  162. if not trace_id_process_flag:
  163. return {
  164. "traceId": self.trace_id,
  165. "msg": "do not process trace id"
  166. }
  167. trace_id_exist_flag = await self.check_trace_id()
  168. if trace_id_exist_flag:
  169. return await self.push_video_into_queue()
  170. else:
  171. # 只需要传trace_id, 老系统接口不穿strategy参数默认strategy_v1
  172. response = await forward_requests(
  173. params={
  174. "traceId": self.trace_id
  175. },
  176. api="get_off_videos"
  177. )
  178. return response