""" @author: luojunhui 抓取全局品类文章 """ import json import time from tqdm import tqdm from pymysql.cursors import DictCursor from applications import WeixinSpider, Functions, log from coldStartTasks.filter import article_crawler_duplicate_filter from config import apolloConfig # 常量 ACCOUNT_GOOD_STATUS = 1 # 账号是否每日抓取 ACCOUNT_DAILY_SCRAPE = 1 ACCOUNT_NOT_DAILY_SCRAPE = 0 # 默认值 DEFAULT_VIEW_COUNT = 0 DEFAULT_LIKE_COUNT = 0 DEFAULT_ARTICLE_STATUS = 1 DEFAULT_TIMESTAMP = 1717171200 # 标题sensitivity TITLE_SENSITIVE = 1 TITLE_NOT_SENSITIVE = 0 config = apolloConfig() sensitive_word_list = json.loads(config.getConfigValue("sensitive_word_list")) def whether_title_sensitive(title: str) -> bool: """ : param title: 判断视频是否的标题是否包含敏感词 """ for word in sensitive_word_list: if word in title: return True return False class weixinCategory(object): """ 微信全局品类账号抓取 """ def __init__(self, db_client): self.db_client_lam = db_client self.spider = WeixinSpider() self.function = Functions() def get_account_list(self, account_category): """ 获取账号 :param account_category 品类 :return: """ sql = f""" select gh_id, account_source, account_name, account_category, latest_update_time from long_articles_accounts where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_DAILY_SCRAPE}; """ account_tuple = self.db_client_lam.select(sql) result = [ { "gh_id": i[0], "platform": i[1], "account_name": i[2], "category": i[3], "latest_timestamp": i[4], } for i in account_tuple ] return result def get_association_account_list(self, date_str): """ 获取账号联想的轮询账号 """ group_id = date_str[-1] sql = f""" select account_id, gh_id, account_name, latest_update_time from long_articles_accounts where account_category = 'account_association' and is_using = {ACCOUNT_DAILY_SCRAPE} and daily_scrape = {ACCOUNT_NOT_DAILY_SCRAPE}; """ account_list = self.db_client_lam.select(sql, cursor_type=DictCursor) today_crawler_account_list = [i for i in account_list if str(i['account_id'])[-1] == group_id] return today_crawler_account_list def insert_data_into_db(self, gh_id, category, article_list): """ 将数据更新到数据库 :return: """ success_records = [] for article_obj in article_list: detail_article_list = article_obj["AppMsg"]["DetailInfo"] for obj in detail_article_list: try: # 判断文章是否存在相同的标题 if article_crawler_duplicate_filter( new_article_title=obj["Title"], db_client=self.db_client_lam ): log( function="weixinCategory", task="weixinCategory", message="文章去重", data={"title": obj["Title"]} ) continue # 判断标题是否包含敏感词 title_sensitivity = TITLE_SENSITIVE if whether_title_sensitive(obj["Title"]) else TITLE_NOT_SENSITIVE show_stat = self.function.show_desc_to_sta(obj["ShowDesc"]) show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT) show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT) unique_idx = self.function.generateGzhId(obj["ContentUrl"]) insert_sql = f""" insert into crawler_meta_article ( platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt, description, publish_time, crawler_time, status, unique_index, llm_sensitivity, title_sensitivity ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s); """ self.db_client_lam.update( sql=insert_sql, params=( "weixin", "account", category, gh_id, obj['ItemIndex'], obj["Title"], obj["ContentUrl"], show_view_count, show_like_count, obj["Digest"], obj["send_time"], int(time.time()), DEFAULT_ARTICLE_STATUS, unique_idx, obj.get("llm_sensitivity", -1), title_sensitivity ), ) success_records.append({ 
'unique_index': unique_idx, 'title': obj['Title'] }) except Exception as e: print(e) return success_records def update_latest_account_timestamp(self, gh_id): """ 更新账号的最新时间戳 :return: """ select_sql = f""" SELECT publish_time From crawler_meta_article WHERE out_account_id = '{gh_id}' ORDER BY publish_time DESC LIMIT 1; """ result = self.db_client_lam.select(select_sql) time_stamp = result[0][0] dt_str = self.function.timestamp_to_str(time_stamp) update_sql = f""" update long_articles_accounts set latest_update_time = %s where gh_id = %s; """ self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id)) def update_each_account(self, gh_id, category, latest_time_stamp, index=None): """ 更新账号文章 :return: """ response = self.spider.update_msg_list(ghId=gh_id, index=index) msg_list = response.get("data", {}).get("data") if msg_list: last_article_in_this_msg = msg_list[-1] success_records = self.insert_data_into_db( gh_id=gh_id, category=category, article_list=msg_list ) last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"] if latest_time_stamp < last_time_stamp_in_this_msg: next_cursor = response["data"]["next_cursor"] return success_records + self.update_each_account( gh_id=gh_id, latest_time_stamp=latest_time_stamp, category=category, index=next_cursor, ) else: # 更新最近抓取时间 self.update_latest_account_timestamp(gh_id=gh_id) print("账号时间更新成功") return success_records else: print("No more data") return [] def crawler_each_category(self, account_list, category): """ 抓取每个品类 :return: """ success_records = [] for account in tqdm(account_list, desc="crawler_each_category"): try: gh_id = account['gh_id'] try: timestamp = int(account['latest_timestamp'].timestamp()) except Exception as e: timestamp = DEFAULT_TIMESTAMP success_records += self.update_each_account( gh_id=gh_id, category=category, latest_time_stamp=timestamp ) print("success") except Exception as e: print("fail because of {}".format(e)) def deal(self, category_list, date_str): """ :param category_list: :param date_str: YYYY-MM-DD :return: """ # daily 品类账号抓取 for category in category_list: account_list = self.get_account_list(category) self.crawler_each_category(account_list=account_list, category=category) # 账号联想账号轮询抓取 association_account_list = self.get_association_account_list(date_str) self.crawler_each_category(account_list=association_account_list, category="account_association") def deal_accounts(self, account_list): """ input account list :param account_list: 具体账号抓取,只抓一页 :return: """ account_tuple = tuple(account_list) sql = f""" SELECT gh_id, account_name, account_category, latest_update_time FROM long_articles_accounts WHERE account_name in {account_tuple}; """ response = self.db_client_lam.select(sql) for account in tqdm(response): try: gh_id = account[0] category = account[2] try: latest_timestamp = account[3].timestamp() except Exception as e: print(e) latest_timestamp = DEFAULT_TIMESTAMP self.update_each_account( gh_id=gh_id, category=category, latest_time_stamp=latest_timestamp ) except Exception as e: print(e)
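

# A minimal usage sketch, kept as comments because the concrete MySQL client is
# not defined in this module. The client class name below is hypothetical; it
# only needs to expose the select()/update() methods used above. Category and
# account names are placeholders, not values from the project.
#
#     from applications import DeNetMysql   # hypothetical import, use the real connector
#
#     db_client = DeNetMysql()
#     task = weixinCategory(db_client=db_client)
#
#     # Daily run: crawl the configured categories, then today's association group
#     task.deal(category_list=["example_category"], date_str="2024-06-01")
#
#     # Ad-hoc run: crawl specific accounts by name, one page each
#     task.deal_accounts(account_list=["example_account_name", "another_account"])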