Ver código fonte

Merge remote-tracking branch 'origin/2025-03-17-account-crawler-pipeline'
merge account crawler task and account quality task

luojunhui 5 meses atrás
pai
commit
2ef5366ed9

+ 41 - 0
account_explore_task.py

@@ -0,0 +1,41 @@
+"""
+@author: luojunhui
+@description: try to get some more accounts
+"""
+
+from tasks.crawler_accounts_by_association import ChannelsAccountCrawler
+from tasks.crawler_accounts_by_association import ToutiaoAccountCrawler
+from tasks.crawler_accounts_by_association import HaoKanAccountCrawler
+from tasks.crawler_accounts_by_association import GzhAccountCrawler
+from tasks.generate_search_keys import get_association_title_list_in_multi_threads
+
+
+def deal_each_platform(platform: str) -> None:
+    """
+    deal each platform
+    :param platform: str, channels or toutiao
+    """
+    match platform:
+        case "toutiao":
+            crawler = ToutiaoAccountCrawler()
+        case "sph":
+            crawler = ChannelsAccountCrawler()
+        case "hksp":
+            crawler = HaoKanAccountCrawler()
+        case "gzh":
+            crawler = GzhAccountCrawler()
+        case _:
+            raise RuntimeError("platform error")
+
+    # start process
+    crawler.deal()
+
+
+if __name__ == "__main__":
+    # get_association_title_list_in_multi_threads()
+    get_association_title_list_in_multi_threads()
+
+    # get each platform
+    platform_list = ["sph", "hksp", "toutiao", "gzh"]
+    for platform_id in platform_list:
+        deal_each_platform(platform=platform_id)

+ 13 - 0
account_quality_analysis.py

@@ -0,0 +1,13 @@
+from tasks.account_recognize_by_llm import AccountRecognizer
+
+
+def main():
+    """
+    main function
+    """
+    account_recognizer = AccountRecognizer()
+    account_recognizer.deal()
+
+
+if __name__ == "__main__":
+    main()

+ 54 - 0
applications/api/feishu_api.py

@@ -0,0 +1,54 @@
+import requests
+
+
+class Feishu:
+    def __init__(self):
+        self.token = None
+        self.headers = {"Content-Type": "application/json"}
+
+    def fetch_token(self):
+        url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
+        post_data = {
+            "app_id": "cli_a51114cf8bf8d00c",
+            "app_secret": "cNoTAqMpsAm7mPBcpCAXFfvOzCNL27fe",
+        }
+        response = requests.request("POST", url=url, data=post_data)
+        tenant_access_token = response.json()["tenant_access_token"]
+        self.token = tenant_access_token
+
+
+class FeishuSheetApi(Feishu):
+
+    def prepend_value(self, sheet_token, sheet_id, ranges, values):
+        insert_value_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{}/values_prepend".format(
+            sheet_token
+        )
+        headers = {
+            "Authorization": "Bearer " + self.token,
+            "contentType": "application/json; charset=utf-8",
+        }
+        body = {
+            "valueRange": {"range": "{}!{}".format(sheet_id, ranges), "values": values}
+        }
+        response = requests.request(
+            "POST", url=insert_value_url, headers=headers, json=body
+        )
+        print(response.json())
+
+    def insert_value(self, sheet_token, sheet_id, ranges, values):
+        insert_value_url = (
+            "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/{}/values".format(
+                sheet_token
+            )
+        )
+        headers = {
+            "Authorization": "Bearer " + self.token,
+            "contentType": "application/json; charset=utf-8",
+        }
+        body = {
+            "valueRange": {"range": "{}!{}".format(sheet_id, ranges), "values": values}
+        }
+        response = requests.request(
+            "PUT", url=insert_value_url, headers=headers, json=body
+        )
+        print(response.json())

+ 1 - 0
applications/pipeline/__init__.py

@@ -1,4 +1,5 @@
 """
 @author: luojunhui
 """
+from .account_pipeline import scrape_account_entities_process
 from .crawler_pipeline import scrape_video_entities_process

+ 36 - 0
applications/pipeline/account_pipeline.py

@@ -0,0 +1,36 @@
+"""
+@author: luojunhui
+@description: account crawler pipeline
+"""
+from applications.db import DatabaseConnector
+
+empty_dict = {}
+
+def whether_duplicate_account_id(account_id: str, platform: str, db_client: DatabaseConnector) -> bool:
+    """
+    whether duplicate account id
+    """
+    sql = f"""
+        select id, status from video_meta_accounts
+        where account_id = %s and platform = %s;
+    """
+    duplicate_id, status = db_client.fetch(query=sql, params=(account_id, platform))[0]
+    if duplicate_id and status:
+        return True
+    return False
+
+def scrape_account_entities_process(account_item: dict, db_client: DatabaseConnector) -> dict:
+    """
+    scrape_account_entities_process,
+    """
+    account_id = account_item['account_id']
+    platform = account_item['platform']
+
+    # whether account exists
+    if whether_duplicate_account_id(account_id, platform, db_client):
+        print("duplicate account id: {}".format(account_id))
+        return empty_dict
+
+    # account analysis
+
+    return account_item

+ 2 - 0
applications/utils/__init__.py

@@ -10,6 +10,8 @@ from .download_video import download_sohu_video
 from .download_video import download_toutiao_video
 from .item import Item
 from .save_to_db import insert_into_single_video_source_table
+from .save_to_db import insert_into_video_meta_accounts_table
+from .save_to_db import insert_into_candidate_account_pool_table
 from .upload import upload_to_oss
 from .fetch_info_from_aigc import fetch_account_fans
 from .fetch_info_from_aigc import fetch_publishing_account_list

+ 50 - 0
applications/utils/item.py

