"""
@author: luojunhui
Crawl articles of WeChat official accounts (gzh) for every account category.
"""

import datetime
import time
import traceback
from typing import Dict, List

from tqdm import tqdm
from pymysql.cursors import DictCursor

from applications.db import DatabaseConnector
from applications.pipeline import (
    whether_title_sensitive,
    whether_duplicate_article_title,
)
from applications.utils import show_desc_to_sta, generate_gzh_id, timestamp_to_str
from cold_start.crawler.wechat import get_article_list_from_account
from config import long_articles_config


class Const:
    """Constants shared by the crawlers in this module."""

    ACCOUNT_GOOD_STATUS = 1

    # whether the account is scraped every day
    ACCOUNT_DAILY_SCRAPE = 1
    ACCOUNT_NOT_DAILY_SCRAPE = 0

    # default values
    DEFAULT_VIEW_COUNT = 0
    DEFAULT_LIKE_COUNT = 0
    DEFAULT_ARTICLE_STATUS = 1
    # fallback used when an account has no usable latest_update_time
    DEFAULT_TIMESTAMP = 1717171200

    # title sensitivity
    TITLE_SENSITIVE = 1
    TITLE_NOT_SENSITIVE = 0


class GzhArticleCrawler(Const):
    """Base crawler: pages through an account's article list and stores it."""

    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()

    def get_latest_timestamp(self, account: dict) -> int:
        """Return ``account["latest_update_time"]`` as a unix timestamp.

        :param account: account row; ``latest_update_time`` is expected to
            expose a ``timestamp()`` method (e.g. ``datetime``).
        :return: the timestamp in seconds, or ``DEFAULT_TIMESTAMP`` when the
            field is missing, ``None`` or cannot be converted.
        """
        try:
            return int(account["latest_update_time"].timestamp())
        except (
            KeyError,
            TypeError,
            AttributeError,
            ValueError,
            OverflowError,
            OSError,
        ):
            return self.DEFAULT_TIMESTAMP

    def insert_article_into_meta(self, gh_id, account_mode, article_list):
        """Insert the articles of every message into ``crawler_meta_article``.

        Articles whose title is reported as a duplicate by
        ``whether_duplicate_article_title`` are skipped. A failure on one
        article is printed and does not stop the remaining ones.

        :param gh_id: account id stored as ``out_account_id``.
        :param account_mode: value stored in the ``category`` column.
        :param article_list: messages; each one is read through
            ``["AppMsg"]["DetailInfo"]``.
        :return: None
        """
        # The statement is constant, so build it once instead of per article.
        insert_sql = """
            insert into crawler_meta_article
            (
             platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
             description, publish_time, crawler_time, status, unique_index, llm_sensitivity, title_sensitivity
            )
            VALUES
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        for article_obj in article_list:
            detail_article_list = article_obj["AppMsg"]["DetailInfo"]
            for obj in detail_article_list:
                try:
                    if whether_duplicate_article_title(obj["Title"], self.db_client):
                        continue

                    # check whether the title contains sensitive words
                    title_sensitivity = (
                        self.TITLE_SENSITIVE
                        if whether_title_sensitive(obj["Title"])
                        else self.TITLE_NOT_SENSITIVE
                    )
                    show_stat = show_desc_to_sta(obj["ShowDesc"])
                    show_view_count = show_stat.get(
                        "show_view_count", self.DEFAULT_VIEW_COUNT
                    )
                    show_like_count = show_stat.get(
                        "show_like_count", self.DEFAULT_LIKE_COUNT
                    )
                    unique_idx = generate_gzh_id(obj["ContentUrl"])
                    self.db_client.save(
                        query=insert_sql,
                        params=(
                            "weixin",
                            "account",
                            account_mode,
                            gh_id,
                            obj["ItemIndex"],
                            obj["Title"],
                            obj["ContentUrl"],
                            show_view_count,
                            show_like_count,
                            obj["Digest"],
                            obj["send_time"],
                            int(time.time()),
                            self.DEFAULT_ARTICLE_STATUS,
                            unique_idx,
                            obj.get("llm_sensitivity", -1),
                            title_sensitivity,
                        ),
                    )
                except Exception as e:
                    # Best-effort on purpose: one malformed article must not
                    # abort the rest of the page. Keep the traceback so the
                    # failure can actually be diagnosed.
                    print(e)
                    print(traceback.format_exc())

    def update_latest_account_timestamp(self, gh_id):
        """Set the account's ``latest_update_time`` to its newest stored article.

        Reads the largest ``publish_time`` of the account from
        ``crawler_meta_article`` and writes it (converted with
        ``timestamp_to_str``) to ``long_articles_accounts``. Does nothing when
        the account has no stored article.

        :param gh_id: account id.
        :return: None
        """
        # NOTE(review): gh_id is interpolated into the SQL text. It comes from
        # our own accounts table, but this should become a bound parameter once
        # it is confirmed that ``fetch`` accepts one.
        select_sql = f"""
            SELECT publish_time
            From crawler_meta_article
            WHERE out_account_id = '{gh_id}'
            ORDER BY publish_time DESC LIMIT 1;
        """
        result = self.db_client.fetch(select_sql)
        if not result:
            # No article stored for this account yet: nothing to update.
            return

        time_stamp = result[0][0]
        dt_str = timestamp_to_str(time_stamp)
        update_sql = """
            update long_articles_accounts
            set latest_update_time = %s
            where gh_id = %s;
        """
        self.db_client.save(query=update_sql, params=(dt_str, gh_id))

    def crawl_each_account(self, gh_id, account_mode, latest_time_stamp):
        """Crawl the article pages of one account and store them.

        Pages are requested until a page is empty, the pagination cursor runs
        out, or the last article of a page is not newer than
        ``latest_time_stamp``. When at least one page was stored, the account's
        ``latest_update_time`` is refreshed.

        :param gh_id: account id.
        :param account_mode: category value stored with every article.
        :param latest_time_stamp: unix timestamp of the newest article already
            known for this account.
        :return: None
        """
        current_cursor = None
        crawled_any_page = False
        while True:
            # fetch one page from weixin
            response = get_article_list_from_account(
                account_id=gh_id, index=current_cursor
            )
            # Tolerate a missing response or a null "data" field.
            payload = (response or {}).get("data") or {}
            msg_list = payload.get("data")
            if not msg_list:
                break

            # process current page
            self.insert_article_into_meta(gh_id, account_mode, msg_list)
            crawled_any_page = True

            # Decide whether the next page is needed.
            # NOTE(review): assumes pages are ordered newest-first, so the last
            # message is the oldest on the page - confirm against the API.
            # Once that message is not newer than what is already stored, the
            # crawl has caught up and must stop; while it is still newer, more
            # unseen articles may follow on the next page.
            last_time_stamp_in_this_msg = msg_list[-1]["AppMsg"]["BaseInfo"][
                "UpdateTime"
            ]
            if last_time_stamp_in_this_msg <= latest_time_stamp:
                break

            # update cursor for next page
            current_cursor = payload.get("next_cursor")
            if not current_cursor:
                break

        if crawled_any_page:
            self.update_latest_account_timestamp(gh_id)

    def crawl_account_list(self, account_list, account_method):
        """Crawl every account of the list and refresh its read averages.

        An error on one account is printed with its traceback and the loop
        continues with the next account.

        :param account_list: account rows with ``gh_id`` and ``account_name``.
        :param account_method: category value stored with every article.
        :return: None
        """
        for account in tqdm(account_list):
            try:
                gh_id = account["gh_id"]
                account_name = account["account_name"]
                latest_timestamp = self.get_latest_timestamp(account)
                self.crawl_each_account(gh_id, account_method, latest_timestamp)
                self.update_account_read_avg_info(gh_id, account_name)
            except Exception as e:
                print(f"fail because of {e}")
                print(traceback.format_exc())

    def update_account_read_avg_info(self, gh_id, account_name):
        """Store today's average read count per article position.

        For positions 1 to 8, averages ``read_cnt`` over the account's 30 most
        recent articles at that position and inserts the value into
        ``crawler_meta_article_accounts_read_avg``. When a row was inserted,
        rows of the same account and position with an earlier ``dt`` get
        ``status = 0``.

        :param gh_id: account id.
        :param account_name: account name stored with the average.
        :return: None
        """
        today_dt = datetime.date.today().isoformat()
        insert_query = """
            insert ignore into
                crawler_meta_article_accounts_read_avg
                (gh_id, account_name, position, read_avg, dt, status, remark)
                values
                (%s, %s, %s, %s, %s, %s, %s);
        """
        update_query = """
            update crawler_meta_article_accounts_read_avg
            set status = %s
            where gh_id = %s and position = %s and dt < %s;
        """
        for position in range(1, 9):
            # The date format used to be "%Y-%m_%d" (underscore typo), which
            # produced dates such as 2024-05_31 in the remark.
            fetch_query = f"""
                select read_cnt, from_unixtime(publish_time, "%Y-%m-%d") as publish_dt from crawler_meta_article
                where out_account_id = '{gh_id}' and article_index = {position}
                order by publish_time desc limit 30;
            """
            fetch_response = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
            if not fetch_response:
                continue

            read_cnt_list = [i["read_cnt"] for i in fetch_response]
            read_avg = sum(read_cnt_list) / len(read_cnt_list)
            # rows are sorted by publish_time desc, so the first one is the newest
            max_publish_dt = fetch_response[0]["publish_dt"]
            remark = f"从{max_publish_dt}开始计算,往前算{len(fetch_response)}天"
            insert_rows = self.db_client.save(
                insert_query,
                params=(
                    gh_id,
                    account_name,
                    position,
                    read_avg,
                    today_dt,
                    1,
                    remark,
                ),
            )
            if insert_rows:
                self.db_client.save(update_query, (0, gh_id, position, today_dt))


class CrawlerDailyScrapeAccountArticles(GzhArticleCrawler):
    """Crawler for the accounts flagged for daily scraping."""

    def get_account_list(self, account_method: str) -> List[Dict]:
        """Fetch the in-use, daily-scraped accounts of one category.

        :param account_method: value matched against ``account_category``.
        :return: account rows as dicts.
        """
        # NOTE(review): account_method is interpolated into the SQL text; bind
        # it as a parameter once ``fetch`` is confirmed to support it.
        query = f"""
            select gh_id, account_source, account_name, account_category, latest_update_time
            from long_articles_accounts
            where account_category = '{account_method}' and is_using = {self.ACCOUNT_GOOD_STATUS} and daily_scrape = {self.ACCOUNT_DAILY_SCRAPE};
        """
        account_list = self.db_client.fetch(query, cursor_type=DictCursor)
        return account_list

    def deal(self, method_list):
        """Crawl the daily accounts of every category in ``method_list``.

        :param method_list: iterable of account categories.
        :return: None
        """
        # crawl the daily accounts category by category
        for account_method in method_list:
            account_list = self.get_account_list(account_method)
            self.crawl_account_list(account_list, account_method)


class CrawlerAssociationAccountArticles(GzhArticleCrawler):
    """Crawler for ``account_association`` accounts, polled in groups."""

    def get_association_account_list(self, date_str):
        """Fetch the association accounts that belong to today's group.

        The group is the last character of ``date_str``; an account is selected
        when the last character of its ``account_id`` equals it.

        :param date_str: date string whose last character selects the group.
        :return: list of account rows (dicts) to crawl.
        """
        group_id = date_str[-1]
        # is_using is compared with ACCOUNT_GOOD_STATUS (same value as before,
        # but the constant that actually describes this column).
        query = f"""
            select account_id, gh_id, account_name, latest_update_time
            from long_articles_accounts
            where account_category = 'account_association' and is_using = {self.ACCOUNT_GOOD_STATUS} and daily_scrape = {self.ACCOUNT_NOT_DAILY_SCRAPE};
        """
        account_list = self.db_client.fetch(query, cursor_type=DictCursor)
        today_crawler_account_list = [
            i for i in account_list if str(i["account_id"])[-1] == group_id
        ]
        return today_crawler_account_list

    def deal(self, date_str):
        """Crawl today's group of association accounts.

        :param date_str: date string passed to ``get_association_account_list``.
        :return: None
        """
        account_list = self.get_association_account_list(date_str)
        self.crawl_account_list(account_list, "account_association")