publish_video_to_pq_for_audit.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. """
  2. @author: luojunhui
  3. 将抓取的视频发送至pq获取视频的审核结果
  4. """
  5. from typing import List, Dict
  6. from tqdm import tqdm
  7. from pymysql.cursors import DictCursor
  8. from applications import PQAPI
  9. from applications import longArticlesMySQL
  10. from applications.const import WeixinVideoCrawlerConst
  11. const = WeixinVideoCrawlerConst()
  12. pq_functions = PQAPI()
  13. class PublishVideosForAudit(object):
  14. """
  15. 发布视频到pq,获取video_id,并且轮询获取视频id状态
  16. """
  17. def __init__(self):
  18. self.db = longArticlesMySQL()
  19. def get_publish_video_list(self) -> List[Dict]:
  20. """
  21. 获取视频的信息
  22. :return:
  23. """
  24. sql = f"""SELECT id, article_title, video_oss_path FROM publish_single_video_source WHERE audit_status = {const.VIDEO_AUDIT_INIT_STATUS};"""
  25. response = self.db.select(sql, cursor_type=DictCursor)
  26. return response
  27. def update_audit_status(self, video_id: int, ori_audit_status: int, new_audit_status: int) -> int:
  28. """
  29. 更新视频的审核状态
  30. :param new_audit_status:
  31. :param ori_audit_status:
  32. :param video_id:
  33. :param
  34. :return:
  35. """
  36. update_sql = f"""
  37. UPDATE publish_single_video_source
  38. SET audit_status = %s
  39. WHERE audit_video_id = %s and audit_status = %s;
  40. """
  41. affected_rows = self.db.update(
  42. sql=update_sql,
  43. params=(new_audit_status, video_id, ori_audit_status)
  44. )
  45. return affected_rows
  46. def publish_each_video(self, video_obj: Dict) -> None:
  47. """
  48. 发布视频到pq
  49. :param video_obj:
  50. :return:
  51. """
  52. response = pq_functions.publish_to_pq(
  53. oss_path=video_obj.get("video_oss_path"),
  54. uid=const.DEFAULT_ACCOUNT_UID,
  55. title=video_obj.get("article_title")
  56. )
  57. response_json = response.json()
  58. if response_json.get("code") == const.REQUEST_SUCCESS:
  59. video_id = response_json['data']['id']
  60. update_sql = f"""
  61. UPDATE publish_single_video_source
  62. SET audit_status = %s, audit_video_id = %s
  63. WHERE id = %s;
  64. """
  65. affected_rows = self.db.update(
  66. sql=update_sql,
  67. params=(const.VIDEO_AUDIT_PROCESSING_STATUS, video_id, video_obj['id'])
  68. )
  69. if affected_rows:
  70. print("视频发布成功--{}".format(video_id))
  71. else:
  72. print("视频发布失败--{}".format(video_id))
  73. else:
  74. print("视频发布失败--{}".format(video_obj.get("video_oss_path")))
  75. def get_check_article_list(self) -> List[Dict]:
  76. """
  77. 获取需要检查的视频列表
  78. :return:
  79. """
  80. sql = f"""SELECT audit_video_id FROM publish_single_video_source WHERE audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};"""
  81. response = self.db.select(sql, cursor_type=DictCursor)
  82. return response
  83. def check_video_status(self, video_id: int) -> bool:
  84. """
  85. 检查视频的状态,若视频审核通过or不通过,修改记录状态
  86. :param video_id:
  87. :return:
  88. """
  89. response = pq_functions.getPQVideoListDetail([video_id])
  90. audit_status = response.get("data")[0].get("auditStatus")
  91. if audit_status == const.PQ_AUDIT_SUCCESS_STATUS:
  92. affected_rows = self.update_audit_status(
  93. video_id=video_id,
  94. ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
  95. new_audit_status=const.VIDEO_AUDIT_SUCCESS_STATUS
  96. )
  97. if affected_rows:
  98. return True
  99. else:
  100. return False
  101. elif audit_status in {const.PQ_AUDIT_SELF_VISIBLE_STATUS, const.PQ_AUDIT_FAIL_STATUS}:
  102. affected_rows = self.update_audit_status(
  103. video_id=video_id,
  104. ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
  105. new_audit_status=const.VIDEO_AUDIT_FAIL_STATUS
  106. )
  107. if affected_rows:
  108. return True
  109. else:
  110. return False
  111. else:
  112. return False
  113. def publish_job(self):
  114. """
  115. 发布视频到pq
  116. :return:
  117. """
  118. video_list = self.get_publish_video_list()
  119. for video_obj in tqdm(video_list, desc="视频发布"):
  120. self.publish_each_video(video_obj)
  121. print("视频发布完成")
  122. def check_job(self):
  123. """
  124. 检查视频的状态
  125. :return:
  126. """
  127. video_list = self.get_check_article_list()
  128. for video_obj in tqdm(video_list, desc="视频检查"):
  129. video_id = video_obj.get("audit_video_id")
  130. self.check_video_status(video_id)
  131. print("视频检查完成")