
update crawler toutiao articles

luojunhui committed 1 month ago
parent commit 12a3e1c038
30 changed files with 8258 additions and 109 deletions
  1. README.md (+51 −6)
  2. applications/ab_test/get_cover.py (+3 −20)
  3. applications/api/async_aigc_system_api.py (+2 −2)
  4. applications/api/async_apollo_api.py (+2 −2)
  5. applications/api/async_feishu_api.py (+5 −5)
  6. applications/api/async_piaoquan_api.py (+4 −4)
  7. applications/crawler/toutiao/__init__.py (+6 −0)
  8. applications/crawler/toutiao/blogger.py (+52 −0)
  9. applications/crawler/toutiao/detail_recommend.py (+60 −0)
  10. applications/crawler/toutiao/main_page_recomend.py (+0 −0)
  11. applications/crawler/toutiao/toutiao.js (+7548 −0)
  12. applications/crawler/toutiao/use_js.py (+24 −0)
  13. applications/database/mysql_pools.py (+5 −3)
  14. applications/pipeline/__init__.py (+1 −0)
  15. applications/pipeline/crawler_pipeline.py (+124 −2)
  16. applications/tasks/crawler_tasks/__init__.py (+1 −0)
  17. applications/tasks/crawler_tasks/crawler_toutiao.py (+229 −0)
  18. applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py (+8 −18)
  19. applications/tasks/monitor_tasks/__init__.py (+1 −1)
  20. applications/tasks/monitor_tasks/get_off_videos.py (+5 −6)
  21. applications/tasks/monitor_tasks/gzh_article_monitor.py (+7 −9)
  22. applications/tasks/monitor_tasks/kimi_balance.py (+2 −2)
  23. applications/tasks/monitor_tasks/task_processing_monitor.py (+7 −7)
  24. applications/tasks/task_mapper.py (+4 −1)
  25. applications/tasks/task_scheduler.py (+48 −17)
  26. applications/utils/__init__.py (+4 −1)
  27. applications/utils/async_http_client.py (+1 −1)
  28. applications/utils/common.py (+8 −0)
  29. applications/utils/item.py (+43 −0)
  30. requirements.txt (+3 −2)

+ 51 - 6
README.md

@@ -20,39 +20,84 @@ docker compose up -d
 ├── app_config.toml
 ├── applications
 │   ├── __init__.py
+│   ├── ab_test
+│   │   ├── __init__.py
+│   │   └── get_cover.py
 │   ├── api
 │   │   ├── __init__.py
-│   │   └── async_feishu_api.py
+│   │   ├── aliyun_log_api.py
+│   │   ├── async_aigc_system_api.py
+│   │   ├── async_apollo_api.py
+│   │   ├── async_feishu_api.py
+│   │   ├── async_piaoquan_api.py
+│   │   ├── deep_seek_official_api.py
+│   │   └── elastic_search_api.py
 │   ├── config
 │   │   ├── __init__.py
 │   │   ├── aliyun_log_config.py
+│   │   ├── deepseek_config.py
+│   │   ├── elastic_search_mappings.py
+│   │   ├── es_certs.crt
 │   │   └── mysql_config.py
+│   ├── crawler
+│   │   ├── toutiao
+│   │   │   ├── __init__.py
+│   │   │   ├── blogger.py
+│   │   │   ├── detail_recommend.py
+│   │   │   ├── main_page_recomend.py
+│   │   │   ├── toutiao.js
+│   │   │   └── use_js.py
+│   │   └── wechat
+│   │       ├── __init__.py
+│   │       └── gzh_spider.py
 │   ├── database
 │   │   ├── __init__.py
 │   │   └── mysql_pools.py
+│   ├── pipeline
+│   │   ├── __init__.py
+│   │   ├── crawler_pipeline.py
+│   │   └── data_recycle_pipeline.py
 │   ├── service
 │   │   ├── __init__.py
-│   │   ├── get_cover.py
 │   │   └── log_service.py
 │   ├── tasks
 │   │   ├── __init__.py
+│   │   ├── crawler_tasks
+│   │   │   ├── __init__.py
+│   │   │   └── crawler_toutiao.py
+│   │   ├── data_recycle_tasks
+│   │   │   ├── __init__.py
+│   │   │   └── recycle_daily_publish_articles.py
+│   │   ├── llm_tasks
+│   │   │   ├── __init__.py
+│   │   │   └── process_title.py
 │   │   ├── monitor_tasks
 │   │   │   ├── __init__.py
-│   │   │   └── kimi_balance.py
+│   │   │   ├── get_off_videos.py
+│   │   │   ├── gzh_article_monitor.py
+│   │   │   ├── kimi_balance.py
+│   │   │   └── task_processing_monitor.py
+│   │   ├── task_mapper.py
 │   │   └── task_scheduler.py
 │   └── utils
 │       ├── __init__.py
+│       ├── async_apollo_client.py
 │       ├── async_http_client.py
+│       ├── common.py
 │       ├── get_cover.py
+│       ├── item.py
 │       └── response.py
-├── dev
-│   ├── dev.py
-│   └── run_task_dev.py
 ├── docker-compose.yaml
+├── myapp.log
 ├── requirements.txt
 ├── routes
 │   ├── __init__.py
 │   └── blueprint.py
 └── task_app.py
+```
 
+### get code strategy
 ```
+tree -I "__pycache__|*.pyc"
+```
+

+ 3 - 20
applications/ab_test/get_cover.py

@@ -30,14 +30,7 @@ class GetCoverService(Response):
 
         match pool_name:
             case "aigc":
-                fetch_response, sql_error = await fetch_aigc_cover(
-                    self.pool, channel_content_id
-                )
-                if sql_error:
-                    return self.error_response(
-                        error_code="111",
-                        error_message="sql_error!!\n{}".format(sql_error),
-                    )
+                fetch_response = await fetch_aigc_cover(self.pool, channel_content_id)
                 if fetch_response:
                     image_oss = fetch_response[0]["oss_object_key"]
                     if image_oss:
@@ -53,14 +46,9 @@ class GetCoverService(Response):
                 else:
                     return await self.fetch_cover_info("long_video", channel_content_id)
             case "long_video":
-                fetch_response, sql_error = await fetch_long_video_cover(
+                fetch_response = await fetch_long_video_cover(
                     self.pool, channel_content_id
                 )
-                if sql_error:
-                    return self.error_response(
-                        error_code="111",
-                        error_message="sql_error!!\n{}".format(sql_error),
-                    )
                 if fetch_response:
                     image_oss = fetch_response[1]["image_path"]
                     cover = await self.montage_cover(image_oss, "long_video")
@@ -81,12 +69,7 @@ class GetCoverService(Response):
         if video_index == 2:
             return await self.fetch_cover_info("long_video", seed_video_id)
 
-        channel_info, sql_error = await fetch_channel_info(self.pool, content_id)
-        if sql_error:
-            return self.error_response(
-                error_code="111",
-                error_message="sql_error!!\n{}".format(sql_error),
-            )
+        channel_info = await fetch_channel_info(self.pool, content_id)
         if not channel_info:
             return self.error_response(
                 error_code="402",

+ 2 - 2
applications/api/async_aigc_system_api.py

@@ -1,4 +1,4 @@
-from applications.utils import AsyncHttPClient
+from applications.utils import AsyncHttpClient
 
 
 async def delete_illegal_gzh_articles(gh_id: str, title: str):
@@ -13,7 +13,7 @@ async def delete_illegal_gzh_articles(gh_id: str, title: str):
         "ghId": gh_id,
     }
     headers = {"Content-Type": "application/json;charset=UTF-8"}
-    async with AsyncHttPClient(timeout=600) as client:
+    async with AsyncHttpClient(timeout=600) as client:
         res = await client.post(url=url, headers=headers, json=payload)
 
     return res

+ 2 - 2
applications/api/async_apollo_api.py

@@ -1,5 +1,5 @@
 import json
-from typing import Optional, Dict
+from typing import Optional, Dict, Union
 from applications.utils import AsyncApolloClient
 
 
@@ -21,7 +21,7 @@ class AsyncApolloApi:
 
     async def get_config_value(
         self, key: str, output_type: str = "json"
-    ) -> Optional[Dict]:
+    ) -> Union[Dict, str]:
         match output_type:
             case "json":
                 response = await self.apollo_connection.get_value(key)
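The widened return type matches the two output modes. A minimal usage sketch, assuming a no-argument constructor (as in `CrawlerPipeline`) and the config keys referenced elsewhere in this commit (`sensitive_word_list`, `toutiao_blogger_cookie`):

```python
# Sketch only: key names come from crawler_pipeline.py and crawler_toutiao.py.
async def load_crawler_config():
    api = AsyncApolloApi()
    # default output_type="json" -> parsed value (here: a list of words)
    sensitive_words = await api.get_config_value("sensitive_word_list")
    # output_type="string" -> the raw config value (here: a cookie string)
    cookie = await api.get_config_value("toutiao_blogger_cookie", output_type="string")
    return sensitive_words, cookie
```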

+ 5 - 5
applications/api/async_feishu_api.py

@@ -2,7 +2,7 @@ import json
 
 import requests
 
-from applications.utils import AsyncHttPClient
+from applications.utils import AsyncHttpClient
 
 
 class Feishu:
@@ -36,7 +36,7 @@ class Feishu:
             "app_id": "cli_a51114cf8bf8d00c",
             "app_secret": "cNoTAqMpsAm7mPBcpCAXFfvOzCNL27fe",
         }
-        async with AsyncHttPClient(default_headers=self.headers) as client:
+        async with AsyncHttpClient(default_headers=self.headers) as client:
             response = await client.post(url=url, data=post_data)
 
         tenant_access_token = response["tenant_access_token"]
@@ -56,7 +56,7 @@ class FeishuSheetApi(Feishu):
         body = {
             "valueRange": {"range": "{}!{}".format(sheet_id, ranges), "values": values}
         }
-        async with AsyncHttPClient() as client:
+        async with AsyncHttpClient() as client:
             response = await client.post(
                 url=insert_value_url, json=body, headers=headers
             )
@@ -76,7 +76,7 @@ class FeishuSheetApi(Feishu):
         body = {
             "valueRange": {"range": "{}!{}".format(sheet_id, ranges), "values": values}
         }
-        async with AsyncHttPClient() as client:
+        async with AsyncHttpClient() as client:
             response = await client.put(
                 url=insert_value_url, json=body, headers=headers
             )
@@ -228,6 +228,6 @@ class FeishuBotApi(Feishu):
             )
 
         data = {"msg_type": "interactive", "card": card}
-        async with AsyncHttPClient() as client:
+        async with AsyncHttpClient() as client:
             res = await client.post(url=url, headers=headers, json=data)
         return res

+ 4 - 4
applications/api/async_piaoquan_api.py

@@ -1,6 +1,6 @@
 from typing import Optional, Dict, List
 
-from applications.utils import AsyncHttPClient
+from applications.utils import AsyncHttpClient
 
 
 async def fetch_piaoquan_video_list_detail(video_list: List[int]) -> Optional[Dict]:
@@ -10,7 +10,7 @@ async def fetch_piaoquan_video_list_detail(video_list: List[int]) -> Optional[Di
     header = {
         "Content-Type": "application/json",
     }
-    async with AsyncHttPClient() as client:
+    async with AsyncHttpClient() as client:
         response = await client.post(url, json=data, headers=header)
 
     return response
@@ -36,7 +36,7 @@ async def change_video_audit_status(video_id: int, status_code: int = 5) -> Dict
         "sec-fetch-site": "same-origin",
         "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/126.0.0.0 Safari/537.36",
     }
-    async with AsyncHttPClient() as client:
+    async with AsyncHttpClient() as client:
         response = await client.post(url, data=payload, headers=headers)
 
     return response
@@ -72,7 +72,7 @@ async def publish_video_to_piaoquan(oss_path: str, uid: str, title: str) -> Dict
         "appType": 888888,
         "repeatStatus": 1,
     }
-    async with AsyncHttPClient() as client:
+    async with AsyncHttpClient() as client:
         response = await client.post(url, data=payload, headers=headers)
 
     return response

+ 6 - 0
applications/crawler/toutiao/__init__.py

@@ -0,0 +1,6 @@
+"""
+@author: luojunhui
+"""
+
+from .blogger import get_toutiao_account_info_list
+from .detail_recommend import get_associated_recommendation

+ 52 - 0
applications/crawler/toutiao/blogger.py

@@ -0,0 +1,52 @@
+"""
+@author: luojunhui
+"""
+
+from __future__ import annotations
+
+import aiohttp
+
+from applications.utils import async_proxy
+from .use_js import call_js_function
+
+
+async def get_toutiao_account_info_list(
+    account_id: str,
+    cookie: str,
+    media_type: str,
+    max_behot_time=0,
+) -> dict | None:
+    """toutiao blogger"""
+    ms_token = "mFs9gU4FJc23gFWPvBfQxFsBRrx1xBEJD_ZRTAolHfPrae84kTEBaHQR3s8ToiLX4-U9hgATTZ2cVHlSixmj5YCTOPoVM-43gOt3aVHkxfXHEuUtTJe-wUEs%3D"
+    query_params = [
+        0,
+        1,
+        14,
+        "category=pc_user_hot&token={}&aid=24&app_name=toutiao_web&msToken={}".format(
+            account_id, ms_token
+        ),
+        "",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
+    ]
+    a_bogus = call_js_function(query_params)
+    headers = {
+        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3",
+        "Cookie": cookie,
+        "Content-Type": "application/json",
+    }
+
+    match media_type:
+        case "article":
+            url = f"https://www.toutiao.com/api/pc/list/user/feed?category=pc_profile_article&token={account_id}&max_behot_time={max_behot_time}&entrance_gid=&aid=24&app_name=toutiao_web&msToken={ms_token}&a_bogus={a_bogus}"
+        case "video":
+            url = f"https://www.toutiao.com/api/pc/list/user/feed?category=pc_profile_video&token={account_id}&max_behot_time={max_behot_time}&hot_video=0&entrance_gid=&aid=24&app_name=toutiao_web&msToken={ms_token}&a_bogus={a_bogus}"
+        case _:
+            return None
+    proxy_url = async_proxy()["url"]
+    proxy_auth = aiohttp.BasicAuth(async_proxy()["username"], async_proxy()["password"])
+    async with aiohttp.ClientSession() as session:
+        async with session.get(
+            url, headers=headers, proxy=proxy_url, proxy_auth=proxy_auth
+        ) as response:
+            response.raise_for_status()
+            return await response.json()

+ 60 - 0
applications/crawler/toutiao/detail_recommend.py

@@ -0,0 +1,60 @@
+"""
+@author: luojunhui
+"""
+
+from __future__ import annotations
+
+import json
+import requests
+from tenacity import retry
+
+from applications.utils import proxy, request_retry
+from .use_js import call_js_function
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
+
+@retry(**retry_desc)
+def get_associated_recommendation(article_id: str, cookie: str):
+    """
+    toutiao related recommendation
+    """
+    ms_token = "-aYwLj97uyCi3oghPfhz2nXaekLoFR5YnYUBA5SuyQZae_NLllO4zC30-CeVLth0A6Hmm7MuGr4_IN9MjHUn8wkq-UQKXJxoGmIAokpUsPsOLjdQKffe-cGWCiZ6xqgh7XE%3D"
+    query_params = [
+        0,
+        1,
+        14,
+        "min_behot_time=0&channel_id=91558184576&category=pc_profile_channel&disable_raw_data=true&client_extra_params=%7B%22playparam%22%3A%22codec_type%3A0%2Cenable_dash%3A1%2Cunwatermark%3A1%22%2C%22group_id%22%3A%22{}%22%7D&aid=24&app_name=toutiao_web&msToken={}".format(
+            article_id, ms_token, ms_token
+        ),
+        "",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
+    ]
+    a_bogus = call_js_function(query_params)
+    url = f"https://www.toutiao.com/api/pc/list/feed?min_behot_time=0&channel_id=91558184576&category=pc_profile_channel&disable_raw_data=true&client_extra_params=%7B%22playparam%22%3A%22codec_type%3A0%2Cenable_dash%3A1%2Cunwatermark%3A1%22%2C%22group_id%22%3A%22{article_id}%22%7D&aid=24&app_name=toutiao_web&msToken={ms_token}&a_bogus={a_bogus}"
+    headers = {
+        "accept": "application/json, text/plain, */*",
+        "accept-language": "zh",
+        "referer": "https://www.toutiao.com/video/{}/".format(article_id),
+        "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
+        "Cookie": cookie,
+    }
+    try:
+        response = requests.get(url, headers=headers, proxies=proxy())
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="toutiao account crawler",
+            function="get_toutiao_account_video_list",
+            message=f"API请求失败: {e}",
+            data={"account_id": article_id},
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="toutiao account crawler",
+            function="get_toutiao_account_video_list",
+            message=f"响应解析失败: {e}",
+            data={"account_id": article_id},
+        )
+    return None

+ 0 - 0
applications/crawler/toutiao/main_page_recomend.py


Diff view of this file was limited because it is too large
+ 7548 - 0
applications/crawler/toutiao/toutiao.js


+ 24 - 0
applications/crawler/toutiao/use_js.py

@@ -0,0 +1,24 @@
+"""
+@author: luojunhui
+"""
+
+import json
+import subprocess
+
+toutiao_js_path = "applications/crawler/toutiao/toutiao.js"
+
+
+def call_js_function(arguments_list):
+    """
+    call js function
+    """
+    # 将参数转换为JSON字符串
+    args_json = json.dumps(arguments_list)
+    # 调用Node.js执行脚本
+    result = subprocess.run(
+        ["node", toutiao_js_path, args_json], capture_output=True, text=True
+    )
+    if result.returncode == 0:
+        return result.stdout.strip()
+    else:
+        raise Exception(f"Error: {result.stderr}")
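`call_js_function` is how `blogger.py` and `detail_recommend.py` compute the `a_bogus` query signature. A minimal sketch of the call shape, assuming Node.js is on the PATH; the placeholders stand in for a real account id and msToken:

```python
# Hypothetical invocation; the argument order mirrors blogger.py.
query_params = [
    0,
    1,
    14,
    "category=pc_user_hot&token=<account_id>&aid=24&app_name=toutiao_web&msToken=<ms_token>",
    "",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
]
a_bogus = call_js_function(query_params)  # raises if node exits with a non-zero code
```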

+ 5 - 3
applications/database/mysql_pools.py

@@ -44,7 +44,7 @@ class DatabaseManager:
             if pool:
                 pool.close()
                 await pool.wait_closed()
-                logging.info("🔌 Closed connection pool for {name}")
+                logging.info(f"Closed connection pool for {name}")
 
     async def async_fetch(
         self, query, db_name="long_articles", params=None, cursor_type=DictCursor
@@ -58,9 +58,11 @@ class DatabaseManager:
                 async with conn.cursor(cursor_type) as cursor:
                     await cursor.execute(query, params)
                     fetch_response = await cursor.fetchall()
-            return fetch_response, None
+
+            return fetch_response
         except Exception as e:
-            return None, str(e)
+            logging.error(f"Failed to fetch {query}: {str(e)}")
+            return None
 
     async def async_save(self, query, params, db_name="long_articles"):
         pool = self.pools[db_name]
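With `async_fetch` now returning the rows directly (and `None` on error) instead of a `(rows, error)` tuple, callers across this commit drop the tuple unpacking. A sketch of the updated calling pattern; `db_manager` stands for a `DatabaseManager` instance and the table and columns are illustrative:

```python
# Illustrative caller; errors are already logged inside async_fetch.
async def list_saved_articles(db_manager):
    rows = await db_manager.async_fetch(
        query="select article_id, title from crawler_meta_article where status = %s;",
        params=(1,),
    )
    if rows is None:
        return []  # query failed; error already logged
    return [(row["article_id"], row["title"]) for row in rows]  # DictCursor rows are dicts
```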

+ 1 - 0
applications/pipeline/__init__.py

@@ -1 +1,2 @@
 from .data_recycle_pipeline import insert_article_into_recycle_pool
+from .crawler_pipeline import CrawlerPipeline

+ 124 - 2
applications/pipeline/crawler_pipeline.py

@@ -1,3 +1,125 @@
-class CrawlerPipeline:
+import time
 
-    pass
+from applications.api import AsyncApolloApi
+from applications.utils import CrawlerMetaArticle
+
+
+class CrawlerPipeline(AsyncApolloApi):
+
+    def __init__(self, pool, log_client):
+        super().__init__()
+        self.pool = pool
+        self.log_client = log_client
+
+    async def whether_title_sensitive(self, title: str) -> bool:
+        sensitive_word_list = await self.get_config_value("sensitive_word_list")
+        for word in sensitive_word_list:
+            if word in title:
+                return True
+        return False
+
+    async def whether_article_title_duplicate(self, title: str) -> bool:
+        query = f"""select article_id from crawler_meta_article where title = %s;"""
+        duplicated_id = await self.pool.async_fetch(query=query, params=(title,))
+        return True if duplicated_id else False
+
+    async def save_article(self, article_item: dict) -> None:
+        """save articles into database"""
+        query = f"""
+            insert into crawler_meta_article
+            (platform, mode, category, out_account_id, article_index, title, link, 
+            read_cnt, like_cnt, description, publish_time, crawler_time, score, status,
+            unique_index, source_article_title, source_account, title_sensitivity)
+            values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,%s);
+        """
+        await self.pool.async_save(
+            query=query,
+            params=(
+                article_item.get("platform", "weixin"),
+                article_item.get("mode"),
+                article_item.get("category"),
+                article_item.get("out_account_id"),
+                article_item.get("article_index"),
+                article_item.get("title"),
+                article_item.get("link"),
+                article_item.get("read_cnt", 0),
+                article_item.get("like_cnt", 0),
+                article_item.get("description"),
+                article_item.get("publish_time"),
+                article_item.get("crawler_time", int(time.time())),
+                article_item.get("score"),
+                article_item.get("status", 1),
+                article_item.get("unique_index"),
+                article_item.get("source_article_title", None),
+                article_item.get("source_account", None),
+                article_item.get("title_sensitivity", 0),
+            ),
+        )
+
+    async def save_article_v2(self, article_item: dict) -> None:
+        """save articles into database"""
+        new_article = CrawlerMetaArticle(**article_item)
+        new_article_dict = new_article.model_dump()
+        insert_template = (
+            """insert into crawler_meta_article ({columns}) values ({values});"""
+        )
+        insert_data = {k: v for k, v in new_article_dict.items() if v is not None}
+        columns = ", ".join(insert_data.keys())
+        values = ", ".join([f"%s" for i in range(len(insert_data))])
+        query = insert_template.format(columns=columns, values=values)
+        await self.pool.async_save(
+            query=query,
+            params=tuple(list(insert_data.values())),
+        )
+
+    async def save_video(self, video_item: dict) -> str:
+        pass
+
+    async def save_item_to_database(self, media_type: str, item: dict):
+        """deal function"""
+        match media_type:
+            case "video":
+                await self.save_video(item)
+
+            case "article":
+                log_data = {
+                    "title": item["title"],
+                    "platform": item["platform"],
+                    "mode": item["mode"],
+                    "source": item["category"],
+                }
+                await self.log_client.log(
+                    contents={
+                        "task": "save_article",
+                        "data": log_data,
+                        "message": "start crawler article",
+                        "code": 1001,
+                    }
+                )
+                # 判断文章标题是否已经存在
+                if await self.whether_article_title_duplicate(log_data["title"]):
+                    await self.log_client.log(
+                        contents={
+                            "task": "save_article",
+                            "data": log_data,
+                            "message": "duplicate article title",
+                            "code": 1002,
+                        }
+                    )
+                    return
+                # 判断标题是否敏感
+                if await self.whether_title_sensitive(item["title"]):
+                    await self.log_client.log(
+                        contents={
+                            "task": "save_article",
+                            "data": log_data,
+                            "message": "title_sensitive",
+                            "code": 1003,
+                        }
+                    )
+                    item["title_sensitive"] = 1
+                # save article
+                await self.save_article_v2(item)
+
+            case _:
+                raise Exception("Unknown media type")
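`save_item_to_database` is the single entry point the crawler tasks use: for an article it logs, skips duplicate titles, flags sensitive titles, and then delegates to `save_article_v2`, which builds the INSERT column list from the non-None fields of `CrawlerMetaArticle`. A hypothetical call with placeholder field values:

```python
# Hypothetical usage; pool and log_client are assumed to be set up elsewhere.
async def save_one_article(pool, log_client):
    pipeline = CrawlerPipeline(pool, log_client)
    await pipeline.save_item_to_database(
        media_type="article",
        item={
            "platform": "toutiao",
            "mode": "account",
            "category": "toutiao_account_association",
            "out_account_id": "1234567890",
            "title": "placeholder title",
            "link": "https://www.toutiao.com/article/7000000000000000000",
            "unique_index": "7000000000000000000",
        },
    )
```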

+ 1 - 0
applications/tasks/crawler_tasks/__init__.py

@@ -0,0 +1 @@
+from .crawler_toutiao import CrawlerToutiao

+ 229 - 0
applications/tasks/crawler_tasks/crawler_toutiao.py

@@ -0,0 +1,229 @@
+from __future__ import annotations
+
+import json
+import time
+import traceback
+from typing import List, Dict
+
+from tqdm import tqdm
+
+from applications.api import feishu_robot
+from applications.crawler.toutiao import get_toutiao_account_info_list
+from applications.pipeline import CrawlerPipeline
+
+
+class CrawlerToutiaoConst:
+    # platform
+    PLATFORM = "toutiao"
+
+    # account status
+    TOUTIAO_ACCOUNT_GOOD_STATUS = 1
+    TOUTIAO_ACCOUNT_BAD_STATUS = 0
+
+    # earliest cursor, 2021-01-01 00:00:00
+    DEFAULT_CURSOR = 1609430400
+
+    # no source account
+    NO_SOURCE_ACCOUNT_STATUS = 0
+
+    # title length min
+    MIN_TITLE_LENGTH = 10
+
+    # max video length(second)
+    MAX_VIDEO_LENGTH = 600
+
+    # sleep second
+    SLEEP_SECOND = 3
+
+
+class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
+    def __init__(self, pool, log_client):
+        super().__init__(pool, log_client)
+
+    async def get_account_list(self, media_type: str) -> List[dict]:
+        """get toutiao account list"""
+        match media_type:
+            case "video":
+                table = "video_meta_accounts"
+            case "article":
+                table = "article_meta_accounts"
+            case _:
+                return []
+
+        # fetch query
+        query = f"""
+            select account_id, max_cursor
+            from {table}
+            where platform = 'toutiao' and status = {self.TOUTIAO_ACCOUNT_GOOD_STATUS};
+        """
+        response = await self.pool.async_fetch(query)
+        if not response:
+            await feishu_robot.bot(
+                title=f"抓取头条账号内容任务: 任务模态:{media_type} 异常",
+                detail={"platform": self.PLATFORM, "error": "获取账号异常"},
+            )
+            return []
+        else:
+            return response
+
+    async def crawler_each_account_info_list(
+        self,
+        account_id: str,
+        media_type: str,
+        max_cursor: int | None,
+        max_behot_time: int = 0,
+    ):
+        """
+        account_id: toutiao account id
+        max_cursor: crawler latest cursor for each account
+        max_behot_time: max behot time from toutiao, use to switch to next page
+        """
+        has_more = True
+        current_cursor = max_behot_time
+        max_cursor = max_cursor or self.DEFAULT_CURSOR
+        cookie = await self.get_config_value(
+            key="toutiao_blogger_cookie", output_type="string"
+        )
+        while has_more:
+            response = await get_toutiao_account_info_list(
+                account_id=account_id,
+                cookie=cookie,
+                media_type=media_type,
+                max_behot_time=current_cursor,
+            )
+            if not response:
+                break
+
+            if response["message"] != "success":
+                break
+
+            info_list = response["data"]
+            has_more = response["has_more"]
+            current_cursor = response["next"]["max_behot_time"]
+
+            if not info_list:
+                break
+
+            max_timestamp_in_this_group = info_list[0]["publish_time"]
+            if max_timestamp_in_this_group < max_cursor:
+                break
+
+            # do crawler
+            match media_type:
+                case "video":
+                    bar_description = "crawler videos"
+                case "article":
+                    bar_description = "crawler articles"
+                case _:
+                    raise Exception(f"unknown media type: {media_type}")
+
+            crawler_info_list_bar = tqdm(info_list, desc=bar_description)
+            for info in crawler_info_list_bar:
+                try:
+                    crawler_info_list_bar.set_postfix({"id": info["id"]})
+                    match media_type:
+                        case "video":
+                            await self.crawler_each_video(info)
+                        case "article":
+                            await self.crawler_each_article(info)
+                        case _:
+                            raise Exception(f"unknown media type: {media_type}")
+
+                except Exception as e:
+                    raise Exception(f"crawler each info failed: {e}")
+
+            if has_more:
+                time.sleep(self.SLEEP_SECOND)
+            else:
+                break
+
+    async def crawler_each_article(self, article_raw_data):
+        """
+        crawler each article
+        """
+        new_article_item = {
+            "platform": self.PLATFORM,
+            "mode": "account",
+            "category": "toutiao_account_association",
+            "out_account_id": article_raw_data["user_info"]["user_id"],
+            "title": article_raw_data["title"],
+            "link": f"https://www.toutiao.com/article/{article_raw_data['group_id']}",
+            "read_cnt": article_raw_data["read_count"],
+            "like_cnt": article_raw_data["like_count"],
+            "description": article_raw_data["abstract"],
+            "publish_time": article_raw_data["publish_time"],
+            "unique_index": article_raw_data["group_id"],
+        }
+        await self.save_item_to_database(media_type="article", item=new_article_item)
+
+    async def crawler_each_video(self, video_raw_data):
+        pass
+
+    async def update_account_max_cursor(self, media_type: str, account_id: str) -> None:
+        """
+        update account max cursor
+        """
+        match media_type:
+            case "video":
+                query = f"""
+                    select max(publish_timestamp) as max_cursor 
+                    from publish_single_video_source 
+                    where out_account_id = %s and platform = %s;
+                """
+                table = "video_meta_accounts"
+            case "article":
+                query = f"""
+                    select max(publish_time) as max_cursor
+                    from crawler_meta_article 
+                    where out_account_id = %s and platform = %s;
+                """
+                table = "article_meta_accounts"
+            case _:
+                raise Exception(f"unknown media type: {media_type}")
+
+        response = await self.pool.async_fetch(
+            query, params=(account_id, self.PLATFORM)
+        )
+        max_publish_timestamp = response[0]["max_cursor"]
+
+        if max_publish_timestamp:
+            query = f"""
+                update {table}
+                set max_cursor = %s
+                where account_id = %s and platform = %s;
+            """
+            await self.pool.async_save(
+                query, (max_publish_timestamp, account_id, self.PLATFORM)
+            )
+
+    async def crawler_task(self, media_type: str) -> None:
+        """
+        class entrance
+        """
+        account_list = await self.get_account_list(media_type=media_type)
+        account_list_bar = tqdm(account_list, desc="crawler toutiao accounts")
+        for account in account_list_bar:
+            account_id = account["account_id"]
+            max_cursor = account["max_cursor"]
+            try:
+                account_list_bar.set_postfix({"account_id": account_id})
+                await self.crawler_each_account_info_list(
+                    account_id=account_id, media_type=media_type, max_cursor=max_cursor
+                )
+                await self.update_account_max_cursor(
+                    media_type=media_type, account_id=account_id
+                )
+
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "task": "crawler_toutiao_account_info",
+                        "function": "crawler_task",
+                        "message": account_id,
+                        "data": {
+                            "media_type": media_type,
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        },
+                    }
+                )
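End to end, `crawler_task` walks the accounts returned by `get_account_list`, pages each account's feed backwards until it reaches the stored `max_cursor` (or the 2021-01-01 default), saves each article through the pipeline, and then advances the cursor. A hedged sketch of running it directly; in the service it is started by `TaskScheduler` under the `crawler_toutiao_articles` task name:

```python
# Hypothetical standalone runner; pool and log_client are assumed.
async def run_toutiao_crawler(pool, log_client):
    crawler = CrawlerToutiao(pool, log_client)
    await crawler.crawler_task(media_type="article")  # "video" is still a stub
```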

+ 8 - 18
applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -71,7 +71,7 @@ class RecycleDailyPublishArticlesTask(Const):
             where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
             group by t3.id;
         """
-        account_list, error = await self.pool.async_fetch(query, db_name="aigc")
+        account_list = await self.pool.async_fetch(query, db_name="aigc")
         return [i for i in account_list if "自动回复" not in str(i["account_remark"])]
 
     async def get_account_status(self):
@@ -81,7 +81,7 @@ class RecycleDailyPublishArticlesTask(Const):
             from wx_statistics_group_source_account t1
             join wx_statistics_group_source t2 on t1.group_source_name = t2.account_source_name;
         """
-        account_status_list, error = await self.pool.async_fetch(sql, db_name="aigc")
+        account_status_list = await self.pool.async_fetch(sql, db_name="aigc")
         account_status_dict = {
             account["account_id"]: account["status"] for account in account_status_list
         }
@@ -92,7 +92,7 @@ class RecycleDailyPublishArticlesTask(Const):
         query = f"""
             select max(publish_timestamp) as publish_timestamp from official_articles_v2 where ghId = %s;
         """
-        response, error = await self.pool.async_fetch(
+        response = await self.pool.async_fetch(
             query, params=(account["gh_id"],), db_name="piaoquan_crawler"
         )
         if response:
@@ -184,26 +184,16 @@ class CheckDailyPublishArticlesTask(RecycleDailyPublishArticlesTask):
             select accountName, count(1) as publish_count 
             from official_articles_v2 where ghId = %s and from_unixtime(publish_timestamp) > %s;
         """
-        response, error = await self.pool.async_fetch(
+        response = await self.pool.async_fetch(
             query=query,
             db_name="piaoquan_crawler",
             params=(account["gh_id"], date_string),
         )
-        if error:
-            await feishu_robot.bot(
-                title="sql错误",
-                detail={
-                    "task": "CheckDailyPublishArticlesTask",
-                    "function": "check_account",
-                    "account": account,
-                    "date_string": date_string,
-                },
-                mention=False,
-            )
-            return False
-        else:
+        if response:
             today_publish_count = response[0]["publish_count"]
             return today_publish_count > 0
+        else:
+            return False
 
     async def deal(self):
         task_list = await self.get_task_list()
@@ -279,7 +269,7 @@ class UpdateRootSourceIdAndUpdateTimeTask(Const):
 
     async def get_article_list(self):
         query = f"""select ContentUrl, wx_sn from official_articles_v2 where publish_timestamp in %s;"""
-        article_list, error = await self.pool.async_fetch(
+        article_list = await self.pool.async_fetch(
             query=query, db_name="piaoquan_crawler", params=(tuple([0, -1]),)
         )
         return article_list

+ 1 - 1
applications/tasks/monitor_tasks/__init__.py

@@ -4,4 +4,4 @@ from .get_off_videos import CheckVideoAuditStatus
 from .gzh_article_monitor import OutsideGzhArticlesMonitor
 from .gzh_article_monitor import OutsideGzhArticlesCollector
 from .gzh_article_monitor import InnerGzhArticlesMonitor
-from .task_processing_monitor import TaskProcessingMonitor
+from .task_processing_monitor import TaskProcessingMonitor

+ 5 - 6
applications/tasks/monitor_tasks/get_off_videos.py

@@ -43,7 +43,7 @@ class GetOffVideos(GetOffVideosConst):
         query = f"""
             select video_id from {self.table} where video_status = %s and publish_time between %s and %s;
         """
-        video_list, error = await self.db_client.async_fetch(
+        video_list = await self.db_client.async_fetch(
             query,
             params=(
                 self.VIDEO_AVAILABLE_STATUS,
@@ -150,14 +150,13 @@ class CheckVideoAuditStatus(GetOffVideosConst):
         query = f"""
             select video_id from {self.table} where check_status = %s and video_status = %s limit 1000;
         """
-        video_id_list, error = await self.db_client.async_fetch(
+        video_id_list = await self.db_client.async_fetch(
             query, params=(self.CHECK_INIT_STATUS, self.VIDEO_AVAILABLE_STATUS)
         )
-        if error:
-            print("error", error)
-            return None
-        else:
+        if video_id_list:
             return [i["video_id"] for i in video_id_list]
+        else:
+            return None
 
     async def update_check_status(self, video_list: List[int]):
         query = f"""update {self.table} set check_status = %s where video_id in %s;"""

+ 7 - 9
applications/tasks/monitor_tasks/gzh_article_monitor.py

@@ -58,7 +58,7 @@ class OutsideGzhArticlesManager(MonitorConst):
             order by publish_timestamp desc
             limit %s;
         """
-        response, error = await self.pool.async_fetch(query=query, params=(gh_id, 1))
+        response = await self.pool.async_fetch(query=query, params=(gh_id, 1))
         if response:
             publish_timestamp = response[0]["publish_timestamp"]
             if publish_timestamp is None:
@@ -84,7 +84,7 @@ class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
             where
                 t1.mode_type = '代运营服务号';
         """
-        response, error = await self.pool.async_fetch(query=query, db_name="aigc")
+        response = await self.pool.async_fetch(query=query, db_name="aigc")
         return response
 
     async def fetch_each_account(self, account: dict):
@@ -187,7 +187,7 @@ class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
             from outside_gzh_account_monitor
             where illegal_status = {self.INIT_STATUS} and publish_timestamp > {publish_timestamp_threshold};
         """
-        response, error = await self.pool.async_fetch(query=fetch_query)
+        response = await self.pool.async_fetch(query=fetch_query)
         return response
 
     async def check_each_article(self, article: dict):
@@ -246,7 +246,7 @@ class InnerGzhArticlesMonitor(MonitorConst):
         query = f"""
             select title_md5 from article_unsafe_title where title_md5 = '{title_md5}';
         """
-        response, error = await self.pool.async_fetch(query=query)
+        response = await self.pool.async_fetch(query=query)
         return True if response else False
 
     async def fetch_article_list_to_check(self, run_date: str = None) -> Optional[List]:
@@ -266,13 +266,11 @@ class InnerGzhArticlesMonitor(MonitorConst):
             where publish_timestamp >= {start_timestamp}
             order by publish_timestamp desc;
         """
-        response, error = await self.pool.async_fetch(
-            query=query, db_name="piaoquan_crawler"
-        )
-        if error:
+        response = await self.pool.async_fetch(query=query, db_name="piaoquan_crawler")
+        if not response:
             await feishu_robot.bot(
                 title="站内微信公众号发文监测任务异常",
-                detail={"error": error, "message": "查询数据库异常"},
+                detail={"message": "查询数据库异常"},
             )
             return None
         else:

+ 2 - 2
applications/tasks/monitor_tasks/kimi_balance.py

@@ -1,7 +1,7 @@
 import traceback
 from typing import Dict
 from applications.api import feishu_robot
-from applications.utils import AsyncHttPClient
+from applications.utils import AsyncHttpClient
 
 # const
 BALANCE_LIMIT_THRESHOLD = 100.0
@@ -15,7 +15,7 @@ async def check_kimi_balance() -> Dict:
     }
 
     try:
-        async with AsyncHttPClient() as client:
+        async with AsyncHttpClient() as client:
             response = await client.get(url, headers=headers)
 
         balance = response["data"]["available_balance"]

+ 7 - 7
applications/tasks/monitor_tasks/task_processing_monitor.py

@@ -13,16 +13,18 @@ class TaskProcessingMonitor(TaskMapper):
         query = f"""
             select task_name, start_timestamp from long_articles_task_manager where task_status = %s;
         """
-        processing_task_list, error = await self.pool.async_fetch(query=query, params=(self.TASK_PROCESSING_STATUS,))
+        processing_task_list = await self.pool.async_fetch(
+            query=query, params=(self.TASK_PROCESSING_STATUS,)
+        )
         return processing_task_list
 
     async def deal(self):
         tasks = await self.get_processing_tasks()
         bad_tasks = []
         for task in tasks:
-            task_name = task['task_name']
-            start_timestamp = task['start_timestamp']
-            task_timeout = self.get_task_config(task_name)['expire_duration']
+            task_name = task["task_name"]
+            start_timestamp = task["start_timestamp"]
+            task_timeout = self.get_task_config(task_name)["expire_duration"]
             if int(time.time()) - start_timestamp >= task_timeout:
                 bad_tasks.append(task_name)
 
@@ -30,7 +32,5 @@ class TaskProcessingMonitor(TaskMapper):
             await feishu_robot.bot(
                 title="任务执行异常,超过正常时间,请注意!",
                 detail=bad_tasks,
-                mention=True
+                mention=True,
             )
-
-

+ 4 - 1
applications/tasks/task_mapper.py

@@ -3,7 +3,7 @@ class Const:
     TASK_INIT_STATUS = 0
     TASK_PROCESSING_STATUS = 1
     TASK_SUCCESS_STATUS = 2
-    TASK_FAILED_STATUS = 3
+    TASK_FAILED_STATUS = 99
 
     # DEFAULT
     DEFAULT_TIMEOUT = 1800
@@ -17,6 +17,7 @@ class Const:
     TITLE_REWRITE_TIMEOUT = 1800
     RECYCLE_DAILY_ARTICLE_TIMEOUT = 3600
     UPDATE_ROOT_SOURCE_ID_TIMEOUT = 3600
+    CRAWLER_TOUTIAO_ARTICLES_TIMEOUT = 5 * 3600
 
 
 class TaskMapper(Const):
@@ -39,6 +40,8 @@ class TaskMapper(Const):
                 expire_duration = self.RECYCLE_DAILY_ARTICLE_TIMEOUT
             case "update_root_source_id":
                 expire_duration = self.UPDATE_ROOT_SOURCE_ID_TIMEOUT
+            case "crawler_toutiao_articles":
+                expire_duration = self.CRAWLER_TOUTIAO_ARTICLES_TIMEOUT
             case _:
                 expire_duration = self.DEFAULT_TIMEOUT
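`TaskProcessingMonitor` reads `expire_duration` from `get_task_config`, so the new constant means a running Toutiao crawl is only reported as stuck after five hours. A small sketch of the lookup, assuming `TaskMapper` can be constructed directly and that the returned dict carries `expire_duration` as used in task_processing_monitor.py:

```python
# Assumption: get_task_config returns a dict with an "expire_duration" key.
mapper = TaskMapper()
timeout = mapper.get_task_config("crawler_toutiao_articles")["expire_duration"]
assert timeout == 5 * 3600  # 18000 seconds
```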
 

+ 48 - 17
applications/tasks/task_scheduler.py

@@ -5,6 +5,7 @@ from datetime import datetime
 from applications.api import feishu_robot
 from applications.utils import task_schedule_response
 
+from applications.tasks.crawler_tasks import CrawlerToutiao
 from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
@@ -31,9 +32,7 @@ class TaskScheduler(TaskMapper):
         query = f"""
             select start_timestamp from {self.table} where task_name = %s and task_status = %s;
         """
-        response, error = await self.db_client.async_fetch(
-            query=query, params=(task_name, 1)
-        )
+        response = await self.db_client.async_fetch(query=query, params=(task_name, 1))
         if not response:
             # no task is processing
             return False
@@ -60,21 +59,35 @@ class TaskScheduler(TaskMapper):
     async def lock_task(self, task_name, date_string):
         query = f"""update {self.table} set task_status = %s where task_name = %s and date_string = %s and task_status = %s;"""
         return await self.db_client.async_save(
-            query=query, params=(1, task_name, date_string, 0)
+            query=query,
+            params=(
+                self.TASK_PROCESSING_STATUS,
+                task_name,
+                date_string,
+                self.TASK_INIT_STATUS,
+            ),
         )
 
-    async def release_task(self, task_name, date_string, final_status):
+    async def release_task(self, task_name, date_string, final_status=None):
         """
         任务执行完成之后,将任务状态设置为完成状态/失败状态
         """
+        if not final_status:
+            final_status = self.TASK_SUCCESS_STATUS
         query = f"""
-               update {self.table} 
-               set task_status = %s, finish_timestamp = %s
-               where task_name = %s and date_string = %s and task_status = %s;
-           """
+           update {self.table} 
+           set task_status = %s, finish_timestamp = %s
+           where task_name = %s and date_string = %s and task_status = %s;
+        """
         return await self.db_client.async_save(
             query=query,
-            params=(final_status, int(time.time()), task_name, date_string, 1),
+            params=(
+                final_status,
+                int(time.time()),
+                task_name,
+                date_string,
+                self.TASK_PROCESSING_STATUS,
+            ),
         )
 
     async def deal(self):
@@ -209,7 +222,7 @@ class TaskScheduler(TaskMapper):
                     sub_task = TitleRewrite(self.db_client, self.log_client)
                     await sub_task.deal()
                     await self.release_task(
-                        task_name=task_name, date_string=date_string, final_status=2
+                        task_name=task_name, date_string=date_string
                     )
 
                 asyncio.create_task(background_title_rewrite())
@@ -228,14 +241,12 @@ class TaskScheduler(TaskMapper):
                         self.db_client, self.log_client, date_string
                     )
                     await sub_task.deal()
-
                     task = CheckDailyPublishArticlesTask(
                         self.db_client, self.log_client, date_string
                     )
                     await task.deal()
-
                     await self.release_task(
-                        task_name=task_name, date_string=date_string, final_status=2
+                        task_name=task_name, date_string=date_string
                     )
 
                 asyncio.create_task(background_daily_publish_articles_recycle())
@@ -252,7 +263,7 @@ class TaskScheduler(TaskMapper):
                     )
                     await sub_task.deal()
                     await self.release_task(
-                        task_name=task_name, date_string=date_string, final_status=2
+                        task_name=task_name, date_string=date_string
                     )
 
                 asyncio.create_task(background_update_root_source_id())
@@ -262,10 +273,13 @@ class TaskScheduler(TaskMapper):
                 )
 
             case "task_processing_monitor":
+
                 async def background_task_processing_monitor():
                     sub_task = TaskProcessingMonitor(self.db_client)
                     await sub_task.deal()
-                    await self.release_task(task_name=task_name, date_string=date_string, final_status=2)
+                    await self.release_task(
+                        task_name=task_name, date_string=date_string
+                    )
 
                 asyncio.create_task(background_task_processing_monitor())
                 return await task_schedule_response.success_response(
@@ -273,6 +287,23 @@ class TaskScheduler(TaskMapper):
                     data={"code": 0, "message": "task started background"},
                 )
 
+            case "crawler_toutiao_articles":
+
+                async def background_crawler_toutiao_articles():
+                    sub_task = CrawlerToutiao(self.db_client, self.log_client)
+                    await sub_task.crawler_task(
+                        media_type=self.data.get("media_type", "article")
+                    )
+                    await self.release_task(
+                        task_name=task_name, date_string=date_string
+                    )
+
+                asyncio.create_task(background_crawler_toutiao_articles())
+                return await task_schedule_response.success_response(
+                    task_name=task_name,
+                    data={"code": 0, "message": "task started background"},
+                )
+
             case _:
                 await self.log_client.log(
                     contents={
@@ -283,7 +314,7 @@ class TaskScheduler(TaskMapper):
                         "data": self.data,
                     }
                 )
-                await self.release_task(task_name, date_string, 99)
+                await self.release_task(task_name, date_string, self.TASK_FAILED_STATUS)
                 return await task_schedule_response.fail_response(
                     error_code="4001", error_message="wrong task name input"
                 )
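With the new branch, the scheduler can launch the Toutiao crawl in the background. The HTTP route and payload schema are not part of this diff, but judging from `self.data.get("media_type", "article")`, a trigger payload might look roughly like the following (hypothetical key names):

```python
# Hypothetical request body for the task-scheduling endpoint.
payload = {
    "task_name": "crawler_toutiao_articles",
    "media_type": "article",  # optional; defaults to "article"
}
```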

+ 4 - 1
applications/utils/__init__.py

@@ -2,7 +2,7 @@
 from .async_apollo_client import AsyncApolloClient
 
 # import async http client
-from .async_http_client import AsyncHttPClient
+from .async_http_client import AsyncHttpClient
 
 from .get_cover import fetch_channel_info
 from .get_cover import fetch_aigc_cover
@@ -14,4 +14,7 @@ from .response import TaskScheduleResponse
 # common
 from .common import *
 
+# import item
+from .item import CrawlerMetaArticle
+
 task_schedule_response = TaskScheduleResponse()

+ 1 - 1
applications/utils/async_http_client.py

@@ -2,7 +2,7 @@ import aiohttp
 from typing import Optional, Union, Dict, Any
 
 
-class AsyncHttPClient:
+class AsyncHttpClient:
     def __init__(
         self,
         timeout: int = 10,

+ 8 - 0
applications/utils/common.py

@@ -50,6 +50,14 @@ def proxy():
     return proxies
 
 
+def async_proxy():
+    return {
+        "url": "http://j685.kdltps.com:15818",
+        "username": "t14070979713487",
+        "password": "hqwanfvy",
+    }
+
+
 def request_retry(retry_times, min_retry_delay, max_retry_delay):
     """
     :param retry_times:

+ 43 - 0
applications/utils/item.py

@@ -0,0 +1,43 @@
+"""
+@author: luojunhui
+"""
+
+import time
+
+from pydantic import BaseModel, Field
+from typing import Optional
+
+
+class CrawlerMetaArticle(BaseModel):
+    platform: str = Field(default=..., description="抓取平台")
+    mode: str = Field(default=..., description="抓取模式")
+    category: str = Field(
+        default=..., description="抓取类型:最初设计不合理,积重难返,实际与品类无关"
+    )
+    out_account_id: str = Field(default=..., description="抓取账号账号id")
+    article_index: str = Field(
+        default=None, description="群发发文位置,常见于微信公众号"
+    )
+    title: str = Field(default=..., description="文章标题")
+    link: str = Field(default=..., description="文章链接")
+    read_cnt: int = Field(default=0, description="阅读量")
+    like_cnt: int = Field(default=0, description="点赞量")
+    description: Optional[str] = Field(
+        default=None, max_length=255, description="文章简介"
+    )
+    publish_time: int = Field(default=None, description="文章发布时间")
+    crawler_time: int = Field(default=int(time.time()), description="抓取时间")
+    score: float = Field(default=None, description="相似度分")
+    status: int = Field(default=1, description="文章状态")
+    unique_index: str = Field(default=..., description="文章唯一index")
+    source_article_title: str = Field(default=None, description="文章联想的种子文章")
+    source_account: str = Field(default=None, description="账号联想种子账号")
+    title_sensitivity: int = Field(default=0, description="标题是否敏感")
+    category_status: int = Field(
+        default=0,
+        description="品类处理状态 0: init; 1: processing; 2: successfully; 99: fail",
+    )
+    has_video: int = Field(
+        default=0,
+        description="文章内嵌套视频状态 0: init; 1: processing; 2: successfully; 3:article link bad ;99: fail",
+    )
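`CrawlerMetaArticle` mirrors the `crawler_meta_article` columns and is what `save_article_v2` serializes via `model_dump()`. One caveat worth noting: `default=int(time.time())` for `crawler_time` is evaluated once at import, so instances share that timestamp unless the caller passes it explicitly. A minimal instantiation sketch with placeholder values:

```python
import time

# Placeholder values; only the required fields are shown, the rest use defaults.
article = CrawlerMetaArticle(
    platform="toutiao",
    mode="account",
    category="toutiao_account_association",
    out_account_id="1234567890",
    title="placeholder title",
    link="https://www.toutiao.com/article/7000000000000000000",
    unique_index="7000000000000000000",
    crawler_time=int(time.time()),  # pass explicitly; the class default is frozen at import time
)
insert_data = {k: v for k, v in article.model_dump().items() if v is not None}
```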

+ 3 - 2
requirements.txt

@@ -15,6 +15,7 @@ tqdm~=4.66.6
 pyapollos~=0.1.5
 pyotp~=2.9.0
 elasticsearch~=8.17.2
-openai~=1.97.0
+openai~=1.47.1
 tenacity~=9.0.0
-fake-useragent~=2.1.0
+fake-useragent~=2.1.0
+pydantic~=2.10.6

Some files were not shown because too many files changed in this diff