import json
import datetime
import traceback

from pymysql.cursors import DictCursor
from tqdm import tqdm

from applications import aiditApi
from applications.api import ApolloApi, FeishuBotApi
from applications.const import SingleVideoPoolPublishTaskConst
from applications.db import DatabaseConnector
from config import long_articles_config

# init shared clients / constants used by this module
apollo_api = ApolloApi(env="prod")
feishu_bot_api = FeishuBotApi()
const = SingleVideoPoolPublishTaskConst()

# get config information from apollo
video_pool_config = json.loads(
    apollo_api.get_config_value(key="video_pool_publish_config")
)
video_category_list = json.loads(apollo_api.get_config_value(key="category_list"))
platform_list = list(video_pool_config.keys())


class PublishSingleVideoPoolVideos:
    """
    Publish videos from the single-video pool.

    For each configured platform, pending transform-queue rows are fetched
    (up to the platform's daily limit), grouped by category, turned into a
    crawler plan bound to the platform's generate plan, and a summary table
    is pushed to the feishu bot.
    """

    def __init__(self):
        # presumably long_articles_config points at the long-articles MySQL
        # instance — connection is opened eagerly at construction time
        self.db_client = DatabaseConnector(db_config=long_articles_config)
        self.db_client.connect()

    def get_task_list(self, platform: str) -> list[dict]:
        """
        Fetch pending transform tasks for one platform.

        Rows are limited to tasks still in TRANSFORM_INIT_STATUS whose source
        record has a successfully assigned category, ordered by score (best
        first) and capped at the platform's configured daily quota.

        :param platform: key into video_pool_config (trusted config value,
            not user input — interpolated directly into the SQL text)
        :return: list of dict rows with id / content_trace_id / pq_vid /
            score / category
        """
        daily_limit = video_pool_config[platform]["process_num_each_day"]
        fetch_query = f"""
            select t1.id, t1.content_trace_id, t1.pq_vid, t1.score, t2.category
            from single_video_transform_queue t1
            join publish_single_video_source t2
                on t1.content_trace_id = t2.content_trace_id
            where t1.status = {const.TRANSFORM_INIT_STATUS}
                and t1.platform = '{platform}'
                and t2.category_status = {const.SUCCESS_STATUS}
            order by score desc
            limit {daily_limit};
        """
        return self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)

    def update_tasks_status(
        self, task_id_tuple: tuple, ori_status: int, new_status: int
    ) -> int:
        """
        Transition a batch of queue rows from one status to another.

        The `status = %s` guard makes the update idempotent: rows that were
        already moved by a concurrent run are left untouched.

        :param task_id_tuple: ids of the rows to update (non-empty tuple)
        :param ori_status: expected current status
        :param new_status: status to set
        :return: number of rows actually updated
        """
        update_query = """
            update single_video_transform_queue
            set status = %s
            where id in %s and status = %s;
        """
        return self.db_client.save(
            query=update_query, params=(new_status, task_id_tuple, ori_status)
        )

    def create_crawler_plan(
        self, vid_list: list, platform: str, task_id_tuple: tuple, category: str
    ) -> None:
        """
        Create a single-video crawler plan, bind it to the platform's
        generate plan, and mark the source tasks as successfully transformed.

        Any failure is reported to the feishu bot (with traceback) and
        swallowed, so the tasks stay in TRANSFORM_INIT_STATUS and will be
        retried on a later run.

        :param vid_list: pq video ids to include in the crawler plan
        :param platform: platform key (used for config lookup and naming)
        :param task_id_tuple: queue row ids backing vid_list
        :param category: content category of this batch
        """
        try:
            # create video crawler plan
            plan_name = f"{video_pool_config[platform]['nick_name']}-{category}-{datetime.datetime.today().strftime('%Y-%m-%d')}-视频数量: {len(vid_list)}"
            crawler_plan_response = aiditApi.auto_create_single_video_crawler_task(
                plan_name=plan_name,
                plan_tag="单视频供给冷启动",
                video_id_list=vid_list,
            )
            crawler_plan_id = crawler_plan_response["data"]["id"]
            crawler_plan_name = crawler_plan_response["data"]["name"]

            # bind crawler plan to generate plan
            crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceModal": 4,
                    "inputSourceChannel": 10,
                    "inputSourceType": 2,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceLabel": "原始帖子-视频-票圈小程序-内容添加计划-{}".format(
                        crawler_plan_name
                    ),
                }
            ]
            generate_plan_id = video_pool_config[platform]["generate_plan_id"]
            aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=crawler_task_list,
                generate_task_id=generate_plan_id,
            )

            # update status only after both remote calls succeeded
            self.update_tasks_status(
                task_id_tuple=task_id_tuple,
                ori_status=const.TRANSFORM_INIT_STATUS,
                new_status=const.TRANSFORM_SUCCESS_STATUS,
            )
        except Exception as e:
            # best-effort pipeline step: report and continue with the next batch
            feishu_bot_api.bot(
                title="视频内容池发布任务",
                detail={
                    "platform": platform,
                    "date": datetime.datetime.today().strftime("%Y-%m-%d"),
                    "msg": "发布视频内容池失败,原因:{}".format(str(e)),
                    "detail": traceback.format_exc(),
                },
                mention=False,
            )

    def deal(self):
        """
        entrance of this class
        """
        platform_map = [
            (key, video_pool_config[key]["nick_name"]) for key in video_pool_config
        ]

        # feishu table header: category name, per-platform counts, total
        columns = [
            feishu_bot_api.create_feishu_columns_sheet(
                sheet_type="plain_text", sheet_name="category", display_name="品类"
            ),
            feishu_bot_api.create_feishu_columns_sheet(
                sheet_type="number", sheet_name="total", display_name="品类视频总量"
            ),
            *[
                feishu_bot_api.create_feishu_columns_sheet(
                    sheet_type="number", sheet_name=platform, display_name=display_name
                )
                for platform, display_name in platform_map
            ],
        ]

        # platform -> {category -> published count}
        publish_detail_table = {}
        for platform in tqdm(platform_list, desc="process each platform"):
            task_list = self.get_task_list(platform)
            if not task_list:
                # no supply for this platform today — alert and move on
                feishu_bot_api.bot(
                    title="视频内容池发布任务",
                    detail={
                        "platform": platform,
                        "date": datetime.datetime.today().strftime("%Y-%m-%d"),
                        "msg": "该渠道今日无供给,注意关注供给详情",
                    },
                    mention=False,
                )
                continue

            # split task list into one crawler plan per category
            for category in video_category_list:
                tasks_in_category = [
                    task for task in task_list if task["category"] == category
                ]
                if not tasks_in_category:
                    continue
                task_id_tuple = tuple(task["id"] for task in tasks_in_category)
                vid_list = [task["pq_vid"] for task in tasks_in_category]
                self.create_crawler_plan(vid_list, platform, task_id_tuple, category)
                publish_detail_table.setdefault(platform, {})[category] = len(vid_list)

        # build one summary row per category with per-platform counts
        detail_rows = []
        for category in video_category_list:
            platform_counts = {
                platform: publish_detail_table.get(platform, {}).get(category, 0)
                for platform in platform_list
            }
            total = sum(platform_counts.values())
            detail_rows.append({"category": category, **platform_counts, "total": total})

        feishu_bot_api.bot(
            title="视频内容池冷启动发布任务",
            detail={
                "columns": columns,
                "rows": detail_rows,
            },
            table=True,
            mention=False,
        )