luojunhui 6 mēneši atpakaļ
vecāks
revīzija
43adc7ad16

+ 4 - 1
account_explore_task.py

@@ -6,6 +6,7 @@
 from tasks.crawler_accounts_by_association import ChannelsAccountCrawler
 from tasks.crawler_accounts_by_association import ToutiaoAccountCrawler
 from tasks.crawler_accounts_by_association import HaoKanAccountCrawler
+from tasks.crawler_accounts_by_association import GzhAccountCrawler
 
 
 def deal_each_platform(platform: str) -> None:
@@ -20,6 +21,8 @@ def deal_each_platform(platform: str) -> None:
             crawler = ChannelsAccountCrawler()
         case "hksp":
             crawler = HaoKanAccountCrawler()
+        case "gzh":
+            crawler = GzhAccountCrawler()
         case _:
             raise RuntimeError("platform error")
 
@@ -28,7 +31,7 @@ def deal_each_platform(platform: str) -> None:
 
 
 if __name__ == "__main__":
-    # platform_list = ["sph", "hksp", "toutiao"]
+    # platform_list = ["sph", "hksp", "toutiao", "gzh"]
     platform_list = ["hksp", "sph"]
     for platform_id in platform_list:
         deal_each_platform(platform=platform_id)

+ 4 - 3
applications/pipeline/account_pipeline.py

