
Merge branch '2025-01-02-toutiao-recommend-v1' of luojunhui/LongArticlesJob into master

luojunhui 3 months ago
commit 439d12eb3a

+ 5 - 0
.gitignore

@@ -25,6 +25,9 @@ test/
 *.egg-info/
 .installed.cfg
 *.egg
+test/
+.vscode
+.DS_Store
 
 # PyInstaller
 #  Usually these files are written by a python script from a template
@@ -59,3 +62,5 @@ docs/_build/
 # PyBuilder
 target/
 
+dev.py
+.gitignore

+ 66 - 0
coldStartTasks/crawler/toutiao_get_bogus.py

@@ -0,0 +1,66 @@
+"""
+@author: 罗俊辉
+@file: toutiao_get_bogus.py
+@time: 2024/1/4
+@desc: capture Toutiao feed request parameters with a browser automation tool and store them in the database
+"""
+
+import json
+from typing import Dict
+from playwright.sync_api import sync_playwright
+
+from applications.db import DatabaseConnector
+from config import long_articles_config
+
+
+class ToutiaoBogus:
+    def __init__(self):
+        """
+        初始化ToutiaoBogus类的实例。
+        该方法创建了一个DatabaseConnector的实例,并调用其connect方法来建立与数据库的连接。
+        Attributes:
+            db (DatabaseConnector): 用于与数据库进行交互的DatabaseConnector实例。
+        """
+        # 创建一个DatabaseConnector实例,用于与数据库进行交互
+        self.db = DatabaseConnector(db_config=long_articles_config)
+        # 调用DatabaseConnector实例的connect方法,建立与数据库的连接
+        self.db.connect()
+
+    def on_request(self, request, category):
+        # Persist only the feed API request, which carries the signed parameters
+        if "https://www.toutiao.com/api/pc/list/feed?" in request.url:
+            insert_sql = """
+                INSERT INTO toutiao_request_params
+                (request_method, request_url, request_headers, post_data, category)
+                VALUES
+                (%s, %s, %s, %s, %s);
+            """
+            self.db.save(
+                query=insert_sql,
+                params=(
+                    request.method,
+                    request.url,
+                    json.dumps(request.headers if request.headers else {}, ensure_ascii=False),
+                    json.dumps(request.post_data if request.post_data else {}, ensure_ascii=False),
+                    category
+                )
+            )
+
+    def crawler_recommend_article_list(self, category_info: Dict):
+        with sync_playwright() as p:
+            browser = p.chromium.launch(headless=False)
+            context = browser.new_context()
+            page = context.new_page()
+            page.goto(category_info['url'])
+            page.wait_for_load_state("networkidle")
+            # Attach the request listener before clicking the category tab,
+            # so the feed request triggered by the click is captured
+            page.on("request", lambda request: self.on_request(request, category_info['category']))
+            page.get_by_role("button", name=category_info['name']).click()
+            page.wait_for_load_state("networkidle")
+            page.wait_for_timeout(5000)
+            browser.close()
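
For context, a minimal usage sketch of the capture script above. The category_info keys ('url', 'category', 'name') are inferred from crawler_recommend_article_list; the concrete values here are illustrative only, not taken from the repository:

    from coldStartTasks.crawler.toutiao_get_bogus import ToutiaoBogus

    # hypothetical input: the real URL and tab label are not part of this commit
    category_info = {
        "url": "https://www.toutiao.com/",  # page to open first
        "category": "history",              # tag stored with the captured request
        "name": "历史"                       # accessible name of the tab button to click
    }

    ToutiaoBogus().crawler_recommend_article_list(category_info)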

+ 163 - 0
coldStartTasks/crawler/toutiao_recommend_crawler.py

@@ -0,0 +1,163 @@
+"""
+@author: luojunhui
+"""
+import json
+import time
+import traceback
+from typing import Dict
+
+import requests
+from tqdm import tqdm
+from pymysql.cursors import DictCursor
+
+from applications import bot
+from applications import log
+from applications import Functions
+from applications.db import DatabaseConnector
+from config import long_articles_config
+
+functions = Functions()
+
+
+class ToutiaoRecommendCrawler(object):
+    """
+    Toutiao recommendation-feed crawler
+    """
+    def __init__(self) -> None:
+        self.db_client = None
+
+    def init_database(self) -> None:
+        """
+        Initialize the database connection
+        :return:
+        """
+        try:
+            self.db_client = DatabaseConnector(db_config=long_articles_config)
+            self.db_client.connect()
+        except Exception as e:
+            bot(
+                title="今日头条推荐流文章抓取任务数据库连接失败",
+                detail={
+                    "error": str(e),
+                    "error_stack": traceback.format_exc()
+                }
+            )
+
+    def get_history_recommendation(self) -> Dict:
+        """
+        Fetch one page of the 'history' recommendation feed
+        :return:
+        """
+        select_sql = f"""
+            SELECT request_method, request_url, request_headers, post_data
+            FROM toutiao_request_params
+            WHERE category = 'history' and expire_flag = 0
+            ORDER BY id
+            LIMIT 1;
+        """
+        result = self.db_client.fetch(
+            query=select_sql,
+            cursor_type=DictCursor
+        )
+        if not result:
+            print("cookie没了报警")
+            return {}
+        cookie_obj = result[0]
+        response = requests.request(
+            method=cookie_obj['request_method'],
+            url=cookie_obj['request_url'],
+            headers=json.loads(cookie_obj['request_headers']),
+            proxies=functions.proxy()
+        )
+        return response.json()
+
+    def get_tech_recommendation(self) -> Dict:
+        """
+        获取科技推荐流文章
+        :return:
+        """
+        return
+
+    def insert_each_article(self, item: Dict) -> Dict:
+        """
+        提取文章信息
+        :param article_info:
+        :return:
+        """
+        item_id = item.get('item_id')
+        article_url = item['article_url']
+        like_count = item['like_count']
+        read_count = item['read_count']
+        title = item['title']
+        user_info = item['user_info']
+        user_id = user_info.get('user_id')
+        abstract = item['Abstract']
+        publish_time = item['publish_time']
+        insert_sql = f"""
+            INSERT IGNORE INTO crawler_meta_article
+            (platform, mode, category, out_account_id, title, link, read_cnt, like_cnt, status, description, publish_time, crawler_time, unique_index)
+            VALUES
+            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+        """
+        self.db_client.save(
+            query=insert_sql,
+            params=(
+                "toutiao",
+                "recommend",
+                "history",
+                user_id,
+                title,
+                article_url,
+                read_count,
+                like_count,
+                1,
+                abstract,
+                publish_time,
+                int(time.time()),
+                item_id
+            )
+        )
+
+    def process_recommendation(self, recommendation) -> Dict:
+        """
+        处理推荐流文章
+        :param recommendation:
+        :return:
+        """
+        # an empty response (e.g. no valid cookie) simply yields no items
+        for item in tqdm(recommendation.get('data', [])):
+            if item.get('article_url'):
+                video_flag = item.get('has_video')
+                if not video_flag:
+                    try:
+                        self.insert_each_article(item)
+                    except Exception as e:
+                        error_data = {
+                            "error": str(e),
+                            "error_stack": traceback.format_exc()
+                        }
+                        log(
+                            task='toutiao_recommend',
+                            message='failed to insert Toutiao recommend-feed article',
+                            data=error_data,
+                            status='fail'
+                        )
+                else:
+                    print("skip video item")
+            else:
+                print("skip item without article_url")
+
+    def run(self) -> None:
+        """
+        主函数
+        :return:
+        """
+        for i in range(10):
+            try:
+                article_list = self.get_history_recommendation()
+                self.process_recommendation(article_list)
+                time.sleep(3)
+            except Exception as e:
+                error_data = {
+                    "error": str(e),
+                    "error_stack": traceback.format_exc()
+                }
+                print(error_data)
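
For reference, a hedged sketch of the shape of a single feed item that insert_each_article consumes, inferred purely from the field accesses above; every value is made up for illustration:

    item = {
        "item_id": 7300000000000000000,   # used as unique_index
        "article_url": "https://www.toutiao.com/article/7300000000000000000/",
        "title": "example title",
        "like_count": 12,
        "read_count": 3456,
        "Abstract": "example abstract",   # note the capitalised key in the feed payload
        "publish_time": 1704326400,       # unix timestamp
        "has_video": False,               # video items are skipped
        "user_info": {"user_id": 123456789}
    }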

+ 7 - 0
run_toutiao_recommend.py

@@ -0,0 +1,7 @@
+from coldStartTasks.crawler.toutiao_recommend_crawler import ToutiaoRecommendCrawler
+
+
+if __name__ == "__main__":
+    toutiao_recommend_crawler = ToutiaoRecommendCrawler()
+    toutiao_recommend_crawler.init_database()
+    toutiao_recommend_crawler.run()
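
Both new scripts read and write a toutiao_request_params table that this commit does not create. Below is a minimal sketch of the schema implied by the INSERT in toutiao_get_bogus.py and the SELECT in toutiao_recommend_crawler.py; the column types and defaults are assumptions:

    # hypothetical DDL inferred from the queries above; types are guesses
    create_sql = """
        CREATE TABLE IF NOT EXISTS toutiao_request_params (
            id              INT AUTO_INCREMENT PRIMARY KEY,
            request_method  VARCHAR(16),
            request_url     TEXT,
            request_headers TEXT,
            post_data       TEXT,
            category        VARCHAR(32),
            expire_flag     TINYINT NOT NULL DEFAULT 0
        );
    """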