  1. """
  2. @author: luojunhui
  3. @file: gzh_article_crawler.py
  4. @time: 2025/04/25 14:20
  5. @desc: 抓取公众号文章
  6. """
  7. from __future__ import annotations
  8. import time
  9. import traceback
  10. from pymysql.cursors import DictCursor
  11. from tqdm import tqdm
  12. from applications import log
  13. from applications.api.gzh_api import get_gzh_account_article_list
  14. from applications.db import DatabaseConnector
  15. from applications.utils import Item
  16. from applications.utils import insert_into_article_meta_table
  17. from config import long_articles_config
  18. class GZHArticleCrawler:
  19. def __init__(self):
  20. self.db_client = DatabaseConnector(db_config=long_articles_config)
  21. self.db_client.connect()

    def crawler_each_article(self, gh_id, category, article):
        """Map one raw article payload into an Item and persist its metadata."""
        article_item = Item()
        article_item.add("title", article["Title"])
        article_item.add("platform", "weixin")
        article_item.add("mode", "account")
        article_item.add("category", category)
        article_item.add("out_account_id", gh_id)
        article_item.add("article_index", article["ItemIndex"])
        article_item.add("link", article["ContentUrl"])
        # article_item.add("read_cnt", article["ShowViewCount"])
        # article_item.add("like_cnt", article["ShowLikeCount"])
        article_item.add("description", article["Digest"])
        article_item.add("publish_time", article["send_time"])
        article_item.add("crawler_time", int(time.time()))
        article_item.add("status", 1)
        # article_item.add("unique_index", str_to_md5(article["ContentUrl"]))
        article_item.add("llm_sensitivity", -1)
        article_item.add("title_sensitivity", -1)
        # validate the assembled item before persisting it
        meta_item = article_item.check(source="article")
        insert_into_article_meta_table(db_client=self.db_client, article_item=meta_item)
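
    # A sketch of the per-article payload `crawler_each_article` expects,
    # inferred from the keys read above; all values are hypothetical
    # placeholders:
    #
    #   {
    #       "Title": "Some article title",
    #       "ItemIndex": 1,
    #       "ContentUrl": "https://mp.weixin.qq.com/s/...",
    #       "Digest": "Short summary shown in the feed",
    #       "send_time": 1745561000,
    #   }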

    def insert_msg_into_article_meta_table(self, gh_id, category, msg_list):
        """Flatten each message's article list and crawl every article in it."""
        for article_msg in tqdm(msg_list, desc=f"crawler: {gh_id}"):
            article_list = article_msg["AppMsg"]["DetailInfo"]
            for article in article_list:
                try:
                    self.crawler_each_article(gh_id, category, article)
                except Exception as e:
                    # report the failure and keep crawling the remaining articles
                    print(e)
                    print(traceback.format_exc())
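
    # Each element of `msg_list` is assumed to look roughly like the sketch
    # below (keys taken from the accesses above and from the pagination loop;
    # values are hypothetical):
    #
    #   {
    #       "AppMsg": {
    #           "BaseInfo": {"UpdateTime": 1745561000},
    #           "DetailInfo": [ ...per-article payloads... ],
    #       }
    #   }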

    def crawl_each_gzh_account_article_list(
        self, account_id: str, category: str, latest_update_timestamp: int
    ):
        """Page through an account's article list, stopping at already-crawled articles."""
        next_cursor = None
        while True:
            fetch_response = get_gzh_account_article_list(account_id, next_cursor)
            account_msg_list = fetch_response.get("data", {}).get("data")
            if not account_msg_list:
                break

            self.insert_msg_into_article_meta_table(
                account_id, category, account_msg_list
            )

            # pages are assumed to arrive newest first, so the last message on
            # a page is the oldest; stop once it is no newer than the stored
            # checkpoint, since everything beyond it was crawled previously
            last_article = account_msg_list[-1]
            last_timestamp_in_page = last_article["AppMsg"]["BaseInfo"]["UpdateTime"]
            if last_timestamp_in_page <= latest_update_timestamp:
                # update account timestamp
                break

            next_cursor = fetch_response["data"].get("next_cursor")
            if not next_cursor:
                break
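

# A minimal driver sketch (an assumption, not part of the original module);
# the account id, category, and checkpoint below are placeholders.
if __name__ == "__main__":
    crawler = GZHArticleCrawler()
    crawler.crawl_each_gzh_account_article_list(
        account_id="gh_0123456789ab",   # hypothetical WeChat gh_id
        category="tech",                # hypothetical category label
        latest_update_timestamp=0,      # 0 => crawl the account's full history
    )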