
Optimize gzh (official account) crawling

luojunhui committed 3 weeks ago (commit 667a0abfdc)

+ 5 - 1
applications/config/__init__.py

@@ -17,6 +17,9 @@ from .elastic_search_mappings import es_index, es_mappings, es_settings
 # cold start config
 from .cold_start_config import category_config, input_source_map
 
+# task chinese name config
+from .task_chinese_name import name_map
+
 __all__ = [
     "aigc_db_config",
     "long_video_db_config",
@@ -29,5 +32,6 @@ __all__ = [
     "es_mappings",
     "es_settings",
     "category_config",
-    "input_source_map"
+    "input_source_map",
+    "name_map"
 ]
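
With `name_map` re-exported at the package level, downstream modules (such as the `common.py` change later in this commit) can import it directly from `applications.config`. A minimal sanity-check sketch, not part of the commit, that keeps `__all__` honest:

import applications.config as config

# Every name listed in __all__ should resolve to an actual attribute of the package.
missing = [name for name in config.__all__ if not hasattr(config, name)]
assert not missing, f"__all__ entries without a definition: {missing}"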

+ 17 - 0
applications/config/task_chinese_name.py

@@ -0,0 +1,17 @@
+name_map = {
+    "title_rewrite": "标题重写",
+    "crawler_gzh_articles": "公众号文章抓取",
+    "crawler_account_manager": "抓取账号管理",
+    "article_pool_category_generation": "文章池品类生成",
+    "candidate_account_quality_analysis": "候选账号质量分析",
+    "article_pool_cold_start": "文章路冷启动",
+    "crawler_toutiao": "头条抓取",
+    "task_processing_monitor": "协程监测",
+    "update_root_source_id": "更新今日root_source_id",
+    "daily_publish_articles_recycle": "回收今日发文",
+    "inner_article_monitor": "账号发文违规监测",
+    "outside_article_monitor": "外部服务号发文监测",
+    "get_off_videos": "自动下架视频",
+    "check_publish_video_audit_status": "校验发布视频状态",
+    "check_kimi_balance": "检验kimi余额",
+}
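
The mapping is consumed with a fallback so that task identifiers without an entry still render as something readable. A short usage sketch (the unknown identifier is hypothetical):

from applications.config import name_map

assert name_map["crawler_toutiao"] == "头条抓取"

# Unknown identifiers fall back to the raw name; get_task_chinese_name() below
# relies on exactly this pattern.
display = name_map.get("some_future_task", "some_future_task")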

+ 15 - 3
applications/service/task_manager_service.py

@@ -1,5 +1,8 @@
+import json
 from typing import Optional
 
+from applications.utils import get_task_chinese_name
+
 
 def _build_where(id_eq=None, date_string=None, trace_id=None, task_status=None):
     conds, params = [], []
@@ -25,6 +28,15 @@ def _build_where(id_eq=None, date_string=None, trace_id=None, task_status=None):
     return where_clause, params
 
 
+def _safe_json(v):
+    try:
+        if isinstance(v, (str, bytes, bytearray)):
+            return json.loads(v)
+        return v or {}
+    except Exception:
+        return {}
+
+
 class TaskConst:
     INIT_STATUS = 0
     PROCESSING_STATUS = 1
@@ -77,13 +89,13 @@ class TaskManagerService(TaskConst):
                 SELECT COUNT(1) AS cnt
                 FROM long_articles_task_manager
                 WHERE {where_clause}
-            """
+        """
         count_rows = await self.pool.async_fetch(query=sql_count, params=tuple(params))
         total = count_rows[0]["cnt"] if count_rows else 0
 
         # 5) query the data
         sql_list = f"""
-                SELECT id, date_string, task_name, task_status, start_timestamp, finish_timestamp, trace_id
+                SELECT id, date_string, task_status, start_timestamp, finish_timestamp, trace_id, data
                 FROM long_articles_task_manager
                 WHERE {where_clause}
                 ORDER BY {sort_by} {sort_dir}
@@ -95,7 +107,7 @@ class TaskManagerService(TaskConst):
             {
                 **r,
                 "status_text": self.STATUS_TEXT.get(r["task_status"], str(r["task_status"])),
-                "data_json": self.data
+                "task_name": get_task_chinese_name(_safe_json(r["data"]))
             }
             for r in rows
         ]
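
Depending on the MySQL driver and column type, `r["data"]` may arrive as a JSON string, bytes, an already-decoded dict, or NULL; `_safe_json` normalizes all of these before the name lookup so one malformed row cannot break the whole listing. A small illustration of the intended behavior (the helper is copied from the hunk above so the sketch runs standalone):

import json

def _safe_json(v):
    try:
        if isinstance(v, (str, bytes, bytearray)):
            return json.loads(v)
        return v or {}
    except Exception:
        return {}

assert _safe_json('{"task_name": "crawler_toutiao"}') == {"task_name": "crawler_toutiao"}
assert _safe_json(b'{"task_name": "get_off_videos"}') == {"task_name": "get_off_videos"}
assert _safe_json(None) == {}
assert _safe_json("not json at all") == {}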

+ 28 - 0
applications/utils/common.py

@@ -19,6 +19,7 @@ from tenacity import (
     wait_exponential,
     retry_if_exception_type,
 )
+from applications.config import name_map
 
 
 def str_to_md5(strings):
@@ -218,3 +219,30 @@ def ci_lower(data: List[int], conf: float = 0.95) -> float:
     # left-tail t quantile: ppf returns a negative value
     t_left = t.ppf((1 - conf) / 2, df=n - 1)
     return mean + t_left * std
+
+
+def get_task_chinese_name(data):
+    """
+    Resolve a task's Chinese display name from its task-detail payload.
+    """
+    task_name = data.get('task_name', '')
+    task_name_chinese = name_map.get(task_name, task_name)
+
+    # crawler tasks run with several configurations, so append their parameters
+    if task_name == 'crawler_gzh_articles':
+        account_method = data.get('account_method', '')
+        crawl_mode = data.get('crawl_mode', '')
+        strategy = data.get('strategy', '')
+        return f"{task_name_chinese}{crawl_mode}{account_method}{strategy}"
+    elif task_name == 'article_pool_cold_start':
+        platform = data.get('platform', '')
+        strategy = data.get('strategy', '')
+        category_list = data.get('category_list', [])
+        crawler_methods = data.get('crawler_methods', [])
+        return f"{task_name_chinese}{platform}{strategy}{'、'.join(crawler_methods)}{'、'.join(category_list)}"
+    else:
+        return task_name_chinese
+
+
+
+
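
A short usage sketch for `get_task_chinese_name`; the field values are illustrative examples of what a task's `data` payload might contain, not values taken from the task manager:

from applications.utils import get_task_chinese_name

# Plain task: only the mapped Chinese name is returned.
print(get_task_chinese_name({"task_name": "check_kimi_balance"}))
# -> 检验kimi余额

# crawler_gzh_articles: crawl mode, account method and strategy are appended.
print(get_task_chinese_name({
    "task_name": "crawler_gzh_articles",
    "crawl_mode": "search",                    # illustrative value
    "account_method": "account_association",   # illustrative value
    "strategy": "V1",                          # illustrative value
}))
# -> 公众号文章抓取searchaccount_associationV1

# Unmapped tasks fall back to the raw identifier.
print(get_task_chinese_name({"task_name": "some_future_task"}))
# -> some_future_task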