@@ -2,6 +2,7 @@
 @author: luojunhui
 """
 
+
 import time
 
 default_single_video_table_fields = {
@@ -30,6 +31,25 @@ default_single_video_table_fields = {
     "mini_program_title": None
 }
 
+default_account_table_fields = {
+    "platform": 'Not NULL',
+    "account_id": 'Not NULL',
+    "account_name": 'Not NULL',
+    "max_cursor": None,
+    "account_init_date": None,
+    "status": 0,
+    "priority": 0,
+
+}
+
+default_candidate_account_table_fields = {
+    "platform": 'Not NULL',
+    "account_id": 'Not NULL',
+    "account_name": 'Not NULL',
+    "crawler_date": 'Not NULL',
+    "title_list": "[]"
+}
+
 
 class Item(object):
     """
@@ -62,6 +82,32 @@ class Item(object):
         """
         return
 
+    def check_account_item(self):
+        """
+        check account item
+        """
+        fields = list(default_account_table_fields.keys())
+        for key in fields:
+            if self.item.get(key, None) is not None:
+                continue
+            elif default_account_table_fields[key] == 'Not NULL':
+                raise ValueError(f"{key} is not None, please check your account item")
+            else:
+                self.item[key] = default_account_table_fields[key]
+
+    def check_candidate_account_item(self):
+        """
+        check association item
+        """
+        fields = list(default_candidate_account_table_fields.keys())
+        for field in fields:
+            if self.item.get(field, None) is not None:
+                continue
+            elif default_candidate_account_table_fields[field] == 'Not NULL':
+                raise ValueError(f"{field} is not None, please check your account item")
+            else:
+                self.item[field] = default_candidate_account_table_fields[field]
+
     def check(self, source):
         """
         check item
@@ -71,3 +117,7 @@ class Item(object):
                 self.check_video_item()
             case "article":
                 self.check_article_item()
+            case "account":
+                self.check_account_item()
+            case "candidate_account":
+                self.check_candidate_account_item()

+ 84 - 0
applications/utils/save_to_db.py

@@ -56,3 +56,87 @@ def insert_into_single_video_source_table(db_client, video_item):
                 "oss_path": video_item["video_oss_path"],
             },
         )
+
+def insert_into_video_meta_accounts_table(db_client, account_item):
+    """
+    insert account into account meta table
+    """
+    insert_sql = f"""
+        insert into video_meta_accounts
+            (platform, account_id, account_name, max_cursor, account_init_date, status, priority)
+        values
+            (%s, %s, %s, %s, %s, %s, %s);
+    """
+    try:
+        db_client.save(
+            query=insert_sql,
+            params=(
+                account_item["platform"],
+                account_item["account_id"],
+                account_item["account_name"],
+                account_item["max_cursor"],
+                account_item["account_init_date"],
+                account_item["status"],
+                account_item["priority"],
+            ),
+        )
+    except Exception as e:
+        log(
+            task="{}_account_crawler".format(account_item["platform"]),
+            function="save_each_account",
+            message="save account failed",
+            data={
+                "error": str(e),
+                "traceback": traceback.format_exc(),
+                "account_id": account_item["account_id"],
+            },
+        )
+
+def insert_into_candidate_account_pool_table(db_client, account_item):
+    """
+    insert recommendation into recommendation table
+    """
+    # check whether duplicate video
+    fetch_query = f"""
+        select id from crawler_candidate_account_pool
+        where account_id = %s and platform = %s;
+    """
+    duplicate_id = db_client.fetch(
+        query=fetch_query, params=(
+            account_item["account_id"],
+            account_item["platform"]
+        )
+    )
+    if duplicate_id:
+        print("duplicate id: {}".format(duplicate_id))
+        return
+
+    # insert into table
+    insert_query = f"""
+        insert into crawler_candidate_account_pool
+            (account_name, account_id, title_list, platform, crawler_date)
+            values
+            (%s, %s, %s, %s, %s)
+    """
+    try:
+        db_client.save(
+            query=insert_query,
+            params=(
+               account_item["account_name"],
+               account_item["account_id"],
+               account_item["title_list"],
+               account_item["platform"],
+               account_item["crawler_date"]
+            )
+        )
+    except Exception as e:
+        log(
+            task="{}_account_crawler".format(account_item["platform"]),
+            function="save_each_account",
+            message="save account failed",
+            data={
+                "error": str(e),
+                "traceback": traceback.format_exc(),
+                "item": account_item
+            }
+        )

+ 3 - 1
coldStartTasks/crawler/baidu/__init__.py

@@ -1 +1,3 @@
-from .video_crawler import BaiduVideoCrawler
+from .video_crawler import BaiduVideoCrawler
+from .spider import haokan_search_videos
+from .spider import haokan_fetch_video_detail

+ 0 - 4
coldStartTasks/crawler/baidu/account_crawler.py

@@ -1,4 +0,0 @@
-"""
-@author: luojunhui
-"""
-

+ 106 - 0
coldStartTasks/crawler/baidu/spider.py

