- """
- @author: luojunhui
- 抓取全局品类文章
- """
- import time
- from tqdm import tqdm
- from pymysql.cursors import DictCursor
- from applications import WeixinSpider, Functions, llm_sensitivity, log
- from coldStartTasks.filter import article_crawler_duplicate_filter
- # 常量
- ACCOUNT_GOOD_STATUS = 1
- # 账号是否每日抓取
- ACCOUNT_DAILY_SCRAPE = 1
- ACCOUNT_NOT_DAILY_SCRAPE = 0
- # 默认值
- DEFAULT_VIEW_COUNT = 0
- DEFAULT_LIKE_COUNT = 0
- DEFAULT_ARTICLE_STATUS = 1
- DEFAULT_TIMESTAMP = 1717171200


class weixinCategory(object):
    """
    Crawler for WeChat accounts grouped by global category
    """

    def __init__(self, db_client):
        self.db_client_lam = db_client
        self.spider = WeixinSpider()
        self.function = Functions()

    def get_account_list(self, account_category):
        """
        Fetch the daily-crawled accounts of one category
        :param account_category: category name
        :return: list of account dicts
        """
        sql = f"""
            select gh_id, account_source, account_name, account_category, latest_update_time
            from long_articles_accounts
            where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_DAILY_SCRAPE};
        """
        account_tuple = self.db_client_lam.select(sql)
        result = [
            {
                "gh_id": i[0],
                "platform": i[1],
                "account_name": i[2],
                "category": i[3],
                "latest_timestamp": i[4],
            }
            for i in account_tuple
        ]
        return result

    def get_association_account_list(self, date_str):
        """
        Fetch the association accounts that are polled in rotation:
        the last digit of the date string selects today's account group.
        """
        group_id = date_str[-1]
        sql = f"""
            select account_id, gh_id, account_name, latest_update_time
            from long_articles_accounts
            where account_category = 'account_association' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_NOT_DAILY_SCRAPE};
        """
        account_list = self.db_client_lam.select(sql, cursor_type=DictCursor)
        today_crawler_account_list = [i for i in account_list if str(i['account_id'])[-1] == group_id]
        return today_crawler_account_list

    def insert_data_into_db(self, gh_id, category, article_list):
        """
        Persist crawled articles into crawler_meta_article
        :return: list of successfully inserted records
        """
        success_records = []
        for article_obj in article_list:
            detail_article_list = article_obj["AppMsg"]["DetailInfo"]
            for obj in detail_article_list:
                try:
                    # skip articles whose title already exists
                    if article_crawler_duplicate_filter(
                        new_article_title=obj["Title"], db_client=self.db_client_lam
                    ):
                        log(
                            function="weixinCategory",
                            task="weixinCategory",
                            message="duplicate title, article skipped",
                            data={"title": obj["Title"]}
                        )
                        continue

                    show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
                    show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
                    show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
                    unique_idx = self.function.generateGzhId(obj["ContentUrl"])
                    insert_sql = """
                        insert into crawler_meta_article
                        (
                            platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
                            description, publish_time, crawler_time, status, unique_index, llm_sensitivity
                        )
                        VALUES
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    self.db_client_lam.update(
                        sql=insert_sql,
                        params=(
                            "weixin",
                            "account",
                            category,
                            gh_id,
                            obj['ItemIndex'],
                            obj["Title"],
                            obj["ContentUrl"],
                            show_view_count,
                            show_like_count,
                            obj["Digest"],
                            obj["send_time"],
                            int(time.time()),
                            DEFAULT_ARTICLE_STATUS,
                            unique_idx,
                            obj.get("llm_sensitivity", -1)
                        ),
                    )
                    success_records.append({
                        'unique_index': unique_idx, 'title': obj['Title']
                    })
                except Exception as e:
                    print(e)
        return success_records

    def update_article_sensitive_status(self, category, unique_index, status):
        """
        Update an article's LLM sensitivity status
        :return:
        """
        update_sql = """
            update crawler_meta_article
            set llm_sensitivity = %s
            where category = %s and unique_index = %s;
        """
        self.db_client_lam.update(sql=update_sql, params=(status, category, unique_index))

    def update_latest_account_timestamp(self, gh_id):
        """
        Record an account's most recent article publish time
        :return:
        """
        select_sql = f"""
            SELECT publish_time
            FROM crawler_meta_article
            WHERE out_account_id = '{gh_id}'
            ORDER BY publish_time DESC LIMIT 1;
        """
        result = self.db_client_lam.select(select_sql)
        if not result:
            # nothing crawled for this account yet; keep the old timestamp
            return
        time_stamp = result[0][0]
        dt_str = self.function.timestamp_to_str(time_stamp)
        update_sql = """
            update long_articles_accounts
            set latest_update_time = %s
            where gh_id = %s;
        """
        self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))

    def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
        """
        Crawl one account page by page, recursing until the oldest article
        on the current page is older than the account's stored timestamp.
        :return: list of successfully inserted records
        """
        response = self.spider.update_msg_list(ghId=gh_id, index=index)
        msg_list = response.get("data", {}).get("data")
        if msg_list:
            last_article_in_this_msg = msg_list[-1]
            success_records = self.insert_data_into_db(
                gh_id=gh_id, category=category, article_list=msg_list
            )
            last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
            if latest_time_stamp < last_time_stamp_in_this_msg:
                # the oldest article on this page is still newer than the
                # stored cursor, so fetch the next page
                next_cursor = response["data"]["next_cursor"]
                return success_records + self.update_each_account(
                    gh_id=gh_id,
                    latest_time_stamp=latest_time_stamp,
                    category=category,
                    index=next_cursor,
                )
            else:
                # caught up: record the newest publish time for this account
                self.update_latest_account_timestamp(gh_id=gh_id)
                print("account timestamp updated")
            return success_records
        else:
            print("No more data")
            return []

    def crawler_each_category(self, account_list, category):
        """
        Crawl every account of one category, then batch-check the
        newly inserted titles for LLM sensitivity.
        :return:
        """
        success_records = []
        for account in tqdm(account_list, desc="crawler_each_category"):
            try:
                gh_id = account['gh_id']
                try:
                    timestamp = int(account['latest_timestamp'].timestamp())
                except Exception as e:
                    # no valid datetime stored yet; fall back to the default
                    timestamp = DEFAULT_TIMESTAMP
                success_records += self.update_each_account(
                    gh_id=gh_id,
                    category=category,
                    latest_time_stamp=timestamp
                )
                print("success")
            except Exception as e:
                print("fail because of {}".format(e))
        success_titles = [x['title'] for x in success_records]
        if success_titles:
            try:
                sensitive_results = llm_sensitivity.check_titles(success_titles)
                for record, sensitive_result in zip(success_records, sensitive_results):
                    self.update_article_sensitive_status(
                        category=category,
                        unique_index=record['unique_index'],
                        status=sensitive_result['hit_rule']
                    )
            except Exception as e:
                print("failed to update sensitive status: {}".format(e))

    def deal(self, category_list, date_str):
        """
        Entry point: crawl each daily category, then the association-account
        group that is due on the given date.
        :param category_list: categories whose accounts are crawled daily
        :param date_str: YYYY-MM-DD
        :return:
        """
        # daily crawl: category accounts
        for category in category_list:
            account_list = self.get_account_list(category)
            self.crawler_each_category(account_list=account_list, category=category)

        # rotating crawl: association accounts
        association_account_list = self.get_association_account_list(date_str)
        self.crawler_each_category(account_list=association_account_list, category="association")

    def deal_accounts(self, account_list):
        """
        Crawl specific accounts by name (only one page each)
        :param account_list: account names to crawl
        :return:
        """
        account_tuple = tuple(account_list)
        # a one-element tuple renders as ('name',), whose trailing comma
        # breaks the SQL IN clause, so format a single name explicitly
        if len(account_tuple) == 1:
            in_clause = "('{}')".format(account_tuple[0])
        else:
            in_clause = str(account_tuple)
        sql = f"""
            SELECT gh_id, account_name, account_category, latest_update_time
            FROM long_articles_accounts
            WHERE account_name in {in_clause};
        """
        response = self.db_client_lam.select(sql)
        for account in tqdm(response):
            try:
                gh_id = account[0]
                category = account[2]
                try:
                    latest_timestamp = account[3].timestamp()
                except Exception as e:
                    print(e)
                    latest_timestamp = DEFAULT_TIMESTAMP
                self.update_each_account(
                    gh_id=gh_id,
                    category=category,
                    latest_time_stamp=latest_timestamp
                )
            except Exception as e:
                print(e)
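

if __name__ == "__main__":
    # Minimal usage sketch. The project injects its own MySQL client; any object
    # exposing the `select(sql, cursor_type=...)` and `update(sql, params)` methods
    # used above will do. `DatabaseConnector` and the category names below are
    # hypothetical stand-ins, not part of this module.
    from datetime import datetime

    db_client = DatabaseConnector()  # hypothetical: replace with the real client
    task = weixinCategory(db_client=db_client)
    # crawl two hypothetical categories, then today's association-account group
    task.deal(
        category_list=["category_a", "category_b"],
        date_str=datetime.today().strftime("%Y-%m-%d"),
    )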