publish_single_video_pool_videos.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import json
  2. import datetime
  3. import traceback
  4. from pymysql.cursors import DictCursor
  5. from tqdm import tqdm
  6. from applications import bot, aiditApi
  7. from applications.const import SingleVideoPoolPublishTaskConst
  8. from applications.db import DatabaseConnector
  9. from config import long_articles_config, apolloConfig
  10. config = apolloConfig()
  11. const = SingleVideoPoolPublishTaskConst()
  12. # video_pool_config = json.loads(config.getConfigValue(key="video_pool_publish_config"))
  13. video_pool_config = {
  14. "sph": {
  15. "nick_name": "视频号",
  16. "process_num_each_day": 218,
  17. "generate_plan_id": "20250325025917853810062"
  18. },
  19. "gzh": {
  20. "nick_name": "公众号",
  21. "process_num_each_day": 201,
  22. "generate_plan_id": "20250324132413116896899"
  23. },
  24. "toutiao": {
  25. "nick_name": "头条号",
  26. "process_num_each_day": 411,
  27. "generate_plan_id": "20250324132226090387919"
  28. },
  29. "hksp": {
  30. "nick_name": "好看视频",
  31. "process_num_each_day": 165,
  32. "generate_plan_id": "20250325025446821867933"
  33. },
  34. "sohu": {
  35. "nick_name": "搜狐",
  36. "process_num_each_day": 100,
  37. "generate_plan_id": "20250409083938381788492"
  38. }
  39. }
  40. class PublishSingleVideoPoolVideos:
  41. def __init__(self):
  42. self.db_client = DatabaseConnector(db_config=long_articles_config)
  43. self.db_client.connect()
  44. def get_task_list(self, platform:str) -> list[dict]:
  45. daily_limit = video_pool_config[platform]['process_num_each_day']
  46. fetch_query = f"""
  47. select id, content_trace_id, pq_vid
  48. from single_video_transform_queue
  49. where status = {const.TRANSFORM_INIT_STATUS} and platform = '{platform}'
  50. order by score desc
  51. limit {daily_limit};
  52. """
  53. fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
  54. return fetch_response
  55. def update_tasks_status(self,
  56. task_id_tuple: tuple,
  57. ori_status: int,
  58. new_status: int)-> int:
  59. update_query = f"""
  60. update single_video_transform_queue
  61. set status = %s
  62. where id in %s and status = %s;
  63. """
  64. affected_rows = self.db_client.save(
  65. query=update_query,
  66. params=(new_status, task_id_tuple, ori_status)
  67. )
  68. return affected_rows
  69. def deal(self):
  70. """
  71. entrance of this class
  72. """
  73. platform_list = ["sohu"]
  74. for platform in tqdm(platform_list, desc='process each platform'):
  75. task_list = self.get_task_list(platform)
  76. task_id_tuple = tuple([task['id'] for task in task_list])
  77. vid_list = [task['pq_vid'] for task in task_list]
  78. if vid_list:
  79. try:
  80. # create video crawler plan
  81. plan_name = f"{video_pool_config[platform]['nick_name']}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
  82. crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
  83. plan_name=plan_name,
  84. plan_tag="单视频供给冷启动",
  85. video_id_list=vid_list,
  86. )
  87. crawler_plan_id = crawler_plan_response["data"]["id"]
  88. crawler_plan_name = crawler_plan_response["data"]["name"]
  89. # bind crawler plan to generate plan
  90. crawler_task_list = [
  91. {
  92. "contentType": 1,
  93. "inputSourceModal": 4,
  94. "inputSourceChannel": 10,
  95. "inputSourceType": 2,
  96. "inputSourceValue": crawler_plan_id,
  97. "inputSourceSubType": None,
  98. "fieldName": None,
  99. "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(crawler_plan_name),
  100. }
  101. ]
  102. generate_plan_id = video_pool_config[platform]['generate_plan_id']
  103. aiditApi.bind_crawler_task_to_generate_task(
  104. crawler_task_list=crawler_task_list,
  105. generate_task_id=generate_plan_id,
  106. )
  107. # update status
  108. self.update_tasks_status(
  109. task_id_tuple=task_id_tuple,
  110. ori_status=const.TRANSFORM_INIT_STATUS,
  111. new_status=const.TRANSFORM_SUCCESS_STATUS
  112. )
  113. except Exception as e:
  114. bot(
  115. title='视频内容池发布任务',
  116. detail={
  117. 'platform': platform,
  118. 'date': datetime.datetime.today().strftime('%Y-%m-%d'),
  119. 'msg': '发布视频内容池失败,原因:{}'.format(str(e)),
  120. 'detail': traceback.format_exc(),
  121. },
  122. mention=False
  123. )
  124. else:
  125. bot(
  126. title='视频内容池发布任务',
  127. detail={
  128. 'platform': platform,
  129. 'date': datetime.datetime.today().strftime('%Y-%m-%d'),
  130. 'msg': '该平台无待发布视频,请关注供给的抓取'
  131. },
  132. mention=False
  133. )
  134. if __name__ == '__main__':
  135. PublishSingleVideoPoolVideos().deal()