@@ -0,0 +1,106 @@
+from __future__ import annotations
+
+import json
+import base64
+import hashlib
+import requests
+import urllib.parse
+from datetime import datetime
+from tenacity import retry
+from uuid import uuid4
+from fake_useragent import FakeUserAgent
+
+from applications import log
+from applications.utils import proxy, request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
+
+@retry(**retry_desc)
+def haokan_search_videos(search_key: str) -> dict | None:
+    """
+    get haokan search videos
+    :param search_key: search key
+    :return: haokan search videos
+    """
+    timestamp_with_ms = datetime.now().timestamp()
+    timestamp_ms = int(timestamp_with_ms * 1000)
+    query_string = urllib.parse.quote(search_key)
+    strings = "{}_{}_{}_{}_{}".format(1, query_string, 10, timestamp_ms, 1)
+    sign = hashlib.md5(strings.encode()).hexdigest()
+    url = f"https://haokan.baidu.com/haokan/ui-search/pc/search/video?pn=1&rn=10&type=video&query={query_string}&sign={sign}&version=1&timestamp={timestamp_ms}"
+    base_64_string = base64.b64encode(str(uuid4()).encode()).decode()
+    headers = {
+        "Accept": "*/*",
+        "Accept-Language": "zh",
+        "Connection": "keep-alive",
+        "Referer": "https://haokan.baidu.com/web/search/page?query={}".format(
+            query_string
+        ),
+        "User-Agent": FakeUserAgent().chrome,
+        "Cookie": "BAIDUID={}".format(base_64_string),
+    }
+    try:
+        response = requests.get(url, headers=headers, proxies=proxy(), timeout=120)
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="haokan_crawler_videos",
+            function="haokan_search_videos",
+            message=f"API请求失败: {e}",
+            data={"search_key": search_key},
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="haokan_crawler_videos",
+            function="haokan_search_videos",
+            message=f"响应解析失败: {e}",
+            data={"search_key": search_key},
+        )
+    return None
+
+@retry(**retry_desc)
+def haokan_fetch_video_detail(video_id: str) -> dict | None:
+    """
+    get haokan video detail
+    :param video_id: video id
+    :return: haokan video detail
+    """
+    url = "https://haokan.baidu.com/v"
+    params = {
+        'vid': video_id,
+        '_format': 'json'
+    }
+    base_64_string = base64.b64encode(str(uuid4()).encode()).decode()
+    headers = {
+        'Accept': '*/*',
+        'cookie': "BIDUPSID={}".format(base_64_string),
+        'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
+        'Cache-Control': 'no-cache',
+        'Connection': 'keep-alive',
+        'Content-Type': 'application/x-www-form-urlencoded',
+        'Referer': 'https://haokan.baidu.com',
+        'User-Agent': FakeUserAgent().chrome,
+    }
+    try:
+        response = requests.get(url, headers=headers, proxies=proxy(), params=params, timeout=120)
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="haokan_crawler_videos",
+            function="haokan_get_detail",
+            message=f"API请求失败: {e}",
+            data={"video_id": video_id},
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="haokan_crawler_videos",
+            function="haokan_get_detail",
+            message=f"响应解析失败: {e}",
+            data={"video_id": video_id},
+        )
+    return None
+
+

+ 2 - 1
coldStartTasks/crawler/toutiao/__init__.py

@@ -1,4 +1,5 @@
 """
 @author: luojunhui
 """
-from .blogger import get_toutiao_account_video_list
+from .blogger import get_toutiao_account_video_list
+from .detail_recommend import get_associated_recommendation

+ 59 - 0
coldStartTasks/crawler/toutiao/detail_recommend.py

@@ -0,0 +1,59 @@
+"""
+@author: luojunhui
+"""
+from __future__ import annotations
+
+import json
+import requests
+from tenacity import retry
+
+from applications import log
+from applications.utils import proxy, request_retry
+from .use_js import call_js_function
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
+
+@retry(**retry_desc)
+def get_associated_recommendation(article_id: str, cookie: str):
+    """
+    toutiao related recommendation
+    """
+    ms_token = "-aYwLj97uyCi3oghPfhz2nXaekLoFR5YnYUBA5SuyQZae_NLllO4zC30-CeVLth0A6Hmm7MuGr4_IN9MjHUn8wkq-UQKXJxoGmIAokpUsPsOLjdQKffe-cGWCiZ6xqgh7XE%3D"
+    query_params = [
+        0,
+        1,
+        14,
+        "min_behot_time=0&channel_id=91558184576&category=pc_profile_channel&disable_raw_data=true&client_extra_params=%7B%22playparam%22%3A%22codec_type%3A0%2Cenable_dash%3A1%2Cunwatermark%3A1%22%2C%22group_id%22%3A%22{}%22%7D&aid=24&app_name=toutiao_web&msToken={}".format(
+            article_id, ms_token, ms_token),
+        "",
+        "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36"
+    ]
+    a_bogus = call_js_function(query_params)
+    url = f"https://www.toutiao.com/api/pc/list/feed?min_behot_time=0&channel_id=91558184576&category=pc_profile_channel&disable_raw_data=true&client_extra_params=%7B%22playparam%22%3A%22codec_type%3A0%2Cenable_dash%3A1%2Cunwatermark%3A1%22%2C%22group_id%22%3A%22{article_id}%22%7D&aid=24&app_name=toutiao_web&msToken={ms_token}&a_bogus={a_bogus}"
+    headers = {
+        'accept': 'application/json, text/plain, */*',
+        'accept-language': 'zh',
+        'referer': 'https://www.toutiao.com/video/{}/'.format(article_id),
+        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36',
+        'Cookie': cookie
+    }
+    try:
+        response = requests.get(url, headers=headers, proxies=proxy())
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="toutiao account crawler",
+            function="get_toutiao_account_video_list",
+            message=f"API请求失败: {e}",
+            data={"account_id": article_id},
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="toutiao account crawler",
+            function="get_toutiao_account_video_list",
+            message=f"响应解析失败: {e}",
+            data={"account_id": article_id},
+        )
+    return None

+ 4 - 1
coldStartTasks/crawler/wechat/__init__.py

@@ -1,4 +1,7 @@
 """
 @author: luojunhui
 """
-from .article_association import ArticleAssociationCrawler
+from .article_association import ArticleAssociationCrawler
+from .official_accounts_api import get_article_list_from_account
+from .official_accounts_api import get_article_detail
+from.official_accounts_api import get_source_account_from_article

+ 129 - 0
coldStartTasks/crawler/wechat/official_accounts_api.py

