@@ -1,3 +1,8 @@
+from datetime import datetime
+
+from applications.api import feishu_robot
+
+
 class CrawlerDetailAnalysisConst:
     CATEGORY_LIST = [
         "知识科普",
@@ -18,54 +23,154 @@ class CrawlerDetailAnalysisConst:
     ]

     TRANSFORMED_STATUS = 1
+    NOT_TRANSFORMED_STATUS = 0
+
+    CRAWLER_DETAIL_TASK_PLATFORM = "crawler_detail_by_platform"
+    CRAWLER_DETAIL_TASK_CATEGORY = "crawler_detail_by_category"
+
+    TRANSFORM_DETAIL_TASK_PLATFORM = "transform_detail_by_platform"
+    TRANSFORM_DETAIL_TASK_CATEGORY = "transform_detail_by_category"
+
+
+class CrawlerDetailBase(CrawlerDetailAnalysisConst):
+    def __init__(self):
+        super().__init__()
+
+    @staticmethod
+    def create_feishu_column_map() -> dict:
+        date_column = feishu_robot.create_feishu_columns_sheet(
+            sheet_type="plain_text", sheet_name="dt", display_name="日期"
+        )
+        category_column = feishu_robot.create_feishu_columns_sheet(
+            sheet_type="plain_text", sheet_name="category", display_name="文章品类"
+        )
+        platform_column = feishu_robot.create_feishu_columns_sheet(
+            sheet_type="plain_text", sheet_name="platform", display_name="抓取渠道"
+        )
+        video_cnt_column = feishu_robot.create_feishu_columns_sheet(
+            sheet_type="number", sheet_name="video_count", display_name="视频数量"
+        )
+        avg_score_column = feishu_robot.create_feishu_columns_sheet(
+            sheet_type="number",
+            sheet_name="average_similarity_score",
+            display_name="相关性分均值",
+        )
+        return {
+            "dt": date_column,
+            "category": category_column,
+            "platform": platform_column,
+            "video_count": video_cnt_column,
+            "average_similarity_score": avg_score_column,
+        }
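The keys of this map deliberately mirror the column aliases used in the SQL below, so `deal()` can pick columns straight off a result row. A minimal sketch of that consumption pattern, with plain dicts standing in for the descriptors that `feishu_robot.create_feishu_columns_sheet` actually returns (their real shape is not shown in this patch):

```python
# Stand-in descriptors; the real ones come from create_feishu_columns_sheet.
column_map = {
    "dt": {"sheet_name": "dt", "display_name": "日期"},
    "platform": {"sheet_name": "platform", "display_name": "抓取渠道"},
    "video_count": {"sheet_name": "video_count", "display_name": "视频数量"},
}
rows = [{"dt": "2025-01-01", "platform": "demo", "video_count": 12}]  # fake row
columns = [column_map[key] for key in rows[0].keys()]  # same lookup deal() uses
```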


-class CrawlerDetail(CrawlerDetailAnalysisConst):
+class CrawlerArticleDetailAnalysis(CrawlerDetailBase):
     pass


-class CrawlerVideoDetailAnalysis(CrawlerDetail):
+class CrawlerVideoDetailAnalysis(CrawlerDetailBase):
     def __init__(self, pool, trace_id):
+        super().__init__()
         self.pool = pool
         self.trace_id = trace_id

-    async def get_crawler_videos_by_platform(self, start_date, end_data):
+    async def get_crawler_videos_by_platform(self, start_date, end_date):
         """
         Count the videos crawled on each platform between start_date and end_date.
         """
         query = """
-            SELECT FROM_UNIXTIME(crawler_timestamp, '%Y-%m-%d') AS crawler_date, platform, count(1) AS video_count
+            SELECT CAST(
+                DATE(FROM_UNIXTIME(crawler_timestamp)) AS CHAR
+            ) AS dt, platform, count(1) AS video_count
             FROM publish_single_video_source
-            WHERE FROM_UNIXTIME(crawler_timestamp, '%Y-%m-%d') BETWEEN %s AND %s
-            GROUP BY FROM_UNIXTIME(crawler_timestamp, '%Y-%m-%d'), platform;
+            WHERE crawler_timestamp BETWEEN UNIX_TIMESTAMP(%s) AND UNIX_TIMESTAMP(%s)
+            GROUP BY dt, platform;
         """
-        return await self.pool.async_fetch(query=query, params=(start_date, end_data))
+        return await self.pool.async_fetch(query=query, params=(start_date, end_date))
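Note the boundary semantics of the rewritten WHERE clause: `UNIX_TIMESTAMP('2025-01-31')` is midnight at the start of that day, so passing plain dates makes `end_date` effectively exclusive. If the whole end day should be covered, the caller can widen the upper bound; a hypothetical helper, not part of this patch:

```python
from datetime import datetime, timedelta

def inclusive_date_range(start_date: str, end_date: str) -> tuple[str, str]:
    """Shift end_date forward one day so BETWEEN ... UNIX_TIMESTAMP(end) covers it."""
    upper = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
    return start_date, upper.strftime("%Y-%m-%d")
```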

-    async def get_crawler_videos_by_category(self, start_date, end_data):
+    async def get_crawler_videos_by_category(self, start_date, end_date):
         """
         Count the videos crawled in each category between start_date and end_date.
         """
-        query = """
-            SELECT FROM_UNIXTIME(crawler_timestamp, '%Y-%m-%d') AS crawler_date, category, count(1) AS video_count
+        category_place_holders = ", ".join(["%s"] * len(self.CATEGORY_LIST))
+        query = f"""
+            SELECT CAST(
+                DATE(FROM_UNIXTIME(crawler_timestamp)) AS CHAR
+            ) AS dt, category, count(1) AS video_count
             FROM publish_single_video_source
-            WHERE FROM_UNIXTIME(crawler_timestamp) BETWEEN %s AND %s AND category in %s
-            GROUP BY FROM_UNIXTIME(crawler_timestamp, '%Y-%m-%d'), category;
+            WHERE crawler_timestamp BETWEEN UNIX_TIMESTAMP(%s) AND UNIX_TIMESTAMP(%s)
+                AND category IN ({category_place_holders})
+            GROUP BY dt, category;
         """
         return await self.pool.async_fetch(
-            query=query, params=(start_date, end_data, tuple(self.CATEGORY_LIST))
+            query=query, params=tuple([start_date, end_date] + self.CATEGORY_LIST)
         )
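Since MySQL placeholders cannot expand a list, the IN clause is built by joining one `%s` per category and flattening the parameters to match. How the expansion looks for a three-item list (the second and third category names are dummies; only the count matters):

```python
categories = ["知识科普", "demo_category_b", "demo_category_c"]
placeholders = ", ".join(["%s"] * len(categories))  # "%s, %s, %s"
params = tuple(["2025-01-01", "2025-01-31"] + categories)
# params -> ("2025-01-01", "2025-01-31", "知识科普", "demo_category_b", "demo_category_c")
```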

-    async def get_transform_videos_by_platform(self, start_date, end_data):
+    async def get_transform_videos_by_platform(self, start_date, end_date):
         query = """
-            SELECT DATE_FORMAT(create_timestamp, '%Y-%m-%d') AS dt, platform,
+            SELECT CAST(DATE(create_timestamp) AS CHAR) AS dt, platform,
                 count(*) AS video_count, avg(score) AS average_similarity_score
             FROM single_video_transform_queue
             WHERE create_timestamp BETWEEN %s AND %s AND status = %s
-            GROUP BY DATE_FORMAT(create_timestamp, '%Y-%m-%d'), platform;
+            GROUP BY dt, platform;
         """
         return await self.pool.async_fetch(
-            query=query, params=(start_date, end_data, self.TRANSFORMED_STATUS)
+            query=query, params=(start_date, end_date, self.TRANSFORMED_STATUS)
         )
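The aliases `dt`, `platform`, `video_count`, and `average_similarity_score` line up with the keys of `create_feishu_column_map`, which is what lets `deal()` build the feishu table by key lookup alone. An illustrative row (values invented):

```python
row = {
    "dt": "2025-01-01",
    "platform": "demo_platform",
    "video_count": 42,
    "average_similarity_score": 0.83,
}
assert set(row) <= {"dt", "category", "platform", "video_count", "average_similarity_score"}
```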

-    async def get_transform_videos_by_category(self, start_date, end_data):
-        pass
+    async def get_transform_videos_by_category(self, start_date, end_date):
+        raise NotImplementedError()
+
+
+class CrawlerDetailDeal(CrawlerVideoDetailAnalysis, CrawlerArticleDetailAnalysis):
+    def __init__(self, pool, trace_id):
+        super().__init__(pool, trace_id)
+
+    async def analysis_video_pool(self, task, start_date, end_date):
+        match task:
+            case self.CRAWLER_DETAIL_TASK_PLATFORM:
+                return await self.get_crawler_videos_by_platform(start_date, end_date)
+            case self.CRAWLER_DETAIL_TASK_CATEGORY:
+                return await self.get_crawler_videos_by_category(start_date, end_date)
+            case self.TRANSFORM_DETAIL_TASK_PLATFORM:
+                return await self.get_transform_videos_by_platform(start_date, end_date)
+            case self.TRANSFORM_DETAIL_TASK_CATEGORY:
+                return await self.get_transform_videos_by_category(start_date, end_date)
+            case _:
+                return None
+
+    async def analysis_article_pool(self, task, start_date, end_date):
+        raise NotImplementedError()
+
+    async def deal(self, params):
+        start_date = params.get("start_date")
+        end_date = params.get("end_date")
+        media_type = params.get("media_type", "video")
+        sub_task = params.get("sub_task_name", self.CRAWLER_DETAIL_TASK_PLATFORM)
+
+        column_dict = self.create_feishu_column_map()
+
+        match media_type:
+            case "video":
+                response = await self.analysis_video_pool(sub_task, start_date, end_date)
+                # Guard against an empty result set before indexing response[0].
+                if not response:
+                    return
+                columns = [column_dict[key] for key in response[0].keys()]
+                await feishu_robot.bot(
+                    title=f"[{start_date}, {end_date}) 抓取视频分平台统计",
+                    detail={
+                        "columns": columns,
+                        "rows": response,
+                    },
+                    table=True,
+                    mention=False,
+                )
+
+            case "article":
+                await self.analysis_article_pool(sub_task, start_date, end_date)
+
+            case _:
+                pass
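For reference, a hypothetical invocation of the entry point from inside an async context (pool construction and the task runner that supplies `params` are outside this patch):

```python
handler = CrawlerDetailDeal(pool, trace_id="demo-trace-id")
await handler.deal({
    "start_date": "2025-01-01",
    "end_date": "2025-01-31",
    "media_type": "video",
    "sub_task_name": handler.CRAWLER_DETAIL_TASK_CATEGORY,
})
```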