
Add Toutiao search-based account discovery mode

luojunhui committed 1 month ago
commit eba451cc94

+ 2 - 0
applications/crawler/toutiao/__init__.py

@@ -2,5 +2,7 @@
 @author: luojunhui
 """
 
+from .search import search_in_toutiao
+from .search import get_toutiao_detail
 from .blogger import get_toutiao_account_info_list
 from .detail_recommend import get_associated_recommendation

+ 29 - 0
applications/crawler/toutiao/search.py

@@ -0,0 +1,29 @@
+import json
+
+from applications.utils import AsyncHttpClient
+
+
+async def search_in_toutiao(keyword):
+    url = "http://crawler-cn.aiddit.com/crawler/tou_tiao_hao/keyword"
+    data = {
+        "keyword": keyword,
+    }
+    headers = {
+        "Content-Type": "application/json",
+    }
+    async with AsyncHttpClient(timeout=120) as client:
+        response = await client.post(url, json=data, headers=headers)
+        return response
+
+
+async def get_toutiao_detail(link):
+    url = "http://crawler-cn.aiddit.com/crawler/tou_tiao_hao/detail"
+    data = {
+        "content_link": link,
+    }
+    headers = {
+        "Content-Type": "application/json",
+    }
+    async with AsyncHttpClient(timeout=120) as client:
+        response = await client.post(url, json=data, headers=headers)
+        return response
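
For context, a minimal usage sketch of the two new helpers. The nested data.data response shape and the parsed-JSON return value are inferred from how crawler_toutiao.py consumes them later in this commit, not from the crawler service itself:

import asyncio

from applications.crawler.toutiao import search_in_toutiao, get_toutiao_detail


async def demo(keyword: str) -> None:
    # search the keyword, then resolve each hit to its channel account id
    search_response = await search_in_toutiao(keyword)
    for item in (search_response or {}).get("data", {}).get("data", []):
        detail = await get_toutiao_detail(item["article_url"]) or {}
        account_id = detail.get("data", {}).get("data", {}).get("channel_account_id")
        print(item["source"], account_id)


if __name__ == "__main__":
    asyncio.run(demo("example keyword"))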

+ 8 - 0
applications/crawler/wechat/gzh_spider.py

@@ -121,3 +121,11 @@ def get_source_account_from_article(article_link) -> dict | None:
             data={"link": article_link},
         )
     return None
+
+
+@retry(**retry_desc)
+def search(keyword: str, page="1") -> dict | None:
+    url = "{}/keyword".format(base_url)
+    payload = json.dumps({"keyword": keyword, "cursor": page})
+    response = requests.request("POST", url, headers=headers, data=payload, timeout=120)
+    return response.json()
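
A hedged usage example of the new WeChat GZH keyword search. The direct module import path and the cursor-based paging behaviour are assumptions; only the "keyword"/"cursor" payload fields are defined by this diff:

from applications.crawler.wechat.gzh_spider import search  # assumed import path

first_page = search("health tips")             # cursor defaults to "1"
second_page = search("health tips", page="2")  # paging goes through the "cursor" field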

+ 43 - 54
applications/pipeline/crawler_pipeline.py

@@ -1,11 +1,22 @@
 import time
 
+from typing import Any, Dict, Tuple, Callable
+
+from pydantic import BaseModel
+
 from applications.api import AsyncApolloApi
 from applications.utils import CrawlerMetaArticle
+from applications.utils import CrawlerMetaAccount
 
 
 class CrawlerPipeline(AsyncApolloApi):
 
+    MODEL_TABLE_MAP: Dict[str, Tuple[type[BaseModel], str]] = {
+        "article": (CrawlerMetaArticle, "crawler_meta_article"),
+        "account": (CrawlerMetaAccount, "crawler_candidate_account_pool"),
+        # to support a new media type later, just add an entry here
+    }
+
     def __init__(self, pool, log_client):
         super().__init__()
         self.pool = pool
@@ -23,64 +34,35 @@ class CrawlerPipeline(AsyncApolloApi):
         duplicated_id = await self.pool.async_fetch(query=query, params=(title,))
         return True if duplicated_id else False
 
-    async def save_article(self, article_item: dict) -> None:
-        """save articles into database"""
-        query = f"""
-            insert into crawler_meta_article
-            (platform, mode, category, out_account_id, article_index, title, link, 
-            read_cnt, like_cnt, description, publish_time, crawler_time, score, status,
-            unique_index, source_article_title, source_account, title_sensitivity)
-            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s);
-        """
-        await self.pool.async_save(
-            query=query,
-            params=(
-                article_item.get("platform", "weixin"),
-                article_item.get("mode"),
-                article_item.get("category"),
-                article_item.get("out_account_id"),
-                article_item.get("article_index"),
-                article_item.get("title"),
-                article_item.get("link"),
-                article_item.get("read_cnt", 0),
-                article_item.get("like_cnt", 0),
-                article_item.get("description"),
-                article_item.get("publish_time"),
-                article_item.get("crawler_time", int(time.time())),
-                article_item.get("score"),
-                article_item.get("status", 1),
-                article_item.get("unique_index"),
-                article_item.get("source_article_title", None),
-                article_item.get("source_account", None),
-                article_item.get("title_sensitivity", 0),
-            ),
+    async def whether_account_exist(self, account_id: str, media_type: str) -> bool:
+        query = "select id from crawler_candidate_account_pool where account_id = %s and media_type = %s;"
+        duplicated_id = await self.pool.async_fetch(
+            query=query, params=(account_id, media_type)
         )
+        return True if duplicated_id else False
 
-    async def save_article_v2(self, article_item: dict) -> None:
-        """save articles into database"""
-        new_article = CrawlerMetaArticle(**article_item)
-        new_article_dict = new_article.model_dump()
-        insert_template = (
-            """insert into crawler_meta_article ({columns}) values ({values});"""
-        )
-        insert_data = {k: v for k, v in new_article_dict.items() if v is not None}
-        columns = ", ".join(insert_data.keys())
-        values = ", ".join([f"%s" for i in range(len(insert_data))])
-        query = insert_template.format(columns=columns, values=values)
-        await self.pool.async_save(
-            query=query,
-            params=tuple(list(insert_data.values())),
-        )
+    async def save_single_record(self, media_type: str, item: dict) -> None:
+        try:
+            model_cls, table_name = self.MODEL_TABLE_MAP[media_type]
+
+        except KeyError:
+            raise ValueError(f"Unknown media type: {media_type!r}")
 
-    async def save_video(self, video_item: dict) -> str:
-        pass
+        record = model_cls(**item).model_dump(mode="python")
+        insert_data = {k: v for k, v in record.items() if v is not None}
+        if not insert_data:
+            raise ValueError("All fields are None, nothing to insert")
+
+        columns = ", ".join(f"`{col}`" for col in insert_data)
+        placeholders = ", ".join(["%s"] * len(insert_data))
+        sql = f"INSERT INTO `{table_name}` ({columns}) VALUES ({placeholders})"
+        await self.pool.async_save(sql, tuple(insert_data.values()))
 
     async def save_item_to_database(self, media_type: str, item: dict):
         """deal function"""
         match media_type:
             case "video":
-                await self.save_video(item)
-
+                await self.save_single_record(media_type, item)
             case "article":
                 log_data = {
                     "title": item["title"],
@@ -96,7 +78,6 @@ class CrawlerPipeline(AsyncApolloApi):
                         "code": 1001,
                     }
                 )
-                # check whether the article title already exists
                 if await self.whether_article_title_duplicate(log_data["title"]):
                     await self.log_client.log(
                         contents={
@@ -107,7 +88,7 @@ class CrawlerPipeline(AsyncApolloApi):
                         }
                     )
                     return
-                # check whether the title is sensitive
+
                 if await self.whether_title_sensitive(item["title"]):
                     await self.log_client.log(
                         contents={
@@ -118,8 +99,16 @@ class CrawlerPipeline(AsyncApolloApi):
                         }
                     )
                     item["title_sensitive"] = 1
-                # save article
-                await self.save_article_v2(item)
+
+                await self.save_single_record(media_type, item)
+
+            case "account":
+                if await self.whether_account_exist(
+                    item["account_id"], item["media_type"]
+                ):
+                    return
+
+                await self.save_single_record(media_type, item)
 
             case _:
                 raise Exception("Unknown media type")
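
Note that the "video" branch of save_item_to_database now routes into save_single_record, but MODEL_TABLE_MAP has no "video" entry, so saving a video currently raises ValueError("Unknown media type: 'video'"). Following the "add an entry here" comment, a hypothetical registration could look like this (CrawlerMetaVideo and the crawler_meta_video table are illustrative names, not part of this commit):

from pydantic import BaseModel, Field

from applications.pipeline import CrawlerPipeline


class CrawlerMetaVideo(BaseModel):
    # hypothetical model: the real fields and table are not defined in this diff
    platform: str = Field(..., description="source platform")
    out_account_id: str = Field(..., description="account id on the source platform")
    title: str = Field(..., description="video title")
    url: str = Field(..., description="video url")


CrawlerPipeline.MODEL_TABLE_MAP["video"] = (CrawlerMetaVideo, "crawler_meta_video")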

+ 0 - 1
applications/tasks/__init__.py

@@ -1,3 +1,2 @@
 # from .task_scheduler import TaskScheduler
 from .task_scheduler_v2 import TaskScheduler
-

+ 101 - 1
applications/tasks/crawler_tasks/crawler_toutiao.py

@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import asyncio
 import json
 import time
 import aiohttp
@@ -11,8 +12,10 @@ from tqdm import tqdm
 
 from applications.api import feishu_robot
 from applications.crawler.toutiao import get_toutiao_account_info_list
+from applications.crawler.toutiao import search_in_toutiao
+from applications.crawler.toutiao import get_toutiao_detail
 from applications.pipeline import CrawlerPipeline
-from applications.utils import async_proxy
+from applications.utils import async_proxy, get_top_article_title_list
 
 
 class CrawlerToutiaoConst:
@@ -39,6 +42,8 @@ class CrawlerToutiaoConst:
     SLEEP_SECOND = 3
 
     RECOMMEND_TIMES = 10
+    # article media type
+    ARTICLE_TYPE = 1
 
 
 class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
@@ -177,6 +182,29 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
             else:
                 break
 
+    async def crawler_each_account(self, account_name, account_id, media_type):
+        """
+        get toutiao account info
+        """
+        new_account_item = {
+            "account_name": account_name,
+            "account_id": account_id,
+            "platform": self.PLATFORM,
+            "crawler_date": datetime.now().strftime("%Y-%m-%d"),
+            "media_type": media_type,
+        }
+        await self.log_client.log(
+            contents={
+                "task": "crawler_toutiao",
+                "function": "crawler_each_account",
+                "trace_id": self.trace_id,
+                "message": "抓取账号成功",
+                "status": "success",
+                "data": new_account_item,
+            }
+        )
+        await self.save_item_to_database(media_type="account", item=new_account_item)
+
     async def crawler_each_article(self, method, article_raw_data, category=None):
         """
         crawler each article
@@ -264,6 +292,7 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                 query, (max_publish_timestamp, account_id, self.PLATFORM)
             )
 
+    # fetch articles/videos from account homepages
     async def crawler_task(self, media_type: str) -> None:
         """
         class entrance
@@ -376,9 +405,80 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
                 else:
                     continue
 
+    # crawl the recommendation feed
     async def crawl_toutiao_recommend_task(self, category_list: List[str]) -> None:
         if not category_list:
             category_list = ["finance", "tech", "history", "entertainment"]
 
         for category in category_list:
             await self.crawler_recommend_articles(category=category)
+
+    # discover candidate accounts via search
+    async def search_candidate_accounts(self):
+        top_title_list = await get_top_article_title_list(pool=self.pool)
+        for article in top_title_list:
+            title = article["title"]
+            try:
+                search_response = await search_in_toutiao(keyword=title)
+                if not search_response:
+                    continue
+
+                article_list = search_response["data"]["data"]
+                for search_article in article_list:
+                    try:
+                        article_url = search_article["article_url"]
+                        account_name = search_article["source"]
+                        if not (
+                            article_url
+                            and account_name
+                            and "toutiao.com" in article_url
+                        ):
+                            continue
+
+                        article_detail = await get_toutiao_detail(article_url)
+                        if not article_detail:
+                            continue
+
+                        account_id = (
+                            article_detail.get("data", {})
+                            .get("data", {})
+                            .get("channel_account_id")
+                        )
+                        if account_id:
+                            await self.crawler_each_account(
+                                account_name, account_id, self.ARTICLE_TYPE
+                            )
+
+                        await asyncio.sleep(1)
+
+                    except Exception as e:
+                        await self.log_client.log(
+                            contents={
+                                "task": "crawler_toutiao",
+                                "function": "search_candidate_accounts",
+                                "trace_id": self.trace_id,
+                                "message": "crawler_account fail",
+                                "status": "fail",
+                                "data": {
+                                    "error": str(e),
+                                    "traceback": traceback.format_exc(),
+                                    "article_info": article,
+                                },
+                            }
+                        )
+                await asyncio.sleep(5)
+
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "task": "crawler_toutiao",
+                        "function": "search_candidate_accounts",
+                        "trace_id": self.trace_id,
+                        "message": "search_in_toutiao failed",
+                        "status": "fail",
+                        "data": {
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        },
+                    }
+                )
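
A rough sketch of running the new search mode outside the scheduler. The constructor arguments follow CrawlerPipeline.__init__(pool, log_client); how self.trace_id gets set is not visible in this diff, so it is assumed to happen elsewhere in the class:

from applications.tasks.crawler_tasks.crawler_toutiao import CrawlerToutiao


async def run_search(pool, log_client) -> None:
    # same dependencies the task scheduler injects
    crawler = CrawlerToutiao(pool, log_client)
    await crawler.search_candidate_accounts()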

+ 17 - 9
applications/tasks/task_scheduler_v2.py

@@ -138,14 +138,19 @@ class TaskScheduler(TaskMapper):
                 )
                 await feishu_robot.bot(
                     title=f"{task_name} is failed",
-                    detail={"task": task_name, "err": str(e), "traceback": traceback.format_exc()},
+                    detail={
+                        "task": task_name,
+                        "err": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
                 )
             finally:
                 await self._release_task(task_name, date_str, status)
 
         asyncio.create_task(_wrapper(), name=task_name)
         return await task_schedule_response.success_response(
-            task_name=task_name, data={"code": 0, "message": "task started", "trace_id": self.trace_id}
+            task_name=task_name,
+            data={"code": 0, "message": "task started", "trace_id": self.trace_id},
         )
 
     # ---------- 主入口 ----------
@@ -177,7 +182,7 @@ class TaskScheduler(TaskMapper):
             # update root_source_id for daily published articles
             "update_root_source_id": self._update_root_source_id_handler,
             # Toutiao article / video crawling
-            "crawler_toutiao_articles": self._crawler_toutiao_handler,
+            "crawler_toutiao": self._crawler_toutiao_handler,
             # article pool cold-start publishing
             "article_pool_pool_cold_start": self._article_pool_cold_start_handler,
             # task timeout monitoring
@@ -250,12 +255,15 @@ class TaskScheduler(TaskMapper):
         method = self.data.get("method", "account")
         category_list = self.data.get("category_list", [])
 
-        if method == "account":
-            await sub_task.crawler_task(media_type=media_type)
-        elif method == "recommend":
-            await sub_task.crawl_toutiao_recommend_task(category_list)
-        else:
-            raise ValueError(f"Unsupported method {method}")
+        match method:
+            case "account":
+                await sub_task.crawler_task(media_type=media_type)
+            case "recommend":
+                await sub_task.crawl_toutiao_recommend_task(category_list)
+            case "search":
+                await sub_task.search_candidate_accounts()
+            case _:
+                raise ValueError(f"Unsupported method {method}")
         return self.TASK_SUCCESS_STATUS
 
     async def _article_pool_cold_start_handler(self) -> int:
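
With the handler key renamed from crawler_toutiao_articles to crawler_toutiao and the new "search" method, a trigger request might carry a body like the one below. Only the "method", "media_type", and "category_list" fields read from self.data are defined by the code; the surrounding envelope is an assumption:

# hypothetical request body for the task scheduler
payload = {
    "task_name": "crawler_toutiao",
    "data": {
        "method": "search",           # routes to search_candidate_accounts()
        # "media_type": "article",    # used by the "account" method
        # "category_list": ["tech"],  # used by the "recommend" method
    },
}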

+ 4 - 0
applications/utils/__init__.py

@@ -19,5 +19,9 @@ from .common import *
 
 # import item
 from .item import CrawlerMetaArticle
+from .item import CrawlerMetaAccount
+
+# mysql utils
+from .async_mysql_utils import *
 
 task_schedule_response = TaskScheduleResponse()

+ 14 - 0
applications/utils/async_mysql_utils.py

@@ -0,0 +1,14 @@
+from typing import List, Dict
+
+
+async def get_top_article_title_list(pool) -> List[Dict]:
+    query = """
+        select distinct title, source_id
+        from datastat_sort_strategy
+        where produce_plan_name = %s and source_id is not null;
+    """
+    return await pool.async_fetch(query=query, params=("TOP100",))
+
+
+async def get():
+    pass

+ 29 - 1
applications/utils/item.py

@@ -2,7 +2,7 @@
 @author: luojunhui
 """
 
-import time
+import datetime
 
 from pydantic import BaseModel, Field
 from typing import Optional
@@ -41,3 +41,31 @@ class CrawlerMetaArticle(BaseModel):
         default=0,
         description="文章内嵌套视频状态 0: init; 1: processing; 2: successfully; 3:article link bad ;99: fail",
     )
+
+
+class CrawlerMetaAccount(BaseModel):
+    account_name: str = Field(..., description="账号名称", min_length=1)
+    account_id: str = Field(..., description="账号id", min_length=1)
+    title_list: Optional[str] = Field(default=None, description="账号主页第一页标题list")
+    score_list: Optional[str] = Field(
+        default=None, description="账号主页第一页标题list契合得分(By LLM)"
+    )
+    avg_score: Optional[float] = Field(default=None, description="score_list 的平均分")
+    status: int = Field(
+        default=0,
+        description="分析状态,0: init, 1: processing, 2: successfully, 99: fail",
+    )
+    platform: Optional[str] = Field(default=None, description="账号来源于哪个外部平台")
+    crawler_date: Optional[datetime.date] = Field(
+        default=None, description="账号抓取日期,格式为“YYYY-MM-DD”"
+    )
+    using_status: int = Field(
+        default=0,
+        description="账号状态, 0: init, 1: processing, 2: successfully, 99: fail",
+    )
+    category_status: int = Field(
+        default=0,
+        description="账号品类处理状态, 0: init, 1: processing, 2: successfully, 99: fail",
+    )
+    category: Optional[str] = Field(default=None, description="账号的品类")
+    media_type: int = Field(default=2, description="账号抓取模态 1: 文章 2:视频")
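
For reference, this mirrors how crawler_each_account populates the new model; fields left at None (title_list, avg_score, category, ...) are later dropped by save_single_record when it builds the INSERT. The "toutiao" platform value stands in for the PLATFORM constant, which is not shown in this diff:

from applications.utils import CrawlerMetaAccount

record = CrawlerMetaAccount(
    account_name="example account",
    account_id="123456",
    platform="toutiao",          # assumed value of CrawlerToutiaoConst.PLATFORM
    crawler_date="2024-01-01",   # pydantic parses the ISO string into datetime.date
    media_type=1,                # ARTICLE_TYPE, i.e. article accounts
).model_dump(mode="python")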

File diff suppressed because it is too large
+ 1 - 3
dev.py


Some files were not shown because too many files changed in this diff