@@ -0,0 +1,129 @@
+from __future__ import annotations
+
+import re
+import json
+import requests
+from fake_useragent import FakeUserAgent
+from tenacity import retry
+
+from applications import log
+from applications.utils import request_retry
+
+retry_desc = request_retry(retry_times=3, min_retry_delay=2, max_retry_delay=30)
+
+# url from aigc
+base_url = "http://crawler-cn.aiddit.com/crawler/wei_xin"
+headers = {"Content-Type": "application/json"}
+
+
+@retry(**retry_desc)
+def get_article_detail(
+    article_link: str, is_count: bool=False, is_cache: bool=True
+) -> dict | None:
+    """
+    get official article detail
+    """
+    target_url = f"{base_url}/detail"
+    payload = json.dumps(
+        {
+            "content_link": article_link,
+            "is_count": is_count,
+            "is_ad": False,
+            "is_cache": is_cache
+        }
+    )
+    try:
+        response = requests.post(
+            url=target_url, headers=headers, data=payload, timeout=120
+        )
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="get_official_article_detail",
+            function="get_official_article_detail",
+            message=f"API请求失败: {e}",
+            data={"link": article_link}
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="get_official_article_detail",
+            function="get_official_article_detail",
+            message=f"响应解析失败: {e}",
+            data={"link": article_link}
+        )
+    return None
+
+
+@retry(**retry_desc)
+def get_article_list_from_account(
+        account_id: str, index
+) -> dict | None:
+    target_url = f"{base_url}/blogger"
+    payload = json.dumps(
+        {
+            "account_id": account_id,
+            "cursor": index
+        }
+    )
+    try:
+        response = requests.post(
+            url=target_url, headers=headers, data=payload, timeout=120
+        )
+        response.raise_for_status()
+        return response.json()
+    except requests.exceptions.RequestException as e:
+        log(
+            task="get_official_account_article_list",
+            function="get_official_account_article_list",
+            message=f"API请求失败: {e}",
+            data={"gh_id": account_id}
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="get_official_account_article_list",
+            function="get_official_account_article_list",
+            message=f"响应解析失败: {e}",
+            data={"gh_id": account_id}
+        )
+    return None
+
+
+@retry(**retry_desc)
+def get_source_account_from_article(article_link) -> dict | None:
+    """
+    get account info from official article
+    :param article_link:
+    :return:
+    """
+    try:
+        response = requests.get(url=article_link, headers={'User-Agent': FakeUserAgent().random}, timeout=120)
+        response.raise_for_status()
+        html_text = response.text
+        regex_nickname = r"hit_nickname:\s*'([^']+)'"
+        regex_username = r"hit_username:\s*'([^']+)'"
+        nickname = re.search(regex_nickname, html_text)
+        username = re.search(regex_username, html_text)
+        # 输出提取的结果
+        if nickname and username:
+            return {
+                'name': nickname.group(1),
+                'gh_id': username.group(1)
+            }
+        else:
+            return {}
+    except requests.exceptions.RequestException as e:
+        log(
+            task="get_source_account_from_article",
+            function="get_source_account_from_article",
+            message=f"API请求失败: {e}",
+            data={"link": article_link}
+        )
+    except json.JSONDecodeError as e:
+        log(
+            task="get_source_account_from_article",
+            function="get_source_account_from_article",
+            message=f"响应解析失败: {e}",
+            data={"link": article_link}
+        )
+    return None

+ 26 - 0
sh/run_account_explore.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/account_explore_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 account_explore_task.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - account_explore_task.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart account_explore_task.py"
+    # 切换到指定目录
+    cd /root/luojunhui/dev/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 account_explore_task.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted account_explore_task.py"
+fi

+ 26 - 0
sh/run_account_quality_analysis.sh

@@ -0,0 +1,26 @@
+#!/bin/bash
+
+# 获取当前日期,格式为 YYYY-MM-DD
+CURRENT_DATE=$(date +%F)
+
+# 日志文件路径,包含日期
+LOG_FILE="/root/luojunhui/logs/account_quality_analysis_log_$CURRENT_DATE.txt"
+
+# 重定向整个脚本的输出到带日期的日志文件
+exec >> "$LOG_FILE" 2>&1
+if pgrep -f "python3 account_quality_analysis.py" > /dev/null
+then
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - account_quality_analysis.py is running"
+else
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - trying to restart account_quality_analysis.py"
+    # 切换到指定目录
+    cd /root/luojunhui/dev/LongArticlesJob
+
+    # 激活 Conda 环境
+    source /root/miniconda3/etc/profile.d/conda.sh
+    conda activate tasks
+
+    # 在后台运行 Python 脚本并重定向日志输出
+    nohup python3 account_quality_analysis.py >> "${LOG_FILE}" 2>&1 &
+    echo "$(date '+%Y-%m-%d %H:%M:%S') - successfully restarted account_quality_analysis.py"
+fi

+ 177 - 0
tasks/account_recognize_by_llm.py

