- """
- @author: luojunhui
- 抓取全局品类文章
- """
- import datetime
- import time
- import traceback
- from typing import Dict, List
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from applications.db import DatabaseConnector
- from applications.pipeline import (
- whether_title_sensitive,
- whether_duplicate_article_title,
- )
- from applications.utils import show_desc_to_sta, generate_gzh_id, timestamp_to_str
- from cold_start.crawler.wechat import get_article_list_from_account
- from config import long_articles_config


class Const:
    # account availability status
    ACCOUNT_GOOD_STATUS = 1

    # whether the account is crawled daily
    ACCOUNT_DAILY_SCRAPE = 1
    ACCOUNT_NOT_DAILY_SCRAPE = 0

    # default values
    DEFAULT_VIEW_COUNT = 0
    DEFAULT_LIKE_COUNT = 0
    DEFAULT_ARTICLE_STATUS = 1
    DEFAULT_TIMESTAMP = 1717171200

    # title sensitivity flags
    TITLE_SENSITIVE = 1
    TITLE_NOT_SENSITIVE = 0


class GzhArticleCrawler(Const):

    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()

    def get_latest_timestamp(self, account: dict) -> int:
        try:
            timestamp = int(account["latest_update_time"].timestamp())
        except Exception:
            # fall back to the default timestamp when latest_update_time is missing or invalid
            timestamp = self.DEFAULT_TIMESTAMP
        return timestamp

    def insert_article_into_meta(self, gh_id, account_mode, article_list):
        """
        Save crawled articles into the crawler_meta_article table
        :return:
        """
        for article_obj in article_list:
            detail_article_list = article_obj["AppMsg"]["DetailInfo"]
            for obj in detail_article_list:
                try:
                    # skip articles whose title already exists
                    if whether_duplicate_article_title(obj["Title"], self.db_client):
                        continue

                    # check whether the title contains sensitive words
                    title_sensitivity = (
                        self.TITLE_SENSITIVE
                        if whether_title_sensitive(obj["Title"])
                        else self.TITLE_NOT_SENSITIVE
                    )
                    show_stat = show_desc_to_sta(obj["ShowDesc"])
                    show_view_count = show_stat.get(
                        "show_view_count", self.DEFAULT_VIEW_COUNT
                    )
                    show_like_count = show_stat.get(
                        "show_like_count", self.DEFAULT_LIKE_COUNT
                    )
                    unique_idx = generate_gzh_id(obj["ContentUrl"])
                    insert_sql = """
                        insert into crawler_meta_article
                        (
                            platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
                            description, publish_time, crawler_time, status, unique_index, llm_sensitivity, title_sensitivity
                        )
                        VALUES
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    self.db_client.save(
                        query=insert_sql,
                        params=(
                            "weixin",
                            "account",
                            account_mode,
                            gh_id,
                            obj["ItemIndex"],
                            obj["Title"],
                            obj["ContentUrl"],
                            show_view_count,
                            show_like_count,
                            obj["Digest"],
                            obj["send_time"],
                            int(time.time()),
                            self.DEFAULT_ARTICLE_STATUS,
                            unique_idx,
                            obj.get("llm_sensitivity", -1),
                            title_sensitivity,
                        ),
                    )
                except Exception as e:
                    print(e)

    def update_latest_account_timestamp(self, gh_id):
        """
        Update the account's latest publish timestamp
        :return:
        """
        select_sql = f"""
            SELECT publish_time
            FROM crawler_meta_article
            WHERE out_account_id = '{gh_id}'
            ORDER BY publish_time DESC LIMIT 1;
        """
        result = self.db_client.fetch(select_sql)
        time_stamp = result[0][0]
        dt_str = timestamp_to_str(time_stamp)
        update_sql = """
            update long_articles_accounts
            set latest_update_time = %s
            where gh_id = %s;
        """
        self.db_client.save(query=update_sql, params=(dt_str, gh_id))

    def crawl_each_account(self, gh_id, account_mode, latest_time_stamp):
        """
        Crawl the articles of a single account, page by page
        :return:
        """
        current_cursor = None
        while True:
            # fetch one page of articles from weixin
            response = get_article_list_from_account(
                account_id=gh_id, index=current_cursor
            )
            print(response)
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                break

            # process the current page
            self.insert_article_into_meta(gh_id, account_mode, msg_list)

            # decide whether to crawl the next page
            last_article_in_this_page = msg_list[-1]
            last_time_stamp_in_this_msg = last_article_in_this_page["AppMsg"][
                "BaseInfo"
            ]["UpdateTime"]
            if last_time_stamp_in_this_msg > latest_time_stamp:
                self.update_latest_account_timestamp(gh_id)
                break

            # update the cursor for the next page
            current_cursor = response.get("data", {}).get("next_cursor")
            if not current_cursor:
                break

    def crawl_account_list(self, account_list, account_method):
        for account in tqdm(account_list):
            try:
                gh_id = account["gh_id"]
                account_name = account["account_name"]
                latest_timestamp = self.get_latest_timestamp(account)
                self.crawl_each_account(gh_id, account_method, latest_timestamp)
                self.update_account_read_avg_info(gh_id, account_name)
            except Exception as e:
                print(f"crawl account failed because of {e}")
                print(traceback.format_exc())

    def update_account_read_avg_info(self, gh_id, account_name):
        """
        calculate the average read count for each article position of the account
        """
        position_list = list(range(1, 9))
        today_dt = datetime.date.today().isoformat()
        for position in position_list:
            fetch_query = f"""
                select read_cnt, from_unixtime(publish_time, "%Y-%m-%d") as publish_dt
                from crawler_meta_article
                where out_account_id = '{gh_id}' and article_index = {position}
                order by publish_time desc limit 30;
            """
            fetch_response = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
            if fetch_response:
                read_cnt_list = [i["read_cnt"] for i in fetch_response]
                n = len(read_cnt_list)
                read_avg = sum(read_cnt_list) / n
                max_publish_dt = fetch_response[0]["publish_dt"]
                remark = f"从{max_publish_dt}开始计算,往前算{len(fetch_response)}天"
                insert_query = """
                    insert ignore into
                        crawler_meta_article_accounts_read_avg
                        (gh_id, account_name, position, read_avg, dt, status, remark)
                    values
                        (%s, %s, %s, %s, %s, %s, %s);
                """
                insert_rows = self.db_client.save(
                    insert_query,
                    params=(
                        gh_id,
                        account_name,
                        position,
                        read_avg,
                        today_dt,
                        1,
                        remark,
                    ),
                )
                if insert_rows:
                    # mark earlier records for this gh_id / position as stale
                    update_query = """
                        update crawler_meta_article_accounts_read_avg
                        set status = %s
                        where gh_id = %s and position = %s and dt < %s;
                    """
                    self.db_client.save(update_query, (0, gh_id, position, today_dt))


class CrawlerDailyScrapeAccountArticles(GzhArticleCrawler):

    def get_account_list(self, account_method: str) -> List[Dict]:
        """
        Get the daily-scrape accounts of the given category
        :param account_method:
        :return:
        """
        query = f"""
            select gh_id, account_source, account_name, account_category, latest_update_time
            from long_articles_accounts
            where account_category = '{account_method}'
                and is_using = {self.ACCOUNT_GOOD_STATUS}
                and daily_scrape = {self.ACCOUNT_DAILY_SCRAPE};
        """
        account_list = self.db_client.fetch(query, cursor_type=DictCursor)
        return account_list

    def deal(self, method_list):
        """
        :param method_list:
        :return:
        """
        # crawl daily-scrape category accounts
        for account_method in method_list:
            account_list = self.get_account_list(account_method)
            self.crawl_account_list(account_list, account_method)


class CrawlerAssociationAccountArticles(GzhArticleCrawler):

    def get_association_account_list(self, date_str):
        """
        Get the rotation group of account-association accounts for the given date
        """
        group_id = date_str[-1]
        query = f"""
            select account_id, gh_id, account_name, latest_update_time
            from long_articles_accounts
            where account_category = 'account_association'
                and is_using = {self.ACCOUNT_GOOD_STATUS}
                and daily_scrape = {self.ACCOUNT_NOT_DAILY_SCRAPE};
        """
        account_list = self.db_client.fetch(query, cursor_type=DictCursor)
        # only keep accounts whose account_id ends with today's group id
        today_crawler_account_list = [
            i for i in account_list if str(i["account_id"])[-1] == group_id
        ]
        return today_crawler_account_list

    def deal(self, date_str):
        account_list = self.get_association_account_list(date_str)
        self.crawl_account_list(account_list, "account_association")
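

# Usage sketch (assumption): the module does not ship an entry point, so the
# block below is only a hypothetical example of how these crawlers might be
# driven, e.g. from a scheduled task. The category names in method_list and
# the YYYYMMDD date format are assumptions, not part of the original code.
if __name__ == "__main__":
    # crawl daily-scrape category accounts; the categories are placeholders
    daily_crawler = CrawlerDailyScrapeAccountArticles()
    daily_crawler.deal(method_list=["history", "tech"])

    # crawl one rotation group of association accounts, selected by the last
    # digit of today's date string
    association_crawler = CrawlerAssociationAccountArticles()
    association_crawler.deal(date_str=datetime.date.today().strftime("%Y%m%d"))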