publish_video_to_pq_for_audit.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
"""
@author: luojunhui
Send crawled videos to pq and retrieve their audit results.
"""
  5. import time
  6. import traceback
  7. from typing import List, Dict
  8. from tqdm import tqdm
  9. from pymysql.cursors import DictCursor
  10. from applications import log
  11. from applications import PQAPI
  12. from applications.api import AigcSystemApi
  13. from applications.api import fetch_moon_shot_response
  14. from applications.const import WeixinVideoCrawlerConst
  15. from applications.db import DatabaseConnector
  16. from config import long_articles_config
# Module-level singletons shared by the methods of PublishVideosForAudit.
# Status codes, thresholds and quota limits read throughout this module.
const = WeixinVideoCrawlerConst()
# Client used to publish videos to pq and to query their details.
pq_functions = PQAPI()
# Client used to register crawler relations in the aigc system.
aigc = AigcSystemApi()
  20. class PublishVideosForAudit(object):
  21. """
  22. 发布视频到pq,获取video_id,并且轮询获取视频id状态
  23. """
  24. def __init__(self):
  25. self.db_client = DatabaseConnector(db_config=long_articles_config)
  26. self.db_client.connect()
  27. def get_publish_video_list(self) -> List[Dict]:
  28. """
  29. 获取视频的信息
  30. :return:
  31. """
  32. already_published_count = self.get_published_articles_today()
  33. rest_count = const.MAX_VIDEO_NUM - already_published_count
  34. limit_count = min(rest_count, const.MAX_VIDEO_NUM_PER_PUBLISH)
  35. sql = f"""
  36. SELECT id, article_title, video_oss_path
  37. FROM publish_single_video_source
  38. WHERE audit_status = {const.VIDEO_AUDIT_INIT_STATUS} and bad_status = {const.TITLE_DEFAULT_STATUS} and platform = 'sohu'
  39. and score > 0.5
  40. ORDER BY score DESC
  41. LIMIT {limit_count};
  42. """
  43. response = self.db_client.fetch(sql, cursor_type=DictCursor)
  44. return response
  45. def update_audit_status(self, video_id: int, ori_audit_status: int, new_audit_status: int) -> int:
  46. """
  47. 更新视频的审核状态
  48. :param new_audit_status:
  49. :param ori_audit_status:
  50. :param video_id:
  51. :param
  52. :return:
  53. """
  54. update_sql = f"""
  55. UPDATE publish_single_video_source
  56. SET audit_status = %s
  57. WHERE audit_video_id = %s and audit_status = %s;
  58. """
  59. affected_rows = self.db_client.save(
  60. query=update_sql,
  61. params=(new_audit_status, video_id, ori_audit_status)
  62. )
  63. return affected_rows
  64. def get_published_articles_today(self):
  65. """
  66. 获取今天发布的视频数量总量
  67. :return:
  68. """
  69. select_sql = f"""
  70. SELECT COUNT(1) as total_count
  71. FROM publish_single_video_source
  72. WHERE audit_status != {const.VIDEO_AUDIT_INIT_STATUS}
  73. AND DATE(FROM_UNIXTIME(audit_timestamp)) = CURDATE();
  74. """
  75. response = self.db_client.fetch(select_sql, cursor_type=DictCursor)
  76. return response[0]['total_count']
  77. def publish_each_video(self, video_obj: Dict) -> Dict:
  78. """
  79. 发布视频到pq
  80. :param video_obj:
  81. :return:
  82. """
  83. response = pq_functions.publish_to_pq(
  84. oss_path=video_obj.get("video_oss_path"),
  85. uid=const.DEFAULT_ACCOUNT_UID,
  86. title=video_obj.get("article_title")
  87. )
  88. response_json = response.json()
  89. if response_json.get("code") == const.REQUEST_SUCCESS:
  90. video_id = response_json['data']['id']
  91. update_sql = f"""
  92. UPDATE publish_single_video_source
  93. SET audit_status = %s, audit_video_id = %s, audit_timestamp = %s
  94. WHERE id = %s;
  95. """
  96. affected_rows = self.db_client.save(
  97. query=update_sql,
  98. params=(const.VIDEO_AUDIT_PROCESSING_STATUS, video_id, int(time.time()), video_obj['id'])
  99. )
  100. if affected_rows:
  101. result = {
  102. "status": "success",
  103. "video_id": video_id
  104. }
  105. return result
  106. else:
  107. result = {
  108. "status": "fail",
  109. "video_id": video_id,
  110. "error_msg": "抢占锁失败,update执行操作修改0行"
  111. }
  112. return result
  113. else:
  114. if response_json.get("code") == const.PUBLISHED_ILLEGAL_TITLE_CODE:
  115. # 发布了标题违规的视频,发布失败, 修改审核状态从0-->2
  116. update_sql = f"""
  117. UPDATE publish_single_video_source
  118. SET audit_status = %s
  119. WHERE id = %s and audit_status = %s;
  120. """
  121. self.db_client.save(update_sql, params=(const.VIDEO_AUDIT_FAIL_STATUS, video_obj['id'], const.VIDEO_AUDIT_INIT_STATUS))
  122. result = {
  123. "status": "fail",
  124. "error_msg": "发布到pq失败",
  125. "title": video_obj.get("article_title"),
  126. "oss_path": video_obj.get("video_oss_path"),
  127. "response": response_json
  128. }
  129. return result
  130. def get_check_article_list(self) -> List[Dict]:
  131. """
  132. 获取需要检查的视频列表
  133. :return:
  134. """
  135. sql = f"""
  136. select content_trace_id, audit_video_id, score, platform
  137. from publish_single_video_source
  138. where audit_status = {const.VIDEO_AUDIT_PROCESSING_STATUS};
  139. """
  140. response = self.db_client.fetch(sql, cursor_type=DictCursor)
  141. return response
  142. def update_mini_program_title(self, video_id: int) -> bool:
  143. """
  144. :param video_id:
  145. """
  146. select_sql = f"""
  147. SELECT article_title FROM publish_single_video_source WHERE audit_video_id = {video_id};
  148. """
  149. title = self.db_client.fetch(select_sql, cursor_type=DictCursor)[0]['article_title']
  150. try:
  151. # generate kimi title
  152. mini_program_title = fetch_moon_shot_response(task='generate_kimi_title', input_text=title)
  153. # score kimi title
  154. kimi_safe_title = None
  155. title_safe_score = fetch_moon_shot_response(task='get_title_safe_score', input_text=mini_program_title)
  156. if int(title_safe_score) > const.TITLE_SAFE_SCORE_THRESHOLD:
  157. kimi_safe_title_obj = fetch_moon_shot_response(task='make_title_safe', input_text=title, output_type='json')
  158. kimi_safe_title = kimi_safe_title_obj['title_v2']
  159. mini_program_title = kimi_safe_title if kimi_safe_title else mini_program_title
  160. update_sql = f"""
  161. UPDATE publish_single_video_source SET mini_program_title = %s WHERE audit_video_id = %s;
  162. """
  163. self.db_client.save(update_sql, params=(mini_program_title, video_id))
  164. log(
  165. task="publish_video_for_audit",
  166. function="update_mini_program_title",
  167. message="修改小程序标题成功",
  168. data={
  169. "video_id": video_id,
  170. "title": title,
  171. "mini_program_title": mini_program_title
  172. }
  173. )
  174. return True
  175. except Exception as e:
  176. log(
  177. task="publish_video_for_audit",
  178. function="update_mini_program_title",
  179. status="fail",
  180. data={
  181. "video_id": video_id,
  182. "title": title,
  183. "error": str(e),
  184. "error_stack": traceback.format_exc()
  185. }
  186. )
  187. return False
  188. def insert_into_task_queue(self, video) -> int:
  189. """
  190. enqueue
  191. """
  192. insert_query = f"""
  193. insert into single_video_transform_queue
  194. (content_trace_id, pq_vid, score, platform)
  195. values (%s, %s, %s, %s);
  196. """
  197. affected_rows = self.db_client.save(
  198. query=insert_query,
  199. params=(
  200. video['content_trace_id'], video['audit_video_id'], video['score'], video['platform']
  201. )
  202. )
  203. return affected_rows
  204. def check_video_status(self, video_obj: dict) -> Dict:
  205. """
  206. 检查视频的状态,若视频审核通过or不通过,修改记录状态
  207. :param video_obj:
  208. :return:
  209. """
  210. video_id = video_obj['audit_video_id']
  211. response = pq_functions.getPQVideoListDetail([video_id])
  212. audit_status = response.get("data")[0].get("auditStatus")
  213. # 请求成功
  214. if audit_status == const.PQ_AUDIT_SUCCESS_STATUS:
  215. # 更新小程序标题字段
  216. mini_program_title_flag = self.update_mini_program_title(video_id)
  217. if mini_program_title_flag:
  218. # 处理成功,修改审核状态为1
  219. affected_rows = self.update_audit_status(
  220. video_id=video_id,
  221. ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
  222. new_audit_status=const.VIDEO_AUDIT_SUCCESS_STATUS
  223. )
  224. # 将视频存储到任务队列
  225. self.insert_into_task_queue(video_obj)
  226. # 将视频存储到 aigc 表
  227. aigc.insert_crawler_relation_to_aigc_system(
  228. relation_list=[
  229. {
  230. "videoPoolTraceId": video_obj['content_trace_id'],
  231. "channelContentId": str(video_id),
  232. "platform": video_obj['platform'],
  233. }
  234. ]
  235. )
  236. else:
  237. # 修改小程序标题失败,修改审核状态为4
  238. affected_rows = self.update_audit_status(
  239. video_id=video_id,
  240. ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
  241. new_audit_status=const.VIDEO_TITLE_GENERATE_FAIL_STATUS
  242. )
  243. elif audit_status in {const.PQ_AUDIT_SELF_VISIBLE_STATUS, const.PQ_AUDIT_FAIL_STATUS}:
  244. # 视频审核失败,修改审核状态为2
  245. affected_rows = self.update_audit_status(
  246. video_id=video_id,
  247. ori_audit_status=const.VIDEO_AUDIT_PROCESSING_STATUS,
  248. new_audit_status=const.VIDEO_AUDIT_FAIL_STATUS
  249. )
  250. elif audit_status == const.PQ_AUDIT_PROCESSING_STATUS:
  251. # 视频正在审核中,不做处理
  252. affected_rows = 0
  253. else:
  254. # 其他情况,暂时不做处理
  255. affected_rows = 0
  256. result = {
  257. "affected_rows": affected_rows,
  258. "video_id": video_id,
  259. "audit_status": audit_status
  260. }
  261. return result
  262. def publish_job(self):
  263. """
  264. 发布视频到pq
  265. :return:
  266. """
  267. video_list = self.get_publish_video_list()
  268. for video_obj in tqdm(video_list, desc="视频发布"):
  269. try:
  270. response = self.publish_each_video(video_obj)
  271. if response.get("status") == "success":
  272. log(
  273. task="publish_video_for_audit",
  274. message="发送至PQ成功",
  275. function="publish_each_video",
  276. data={
  277. "video_id": response.get("video_id")
  278. }
  279. )
  280. else:
  281. log(
  282. task="publish_video_for_audit",
  283. message=response.get('error_msg'),
  284. function="publish_each_video",
  285. status="fail",
  286. data={
  287. "response": response,
  288. "video_obj": video_obj
  289. }
  290. )
  291. except Exception as e:
  292. error_msg = traceback.format_exc()
  293. log(
  294. task="publish_video_for_audit",
  295. message="发送至PQ代码执行失败",
  296. function="publish_each_video",
  297. status="fail",
  298. data={
  299. "error_msg": error_msg,
  300. "video_obj": video_obj,
  301. "error": str(e)
  302. }
  303. )
  304. def check_job(self):
  305. """
  306. 检查视频的状态
  307. :return:
  308. """
  309. video_list = self.get_check_article_list()
  310. for video_obj in tqdm(video_list, desc="视频检查"):
  311. video_id = video_obj.get("audit_video_id")
  312. try:
  313. response = self.check_video_status(video_obj)
  314. if response.get("affected_rows"):
  315. continue
  316. else:
  317. log(
  318. task="publish_video_for_audit",
  319. function="check_each_video",
  320. message="修改行数为0",
  321. data={
  322. "video_id": video_id,
  323. "audit_status": response['audit_status']
  324. }
  325. )
  326. except Exception as e:
  327. error_msg = traceback.format_exc()
  328. log(
  329. task="publish_video_for_audit",
  330. message="查询状态执行失败",
  331. function="check_each_video",
  332. status="fail",
  333. data={
  334. "error_msg": error_msg,
  335. "video_obj": video_obj,
  336. "error": str(e)
  337. }
  338. )