@@ -0,0 +1,177 @@
+"""
+use llm function to recognize the account information
+"""
+
+import json
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+from threading import local
+import concurrent
+from concurrent.futures import ThreadPoolExecutor
+
+from applications.api import fetch_deepseek_response
+from applications.db import DatabaseConnector
+from config import long_articles_config
+
+thread_local = local()
+
+
+def generate_prompt(account_title_list):
+    """
+    生成prompt
+    :param account_title_list:
+    """
+    title_list = "\n".join(account_title_list)
+    g_prompt = f"""
+    ** 任务指令 **
+        你是一名资深中文新闻编辑,需根据以下标准对一批标题进行主题匹配度评分(0-100分)
+        
+    ** 评估维度及权重 **
+        1. 受众精准度(50%)
+            正向匹配:存款/养老/健康/饮食/疾病警示/家庭伦理/近现代战争历史/老知青/奇闻异事
+            负向排除:影视解说/文学解读/个人收藏(钱币/邮票)/机械科普/数码测评/电子游戏/时尚潮流/明星八卦/极限运动/学术研究/网络热梗/宠物饲养/音乐/棋牌
+            
+        2. 标题技法(40%)
+            悬念设计:疑问句/省略号/反转结构(例:"打开后瞬间愣住...")
+            情感强度:使用"痛心!""寒心!"等情绪词
+            数据冲击:具体数字增强可信度(例:"存款180万消失")
+            口语化表达:使用"涨知识了""别不当回事"等日常用语
+            
+        3. 内容调性(10%)
+            煽情猎奇:家庭悲剧/离奇事件(例:"棺材板挖出金条")
+            警示价值:健康建议/法律案例(例:"三种食物禁止二次加热")
+            历史揭秘:人物秘闻/老照片故事
+            爱国情怀:军事突破/资源发现(例:"南极发现巨型粮仓")
+
+    ** 评分规则 **
+        90-100分:同时满足3个维度且要素齐全,无负向内容
+        70-89分:满足2个核心维度,无负向内容
+        50-69分:仅满足受众群体正向匹配,无负向内容
+        30-49分:存在轻微关联但要素缺失
+        0-29分:完全无关或包含任意负向品类内容
+        
+    ** 待评估标题 **
+        {title_list}
+        
+    ** 输出要求 **
+        仅输出这一批标题的评分,用数组 List 返回 [score1, score2, score3,...] 不要包含任何解释或说明。
+    """
+    return g_prompt
+
+
+def get_db_client():
+    """
+    each thread get it's own db client
+    """
+    if not hasattr(thread_local, "db_client"):
+        thread_local.db_client = DatabaseConnector(long_articles_config)
+        thread_local.db_client.connect()
+    return thread_local.db_client
+
+
+def update_task_status(thread_db_client, task_id, ori_status, new_status):
+    """
+    update task status
+    """
+    update_query = f"""
+        update crawler_candidate_account_pool
+        set status = %s
+        where id = %s and status = %s;  
+    """
+    thread_db_client.save(update_query, (new_status, task_id, ori_status))
+
+
+def recognize_each_account(thread_db_client, account):
+    """
+    recognize each account
+    """
+    task_id = account["id"]
+    # lock task
+    update_task_status(thread_db_client, task_id, 0, 1)
+
+    # process
+    title_list = json.loads(account["title_list"])
+    if len(title_list) < 15 and account['platform'] == 'toutiao':
+        # 账号数量不足,直接跳过
+        print("bad account, skip")
+        update_task_status(thread_db_client, task_id, 1, 11)
+        return
+
+    # 标题长度过长,需要过滤
+    title_total_length = sum(len(title) for title in title_list)
+    avg_title_length = title_total_length / len(title_list)
+    if avg_title_length > 45:
+        print("title too long, skip")
+        update_task_status(thread_db_client, task_id, 1, 14)
+        return
+
+    prompt = generate_prompt(title_list)
+    response = fetch_deepseek_response(model="DeepSeek-V3", prompt=prompt)
+    response_score_str = response.strip()
+    try:
+        score_list = json.loads(response_score_str)
+        avg_score = sum(score_list) / len(score_list)
+
+    except Exception as e:
+        score_list = []
+        avg_score = 0
+
+    if score_list and avg_score:
+        update_query = f"""
+            update crawler_candidate_account_pool
+            set score_list = %s, avg_score = %s, status = %s
+            where id = %s and status = %s;
+        """
+        thread_db_client.save(
+            update_query, (json.dumps(score_list), avg_score, 2, task_id, 1)
+        )
+    else:
+        update_task_status(thread_db_client, task_id, 1, 12)
+
+
+def recognize_task_thread(task):
+    """
+    recognize thread
+    """
+    thread_db_client = get_db_client()
+    try:
+        recognize_each_account(thread_db_client, task)
+    except Exception as e:
+        print(e)
+        update_task_status(
+            thread_db_client=thread_db_client,
+            task_id=["id"],
+            ori_status=1,
+            new_status=13,
+        )
+
+
+class AccountRecognizer:
+    def __init__(self):
+        self.db_client = DatabaseConnector(long_articles_config)
+        self.db_client.connect()
+
+    def get_task_list(self):
+        """
+        get account task from database
+        """
+        fetch_query = f"""
+            select id, title_list, platform from crawler_candidate_account_pool
+            where avg_score is null and status = 0 and title_list is not null;
+        """
+        fetch_response = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
+        return fetch_response
+
+    def deal(self):
+        task_list = self.get_task_list()
+
+        with ThreadPoolExecutor(max_workers=8) as executor:
+            futures = [
+                executor.submit(recognize_task_thread, task) for task in task_list
+            ]
+            for future in tqdm(
+                concurrent.futures.as_completed(futures),
+                total=len(task_list),
+                desc="处理进度",
+            ):
+                future.result()

+ 451 - 0
tasks/crawler_accounts_by_association.py

