crawler_detail.py

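"""
Crawler detail statistics: aggregate crawled and transformed video counts per
platform / per category over a date range, then push the result to a Feishu
bot as a table.
"""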
from datetime import datetime

from applications.api import feishu_robot


class CrawlerDetailAnalysisConst:
    CATEGORY_LIST = [
        "知识科普",
        "国家大事",
        "历史人物",
        "奇闻趣事",
        "名人八卦",
        "怀旧时光",
        "情感故事",
        "社会法治",
        "现代人物",
        "社会现象",
        "健康养生",
        "家长里短",
        "军事历史",
        "财经科技",
        "政治新闻",
    ]

    TRANSFORMED_STATUS = 1
    NOT_TRANSFORMED_STATUS = 0

    CRAWLER_DETAIL_TASK_PLATFORM = "crawler_detail_by_platform"
    CRAWLER_DETAIL_TASK_CATEGORY = "crawler_detail_by_category"
    TRANSFORM_DETAIL_TASK_PLATFORM = "transform_detail_by_platform"
    TRANSFORM_DETAIL_TASK_CATEGORY = "transform_detail_by_category"
class CrawlerDetailBase(CrawlerDetailAnalysisConst):
    def __init__(self):
        super().__init__()

    @staticmethod
    def create_feishu_column_map() -> dict:
        """Build the Feishu table column definitions, keyed by result field name."""
        date_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="plain_text", sheet_name="dt", display_name="日期"
        )
        category_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="plain_text", sheet_name="category", display_name="文章品类"
        )
        platform_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="plain_text", sheet_name="platform", display_name="抓取渠道"
        )
        video_cnt_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="number", sheet_name="video_count", display_name="视频数量"
        )
        avg_score_column = feishu_robot.create_feishu_columns_sheet(
            sheet_type="number",
            sheet_name="average_similarity_score",
            display_name="相关性分均值",
        )
        return {
            "dt": date_column,
            "category": category_column,
            "platform": platform_column,
            "video_count": video_cnt_column,
            "average_similarity_score": avg_score_column,
        }
class CrawlerArticleDetailAnalysis(CrawlerDetailBase):
    pass
    # raise NotImplementedError
class CrawlerVideoDetailAnalysis(CrawlerDetailBase):
    def __init__(self, pool, trace_id):
        super().__init__()
        self.pool = pool
        self.trace_id = trace_id

    async def get_crawler_videos_by_platform(self, start_date, end_date):
        """
        Count the videos crawled by each platform between start_date and end_date.
        """
        query = """
            SELECT CAST(
                       DATE(FROM_UNIXTIME(crawler_timestamp)) AS CHAR
                   ) AS dt, platform, count(1) AS video_count
            FROM publish_single_video_source
            WHERE crawler_timestamp BETWEEN UNIX_TIMESTAMP(%s) AND UNIX_TIMESTAMP(%s)
            GROUP BY dt, platform;
        """
        return await self.pool.async_fetch(query=query, params=(start_date, end_date))

    async def get_crawler_videos_by_category(self, start_date, end_date):
        """
        Count the videos crawled in each category between start_date and end_date.
        """
        category_place_holders = ", ".join(["%s"] * len(self.CATEGORY_LIST))
        query = f"""
            SELECT CAST(
                       DATE(FROM_UNIXTIME(crawler_timestamp)) AS CHAR
                   ) AS dt, category, count(1) AS video_count
            FROM publish_single_video_source
            WHERE FROM_UNIXTIME(crawler_timestamp) BETWEEN %s AND %s
                AND category IN ({category_place_holders})
            GROUP BY dt, category;
        """
        return await self.pool.async_fetch(
            query=query, params=tuple([start_date, end_date] + self.CATEGORY_LIST)
        )

    async def get_transform_videos_by_platform(self, start_date, end_date):
        """
        Count transformed videos and the average similarity score per platform
        between start_date and end_date.
        """
        query = """
            SELECT CAST(DATE(create_timestamp) AS CHAR) AS dt, platform,
                   count(*) AS video_count, avg(score) AS average_similarity_score
            FROM single_video_transform_queue
            WHERE create_timestamp BETWEEN %s AND %s AND status = %s
            GROUP BY dt, platform;
        """
        return await self.pool.async_fetch(
            query=query, params=(start_date, end_date, self.TRANSFORMED_STATUS)
        )

    async def get_transform_videos_by_category(self, start_date, end_date):
        raise NotImplementedError()
class CrawlerDetailDeal(CrawlerVideoDetailAnalysis, CrawlerArticleDetailAnalysis):
    def __init__(self, pool, trace_id):
        super().__init__(pool, trace_id)

    async def analysis_video_pool(self, task, start_date, end_date):
        """Dispatch a video-pool sub task to the matching aggregation query."""
        match task:
            case self.CRAWLER_DETAIL_TASK_PLATFORM:
                return await self.get_crawler_videos_by_platform(start_date, end_date)
            case self.CRAWLER_DETAIL_TASK_CATEGORY:
                return await self.get_crawler_videos_by_category(start_date, end_date)
            case self.TRANSFORM_DETAIL_TASK_PLATFORM:
                return await self.get_transform_videos_by_platform(start_date, end_date)
            case self.TRANSFORM_DETAIL_TASK_CATEGORY:
                return await self.get_transform_videos_by_category(start_date, end_date)
            case _:
                return None

    async def analysis_article_pool(self, task, start_date, end_date):
        raise NotImplementedError()
    async def deal(self, params):
        start_date = params.get("start_date")
        end_date = params.get("end_date")
        media_type = params.get("media_type", "video")
        sub_task = params.get("sub_task_name", self.CRAWLER_DETAIL_TASK_PLATFORM)
        column_dict = self.create_feishu_column_map()
        match media_type:
            case "video":
                response = await self.analysis_video_pool(sub_task, start_date, end_date)
                if not response:
                    # Nothing matched the date range (or the sub task is unknown);
                    # skip the Feishu push instead of indexing an empty result.
                    return
                # Only keep the column definitions that appear in the query result.
                column_list = list(response[0].keys())
                columns = [column_dict[key] for key in column_list]
                await feishu_robot.bot(
                    title=f"[{start_date}, {end_date}) 抓取视频分平台统计",
                    detail={
                        "columns": columns,
                        "rows": response,
                    },
                    table=True,
                    mention=False,
                )
            case "article":
                await self.analysis_article_pool(sub_task, start_date, end_date)
            case _:
                pass
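

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): shows how CrawlerDetailDeal
# is expected to be driven. The pool below is a hypothetical stand-in that only
# implements the `async_fetch(query=..., params=...)` call this module relies
# on; in production the project's real MySQL pool and trace id are passed in.
# It also assumes feishu_robot.create_feishu_columns_sheet builds the column
# descriptors locally without network access.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio

    class _StubPool:
        """Hypothetical pool: returns no rows instead of querying MySQL."""

        async def async_fetch(self, query, params=None):
            return []

    async def _demo():
        handler = CrawlerDetailDeal(pool=_StubPool(), trace_id="demo-trace")
        # With an empty result set, deal() returns before pushing to Feishu.
        await handler.deal(
            params={
                "start_date": "2025-01-01",
                "end_date": "2025-01-07",
                "media_type": "video",
                "sub_task_name": CrawlerDetailDeal.CRAWLER_DETAIL_TASK_PLATFORM,
            }
        )

    asyncio.run(_demo())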