""" @author: luojunhui """ import json import time import traceback from typing import Dict import requests from tqdm import tqdm from pymysql.cursors import DictCursor from applications import bot from applications import log from applications import Functions from applications.db import DatabaseConnector from config import long_articles_config from coldStartTasks.filter import article_crawler_duplicate_filter functions = Functions() class ToutiaoRecommendCrawler(object): """ 今日头条推荐流 """ def __init__(self) -> None: self.db_client = None def init_database(self) -> None: """ 初始化数据库 :return: """ try: self.db_client = DatabaseConnector(db_config=long_articles_config) self.db_client.connect() except Exception as e: bot( title="今日头条推荐流文章抓取任务数据库连接失败", detail={ "error": str(e), "error_stack": traceback.format_exc() } ) def get_request_params(self, category) -> Dict: """ 获取请求参数 :return: """ select_sql = f""" SELECT request_method, request_url, request_headers, post_data FROM toutiao_request_params WHERE category = '{category}' and expire_flag = 0 ORDER BY id LIMIT 1; """ result = self.db_client.fetch( query=select_sql, cursor_type=DictCursor ) if not result: print("cookie没了报警") return {} else: return result[0] def get_recommendation_article_list(self, category) -> Dict: """ 获取历史推荐流文章 :return: """ cookie_obj = self.get_request_params(category) if not cookie_obj: return {} response = requests.request( method=cookie_obj['request_method'], url=cookie_obj['request_url'], headers=json.loads(cookie_obj['request_headers']), proxies=functions.proxy() ) if response.text is None: print("{}: cookie 失效".format(category)) return response.json() def insert_each_article(self, category, item: Dict) -> None: """ 提取文章信息 :param item :param category :return: """ title = item['title'] if article_crawler_duplicate_filter(new_article_title=title, db_client=self.db_client): log( function='toutiao_recommend_crawler', task='toutiao_recommend', message='标题去重' ) return item_id = item.get('item_id') article_url = item['article_url'] like_count = item['like_count'] read_count = item['read_count'] user_info = item['user_info'] user_id = user_info.get('user_id') abstract = item['Abstract'] publish_time = item['publish_time'] insert_sql = f""" INSERT IGNORE INTO crawler_meta_article (platform, mode, category, out_account_id, title, link, read_cnt, like_cnt, status, description, publish_time, crawler_time, unique_index) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ self.db_client.save( query=insert_sql, params=( "toutiao", "recommend", category, user_id, title, article_url, read_count, like_count, 1, abstract, publish_time, int(time.time()), item_id ) ) def process_recommendation(self, category, recommendation): """ 处理推荐流文章 :param recommendation :param category :return: """ for item in tqdm(recommendation['data']): if item.get('article_url'): video_flag = item.get('has_video') if not video_flag: try: self.insert_each_article(category=category, item=item) except Exception as e: error_data = { "error": str(e), "error_stack": traceback.format_exc() } log( function='toutiao_recommend_crawler', task='toutiao_recommend', message='头条推荐流文章插入失败', data=error_data, status='fail' ) else: print("视频文章跳过") else: print("无链接文章跳过") def run(self, category) -> None: """ 主函数 :return: """ for i in range(10): try: article_list = self.get_recommendation_article_list(category=category) self.process_recommendation(category=category, recommendation=article_list) time.sleep(3) except Exception as e: error_data = { "error": str(e), "error_stack": traceback.format_exc() } print(error_data)