@@ -0,0 +1,451 @@
+"""
+@author: luojunhui
+"""
+
+from __future__ import annotations
+
+import json
+import datetime
+import traceback
+
+from pymysql.cursors import DictCursor
+from tqdm import tqdm
+
+from applications import log
+from applications.db import DatabaseConnector
+from applications.pipeline import scrape_account_entities_process
+from applications.utils import Item
+from applications.utils import insert_into_candidate_account_pool_table
+from coldStartTasks.crawler.baidu import haokan_search_videos
+from coldStartTasks.crawler.baidu.baidu_spider import baidu_account_video_crawler
+from coldStartTasks.crawler.channels import search_in_wechat_channel
+from coldStartTasks.crawler.channels import get_channel_account_videos
+from coldStartTasks.crawler.toutiao import get_associated_recommendation
+from coldStartTasks.crawler.toutiao import get_toutiao_account_video_list
+from coldStartTasks.crawler.wechat import get_article_detail
+from coldStartTasks.crawler.wechat import get_article_list_from_account
+from coldStartTasks.crawler.wechat import get_source_account_from_article
+from config import apolloConfig, long_articles_config
+
+config = apolloConfig()
+recommend_cookie = config.getConfigValue("toutiao_detail_recommend_cookie")
+blogger_cookie = config.getConfigValue("toutiao_blogger_cookie")
+
+
+class CrawlerAccounts:
+
+    def __init__(self):
+        self.db_client = DatabaseConnector(db_config=long_articles_config)
+        self.db_client.connect()
+
+    def get_seed_keys(self) -> list[str]:
+        """
+        get search keys from database
+        """
+        fetch_query = f"""
+                select association_title
+                from `article_pool_promotion_source`
+                where association_status = 2 order by association_update_timestamp desc limit 100;
+        """
+        fetch_response = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
+        title_list = []
+        for item in fetch_response:
+            try:
+                title_list += json.loads(item['association_title'])
+            except Exception as e:
+                print(e)
+                continue
+        return list(set(title_list))
+
+    def insert_video_into_recommend_table(self, item: dict) -> None:
+        # whether account exists
+        final_item = scrape_account_entities_process(item, self.db_client)
+        if not final_item:
+            return
+        else:
+            # save to db
+            insert_into_candidate_account_pool_table(
+                db_client=self.db_client, account_item=final_item
+            )
+
+    def update_account_status(
+        self, account_id_tuple: tuple, ori_status: int, new_status: int
+    ) -> int:
+        update_query = f"""
+            update video_association
+            set status = %s
+            where id in %s and status = %s;
+        """
+        affected_rows = self.db_client.save(
+            query=update_query, params=(new_status, account_id_tuple, ori_status)
+        )
+        return affected_rows
+
+
+class ChannelsAccountCrawler(CrawlerAccounts):
+    """
+    crawler channel accounts
+    strategy:
+        1. try to get seed titles from database
+        2. try to get hot_points from web
+        2. use search api to get accounts
+    """
+
+    def process_channels_account(
+        self, account_name: str, account_id: str, title_list_str: str
+    ):
+        """
+        process video item and save to database
+        """
+
+        account_item = Item()
+        account_item.add("account_name", account_name)
+        account_item.add("account_id", account_id)
+        account_item.add("title_list", title_list_str)
+        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
+        account_item.add("platform", "sph")
+        # check item
+        account_item.check(source="candidate_account")
+
+        # save to db
+        self.insert_video_into_recommend_table(account_item.item)
+
+    def process_search_response(self, video: dict):
+        """
+        通过搜索视频的账号名称去搜索账号,并且抓取该账号下的第一页视频
+        """
+        account_name = video["items"][0]["source"]["title"]
+        # search account detail
+        search_account_response = search_in_wechat_channel(
+            search_key=account_name, search_type=2
+        )
+        account_detail = search_account_response["data"]["data"][0]["items"][0]
+        account_id = account_detail["jumpInfo"]["userName"]
+
+        # fetch account video list for the first page
+        search_video_response = get_channel_account_videos(account_id)
+        search_response_data = search_video_response["data"]
+        search_response_data_type = type(search_response_data)
+        if search_response_data_type == dict:
+            video_list = search_response_data["object"]
+        elif search_response_data_type == list:
+            video_list = search_response_data
+        else:
+            raise RuntimeError("search video response type error")
+
+        title_list = [i["objectDesc"]["description"] for i in video_list]
+        title_list_str = json.dumps(title_list, ensure_ascii=False)
+        self.process_channels_account(account_name, account_id, title_list_str)
+
+    def search_video_in_channels(self, title: str) -> None:
+        """
+        search
+        """
+        search_response = search_in_wechat_channel(search_key=title, search_type=1)
+        video_list = search_response["data"]["data"][0]["subBoxes"]
+        for video in tqdm(video_list, desc="crawler each video"):
+            try:
+                self.process_search_response(video)
+            except Exception as e:
+                log(
+                    task="crawler_channels_account_videos",
+                    function="process_search_response",
+                    message="search by title failed",
+                    data={
+                        "video": video,
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                )
+
+    def deal(self):
+        seed_title_list = self.get_seed_keys()
+        for seed_title in tqdm(seed_title_list, desc="crawler each title"):
+            try:
+                self.search_video_in_channels(title=seed_title)
+            except Exception as e:
+                log(
+                    task="crawler_channels_account_videos",
+                    function="search_video_in_channels",
+                    message="search video in channels failed",
+                    data={
+                        "title": seed_title,
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                )
+
+
+class ToutiaoAccountCrawler(CrawlerAccounts):
+
+    def get_seed_videos(self):
+        fetch_query = f"""
+            select out_account_name, article_title, url_unique_md5 
+            from publish_single_video_source
+            where platform = 'toutiao' and video_pool_audit_status = 1 and bad_status = 0
+            order by score desc limit 100;
+        """
+        seed_video_list = self.db_client.fetch(
+            query=fetch_query, cursor_type=DictCursor
+        )
+        return seed_video_list
+
+    def get_level_up_videos(self):
+        fetch_query = f"""
+            select out_account_name, article_title, url_unique_md5
+            from publish_single_video_source
+            where platform = 'toutiao' and (
+                article_title in (
+                    select distinct(title) from article_pool_promotion_source where status = 1 and deleted = 0
+                )
+                    or flow_pool_level < 4
+                );
+        """
+        uplevel_video_list = self.db_client.fetch(
+            query=fetch_query, cursor_type=DictCursor
+        )
+        return uplevel_video_list
+
+    def process_toutiao_account(self, video):
+
+        # process video item and save to database
+        account_item = Item()
+        user_info = video["user_info"]
+        account_item.add("account_name", user_info["name"])
+        account_item.add("account_id", user_info["user_id"])
+        account_item.add("platform", "toutiao")
+        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
+
+        # fetch account video first page video list
+        fetch_response = get_toutiao_account_video_list(
+            account_id=user_info["user_id"], cookie=blogger_cookie
+        )
+        video_list = fetch_response["data"]
+        title_list = [i["title"] for i in video_list]
+        title_list_str = json.dumps(title_list, ensure_ascii=False)
+        account_item.add("title_list", title_list_str)
+
+        # check item
+        account_item.check(source="candidate_account")
+
+        # insert into database
+        self.insert_video_into_recommend_table(account_item.item)
+
+    def get_recommend_video_list(self, seed_video: dict):
+
+        # get recommend videos for each video
+        seed_video_id = seed_video["url_unique_md5"]
+        recommend_response = get_associated_recommendation(
+            seed_video_id, recommend_cookie
+        )
+        recommend_video_list = recommend_response["data"]
+        for video in tqdm(recommend_video_list):
+            try:
+                self.process_toutiao_account(video)
+
+            except Exception as e:
+                log(
+                    task="toutiao account crawler",
+                    function="process_toutiao_video",
+                    message="get recommend video failed",
+                    data={
+                        "video": video,
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                )
+
+    def get_category_recommend_list(self):
+        """
+        品类推荐流几乎无视频,暂时不做
+        """
+        return NotImplementedError()
+
+    def deal(self):
+
+        # start
+        # seed_video_list = self.get_seed_videos()
+        seed_video_list = self.get_level_up_videos()
+        for seed_video in tqdm(seed_video_list, desc="get each video recommendation"):
+            try:
+                self.get_recommend_video_list(seed_video)
+            except Exception as e:
+                log(
+                    task="toutiao_recommendation_crawler",
+                    function="save_each_recommendation",
+                    message="save recommendation failed",
+                    data={
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                        "seed_video": seed_video,
+                    },
+                )
+
+
+class HaoKanAccountCrawler(CrawlerAccounts):
+
+    def process_haokan_video(self, video: dict) -> None:
+        """
+        process_haokan_video
+        """
+
+        account_item = Item()
+        account_item.add("account_name", video["author"])
+        account_item.add("account_id", video["author_id"])
+        account_item.add("platform", "hksp")
+        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
+
+        # fetch account video first page video list
+        fetch_response = baidu_account_video_crawler(account_id=video["author_id"])
+        video_list = fetch_response["results"]
+        title_list = [i["content"]["title"] for i in video_list]
+        title_list_str = json.dumps(title_list, ensure_ascii=False)
+        account_item.add("title_list", title_list_str)
+
+        # check item
+        account_item.check(source="candidate_account")
+
+        # insert into database
+        self.insert_video_into_recommend_table(account_item.item)
+
+    def search_videos_in_haokan_video(self, title: str) -> None:
+        """
+        search_
+        """
+        search_response = haokan_search_videos(title)
+        video_list = search_response["data"]["list"]
+        for video in tqdm(video_list, desc="search videos"):
+            try:
+                self.process_haokan_video(video)
+            except Exception as e:
+                log(
+                    task="haokan_search_crawler",
+                    function="process_haokan_video",
+                    message="process haokan video failed",
+                    data={
+                        "video": video,
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                )
+
+    def deal(self):
+        seed_title_list = self.get_seed_keys()
+        for seed_title in tqdm(seed_title_list, desc="crawler each title"):
+            try:
+                self.search_videos_in_haokan_video(seed_title)
+            except Exception as e:
+                log(
+                    task="haokan_search_crawler",
+                    function="search_videos_in_haokan_video",
+                    message="search videos in haokan video failed",
+                    data={
+                        "title": seed_title,
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                )
+
+
+class GzhAccountCrawler(CrawlerAccounts):
+
+    def get_task_list(self):
+        fetch_query = f"""
+            select id, article_url
+            from publish_single_video_source
+            where source_account = 1 and platform = 'gzh' limit 10;
+        """
+        task_list = self.db_client.fetch(query=fetch_query, cursor_type=DictCursor)
+        return task_list
+
+    def process_official_account(self, account_name, account_id):
+        """
+        process_official_account
+        """
+        account_item = Item()
+        account_item.add("account_name", account_name)
+        account_item.add("account_id", account_id)
+        account_item.add("platform", "gzh")
+        account_item.add("crawler_date", datetime.datetime.today().strftime("%Y-%m-%d"))
+
+        # fetch account video first page video list
+        fetch_response = get_article_list_from_account(account_id=account_id, index=None)
+        msg_list = fetch_response["data"]["data"]
+        title_list = []
+        for msg in msg_list:
+            sub_title_list = [i['Title'] for i in msg['AppMsg']['DetailInfo']]
+            if len(title_list) > 10:
+                continue
+            else:
+                title_list += sub_title_list
+
+        title_list_str = json.dumps(title_list, ensure_ascii=False)
+        account_item.add("title_list", title_list_str)
+
+        # check item
+        account_item.check(source="candidate_account")
+
+        # insert into database
+        self.insert_video_into_recommend_table(account_item.item)
+
+    def extract_account_from_article_link(self, article_link):
+        """
+        try to get account info from article link
+        """
+        # is article link original
+        article_detail = get_article_detail(article_link)
+        is_original = article_detail["data"]["data"]["is_original"]
+
+        if is_original:
+            return
+        # extract source account
+        source_account = get_source_account_from_article(article_link)
+        if not source_account:
+            return
+        else:
+            account_name = source_account['name']
+            gh_id = source_account['gh_id']
+            self.process_official_account(account_name, gh_id)
+
+    def update_crawler_article_status(self, article_id_tuple: tuple):
+        """
+        update crawler article status
+        """
+        update_query = f"""
+            update publish_single_video_source
+            set source_account = %s
+            where id in %s;
+        """
+        affected_rows = self.db_client.save(
+            query=update_query, params=(0, article_id_tuple)
+        )
+        return affected_rows
+
+    def deal(self):
+        task_list = self.get_task_list()
+        task_id_list = []
+        for crawler_article_obj in tqdm(task_list, desc="crawler article list"):
+            article_url = crawler_article_obj['article_url']
+            article_id = crawler_article_obj['id']
+            task_id_list.append(int(article_id))
+            try:
+                self.extract_account_from_article_link(article_url)
+
+            except Exception as e:
+                log(
+                    task="gzh_account_crawler",
+                    function="extract_account_from_article_link",
+                    message="extract account from article link failed",
+                    data={
+                        "article_url": article_url,
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                )
+
+        if task_id_list:
+            article_id_tuple = tuple(task_id_list)
+            affected_rows = self.update_crawler_article_status(article_id_tuple)
+            print(affected_rows)
+
+

+ 145 - 0
tasks/generate_search_keys.py

@@ -0,0 +1,145 @@
+import json
+import time
+import concurrent
+import datetime
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+from concurrent.futures import ThreadPoolExecutor
+from applications.api import fetch_deepseek_response
+from applications.db import DatabaseConnector
+from config import long_articles_config
+
+INIT_STATUS = 0
+PROCESSING_STATUS = 1
+SUCCESS_STATUS = 2
+FAIL_STATUS = 99
+
+MAX_PROCESSING_TIME = 60 * 60
+
+
+def generate_prompt(title):
+    prompt = f"""
+    **输入标题**:  
+    {title}
+    task1:`请按以下维度解构标题核心元素:`  
+    1. **核心人物**(带身份标签):
+    2. **冲突双方**:
+    3. **核心事件**:
+    4. **反常细节**:
+    5. **时空坐标**:
+    6. **认知钩子**:
+    7. **隐性议题**:
+    task2: 请基于 task1 结构元素,套用以下公式来生成关键词
+    | 公式类型        | 生成逻辑                                
+    |--------------- |----------------------------
+    | **悬念挖掘式**  | [人物]+[反常行为]+[学科解释需求]       
+    | **时空穿越式**  | [历史事件]在[年代]的[群体]解读          
+    | **技术拆解式**  | [人物]的[专业领域]+[技术术语]           
+    | **文化对抗式**  | [国家A]如何重新定义[国家B]的[历史符号]
+    | **暗线追踪式**  | [微小物证]揭示的[大历史真相]        
+
+    task3: 基于 task2 生成的关键词矩阵,生成一批长尾词,通过这些词可以在媒体平台搜索到相关内容,
+           要求这些内容对 50 岁以上的中老年人有一定的吸引性
+
+    输出: 只需要输出task3 生成的长尾词列表
+    输出示例: ["长尾词1", "长尾词2", "长尾词3", "长尾词4", ...]
+    """
+    return prompt
+
+def lock_task(db_client, task_md5):
+    lock_query = f"""
+        update `article_pool_promotion_source`
+        set association_status = %s, association_update_timestamp = %s
+        where title_md5 = %s and association_status = %s;
+    """
+    affected_rows = db_client.save(lock_query, params=(PROCESSING_STATUS, datetime.datetime.now(), task_md5, INIT_STATUS))
+    return affected_rows
+
+def rollback_task(db_client):
+    now_timestamp = int(time.time())
+    timestamp_threshold = datetime.datetime.fromtimestamp(
+        now_timestamp - MAX_PROCESSING_TIME
+    )
+    update_query = f"""
+        update `article_pool_promotion_source`
+        set association_status = %s
+        where association_status = %s and association_update_timestamp < %s;
+    """
+    rollback_rows = db_client.save(
+        query=update_query, params=(INIT_STATUS, PROCESSING_STATUS, timestamp_threshold)
+    )
+    return rollback_rows
+
+def generate_single_title(task):
+    title = task['title']
+    thread_client = DatabaseConnector(long_articles_config)
+    thread_client.connect()
+
+    rollback_task(thread_client)
+
+    lock_result = lock_task(thread_client, task['title_md5'])
+    if not lock_result:
+        return
+
+    prompt = generate_prompt(title)
+    response = fetch_deepseek_response(model="DeepSeek-R1", prompt=prompt)
+    title_list_str = response.strip()
+    try:
+        title_list = json.loads(title_list_str)
+    except json.decoder.JSONDecodeError:
+        title_list = title_list_str
+    except Exception as e:
+        # set as fail
+        update_query = f"""
+                update `article_pool_promotion_source`
+                set association_status = %s, association_update_timestamp = %s
+                where title_md5 = %s and association_status = %s;
+                """
+        thread_client.save(
+            update_query,
+            params=(
+                FAIL_STATUS, datetime.datetime.now(), task['title_md5'],
+                PROCESSING_STATUS
+            )
+        )
+        return
+
+    # set as success
+    update_query = f"""
+        update `article_pool_promotion_source`
+        set association_status = %s, association_update_timestamp = %s, association_title = %s
+        where title_md5 = %s and association_status = %s;
+        """
+    thread_client.save(
+        update_query,
+        params=(
+            SUCCESS_STATUS, datetime.datetime.now(), json.dumps(title_list, ensure_ascii=False), task['title_md5'], PROCESSING_STATUS
+        )
+    )
+
+def get_task_list():
+    db = DatabaseConnector(long_articles_config)
+    db.connect()
+    fetch_query = f"""
+        select distinct title, title_md5
+        from `article_pool_promotion_source`
+        where `level` = 'autoArticlePoolLevel1' 
+            and status = 1 and `deleted` = 0 and association_status = 0 
+            order by `create_timestamp` desc 
+            limit 160;
+    """
+    title_list = db.fetch(fetch_query, cursor_type=DictCursor)
+    return title_list
+
+def get_association_title_list_in_multi_threads():
+    task_list = get_task_list()
+    with ThreadPoolExecutor(max_workers=8) as executor:
+        futures = [
+            executor.submit(generate_single_title, task) for task in task_list
+        ]
+        for future in tqdm(
+                concurrent.futures.as_completed(futures),
+                total=len(task_list),
+                desc="处理进度",
+        ):
+            future.result()