luojunhui 2 months ago
parent
commit
208ca335a4

+ 2 - 2
account_cold_start_daily.py

@@ -137,8 +137,8 @@ def main(date_str, category_list=None, article_source=None):
         article_source = 'weixin'
     task = AccountColdStartDailyTask()
     if task.init_db():
-        # if article_source == 'weixin':
-        #     task.crawler_task(category_list=category_list, date_str=date_str)
+        if article_source == 'weixin':
+            task.crawler_task(category_list=category_list, date_str=date_str)
 
         task.publish_article_task(category_list=category_list, article_source=article_source)
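For context, a minimal sketch of how this entry point behaves after the change, assuming the module can be imported directly; the date and category values below are illustrative placeholders, not values from the repository.

# Sketch: with article_source="weixin", main() now runs the crawl before publishing.
from account_cold_start_daily import main   # assumes direct import is possible

main(
    date_str="2025-04-25",                  # placeholder date
    category_list=["history", "health"],    # placeholder categories
    article_source="weixin",                # re-enabled: crawler_task runs before publish
)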
 

+ 100 - 0
applications/api/gzh_api.py

@@ -0,0 +1,100 @@
+import json
+import requests
+from tenacity import retry
+from requests.exceptions import RequestException
+from typing import Optional, Dict
+
+from applications import log
+from applications.utils import request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=60)
+base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
+headers = {"Content-Type": "application/json"}
+
+def send_post_request(url, data):
+    try:
+        response = requests.post(url, headers=headers, data=data, timeout=60)
+        response.raise_for_status()
+        return response.json()
+    except RequestException as e:
+        print(f"API request failed: {e}")
+    except json.JSONDecodeError as e:
+        print(f"failed to parse the API response as JSON: {e}")
+    return None
+
+
+@retry(**retry_desc)
+def search_articles_in_gzh(title: str, page: str = "1") -> Optional[Dict]:
+    url = f"{base_url}/keyword"
+    payload = json.dumps(
+        {
+            "keyword": title,
+            "cursor": page
+        }
+    )
+    return send_post_request(url, data=payload)
+
+
+@retry(**retry_desc)
+def get_gzh_article_detail(content_link: str, is_count: bool = False, is_cache: bool = True) -> Optional[Dict]:
+    url = f"{base_url}/detail"
+    payload = json.dumps(
+        {
+            "content_link": content_link,
+            "is_count": is_count,
+            "is_cache": is_cache,
+            "is_ad": False
+        }
+    )
+    return send_post_request(url, data=payload)
+
+
+@retry(**retry_desc)
+def get_gzh_account_article_list(gh_id: str, index: Optional[str] = None) -> Optional[Dict]:
+    url = f"{base_url}/blogger"
+    payload = json.dumps(
+        {
+            "account_id": gh_id,
+            "cursor": index
+        }
+    )
+    return send_post_request(url, data=payload)
+
+
+@retry(**retry_desc)
+def get_gzh_account_detail(content_link: str) -> Optional[Dict]:
+    url = f"{base_url}/account_info"
+    payload = json.dumps(
+        {
+            "content_link": content_link
+        }
+    )
+    return send_post_request(url, data=payload)
+
+
+@retry(**retry_desc)
+def get_gzh_recommend_articles(content_link: str) -> Optional[Dict]:
+    url = f"{base_url}/recommend"
+    payload = json.dumps(
+        {
+            "content_link": content_link
+        }
+    )
+    return send_post_request(url, data=payload)
+
+
+@retry(**retry_desc)
+def get_gzh_recommend_articles_v2(content_link: str) -> Optional[Dict]:
+    url = "http://datapi.top/wxapi/relatedarticle"
+    payload = {
+        'url': content_link,
+        'token': '401e4d3c85068bb5'
+    }
+    response = send_post_request(url, data=payload)
+    log(
+        task="article_association_crawler",
+        function="get_gzh_recommend_articles_v2",
+        message="fetch recommended articles via the paid API",
+        data={"content_link": content_link, "response": response},
+    )
+    return response
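A short usage sketch of the new API wrappers. The gh_id and article link below are placeholders, and the response layout (a "data" envelope with "data"/"next_cursor" fields) is inferred from how gzh_article_crawler.py consumes it, so treat the field names as assumptions.

# Sketch: calling the new gzh_api helpers with placeholder identifiers.
from applications.api.gzh_api import (
    get_gzh_account_article_list,
    get_gzh_article_detail,
)

response = get_gzh_account_article_list(gh_id="gh_xxxxxxxxxxxx", index=None)
if response:
    msg_list = response.get("data", {}).get("data") or []
    next_cursor = response.get("data", {}).get("next_cursor")
    print(f"fetched {len(msg_list)} messages, next cursor: {next_cursor}")

detail = get_gzh_article_detail(content_link="https://mp.weixin.qq.com/s/placeholder")
if detail:
    print(detail.keys())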

+ 2 - 1
applications/pipeline/__init__.py

@@ -1,4 +1,5 @@
 """
 @author: luojunhui
 """
-from .crawler_pipeline import scrape_video_entities_process
+from .crawler_pipeline import scrape_video_entities_process
+from .crawler_pipeline import scrape_article_entities_process

+ 31 - 0
applications/pipeline/crawler_pipeline.py

@@ -46,6 +46,18 @@ def whether_duplicate_video_title(video_title: str, db_client) -> bool:
     return False
 
 
+def whether_duplicate_article_title(article_title: str, db_client) -> bool:
+    fetch_query = f"""
+        select article_id from crawler_meta_article 
+        where title = %s;
+    """
+    duplicate_id = db_client.fetch(query=fetch_query, params=(article_title,))
+    if duplicate_id:
+        return True
+
+    return False
+
+
 def scrape_video_entities_process(video_item, db_client) -> dict:
     """
     video crawler pipeline
@@ -86,3 +98,22 @@ def scrape_video_entities_process(video_item, db_client) -> dict:
         return video_item
     else:
         return empty_dict
+
+
+def scrape_article_entities_process(article_item, db_client) -> dict:
+    """
+    article crawler pipeline
+    """
+    article_title = article_item['title']
+    if whether_duplicate_article_title(article_title, db_client):
+        return empty_dict
+
+    # whether sensitive title
+    if whether_title_sensitive(article_title):
+        article_item['title_sensitivity'] = 1
+        return article_item
+
+    return article_item
+
+
+
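A minimal sketch of how the new article pipeline step might be exercised: a duplicate title short-circuits to an empty dict, a sensitive title is passed through with title_sensitivity set to 1, and anything else is returned unchanged. The item fields and the db_client here are stand-ins, not a real connection.

# Sketch: calling scrape_article_entities_process with a stubbed db_client.
from applications.pipeline import scrape_article_entities_process

class FakeDBClient:
    """Stand-in db_client whose fetch() reports no duplicate titles."""
    def fetch(self, query, params=None):
        return []

article_item = {"title": "An example article title"}  # placeholder payload
result = scrape_article_entities_process(article_item, FakeDBClient())
if not result:
    print("duplicate title, skipped")
elif result.get("title_sensitivity") == 1:
    print("sensitive title, flagged but kept")
else:
    print("clean item, ready for the meta table")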

+ 1 - 0
applications/utils/__init__.py

@@ -9,6 +9,7 @@ from .download_video import download_sph_video
 from .download_video import download_sohu_video
 from .download_video import download_toutiao_video
 from .item import Item
+from .save_to_db import insert_into_article_meta_table
 from .save_to_db import insert_into_single_video_source_table
 from .upload import upload_to_oss
 from .fetch_info_from_aigc import fetch_account_fans

+ 26 - 2
applications/utils/item.py

@@ -3,6 +3,7 @@
 """
 
 import time
+from pydantic import BaseModel
 
 default_single_video_table_fields = {
     "platform": "gzh",
@@ -30,6 +31,27 @@ default_single_video_table_fields = {
     "mini_program_title": None
 }
 
+class MetaArticleItem(BaseModel):
+    platform: str | None = None
+    mode: str | None = None
+    category: str | None = None
+    out_account_id: str | None = None
+    article_index: int | None = None
+    title: str | None = None
+    link: str | None = None
+    read_cnt: int = 0
+    like_cnt: int = 0
+    description: str | None = None
+    publish_time: int | None = None
+    crawler_time: int | None = None
+    status: int | None = None
+    channel_content_id: str | None = None
+    unique_index: str | None = None
+    source_article_title: str | None = None
+    source_account: str | None = None
+    llm_sensitivity: float | None = None
+    title_sensitivity: float | None = None
+
 
 class Item(object):
     """
@@ -60,7 +82,8 @@ class Item(object):
         """
         check article item
         """
-        return
+        article_item = MetaArticleItem(**self.item)
+        return article_item
 
     def check(self, source):
         """
@@ -70,4 +93,5 @@ class Item(object):
             case "video":
                 self.check_video_item()
             case "article":
-                self.check_article_item()
+                return self.check_article_item()
+        return None
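A sketch of the new check flow: Item.add() collects raw fields, and check(source="article") now validates them through the pydantic MetaArticleItem model, coercing types and leaving unset fields at their defaults. The field values are illustrative only.

# Sketch: building an article Item and validating it into a MetaArticleItem.
import time
from applications.utils import Item

item = Item()
item.add("platform", "weixin")
item.add("title", "An example title")        # placeholder
item.add("read_cnt", "123")                  # numeric string, coerced to int by pydantic
item.add("crawler_time", int(time.time()))

meta_item = item.check(source="article")     # returns a MetaArticleItem
print(meta_item.read_cnt + 1)                # attribute access, already an int
print(meta_item.link)                        # unset fields fall back to None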

+ 51 - 1
applications/utils/save_to_db.py

@@ -14,7 +14,8 @@ def insert_into_single_video_source_table(db_client, video_item):
         INSERT INTO publish_single_video_source
         (content_trace_id, article_title, out_account_id, out_account_name, read_cnt, like_cnt, article_url, cover_url, 
         video_oss_path, publish_timestamp, crawler_timestamp, url_unique_md5, category, tags, platform, source_account, 
-        category_status, audit_status, audit_video_id, mini_program_title)
+        category_status, audit_status, audit_video_id, mini_program_title
+        )
         values
         (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
     """
@@ -56,3 +57,52 @@ def insert_into_single_video_source_table(db_client, video_item):
                 "oss_path": video_item["video_oss_path"],
             },
         )
+
+
+def insert_into_article_meta_table(db_client, article_item):
+    """
+    insert article meta table
+    """
+    insert_query = f"""
+        insert into crawler_meta_article
+        (
+         platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
+         description, publish_time, crawler_time, status, unique_index, llm_sensitivity, title_sensitivity
+        )
+        VALUES 
+        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+    """
+    try:
+        db_client.save(
+            query=insert_query,
+            params=(
+                article_item.platform,
+                article_item.mode,
+                article_item.category,
+                article_item.out_account_id,
+                article_item.article_index,
+                article_item.title,
+                article_item.link,
+                article_item.read_cnt,
+                article_item.like_cnt,
+                article_item.description,
+                article_item.publish_time,
+                article_item.crawler_time,
+                article_item.status,
+                article_item.unique_index,
+                article_item.llm_sensitivity,
+                article_item.title_sensitivity
+            ),
+        )
+    except Exception as e:
+        log(
+            task="{}_article_crawler".format(article_item.platform),
+            function="save_each_article",
+            message="save article failed",
+            data={
+                "error": str(e),
+                "traceback": traceback.format_exc(),
+                "link": article_item.link
+            }
+        )
+
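For completeness, a sketch of the write path on its own: a validated MetaArticleItem is handed to insert_into_article_meta_table, which reads its fields via attribute access. The connection config name is the one used elsewhere in the repo; the field values are placeholders, and a real call would populate the full set of fields the crawler adds.

# Sketch: persisting a validated MetaArticleItem (placeholder values).
import time
from applications.db import DatabaseConnector
from applications.utils import insert_into_article_meta_table
from applications.utils.item import MetaArticleItem
from config import long_articles_config

db_client = DatabaseConnector(db_config=long_articles_config)
db_client.connect()

meta_item = MetaArticleItem(
    platform="weixin",
    mode="account",
    title="An example title",                        # placeholder
    link="https://mp.weixin.qq.com/s/placeholder",   # placeholder
    crawler_time=int(time.time()),
)
insert_into_article_meta_table(db_client=db_client, article_item=meta_item)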

+ 3 - 1
requirements.txt

@@ -27,4 +27,6 @@ scikit-learn~=1.6.1
 google~=3.0.0
 cffi~=1.17.1
 lxml~=5.3.2
-scipy~=1.15.2
+scipy~=1.15.2
+matplotlib~=3.10.1
+pydantic~=2.10.6

+ 88 - 0
tasks/crawler_tasks/crawler_article/gzh_article_crawler.py

@@ -0,0 +1,88 @@
+"""
+@author: luojunhui
+@file: gzh_article_crawler.py
+@time: 2025/04/25 14:20
+@desc: crawl WeChat official account (GZH) articles
+"""
+
+from __future__ import annotations
+
+import time
+import traceback
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import log
+from applications.api.gzh_api import get_gzh_account_article_list
+from applications.db import DatabaseConnector
+from applications.utils import Item
+from applications.utils import insert_into_article_meta_table
+
+from config import long_articles_config
+
+
+class GZHArticleCrawler:
+    def __init__(self):
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
+
+    def crawler_each_article(self, gh_id, category, article):
+        article_item = Item()
+
+        article_item.add("title", article["Title"])
+        article_item.add("platform", "weixin")
+        article_item.add("mode", "account")
+        article_item.add("category", category)
+        article_item.add("out_account_id", gh_id)
+        article_item.add("article_index", article["ItemIndex"])
+        article_item.add("link", article["ContentUrl"])
+        # article_item.add("read_cnt", article["ShowViewCount"])
+        # article_item.add("like_cnt", article["ShowLikeCount"])
+        article_item.add("description", article["Digest"])
+        article_item.add("publish_time", article["send_time"])
+        article_item.add("crawler_time", int(time.time()))
+        article_item.add("status", 1)
+        # article_item.add("unique_index", str_to_md5(article["ContentUrl"]))
+        article_item.add("llm_sensitivity", -1)
+        article_item.add("title_sensitivity", -1)
+
+        # check item
+        meta_item = article_item.check(source="article")
+        insert_into_article_meta_table(db_client=self.db_client, article_item=meta_item)
+
+    def insert_msg_into_article_meta_table(self, gh_id, category, msg_list):
+        for article_msg in tqdm(msg_list, desc=f"crawler : {gh_id}"):
+            article_list = article_msg["AppMsg"]["DetailInfo"]
+            for article in article_list:
+                try:
+                    self.crawler_each_article(gh_id, category, article)
+
+                except Exception as e:
+                    print(e)
+                    print(traceback.format_exc())
+
+    def crawl_each_gzh_account_article_list(
+        self, account_id: str, category: str, latest_update_timestamp: int
+    ):
+        next_cursor = None
+        while True:
+            fetch_response = get_gzh_account_article_list(account_id, next_cursor)
+            # guard against a failed request (send_post_request returns None on error)
+            if not fetch_response:
+                break
+
+            account_msg_list = fetch_response.get("data", {}).get("data")
+            if not account_msg_list:
+                break
+
+            self.insert_msg_into_article_meta_table(
+                account_id, category, account_msg_list
+            )
+
+            last_article = account_msg_list[-1]
+            last_timestamp_in_page = last_article["AppMsg"]["BaseInfo"]["UpdateTime"]
+            if last_timestamp_in_page <= latest_update_timestamp:
+                # the rest of the list is older than the last crawl; stop paging
+                break
+            else:
+                next_cursor = fetch_response["data"].get("next_cursor")
+                if not next_cursor:
+                    break
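A sketch of driving the new crawler for a single account. The gh_id, category, and timestamp are placeholders; latest_update_timestamp would normally come from the account's last crawled article so that paging stops once already-known articles are reached.

# Hypothetical driver for the new crawler; gh_id, category, and timestamp are placeholders.
import time

from tasks.crawler_tasks.crawler_article.gzh_article_crawler import GZHArticleCrawler

crawler = GZHArticleCrawler()
crawler.crawl_each_gzh_account_article_list(
    account_id="gh_xxxxxxxxxxxx",                           # placeholder gh_id
    category="history",                                     # placeholder category
    latest_update_timestamp=int(time.time()) - 7 * 86400,   # e.g. last crawl a week ago
)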