from applications.api import feishu_robot


class CrawlerDetailAnalysisConst:
    CATEGORY_LIST = [
        "知识科普",
        "国家大事",
        "历史人物",
        "奇闻趣事",
        "名人八卦",
        "怀旧时光",
        "情感故事",
        "社会法治",
        "现代人物",
        "社会现象",
        "健康养生",
        "家长里短",
        "军事历史",
        "财经科技",
        "政治新闻",
    ]

    TRANSFORMED_STATUS = 1
    NOT_TRANSFORMED_STATUS = 0

    CRAWLER_DETAIL_TASK_PLATFORM = "crawler_detail_by_platform"
    CRAWLER_DETAIL_TASK_CATEGORY = "crawler_detail_by_category"
    TRANSFORM_DETAIL_TASK_PLATFORM = "transform_detail_by_platform"
    TRANSFORM_DETAIL_TASK_CATEGORY = "transform_detail_by_category"


class CrawlerDetailBase(CrawlerDetailAnalysisConst):
    @staticmethod
    def create_feishu_column_map() -> dict:
        """Build the Feishu sheet column definitions, keyed by result field name."""
        date_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="plain_text", sheet_name="dt", display_name="日期"
        )
        category_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="plain_text", sheet_name="category", display_name="文章品类"
        )
        platform_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="plain_text", sheet_name="platform", display_name="抓取渠道"
        )
        video_cnt_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="number", sheet_name="video_count", display_name="视频数量"
        )
        avg_score_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="number",
            sheet_name="average_similarity_score",
            display_name="相关性分均值",
        )
        return {
            "dt": date_column,
            "category": category_column,
            "platform": platform_column,
            "video_count": video_cnt_column,
            "average_similarity_score": avg_score_column,
        }


class CrawlerArticleDetailAnalysis(CrawlerDetailBase):
    """Article-pool analysis; not implemented yet."""


class CrawlerVideoDetailAnalysis(CrawlerDetailBase):
    def __init__(self, pool, trace_id):
        super().__init__()
        self.pool = pool
        self.trace_id = trace_id

    async def get_crawler_videos_by_platform(self, start_date, end_date):
        """Count videos crawled per platform between start_date and end_date."""
        query = """
            SELECT CAST(DATE(FROM_UNIXTIME(crawler_timestamp)) AS CHAR) AS dt,
                   platform,
                   count(1) AS video_count
            FROM publish_single_video_source
            WHERE crawler_timestamp BETWEEN UNIX_TIMESTAMP(%s) AND UNIX_TIMESTAMP(%s)
            GROUP BY dt, platform;
        """
        return await self.pool.async_fetch(query=query, params=(start_date, end_date))

    async def get_crawler_videos_by_category(self, start_date, end_date):
        """Count videos crawled per category between start_date and end_date."""
        category_place_holders = ", ".join(["%s"] * len(self.CATEGORY_LIST))
        query = f"""
            SELECT CAST(DATE(FROM_UNIXTIME(crawler_timestamp)) AS CHAR) AS dt,
                   category,
                   count(1) AS video_count
            FROM publish_single_video_source
            WHERE crawler_timestamp BETWEEN UNIX_TIMESTAMP(%s) AND UNIX_TIMESTAMP(%s)
                AND category IN ({category_place_holders})
            GROUP BY dt, category;
        """
        return await self.pool.async_fetch(
            query=query, params=tuple([start_date, end_date] + self.CATEGORY_LIST)
        )

    async def get_transform_videos_by_platform(self, start_date, end_date):
        """Count transformed videos per platform, with the average similarity score."""
        query = """
            SELECT CAST(DATE(create_timestamp) AS CHAR) AS dt,
                   platform,
                   count(*) AS video_count,
                   avg(score) AS average_similarity_score
            FROM single_video_transform_queue
            WHERE create_timestamp BETWEEN %s AND %s
                AND status = %s
            GROUP BY dt, platform;
        """
        return await self.pool.async_fetch(
            query=query, params=(start_date, end_date, self.TRANSFORMED_STATUS)
        )

    async def get_transform_videos_by_category(self, start_date, end_date):
        raise NotImplementedError()


class CrawlerDetailDeal(CrawlerVideoDetailAnalysis, CrawlerArticleDetailAnalysis):
    def __init__(self, pool, trace_id):
        super().__init__(pool, trace_id)

    async def analysis_video_pool(self, task, start_date, end_date):
        """Dispatch a video-pool sub-task to the matching query method."""
        match task:
            case self.CRAWLER_DETAIL_TASK_PLATFORM:
                return await self.get_crawler_videos_by_platform(start_date, end_date)
            case self.CRAWLER_DETAIL_TASK_CATEGORY:
                return await self.get_crawler_videos_by_category(start_date, end_date)
            case self.TRANSFORM_DETAIL_TASK_PLATFORM:
                return await self.get_transform_videos_by_platform(start_date, end_date)
            case self.TRANSFORM_DETAIL_TASK_CATEGORY:
                return await self.get_transform_videos_by_category(start_date, end_date)
            case _:
                return None

    async def analysis_article_pool(self, task, start_date, end_date):
        raise NotImplementedError()

    async def deal(self, params):
        start_date = params.get("start_date")
        end_date = params.get("end_date")
        media_type = params.get("media_type", "video")
        sub_task = params.get("sub_task_name", self.CRAWLER_DETAIL_TASK_PLATFORM)
        column_dict = self.create_feishu_column_map()

        match media_type:
            case "video":
                crawler_detail = await self.analysis_video_pool(
                    sub_task, start_date, end_date
                )
            case "article":
                crawler_detail = await self.analysis_article_pool(
                    sub_task, start_date, end_date
                )
            case _:
                return None

        # Guard against unknown sub-tasks (None) or empty result sets before indexing.
        if not crawler_detail:
            return None

        column_list = list(crawler_detail[0].keys())
        columns = [column_dict[key] for key in column_list]
        await feishu_robot.bot(
            title=f"[{start_date}, {end_date}) 抓取 {media_type} 统计",
            detail={
                "columns": columns,
                "rows": crawler_detail,
            },
            table=True,
            mention=False,
        )
        return None
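

# A minimal usage sketch, not part of the module: it assumes the pool is any
# object exposing `async_fetch(query=..., params=...)` returning a list of dict
# rows. `_FakePool`, `_demo`, and the canned row below are hypothetical names
# for illustration; the real pool and Feishu delivery come from the project.
import asyncio


class _FakePool:
    """Hypothetical stand-in for the project's async DB pool."""

    async def async_fetch(self, query, params):
        # One canned row shaped like the per-platform query output.
        return [{"dt": "2025-01-01", "platform": "example", "video_count": 42}]


async def _demo():
    deal_task = CrawlerDetailDeal(pool=_FakePool(), trace_id="trace-demo-001")
    await deal_task.deal(
        {
            "start_date": "2025-01-01",
            "end_date": "2025-01-08",
            "media_type": "video",
            "sub_task_name": CrawlerDetailDeal.CRAWLER_DETAIL_TASK_PLATFORM,
        }
    )


if __name__ == "__main__":
    asyncio.run(_demo())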