from applications.api import feishu_robot


class CrawlerDetailAnalysisConst:
    """Constants shared by the crawler detail analysis tasks."""

    # Article categories included in the per-category statistics.
    CATEGORY_LIST = [
        "知识科普",
        "国家大事",
        "历史人物",
        "奇闻趣事",
        "名人八卦",
        "怀旧时光",
        "情感故事",
        "社会法治",
        "现代人物",
        "社会现象",
        "健康养生",
        "家长里短",
        "军事历史",
        "财经科技",
        "政治新闻",
    ]

    # Status values of rows in single_video_transform_queue.
    TRANSFORMED_STATUS = 1
    NOT_TRANSFORMED_STATUS = 0

    # Sub-task names accepted by CrawlerDetailDeal.deal().
    CRAWLER_DETAIL_TASK_PLATFORM = "crawler_detail_by_platform"
    CRAWLER_DETAIL_TASK_CATEGORY = "crawler_detail_by_category"
    TRANSFORM_DETAIL_TASK_PLATFORM = "transform_detail_by_platform"
    TRANSFORM_DETAIL_TASK_CATEGORY = "transform_detail_by_category"


class CrawlerDetailBase(CrawlerDetailAnalysisConst):
    @staticmethod
    def create_feishu_column_map() -> dict:
        """Build the Feishu sheet column definitions, keyed by the SQL result
        column alias so that a result row's keys can be mapped to columns."""
        date_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="plain_text", sheet_name="dt", display_name="日期"
        )
        category_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="plain_text", sheet_name="category", display_name="文章品类"
        )
        platform_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="plain_text", sheet_name="platform", display_name="抓取渠道"
        )
        video_cnt_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="number", sheet_name="video_count", display_name="视频数量"
        )
        avg_score_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="number",
            sheet_name="average_similarity_score",
            display_name="相关性分均值",
        )
        return {
            "dt": date_column,
            "category": category_column,
            "platform": platform_column,
            "video_count": video_cnt_column,
            "average_similarity_score": avg_score_column,
        }
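
    # The keys above deliberately match the SQL column aliases ("dt",
    # "platform", "video_count", ...), so deal() can look up a Feishu column
    # definition straight from a result row's keys. A minimal sketch of that
    # lookup, with hypothetical row values:
    #
    #     rows = [{"dt": "2024-01-01", "platform": "some_platform", "video_count": 12}]
    #     column_map = CrawlerDetailBase.create_feishu_column_map()
    #     columns = [column_map[key] for key in rows[0].keys()]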


class CrawlerArticleDetailAnalysis(CrawlerDetailBase):
    """Article-pool statistics; not implemented yet."""

    pass


class CrawlerVideoDetailAnalysis(CrawlerDetailBase):
    def __init__(self, pool, trace_id):
        super().__init__()
        self.pool = pool
        self.trace_id = trace_id

    async def get_crawler_videos_by_platform(self, start_date, end_date):
        """Count the videos crawled per platform and per day between
        start_date and end_date."""
        query = """
            SELECT CAST(DATE(FROM_UNIXTIME(crawler_timestamp)) AS CHAR) AS dt,
                   platform, count(1) AS video_count
            FROM publish_single_video_source
            WHERE crawler_timestamp BETWEEN UNIX_TIMESTAMP(%s) AND UNIX_TIMESTAMP(%s)
            GROUP BY dt, platform;
        """
        return await self.pool.async_fetch(query=query, params=(start_date, end_date))

    async def get_crawler_videos_by_category(self, start_date, end_date):
        """Count the videos crawled per category and per day between
        start_date and end_date, restricted to CATEGORY_LIST."""
        category_place_holders = ", ".join(["%s"] * len(self.CATEGORY_LIST))
        query = f"""
            SELECT CAST(DATE(FROM_UNIXTIME(crawler_timestamp)) AS CHAR) AS dt,
                   category, count(1) AS video_count
            FROM publish_single_video_source
            WHERE crawler_timestamp BETWEEN UNIX_TIMESTAMP(%s) AND UNIX_TIMESTAMP(%s)
              AND category IN ({category_place_holders})
            GROUP BY dt, category;
        """
        return await self.pool.async_fetch(
            query=query, params=tuple([start_date, end_date] + self.CATEGORY_LIST)
        )
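
    # Illustration of the IN-clause expansion above, with a hypothetical
    # two-element list: for CATEGORY_LIST = ["知识科普", "国家大事"] the
    # fragment becomes "AND category IN (%s, %s)" and params expands to
    # (start_date, end_date, "知识科普", "国家大事"), so every value stays a
    # bound parameter instead of being interpolated into the SQL string.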

    async def get_transform_videos_by_platform(self, start_date, end_date):
        """Count transformed videos and their average similarity score per
        platform and per day between start_date and end_date."""
        query = """
            SELECT CAST(DATE(create_timestamp) AS CHAR) AS dt, platform,
                   count(*) AS video_count, avg(score) AS average_similarity_score
            FROM single_video_transform_queue
            WHERE create_timestamp BETWEEN %s AND %s AND status = %s
            GROUP BY dt, platform;
        """
        return await self.pool.async_fetch(
            query=query, params=(start_date, end_date, self.TRANSFORMED_STATUS)
        )
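
    # Note the schema difference between the two tables: crawler_timestamp in
    # publish_single_video_source is a unix epoch (hence the FROM_UNIXTIME /
    # UNIX_TIMESTAMP conversions above), while create_timestamp in
    # single_video_transform_queue is compared to the date bounds directly,
    # which assumes a DATETIME-like column.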

    async def get_transform_videos_by_category(self, start_date, end_date):
        raise NotImplementedError()


class CrawlerDetailDeal(CrawlerVideoDetailAnalysis, CrawlerArticleDetailAnalysis):
    """Entry point: dispatches a sub-task name to the matching query and
    posts the result to Feishu as a table."""

    def __init__(self, pool, trace_id):
        super().__init__(pool, trace_id)

    async def analysis_video_pool(self, task, start_date, end_date):
        match task:
            case self.CRAWLER_DETAIL_TASK_PLATFORM:
                return await self.get_crawler_videos_by_platform(start_date, end_date)
            case self.CRAWLER_DETAIL_TASK_CATEGORY:
                return await self.get_crawler_videos_by_category(start_date, end_date)
            case self.TRANSFORM_DETAIL_TASK_PLATFORM:
                return await self.get_transform_videos_by_platform(start_date, end_date)
            case self.TRANSFORM_DETAIL_TASK_CATEGORY:
                return await self.get_transform_videos_by_category(start_date, end_date)
            case _:
                return None

    async def analysis_article_pool(self, task, start_date, end_date):
        raise NotImplementedError()

    async def deal(self, params):
        start_date = params.get("start_date")
        end_date = params.get("end_date")
        media_type = params.get("media_type", "video")
        sub_task = params.get("sub_task_name", self.CRAWLER_DETAIL_TASK_PLATFORM)
        column_dict = self.create_feishu_column_map()
        match media_type:
            case "video":
                crawler_detail = await self.analysis_video_pool(sub_task, start_date, end_date)
            case "article":
                crawler_detail = await self.analysis_article_pool(sub_task, start_date, end_date)
            case _:
                return None
        # Guard against an unknown sub-task or an empty result set before
        # indexing the first row.
        if not crawler_detail:
            return None
        # Keep only the Feishu columns that actually appear in the result rows.
        column_list = list(crawler_detail[0].keys())
        columns = [column_dict[key] for key in column_list]
        await feishu_robot.bot(
            # Title reads "crawl statistics for <media_type> in [start, end)".
            title=f"[{start_date}, {end_date}) 抓取 {media_type} 统计",
            detail={
                "columns": columns,
                "rows": crawler_detail,
            },
            table=True,
            mention=False,
        )
        return None
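

if __name__ == "__main__":
    # Minimal usage sketch. `_StubPool` is a hypothetical stand-in for the
    # real connection pool: it only needs the async_fetch(query=, params=)
    # method used above, and it is assumed that
    # feishu_robot.create_feishu_columns_sheet builds its column descriptors
    # locally. With an empty result set, deal() exits before posting anything
    # to Feishu.
    import asyncio

    class _StubPool:
        async def async_fetch(self, query, params):
            # Pretend the query matched no rows.
            return []

    asyncio.run(
        CrawlerDetailDeal(pool=_StubPool(), trace_id="local-debug").deal(
            params={
                "start_date": "2024-01-01",
                "end_date": "2024-01-08",
                "media_type": "video",
                "sub_task_name": CrawlerDetailDeal.CRAWLER_DETAIL_TASK_PLATFORM,
            }
        )
    )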