"""
@author: luojunhui
"""
import json
import time
import traceback
from typing import Dict

import requests
from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications import bot
from applications import log
from applications import Functions
from applications.db import DatabaseConnector
from config import long_articles_config
from cold_start.filter import article_crawler_duplicate_filter

functions = Functions()


class ToutiaoRecommendCrawler(object):
    """
    Toutiao (今日头条) recommendation-feed crawler
    """
    def __init__(self) -> None:
        self.db_client = None

    def init_database(self) -> None:
        """
        Initialize the database connection
        :return:
        """
        try:
            self.db_client = DatabaseConnector(db_config=long_articles_config)
            self.db_client.connect()
        except Exception as e:
            bot(
                title="Toutiao recommend-feed crawler: database connection failed",
                detail={
                    "error": str(e),
                    "error_stack": traceback.format_exc()
                }
            )

    def get_request_params(self, category) -> Dict:
        """
        Fetch the stored request parameters (method, URL, headers, post data) for the given category
        :return:
        """
        select_sql = f"""
            SELECT request_method, request_url, request_headers, post_data
            FROM toutiao_request_params
            WHERE category = '{category}' AND expire_flag = 0
            ORDER BY id
            LIMIT 1;
        """
        result = self.db_client.fetch(
            query=select_sql,
            cursor_type=DictCursor
        )
        if not result:
            print("no usable cookie found for this category, should alert")
            return {}
        else:
            return result[0]

    def get_recommendation_article_list(self, category) -> Dict:
        """
        Fetch one page of recommendation-feed articles
        :return:
        """
        cookie_obj = self.get_request_params(category)
        if not cookie_obj:
            return {}
        response = requests.request(
            method=cookie_obj['request_method'],
            url=cookie_obj['request_url'],
            headers=json.loads(cookie_obj['request_headers']),
            proxies=functions.proxy()
        )
        # an empty response body usually means the cookie has expired
        if not response.text:
            print("{}: cookie expired".format(category))
        return response.json()

    def insert_each_article(self, category, item: Dict) -> None:
        """
        Extract the article fields and insert them into crawler_meta_article
        :param category: feed category
        :param item: one article item from the recommendation response
        :return:
        """
        title = item['title']
        # skip articles whose title already exists in the crawler table
        if article_crawler_duplicate_filter(new_article_title=title, db_client=self.db_client):
            log(
                function='toutiao_recommend_crawler',
                task='toutiao_recommend',
                message='duplicate title, skipped'
            )
            return

        item_id = item.get('item_id')
        article_url = item['article_url']
        like_count = item['like_count']
        read_count = item['read_count']
        user_info = item['user_info']
        user_id = user_info.get('user_id')
        abstract = item['Abstract']
        publish_time = item['publish_time']
        insert_sql = """
            INSERT IGNORE INTO crawler_meta_article
            (platform, mode, category, out_account_id, title, link,
             read_cnt, like_cnt, status, description, publish_time, crawler_time, unique_index)
            VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        self.db_client.save(
            query=insert_sql,
            params=(
                "toutiao",
                "recommend",
                category,
                user_id,
                title,
                article_url,
                read_count,
                like_count,
                1,
                abstract,
                publish_time,
                int(time.time()),
                item_id
            )
        )

    def process_recommendation(self, category, recommendation):
        """
        Process the articles in one recommendation-feed response
        :param category: feed category
        :param recommendation: parsed JSON response from the feed
        :return:
        """
        for item in tqdm(recommendation['data']):
            if item.get('article_url'):
                video_flag = item.get('has_video')
                if not video_flag:
                    try:
                        self.insert_each_article(category=category, item=item)
                    except Exception as e:
                        error_data = {
                            "error": str(e),
                            "error_stack": traceback.format_exc()
                        }
                        log(
                            function='toutiao_recommend_crawler',
                            task='toutiao_recommend',
                            message='failed to insert Toutiao recommend-feed article',
                            data=error_data,
                            status='fail'
                        )
                else:
                    print("video article, skipped")
            else:
                print("article without link, skipped")

    def run(self, category) -> None:
        """
        Entry point: crawl ten pages of the recommendation feed for one category
        :return:
        """
        for _ in range(10):
            try:
                article_list = self.get_recommendation_article_list(category=category)
                self.process_recommendation(category=category, recommendation=article_list)
                time.sleep(3)
            except Exception as e:
                error_data = {
                    "error": str(e),
                    "error_stack": traceback.format_exc()
                }
                print(error_data)
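
# --- Usage sketch (not part of the original module) ---
# A minimal example of how this crawler is presumably driven, assuming the
# database config in long_articles_config is valid and that the category
# label ("tech" here is a hypothetical value) matches a non-expired row in
# the toutiao_request_params table.
if __name__ == '__main__':
    crawler = ToutiaoRecommendCrawler()
    crawler.init_database()
    crawler.run(category='tech')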