Merge branch 'feature/luojunhui/2025-08-07-crawl-weixin-account-articles-improve' of Server/LongArticleTaskServer into master

luojunhui 4 weeks ago
parent
commit
34151d8982

+ 55 - 3
README.md

@@ -117,16 +117,20 @@ docker compose up -d
 tree -I "__pycache__|*.pyc"
 ```
 
-## Data tasks
+## 1. Data tasks
 #### Daily published-article data recycling
 ```aiignore
 curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "daily_publish_articles_recycle"}'
 ```
+#### Daily published-article root_source_id update
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "update_root_source_id"}'
+```
 #### Account quality processing
 ```aiignore
 curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "candidate_account_quality_analysis"}'
 ```
-## Crawl tasks
+## 2. Crawl tasks
 
 #### Toutiao in-account article crawling
 ```aiignore
@@ -140,8 +144,20 @@ curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: applicati
 ```aiignore
 curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "crawler_toutiao", "method": "search"}'
 ```
+#### Crawler account management (WeChat)
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "crawler_account_manager", "platform": "weixin"}'
+```
+#### Crawl WeChat articles (account mode)
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "crawler_gzh_articles", "account_method": "account_association", "crawl_mode": "account"}'
+```
+#### Crawl WeChat articles (search mode)
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "crawler_gzh_articles", "account_method": "search", "crawl_mode": "search"}'
+```
 
-## Cold-start publishing tasks
+## 3. Cold-start publishing tasks
 #### Publish Toutiao articles
 ```aiignore
 curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "article_pool_cold_start", "platform": "toutiao", "crawler_methods": ["toutiao_account_association"]}'
@@ -151,4 +167,40 @@ curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: applicati
 curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "article_pool_cold_start"}'
 ```
 
+## 4. Miscellaneous
+#### Check Kimi balance
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "check_kimi_balance"}'
+```
+#### Automatically take down videos
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "get_off_videos"}'
+```
+#### Check video visibility status
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "check_publish_video_audit_status"}'
+```
+#### External service-account monitoring
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "outside_article_monitor"}'
+```
+#### Internal service-account publishing monitoring
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "inner_article_monitor"}'
+```
+#### Title rewriting
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "title_rewrite"}'
+```
+#### Add categories to titles (article pool)
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "article_pool_category_generation", "limit": "1000"}'
+```
+#### Candidate account quality analysis
+```aiignore
+curl -X POST http://192.168.142.66:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "candidate_account_quality_analysis"}'
+```
+
+
+
 

+ 1 - 1
app_config.toml

@@ -1,6 +1,6 @@
 reload = true
 bind = "0.0.0.0:6060"
-workers = 6
+workers = 8
 keep_alive_timeout = 120  # maximum seconds to keep a connection alive; adjust as needed
 graceful_timeout = 30    # how long to wait for in-flight work to finish before a restart or stop
 loglevel = "warning"  # log level

+ 58 - 47
applications/crawler/wechat/gzh_spider.py

@@ -8,6 +8,7 @@ from tenacity import retry
 
 from applications.api import log
 from applications.utils import request_retry
+from applications.utils import AsyncHttpClient
 
 retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
 
@@ -17,7 +18,7 @@ headers = {"Content-Type": "application/json"}
 
 
 @retry(**retry_desc)
-def get_article_detail(
+async def get_article_detail(
     article_link: str, is_count: bool = False, is_cache: bool = True
 ) -> dict | None:
     """
@@ -32,54 +33,61 @@ def get_article_detail(
             "is_cache": is_cache,
         }
     )
-    try:
-        response = requests.post(
-            url=target_url, headers=headers, data=payload, timeout=120
-        )
-        response.raise_for_status()
-        return response.json()
-    except requests.exceptions.RequestException as e:
-        log(
-            task="get_official_article_detail",
-            function="get_official_article_detail",
-            message=f"API请求失败: {e}",
-            data={"link": article_link},
-        )
-    except json.JSONDecodeError as e:
-        log(
-            task="get_official_article_detail",
-            function="get_official_article_detail",
-            message=f"响应解析失败: {e}",
-            data={"link": article_link},
-        )
-    return None
+    async with AsyncHttpClient(timeout=10) as http_client:
+        response = await http_client.post(target_url, headers=headers, data=payload)
+
+    return response
+    # try:
+    #     response = requests.post(
+    #         url=target_url, headers=headers, data=payload, timeout=120
+    #     )
+    #     response.raise_for_status()
+    #     return response.json()
+    # except requests.exceptions.RequestException as e:
+    #     log(
+    #         task="get_official_article_detail",
+    #         function="get_official_article_detail",
+    #         message=f"API请求失败: {e}",
+    #         data={"link": article_link},
+    #     )
+    # except json.JSONDecodeError as e:
+    #     log(
+    #         task="get_official_article_detail",
+    #         function="get_official_article_detail",
+    #         message=f"响应解析失败: {e}",
+    #         data={"link": article_link},
+    #     )
+    # return None
 
 
 @retry(**retry_desc)
-def get_article_list_from_account(account_id: str, index=None) -> dict | None:
+async def get_article_list_from_account(account_id: str, index=None) -> dict | None:
     target_url = f"{base_url}/blogger"
     payload = json.dumps({"account_id": account_id, "cursor": index})
-    try:
-        response = requests.post(
-            url=target_url, headers=headers, data=payload, timeout=120
-        )
-        response.raise_for_status()
-        return response.json()
-    except requests.exceptions.RequestException as e:
-        log(
-            task="get_official_account_article_list",
-            function="get_official_account_article_list",
-            message=f"API请求失败: {e}",
-            data={"gh_id": account_id},
-        )
-    except json.JSONDecodeError as e:
-        log(
-            task="get_official_account_article_list",
-            function="get_official_account_article_list",
-            message=f"响应解析失败: {e}",
-            data={"gh_id": account_id},
-        )
-    return None
+    async with AsyncHttpClient(timeout=120) as http_client:
+        response = await http_client.post(target_url, headers=headers, data=payload)
+    return response
+    # try:
+    #     response = requests.post(
+    #         url=target_url, headers=headers, data=payload, timeout=120
+    #     )
+    #     response.raise_for_status()
+    #     return response.json()
+    # except requests.exceptions.RequestException as e:
+    #     log(
+    #         task="get_official_account_article_list",
+    #         function="get_official_account_article_list",
+    #         message=f"API请求失败: {e}",
+    #         data={"gh_id": account_id},
+    #     )
+    # except json.JSONDecodeError as e:
+    #     log(
+    #         task="get_official_account_article_list",
+    #         function="get_official_account_article_list",
+    #         message=f"响应解析失败: {e}",
+    #         data={"gh_id": account_id},
+    #     )
+    # return None
 
 
 @retry(**retry_desc)
@@ -124,8 +132,11 @@ def get_source_account_from_article(article_link) -> dict | None:
 
 
 @retry(**retry_desc)
-def search(keyword: str, page="1") -> dict | None:
+async def weixin_search(keyword: str, page="1") -> dict | None:
     url = "{}/keyword".format(base_url)
     payload = json.dumps({"keyword": keyword, "cursor": page})
-    response = requests.request("POST", url, headers=headers, data=payload, timeout=120)
-    return response.json()
+    # response = requests.request("POST", url, headers=headers, data=payload, timeout=120)
+    async with AsyncHttpClient(timeout=120) as http_client:
+        response = await http_client.post(url=url, headers=headers, data=payload)
+
+    return response
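
With this change, `get_article_detail`, `get_article_list_from_account`, and `weixin_search` are coroutines, so every call site has to await them from inside an event loop (the callers in the diffs below were updated accordingly). A minimal usage sketch, assuming `AsyncHttpClient` returns the parsed JSON body as a dict (which is how the callers read `response["code"]` and `response["data"]["data"]`); the `gh_id` value is a placeholder:

```python
import asyncio

from applications.crawler.wechat import get_article_list_from_account


async def dump_recent_articles(gh_id: str) -> None:
    # The spider helper is now a coroutine and must be awaited.
    response = await get_article_list_from_account(account_id=gh_id)
    # Callers read the parsed JSON directly, so AsyncHttpClient is assumed
    # to return a dict with the usual data/data envelope.
    msg_list = response.get("data", {}).get("data") or []
    for msg in msg_list:
        for article in msg["AppMsg"]["DetailInfo"]:
            print(article["Title"], article["ContentUrl"])


if __name__ == "__main__":
    asyncio.run(dump_recent_articles("gh_xxxxxxxx"))  # placeholder gh_id
```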

+ 1 - 2
applications/pipeline/crawler_pipeline.py

@@ -60,6 +60,7 @@ class CrawlerPipeline(AsyncApolloApi):
 
     async def save_item_to_database(self, media_type: str, item: dict, trace_id: str):
         """deal function"""
+        item['trace_id'] = trace_id
         match media_type:
             case "video":
                 await self.save_single_record(media_type, item)
@@ -109,7 +110,6 @@ class CrawlerPipeline(AsyncApolloApi):
                         "message": "save article successfully",
                     }
                 )
-
             case "account":
                 if await self.whether_account_exist(
                     item["account_id"], item["media_type"]
@@ -125,6 +125,5 @@ class CrawlerPipeline(AsyncApolloApi):
                         "message": "save account successfully",
                     }
                 )
-
             case _:
                 raise Exception("Unknown media type")

+ 8 - 1
applications/tasks/crawler_tasks/__init__.py

@@ -1,4 +1,11 @@
 from .crawler_toutiao import CrawlerToutiao
 from .crawler_account_manager import WeixinAccountManager
+from .crawler_gzh import CrawlerGzhAccountArticles
+from .crawler_gzh import CrawlerGzhSearchArticles
 
-__all__ = ["CrawlerToutiao", "WeixinAccountManager"]
+__all__ = [
+    "CrawlerToutiao",
+    "WeixinAccountManager",
+    "CrawlerGzhAccountArticles",
+    "CrawlerGzhSearchArticles"
+]

+ 0 - 1
applications/tasks/crawler_tasks/crawler_account_manager.py

@@ -146,4 +146,3 @@ class WeixinAccountManager(CrawlerAccountManager):
 
         for account_id in tqdm(account_id_list):
             await self.analysis_single_account(account_id)
-

+ 349 - 0
applications/tasks/crawler_tasks/crawler_gzh.py

@@ -0,0 +1,349 @@
+from __future__ import annotations
+
+import asyncio
+import time
+import traceback
+from datetime import datetime, date, timedelta
+from typing import List, Dict
+
+from applications.api import feishu_robot
+from applications.crawler.wechat import weixin_search
+from applications.crawler.wechat import get_article_detail
+from applications.crawler.wechat import get_article_list_from_account
+from applications.pipeline import CrawlerPipeline
+from applications.utils import timestamp_to_str, show_desc_to_sta
+from applications.utils import get_hot_titles, generate_gzh_id
+
+
+class CrawlerGzhConst:
+    PLATFORM = "weixin"
+    DEFAULT_VIEW_COUNT = 0
+    DEFAULT_LIKE_COUNT = 0
+    DEFAULT_ARTICLE_STATUS = 1
+    MAX_DEPTH = 3
+    #
+    SLEEP_SECONDS = 1
+
+    STAT_DURATION = 30  # days
+    DEFAULT_TIMESTAMP = 1735660800
+
+
+class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
+    def __init__(self, pool, log_client, trace_id):
+        super().__init__(pool, log_client)
+        self.trace_id = trace_id
+
+    async def get_crawler_accounts(self, method: str, strategy: str) -> List[Dict]:
+        """get crawler accounts"""
+        match strategy:
+            case "V1":
+                query = """
+                    select gh_id, account_name, latest_update_time
+                    from long_articles_accounts
+                    where account_category = %s and is_using = %s and daily_scrape = %s;
+                """
+                return await self.pool.async_fetch(query=query, params=(method, 1, 1))
+            case "V2":
+                query = """
+                    select gh_id, account_name, latest_update_time
+                    from long_articles_accounts
+                    where account_category = %s and is_using = %s order by recent_score_ci_lower desc limit %s; 
+                """
+                return await self.pool.async_fetch(query=query, params=(method, 1, 500))
+            case _:
+                raise Exception("strategy not supported")
+
+    async def get_account_latest_update_timestamp(self, account_id: str) -> int:
+        """get latest update time"""
+        query = """ select max(publish_time) as publish_time from crawler_meta_article where out_account_id = %s;"""
+        latest_timestamp_obj = await self.pool.async_fetch(
+            query=query, params=(account_id,)
+        )
+        return latest_timestamp_obj[0]["publish_time"] if latest_timestamp_obj else None
+
+    async def crawl_each_article(
+        self, article_raw_data, mode, account_method, account_id, source_title=None
+    ):
+        """crawl each article"""
+        base_item = {
+            "platform": self.PLATFORM,
+            "mode": mode,
+            "crawler_time": int(time.time()),
+            "category": account_method,
+        }
+        match mode:
+            case "account":
+                show_stat = show_desc_to_sta(article_raw_data["ShowDesc"])
+                show_view_count = show_stat.get(
+                    "show_view_count", self.DEFAULT_VIEW_COUNT
+                )
+                show_like_count = show_stat.get(
+                    "show_like_count", self.DEFAULT_LIKE_COUNT
+                )
+                unique_idx = generate_gzh_id(article_raw_data["ContentUrl"])
+
+                new_item = {
+                    **base_item,
+                    "read_cnt": show_view_count,
+                    "like_cnt": show_like_count,
+                    "title": article_raw_data["Title"],
+                    "out_account_id": account_id,
+                    "article_index": article_raw_data["ItemIndex"],
+                    "link": article_raw_data["ContentUrl"],
+                    "description": article_raw_data["Digest"],
+                    "unique_index": unique_idx,
+                    "publish_time": article_raw_data["send_time"],
+                }
+            case "search":
+                new_item = {
+                    **base_item,
+                    "out_account_id": account_id,
+                    "article_index": article_raw_data["item_index"],
+                    "title": article_raw_data["title"],
+                    "link": article_raw_data["content_link"],
+                    "like_cnt": article_raw_data.get(
+                        "like_count", self.DEFAULT_LIKE_COUNT
+                    ),
+                    "read_cnt": article_raw_data.get(
+                        "view_count", self.DEFAULT_VIEW_COUNT
+                    ),
+                    "publish_time": int(article_raw_data["publish_timestamp"] / 1000),
+                    "unique_index": generate_gzh_id(article_raw_data["content_link"]),
+                    "source_article_title": source_title,
+                }
+            case _:
+                raise Exception(f"unknown mode: {mode}")
+
+        await self.save_item_to_database(
+            media_type="article", item=new_item, trace_id=self.trace_id
+        )
+        await asyncio.sleep(self.SLEEP_SECONDS)
+
+    async def update_account_read_avg_info(self, gh_id, account_name):
+        """update account read avg info"""
+        position_list = [i for i in range(1, 9)]
+        today_dt = date.today().isoformat()
+        for position in position_list:
+            query = f"""
+                select read_cnt, from_unixtime(publish_time, "%Y-%m-%d") as publish_dt from crawler_meta_article
+                where out_account_id = '{gh_id}' and article_index = {position}
+                order by publish_time desc limit {self.STAT_DURATION};
+            """
+            fetch_response = await self.pool.async_fetch(query=query)
+            if fetch_response:
+                read_cnt_list = [i["read_cnt"] for i in fetch_response]
+                n = len(read_cnt_list)
+                read_avg = sum(read_cnt_list) / n
+                max_publish_dt = fetch_response[0]["publish_dt"]
+                remark = f"从{max_publish_dt}开始计算,往前算{len(fetch_response)}天"
+                insert_query = f"""
+                    insert ignore into crawler_meta_article_accounts_read_avg
+                    (gh_id, account_name, position, read_avg, dt, status, remark)
+                    values
+                    (%s, %s, %s, %s, %s, %s, %s);
+                """
+                insert_rows = await self.pool.async_save(
+                    query=insert_query,
+                    params=(
+                        gh_id,
+                        account_name,
+                        position,
+                        read_avg,
+                        today_dt,
+                        1,
+                        remark,
+                    ),
+                )
+                if insert_rows:
+                    update_query = f"""
+                        update crawler_meta_article_accounts_read_avg
+                        set status = %s 
+                        where gh_id = %s and position = %s and dt < %s;
+                    """
+                    await self.pool.async_save(
+                        update_query, (0, gh_id, position, today_dt)
+                    )
+
+    async def get_hot_titles_with_strategy(self, strategy):
+        """get hot titles with strategy"""
+        match strategy:
+            case "V1":
+                position = 3
+                read_times_threshold = 1.21
+                timedelta_days = 3
+            case "V2":
+                position = 2
+                read_times_threshold = 1.1
+                timedelta_days = 5
+            case _:
+                raise Exception(f"unknown strategy: {strategy}")
+        date_string = (datetime.today() - timedelta(days=timedelta_days)).strftime(
+            "%Y%m%d"
+        )
+        return await get_hot_titles(
+            self.pool,
+            date_string=date_string,
+            position=position,
+            read_times_threshold=read_times_threshold,
+        )
+
+
+class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
+    def __init__(self, pool, log_client, trace_id):
+        super().__init__(pool, log_client, trace_id)
+
+    async def insert_article_into_meta(self, gh_id, account_method, msg_list):
+        """
+        write the crawled data into the database
+        :return:
+        """
+        for msg in msg_list:
+            article_list = msg["AppMsg"]["DetailInfo"]
+            for obj in article_list:
+                await self.crawl_each_article(
+                    article_raw_data=obj,
+                    mode="account",
+                    account_method=account_method,
+                    account_id=gh_id,
+                )
+
+    async def update_account_latest_timestamp(self, gh_id):
+        """update the latest timestamp after crawler"""
+        latest_timestamp = await self.get_account_latest_update_timestamp(gh_id)
+        dt_str = timestamp_to_str(latest_timestamp)
+        query = """update long_articles_accounts set latest_update_time = %s where gh_id = %s;"""
+        await self.pool.async_save(query=query, params=(dt_str, gh_id))
+
+    async def crawler_single_account(self, account_method: str, account: Dict) -> None:
+        """crawler single account"""
+        current_cursor = None
+        gh_id = account["gh_id"]
+        latest_timestamp = account["latest_update_time"].timestamp()
+        while True:
+            # fetch response from weixin
+            response = await get_article_list_from_account(
+                account_id=gh_id, index=current_cursor
+            )
+            msg_list = response.get("data", {}).get("data")
+            if not msg_list:
+                break
+
+            # process current page
+            await self.insert_article_into_meta(gh_id, account_method, msg_list)
+
+            # whether crawl next page
+            last_article_in_this_page = msg_list[-1]
+            last_time_stamp_in_this_msg = last_article_in_this_page["AppMsg"][
+                "BaseInfo"
+            ]["UpdateTime"]
+            if last_time_stamp_in_this_msg > latest_timestamp:
+                await self.update_account_latest_timestamp(gh_id)
+                break
+
+            # update cursor for next page
+            current_cursor = response.get("data", {}).get("next_cursor")
+            if not current_cursor:
+                break
+
+    async def deal(self, method: str, strategy: str = "V1"):
+        account_list = await self.get_crawler_accounts(method, strategy)
+        for account in account_list:
+            print(account)
+            try:
+                await self.crawler_single_account(method, account)
+                await self.update_account_read_avg_info(
+                    gh_id=account["gh_id"], account_name=account["account_name"]
+                )
+            except Exception as e:
+                await self.log_client.log(
+                    contents={
+                        "task": "crawler_gzh_articles",
+                        "trace_id": self.trace_id,
+                        "data": {
+                            "account_id": account["gh_id"],
+                            "account_method": method,
+                            "error": str(e),
+                            "traceback": traceback.format_exc(),
+                        },
+                    }
+                )
+
+
+class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
+    def __init__(self, pool, log_client, trace_id):
+        super().__init__(pool, log_client, trace_id)
+
+    async def crawl_search_articles_detail(
+        self, article_list: List[Dict], source_title: str
+    ):
+        for article in article_list:
+            url = article["url"]
+            detail_response = await get_article_detail(url, is_count=True, is_cache=False)
+            if not detail_response:
+                continue
+
+            article_data = detail_response.get("data")
+            if not article_data:
+                continue
+
+            if type(article_data) is not dict:
+                continue
+
+            article_detail = article_data.get("data")
+            if not article_detail:
+                continue
+
+            await self.crawl_each_article(
+                article_raw_data=article_detail,
+                mode="search",
+                account_method="search",
+                account_id="search",
+                source_title=source_title,
+            )
+            await asyncio.sleep(self.SLEEP_SECONDS)
+
+    async def search_each_title(self, title: str, page: str = "1") -> None:
+        """search in weixin"""
+        current_page = page
+        while True:
+            # stop paging after MAX_DEPTH pages
+            if int(current_page) > self.MAX_DEPTH:
+                break
+            # call the WeChat keyword-search API
+            search_response = await weixin_search(keyword=title, page=current_page)
+            if not search_response:
+                break
+
+            article_list = search_response.get("data", {}).get("data")
+            if not article_list:
+                break
+            # persist the search results
+            await self.crawl_search_articles_detail(article_list, title)
+            # check whether there is another page
+            has_more = search_response.get("data", {}).get("has_more")
+            if not has_more:
+                break
+            # advance to the next page
+            current_page = search_response.get("data", {}).get("next_cursor")
+
+    async def get_task_execute_result(self):
+        """get task execute result"""
+        query = """select count(*) as total_search_articles from crawler_meta_article where trace_id = %s;"""
+        return await self.pool.async_fetch(query=query, params=(self.trace_id,))
+
+    async def deal(self, strategy: str = "V1"):
+        hot_titles = await self.get_hot_titles_with_strategy(strategy)
+        for hot_title in hot_titles:
+            print("hot title:", hot_title)
+            try:
+                await self.search_each_title(hot_title)
+            except Exception as e:
+                print(f"crawler_gzh_articles error:{e}")
+
+        await feishu_robot.bot(
+            title="公众号搜索任务执行完成",
+            detail={
+                "strategy": strategy,
+                "execute_detail": await self.get_task_execute_result()
+            }
+        )

+ 2 - 2
applications/tasks/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -102,7 +102,7 @@ class RecycleDailyPublishArticlesTask(Const):
 
         cursor = None
         while True:
-            response = get_article_list_from_account(
+            response = await get_article_list_from_account(
                 account_id=account["gh_id"], index=cursor
             )
             response_code = response["code"]
@@ -281,7 +281,7 @@ class UpdateRootSourceIdAndUpdateTimeTask(Const):
         url = article["ContentUrl"]
         wx_sn = article["wx_sn"].decode("utf-8")
         try:
-            response = get_article_detail(url)
+            response = await get_article_detail(url)
             response_code = response["code"]
 
             if response_code == self.ARTICLE_DELETE_CODE:

+ 4 - 4
applications/tasks/monitor_tasks/gzh_article_monitor.py

@@ -93,7 +93,7 @@ class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
         if await self.whether_published_in_a_week(gh_id):
             return
 
-        fetch_response = get_article_list_from_account(gh_id)
+        fetch_response = await get_article_list_from_account(gh_id)
         try:
             msg_list = fetch_response.get("data", {}).get("data", [])
             if msg_list:
@@ -120,7 +120,7 @@ class OutsideGzhArticlesCollector(OutsideGzhArticlesManager):
         # insert each article
         for article in detail_info:
             link = article["ContentUrl"]
-            article_detail = get_article_detail(link)
+            article_detail = await get_article_detail(link)
             response_code = article_detail["code"]
             if response_code == self.ARTICLE_ILLEGAL_CODE:
                 illegal_reason = article_detail.get("msg")
@@ -195,7 +195,7 @@ class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
         check each article
         """
         link = article["link"]
-        article_detail = get_article_detail(link)
+        article_detail = await get_article_detail(link)
         response_code = article_detail["code"]
         if response_code == self.ARTICLE_ILLEGAL_CODE:
             illegal_reason = article_detail.get("msg")
@@ -279,7 +279,7 @@ class InnerGzhArticlesMonitor(MonitorConst):
     async def check_each_article(self, article: dict):
         gh_id, account_name, title, url, wx_sn, publish_date = article
         try:
-            response = get_article_detail(url, is_cache=False)
+            response = await get_article_detail(url, is_cache=False)
             response_code = response["code"]
             if response_code == self.ARTICLE_ILLEGAL_CODE:
                 error_detail = article.get("msg")

+ 17 - 0
applications/tasks/task_handler.py

@@ -3,6 +3,8 @@ from datetime import datetime
 from applications.tasks.cold_start_tasks import ArticlePoolColdStart
 from applications.tasks.crawler_tasks import CrawlerToutiao
 from applications.tasks.crawler_tasks import WeixinAccountManager
+from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
+from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
 from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
@@ -135,3 +137,18 @@ class TaskHandler(TaskMapper):
 
         await task.deal(platform=platform, account_id_list=account_id_list)
         return self.TASK_SUCCESS_STATUS
+
+    async def _crawler_gzh_article_handler(self) -> int:
+        account_method = self.data.get("account_method")
+        crawl_mode = self.data.get("crawl_mode")
+        strategy = self.data.get("strategy", "V1")
+        match crawl_mode:
+            case "account":
+                task = CrawlerGzhAccountArticles(self.db_client, self.log_client, self.trace_id)
+                await task.deal(account_method, strategy)
+            case "search":
+                task = CrawlerGzhSearchArticles(self.db_client, self.log_client, self.trace_id)
+                await task.deal(strategy)
+            case _:
+                raise ValueError(f"Unsupported crawl mode {crawl_mode}")
+        return self.TASK_SUCCESS_STATUS
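
For reference, a hedged sketch of the request bodies that reach `_crawler_gzh_article_handler` through `/api/run_task`, mirroring the README curl commands and the example in dev.py; `strategy` is optional and falls back to `"V1"`:

```python
# Illustrative JSON payloads POSTed to /api/run_task for this handler,
# taken from the README curl examples and dev.py; values are examples only.
account_mode_payload = {
    "task_name": "crawler_gzh_articles",
    "account_method": "account_association",  # matched against long_articles_accounts.account_category
    "crawl_mode": "account",  # dispatches to CrawlerGzhAccountArticles
}

search_mode_payload = {
    "task_name": "crawler_gzh_articles",
    "account_method": "search",
    "crawl_mode": "search",  # dispatches to CrawlerGzhSearchArticles
    "strategy": "V1",  # optional; "V2" selects position < 2, read_rate >= 1.1, last 5 days
}
```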

+ 3 - 1
applications/tasks/task_scheduler.py

@@ -182,7 +182,9 @@ class TaskScheduler(TaskHandler):
             # article content pool: title category generation
             "article_pool_category_generation": self._article_pool_category_generation_handler,
             # crawler account management
-            "crawler_account_manager": self._crawler_account_manager_handler
+            "crawler_account_manager": self._crawler_account_manager_handler,
+            # WeChat official-account article crawling
+            "crawler_gzh_articles": self._crawler_gzh_article_handler
         }
 
         if task_name not in handlers:

+ 11 - 3
applications/utils/async_mysql_utils.py

@@ -2,7 +2,7 @@ from typing import List, Dict
 
 
 async def get_top_article_title_list(pool) -> List[Dict]:
-    query = f"""
+    query = """
         select distinct title, source_id
         from datastat_sort_strategy
         where produce_plan_name = %s and source_id is not null;
@@ -10,5 +10,13 @@ async def get_top_article_title_list(pool) -> List[Dict]:
     return await pool.async_fetch(query=query, params=("TOP100",))
 
 
-async def get():
-    pass
+async def get_hot_titles(pool, date_string, position, read_times_threshold) -> List[str]:
+    """get titles of hot articles"""
+    query = """
+        select distinct title
+        from datastat_sort_strategy
+        where position < %s and read_rate >= %s and date_str >= %s;
+    """
+    response = await pool.async_fetch(query=query, params=(position, read_times_threshold, date_string))
+    return [i['title'] for i in response]
+

+ 3 - 4
applications/utils/item.py

@@ -15,16 +15,14 @@ class CrawlerMetaArticle(BaseModel):
         default=..., description="抓取类型:最初设计不合理,积重难返,实际与品类无关"
     )
     out_account_id: str = Field(default=..., description="抓取账号账号id")
-    article_index: str = Field(
+    article_index: int = Field(
         default=None, description="群发发文位置,常见于微信公众号"
     )
     title: str = Field(default=..., description="文章标题")
     link: str = Field(default=..., description="文章链接")
     read_cnt: int = Field(default=0, description="阅读量")
     like_cnt: int = Field(default=0, description="点赞量")
-    description: Optional[str] = Field(
-        default=None, max_length=255, description="文章简介"
-    )
+    description: Optional[str] = Field(default=None, description="文章简介")
     publish_time: int = Field(default=None, description="文章发布时间")
     crawler_time: int = Field(default=None, description="抓取时间")
     score: float = Field(default=None, description="相似度分")
@@ -41,6 +39,7 @@ class CrawlerMetaArticle(BaseModel):
         default=0,
         description="文章内嵌套视频状态 0: init; 1: processing; 2: successfully; 3:article link bad ;99: fail",
     )
+    trace_id: str = Field(default=None, description="创建该条记录的任务ID")
 
 
 class CrawlerMetaAccount(BaseModel):

+ 4 - 0
dev.py

@@ -8,3 +8,7 @@ prompt = "你好"
 res = fetch_deepseek_completion(model="defa", prompt=prompt)
 
 print(res)
+"""
+curl -X POST http://127.0.0.1:6060/api/run_task -H "Content-Type: application/json" -d '{"task_name": "crawler_gzh_articles", "account_method": "search", "crawl_mode": "search", "strategy": "V1"}'
+
+"""