
Merge branch 'feature/luojunhui/2025-08-14-front-improve' of Server/LongArticleTaskServer into master

luojunhui 3 weeks ago
parent
commit
e2e85057f8

+ 2 - 0
applications/api/__init__.py

@@ -23,6 +23,7 @@ from .aliyun_log_api import log
 from .async_aigc_system_api import delete_illegal_gzh_articles
 from .async_aigc_system_api import auto_create_crawler_task
 from .async_aigc_system_api import auto_bind_crawler_task_to_generate_task
+from .async_aigc_system_api import insert_crawler_relation_to_aigc_system
 
 feishu_robot = FeishuBotApi()
 feishu_sheet = FeishuSheetApi()
@@ -42,4 +43,5 @@ __all__ = [
     "auto_create_crawler_task",
     "auto_bind_crawler_task_to_generate_task",
     "AsyncElasticSearchClient",
+    "insert_crawler_relation_to_aigc_system"
 ]

+ 15 - 0
applications/api/async_aigc_system_api.py

@@ -1,4 +1,5 @@
 import json
+from typing import Optional, Dict, List, TypedDict
 from applications.utils import AsyncHttpClient
 
 HEADERS = {
@@ -23,6 +24,11 @@ PERSON_COOKIE = {
     "uid": 1,
 }
 
+class RelationDict(TypedDict):
+    videoPoolTraceId: str
+    channelContentId: str
+    platform: str
+
 
 async def delete_illegal_gzh_articles(gh_id: str, title: str):
     """
@@ -166,3 +172,12 @@ async def get_generate_task_detail(generate_task_id):
         return res["data"]
     else:
         return {}
+
+
+async def insert_crawler_relation_to_aigc_system(relation_list: List[RelationDict]) -> Optional[Dict]:
+    url = "http://aigc-api.cybertogether.net/aigc/crawler/content/videoPoolCrawlerRelation"
+    payload = json.dumps({"params": {"relations": relation_list}})
+    async with AsyncHttpClient(timeout=60) as client:
+        res = await client.post(url=url, headers=HEADERS, data=payload)
+
+    return res
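A minimal call-site sketch for the new wrapper, assuming it is imported from applications.api as exposed above; the trace id, content id, and platform values below are made up:

    import asyncio

    from applications.api import insert_crawler_relation_to_aigc_system

    async def demo():
        # each relation ties a video-pool trace id to the crawled content id on its source platform
        relations = [
            {
                "videoPoolTraceId": "trace-20250814-0001",  # hypothetical trace id
                "channelContentId": "987654321",            # piaoquan video id as a string
                "platform": "gzh",                          # hypothetical platform code
            }
        ]
        res = await insert_crawler_relation_to_aigc_system(relation_list=relations)
        print(res)

    asyncio.run(demo())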

+ 5 - 1
applications/config/__init__.py

@@ -17,6 +17,9 @@ from .elastic_search_mappings import es_index, es_mappings, es_settings
 # cold start config
 from .cold_start_config import category_config, input_source_map
 
+# name config
+from .task_chinese_name import name_map
+
 __all__ = [
     "aigc_db_config",
     "long_video_db_config",
@@ -29,5 +32,6 @@ __all__ = [
     "es_mappings",
     "es_settings",
     "category_config",
-    "input_source_map"
+    "input_source_map",
+    "name_map"
 ]

+ 17 - 0
applications/config/task_chinese_name.py

@@ -0,0 +1,17 @@
+name_map = {
+    "title_rewrite": "标题重写",
+    "crawler_gzh_articles": "抓取公众号文章",
+    "crawler_account_manager": "抓取账号管理",
+    "article_pool_category_generation": "文章池品类生成",
+    "candidate_account_quality_analysis": "候选账号质量分析",
+    "article_pool_cold_start": "文章路冷启动",
+    "crawler_toutiao": "头条抓取",
+    "task_processing_monitor": "协程监测",
+    "update_root_source_id": "更新今日root_source_id",
+    "daily_publish_articles_recycle": "回收今日发文",
+    "inner_article_monitor": "账号发文违规监测",
+    "outside_article_monitor": "外部服务号发文监测",
+    "get_off_videos": "自动下架视频",
+    "check_publish_video_audit_status": "校验发布视频状态",
+    "check_kimi_balance": "检验kimi余额",
+}

+ 15 - 3
applications/service/task_manager_service.py

@@ -1,5 +1,8 @@
+import json
 from typing import Optional
 
+from applications.utils import get_task_chinese_name
+
 
 def _build_where(id_eq=None, date_string=None, trace_id=None, task_status=None):
     conds, params = [], []
@@ -25,6 +28,15 @@ def _build_where(id_eq=None, date_string=None, trace_id=None, task_status=None):
     return where_clause, params
 
 
+def _safe_json(v):
+    try:
+        if isinstance(v, (str, bytes, bytearray)):
+            return json.loads(v)
+        return v or {}
+    except Exception:
+        return {}
+
+
 class TaskConst:
     INIT_STATUS = 0
     PROCESSING_STATUS = 1
@@ -77,13 +89,13 @@ class TaskManagerService(TaskConst):
                 SELECT COUNT(1) AS cnt
                 FROM long_articles_task_manager
                 WHERE {where_clause}
-            """
+        """
         count_rows = await self.pool.async_fetch(query=sql_count, params=tuple(params))
         total = count_rows[0]["cnt"] if count_rows else 0
 
         # 5) 查询数据
         sql_list = f"""
-                SELECT id, date_string, task_name, task_status, start_timestamp, finish_timestamp, trace_id
+                SELECT id, date_string, task_status, start_timestamp, finish_timestamp, trace_id, data
                 FROM long_articles_task_manager
                 WHERE {where_clause}
                 ORDER BY {sort_by} {sort_dir}
@@ -95,7 +107,7 @@ class TaskManagerService(TaskConst):
             {
                 **r,
                 "status_text": self.STATUS_TEXT.get(r["task_status"], str(r["task_status"])),
-                "data_json": self.data
+                "task_name": get_task_chinese_name(_safe_json(r["data"]))
             }
             for r in rows
         ]
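A quick sketch of what _safe_json tolerates, since the data column may hold a JSON string, bytes, NULL, or an already-decoded dict; the sample values are made up:

    _safe_json('{"task_name": "crawler_toutiao"}')   # -> {"task_name": "crawler_toutiao"}
    _safe_json(b'{"task_name": "crawler_toutiao"}')  # bytes are accepted by json.loads
    _safe_json(None)                                  # -> {}
    _safe_json("not json at all")                     # -> {}, the parse error is swallowed
    _safe_json({"task_name": "crawler_toutiao"})      # already a dict, returned unchanged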

+ 0 - 0
applications/tasks/cold_start_tasks/video_pool/__init__.py


+ 94 - 0
applications/tasks/cold_start_tasks/video_pool/video_pool_audit_strategy.py

@@ -0,0 +1,94 @@
+from applications.api import fetch_piaoquan_video_list_detail
+from applications.api import insert_crawler_relation_to_aigc_system
+
+from .video_pool_const import VideoPoolConst
+
+
+class VideoPoolAuditStrategy(VideoPoolConst):
+    def __init__(self, pool, log_client, trace_id):
+        super().__init__()
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+
+    async def update_video_audit_status(self, video_id, ori_status, new_status):
+        """修改视频审核状态"""
+        query = """
+            UPDATE publish_single_video_source
+            SET audit_status = %s 
+            WHERE audit_video_id = %s AND audit_status = %s;
+        """
+        return await self.pool.async_save(query=query, params=(new_status, video_id, ori_status))
+
+    async def get_auditing_video_list(self):
+        """get auditing video list"""
+        query = """
+            SELECT content_trace_id, audit_video_id, score, platform
+            FROM publish_single_video_source
+            WHERE audit_status = %s
+        """
+        return await self.pool.async_fetch(query=query, params=(-1, ))
+
+    async def get_video_audit_info(self, video_obj):
+        """
+        get audit video info from piaoquan
+        """
+        video_id = video_obj['audit_video_id']
+        response = await fetch_piaoquan_video_list_detail([video_id])
+        response_data = response.get("data")
+        if not response_data:
+            audit_status = self.PQ_AUDIT_FAIL_STATUS
+        else:
+            audit_status = response_data[0].get("auditStatus")
+
+        match audit_status:
+            case self.PQ_AUDIT_SUCCESS_STATUS:
+                # 更新小程序标题字段
+                mini_program_title_flag = self.update_mini_program_title(video_id)
+                if mini_program_title_flag:
+                    # 处理成功,修改审核状态为1
+                    affected_rows = await self.update_video_audit_status(
+                        video_id=video_id,
+                        ori_status=self.VIDEO_AUDIT_PROCESSING_STATUS,
+                        new_status=self.VIDEO_AUDIT_SUCCESS_STATUS
+                    )
+                    # 将视频存储到任务队列
+                    self.insert_into_task_queue(video_obj)
+
+                    # 将视频存储到 aigc 表
+                    await insert_crawler_relation_to_aigc_system(
+                        relation_list=[
+                            {
+                                "videoPoolTraceId": video_obj['content_trace_id'],
+                                "channelContentId": str(video_id),
+                                "platform": video_obj['platform'],
+                            }
+                        ]
+                    )
+                else:
+                    # 修改小程序标题失败,修改审核状态为4
+                    affected_rows = await self.update_video_audit_status(
+                        video_id=video_id,
+                        ori_status=self.VIDEO_AUDIT_PROCESSING_STATUS,
+                        new_status=self.VIDEO_TITLE_GENERATE_FAIL_STATUS
+                    )
+
+            case self.PQ_AUDIT_SELF_VISIBLE_STATUS | self.PQ_AUDIT_FAIL_STATUS:
+                # 视频审核失败,修改审核状态为2
+                affected_rows = await self.update_video_audit_status(
+                    video_id=video_id,
+                    ori_status=self.VIDEO_AUDIT_PROCESSING_STATUS,
+                    new_status=self.VIDEO_AUDIT_FAIL_STATUS
+                )
+
+            case self.PQ_AUDIT_PROCESSING_STATUS:
+                affected_rows = 0
+
+            case _:
+                affected_rows = 0
+
+        return {
+            "affected_rows": affected_rows,
+            "video_id": video_id,
+            "audit_status": audit_status
+        }
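A sketch of how the strategy might be driven; the driver function itself is an assumption and not part of this commit, only the two strategy methods it calls are:

    async def check_video_pool_audit(pool, log_client, trace_id):
        # hypothetical entry point: re-check every video still in audit_status = -1
        strategy = VideoPoolAuditStrategy(pool, log_client, trace_id)
        video_list = await strategy.get_auditing_video_list()
        results = []
        for video_obj in video_list:
            # each call queries piaoquan and updates the row's audit_status accordingly
            results.append(await strategy.get_video_audit_info(video_obj))
        return results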

+ 75 - 0
applications/tasks/cold_start_tasks/video_pool/video_pool_const.py

@@ -0,0 +1,75 @@
+class VideoPoolConst:
+    """
+    微信视频抓取常量配置
+    """
+
+    # 账号抓取状态
+    ACCOUNT_CRAWL_STATUS = 1
+    ACCOUNT_DO_NOT_CRAWL_STATUS = 0
+
+    # 默认最早抓取时间戳(2024-01-01)
+    DEFAULT_TIMESTAMP = 1704038400
+
+    # 搜索爬虫最大页数
+    MAX_SEARCH_PAGE_NUM = 10
+
+    # 抓取每一页的等待时间
+    SLEEP_SECONDS = 5
+
+    # 种子标题最低阅读均值倍数
+    READ_AVG_MULTIPLE = 1.3
+
+    # 种子标题最低阅读量
+    MIN_READ_COUNT = 2000
+
+    # 获取种子标题的统计周期
+    STAT_PERIOD = 7 * 24 * 60 * 60
+
+    # 接口请求成功code
+    REQUEST_SUCCESS = 0
+    PUBLISHED_ILLEGAL_TITLE_CODE = 1015
+
+    # 是否需要扫描查询源账号
+    NEED_SCAN_SOURCE_ACCOUNT = 1
+    DO_NOT_NEED_SOURCE_ACCOUNT = 0
+
+    # 视频审核状态长文库
+    VIDEO_AUDIT_INIT_STATUS = 0
+    VIDEO_AUDIT_SUCCESS_STATUS = 1
+    VIDEO_AUDIT_FAIL_STATUS = 2
+    VIDEO_TITLE_GENERATE_FAIL_STATUS = 4
+    VIDEO_AUDIT_PROCESSING_STATUS = -1
+
+    # 票圈视频审核状态, 1 审核中,2 不通过 3 待修改,4 自己可见 5 通过
+    PQ_AUDIT_PROCESSING_STATUS = 1
+    PQ_AUDIT_FAIL_STATUS = 2
+    PQ_AUDIT_WAIT_STATUS = 3
+    PQ_AUDIT_SELF_VISIBLE_STATUS = 4
+    PQ_AUDIT_SUCCESS_STATUS = 5
+
+    # 默认账号
+    DEFAULT_ACCOUNT_UID = 76862180
+
+    # 每天发送的审核视频数量
+    MAX_VIDEO_NUM = 1000
+
+    # 单次发布视频审核量
+    MAX_VIDEO_NUM_PER_PUBLISH = 350
+
+    # 标题状态
+    TITLE_DEFAULT_STATUS = 0
+    TITLE_EXIT_STATUS = 1
+    TITLE_FESTIVAL_STATUS = 2
+    TITLE_SHORT_STATUS = 3
+
+    # 标题最短长度
+    TITLE_MIN_LENGTH = 15
+
+    # safe score
+    TITLE_SAFE_SCORE_THRESHOLD = 7
+
+    # Task Status
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAIL_STATUS = 99

+ 4 - 0
applications/tasks/cold_start_tasks/video_pool_cold_start.py

@@ -0,0 +1,4 @@
+
+
+class VideoPoolColdStart:
+    pass

+ 34 - 0
applications/utils/common.py

@@ -19,6 +19,7 @@ from tenacity import (
     wait_exponential,
     retry_if_exception_type,
 )
+from applications.config import name_map
 
 
 def str_to_md5(strings):
@@ -218,3 +219,36 @@ def ci_lower(data: List[int], conf: float = 0.95) -> float:
     # t 分位点(左侧):ppf 返回负值
     t_left = t.ppf((1 - conf) / 2, df=n - 1)
     return mean + t_left * std
+
+
+def get_task_chinese_name(data):
+    """
+    通过输入任务详情信息获取任务名称
+    """
+    task_name = data.get('task_name', '')
+    task_name_chinese = name_map.get(task_name, task_name)
+
+    # account_method
+    if task_name == 'crawler_gzh_articles':
+        account_method = data.get('account_method', '')
+        account_method = account_method.replace("account_association", "账号联想").replace("search", "")
+        crawl_mode = data.get('crawl_mode', '')
+        crawl_mode = crawl_mode.replace("search", "搜索").replace("account", "抓账号")
+        strategy = data.get('strategy', '')
+        return f"{task_name_chinese}\t{crawl_mode}\t{account_method}\t{strategy}"
+    elif task_name == 'article_pool_cold_start':
+        platform = data.get('platform', '')
+        platform = platform.replace('toutiao', '今日头条').replace("weixin", "微信")
+        strategy = data.get('strategy', '')
+        strategy = strategy.replace("strategy", "策略")
+        category_list = data.get('category_list', [])
+        category_list = "、".join(category_list)
+        crawler_methods = data.get('crawler_methods', [])
+        crawler_methods = "、".join(crawler_methods)
+        return f"{task_name_chinese}\t{platform}\t{crawler_methods}\t{category_list}\t{strategy}"
+    else:
+        return task_name_chinese
+
+
+
+
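A usage sketch for get_task_chinese_name; the detail dict below only illustrates what the data column of long_articles_task_manager is expected to hold, the field values are invented:

    detail = {
        "task_name": "article_pool_cold_start",
        "platform": "toutiao",
        "strategy": "strategy_v1",
        "category_list": ["财经", "历史"],
        "crawler_methods": ["search"],
    }
    print(get_task_chinese_name(detail))
    # -> "文章池冷启动\t今日头条\tsearch\t财经、历史\t策略_v1"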