# publish_single_video_pool_videos.py

import json
import datetime
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import aiditApi
from applications.api import ApolloApi, FeishuBotApi
from applications.const import SingleVideoPoolPublishTaskConst
from applications.db import DatabaseConnector
from config import long_articles_config

# init
apollo_api = ApolloApi(env="prod")
feishu_bot_api = FeishuBotApi()
const = SingleVideoPoolPublishTaskConst()

# get config information from apollo
video_pool_config = json.loads(
    apollo_api.get_config_value(key="video_pool_publish_config")
)
video_category_list = json.loads(apollo_api.get_config_value(key="category_list"))
platform_list = list(video_pool_config.keys())

experimental_account = json.loads(
    apollo_api.get_config_value(key="experimental_account")
)
experimental_account_set = set(experimental_account)
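# The Apollo values above are JSON strings. Judging from how they are used
# below (shapes inferred from usage, not confirmed against the live config):
#   video_pool_publish_config: {"<platform>": {"nick_name": "...",
#       "process_num_each_day": <int>, "generate_plan_id": <id>}, ...}
#   category_list: ["<category>", ...]
#   experimental_account: [<out_account_id>, ...]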


class PublishSingleVideoPoolVideos:
    def __init__(self):
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_task_list(self, platform: str) -> list[dict]:
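        """
        Fetch up to `process_num_each_day` pending rows for `platform` from the
        transform queue, highest score first, keeping only rows whose category
        labelling succeeded. All values interpolated into the SQL come from
        trusted config and consts, not user input.
        """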
        daily_limit = video_pool_config[platform]["process_num_each_day"]
        fetch_query = f"""
            select t1.id, t1.content_trace_id, t1.pq_vid, t1.score, t2.category, t2.out_account_id
            from single_video_transform_queue t1
            join publish_single_video_source t2 on t1.content_trace_id = t2.content_trace_id
            where
                t1.status = {const.TRANSFORM_INIT_STATUS}
                and t1.platform = '{platform}'
                and t2.category_status = {const.SUCCESS_STATUS}
            order by score desc
            limit {daily_limit};
        """
        fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
        return fetch_response

    def update_tasks_status(
        self, task_id_tuple: tuple, ori_status: int, new_status: int
    ) -> int:
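        """
        Flip the status of the given queue rows from `ori_status` to
        `new_status` and return the number of affected rows; the status guard
        in the WHERE clause keeps the update safe to re-run.
        """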
        update_query = """
            update single_video_transform_queue
            set status = %s
            where id in %s and status = %s;
        """
        affected_rows = self.db_client.save(
            query=update_query, params=(new_status, task_id_tuple, ori_status)
        )
        return affected_rows

    def create_crawler_plan(
        self,
        vid_list: list,
        platform: str,
        task_id_tuple: tuple,
        category: str,
        experiment_tag: str = None,
    ) -> None:
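        """
        Create an aidit crawler plan for `vid_list`, bind it to the platform's
        generate plan, and mark the queue rows as successfully transformed.
        On any failure the error is reported to Feishu instead of raised, and
        the rows keep TRANSFORM_INIT_STATUS, so they are retried on a later run.
        """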
        try:
            # create video crawler plan
            date_info = datetime.datetime.today().strftime("%Y-%m-%d")
            if experiment_tag:
                plan_name = f"{video_pool_config[platform]['nick_name']}-{category}-{date_info}-视频数量: {len(vid_list)}-{experiment_tag}"
            else:
                plan_name = f"{video_pool_config[platform]['nick_name']}-{category}-{date_info}-视频数量: {len(vid_list)}"
            crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
                plan_name=plan_name,
                plan_tag="单视频供给冷启动",
                video_id_list=vid_list,
            )
            crawler_plan_id = crawler_plan_response["data"]["id"]
            crawler_plan_name = crawler_plan_response["data"]["name"]

            # bind crawler plan to generate plan
            crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceModal": 4,
                    "inputSourceChannel": 10,
                    "inputSourceType": 2,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(
                        crawler_plan_name
                    ),
                }
            ]
            generate_plan_id = video_pool_config[platform]["generate_plan_id"]
            aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=crawler_task_list,
                generate_task_id=generate_plan_id,
            )

            # update status
            self.update_tasks_status(
                task_id_tuple=task_id_tuple,
                ori_status=const.TRANSFORM_INIT_STATUS,
                new_status=const.TRANSFORM_SUCCESS_STATUS,
            )
        except Exception as e:
            feishu_bot_api.bot(
                title="视频内容池发布任务",
                detail={
                    "platform": platform,
                    "date": datetime.datetime.today().strftime("%Y-%m-%d"),
                    "msg": "发布视频内容池失败,原因:{}".format(str(e)),
                    "detail": traceback.format_exc(),
                },
                mention=False,
            )

    def deal(self):
        """
        Entry point: for each platform, fetch today's task list, publish it per
        category (normal and experimental accounts separately), and finally
        push a per-category summary table to Feishu.
        """
        platform_map = [
            (key, video_pool_config[key]["nick_name"]) for key in video_pool_config
        ]
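        # Feishu summary-table layout: one plain-text category column, a
        # per-category total, then one numeric count column per platform.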
        columns = [
            feishu_bot_api.create_feishu_columns_sheet(
                sheet_type="plain_text", sheet_name="category", display_name="品类"
            ),
            feishu_bot_api.create_feishu_columns_sheet(
                sheet_type="number", sheet_name="total", display_name="品类视频总量"
            ),
            *[
                feishu_bot_api.create_feishu_columns_sheet(
                    sheet_type="number", sheet_name=platform, display_name=display_name
                )
                for platform, display_name in platform_map
            ],
        ]
        publish_detail_table = {}
        for platform in tqdm(platform_list, desc="process each platform"):
            task_list = self.get_task_list(platform)
            if task_list:
                # split task list into each category
                for category in video_category_list:
                    task_list_with_category = [
                        task for task in task_list if task["category"] == category
                    ]
                    task_id_tuple = tuple(
                        task["id"] for task in task_list_with_category
                    )
                    # split the vids by whether out_account_id belongs to an
                    # experimental account
                    experimental_vid_list = [
                        task["pq_vid"]
                        for task in task_list_with_category
                        if task["out_account_id"] in experimental_account_set
                    ]
                    normal_vid_list = [
                        task["pq_vid"]
                        for task in task_list_with_category
                        if task["out_account_id"] not in experimental_account_set
                    ]

                    if normal_vid_list:
                        self.create_crawler_plan(
                            normal_vid_list, platform, task_id_tuple, category
                        )
                        if publish_detail_table.get(platform):
                            publish_detail_table[platform][category] = len(
                                normal_vid_list
                            )
                        else:
                            publish_detail_table[platform] = {
                                category: len(normal_vid_list)
                            }

                    if experimental_vid_list:
                        self.create_crawler_plan(
                            experimental_vid_list,
                            platform,
                            task_id_tuple,
                            category,
                            "20250610-品类账号实验",
                        )
                        if publish_detail_table.get(platform):
                            # default to 0 so the count is correct even when
                            # this category had no normal vids on the platform
                            publish_detail_table[platform][category] = (
                                publish_detail_table[platform].get(category, 0)
                                + len(experimental_vid_list)
                            )
                        else:
                            publish_detail_table[platform] = {
                                category: len(experimental_vid_list)
                            }
            else:
                feishu_bot_api.bot(
                    title="视频内容池发布任务",
                    detail={
                        "platform": platform,
                        "date": datetime.datetime.today().strftime("%Y-%m-%d"),
                        "msg": "该渠道今日无供给,注意关注供给详情",
                    },
                    mention=False,
                )

        detail_rows = []
        for category in video_category_list:
            platform_counts = {
                platform: publish_detail_table.get(platform, {}).get(category, 0)
                for platform in platform_list
            }
            total = sum(platform_counts.values())
            detail_rows.append(
                {"category": category, **platform_counts, "total": total}
            )
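        # Each row's keys match the sheet_name values in `columns`, e.g.
        # (illustrative values only):
        #   {"category": "<category>", "<platform>": 3, ..., "total": 3}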
        feishu_bot_api.bot(
            title="视频内容池冷启动发布任务",
            detail={
                "columns": columns,
                "rows": detail_rows,
            },
            table=True,
            mention=False,
        )
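

# A minimal direct-run entry point, sketched on the assumption that this script
# is meant to be executed standalone; if an external scheduler drives it, the
# caller would instantiate PublishSingleVideoPoolVideos itself.
if __name__ == "__main__":
    PublishSingleVideoPoolVideos().deal()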