@@ -11,11 +11,11 @@ def whether_duplicate_account_id(account_id: str, platform: str, db_client: Data
     whether duplicate account id
     """
     sql = f"""
-        select id from video_meta_accounts
+        select id, status from video_meta_accounts
         where account_id = %s and platform = %s;
     """
-    duplicate_id = db_client.fetch(query=sql, params=(account_id, platform))
-    if duplicate_id:
+    duplicate_id, status = db_client.fetch(query=sql, params=(account_id, platform))[0]
+    if duplicate_id and status:
         return True
     return False
 
@@ -28,6 +28,7 @@ def scrape_account_entities_process(account_item: dict, db_client: DatabaseConne
 
     # whether account exists
     if whether_duplicate_account_id(account_id, platform, db_client):
+        print("duplicate account id: {}".format(account_id))
         return empty_dict
 
     # account analysis

+ 1 - 0
applications/utils/save_to_db.py

@@ -102,6 +102,7 @@ def insert_into_candidate_account_pool_table(db_client, account_item):
         )
     )
     if duplicate_id:
+        print("duplicate id: {}".format(duplicate_id))
         return
 
     # insert into table

+ 4 - 1
coldStartTasks/crawler/wechat/__init__.py

@@ -1,4 +1,7 @@
 """
 @author: luojunhui
 """
-from .article_association import ArticleAssociationCrawler
+from .article_association import ArticleAssociationCrawler
+from .official_accounts_api import get_article_list_from_account
+from .official_accounts_api import get_article_detail
+from.official_accounts_api import get_source_account_from_article

+ 129 - 0
coldStartTasks/crawler/wechat/official_accounts_api.py

@@ -0,0 +1,129 @@
+from __future__ import annotations
+
+import re
+import json
+import requests
+from fake_useragent import FakeUserAgent
+from tenacity import retry
+
+from applications import log
+from applications.utils import request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
+# url from aigc
+base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
+headers = {"Content-Type": "application/json"}
+
+
+@retry(**retry_desc)
+def get_article_detail(
+    article_link: str, is_count: bool=False, is_cache: bool=True
+) -> dict | None:
+    """
+    get official article detail
+    """
+    target_url = f"{base_url}/detail"
+    payload = json.dumps(
+        {
+            "content_link": article_link,
+            "is_count": is_count,
+            "is_ad": False,
+            "is_cache": is_cache
+        }
+    )
+    try:
+        response = requests.post(
+            url=target_url, headers=headers, data=payload, timeout=120
+        )
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="get_official_article_detail",
+            function="get_official_article_detail",
+            message=f"API请求失败: {e}",
+            data={"link": article_link}
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="get_official_article_detail",
+            function="get_official_article_detail",
+            message=f"响应解析失败: {e}",
+            data={"link": article_link}
+        )
+    return None
+
+
+@retry(**retry_desc)
+def get_article_list_from_account(
+        account_id: str, index
+) -> dict | None:
+    target_url = f"{base_url}/blogger"
+    payload = json.dumps(
+        {
+            "account_id": account_id,
+            "cursor": index
+        }
+    )
+    try:
+        response = requests.post(
+            url=target_url, headers=headers, data=payload, timeout=120
+        )
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="get_official_account_article_list",
+            function="get_official_account_article_list",
+            message=f"API请求失败: {e}",
+            data={"gh_id": account_id}
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="get_official_account_article_list",
+            function="get_official_account_article_list",
+            message=f"响应解析失败: {e}",
+            data={"gh_id": account_id}
+        )
+    return None
+
+
+@retry(**retry_desc)
+def get_source_account_from_article(article_link) -> dict | None:
+    """
+    get account info from official article
+    :param article_link:
+    :return:
+    """
+    try:
+        response = requests.get(url=article_link, headers={'User-Agent': FakeUserAgent().random}, timeout=120)
+        response.raise_for_status()
+        html_text = response.text
+        regex_nickname = r"hit_nickname:\s*'([^']+)'"
+        regex_username = r"hit_username:\s*'([^']+)'"
+        nickname = re.search(regex_nickname, html_text)
+        username = re.search(regex_username, html_text)
+        # 输出提取的结果
+        if nickname and username:
+            return {
+                'name': nickname.group(1),
+                'gh_id': username.group(1)
+            }
+        else:
+            return {}
+    except requests.exceptions.RequestException as e:
+        log(
+            task="get_source_account_from_article",
+            function="get_source_account_from_article",
+            message=f"API请求失败: {e}",
+            data={"link": article_link}
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="get_source_account_from_article",
+            function="get_source_account_from_article",
+            message=f"响应解析失败: {e}",
+            data={"link": article_link}
+        )
+    return None

+ 109 - 2
tasks/crawler_accounts_by_association.py

@@ -18,10 +18,13 @@ from applications.utils import Item
 from applications.utils import insert_into_candidate_account_pool_table
 from coldStartTasks.crawler.baidu import haokan_search_videos
 from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
-from coldStartTasks.crawler.toutiao import get_associated_recommendation
-from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list
 from coldStartTasks.crawler.channels import search_in_wechat_channel
 from coldStartTasks.crawler.channels import get_channel_account_videos
+from coldStartTasks.crawler.toutiao import get_associated_recommendation
+from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list
+from coldStartTasks.crawler.wechat import get_article_detail
+from coldStartTasks.crawler.wechat import get_article_list_from_account
+from coldStartTasks.crawler.wechat import get_source_account_from_article
 from config import apolloConfig, long_articles_config
 
 config = apolloConfig()
@@ -344,3 +347,107 @@ class HaoKanAccountCrawler(CrawlerAccounts):
                         "traceback": traceback.format_exc(),
                     },
                 )
+
+
+class GzhAccountCrawler(CrawlerAccounts):
+
+    def get_task_list(self):
+        fetch_query = f"""
+            select id, article_url
+            from publish_single_video_source
+            where source_account = 1 and platform = 'gzh' limit 10;
+        """
+        task_list = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
+        return task_list
+
+    def process_official_account(self, account_name, account_id):
+        """
+        process_official_account
+        """
+        account_item = Item()
+        account_item.add("account_name", account_name)
+        account_item.add("account_id", account_id)
+        account_item.add("platform", "gzh")
+        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
+
+        # fetch account video first page video list
+        fetch_response = get_article_list_from_account(account_id=account_id, index=None)
+        msg_list = fetch_response["data"]["data"]
+        title_list = []
+        for msg in msg_list:
+            sub_title_list = [i['Title'] for i in msg['AppMsg']['DetailInfo']]
+            if len(title_list) > 10:
+                continue
+            else:
+                title_list += sub_title_list
+
+        title_list_str = json.dumps(title_list, ensure_ascii=False)
+        account_item.add("title_list", title_list_str)
+
+        # check item
+        account_item.check(source="candidate_account")
+
+        # insert into database
+        self.insert_video_into_recommend_table(account_item.item)
+
+    def extract_account_from_article_link(self, article_link):
+        """
+        try to get account info from article link
+        """
+        # is article link original
+        article_detail = get_article_detail(article_link)
+        is_original = article_detail["data"]["data"]["is_original"]
+
+        if is_original:
+            return
+        # extract source account
+        source_account = get_source_account_from_article(article_link)
+        if not source_account:
+            return
+        else:
+            account_name = source_account['name']
+            gh_id = source_account['gh_id']
+            self.process_official_account(account_name, gh_id)
+
+    def update_crawler_article_status(self, article_id_tuple: tuple):
+        """
+        update crawler article status
+        """
+        update_query = f"""
+            update publish_single_video_source
+            set source_account = %s
+            where id in %s;
+        """
+        affected_rows = self.db_client.save(
+            query=update_query, params=(0, article_id_tuple)
+        )
+        return affected_rows
+
+    def deal(self):
+        task_list = self.get_task_list()
+        task_id_list = []
+        for crawler_article_obj in tqdm(task_list, desc="crawler article list"):
+            article_url = crawler_article_obj['article_url']
+            article_id = crawler_article_obj['id']
+            task_id_list.append(int(article_id))
+            try:
+                self.extract_account_from_article_link(article_url)
+
+            except Exception as e:
+                log(
+                    task="gzh_account_crawler",
+                    function="extract_account_from_article_link",
+                    message="extract account from article link failed",
+                    data={
+                        "article_url": article_url,
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                )
+
+        if task_id_list:
+            article_id_tuple = tuple(task_id_list)
+            affected_rows = self.update_crawler_article_status(article_id_tuple)
+            print(affected_rows)
+
+