Merge branch 'feature/luojunhui/20250729-add-toutiao-main-recommend' of Server/LongArticleTaskServer into master

luojunhui · 1 month ago · commit 8f70d39f34

+ 17 - 1
README.md

@@ -45,6 +45,7 @@ docker compose up -d
 │   │   │   ├── blogger.py
 │   │   │   ├── detail_recommend.py
 │   │   │   ├── main_page_recomend.py
+│   │   │   ├── search.py
 │   │   │   ├── toutiao.js
 │   │   │   └── use_js.py
 │   │   └── wechat
@@ -62,6 +63,9 @@ docker compose up -d
 │   │   └── log_service.py
 │   ├── tasks
 │   │   ├── __init__.py
+│   │   ├── cold_start_tasks
+│   │   │   ├── __init__.py
+│   │   │   └── article_pool_cold_start.py
 │   │   ├── crawler_tasks
 │   │   │   ├── __init__.py
 │   │   │   └── crawler_toutiao.py
@@ -70,6 +74,7 @@ docker compose up -d
 │   │   │   └── recycle_daily_publish_articles.py
 │   │   ├── llm_tasks
 │   │   │   ├── __init__.py
+│   │   │   ├── candidate_account_process.py
 │   │   │   └── process_title.py
 │   │   ├── monitor_tasks
 │   │   │   ├── __init__.py
@@ -78,15 +83,26 @@ docker compose up -d
 │   │   │   ├── kimi_balance.py
 │   │   │   └── task_processing_monitor.py
 │   │   ├── task_mapper.py
-│   │   └── task_scheduler.py
+│   │   ├── task_scheduler.py
+│   │   └── task_scheduler_v2.py
 │   └── utils
 │       ├── __init__.py
+│       ├── aigc_system_database.py
 │       ├── async_apollo_client.py
 │       ├── async_http_client.py
+│       ├── async_mysql_utils.py
 │       ├── common.py
 │       ├── get_cover.py
 │       ├── item.py
 │       └── response.py
+├── dev
+│   ├── code.py
+│   ├── dev.py
+│   ├── run_task_dev.py
+│   ├── sample.txt
+│   ├── title.json
+│   └── totp.py
+├── dev.py
 ├── docker-compose.yaml
 ├── myapp.log
 ├── requirements.txt

+ 3 - 0
applications/ab_test/__init__.py

@@ -1 +1,4 @@
 from .get_cover import GetCoverService
+
+
+__all__ = ["GetCoverService"]

+ 16 - 0
applications/api/__init__.py

@@ -27,3 +27,19 @@ from .async_aigc_system_api import auto_bind_crawler_task_to_generate_task
 feishu_robot = FeishuBotApi()
 feishu_sheet = FeishuSheetApi()
 task_apollo = AsyncApolloApi()
+
+__all__ = [
+    "feishu_robot",
+    "feishu_sheet",
+    "change_video_audit_status",
+    "publish_video_to_piaoquan",
+    "fetch_piaoquan_video_list_detail",
+    "AsyncApolloApi",
+    "task_apollo",
+    "fetch_deepseek_completion",
+    "log",
+    "delete_illegal_gzh_articles",
+    "auto_create_crawler_task",
+    "auto_bind_crawler_task_to_generate_task",
+    "AsyncElasticSearchClient",
+]

+ 13 - 0
applications/config/__init__.py

@@ -13,3 +13,16 @@ from .deepseek_config import deep_seek_official_api_key
 
 # es config
 from .elastic_search_mappings import es_index, es_mappings, es_settings
+
+__all__ = [
+    "aigc_db_config",
+    "long_video_db_config",
+    "long_articles_db_config",
+    "piaoquan_crawler_db_config",
+    "aliyun_log_config",
+    "deep_seek_official_model",
+    "deep_seek_official_api_key",
+    "es_index",
+    "es_mappings",
+    "es_settings",
+]

+ 2 - 0
applications/crawler/toutiao/__init__.py

@@ -2,5 +2,7 @@
 @author: luojunhui
 """
 
+from .search import search_in_toutiao
+from .search import get_toutiao_detail
 from .blogger import get_toutiao_account_info_list
 from .detail_recommend import get_associated_recommendation

+ 29 - 0
applications/crawler/toutiao/search.py

@@ -0,0 +1,29 @@
+import json
+
+from applications.utils import AsyncHttpClient
+
+
+async def search_in_toutiao(keyword):
+    url = "http://crawler-cn.aiddit.com/crawler/tou_tiao_hao/keyword"
+    data = {
+        "keyword": keyword,
+    }
+    headers = {
+        "Content-Type": "application/json",
+    }
+    async with AsyncHttpClient(timeout=120) as client:
+        response = await client.post(url, json=data, headers=headers)
+        return response
+
+
+async def get_toutiao_detail(link):
+    url = "http://crawler-cn.aiddit.com/crawler/tou_tiao_hao/detail"
+    data = {
+        "content_link": link,
+    }
+    headers = {
+        "Content-Type": "application/json",
+    }
+    async with AsyncHttpClient(timeout=120) as client:
+        response = await client.post(url, json=data, headers=headers)
+        return response
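
A minimal usage sketch of the two helpers above, assuming the internal crawler service returns JSON shaped like {"data": {"data": [...]}} with "article_url"/"source" fields, which is how search_candidate_accounts consumes it later in this diff; the keyword is illustrative.

import asyncio

from applications.crawler.toutiao import search_in_toutiao, get_toutiao_detail


async def demo(keyword: str):
    # keyword search against the internal crawler service
    search_response = await search_in_toutiao(keyword=keyword)
    for article in search_response["data"]["data"]:
        # resolve the author's channel_account_id from the article detail
        detail = await get_toutiao_detail(article["article_url"])
        account_id = detail.get("data", {}).get("data", {}).get("channel_account_id")
        print(article["source"], account_id)


asyncio.run(demo("养老"))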

+ 8 - 0
applications/crawler/wechat/gzh_spider.py

@@ -121,3 +121,11 @@ def get_source_account_from_article(article_link) -> dict | None:
             data={"link": article_link},
         )
     return None
+
+
+@retry(**retry_desc)
+def search(keyword: str, page="1") -> dict | None:
+    url = "{}/keyword".format(base_url)
+    payload = json.dumps({"keyword": keyword, "cursor": page})
+    response = requests.request("POST", url, headers=headers, data=payload, timeout=120)
+    return response.json()

+ 60 - 55
applications/pipeline/crawler_pipeline.py

@@ -1,11 +1,22 @@
 import time
 
+from typing import Any, Dict, Tuple, Callable
+
+from pydantic import BaseModel
+
 from applications.api import AsyncApolloApi
 from applications.utils import CrawlerMetaArticle
+from applications.utils import CrawlerMetaAccount
 
 
 class CrawlerPipeline(AsyncApolloApi):
 
+    MODEL_TABLE_MAP: Dict[str, Tuple[type[BaseModel], str]] = {
+        "article": (CrawlerMetaArticle, "crawler_meta_article"),
+        "account": (CrawlerMetaAccount, "crawler_candidate_account_pool"),
+        # add new media types here as they are introduced
+    }
+
     def __init__(self, pool, log_client):
         super().__init__()
         self.pool = pool
@@ -23,64 +34,35 @@ class CrawlerPipeline(AsyncApolloApi):
         duplicated_id = await self.pool.async_fetch(query=query, params=(title,))
         return True if duplicated_id else False
 
-    async def save_article(self, article_item: dict) -> None:
-        """save articles into database"""
-        query = f"""
-            insert into crawler_meta_article
-            (platform, mode, category, out_account_id, article_index, title, link, 
-            read_cnt, like_cnt, description, publish_time, crawler_time, score, status,
-            unique_index, source_article_title, source_account, title_sensitivity)
-            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s);
-        """
-        await self.pool.async_save(
-            query=query,
-            params=(
-                article_item.get("platform", "weixin"),
-                article_item.get("mode"),
-                article_item.get("category"),
-                article_item.get("out_account_id"),
-                article_item.get("article_index"),
-                article_item.get("title"),
-                article_item.get("link"),
-                article_item.get("read_cnt", 0),
-                article_item.get("like_cnt", 0),
-                article_item.get("description"),
-                article_item.get("publish_time"),
-                article_item.get("crawler_time", int(time.time())),
-                article_item.get("score"),
-                article_item.get("status", 1),
-                article_item.get("unique_index"),
-                article_item.get("source_article_title", None),
-                article_item.get("source_account", None),
-                article_item.get("title_sensitivity", 0),
-            ),
+    async def whether_account_exist(self, account_id: str, media_type: str) -> bool:
+        query = f"select id from crawler_candidate_account_pool where account_id = %s and media_type = %s;"
+        duplicated_id = await self.pool.async_fetch(
+            query=query, params=(account_id, media_type)
         )
+        return True if duplicated_id else False
 
-    async def save_article_v2(self, article_item: dict) -> None:
-        """save articles into database"""
-        new_article = CrawlerMetaArticle(**article_item)
-        new_article_dict = new_article.model_dump()
-        insert_template = (
-            """insert into crawler_meta_article ({columns}) values ({values});"""
-        )
-        insert_data = {k: v for k, v in new_article_dict.items() if v is not None}
-        columns = ", ".join(insert_data.keys())
-        values = ", ".join([f"%s" for i in range(len(insert_data))])
-        query = insert_template.format(columns=columns, values=values)
-        await self.pool.async_save(
-            query=query,
-            params=tuple(list(insert_data.values())),
-        )
+    async def save_single_record(self, media_type: str, item: dict) -> None:
+        try:
+            model_cls, table_name = self.MODEL_TABLE_MAP[media_type]
 
-    async def save_video(self, video_item: dict) -> str:
-        pass
+        except KeyError:
+            raise ValueError(f"Unknown media type: {media_type!r}")
 
-    async def save_item_to_database(self, media_type: str, item: dict):
+        record = model_cls(**item).model_dump(mode="python")
+        insert_data = {k: v for k, v in record.items() if v is not None}
+        if not insert_data:
+            raise ValueError("All fields are None, nothing to insert")
+
+        columns = ", ".join(f"`{col}`" for col in insert_data)
+        placeholders = ", ".join(["%s"] * len(insert_data))
+        sql = f"INSERT INTO `{table_name}` ({columns}) VALUES ({placeholders})"
+        await self.pool.async_save(sql, tuple(insert_data.values()))
+
+    async def save_item_to_database(self, media_type: str, item: dict, trace_id: str):
         """deal function"""
         match media_type:
             case "video":
-                await self.save_video(item)
-
+                await self.save_single_record(media_type, item)
             case "article":
                 log_data = {
                     "title": item["title"],
@@ -96,7 +78,6 @@ class CrawlerPipeline(AsyncApolloApi):
                         "code": 1001,
                     }
                 )
-                # 判断文章标题是否已经存在
                 if await self.whether_article_title_duplicate(log_data["title"]):
                     await self.log_client.log(
                         contents={
@@ -107,7 +88,7 @@ class CrawlerPipeline(AsyncApolloApi):
                         }
                     )
                     return
-                # 判断标题是否敏感
+
                 if await self.whether_title_sensitive(item["title"]):
                     await self.log_client.log(
                         contents={
@@ -118,8 +99,32 @@ class CrawlerPipeline(AsyncApolloApi):
                         }
                     )
                     item["title_sensitive"] = 1
-                # save article
-                await self.save_article_v2(item)
+
+                await self.save_single_record(media_type, item)
+                await self.log_client.log(
+                    contents={
+                        "trace_id": trace_id,
+                        "function": "save_article",
+                        "data": item,
+                        "message": "save article successfully",
+                    }
+                )
+
+            case "account":
+                if await self.whether_account_exist(
+                    item["account_id"], item["media_type"]
+                ):
+                    return
+
+                await self.save_single_record(media_type, item)
+                await self.log_client.log(
+                    contents={
+                        "trace_id": trace_id,
+                        "function": "save_account",
+                        "data": item,
+                        "message": "save account successfully",
+                    }
+                )
 
             case _:
                 raise Exception("Unknown media type")
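
For reference, the sketch below replays the SQL-building logic of save_single_record outside the pipeline; MODEL_TABLE_MAP currently covers only "article" and "account", and the sample values are illustrative.

from applications.utils import CrawlerMetaAccount

item = {
    "account_name": "demo_account",  # illustrative values
    "account_id": "123456",
    "platform": "toutiao",
    "media_type": 1,
}
record = CrawlerMetaAccount(**item).model_dump(mode="python")
insert_data = {k: v for k, v in record.items() if v is not None}
columns = ", ".join(f"`{col}`" for col in insert_data)
placeholders = ", ".join(["%s"] * len(insert_data))
sql = f"INSERT INTO `crawler_candidate_account_pool` ({columns}) VALUES ({placeholders})"
print(sql)
print(tuple(insert_data.values()))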

+ 2 - 1
applications/tasks/__init__.py

@@ -1 +1,2 @@
-from .task_scheduler import TaskScheduler
+# from .task_scheduler import TaskScheduler
+from .task_scheduler_v2 import TaskScheduler

+ 3 - 0
applications/tasks/cold_start_tasks/__init__.py

@@ -1 +1,4 @@
 from .article_pool_cold_start import ArticlePoolColdStart
+
+
+__all__ = ["ArticlePoolColdStart"]

+ 2 - 0
applications/tasks/crawler_tasks/__init__.py

@@ -1 +1,3 @@
 from .crawler_toutiao import CrawlerToutiao
+
+__all__ = ["CrawlerToutiao"]

+ 263 - 14
applications/tasks/crawler_tasks/crawler_toutiao.py

@@ -1,15 +1,21 @@
 from __future__ import annotations
 
+import asyncio
 import json
 import time
+import aiohttp
 import traceback
+from datetime import datetime
 from typing import List, Dict
 
 from tqdm import tqdm
 
 from applications.api import feishu_robot
 from applications.crawler.toutiao import get_toutiao_account_info_list
+from applications.crawler.toutiao import search_in_toutiao
+from applications.crawler.toutiao import get_toutiao_detail
 from applications.pipeline import CrawlerPipeline
+from applications.utils import async_proxy, get_top_article_title_list
 
 
 class CrawlerToutiaoConst:
@@ -35,10 +41,37 @@ class CrawlerToutiaoConst:
     # sleep second
     SLEEP_SECOND = 3
 
+    RECOMMEND_TIMES = 10
+    # media type: article
+    ARTICLE_TYPE = 1
+
 
 class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
-    def __init__(self, pool, log_client):
+    def __init__(self, pool, log_client, trace_id):
         super().__init__(pool, log_client)
+        self.trace_id = trace_id
+
+    async def get_request_params(self, category):
+        """
+        get request params
+        """
+        query = f"""
+            select request_method, request_url, request_headers, post_data
+            from toutiao_request_params
+            where category = %s and expire_flag = %s 
+            order by id desc limit 1;
+        """
+        response = await self.pool.async_fetch(query=query, params=(category, 0))
+        if not response:
+            now = datetime.now()
+            if 10 < now.hour < 21:
+                await feishu_robot.bot(
+                    title="今日头条推荐流,cookie 过期",
+                    detail={"info": "cookie expired"},
+                )
+            return None
+        else:
+            return response[0]
 
     async def get_account_list(self, media_type: str) -> List[dict]:
         """get toutiao account list"""
@@ -57,6 +90,16 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
             where platform = 'toutiao' and status = {self.TOUTIAO_ACCOUNT_GOOD_STATUS};
         """
         response = await self.pool.async_fetch(query)
+        await self.log_client.log(
+            contents={
+                "trace_id": self.trace_id,
+                "task": "crawler_toutiao",
+                "function": "get_account_list",
+                "message": f"get toutiao account list, media_type: {media_type}",
+                "status": "success",
+                "data": response,
+            }
+        )
         if not response:
             await feishu_robot.bot(
                 title=f"抓取头条账号内容任务: 任务模态:{media_type} 异常",
@@ -85,14 +128,12 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
             key="toutiao_blogger_cookie", output_type="string"
         )
         while has_more:
-            print(account_id, max_cursor)
             response = await get_toutiao_account_info_list(
                 account_id=account_id,
                 cookie=cookie,
                 media_type=media_type,
                 max_behot_time=current_cursor,
             )
-            print(response)
             if not response:
                 break
 
@@ -120,7 +161,6 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                     raise Exception(f"unknown media type: {media_type}")
 
             crawler_info_list_bar = tqdm(info_list, desc=bar_description)
-            print(json.dumps(info_list, ensure_ascii=False, indent=4))
             for info in crawler_info_list_bar:
                 try:
                     crawler_info_list_bar.set_postfix({"id": info["id"]})
@@ -128,7 +168,9 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                         case "video":
                             await self.crawler_each_video(info)
                         case "article":
-                            await self.crawler_each_article(info)
+                            await self.crawler_each_article(
+                                method="account", article_raw_data=info
+                            )
                         case _:
                             raise Exception(f"unknown media type: {media_type}")
 
@@ -140,24 +182,69 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
             else:
                 break
 
-    async def crawler_each_article(self, article_raw_data):
+    async def crawler_each_account(self, account_name, account_id, media_type, cookie):
+        """
+        get toutiao account info
+        """
+        new_account_item = {
+            "account_name": account_name,
+            "account_id": account_id,
+            "platform": self.PLATFORM,
+            "crawler_date": datetime.now().strftime("%Y-%m-%d"),
+            "media_type": media_type,
+        }
+        # get title_list
+        response = await get_toutiao_account_info_list(
+            account_id=account_id, cookie=cookie, media_type="article"
+        )
+        if not response:
+            return
+
+        article_raw_data = response["data"]
+        title_list = [i["title"] for i in article_raw_data]
+        new_account_item["title_list"] = json.dumps(title_list, ensure_ascii=False)
+        await self.save_item_to_database(
+            media_type="account", item=new_account_item, trace_id=self.trace_id
+        )
+
+    async def crawler_each_article(self, method, article_raw_data, category=None):
         """
         crawler each article
         """
-        new_article_item = {
+        # extract the fields shared by both modes
+        base_item = {
             "platform": self.PLATFORM,
-            "mode": "account",
-            "category": "toutiao_account_association",
+            "mode": method,
             "out_account_id": article_raw_data["user_info"]["user_id"],
             "title": article_raw_data["title"],
-            "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
             "read_cnt": article_raw_data["read_count"],
             "like_cnt": article_raw_data["like_count"],
-            "description": article_raw_data["abstract"],
             "publish_time": article_raw_data["publish_time"],
-            "unique_index": article_raw_data["group_id"],
+            "crawler_time": int(time.time()),
         }
-        await self.save_item_to_database(media_type="article", item=new_article_item)
+        match method:
+            case "account":
+                new_article_item = {
+                    **base_item,
+                    "category": "toutiao_account_association",
+                    "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
+                    "description": article_raw_data["abstract"],
+                    "unique_index": article_raw_data["group_id"],
+                }
+            case "recommend":
+                new_article_item = {
+                    **base_item,
+                    "category": category,
+                    "title": article_raw_data["title"],
+                    "link": f"https://www.toutiao.com/article/{article_raw_data['item_id']}",
+                    "description": article_raw_data["Abstract"],
+                    "unique_index": article_raw_data["item_id"],
+                }
+            case _:
+                raise Exception(f"unknown method: {method}")
+        await self.save_item_to_database(
+            media_type="article", item=new_article_item, trace_id=self.trace_id
+        )
 
     async def crawler_each_video(self, video_raw_data):
         pass
@@ -199,6 +286,7 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                 query, (max_publish_timestamp, account_id, self.PLATFORM)
             )
 
+    # crawl articles/videos from account homepages
     async def crawler_task(self, media_type: str) -> None:
         """
         class entrance
@@ -216,13 +304,24 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                 await self.update_account_max_cursor(
                     media_type=media_type, account_id=account_id
                 )
+                await self.log_client.log(
+                    contents={
+                        "trace_id": self.trace_id,
+                        "task": "crawler_toutiao_account_info",
+                        "function": "crawler_task",
+                        "message": f"crawler account: {account_id} successfully, media type: {media_type}",
+                        "status": "success",
+                    }
+                )
 
             except Exception as e:
                 await self.log_client.log(
                     contents={
+                        "trace_id": self.trace_id,
                         "task": "crawler_toutiao_account_info",
                         "function": "crawler_task",
-                        "message": account_id,
+                        "message": f"crawler_account: {account_id} fail",
+                        "status": "fail",
                         "data": {
                             "media_type": media_type,
                             "error": str(e),
@@ -230,3 +329,153 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                         },
                     }
                 )
+
+    async def crawler_recommend_articles(self, category: str) -> None:
+        cookie = await self.get_request_params(category=category)
+        if not cookie:
+            return
+
+        for crawler_time in range(self.RECOMMEND_TIMES):
+            try:
+                proxy_url = async_proxy()["url"]
+                proxy_auth = aiohttp.BasicAuth(
+                    async_proxy()["username"], async_proxy()["password"]
+                )
+                async with aiohttp.ClientSession() as session:
+                    async with session.request(
+                        method=cookie["request_method"],
+                        url=cookie["request_url"],
+                        headers=json.loads(cookie["request_headers"]),
+                        proxy=proxy_url,
+                        proxy_auth=proxy_auth,
+                    ) as response:
+                        response.raise_for_status()
+                        response_json = await response.json()
+
+                await self.log_client.log(
+                    contents={
+                        "task": "crawler_toutiao",
+                        "function": "crawler_recommend_articles",
+                        "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
+                        "trace_id": self.trace_id,
+                        "status": "success",
+                        "data": response_json,
+                    }
+                )
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "task": "crawler_toutiao",
+                        "function": "crawler_recommend_articles",
+                        "message": f"crawler {category} articles, crawler time: {crawler_time + 1}",
+                        "status": "fail",
+                        "trace_id": self.trace_id,
+                        "data": {
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        },
+                    }
+                )
+                continue
+
+            if not response_json:
+                continue
+            article_list = response_json["data"]
+            for article in article_list:
+                if article.get("article_url"):
+                    video_flag = article.get("has_video")
+                    if not video_flag:
+                        try:
+                            await self.crawler_each_article(
+                                method="recommend",
+                                article_raw_data=article,
+                                category=category,
+                            )
+                        except Exception as e:
+                            print(f"crawler_recommend_articles error: {e}")
+                    else:
+                        print("this is an video rather than article")
+                        continue
+                else:
+                    continue
+
+    # crawl the recommend feed
+    async def crawl_toutiao_recommend_task(self, category_list: List[str]) -> None:
+        if not category_list:
+            category_list = ["finance", "tech", "history", "entertainment"]
+
+        for category in category_list:
+            await self.crawler_recommend_articles(category=category)
+
+    # discover candidate accounts via keyword search
+    async def search_candidate_accounts(self):
+        top_title_list = await get_top_article_title_list(pool=self.pool)
+        cookie = await self.get_config_value(
+            key="toutiao_blogger_cookie", output_type="string"
+        )
+        for article in top_title_list:
+            title = article["title"]
+            try:
+                search_response = await search_in_toutiao(keyword=title)
+                if not search_response:
+                    continue
+
+                article_list = search_response["data"]["data"]
+                for search_article in article_list:
+                    try:
+                        article_url = search_article["article_url"]
+                        account_name = search_article["source"]
+                        if not (
+                            article_url
+                            and account_name
+                            and "toutiao.com" in article_url
+                        ):
+                            continue
+
+                        article_detail = await get_toutiao_detail(article_url)
+                        if not article_detail:
+                            continue
+
+                        account_id = (
+                            article_detail.get("data", {})
+                            .get("data", {})
+                            .get("channel_account_id")
+                        )
+                        if account_id:
+                            await self.crawler_each_account(
+                                account_name, account_id, self.ARTICLE_TYPE, cookie
+                            )
+
+                        await asyncio.sleep(1)
+
+                    except Exception as e:
+                        await self.log_client.log(
+                            contents={
+                                "task": "crawler_toutiao",
+                                "function": "search_candidate_accounts",
+                                "trace_id": self.trace_id,
+                                "message": "crawler_account fail",
+                                "status": "fail",
+                                "date": {
+                                    "error": str(e),
+                                    "traceback": traceback.format_exc(),
+                                    "article_info": article,
+                                },
+                            }
+                        )
+                await asyncio.sleep(5)
+
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "task": "crawler_toutiao",
+                        "function": "search_candidate_accounts",
+                        "trace_id": self.trace_id,
+                        "message": "search_in_toutiao failed",
+                        "status": "fail",
+                        "data": {
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        },
+                    }
+                )
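
A hedged driver sketch for the three crawl modes added here; pool and log_client stand for the project's async MySQL pool and log service, which are wired up by the task scheduler in production.

import asyncio

from applications.tasks.crawler_tasks import CrawlerToutiao


async def run(pool, log_client):
    crawler = CrawlerToutiao(pool, log_client, trace_id="manual-debug")
    # 1. account mode: crawl articles from tracked account homepages
    await crawler.crawler_task(media_type="article")
    # 2. recommend mode: crawl the recommend feed for selected categories
    await crawler.crawl_toutiao_recommend_task(category_list=["finance", "history"])
    # 3. search mode: discover candidate accounts from top article titles
    await crawler.search_candidate_accounts()

# asyncio.run(run(pool, log_client))  # with an initialized pool and log client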

+ 7 - 0
applications/tasks/data_recycle_tasks/__init__.py

@@ -1,3 +1,10 @@
 from .recycle_daily_publish_articles import RecycleDailyPublishArticlesTask
 from .recycle_daily_publish_articles import CheckDailyPublishArticlesTask
 from .recycle_daily_publish_articles import UpdateRootSourceIdAndUpdateTimeTask
+
+
+__all__ = [
+    "RecycleDailyPublishArticlesTask",
+    "CheckDailyPublishArticlesTask",
+    "UpdateRootSourceIdAndUpdateTimeTask",
+]

+ 6 - 0
applications/tasks/llm_tasks/__init__.py

@@ -1 +1,7 @@
 from .process_title import TitleRewrite
+from .candidate_account_process import CandidateAccountQualityScoreRecognizer
+
+__all__ = [
+    "TitleRewrite",
+    "CandidateAccountQualityScoreRecognizer",
+]

+ 181 - 0
applications/tasks/llm_tasks/candidate_account_process.py

@@ -0,0 +1,181 @@
+import json
+import traceback
+from typing import List, Dict
+from tqdm import tqdm
+
+from applications.api import fetch_deepseek_completion
+
+
+class CandidateAccountProcessConst:
+
+    INIT_STATUS = 0
+    PROCESSING_STATUS = 1
+    SUCCESS_STATUS = 2
+    FAILED_STATUS = 99
+    LACK_ARTICLE_STATUS = 11
+    TITLE_TOO_LONG_STATUS = 14
+
+    AVG_SCORE_THRESHOLD = 65
+
+    ARTICLE_COUNT_THRESHOLD = 13
+    AVG_TITLE_LENGTH_THRESHOLD = 45
+
+    @staticmethod
+    def generate_title_match_score_prompt(title_list):
+        title_list_string = "\n".join(title_list)
+        prompt = f"""
+** 任务指令 **
+    你是一名资深中文新闻编辑,需根据以下标准对一批标题进行主题匹配度评分(0-100分)
+
+** 评估维度及权重 **
+    1. 受众精准度(50%)
+        正向匹配:存款/养老/健康/饮食/疾病警示/家庭伦理/近现代战争历史/老知青/奇闻异事
+        负向排除:影视解说/文学解读/个人收藏(钱币/邮票)/机械科普/数码测评/电子游戏/时尚潮流/明星八卦/极限运动/学术研究/网络热梗/宠物饲养/音乐/棋牌
+
+    2. 标题技法(40%)
+        悬念设计:疑问句/省略号/反转结构(例:"打开后瞬间愣住...")
+        情感强度:使用"痛心!""寒心!"等情绪词
+        数据冲击:具体数字增强可信度(例:"存款180万消失")
+        口语化表达:使用"涨知识了""别不当回事"等日常用语
+
+    3. 内容调性(10%)
+        煽情猎奇:家庭悲剧/离奇事件(例:"棺材板挖出金条")
+        警示价值:健康建议/法律案例(例:"三种食物禁止二次加热")
+        历史揭秘:人物秘闻/老照片故事
+        爱国情怀:军事突破/资源发现(例:"南极发现巨型粮仓")
+
+** 评分规则 **
+    90-100分:同时满足3个维度且要素齐全,无负向内容
+    70-89分:满足2个核心维度,无负向内容
+    50-69分:仅满足受众群体正向匹配,无负向内容
+    30-49分:存在轻微关联但要素缺失
+    0-29分:完全无关或包含任意负向品类内容
+
+** 待评估标题 **
+    {title_list_string}
+
+** 输出要求 **
+    输出结果为JSON,仅输出这一批标题的评分,用数组 List 返回 [score1, score2, score3,...] 不要包含任何解释或说明。
+"""
+        return prompt
+
+
+class CandidateAccountQualityScoreRecognizer(CandidateAccountProcessConst):
+    def __init__(self, pool, log_client, trace_id):
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+
+    async def get_task_list(self) -> List[Dict]:
+        """
+        get account tasks from the database
+        """
+        fetch_query = f"""
+            select id, title_list, platform 
+            from crawler_candidate_account_pool
+            where avg_score is null and status = {self.INIT_STATUS} and title_list is not null;
+        """
+        fetch_response = await self.pool.async_fetch(
+            fetch_query,
+        )
+        return fetch_response
+
+    async def update_account_status(
+        self, account_id: int, ori_status: int, new_status: int
+    ) -> int:
+        """update account status"""
+        update_query = f"""
+                update crawler_candidate_account_pool
+                set status = %s
+                where id = %s and status = %s;
+            """
+        return await self.pool.async_save(
+            update_query, (new_status, account_id, ori_status)
+        )
+
+    async def score_for_each_account_by_llm(self, account):
+        account_id = account["id"]
+        # lock
+        if not await self.update_account_status(
+            account_id, self.INIT_STATUS, self.PROCESSING_STATUS
+        ):
+            return
+
+        # start processing
+        title_list = json.loads(account["title_list"])
+        if (
+            len(title_list) < self.ARTICLE_COUNT_THRESHOLD
+            and account["platform"] == "toutiao"
+        ):
+            await self.update_account_status(
+                account_id, self.PROCESSING_STATUS, self.LACK_ARTICLE_STATUS
+            )
+            return
+
+        # average title length is too long
+        avg_title_length = sum([len(title) for title in title_list]) / len(title_list)
+        if avg_title_length > self.AVG_TITLE_LENGTH_THRESHOLD:
+            await self.update_account_status(
+                account_id, self.PROCESSING_STATUS, self.TITLE_TOO_LONG_STATUS
+            )
+            return
+
+        prompt = self.generate_title_match_score_prompt(title_list)
+        try:
+            completion = fetch_deepseek_completion(
+                model="DeepSeek-V3", prompt=prompt, output_type="json"
+            )
+            avg_score = sum(completion) / len(completion)
+            query = f"""
+                update crawler_candidate_account_pool
+                set score_list = %s, avg_score = %s, status = %s
+                where id = %s and status = %s;
+            """
+            await self.pool.async_save(
+                query=query,
+                params=(
+                    json.dumps(completion),
+                    avg_score,
+                    self.SUCCESS_STATUS,
+                    account_id,
+                    self.PROCESSING_STATUS,
+                ),
+            )
+
+        except Exception as e:
+            await self.log_client.log(
+                contents={
+                    "task": "candidate_account_analysis",
+                    "trace_id": self.trace_id,
+                    "function": "score_for_each_account_by_llm",
+                    "message": "大模型识别账号失败",
+                    "status": "fail",
+                    "data": {
+                        "error": str(e),
+                        "title_list": json.dumps(title_list),
+                    },
+                }
+            )
+            await self.update_account_status(
+                account_id, self.PROCESSING_STATUS, self.FAILED_STATUS
+            )
+
+    async def deal(self):
+        task_list = await self.get_task_list()
+        for task in tqdm(task_list, desc="use llm to analysis each account"):
+            try:
+                await self.score_for_each_account_by_llm(task)
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "task": "candidate_account_analysis",
+                        "trace_id": self.trace_id,
+                        "function": "deal",
+                        "status": "fail",
+                        "data": {
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                            "task": task,
+                        },
+                    }
+                )
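
A small sketch of the scoring path in isolation, with a hypothetical title list; fetch_deepseek_completion is the same synchronous wrapper used above and is expected to return the JSON array of scores requested by the prompt.

from applications.api import fetch_deepseek_completion
from applications.tasks.llm_tasks.candidate_account_process import (
    CandidateAccountProcessConst as Const,
)

titles = ["存款180万竟然消失了...", "三种食物不能二次加热,别不当回事"]  # hypothetical sample titles
prompt = Const.generate_title_match_score_prompt(titles)
scores = fetch_deepseek_completion(model="DeepSeek-V3", prompt=prompt, output_type="json")
avg_score = sum(scores) / len(scores)
print(scores, avg_score, avg_score >= Const.AVG_SCORE_THRESHOLD)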

+ 10 - 0
applications/tasks/monitor_tasks/__init__.py

@@ -5,3 +5,13 @@ from .gzh_article_monitor import OutsideGzhArticlesMonitor
 from .gzh_article_monitor import OutsideGzhArticlesCollector
 from .gzh_article_monitor import InnerGzhArticlesMonitor
 from .task_processing_monitor import TaskProcessingMonitor
+
+__all__ = [
+    "check_kimi_balance",
+    "GetOffVideos",
+    "CheckVideoAuditStatus",
+    "OutsideGzhArticlesMonitor",
+    "OutsideGzhArticlesCollector",
+    "InnerGzhArticlesMonitor",
+    "TaskProcessingMonitor",
+]

+ 57 - 2
applications/tasks/monitor_tasks/get_off_videos.py

@@ -32,9 +32,10 @@ class GetOffVideosConst:
 
 
 class GetOffVideos(GetOffVideosConst):
-    def __init__(self, db_client, log_client):
+    def __init__(self, db_client, log_client, trace_id):
         self.db_client = db_client
         self.log_client = log_client
+        self.trace_id = trace_id
 
     async def get_task_list(
         self, earliest_timestamp_threshold: int, expire_timestamp_threshold: int
@@ -76,6 +77,14 @@ class GetOffVideos(GetOffVideosConst):
         task_list = await self.get_task_list(
             earliest_timestamp_threshold, expire_timestamp_threshold
         )
+        await self.log_client.log(
+            contents={
+                "task": "get_off_videos",
+                "trace_id": self.trace_id,
+                "message": f"获取{len(task_list)}条待下架视频",
+                "data": task_list,
+            }
+        )
         success_count = 0
         failed_count = 0
         for task in tqdm(task_list):
@@ -90,6 +99,7 @@ class GetOffVideos(GetOffVideosConst):
                         "function": "get_off_job",
                         "status": "fail",
                         "message": "get off video fail",
+                        "trace_id": self.trace_id,
                         "data": {
                             "video_id": video_id,
                             "error": str(e),
@@ -125,11 +135,25 @@ class GetOffVideos(GetOffVideosConst):
         else:
             return self.TASK_SUCCESS_STATUS
 
+    async def deal(self):
+        await self.get_off_job()
+        task_status = await self.check()
+        await self.log_client.log(
+            contents={
+                "task": "get_off_videos",
+                "function": "deal",
+                "trace_id": self.trace_id,
+                "message": "任务执行完成",
+            }
+        )
+        return task_status
+
 
 class CheckVideoAuditStatus(GetOffVideosConst):
-    def __init__(self, db_client, log_client):
+    def __init__(self, db_client, log_client, trace_id):
         self.db_client = db_client
         self.log_client = log_client
+        self.trace_id = trace_id
 
     async def get_video_list_status(self, video_list: List[int]):
         response = await fetch_piaoquan_video_list_detail(video_list)
@@ -170,20 +194,51 @@ class CheckVideoAuditStatus(GetOffVideosConst):
                 yield arr[i : i + chunk_size]
 
         video_id_list = await self.get_unchecked_video_list()
+        if video_id_list:
+            await self.log_client.log(
+                contents={
+                    "task": "check_video_audit_status",
+                    "function": "deal",
+                    "trace_id": self.trace_id,
+                    "message": f"一共获取{len(video_id_list)}条视频",
+                }
+            )
+        else:
+            return self.TASK_SUCCESS_STATUS
+
         video_chunks = chuck_iterator(video_id_list, 10)
 
         bad_videos_count = 0
         fail_list = []
+        batch = 0
         for video_chunk in video_chunks:
+            batch += 1
             bad_video_id_list = await self.get_video_list_status(video_chunk)
             if bad_video_id_list:
                 bad_videos_count += len(bad_video_id_list)
+                await self.log_client.log(
+                    contents={
+                        "task": "check_video_audit_status",
+                        "function": "deal",
+                        "trace_id": self.trace_id,
+                        "message": f"batch: {batch} has {len(bad_video_id_list)} bad videos",
+                        "data": bad_video_id_list,
+                    }
+                )
                 for bad_video_id in tqdm(bad_video_id_list):
                     response = await change_video_audit_status(bad_video_id)
                     if not response:
                         fail_list.append(bad_video_id)
 
             await self.update_check_status(video_chunk)
+            await self.log_client.log(
+                contents={
+                    "task": "check_video_audit_status",
+                    "function": "deal",
+                    "trace_id": self.trace_id,
+                    "message": f"finish process batch: {batch}",
+                }
+            )
 
         if fail_list:
             await feishu_robot.bot(

+ 2 - 1
applications/tasks/task_mapper.py

@@ -19,6 +19,7 @@ class Const:
     UPDATE_ROOT_SOURCE_ID_TIMEOUT = 3600
     CRAWLER_TOUTIAO_ARTICLES_TIMEOUT = 5 * 3600
     ARTICLE_POOL_COLD_START_TIMEOUT = 4 * 3600
+    TASK_MAX_NUM = 5
 
 
 class TaskMapper(Const):
@@ -58,4 +59,4 @@ class TaskMapper(Const):
             case _:
                 expire_duration = self.DEFAULT_TIMEOUT
 
-        return {"expire_duration": expire_duration}
+        return {"expire_duration": expire_duration, "task_max_num": self.TASK_MAX_NUM}

+ 12 - 3
applications/tasks/task_scheduler.py

@@ -301,10 +301,19 @@ class TaskScheduler(TaskMapper):
             case "crawler_toutiao_articles":
 
                 async def background_crawler_toutiao_articles():
-                    sub_task = CrawlerToutiao(self.db_client, self.log_client)
-                    await sub_task.crawler_task(
-                        media_type=self.data.get("media_type", "article")
+                    sub_task = CrawlerToutiao(
+                        self.db_client, self.log_client, self.trace_id
                     )
+                    media_type = self.data.get("media_type", "article")
+                    method = self.data.get("method", "account")
+                    category_list = self.data.get("category_list", [])
+                    match method:
+                        case "account":
+                            await sub_task.crawler_task(media_type=media_type)
+                        case "recommend":
+                            await sub_task.crawl_toutiao_recommend_task(
+                                category_list=category_list
+                            )
                     await self.release_task(
                         task_name=task_name, date_string=date_string
                     )

+ 289 - 0
applications/tasks/task_scheduler_v2.py

@@ -0,0 +1,289 @@
+import asyncio
+import json
+import time
+import traceback
+from datetime import datetime
+from typing import Awaitable, Callable, Dict
+
+from applications.api import feishu_robot
+from applications.utils import task_schedule_response, generate_task_trace_id
+
+from applications.tasks.cold_start_tasks import ArticlePoolColdStart
+from applications.tasks.crawler_tasks import CrawlerToutiao
+from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
+from applications.tasks.llm_tasks import TitleRewrite
+from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
+from applications.tasks.monitor_tasks import check_kimi_balance
+from applications.tasks.monitor_tasks import GetOffVideos
+from applications.tasks.monitor_tasks import CheckVideoAuditStatus
+from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
+from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
+from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
+from applications.tasks.monitor_tasks import TaskProcessingMonitor
+from applications.tasks.task_mapper import TaskMapper
+
+
+class TaskScheduler(TaskMapper):
+    """统一调度入口:外部只需调用 `await TaskScheduler(data, log_cli, db_cli).deal()`"""
+
+    # ---------- init ----------
+    def __init__(self, data, log_service, db_client):
+        self.data = data
+        self.log_client = log_service
+        self.db_client = db_client
+        self.table = "long_articles_task_manager"
+        self.trace_id = generate_task_trace_id()
+
+    # ---------- shared database helpers ----------
+    async def _insert_or_ignore_task(self, task_name: str, date_str: str) -> None:
+        """新建记录(若同键已存在则忽略)"""
+        query = (
+            f"insert ignore into {self.table} "
+            "(date_string, task_name, start_timestamp, task_status, trace_id, data) "
+            "values (%s, %s, %s, %s, %s, %s);"
+        )
+        await self.db_client.async_save(
+            query=query,
+            params=(
+                date_str,
+                task_name,
+                int(time.time()),
+                self.TASK_INIT_STATUS,
+                self.trace_id,
+                json.dumps(self.data, ensure_ascii=False),
+            ),
+        )
+
+    async def _try_lock_task(self) -> bool:
+        """一次 UPDATE 抢锁;返回 True 表示成功上锁"""
+        query = (
+            f"update {self.table} "
+            "set task_status = %s "
+            "where trace_id = %s  and task_status = %s;"
+        )
+        res = await self.db_client.async_save(
+            query=query,
+            params=(
+                self.TASK_PROCESSING_STATUS,
+                self.trace_id,
+                self.TASK_INIT_STATUS,
+            ),
+        )
+        return True if res else False
+
+    async def _release_task(self, status: int) -> None:
+        query = (
+            f"update {self.table} set task_status=%s, finish_timestamp=%s "
+            "where trace_id=%s and task_status=%s;"
+        )
+        await self.db_client.async_save(
+            query=query,
+            params=(
+                status,
+                int(time.time()),
+                self.trace_id,
+                self.TASK_PROCESSING_STATUS,
+            ),
+        )
+
+    async def _is_processing_overtime(self, task_name) -> bool:
+        """检测在处理任务是否超时,或者超过最大并行数,若超时会发飞书告警"""
+        query = f"select trace_id from {self.table} where task_status = %s and task_name = %s;"
+        rows = await self.db_client.async_fetch(
+            query=query, params=(self.TASK_PROCESSING_STATUS, task_name)
+        )
+        if not rows:
+            return False
+
+        processing_task_num = len(rows)
+        if processing_task_num >= self.get_task_config(task_name).get(
+            "task_max_num", self.TASK_MAX_NUM
+        ):
+            await feishu_robot.bot(
+                title=f"multi {task_name} is processing ",
+                detail={"detail": rows},
+            )
+            return True
+
+        return False
+
+    async def _run_with_guard(
+        self, task_name: str, date_str: str, task_coro: Callable[[], Awaitable[int]]
+    ):
+        """公共:检查、建记录、抢锁、后台运行"""
+        # 1. 超时检测
+        if await self._is_processing_overtime(task_name):
+            return await task_schedule_response.fail_response(
+                "5005", "muti tasks with same task_name is processing"
+            )
+
+        # 2. record the task and try to grab the lock
+        await self._insert_or_ignore_task(task_name, date_str)
+        if not await self._try_lock_task():
+            return await task_schedule_response.fail_response(
+                "5001", "task is processing"
+            )
+
+        # 3. run the task in a background coroutine so the scheduling entry point is never blocked
+        async def _wrapper():
+            status = self.TASK_FAILED_STATUS
+            try:
+                status = await task_coro()
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "trace_id": self.trace_id,
+                        "function": "cor_wrapper",
+                        "task": task_name,
+                        "error": str(e),
+                    }
+                )
+                await feishu_robot.bot(
+                    title=f"{task_name} is failed",
+                    detail={
+                        "task": task_name,
+                        "err": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                )
+            finally:
+                await self._release_task(status)
+
+        asyncio.create_task(_wrapper(), name=task_name)
+        return await task_schedule_response.success_response(
+            task_name=task_name,
+            data={"code": 0, "message": "task started", "trace_id": self.trace_id},
+        )
+
+    # ---------- main entry ----------
+    async def deal(self):
+        task_name: str | None = self.data.get("task_name")
+        if not task_name:
+            return await task_schedule_response.fail_response(
+                "4003", "task_name must be input"
+            )
+
+        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
+
+        # === all tasks are registered here: each maps to an async handler returning an int status code ===
+        handlers: Dict[str, Callable[[], Awaitable[int]]] = {
+            # check the Kimi account balance
+            "check_kimi_balance": self._check_kimi_balance_handler,
+            # take long-article videos offline three days after publishing
+            "get_off_videos": self._get_off_videos_task_handler,
+            # keep published videos visible during the first three days
+            "check_publish_video_audit_status": self._check_video_audit_status_handler,
+            # monitor articles published by outside service accounts
+            "outside_article_monitor": self._outside_monitor_handler,
+            # monitor articles published on in-house accounts
+            "inner_article_monitor": self._inner_gzh_articles_monitor_handler,
+            # title rewriting (pending testing)
+            "title_rewrite": self._title_rewrite_handler,
+            # recycle daily publishing data
+            "daily_publish_articles_recycle": self._recycle_article_data_handler,
+            # update root_source_id for daily published articles
+            "update_root_source_id": self._update_root_source_id_handler,
+            # crawl Toutiao articles and videos
+            "crawler_toutiao": self._crawler_toutiao_handler,
+            # article-pool cold-start publishing
+            "article_pool_cold_start": self._article_pool_cold_start_handler,
+            # task timeout monitoring
+            "task_processing_monitor": self._task_processing_monitor_handler,
+            # candidate account quality analysis
+            "candidate_account_quality_analysis": self._candidate_account_quality_score_handler,
+        }
+
+        if task_name not in handlers:
+            return await task_schedule_response.fail_response(
+                "4001", "wrong task name input"
+            )
+        return await self._run_with_guard(task_name, date_str, handlers[task_name])
+
+    # ---------- handler implementations for the tasks above ----------
+    async def _check_kimi_balance_handler(self) -> int:
+        response = await check_kimi_balance()
+        await self.log_client.log(
+            contents={
+                "trace_id": self.trace_id,
+                "task": "check_kimi_balance",
+                "data": response,
+            }
+        )
+        return self.TASK_SUCCESS_STATUS
+
+    async def _get_off_videos_task_handler(self) -> int:
+        sub_task = GetOffVideos(self.db_client, self.log_client, self.trace_id)
+        return await sub_task.deal()
+
+    async def _check_video_audit_status_handler(self) -> int:
+        sub_task = CheckVideoAuditStatus(self.db_client, self.log_client, self.trace_id)
+        return await sub_task.deal()
+
+    async def _task_processing_monitor_handler(self) -> int:
+        sub_task = TaskProcessingMonitor(self.db_client)
+        await sub_task.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _inner_gzh_articles_monitor_handler(self) -> int:
+        sub_task = InnerGzhArticlesMonitor(self.db_client)
+        return await sub_task.deal()
+
+    async def _title_rewrite_handler(self):
+        sub_task = TitleRewrite(self.db_client, self.log_client)
+        return await sub_task.deal()
+
+    async def _update_root_source_id_handler(self) -> int:
+        sub_task = UpdateRootSourceIdAndUpdateTimeTask(self.db_client, self.log_client)
+        await sub_task.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _outside_monitor_handler(self) -> int:
+        collector = OutsideGzhArticlesCollector(self.db_client)
+        await collector.deal()
+        monitor = OutsideGzhArticlesMonitor(self.db_client)
+        return await monitor.deal()  # expected to return SUCCESS / FAILED
+
+    async def _recycle_article_data_handler(self) -> int:
+        date_str = self.data.get("date_string") or datetime.now().strftime("%Y-%m-%d")
+        recycle = RecycleDailyPublishArticlesTask(
+            self.db_client, self.log_client, date_str
+        )
+        await recycle.deal()
+        check = CheckDailyPublishArticlesTask(self.db_client, self.log_client, date_str)
+        await check.deal()
+        return self.TASK_SUCCESS_STATUS
+
+    async def _crawler_toutiao_handler(self) -> int:
+        sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
+        method = self.data.get("method", "account")
+        media_type = self.data.get("media_type", "article")
+        category_list = self.data.get("category_list", [])
+
+        match method:
+            case "account":
+                await sub_task.crawler_task(media_type=media_type)
+            case "recommend":
+                await sub_task.crawl_toutiao_recommend_task(category_list)
+            case "search":
+                await sub_task.search_candidate_accounts()
+            case _:
+                raise ValueError(f"Unsupported method {method}")
+        return self.TASK_SUCCESS_STATUS
+
+    async def _article_pool_cold_start_handler(self) -> int:
+        cold_start = ArticlePoolColdStart(
+            self.db_client, self.log_client, self.trace_id
+        )
+        platform = self.data.get("platform", "weixin")
+        crawler_methods = self.data.get("crawler_methods", [])
+        await cold_start.deal(platform=platform, crawl_methods=crawler_methods)
+        return self.TASK_SUCCESS_STATUS
+
+    async def _candidate_account_quality_score_handler(self) -> int:
+        task = CandidateAccountQualityScoreRecognizer(
+            self.db_client, self.log_client, self.trace_id
+        )
+        await task.deal()
+        return self.TASK_SUCCESS_STATUS
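
For completeness, a hedged example of triggering the new scheduler through the /run_task route (see routes/blueprint.py below); host, port, and any blueprint prefix are deployment-specific placeholders, and the exact response envelope depends on task_schedule_response.

import requests

payload = {
    "task_name": "crawler_toutiao",  # any key registered in the handlers map
    "method": "search",              # "account" | "recommend" | "search"
    "media_type": "article",
    "category_list": [],
}
resp = requests.post("http://127.0.0.1:8080/run_task", json=payload, timeout=10)
print(resp.json())  # expected to carry {"code": 0, "message": "task started", "trace_id": ...}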

+ 4 - 0
applications/utils/__init__.py

@@ -19,5 +19,9 @@ from .common import *
 
 # import item
 from .item import CrawlerMetaArticle
+from .item import CrawlerMetaAccount
+
+# mysql utils
+from .async_mysql_utils import *
 
 task_schedule_response = TaskScheduleResponse()

+ 14 - 0
applications/utils/async_mysql_utils.py

@@ -0,0 +1,14 @@
+from typing import List, Dict
+
+
+async def get_top_article_title_list(pool) -> List[Dict]:
+    query = f"""
+        select distinct title, source_id
+        from datastat_sort_strategy
+        where produce_plan_name = %s and source_id is not null;
+    """
+    return await pool.async_fetch(query=query, params=("TOP100",))
+
+
+async def get():
+    pass

+ 30 - 2
applications/utils/item.py

@@ -2,7 +2,7 @@
 @author: luojunhui
 """
 
-import time
+import datetime
 
 from pydantic import BaseModel, Field
 from typing import Optional
@@ -26,7 +26,7 @@ class CrawlerMetaArticle(BaseModel):
         default=None, max_length=255, description="文章简介"
     )
     publish_time: int = Field(default=None, description="文章发布时间")
-    crawler_time: int = Field(default=int(time.time()), description="抓取时间")
+    crawler_time: int = Field(default=None, description="抓取时间")
     score: float = Field(default=None, description="相似度分")
     status: int = Field(default=1, description="文章状态")
     unique_index: str = Field(default=..., description="文章唯一index")
@@ -41,3 +41,31 @@ class CrawlerMetaArticle(BaseModel):
         default=0,
         description="文章内嵌套视频状态 0: init; 1: processing; 2: successfully; 3:article link bad ;99: fail",
     )
+
+
+class CrawlerMetaAccount(BaseModel):
+    account_name: str = Field(..., description="账号名称", min_length=1)
+    account_id: str = Field(..., description="账号id", min_length=1)
+    title_list: str = Field(default=None, description="账号主页第一页标题list")
+    score_list: str = Field(
+        default=None, description="账号主页第一页标题list契合得分(By LLM)"
+    )
+    avg_score: float = Field(default=None, description="score_list 的平均分")
+    status: int = Field(
+        default=0,
+        description="分析状态,0: init, 1: processing, 2: successfully, 99: fail",
+    )
+    platform: str = Field(default=None, description="账号来源于哪个外部平台")
+    crawler_date: datetime.date = Field(
+        default=None, description="账号抓取日期,格式为“YYYY-MM-DD”"
+    )
+    using_status: int = Field(
+        default=0,
+        description="账号状态, 0: init, 1: processing, 2: successfully, 99: fail",
+    )
+    category_status: int = Field(
+        default=0,
+        description="账号品类处理状态, 0: init, 1: processing, 2: successfully, 99: fail",
+    )
+    category: str = Field(default=None, description="账号的品类")
+    media_type: int = Field(default=2, description="账号抓取模态 1: 文章 2:视频")
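
A quick sketch of the new model's defaults and coercion, with illustrative values; crawler_date accepts the "YYYY-MM-DD" string produced in crawler_each_account.

from applications.utils import CrawlerMetaAccount

account = CrawlerMetaAccount(
    account_name="demo",
    account_id="123456",
    platform="toutiao",
    media_type=1,
    crawler_date="2025-07-29",  # pydantic coerces the string into datetime.date
)
print(account.status, account.using_status, account.category_status)  # 0 0 0
print(account.model_dump(mode="python")["crawler_date"])              # datetime.date(2025, 7, 29)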

+ 1 - 3
dev.py

File diff suppressed because it is too large

+ 3 - 2
requirements.txt

@@ -5,7 +5,7 @@ pymysql
 aiohttp~=3.10.4
 requests~=2.32.3
 numpy
-pandas
+pandas~=2.2.3
 aliyun-log-python-sdk
 aliyun-python-sdk-core
 aliyun-python-sdk-kms
@@ -18,4 +18,5 @@ elasticsearch~=8.17.2
 openai~=1.47.1
 tenacity~=9.0.0
 fake-useragent~=2.1.0
-pydantic~=2.10.6
+pydantic~=2.10.6
+aiomonitor~=0.7.1

+ 4 - 2
routes/blueprint.py

@@ -17,13 +17,15 @@ def server_routes(pools, log_service):
     @server_blueprint.route("/run_task", methods=["POST"])
     async def run_task():
         data = await request.get_json()
+        print("ss", data)
         task_scheduler = TaskScheduler(data, log_service, pools)
         response = await task_scheduler.deal()
+        print(response)
         return jsonify(response)
 
-    @server_blueprint.route("/finish_task", methods=["POST"])
+    @server_blueprint.route("/finish_task", methods=["GET"])
     async def finish_task():
-        data = await request.get_json()
+        # data = await request.get_json()
         return jsonify({"message": "hello world"})
 
     return server_blueprint

+ 11 - 0
task_app.py

@@ -1,6 +1,8 @@
+import asyncio
 import logging
 
 from quart import Quart
+from aiomonitor import start_monitor
 from applications.config import aliyun_log_config
 from applications.database import mysql_manager
 from applications.service import LogService
@@ -14,9 +16,15 @@ app.register_blueprint(routes)
 
 logging.basicConfig(level=logging.INFO)
 
+_monitor = None
+
 
 @app.before_serving
 async def startup():
+    global _monitor
+    loop = asyncio.get_event_loop()
+    _monitor = start_monitor(loop=loop, host="127.0.0.1", port=50101)
+    logging.info(f"Monitor started at {_monitor}")
     logging.info("Starting application...")
     await mysql_manager.init_pools()
     logging.info("Mysql pools init successfully")
@@ -31,3 +39,6 @@ async def shutdown():
     logging.info("Mysql pools close successfully")
     await log_service.stop()
     logging.info("aliyun log service stop successfully")
+    if _monitor:
+        _monitor.close()
+        logging.info("Monitor stopped successfully")

Some files were not shown because too many files changed in this diff