| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277 | 
							- """
 
- @author: luojunhui
 
- 抓取全局品类文章
 
- """
 
- import json
 
- import time
 
- from tqdm import tqdm
 
- from pymysql.cursors import DictCursor
 
- from applications import WeixinSpider, Functions, log
 
- from coldStartTasks.filter import article_crawler_duplicate_filter
 
- from config import apolloConfig
 
# Constants
# Account is active / usable.
ACCOUNT_GOOD_STATUS = 1

# Whether an account is scraped every day.
ACCOUNT_DAILY_SCRAPE = 1
ACCOUNT_NOT_DAILY_SCRAPE = 0

# Default values used when crawled article stats are missing.
DEFAULT_VIEW_COUNT = 0
DEFAULT_LIKE_COUNT = 0
DEFAULT_ARTICLE_STATUS = 1
# Fallback publish cutoff; presumably 2024-06-01 00:00 UTC+8 — TODO confirm tz.
DEFAULT_TIMESTAMP = 1717171200

# Title sensitivity flags stored in crawler_meta_article.title_sensitivity.
TITLE_SENSITIVE = 1
TITLE_NOT_SENSITIVE = 0

config = apolloConfig()
# Sensitive-word list is maintained in Apollo config as a JSON array of strings.
sensitive_word_list = json.loads(config.getConfigValue("sensitive_word_list"))
 
def whether_title_sensitive(title: str) -> bool:
    """
    Check whether a title contains any configured sensitive word.

    :param title: article title to inspect
    :return: True if at least one word from ``sensitive_word_list``
        occurs as a substring of the title, else False
    """
    return any(word in title for word in sensitive_word_list)
 
