# publish_single_video_pool_videos.py
  1. import json
  2. import datetime
  3. import traceback
  4. from pymysql.cursors import DictCursor
  5. from tqdm import tqdm
  6. from applications import bot, aiditApi
  7. from applications.const import SingleVideoPoolPublishTaskConst
  8. from applications.db import DatabaseConnector
  9. from config import long_articles_config, apolloConfig
  10. config = apolloConfig()
  11. const = SingleVideoPoolPublishTaskConst()
  12. video_pool_config = json.loads(config.getConfigValue(key="video_pool_publish_config"))
  13. video_pool_config["piaoquan"] = {
  14. "nick_name": "票圈视频",
  15. "process_num_each_day": 1000,
  16. "generate_plan_id": "20250416060125363145973"
  17. }
  18. class PublishSingleVideoPoolVideos:
  19. def __init__(self):
  20. self.db_client = DatabaseConnector(db_config=long_articles_config)
  21. self.db_client.connect()
  22. def get_task_list(self, platform:str) -> list[dict]:
  23. daily_limit = video_pool_config[platform]['process_num_each_day']
  24. fetch_query = f"""
  25. select id, content_trace_id, pq_vid
  26. from single_video_transform_queue
  27. where status = {const.TRANSFORM_INIT_STATUS} and platform = '{platform}'
  28. order by score desc
  29. limit {daily_limit};
  30. """
  31. fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
  32. return fetch_response
  33. def update_tasks_status(self,
  34. task_id_tuple: tuple,
  35. ori_status: int,
  36. new_status: int)-> int:
  37. update_query = f"""
  38. update single_video_transform_queue
  39. set status = %s
  40. where id in %s and status = %s;
  41. """
  42. affected_rows = self.db_client.save(
  43. query=update_query,
  44. params=(new_status, task_id_tuple, ori_status)
  45. )
  46. return affected_rows
  47. def deal(self):
  48. """
  49. entrance of this class
  50. """
  51. platform_list = ["sph", "gzh", "toutiao", "hksp"]
  52. for platform in tqdm(platform_list, desc='process each platform'):
  53. task_list = self.get_task_list(platform)
  54. task_id_tuple = tuple([task['id'] for task in task_list])
  55. vid_list = [task['pq_vid'] for task in task_list]
  56. if vid_list:
  57. try:
  58. # create video crawler plan
  59. plan_name = f"{video_pool_config[platform]['nick_name']}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
  60. crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
  61. plan_name=plan_name,
  62. plan_tag="单视频供给冷启动",
  63. video_id_list=vid_list,
  64. )
  65. crawler_plan_id = crawler_plan_response["data"]["id"]
  66. crawler_plan_name = crawler_plan_response["data"]["name"]
  67. # bind crawler plan to generate plan
  68. crawler_task_list = [
  69. {
  70. "contentType": 1,
  71. "inputSourceModal": 4,
  72. "inputSourceChannel": 10,
  73. "inputSourceType": 2,
  74. "inputSourceValue": crawler_plan_id,
  75. "inputSourceSubType": None,
  76. "fieldName": None,
  77. "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(crawler_plan_name),
  78. }
  79. ]
  80. generate_plan_id = video_pool_config[platform]['generate_plan_id']
  81. aiditApi.bind_crawler_task_to_generate_task(
  82. crawler_task_list=crawler_task_list,
  83. generate_task_id=generate_plan_id,
  84. )
  85. # update status
  86. self.update_tasks_status(
  87. task_id_tuple=task_id_tuple,
  88. ori_status=const.TRANSFORM_INIT_STATUS,
  89. new_status=const.TRANSFORM_SUCCESS_STATUS
  90. )
  91. except Exception as e:
  92. bot(
  93. title='视频内容池发布任务',
  94. detail={
  95. 'platform': platform,
  96. 'date': datetime.datetime.today().strftime('%Y-%m-%d'),
  97. 'msg': '发布视频内容池失败,原因:{}'.format(str(e)),
  98. 'detail': traceback.format_exc(),
  99. },
  100. mention=False
  101. )
  102. else:
  103. bot(
  104. title='视频内容池发布任务',
  105. detail={
  106. 'platform': platform,
  107. 'date': datetime.datetime.today().strftime('%Y-%m-%d'),
  108. 'msg': '该平台无待发布视频,请关注供给的抓取'
  109. },
  110. mention=False
  111. )
  112. def create_crawler_plan_by_category(self):
  113. platform = 'piaoquan'
  114. fetch_query = f"""
  115. select t1.id, t1.content_trace_id, t1.pq_vid, t2.category
  116. from single_video_transform_queue t1
  117. join publish_single_video_source t2 on t1.content_trace_id = t2.content_trace_id
  118. where t1.status = {const.TRANSFORM_INIT_STATUS} and t1.platform = '{platform}';
  119. """
  120. fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
  121. category_list = ['健康养生', '军事历史', '历史人物', '名人八卦', '奇闻趣事', '家长里短', '情感故事', '政治新闻', '知识科普', '社会法治']
  122. for category in category_list:
  123. category_task_list = [task for task in fetch_response if task['category'] == category]
  124. task_id_tuple = tuple([task['id'] for task in category_task_list])
  125. vid_list = [task['pq_vid'] for task in category_task_list]
  126. if vid_list:
  127. try:
  128. # create video crawler plan
  129. plan_name = f"{video_pool_config[platform]['nick_name']}-{category}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
  130. crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
  131. plan_name=plan_name,
  132. plan_tag="单视频供给冷启动",
  133. video_id_list=vid_list,
  134. )
  135. crawler_plan_id = crawler_plan_response["data"]["id"]
  136. crawler_plan_name = crawler_plan_response["data"]["name"]
  137. # bind crawler plan to generate plan
  138. crawler_task_list = [
  139. {
  140. "contentType": 1,
  141. "inputSourceModal": 4,
  142. "inputSourceChannel": 10,
  143. "inputSourceType": 2,
  144. "inputSourceValue": crawler_plan_id,
  145. "inputSourceSubType": None,
  146. "fieldName": None,
  147. "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(crawler_plan_name),
  148. }
  149. ]
  150. generate_plan_id = video_pool_config[platform]['generate_plan_id']
  151. aiditApi.bind_crawler_task_to_generate_task(
  152. crawler_task_list=crawler_task_list,
  153. generate_task_id=generate_plan_id,
  154. )
  155. # update status
  156. self.update_tasks_status(
  157. task_id_tuple=task_id_tuple,
  158. ori_status=const.TRANSFORM_INIT_STATUS,
  159. new_status=const.TRANSFORM_SUCCESS_STATUS
  160. )
  161. except Exception as e:
  162. bot(
  163. title='视频内容池发布任务',
  164. detail={
  165. 'platform': platform,
  166. 'date': datetime.datetime.today().strftime('%Y-%m-%d'),
  167. 'msg': '发布视频内容池失败,原因:{}'.format(str(e)),
  168. 'detail': traceback.format_exc(),
  169. },
  170. mention=False
  171. )
  172. else:
  173. bot(
  174. title='视频内容池发布任务',
  175. detail={
  176. 'platform': platform,
  177. 'date': datetime.datetime.today().strftime('%Y-%m-%d'),
  178. 'msg': '该平台无待发布视频,请关注供给的抓取'
  179. },
  180. mention=False
  181. )
  182. return fetch_response
  183. P = PublishSingleVideoPoolVideos()
  184. P.create_crawler_plan_by_category()