""" @author: luojunhui 抓取全局品类文章 """ import datetime import time import traceback from typing import Dict, List from tqdm import tqdm from pymysql.cursors import DictCursor from applications.db import DatabaseConnector from applications.pipeline import ( whether_title_sensitive, whether_duplicate_article_title, ) from applications.utils import show_desc_to_sta, generate_gzh_id, timestamp_to_str from cold_start.crawler.wechat import get_article_list_from_account from config import long_articles_config class Const: ACCOUNT_GOOD_STATUS = 1 # 账号是否每日抓取 ACCOUNT_DAILY_SCRAPE = 1 ACCOUNT_NOT_DAILY_SCRAPE = 0 # 默认值 DEFAULT_VIEW_COUNT = 0 DEFAULT_LIKE_COUNT = 0 DEFAULT_ARTICLE_STATUS = 1 DEFAULT_TIMESTAMP = 1717171200 # 标题sensitivity TITLE_SENSITIVE = 1 TITLE_NOT_SENSITIVE = 0 class GzhArticleCrawler(Const): def __init__(self): self.db_client = DatabaseConnector(long_articles_config) self.db_client.connect() def get_latest_timestamp(self, account: dict) -> int: try: timestamp = int(account["latest_update_time"].timestamp()) except Exception as e: timestamp = self.DEFAULT_TIMESTAMP return timestamp def insert_article_into_meta(self, gh_id, account_mode, article_list): """ 将数据更新到数据库 :return: """ for article_obj in article_list: detail_article_list = article_obj["AppMsg"]["DetailInfo"] for obj in detail_article_list: try: if whether_duplicate_article_title(obj["Title"], self.db_client): continue # 判断标题是否包含敏感词 title_sensitivity = ( self.TITLE_SENSITIVE if whether_title_sensitive(obj["Title"]) else self.TITLE_NOT_SENSITIVE ) show_stat = show_desc_to_sta(obj["ShowDesc"]) show_view_count = show_stat.get( "show_view_count", self.DEFAULT_VIEW_COUNT ) show_like_count = show_stat.get( "show_like_count", self.DEFAULT_LIKE_COUNT ) unique_idx = generate_gzh_id(obj["ContentUrl"]) insert_sql = f""" insert into crawler_meta_article ( platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt, description, publish_time, crawler_time, status, unique_index, llm_sensitivity, title_sensitivity ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ self.db_client.save( query=insert_sql, params=( "weixin", "account", account_mode, gh_id, obj["ItemIndex"], obj["Title"], obj["ContentUrl"], show_view_count, show_like_count, obj["Digest"], obj["send_time"], int(time.time()), self.DEFAULT_ARTICLE_STATUS, unique_idx, obj.get("llm_sensitivity", -1), title_sensitivity, ), ) except Exception as e: print(e) def update_latest_account_timestamp(self, gh_id): """ 更新账号的最新时间戳 :return: """ select_sql = f""" SELECT publish_time From crawler_meta_article WHERE out_account_id = '{gh_id}' ORDER BY publish_time DESC LIMIT 1; """ result = self.db_client.fetch(select_sql) time_stamp = result[0][0] dt_str = timestamp_to_str(time_stamp) update_sql = f""" update long_articles_accounts set latest_update_time = %s where gh_id = %s; """ self.db_client.save(query=update_sql, params=(dt_str, gh_id)) def crawl_each_account(self, gh_id, account_mode, latest_time_stamp): """ 更新账号文章 :return: """ current_cursor = None while True: # fetch response from weixin response = get_article_list_from_account( account_id=gh_id, index=current_cursor ) print(response) msg_list = response.get("data", {}).get("data") if not msg_list: break # process current page self.insert_article_into_meta(gh_id, account_mode, msg_list) # whether crawl next page last_article_in_this_page = msg_list[-1] last_time_stamp_in_this_msg = last_article_in_this_page["AppMsg"][ "BaseInfo" ]["UpdateTime"] if last_time_stamp_in_this_msg > 
latest_time_stamp: self.update_latest_account_timestamp(gh_id) break # update cursor for next page current_cursor = response.get("data", {}).get("next_cursor") if not current_cursor: break def crawl_account_list(self, account_list, account_method): for account in tqdm(account_list): try: gh_id = account["gh_id"] account_name = account["account_name"] latest_timestamp = self.get_latest_timestamp(account) self.crawl_each_account(gh_id, account_method, latest_timestamp) self.update_account_read_avg_info(gh_id, account_name) except Exception as e: print(f"fail because of {e}") print(traceback.format_exc() ) def update_account_read_avg_info(self, gh_id, account_name): """ calculate read avg info and read_avg_ci_high """ position_list = [i for i in range(1, 9)] today_dt = datetime.date.today().isoformat() for position in position_list: fetch_query = f""" select read_cnt, from_unixtime(publish_time, "%Y-%m_%d") as publish_dt from crawler_meta_article where out_account_id = '{gh_id}' and article_index = {position} order by publish_time desc limit 30; """ fetch_response = self.db_client.fetch(fetch_query, cursor_type=DictCursor) if fetch_response: read_cnt_list = [i["read_cnt"] for i in fetch_response] n = len(read_cnt_list) read_avg = sum(read_cnt_list) / n max_publish_dt = fetch_response[0]["publish_dt"] remark = f"从{max_publish_dt}开始计算,往前算{len(fetch_response)}天" insert_query = f""" insert ignore into crawler_meta_article_accounts_read_avg (gh_id, account_name, position, read_avg, dt, status, remark) values (%s, %s, %s, %s, %s, %s, %s); """ insert_rows = self.db_client.save( insert_query, params=( gh_id, account_name, position, read_avg, today_dt, 1, remark, ), ) if insert_rows: update_query = f""" update crawler_meta_article_accounts_read_avg set status = %s where gh_id = %s and position = %s and dt < %s; """ self.db_client.save(update_query, (0, gh_id, position, today_dt)) class CrawlerDailyScrapeAccountArticles(GzhArticleCrawler): def get_account_list(self, account_method: str) -> List[Dict]: """ 获取账号 :param account_method: :return: """ query = f""" select gh_id, account_source, account_name, account_category, latest_update_time from long_articles_accounts where account_category = '{account_method}' and is_using = {self.ACCOUNT_GOOD_STATUS} and daily_scrape = {self.ACCOUNT_DAILY_SCRAPE}; """ account_list = self.db_client.fetch(query, cursor_type=DictCursor) return account_list def deal(self, method_list): """ :param method_list: :return: """ # daily 品类账号抓取 for account_method in method_list: account_list = self.get_account_list(account_method) self.crawl_account_list(account_list, account_method) class CrawlerAssociationAccountArticles(GzhArticleCrawler): def get_association_account_list(self, date_str): """ 获取账号联想的轮询账号 """ group_id = date_str[-1] query = f""" select account_id, gh_id, account_name, latest_update_time from long_articles_accounts where account_category = 'account_association' and is_using = {self.ACCOUNT_DAILY_SCRAPE} and daily_scrape = {self.ACCOUNT_NOT_DAILY_SCRAPE}; """ account_list = self.db_client.fetch(query, cursor_type=DictCursor) today_crawler_account_list = [ i for i in account_list if str(i["account_id"])[-1] == group_id ] return today_crawler_account_list def deal(self, date_str): account_list = self.get_association_account_list(date_str) self.crawl_account_list(account_list, "account_association")
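

# Usage sketch (illustrative, not part of the original module): a minimal entry
# point showing how the two crawlers above might be driven. The category name
# "example_category" is a hypothetical placeholder; real values come from the
# account_category column of long_articles_accounts, and the actual scheduler
# arguments may differ.
if __name__ == "__main__":
    # daily crawl for category accounts
    daily_crawler = CrawlerDailyScrapeAccountArticles()
    daily_crawler.deal(method_list=["example_category"])

    # rotation crawl for account-association accounts, grouped by the last
    # digit of today's date string
    association_crawler = CrawlerAssociationAccountArticles()
    association_crawler.deal(date_str=datetime.date.today().strftime("%Y%m%d"))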