class weixinCategory(object):
    """
    Crawler for WeChat accounts grouped by global category.

    Pulls article lists per account via WeixinSpider, de-duplicates by
    title, filters sensitive titles, and persists rows into
    crawler_meta_article, tracking each account's latest publish time.
    """

    def __init__(self, db_client):
        """
        :param db_client: long-articles MySQL client used for all reads/writes
        """
        self.db_client_lam = db_client
        self.spider = WeixinSpider()
        self.function = Functions()

    def get_account_list(self, account_category):
        """
        Fetch the active, daily-scraped accounts of one category.

        :param account_category: category name to filter on
        :return: list of dicts with keys gh_id / platform / account_name /
                 category / latest_timestamp
        """
        # NOTE(review): account_category is interpolated directly into SQL.
        # Values come from internal config today; parameterize if that changes.
        sql = f"""
            select gh_id, account_source, account_name, account_category, latest_update_time
            from long_articles_accounts 
            where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_DAILY_SCRAPE};
            """
        account_tuple = self.db_client_lam.select(sql)
        result = [
            {
                "gh_id": i[0],
                "platform": i[1],
                "account_name": i[2],
                "category": i[3],
                "latest_timestamp": i[4],
            }
            for i in account_tuple
        ]
        return result

    def get_association_account_list(self, date_str):
        """
        Fetch the round-robin slice of "account association" accounts for
        one day: accounts whose account_id's last digit matches the last
        digit of date_str.

        :param date_str: date string (YYYY-MM-DD); its final character
            selects the rotation group
        :return: list of DictCursor rows for today's group
        """
        group_id = date_str[-1]
        # Fixed: is_using should be compared against ACCOUNT_GOOD_STATUS
        # (same value as before, but the semantically correct constant).
        sql = f"""
            select account_id, gh_id, account_name, latest_update_time
            from long_articles_accounts
            where account_category = 'account_association' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_NOT_DAILY_SCRAPE};
        """
        account_list = self.db_client_lam.select(sql, cursor_type=DictCursor)
        today_crawler_account_list = [i for i in account_list if str(i['account_id'])[-1] == group_id]
        return today_crawler_account_list

    def insert_data_into_db(self, gh_id, category, article_list):
        """
        Persist crawled articles into crawler_meta_article.

        Skips articles whose title already exists (duplicate filter) and
        tags titles containing sensitive words. Failures on individual
        articles are logged and do not abort the batch.

        :param gh_id: WeChat account gh_id the articles belong to
        :param category: category label stored with each row
        :param article_list: raw message objects from the spider
        :return: list of {'unique_index', 'title'} dicts for inserted rows
        """
        success_records = []
        for article_obj in article_list:
            detail_article_list = article_obj["AppMsg"]["DetailInfo"]
            for obj in detail_article_list:
                try:
                    # Skip articles whose title was already crawled.
                    if article_crawler_duplicate_filter(
                            new_article_title=obj["Title"], db_client=self.db_client_lam
                    ):
                        log(
                            function="weixinCategory",
                            task="weixinCategory",
                            message="文章去重",
                            data={"title": obj["Title"]}
                        )
                        continue

                    # Tag (but still store) titles containing sensitive words.
                    title_sensitivity = TITLE_SENSITIVE if whether_title_sensitive(obj["Title"]) else TITLE_NOT_SENSITIVE
                    show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
                    show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
                    show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
                    unique_idx = self.function.generateGzhId(obj["ContentUrl"])
                    insert_sql = f"""
                        insert into crawler_meta_article
                        (
                         platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
                         description, publish_time, crawler_time, status, unique_index, llm_sensitivity, title_sensitivity
                        )
                        VALUES 
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    self.db_client_lam.update(
                        sql=insert_sql,
                        params=(
                            "weixin",
                            "account",
                            category,
                            gh_id,
                            obj['ItemIndex'],
                            obj["Title"],
                            obj["ContentUrl"],
                            show_view_count,
                            show_like_count,
                            obj["Digest"],
                            obj["send_time"],
                            int(time.time()),
                            DEFAULT_ARTICLE_STATUS,
                            unique_idx,
                            obj.get("llm_sensitivity", -1),
                            title_sensitivity
                        ),
                    )
                    success_records.append({
                        'unique_index': unique_idx, 'title': obj['Title']
                    })
                except Exception as e:
                    # Best-effort per article: keep going, but surface context.
                    print("insert article failed because of {}".format(e))
        return success_records

    def update_latest_account_timestamp(self, gh_id):
        """
        Update long_articles_accounts.latest_update_time for one account to
        the publish time of its most recent crawled article.

        :param gh_id: WeChat account gh_id
        :return: None
        """
        select_sql = f"""
            SELECT publish_time 
            From crawler_meta_article 
            WHERE out_account_id = '{gh_id}'
            ORDER BY publish_time DESC LIMIT 1;
        """
        result = self.db_client_lam.select(select_sql)
        if not result:
            # No crawled articles for this account yet; nothing to update.
            return
        time_stamp = result[0][0]
        dt_str = self.function.timestamp_to_str(time_stamp)
        update_sql = f"""
            update long_articles_accounts
            set latest_update_time = %s
            where gh_id = %s;
        """
        self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))

    def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
        """
        Crawl one account's message list page by page (recursively via the
        spider's next_cursor) until reaching articles older than
        latest_time_stamp, inserting each page's articles.

        :param gh_id: WeChat account gh_id
        :param category: category label stored with each article
        :param latest_time_stamp: stop once a page's last article is not
            newer than this unix timestamp
        :param index: spider pagination cursor (None for the first page)
        :return: list of successfully inserted {'unique_index', 'title'} dicts
        """
        response = self.spider.update_msg_list(ghId=gh_id, index=index)
        msg_list = response.get("data", {}).get("data")
        if msg_list:
            last_article_in_this_msg = msg_list[-1]
            success_records = self.insert_data_into_db(
                gh_id=gh_id, category=category, article_list=msg_list
            )
            last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
            if latest_time_stamp < last_time_stamp_in_this_msg:
                # Page still newer than the cutoff: follow the cursor.
                next_cursor = response["data"]["next_cursor"]
                return success_records + self.update_each_account(
                    gh_id=gh_id,
                    latest_time_stamp=latest_time_stamp,
                    category=category,
                    index=next_cursor,
                )
            else:
                # Caught up: record the newest publish time we now hold.
                self.update_latest_account_timestamp(gh_id=gh_id)
                print("账号时间更新成功")
                return success_records
        else:
            print("No more data")
            return []

    def crawler_each_category(self, account_list, category):
        """
        Crawl every account of one category; per-account failures are
        reported and skipped.

        :param account_list: dicts with at least gh_id / latest_timestamp
        :param category: category label stored with each article
        :return: list of successfully inserted {'unique_index', 'title'}
            dicts across all accounts (was accumulated but never returned)
        """
        success_records = []
        for account in tqdm(account_list, desc="crawler_each_category"):
            try:
                gh_id = account['gh_id']
                try:
                    timestamp = int(account['latest_timestamp'].timestamp())
                except Exception as e:
                    # latest_timestamp missing / not a datetime: fall back.
                    timestamp = DEFAULT_TIMESTAMP
                success_records += self.update_each_account(
                    gh_id=gh_id,
                    category=category,
                    latest_time_stamp=timestamp
                )
                print("success")
            except Exception as e:
                print("fail because of {}".format(e))
        return success_records

    def deal(self, category_list, date_str):
        """
        Daily entry point: crawl every daily category, then today's
        rotation slice of association accounts.

        :param category_list: categories to crawl every day
        :param date_str: YYYY-MM-DD, selects the association rotation group
        :return: None
        """
        # Daily category accounts.
        for category in category_list:
            account_list = self.get_account_list(category)
            self.crawler_each_category(account_list=account_list, category=category)

        # Association accounts, round-robin by date.
        association_account_list = self.get_association_account_list(date_str)
        self.crawler_each_category(account_list=association_account_list, category="account_association")

    def deal_accounts(self, account_list):
        """
        Crawl a specific set of accounts by name (one page each).

        :param account_list: list of account names to crawl
        :return: None
        """
        if not account_list:
            # Empty IN () is invalid SQL; nothing to do.
            return
        # Fixed: str(tuple(...)) of a single element yields "('x',)" whose
        # trailing comma is invalid SQL — build the IN clause explicitly.
        quoted_names = ", ".join("'{}'".format(name) for name in account_list)
        sql = f"""
            SELECT gh_id, account_name, account_category, latest_update_time 
            FROM long_articles_accounts
            WHERE account_name in ({quoted_names});
        """
        response = self.db_client_lam.select(sql)
        for account in tqdm(response):
            try:
                gh_id = account[0]
                category = account[2]
                try:
                    latest_timestamp = account[3].timestamp()
                except Exception as e:
                    print(e)
                    latest_timestamp = DEFAULT_TIMESTAMP
                self.update_each_account(
                    gh_id=gh_id,
                    category=category,
                    latest_time_stamp=latest_timestamp
                )
            except Exception as e:
                print(e)
 
 
  |