# gzh_article_crawler.py

  1. """
  2. @author: luojunhui
  3. 抓取全局品类文章
  4. """
  5. import datetime
  6. import time
  7. import traceback
  8. from typing import Dict, List
  9. from tqdm import tqdm
  10. from pymysql.cursors import DictCursor
  11. from applications.db import DatabaseConnector
  12. from applications.pipeline import (
  13. whether_title_sensitive,
  14. whether_duplicate_article_title,
  15. )
  16. from applications.utils import show_desc_to_sta, generate_gzh_id, timestamp_to_str
  17. from cold_start.crawler.wechat import get_article_list_from_account
  18. from config import long_articles_config


class Const:
    ACCOUNT_GOOD_STATUS = 1

    # whether the account is crawled every day
    ACCOUNT_DAILY_SCRAPE = 1
    ACCOUNT_NOT_DAILY_SCRAPE = 0

    # default values
    DEFAULT_VIEW_COUNT = 0
    DEFAULT_LIKE_COUNT = 0
    DEFAULT_ARTICLE_STATUS = 1
    DEFAULT_TIMESTAMP = 1717171200

    # title sensitivity flags
    TITLE_SENSITIVE = 1
    TITLE_NOT_SENSITIVE = 0


class GzhArticleCrawler(Const):

    def __init__(self):
        self.db_client = DatabaseConnector(long_articles_config)
        self.db_client.connect()

    def get_latest_timestamp(self, account: dict) -> int:
        try:
            timestamp = int(account["latest_update_time"].timestamp())
        except Exception:
            # fall back to the default when the field is missing or not a datetime
            timestamp = self.DEFAULT_TIMESTAMP
        return timestamp
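
    # Assumed shape of each article_obj handled by insert_article_into_meta below,
    # inferred from the fields this crawler actually reads (not an official schema
    # of the WeChat list API):
    #   {
    #       "AppMsg": {
    #           "BaseInfo": {"UpdateTime": <unix timestamp>},
    #           "DetailInfo": [
    #               {"Title": ..., "ContentUrl": ..., "ShowDesc": ..., "ItemIndex": ...,
    #                "Digest": ..., "send_time": <unix timestamp>},
    #               ...
    #           ],
    #       }
    #   }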

    def insert_article_into_meta(self, gh_id, account_mode, article_list):
        """
        Save crawled articles into the crawler meta table.
        :return:
        """
        for article_obj in article_list:
            detail_article_list = article_obj["AppMsg"]["DetailInfo"]
            for obj in detail_article_list:
                try:
                    # skip titles that already exist in the meta table
                    if whether_duplicate_article_title(obj["Title"], self.db_client):
                        continue

                    # check whether the title contains sensitive words
                    title_sensitivity = (
                        self.TITLE_SENSITIVE
                        if whether_title_sensitive(obj["Title"])
                        else self.TITLE_NOT_SENSITIVE
                    )
                    show_stat = show_desc_to_sta(obj["ShowDesc"])
                    show_view_count = show_stat.get(
                        "show_view_count", self.DEFAULT_VIEW_COUNT
                    )
                    show_like_count = show_stat.get(
                        "show_like_count", self.DEFAULT_LIKE_COUNT
                    )
                    unique_idx = generate_gzh_id(obj["ContentUrl"])
                    insert_sql = f"""
                        insert into crawler_meta_article
                        (
                            platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
                            description, publish_time, crawler_time, status, unique_index, llm_sensitivity, title_sensitivity
                        )
                        VALUES
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    self.db_client.save(
                        query=insert_sql,
                        params=(
                            "weixin",
                            "account",
                            account_mode,
                            gh_id,
                            obj["ItemIndex"],
                            obj["Title"],
                            obj["ContentUrl"],
                            show_view_count,
                            show_like_count,
                            obj["Digest"],
                            obj["send_time"],
                            int(time.time()),
                            self.DEFAULT_ARTICLE_STATUS,
                            unique_idx,
                            obj.get("llm_sensitivity", -1),
                            title_sensitivity,
                        ),
                    )
                except Exception as e:
                    print(e)

    def update_latest_account_timestamp(self, gh_id):
        """
        Update the account's latest publish timestamp.
        :return:
        """
        select_sql = f"""
            select publish_time
            from crawler_meta_article
            where out_account_id = '{gh_id}'
            order by publish_time desc limit 1;
        """
        result = self.db_client.fetch(select_sql)
        time_stamp = result[0][0]
        dt_str = timestamp_to_str(time_stamp)

        update_sql = f"""
            update long_articles_accounts
            set latest_update_time = %s
            where gh_id = %s;
        """
        self.db_client.save(query=update_sql, params=(dt_str, gh_id))

    def crawl_each_account(self, gh_id, account_mode, latest_time_stamp):
        """
        Crawl new articles for a single account.
        :return:
        """
        current_cursor = None
        while True:
            # fetch one page of articles from weixin
            response = get_article_list_from_account(
                account_id=gh_id, index=current_cursor
            )
            print(response)
            msg_list = response.get("data", {}).get("data")
            if not msg_list:
                break

            # process the current page
            self.insert_article_into_meta(gh_id, account_mode, msg_list)

            # decide whether to crawl the next page
            last_article_in_this_page = msg_list[-1]
            last_time_stamp_in_this_msg = last_article_in_this_page["AppMsg"][
                "BaseInfo"
            ]["UpdateTime"]
            if last_time_stamp_in_this_msg > latest_time_stamp:
                self.update_latest_account_timestamp(gh_id)
                break

            # update the cursor for the next page
            current_cursor = response.get("data", {}).get("next_cursor")
            if not current_cursor:
                break

    def crawl_account_list(self, account_list, account_method):
        for account in tqdm(account_list):
            try:
                gh_id = account["gh_id"]
                account_name = account["account_name"]
                latest_timestamp = self.get_latest_timestamp(account)
                self.crawl_each_account(gh_id, account_method, latest_timestamp)
                self.update_account_read_avg_info(gh_id, account_name)
            except Exception as e:
                print(f"fail because of {e}")
                print(traceback.format_exc())

    def update_account_read_avg_info(self, gh_id, account_name):
        """
        Calculate per-position read average info and read_avg_ci_high.
        """
        position_list = list(range(1, 9))
        today_dt = datetime.date.today().isoformat()
        for position in position_list:
            fetch_query = f"""
                select read_cnt, from_unixtime(publish_time, "%Y-%m-%d") as publish_dt
                from crawler_meta_article
                where out_account_id = '{gh_id}' and article_index = {position}
                order by publish_time desc limit 30;
            """
            fetch_response = self.db_client.fetch(fetch_query, cursor_type=DictCursor)
            if fetch_response:
                read_cnt_list = [i["read_cnt"] for i in fetch_response]
                n = len(read_cnt_list)
                read_avg = sum(read_cnt_list) / n
                max_publish_dt = fetch_response[0]["publish_dt"]
                # remark (zh): "calculated from {max_publish_dt}, going back {n} days"
                remark = f"从{max_publish_dt}开始计算,往前算{len(fetch_response)}天"
                insert_query = f"""
                    insert ignore into crawler_meta_article_accounts_read_avg
                    (gh_id, account_name, position, read_avg, dt, status, remark)
                    values
                    (%s, %s, %s, %s, %s, %s, %s);
                """
                insert_rows = self.db_client.save(
                    insert_query,
                    params=(
                        gh_id,
                        account_name,
                        position,
                        read_avg,
                        today_dt,
                        1,
                        remark,
                    ),
                )
                if insert_rows:
                    # once today's row is inserted, mark older rows for this position as inactive
                    update_query = f"""
                        update crawler_meta_article_accounts_read_avg
                        set status = %s
                        where gh_id = %s and position = %s and dt < %s;
                    """
                    self.db_client.save(update_query, (0, gh_id, position, today_dt))
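
    # The docstring of update_account_read_avg_info mentions read_avg_ci_high, but only
    # the plain mean is stored above. A minimal sketch of how that upper bound could be
    # computed from the same read_cnt sample, assuming a normal approximation; the helper
    # name and the 95% level are illustrative and not part of the original code.
    @staticmethod
    def read_avg_ci_high(read_cnt_list: List[int]) -> float:
        n = len(read_cnt_list)
        mean = sum(read_cnt_list) / n
        if n < 2:
            return mean
        # unbiased sample variance and standard error of the mean
        variance = sum((x - mean) ** 2 for x in read_cnt_list) / (n - 1)
        std_err = (variance / n) ** 0.5
        # upper bound of an approximate 95% confidence interval for the mean
        return mean + 1.96 * std_err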


class CrawlerDailyScrapeAccountArticles(GzhArticleCrawler):

    def get_account_list(self, account_method: str) -> List[Dict]:
        """
        Fetch the daily-scrape accounts for a given category.
        :param account_method:
        :return:
        """
        query = f"""
            select gh_id, account_source, account_name, account_category, latest_update_time
            from long_articles_accounts
            where account_category = '{account_method}'
                and is_using = {self.ACCOUNT_GOOD_STATUS}
                and daily_scrape = {self.ACCOUNT_DAILY_SCRAPE};
        """
        account_list = self.db_client.fetch(query, cursor_type=DictCursor)
        return account_list

    def deal(self, method_list):
        """
        :param method_list:
        :return:
        """
        # daily crawl of category accounts
        for account_method in method_list:
            account_list = self.get_account_list(account_method)
            self.crawl_account_list(account_list, account_method)


class CrawlerAssociationAccountArticles(GzhArticleCrawler):

    def get_association_account_list(self, date_str):
        """
        Get the association accounts scheduled for today's rotation.
        """
        group_id = date_str[-1]
        query = f"""
            select account_id, gh_id, account_name, latest_update_time
            from long_articles_accounts
            where account_category = 'account_association'
                and is_using = {self.ACCOUNT_GOOD_STATUS}
                and daily_scrape = {self.ACCOUNT_NOT_DAILY_SCRAPE};
        """
        account_list = self.db_client.fetch(query, cursor_type=DictCursor)
        today_crawler_account_list = [
            i for i in account_list if str(i["account_id"])[-1] == group_id
        ]
        return today_crawler_account_list

    def deal(self, date_str):
        account_list = self.get_association_account_list(date_str)
        self.crawl_account_list(account_list, "account_association")
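

# Usage sketch (not part of the original module): how the two crawlers might be
# driven by a daily job. The category names passed to the daily crawler are
# illustrative placeholders, not values confirmed by this file.
if __name__ == "__main__":
    daily_crawler = CrawlerDailyScrapeAccountArticles()
    daily_crawler.deal(method_list=["history", "culture"])  # hypothetical categories

    association_crawler = CrawlerAssociationAccountArticles()
    association_crawler.deal(date_str=datetime.date.today().strftime("%Y%m%d"))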