weixinCategoryCrawler.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
"""
@author: luojunhui
Crawl articles for the global (site-wide) content categories.
"""
  5. import time
  6. from tqdm import tqdm
  7. from applications import WeixinSpider, Functions, DeNetMysql, PQMySQL
  8. class weixinCategory(object):
  9. """
  10. 微信全局品类账号抓取
  11. """
  12. def __init__(self):
  13. self.db_client_pq = PQMySQL()
  14. self.db_client_dt = DeNetMysql()
  15. self.spider = WeixinSpider()
  16. self.function = Functions()
  17. def getAccountList(self, account_category):
  18. """
  19. 获取账号
  20. :param account_category 品类
  21. :return:
  22. """
  23. sql = f"""
  24. select distinct gh_id, account_source, account_name, account_category, latest_update_time
  25. from long_articles_accounts
  26. where account_category = '{account_category}';
  27. """
  28. account_tuple = self.db_client_pq.select(sql)
  29. result = [
  30. {
  31. "gh_id": i[0],
  32. "platform": i[1],
  33. "account_name": i[2],
  34. "category": i[3],
  35. "latest_timestamp": i[4],
  36. }
  37. for i in account_tuple
  38. ]
  39. return result
  40. def updateDataIntoMysql(self, gh_id, category, article_list):
  41. """
  42. 将数据更新到数据库
  43. :return:
  44. """
  45. for article_obj in article_list:
  46. detail_article_list = article_obj["AppMsg"]["DetailInfo"]
  47. for obj in detail_article_list:
  48. try:
  49. show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
  50. show_view_count = show_stat.get("show_view_count", 0)
  51. show_like_count = show_stat.get("show_like_count", 0)
  52. insert_sql = f"""
  53. insert into crawler_meta_article
  54. (platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt, description, publish_time, crawler_time, status, unique_index)
  55. VALUES
  56. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  57. """
  58. self.db_client_dt.update(
  59. sql=insert_sql,
  60. params=(
  61. "weixin",
  62. "account",
  63. category,
  64. gh_id,
  65. obj['ItemIndex'],
  66. obj["Title"],
  67. obj["ContentUrl"],
  68. show_view_count,
  69. show_like_count,
  70. obj["Digest"],
  71. obj["send_time"],
  72. int(time.time()),
  73. 1,
  74. self.function.generateGzhId(obj["ContentUrl"]),
  75. ),
  76. )
  77. except Exception as e:
  78. print(e)
  79. def updateLatestAccountTimeStamp(self, gh_id):
  80. """
  81. 更新账号的最新时间戳
  82. :return:
  83. """
  84. select_sql = f"""
  85. SELECT publish_time
  86. From crawler_meta_article
  87. WHERE out_account_id = '{gh_id}'
  88. ORDER BY publish_time DESC LIMIT 1;
  89. """
  90. result = self.db_client_dt.select(select_sql)
  91. time_stamp = result[0][0]
  92. dt_str = self.function.time_stamp_to_str(time_stamp)
  93. update_sql = f"""
  94. update long_articles_accounts
  95. set latest_update_time = %s
  96. where gh_id = %s;
  97. """
  98. self.db_client_pq.update(sql=update_sql, params=(dt_str, gh_id))
  99. def updateEachAccountArticles(self, gh_id, category, latest_time_stamp, index=None):
  100. """
  101. 更新账号文章
  102. :return:
  103. """
  104. response = self.spider.update_msg_list(ghId=gh_id, index=index)
  105. msg_list = response.get("data", {}).get("data")
  106. if msg_list:
  107. last_article_in_this_msg = msg_list[-1]
  108. self.updateDataIntoMysql(
  109. gh_id=gh_id, category=category, article_list=msg_list
  110. )
  111. last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
  112. if latest_time_stamp < last_time_stamp_in_this_msg:
  113. next_cursor = response["data"]["next_cursor"]
  114. return self.updateEachAccountArticles(
  115. gh_id=gh_id,
  116. latest_time_stamp=latest_time_stamp,
  117. category=category,
  118. index=next_cursor,
  119. )
  120. else:
  121. # 更新最近抓取时间
  122. self.updateLatestAccountTimeStamp(gh_id=gh_id)
  123. print("账号时间更新成功")
  124. else:
  125. print("No more data")
  126. if __name__ == "__main__":
  127. wxCategory = weixinCategory()
  128. category_list = [
  129. '军事',
  130. '历史',
  131. # '娱乐八卦',
  132. # '情感生活',
  133. # '健康养生',
  134. # '新闻媒体'
  135. ]
  136. for category in category_list:
  137. account_list = wxCategory.getAccountList(category)
  138. for account in tqdm(account_list):
  139. try:
  140. gh_id = account['gh_id']
  141. category = account['category']
  142. try:
  143. timestamp = int(account['latest_timestamp'].timestamp())
  144. except Exception as e:
  145. timestamp = 1704038400
  146. wxCategory.updateEachAccountArticles(
  147. gh_id=gh_id,
  148. category=category,
  149. latest_time_stamp=timestamp
  150. )
  151. print("success")
  152. except Exception as e:
  153. print("fail because of {}".format(e))