"""
@author: luojunhui
"""
import json
import time
import traceback
from typing import Dict

import requests
from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications import bot
from applications import log
from applications import Functions
from applications.db import DatabaseConnector
from config import long_articles_config

functions = Functions()


class ToutiaoRecommendCrawler(object):
    """
    Crawler for the Toutiao recommendation feed.

    Typical usage: call ``init_database()`` once, then ``run()``.
    """

    # Upper bound (seconds) for the feed HTTP request; without a timeout
    # ``requests`` waits forever on a stalled proxy or server.
    REQUEST_TIMEOUT = 30

    def __init__(self) -> None:
        # Set by init_database(); stays None until that method is called.
        self.db_client = None

    def init_database(self) -> None:
        """
        Create the database client and open its connection.

        Any failure is reported through ``bot`` and then swallowed, so this
        method does not raise.
        :return:
        """
        try:
            self.db_client = DatabaseConnector(db_config=long_articles_config)
            self.db_client.connect()
        except Exception as e:
            bot(
                title="今日头条推荐流文章抓取任务数据库连接失败",
                detail={
                    "error": str(e),
                    "error_stack": traceback.format_exc()
                }
            )

    def get_history_recommendation(self) -> Dict:
        """
        Fetch one page of the 'history' category recommendation feed.

        The request method, url and headers are read from the first
        non-expired 'history' row of ``toutiao_request_params``.
        :return: the decoded JSON response, or an empty dict when no usable
            request configuration row exists
        """
        select_sql = """
            SELECT request_method, request_url, request_headers, post_data
            FROM toutiao_request_params
            WHERE category = 'history' and expire_flag = 0
            ORDER BY id
            LIMIT 1;
        """
        result = self.db_client.fetch(
            query=select_sql,
            cursor_type=DictCursor
        )
        if not result:
            print("cookie没了报警")
            return {}

        cookie_obj = result[0]
        # NOTE(review): post_data is selected above but never sent with the
        # request -- confirm whether POST-style configurations need it.
        response = requests.request(
            method=cookie_obj['request_method'],
            url=cookie_obj['request_url'],
            headers=json.loads(cookie_obj['request_headers']),
            proxies=functions.proxy(),
            timeout=self.REQUEST_TIMEOUT
        )
        return response.json()

    def get_tech_recommendation(self) -> Dict:
        """
        Fetch the 'tech' category recommendation feed.

        Not implemented yet.
        :return: always an empty dict
        """
        return {}

    def insert_each_article(self, item: Dict) -> None:
        """
        Extract the article fields from one feed item and save them into
        ``crawler_meta_article``.

        :param item: a single entry of the feed's ``data`` list
        :return:
        :raises KeyError: when a required field is missing from ``item``
        """
        item_id = item.get('item_id')
        article_url = item['article_url']
        like_count = item['like_count']
        read_count = item['read_count']
        title = item['title']
        user_info = item['user_info']
        user_id = user_info.get('user_id')
        abstract = item['Abstract']
        publish_time = item['publish_time']
        insert_sql = """
            INSERT IGNORE INTO crawler_meta_article
            (platform, mode, category, out_account_id, title, link, read_cnt, like_cnt, status, description, publish_time, crawler_time, unique_index)
            VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        self.db_client.save(
            query=insert_sql,
            params=(
                "toutiao",
                "recommend",
                "history",
                user_id,
                title,
                article_url,
                read_count,
                like_count,
                1,
                abstract,
                publish_time,
                int(time.time()),
                item_id
            )
        )

    def process_recommendation(self, recommendation) -> None:
        """
        Store every non-video article that has a link from a feed response.

        A failure on one item is logged and does not stop the remaining
        items from being processed.
        :param recommendation: decoded feed response; its ``data`` entry is
            iterated as the list of feed items
        :return:
        """
        # get_history_recommendation() returns {} when no request
        # configuration is available; treat that (or a response without
        # 'data') as "nothing to process" instead of raising KeyError.
        articles = recommendation.get('data') if recommendation else None
        if not articles:
            return

        for item in tqdm(articles):
            if not item.get('article_url'):
                print("无链接文章跳过")
                continue

            if item.get('has_video'):
                print("视频文章跳过")
                continue

            try:
                self.insert_each_article(item)
            except Exception as e:
                error_data = {
                    "error": str(e),
                    "error_stack": traceback.format_exc()
                }
                log(
                    task='toutiao_recommend',
                    message='头条推荐流文章插入失败',
                    data=error_data,
                    status='fail'
                )

    def run(self) -> None:
        """
        Entry point: fetch and store the 'history' feed ten times.

        Each successful round is followed by a 3 second pause; a failed
        round prints the error and moves on to the next round.
        :return:
        """
        for _ in range(10):
            try:
                article_list = self.get_history_recommendation()
                self.process_recommendation(article_list)
                time.sleep(3)
            except Exception as e:
                error_data = {
                    "error": str(e),
                    "error_stack": traceback.format_exc()
                }
                print(error_data)