|
@@ -5,15 +5,23 @@ import traceback
|
|
|
from pymysql.cursors import DictCursor
|
|
|
from tqdm import tqdm
|
|
|
|
|
|
-from applications import bot, aiditApi
|
|
|
+from applications import aiditApi
|
|
|
+from applications.api import ApolloApi, FeishuBotApi
|
|
|
from applications.const import SingleVideoPoolPublishTaskConst
|
|
|
from applications.db import DatabaseConnector
|
|
|
-from config import long_articles_config, apolloConfig
|
|
|
+from config import long_articles_config
|
|
|
|
|
|
# init service clients
apollo_api = ApolloApi(env="prod")
feishu_bot_api = FeishuBotApi()
const = SingleVideoPoolPublishTaskConst()

# get config information from apollo
# video_pool_config maps each publish platform key to its settings
# (nick_name, process_num_each_day, generate_plan_id, ...)
video_pool_config = json.loads(
    apollo_api.get_config_value(key="video_pool_publish_config")
)
# closed list of video categories used to batch publishes per category
video_category_list = json.loads(apollo_api.get_config_value(key="category_list"))
# platforms to process are exactly the configured keys
platform_list = list(video_pool_config.keys())
|
|
|
|
|
|
|
|
|
class PublishSingleVideoPoolVideos:
|
|
@@ -21,97 +29,159 @@ class PublishSingleVideoPoolVideos:
|
|
|
self.db_client = DatabaseConnector(db_config=long_articles_config)
|
|
|
self.db_client.connect()
|
|
|
|
|
|
- def get_task_list(self, platform:str) -> list[dict]:
|
|
|
- daily_limit = video_pool_config[platform]['process_num_each_day']
|
|
|
+ def get_task_list(self, platform: str) -> list[dict]:
|
|
|
+ daily_limit = video_pool_config[platform]["process_num_each_day"]
|
|
|
fetch_query = f"""
|
|
|
- select id, content_trace_id, pq_vid
|
|
|
- from single_video_transform_queue
|
|
|
- where status = {const.TRANSFORM_INIT_STATUS} and platform = '{platform}'
|
|
|
+ select t1.id, t1.content_trace_id, t1.pq_vid, t1.score, t2.category
|
|
|
+ from single_video_transform_queue t1
|
|
|
+ join publish_single_video_source t2 on t1.content_trace_id = t2.content_trace_id
|
|
|
+ where
|
|
|
+ t1.status = {const.TRANSFORM_SUCCESS_STATUS}
|
|
|
+ and t1.platform = '{platform}'
|
|
|
+ and t2.category_status = {const.SUCCESS_STATUS}
|
|
|
order by score desc
|
|
|
limit {daily_limit};
|
|
|
"""
|
|
|
fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
|
|
|
return fetch_response
|
|
|
|
|
|
- def update_tasks_status(self,
|
|
|
- task_id_tuple: tuple,
|
|
|
- ori_status: int,
|
|
|
- new_status: int)-> int:
|
|
|
+ def update_tasks_status(
|
|
|
+ self, task_id_tuple: tuple, ori_status: int, new_status: int
|
|
|
+ ) -> int:
|
|
|
update_query = f"""
|
|
|
update single_video_transform_queue
|
|
|
set status = %s
|
|
|
where id in %s and status = %s;
|
|
|
"""
|
|
|
affected_rows = self.db_client.save(
|
|
|
- query=update_query,
|
|
|
- params=(new_status, task_id_tuple, ori_status)
|
|
|
+ query=update_query, params=(new_status, task_id_tuple, ori_status)
|
|
|
)
|
|
|
return affected_rows
|
|
|
|
|
|
+ def create_crawler_plan(
|
|
|
+ self, vid_list: list, platform: str, task_id_tuple: tuple, category: str
|
|
|
+ ) -> None:
|
|
|
+ try:
|
|
|
+ # create video crawler plan
|
|
|
+ plan_name = f"{video_pool_config[platform]['nick_name']}-{category}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
|
|
|
+ crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
|
|
|
+ plan_name=plan_name,
|
|
|
+ plan_tag="单视频供给冷启动",
|
|
|
+ video_id_list=vid_list,
|
|
|
+ )
|
|
|
+ crawler_plan_id = crawler_plan_response["data"]["id"]
|
|
|
+ crawler_plan_name = crawler_plan_response["data"]["name"]
|
|
|
+
|
|
|
+ # bind crawler plan to generate plan
|
|
|
+ crawler_task_list = [
|
|
|
+ {
|
|
|
+ "contentType": 1,
|
|
|
+ "inputSourceModal": 4,
|
|
|
+ "inputSourceChannel": 10,
|
|
|
+ "inputSourceType": 2,
|
|
|
+ "inputSourceValue": crawler_plan_id,
|
|
|
+ "inputSourceSubType": None,
|
|
|
+ "fieldName": None,
|
|
|
+ "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(
|
|
|
+ crawler_plan_name
|
|
|
+ ),
|
|
|
+ }
|
|
|
+ ]
|
|
|
+ generate_plan_id = video_pool_config[platform]["generate_plan_id"]
|
|
|
+ aiditApi.bind_crawler_task_to_generate_task(
|
|
|
+ crawler_task_list=crawler_task_list,
|
|
|
+ generate_task_id=generate_plan_id,
|
|
|
+ )
|
|
|
+
|
|
|
+ # update status
|
|
|
+ self.update_tasks_status(
|
|
|
+ task_id_tuple=task_id_tuple,
|
|
|
+ ori_status=const.TRANSFORM_INIT_STATUS,
|
|
|
+ new_status=const.TRANSFORM_SUCCESS_STATUS,
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ feishu_bot_api.bot(
|
|
|
+ title="视频内容池发布任务",
|
|
|
+ detail={
|
|
|
+ "platform": platform,
|
|
|
+ "date": datetime.datetime.today().strftime("%Y-%m-%d"),
|
|
|
+ "msg": "发布视频内容池失败,原因:{}".format(str(e)),
|
|
|
+ "detail": traceback.format_exc(),
|
|
|
+ },
|
|
|
+ mention=False,
|
|
|
+ )
|
|
|
+
|
|
|
def deal(self):
|
|
|
"""
|
|
|
entrance of this class
|
|
|
"""
|
|
|
- platform_list = ["sph", "gzh", "toutiao", "hksp", "sohu", "piaoquan"]
|
|
|
- for platform in tqdm(platform_list, desc='process each platform'):
|
|
|
- task_list = self.get_task_list(platform)
|
|
|
- task_id_tuple = tuple([task['id'] for task in task_list])
|
|
|
- vid_list = [task['pq_vid'] for task in task_list]
|
|
|
- if vid_list:
|
|
|
- try:
|
|
|
- # create video crawler plan
|
|
|
- plan_name = f"{video_pool_config[platform]['nick_name']}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
|
|
|
- crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
|
|
|
- plan_name=plan_name,
|
|
|
- plan_tag="单视频供给冷启动",
|
|
|
- video_id_list=vid_list,
|
|
|
- )
|
|
|
- crawler_plan_id = crawler_plan_response["data"]["id"]
|
|
|
- crawler_plan_name = crawler_plan_response["data"]["name"]
|
|
|
+ platform_map = [
|
|
|
+ (key, video_pool_config[key]["nick_name"]) for key in video_pool_config
|
|
|
+ ]
|
|
|
+ columns = [
|
|
|
+ feishu_bot_api.create_feishu_columns_sheet(
|
|
|
+ sheet_type="plain_text", sheet_name="category", display_name="品类"
|
|
|
+ ),
|
|
|
+ feishu_bot_api.create_feishu_columns_sheet(
|
|
|
+ sheet_type="number", sheet_name="total", display_name="品类视频总量"
|
|
|
+ ),
|
|
|
+ *[
|
|
|
+ feishu_bot_api.create_feishu_columns_sheet(
|
|
|
+ sheet_type="number", sheet_name=platform, display_name=display_name
|
|
|
+ )
|
|
|
+ for platform, display_name in platform_map
|
|
|
+ ],
|
|
|
+ ]
|
|
|
+ publish_detail_table = {}
|
|
|
|
|
|
- # bind crawler plan to generate plan
|
|
|
- crawler_task_list = [
|
|
|
- {
|
|
|
- "contentType": 1,
|
|
|
- "inputSourceModal": 4,
|
|
|
- "inputSourceChannel": 10,
|
|
|
- "inputSourceType": 2,
|
|
|
- "inputSourceValue": crawler_plan_id,
|
|
|
- "inputSourceSubType": None,
|
|
|
- "fieldName": None,
|
|
|
- "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(crawler_plan_name),
|
|
|
- }
|
|
|
+ for platform in tqdm(platform_list, desc="process each platform"):
|
|
|
+ task_list = self.get_task_list(platform)
|
|
|
+ if task_list:
|
|
|
+ # split task list into each category
|
|
|
+ for category in video_category_list:
|
|
|
+ task_list_with_category = [
|
|
|
+ task for task in task_list if task["category"] == category
|
|
|
]
|
|
|
- generate_plan_id = video_pool_config[platform]['generate_plan_id']
|
|
|
- aiditApi.bind_crawler_task_to_generate_task(
|
|
|
- crawler_task_list=crawler_task_list,
|
|
|
- generate_task_id=generate_plan_id,
|
|
|
- )
|
|
|
-
|
|
|
- # update status
|
|
|
- self.update_tasks_status(
|
|
|
- task_id_tuple=task_id_tuple,
|
|
|
- ori_status=const.TRANSFORM_INIT_STATUS,
|
|
|
- new_status=const.TRANSFORM_SUCCESS_STATUS
|
|
|
- )
|
|
|
- except Exception as e:
|
|
|
- bot(
|
|
|
- title='视频内容池发布任务',
|
|
|
- detail={
|
|
|
- 'platform': platform,
|
|
|
- 'date': datetime.datetime.today().strftime('%Y-%m-%d'),
|
|
|
- 'msg': '发布视频内容池失败,原因:{}'.format(str(e)),
|
|
|
- 'detail': traceback.format_exc(),
|
|
|
- },
|
|
|
- mention=False
|
|
|
+ task_id_tuple = tuple(
|
|
|
+ [task["id"] for task in task_list_with_category]
|
|
|
)
|
|
|
+ vid_list = [task["pq_vid"] for task in task_list_with_category]
|
|
|
+ if vid_list:
|
|
|
+ self.create_crawler_plan(
|
|
|
+ vid_list, platform, task_id_tuple, category
|
|
|
+ )
|
|
|
+ if publish_detail_table.get(platform):
|
|
|
+ publish_detail_table[platform][category] = len(vid_list)
|
|
|
+ else:
|
|
|
+ publish_detail_table[platform] = {category: len(vid_list)}
|
|
|
+ else:
|
|
|
+ continue
|
|
|
else:
|
|
|
- bot(
|
|
|
- title='视频内容池发布任务',
|
|
|
+ feishu_bot_api.bot(
|
|
|
+ title="视频内容池发布任务",
|
|
|
detail={
|
|
|
- 'platform': platform,
|
|
|
- 'date': datetime.datetime.today().strftime('%Y-%m-%d'),
|
|
|
- 'msg': '该平台无待发布视频,请关注供给的抓取'
|
|
|
+ "platform": platform,
|
|
|
+ "date": datetime.datetime.today().strftime("%Y-%m-%d"),
|
|
|
+ "msg": "该渠道今日无供给,注意关注供给详情",
|
|
|
},
|
|
|
- mention=False
|
|
|
- )
|
|
|
+ mention=False,
|
|
|
+ )
|
|
|
+ detail_rows = []
|
|
|
+ for category in video_category_list:
|
|
|
+ platform_counts = {
|
|
|
+ platform: publish_detail_table.get(platform, {}).get(category, 0)
|
|
|
+ for platform in platform_list
|
|
|
+ }
|
|
|
+ total = sum(platform_counts.values())
|
|
|
+ detail_rows.append(
|
|
|
+ {"category": category, **platform_counts, "total": total}
|
|
|
+ )
|
|
|
+ feishu_bot_api.bot(
|
|
|
+ title="视频内容池冷启动发布任务",
|
|
|
+ detail={
|
|
|
+ "columns": columns,
|
|
|
+ "rows": detail_rows,
|
|
|
+ },
|
|
|
+ table=True,
|
|
|
+ mention=False,
|
|
|
+ )
|