publish_video_to_pq_for_audit.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. """
  2. @author: luojunhui
  3. 将抓取的视频发送至pq获取视频的审核结果
  4. """
  5. import time
  6. import traceback
  7. from typing import List, Dict
  8. from tqdm import tqdm
  9. from pymysql.cursors import DictCursor
  10. from applications import log
  11. from applications import PQAPI
  12. from applications import longArticlesMySQL
  13. from applications.const import WeixinVideoCrawlerConst
# Module-wide crawler constants: audit status codes, the daily publishing cap
# (MAX_VIDEO_NUM), the default PQ account uid and the PQ success response code.
const = WeixinVideoCrawlerConst()
# PQ API client, used below to publish videos (publish_to_pq) and to query
# their audit status (getPQVideoListDetail).
pq_functions = PQAPI()
  16. class PublishVideosForAudit(object):
  17. """
  18. 发布视频到pq,获取video_id,并且轮询获取视频id状态
  19. """
  20. def __init__(self):
  21. self.db = longArticlesMySQL()
  22. def get_publish_video_list(self) -> List[Dict]:
  23. """
  24. 获取视频的信息
  25. :return:
  26. """
  27. already_published_count = self.get_published_articles_today()
  28. rest_count = const.MAX_VIDEO_NUM - already_published_count
  29. sql = f"""
  30. SELECT id, article_title, video_oss_path
  31. FROM publish_single_video_source
  32. WHERE audit_status = {const.VIDEO_AUDIT_INIT_STATUS}
  33. LIMIT {rest_count};
  34. """
  35. response = self.db.select(sql, cursor_type=DictCursor)
  36. return response
  37. def update_audit_status(self, video_id: int, ori_audit_status: int, new_audit_status: int) -> int:
  38. """
  39. 更新视频的审核状态
  40. :param new_audit_status:
  41. :param ori_audit_status:
  42. :param video_id:
  43. :param
  44. :return:
  45. """
  46. update_sql = f"""
  47. UPDATE publish_single_video_source
  48. SET audit_status = %s
  49. WHERE audit_video_id = %s and audit_status = %s;
  50. """
  51. affected_rows = self.db.update(
  52. sql=update_sql,
  53. params=(new_audit_status, video_id, ori_audit_status)
  54. )
  55. return affected_rows
  56. def get_published_articles_today(self):
  57. """
  58. 获取今天发布的视频数量总量
  59. :return:
  60. """
  61. select_sql = f"""
  62. SELECT COUNT(1) as total_count
  63. FROM publish_single_video_source
  64. WHERE audit_status != {const.VIDEO_AUDIT_INIT_STATUS}
  65. AND DATE(FROM_UNIXTIME(audit_timestamp)) = CURDATE();
  66. """
  67. response = self.db.select(select_sql, cursor_type=DictCursor)
  68. return response[0]['total_count']
  69. def publish_each_video(self, video_obj: Dict) -> None:
  70. """
  71. 发布视频到pq
  72. :param video_obj:
  73. :return:
  74. """
  75. response = pq_functions.publish_to_pq(
  76. oss_path=video_obj.get("video_oss_path"),
  77. uid=const.DEFAULT_ACCOUNT_UID,
  78. title=video_obj.get("article_title")
  79. )
  80. response_json = response.json()
  81. if response_json.get("code") == const.REQUEST_SUCCESS:
  82. video_id = response_json['data']['id']
  83. update_sql = f"""
  84. UPDATE publish_single_video_source
  85. SET audit_status = %s, audit_video_id = %s, audit_timestamp = %s
  86. WHERE id = %s;
  87. """
  88. affected_rows = self.db.update(
  89. sql=update_sql,
  90. params=(const.VIDEO_AUDIT_PROCESSING_STATUS, video_id, int(time.time()), video_obj['id'])
  91. )
  92. if affected_rows:
  93. print("视频发布成功--{}".format(video_id))
  94. else:
  95. print("视频发布失败--{}".format(video_id))
  96. else:
  97. print("视频发布失败--{}".format(video_obj.get("video_oss_path")))
  98. def get_check_article_list(self) -> List[Dict]:
  99. """
  100. 获取需要检查的视频列表
  101. :return:
  102. """
  103. sql = f"""SELECT audit_video_id FROM publish_single_video_source WHERE audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};"""
  104. response = self.db.select(sql, cursor_type=DictCursor)
  105. return response
  106. def check_video_status(self, video_id: int) -> bool:
  107. """
  108. 检查视频的状态,若视频审核通过or不通过,修改记录状态
  109. :param video_id:
  110. :return:
  111. """
  112. response = pq_functions.getPQVideoListDetail([video_id])
  113. audit_status = response.get("data")[0].get("auditStatus")
  114. if audit_status == const.PQ_AUDIT_SUCCESS_STATUS:
  115. affected_rows = self.update_audit_status(
  116. video_id=video_id,
  117. ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
  118. new_audit_status=const.VIDEO_AUDIT_SUCCESS_STATUS
  119. )
  120. if affected_rows:
  121. return True
  122. else:
  123. return False
  124. elif audit_status in {const.PQ_AUDIT_SELF_VISIBLE_STATUS, const.PQ_AUDIT_FAIL_STATUS}:
  125. affected_rows = self.update_audit_status(
  126. video_id=video_id,
  127. ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
  128. new_audit_status=const.VIDEO_AUDIT_FAIL_STATUS
  129. )
  130. if affected_rows:
  131. return True
  132. else:
  133. return False
  134. else:
  135. return False
  136. def publish_job(self):
  137. """
  138. 发布视频到pq
  139. :return:
  140. """
  141. video_list = self.get_publish_video_list()
  142. for video_obj in tqdm(video_list, desc="视频发布"):
  143. try:
  144. self.publish_each_video(video_obj)
  145. log(
  146. task="publish_video_for_audit",
  147. message="成功发送至pq",
  148. function="publish_each_video",
  149. data={
  150. "video_obj": video_obj
  151. }
  152. )
  153. except Exception as e:
  154. error_msg = traceback.format_exc()
  155. log(
  156. task="publish_video_for_audit",
  157. message="发送至PQ失败",
  158. function="publish_each_video",
  159. status="fail",
  160. data={
  161. "error_msg": error_msg,
  162. "video_obj": video_obj,
  163. "error": str(e)
  164. }
  165. )
  166. def check_job(self):
  167. """
  168. 检查视频的状态
  169. :return:
  170. """
  171. video_list = self.get_check_article_list()
  172. for video_obj in tqdm(video_list, desc="视频检查"):
  173. video_id = video_obj.get("audit_video_id")
  174. try:
  175. self.check_video_status(video_id)
  176. except Exception as e:
  177. error_msg = traceback.format_exc()
  178. log(
  179. task="publish_video_for_audit",
  180. message="查询状态失败",
  181. function="check_each_video",
  182. status="fail",
  183. data={
  184. "error_msg": error_msg,
  185. "video_obj": video_obj,
  186. "error": str(e)
  187. }
  188. )