luojunhui committed 6 days ago
parent
commit 86d1ccd995

+ 2 - 0
applications/tasks/__init__.py

@@ -1 +1,3 @@
 from .task_scheduler import TaskScheduler
+
+__all__ = ["TaskScheduler"]

+ 11 - 2
applications/tasks/algorithm_tasks/account_category_analysis.py

@@ -139,7 +139,12 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
         params, t_stats, p_values = self.run_ols_linear_regression(
             sub_df, self.view_only, self.P_VALUE_THRESHOLD
         )
-        current_record = {"dt": end_dt, "gh_id": account_id, "category_map": {}, "name": account_name}
+        current_record = {
+            "dt": end_dt,
+            "gh_id": account_id,
+            "category_map": {},
+            "name": account_name,
+        }
         params_names = self.get_param_names()
         for name, param, p_value in zip(params_names, params, p_values):
             category_name = param_to_category_map.get(name, None)
@@ -197,5 +202,9 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
 
         for account_id in tqdm(account_ids, desc="analysis each account"):
             await self.predict_each_account(
-                pre_processed_dataframe, account_id, account_id_map, end_dt, param_to_category_map
+                pre_processed_dataframe,
+                account_id,
+                account_id_map,
+                end_dt,
+                param_to_category_map,
             )

+ 3 - 0
applications/tasks/analysis_task/__init__.py

@@ -0,0 +1,3 @@
+from .crawler_detail import CrawlerDetailDeal
+
+__all__ = ["CrawlerDetailDeal"]

+ 173 - 0
applications/tasks/analysis_task/crawler_detail.py

@@ -0,0 +1,173 @@
+
+from applications.api import feishu_robot
+
+
+class CrawlerDetailAnalysisConst:
+    CATEGORY_LIST = [
+        "知识科普",
+        "国家大事",
+        "历史人物",
+        "奇闻趣事",
+        "名人八卦",
+        "怀旧时光",
+        "情感故事",
+        "社会法治",
+        "现代人物",
+        "社会现象",
+        "健康养生",
+        "家长里短",
+        "军事历史",
+        "财经科技",
+        "政治新闻",
+    ]
+
+    TRANSFORMED_STATUS = 1
+    NOT_TRANSFORMED_STATUS = 0
+
+    CRAWLER_DETAIL_TASK_PLATFORM = "crawler_detail_by_platform"
+    CRAWLER_DETAIL_TASK_CATEGORY = "crawler_detail_by_category"
+
+    TRANSFORM_DETAIL_TASK_PLATFORM = "transform_detail_by_platform"
+    TRANSFORM_DETAIL_TASK_CATEGORY = "transform_detail_by_category"
+
+
+class CrawlerDetailBase(CrawlerDetailAnalysisConst):
+    def __init__(self):
+        super().__init__()
+
+    @staticmethod
+    def create_feishu_column_map() -> dict:
+        date_column = feishu_robot.create_feishu_columns_sheet(
+            sheet_type="plain_text", sheet_name="dt", display_name="日期"
+        )
+        category_column = feishu_robot.create_feishu_columns_sheet(
+            sheet_type="plain_text", sheet_name="category", display_name="文章品类"
+        )
+        platform_column = feishu_robot.create_feishu_columns_sheet(
+            sheet_type="plain_text", sheet_name="platform", display_name="抓取渠道"
+        )
+        video_cnt_column = feishu_robot.create_feishu_columns_sheet(
+            sheet_type="number", sheet_name="video_count", display_name="视频数量"
+        )
+        avg_score_column = feishu_robot.create_feishu_columns_sheet(
+            sheet_type="number",
+            sheet_name="average_similarity_score",
+            display_name="相关性分均值",
+        )
+        return {
+            "dt": date_column,
+            "category": category_column,
+            "platform": platform_column,
+            "video_count": video_cnt_column,
+            "average_similarity_score": avg_score_column,
+        }
+
+
+class CrawlerArticleDetailAnalysis(CrawlerDetailBase):
+    # TODO: article-detail analysis is not implemented yet (see analysis_article_pool)
+    pass
+
+
+class CrawlerVideoDetailAnalysis(CrawlerDetailBase):
+    def __init__(self, pool, trace_id):
+        super().__init__()
+        self.pool = pool
+        self.trace_id = trace_id
+
+    async def get_crawler_videos_by_platform(self, start_date, end_date):
+        """
+        Count the videos crawled by each platform between start_date and end_date.
+        """
+        query = """
+            SELECT CAST(DATE(FROM_UNIXTIME(crawler_timestamp)) AS CHAR) AS dt, platform, count(1) AS video_count
+            FROM publish_single_video_source
+            WHERE crawler_timestamp BETWEEN UNIX_TIMESTAMP(%s) AND UNIX_TIMESTAMP(%s)
+            GROUP BY dt, platform;
+        """
+        return await self.pool.async_fetch(query=query, params=(start_date, end_date))
+
+    async def get_crawler_videos_by_category(self, start_date, end_date):
+        """
+        Count the videos crawled for each category between start_date and end_date.
+        """
+        category_place_holders = ", ".join(["%s"] * len(self.CATEGORY_LIST))
+        query = f"""
+            SELECT CAST(DATE(FROM_UNIXTIME(crawler_timestamp)) AS CHAR) AS dt, category, count(1) AS video_count
+            FROM publish_single_video_source
+            WHERE crawler_timestamp BETWEEN UNIX_TIMESTAMP(%s) AND UNIX_TIMESTAMP(%s)
+              AND category IN ({category_place_holders})
+            GROUP BY dt, category;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=tuple([start_date, end_date] + self.CATEGORY_LIST)
+        )
+
+    async def get_transform_videos_by_platform(self, start_date, end_date):
+        query = """
+            SELECT CAST(DATE(create_timestamp) AS CHAR) AS dt, platform, 
+                   count(*) AS video_count, avg(score) AS average_similarity_score
+            FROM single_video_transform_queue
+            WHERE create_timestamp BETWEEN %s AND %s AND status = %s
+            GROUP BY dt, platform;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(start_date, end_date, self.TRANSFORMED_STATUS)
+        )
+
+    async def get_transform_videos_by_category(self, start_date, end_date):
+        raise NotImplementedError()
+
+
+class CrawlerDetailDeal(CrawlerVideoDetailAnalysis, CrawlerArticleDetailAnalysis):
+    def __init__(self, pool, trace_id):
+        super().__init__(pool, trace_id)
+
+    async def analysis_video_pool(self, task, start_date, end_date):
+        match task:
+            case self.CRAWLER_DETAIL_TASK_PLATFORM:
+                return await self.get_crawler_videos_by_platform(start_date, end_date)
+
+            case self.CRAWLER_DETAIL_TASK_CATEGORY:
+                return await self.get_crawler_videos_by_category(start_date, end_date)
+
+            case self.TRANSFORM_DETAIL_TASK_PLATFORM:
+                return await self.get_transform_videos_by_platform(start_date, end_date)
+
+            case self.TRANSFORM_DETAIL_TASK_CATEGORY:
+                return await self.get_transform_videos_by_category(start_date, end_date)
+
+            case _:
+                return None
+
+    async def analysis_article_pool(self, task, start_date, end_date):
+        raise NotImplementedError()
+
+    async def deal(self, params):
+        start_date = params.get("start_date")
+        end_date = params.get("end_date")
+        media_type = params.get("media_type", "video")
+        sub_task = params.get("sub_task_name", self.CRAWLER_DETAIL_TASK_PLATFORM)
+
+        column_dict = self.create_feishu_column_map()
+
+        match media_type:
+            case "video":
+                crawler_detail = await self.analysis_video_pool(sub_task, start_date, end_date)
+
+            case "article":
+                crawler_detail = await self.analysis_article_pool(sub_task, start_date, end_date)
+            case _:
+                return None
+
+        column_list = list(crawler_detail[0].keys())
+        columns = [column_dict[key] for key in column_list]
+        await feishu_robot.bot(
+            title=f"[{start_date}, {end_date}) 抓取 {media_type} 统计",
+            detail={
+                "columns": columns,
+                "rows": crawler_detail,
+            },
+            table=True,
+            mention=False,
+        )
+        return None
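
The new CrawlerDetailDeal.deal entry point is driven entirely by its params dict (start_date, end_date, media_type, sub_task_name) and pushes the grouped rows to a Feishu table; because it derives the column order from crawler_detail[0], it assumes the chosen query returns at least one row. A minimal driver sketch, not part of this commit, using an assumed stub pool whose row shape mirrors get_crawler_videos_by_platform:

import asyncio

from applications.tasks.analysis_task import CrawlerDetailDeal


class StubPool:
    """Assumed stand-in for the real async connection pool (async_fetch only)."""

    async def async_fetch(self, query, params=None):
        # Pretend the GROUP BY returned a single platform bucket.
        return [{"dt": "2025-08-01", "platform": "toutiao", "video_count": 42}]


async def main():
    task = CrawlerDetailDeal(pool=StubPool(), trace_id="manual-test")
    await task.deal(
        params={
            "start_date": "2025-08-01",
            "end_date": "2025-08-02",
            "media_type": "video",
            "sub_task_name": "crawler_detail_by_platform",  # CRAWLER_DETAIL_TASK_PLATFORM
        }
    )


asyncio.run(main())

Running this would actually post to the configured Feishu bot, so it is only meant to illustrate the expected row/column contract.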

+ 1 - 1
applications/tasks/cold_start_tasks/article_pool/article_pool_filter_strategy.py

@@ -121,7 +121,7 @@ class ArticlePoolFilterStrategy(ArticlePoolColdStartConst):
                 self.cold_start_records.append(
                     {
                         "category": category,
-                        "cold_start_num": min(daily_article_num, len(filter_df))
+                        "cold_start_num": min(daily_article_num, len(filter_df)),
                     }
                 )
 

+ 18 - 12
applications/tasks/cold_start_tasks/article_pool_cold_start.py

@@ -16,7 +16,7 @@ from applications.config import cold_start_category_map, input_source_map
 from applications.utils import get_titles_from_produce_plan
 from applications.tasks.cold_start_tasks.article_pool import (
     ArticlePoolColdStartStrategy,
-    ArticlePoolFilterStrategy
+    ArticlePoolFilterStrategy,
 )
 
 
@@ -111,13 +111,13 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
         return affect_rows
 
     async def create_crawler_plan_and_bind_to_produce_plan(
-            self,
-            strategy: str,
-            crawl_method: str,
-            category: str,
-            platform: str,
-            url_list: List[str],
-            plan_id: str,
+        self,
+        strategy: str,
+        crawl_method: str,
+        category: str,
+        platform: str,
+        url_list: List[str],
+        plan_id: str,
     ):
         # create_crawler_plan
         crawler_plan_response = await auto_create_crawler_task(
@@ -202,16 +202,23 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
                     url_list = filter_category_df["link"].values.tolist()
                     if url_list:
                         await self.create_crawler_plan_and_bind_to_produce_plan(
-                            strategy, crawl_method, ai_category, platform, url_list, plan_id
+                            strategy,
+                            crawl_method,
+                            ai_category,
+                            platform,
+                            url_list,
+                            plan_id,
                         )
                         # change article status
-                        article_id_list = filter_category_df["article_id"].values.tolist()
+                        article_id_list = filter_category_df[
+                            "article_id"
+                        ].values.tolist()
                         await self.change_article_status_while_publishing(
                             article_id_list=article_id_list
                         )
 
             case "strategy_v2":
-                url_list =  filter_article_df["link"].values.tolist()
+                url_list = filter_article_df["link"].values.tolist()
                 await self.create_crawler_plan_and_bind_to_produce_plan(
                     strategy, crawl_method, category, platform, url_list, plan_id
                 )
@@ -221,7 +228,6 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
                     article_id_list=article_id_list
                 )
 
-
     async def deal(
         self,
         platform: str,

+ 11 - 9
applications/tasks/cold_start_tasks/video_pool/video_pool_audit_strategy.py

@@ -18,7 +18,9 @@ class VideoPoolAuditStrategy(VideoPoolConst):
             SET audit_status = %s 
             WHERE audit_video_id = %s AND audit_status = %s;
         """
-        return await self.pool.async_save(query=query, params=(new_status, video_id, ori_status))
+        return await self.pool.async_save(
+            query=query, params=(new_status, video_id, ori_status)
+        )
 
     async def get_auditing_video_list(self):
         """get auditing video list"""
@@ -27,13 +29,13 @@ class VideoPoolAuditStrategy(VideoPoolConst):
             from publish_single_video_source
             where audit_status = %s
         """
-        return await self.pool.async_fetch(query=query, params=(-1, ))
+        return await self.pool.async_fetch(query=query, params=(-1,))
 
     async def get_video_audit_info(self, video_obj):
         """
         get audit video info from piaoquan
         """
-        video_id = video_obj['audit_video_id']
+        video_id = video_obj["audit_video_id"]
         response = await fetch_piaoquan_video_list_detail([video_id])
         response_data = response.get("data")
         if not response_data:
@@ -50,7 +52,7 @@ class VideoPoolAuditStrategy(VideoPoolConst):
                     affected_rows = await self.update_video_audit_status(
                         video_id=video_id,
                         ori_status=self.VIDEO_AUDIT_PROCESSING_STATUS,
-                        new_status=self.VIDEO_AUDIT_SUCCESS_STATUS
+                        new_status=self.VIDEO_AUDIT_SUCCESS_STATUS,
                     )
                     # store the video into the task queue
                     self.insert_into_task_queue(video_obj)
@@ -59,9 +61,9 @@ class VideoPoolAuditStrategy(VideoPoolConst):
                     await insert_crawler_relation_to_aigc_system(
                         relation_list=[
                             {
-                                "videoPoolTraceId": video_obj['content_trace_id'],
+                                "videoPoolTraceId": video_obj["content_trace_id"],
                                 "channelContentId": str(video_id),
-                                "platform": video_obj['platform'],
+                                "platform": video_obj["platform"],
                             }
                         ]
                     )
@@ -70,7 +72,7 @@ class VideoPoolAuditStrategy(VideoPoolConst):
                     affected_rows = await self.update_video_audit_status(
                         video_id=video_id,
                         ori_status=self.VIDEO_AUDIT_PROCESSING_STATUS,
-                        new_status=self.VIDEO_TITLE_GENERATE_FAIL_STATUS
+                        new_status=self.VIDEO_TITLE_GENERATE_FAIL_STATUS,
                     )
 
             case self.PQ_AUDIT_SELF_VISIBLE_STATUS | self.PQ_AUDIT_FAIL_STATUS:
@@ -78,7 +80,7 @@ class VideoPoolAuditStrategy(VideoPoolConst):
                 affected_rows = await self.update_video_audit_status(
                     video_id=video_id,
                     ori_status=self.VIDEO_AUDIT_PROCESSING_STATUS,
-                    new_status=self.VIDEO_AUDIT_FAIL_STATUS
+                    new_status=self.VIDEO_AUDIT_FAIL_STATUS,
                 )
 
             case self.PQ_AUDIT_PROCESSING_STATUS:
@@ -90,5 +92,5 @@ class VideoPoolAuditStrategy(VideoPoolConst):
         return {
             "affected_rows": affected_rows,
             "video_id": video_id,
-            "audit_status": audit_status
+            "audit_status": audit_status,
         }

+ 1 - 1
applications/tasks/cold_start_tasks/video_pool/video_pool_const.py

@@ -72,4 +72,4 @@ class VideoPoolConst:
     INIT_STATUS = 0
     PROCESSING_STATUS = 1
     SUCCESS_STATUS = 2
-    FAIL_STATUS = 99
+    FAIL_STATUS = 99

+ 1 - 3
applications/tasks/cold_start_tasks/video_pool_cold_start.py

@@ -1,4 +1,2 @@
-
-
 class VideoPoolColdStart:
-    pass
+    pass

+ 22 - 6
applications/tasks/crawler_tasks/crawler_gzh.py

@@ -52,7 +52,8 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                     where account_category = %s and is_using = %s and daily_scrape = %s;
                 """
                 return await self.pool.async_fetch(
-                    query=query, params=(method, self.USING_STATUS, self.DAILY_SCRAPE_POSTIVE)
+                    query=query,
+                    params=(method, self.USING_STATUS, self.DAILY_SCRAPE_POSTIVE),
                 )
 
             case "V2":
@@ -63,7 +64,8 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                     order by recent_score_ci_lower desc limit %s; 
                 """
                 return await self.pool.async_fetch(
-                    query=query, params=(method, self.USING_STATUS, self.CRAWL_ACCOUNT_FIRST_LEVEL)
+                    query=query,
+                    params=(method, self.USING_STATUS, self.CRAWL_ACCOUNT_FIRST_LEVEL),
                 )
 
             case _:
@@ -161,7 +163,15 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
                 """
                 insert_rows = await self.pool.async_save(
                     query=insert_query,
-                    params=(gh_id, account_name, position, read_avg, today_dt, self.USING_STATUS, remark),
+                    params=(
+                        gh_id,
+                        account_name,
+                        position,
+                        read_avg,
+                        today_dt,
+                        self.USING_STATUS,
+                        remark,
+                    ),
                 )
                 if insert_rows:
                     update_query = """
@@ -284,14 +294,18 @@ class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
     def __init__(self, pool, log_client, trace_id):
         super().__init__(pool, log_client, trace_id)
 
-    async def crawl_search_articles_detail(self, article_list: List[Dict], source_title: str):
+    async def crawl_search_articles_detail(
+        self, article_list: List[Dict], source_title: str
+    ):
         """
         @description: for the searched article list, fetch each article's detail and store it in the meta table
         """
         for article in tqdm(article_list, desc="获取搜索结果详情"):
             print(f"{datetime.now()}: start crawling article: {article['title']}")
             url = article["url"]
-            detail_response = await get_article_detail(url, is_count=True, is_cache=False)
+            detail_response = await get_article_detail(
+                url, is_count=True, is_cache=False
+            )
             if not detail_response:
                 continue
 
@@ -351,7 +365,9 @@ class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
             try:
                 await self.search_each_title(hot_title)
             except Exception as e:
-                print(f"crawler_gzh_articles error:{e}\nexception:{traceback.format_exc()}")
+                print(
+                    f"crawler_gzh_articles error:{e}\nexception:{traceback.format_exc()}"
+                )
 
             print(f"{datetime.now()}: finish searched hot title: {hot_title}")
         await feishu_robot.bot(

+ 10 - 6
applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -29,9 +29,9 @@ class Const:
         "gh_cd041ed721e6",
         "gh_62d7f423f382",
         "gh_043223059726",
-        'gh_6cfd1132df94',
-        'gh_7f5075624a50',
-        'gh_d4dffc34ac39',
+        "gh_6cfd1132df94",
+        "gh_7f5075624a50",
+        "gh_d4dffc34ac39",
         'gh_c69776baf2cd',
         'gh_9877c8541764'
     ]
@@ -325,7 +325,9 @@ class UpdateRootSourceIdAndUpdateTimeTask(Const):
                 mini_program = data.get("mini_program", [])
                 if mini_program:
                     root_source_id_list = [
-                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"])).get("rootSourceId", [""])[0]
+                        urllib.parse.parse_qs(urllib.parse.unquote(i["path"])).get(
+                            "rootSourceId", [""]
+                        )[0]
                         for i in mini_program
                     ]
                 else:
@@ -364,7 +366,7 @@ class UpdateRootSourceIdAndUpdateTimeTask(Const):
             ),
         )
         if publish_timestamp_s == self.REQUEST_FAIL_STATUS:
-            article['wx_sn'] = wx_sn
+            article["wx_sn"] = wx_sn
             return article
         else:
             return None
@@ -393,7 +395,9 @@ class UpdateRootSourceIdAndUpdateTimeTask(Const):
             set publish_timestamp = updateTime
             where publish_timestamp < %s;
         """
-        affected_rows_2 = await self.pool.async_save(query=update_sql_2, params=(0,), db_name="piaoquan_crawler")
+        affected_rows_2 = await self.pool.async_save(
+            query=update_sql_2, params=(0,), db_name="piaoquan_crawler"
+        )
         if affected_rows_1 or affected_rows_2:
             await feishu_robot.bot(
                 title="执行兜底修改发布时间戳",

+ 23 - 2
applications/tasks/task_handler.py

@@ -1,5 +1,6 @@
 from datetime import datetime
 
+from applications.tasks.analysis_task import CrawlerDetailDeal
 from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
 from applications.tasks.cold_start_tasks import ArticlePoolColdStart
 from applications.tasks.crawler_tasks import CrawlerToutiao
@@ -109,7 +110,12 @@ class TaskHandler(TaskMapper):
         crawler_methods = self.data.get("crawler_methods", [])
         category_list = self.data.get("category_list", [])
         strategy = self.data.get("strategy", "strategy_v1")
-        await cold_start.deal(platform=platform, crawl_methods=crawler_methods, category_list=category_list, strategy=strategy)
+        await cold_start.deal(
+            platform=platform,
+            crawl_methods=crawler_methods,
+            category_list=category_list,
+            strategy=strategy,
+        )
         return self.TASK_SUCCESS_STATUS
 
     async def _candidate_account_quality_score_handler(self) -> int:
@@ -142,6 +148,7 @@ class TaskHandler(TaskMapper):
         await task.deal(platform=platform, account_id_list=account_id_list)
         return self.TASK_SUCCESS_STATUS
 
+    # crawl gzh (WeChat official account) articles
     async def _crawler_gzh_article_handler(self) -> int:
         account_method = self.data.get("account_method")
         crawl_mode = self.data.get("crawl_mode")
@@ -161,15 +168,29 @@ class TaskHandler(TaskMapper):
                 raise ValueError(f"Unsupported crawl mode {crawl_mode}")
         return self.TASK_SUCCESS_STATUS
 
+    # recycle fwh (service account) published articles
     async def _recycle_fwh_article_handler(self) -> int:
         task = RecycleFwhDailyPublishArticlesTask(self.db_client, self.log_client)
         await task.deal()
         return self.TASK_SUCCESS_STATUS
 
+    # account category analysis task
     async def _account_category_analysis_handler(self) -> int:
         task = AccountCategoryAnalysis(
-            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id, data=self.data, date_string=None
+            pool=self.db_client,
+            log_client=self.log_client,
+            trace_id=self.trace_id,
+            data=self.data,
+            date_string=None,
         )
         await task.deal()
         return self.TASK_SUCCESS_STATUS
 
+    # crawler video/article detail statistics
+    async def _crawler_article_analysis_handler(self) -> int:
+        task = CrawlerDetailDeal(pool=self.db_client, trace_id=self.trace_id)
+        await task.deal(params=self.data)
+        return self.TASK_SUCCESS_STATUS
+
+
+__all__ = ["TaskHandler"]

+ 3 - 0
applications/tasks/task_mapper.py

@@ -60,3 +60,6 @@ class TaskMapper(Const):
                 expire_duration = self.DEFAULT_TIMEOUT
 
         return {"expire_duration": expire_duration, "task_max_num": self.TASK_MAX_NUM}
+
+
+__all__ = ["TaskMapper"]

+ 8 - 1
applications/tasks/task_scheduler.py

@@ -151,7 +151,9 @@ class TaskScheduler(TaskHandler):
                 "4003", "task_name must be input"
             )
 
-        date_str = self.data.get("date_string") or (datetime.utcnow() + timedelta(hours=8)).strftime("%Y-%m-%d")
+        date_str = self.data.get("date_string") or (
+            datetime.utcnow() + timedelta(hours=8)
+        ).strftime("%Y-%m-%d")
 
         # === All tasks are registered here: each maps to an async function returning an int status code ===
         handlers: Dict[str, Callable[[], Awaitable[int]]] = {
@@ -189,6 +191,8 @@ class TaskScheduler(TaskHandler):
             "fwh_daily_recycle": self._recycle_fwh_article_handler,
             # publishing-account category analysis
             "account_category_analysis": self._account_category_analysis_handler,
+            # crawler article/video volume statistics
+            "crawler_detail_analysis": self._crawler_article_analysis_handler,
         }
 
         if task_name not in handlers:
@@ -196,3 +200,6 @@ class TaskScheduler(TaskHandler):
                 "4001", "wrong task name input"
             )
         return await self._run_with_guard(task_name, date_str, handlers[task_name])
+
+
+__all__ = ["TaskScheduler"]