weixinCategoryCrawler.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  """
  @author: luojunhui
  Crawl articles for global (cross-account) categories.
  """
  5. import time
  6. from tqdm import tqdm
  7. from applications import WeixinSpider, Functions, DeNetMysql, longArticlesMySQL
  class weixinCategory(object):
      """
      Crawl the articles of WeChat accounts that belong to a global category.

      The crawler reads the account list from ``long_articles_accounts``,
      pages through each account's message list via ``WeixinSpider``, stores
      every article in ``crawler_meta_article`` and finally writes the newest
      stored publish time back to ``long_articles_accounts``.
      """

      def __init__(self):
          # NOTE(review): unlike DeNetMysql, longArticlesMySQL is stored without
          # being instantiated; presumably its select/update are class-level
          # callables -- confirm against the applications package.
          self.db_client_lam = longArticlesMySQL
          self.db_client_dt = DeNetMysql()
          self.spider = WeixinSpider()
          self.function = Functions()

      def getAccountList(self, account_category):
          """
          Fetch the in-use accounts of one category.

          :param account_category: category name to filter on
          :return: list of dicts with the keys ``gh_id``, ``platform``,
              ``account_name``, ``category`` and ``latest_timestamp``
          """
          # NOTE(review): the category is interpolated straight into the SQL
          # text; only pass trusted values. Switch to bound parameters if
          # select() supports them (its signature is not visible here).
          sql = f"""
              select gh_id, account_source, account_name, account_category, latest_update_time
              from long_articles_accounts
              where account_category = '{account_category}' and is_using = 1;
              """
          account_rows = self.db_client_lam.select(sql)
          return [
              {
                  "gh_id": row[0],
                  "platform": row[1],
                  "account_name": row[2],
                  "category": row[3],
                  "latest_timestamp": row[4],
              }
              for row in account_rows
          ]

      def updateDataIntoMysql(self, gh_id, category, article_list):
          """
          Insert every article contained in ``article_list`` into
          ``crawler_meta_article``.

          :param gh_id: account id, stored as ``out_account_id``
          :param category: category stored with each article
          :param article_list: messages as returned by the spider; each one is
              read at ``["AppMsg"]["DetailInfo"]`` for its list of articles
          :return: None
          """
          # The statement is constant, so build it once instead of per article.
          insert_sql = """
              insert into crawler_meta_article
              (platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt, description, publish_time, crawler_time, status, unique_index)
              VALUES
              (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
              """
          for article_obj in article_list:
              detail_article_list = article_obj["AppMsg"]["DetailInfo"]
              for obj in detail_article_list:
                  # Best effort per article: a failing row (missing key, insert
                  # error, ...) is reported and must not stop the others.
                  try:
                      show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
                      show_view_count = show_stat.get("show_view_count", 0)
                      show_like_count = show_stat.get("show_like_count", 0)
                      self.db_client_dt.update(
                          sql=insert_sql,
                          params=(
                              "weixin",
                              "account",
                              category,
                              gh_id,
                              obj["ItemIndex"],
                              obj["Title"],
                              obj["ContentUrl"],
                              show_view_count,
                              show_like_count,
                              obj["Digest"],
                              obj["send_time"],
                              int(time.time()),
                              1,
                              self.function.generateGzhId(obj["ContentUrl"]),
                          ),
                      )
                  except Exception as e:
                      print(e)

      def updateLatestAccountTimeStamp(self, gh_id):
          """
          Write the newest stored ``publish_time`` of an account back to
          ``long_articles_accounts.latest_update_time``.

          :param gh_id: account id
          :return: True when the account row was updated, False when no
              article is stored for the account and nothing was written
          """
          # NOTE(review): gh_id is interpolated into the SQL text; switch to
          # bound parameters if select() supports them.
          select_sql = f"""
              SELECT publish_time
              From crawler_meta_article
              WHERE out_account_id = '{gh_id}'
              ORDER BY publish_time DESC LIMIT 1;
              """
          result = self.db_client_dt.select(select_sql)
          if not result:
              # Nothing stored for this account (for example every insert
              # failed), so there is no timestamp to derive.
              print(f"no stored article for {gh_id}, latest_update_time not changed")
              return False
          time_stamp = result[0][0]
          dt_str = self.function.time_stamp_to_str(time_stamp)
          update_sql = """
              update long_articles_accounts
              set latest_update_time = %s
              where gh_id = %s;
              """
          self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))
          return True

      def updateEachAccountArticles(self, gh_id, category, latest_time_stamp, index=None):
          """
          Page through an account's message list and store the articles until a
          page reaches back to ``latest_time_stamp``.

          :param gh_id: account id
          :param category: category stored with each article
          :param latest_time_stamp: timestamp of the previous crawl; paging
              stops once the last message of a page is not newer than it
          :param index: cursor of the first page to request (None lets the
              spider choose its starting page)
          :return: None
          """
          cursor = index
          # Iterate instead of recursing so that accounts with many pages
          # cannot exhaust the interpreter's recursion limit.
          while True:
              response = self.spider.update_msg_list(ghId=gh_id, index=cursor)
              payload = response.get("data") or {}
              msg_list = payload.get("data")
              if not msg_list:
                  # NOTE(review): the account timestamp is not updated on this
                  # path, matching the previous behaviour -- confirm intent.
                  print("No more data")
                  return

              self.updateDataIntoMysql(
                  gh_id=gh_id, category=category, article_list=msg_list
              )
              # Assumes the last message of a page is its oldest one -- TODO confirm.
              last_time_stamp_in_this_msg = msg_list[-1]["AppMsg"]["BaseInfo"]["UpdateTime"]
              if latest_time_stamp < last_time_stamp_in_this_msg:
                  cursor = payload.get("next_cursor")
                  if cursor:
                      continue
                  # Without a cursor the next request would start from the
                  # first page again and never terminate.
                  print("No more data")
                  return

              # The page reached already-crawled articles: record the most
              # recent crawl time for the account.
              if self.updateLatestAccountTimeStamp(gh_id=gh_id):
                  print("账号时间更新成功")
              return
  if __name__ == "__main__":
      # Script entry point: crawl every in-use account of each listed category.

      # Start-of-crawl timestamp used when an account has no usable
      # latest_update_time. 1704038400 is 2023-12-31T16:00:00Z, i.e.
      # 2024-01-01 00:00:00 at UTC+8.
      DEFAULT_LATEST_TIMESTAMP = 1704038400

      wxCategory = weixinCategory()
      category_list = [
          'daily-account-mining'
      ]
      for category_name in category_list:
          account_list = wxCategory.getAccountList(category_name)
          for account in tqdm(account_list):
              # One failing account is reported and must not stop the others.
              try:
                  gh_id = account['gh_id']
                  # Separate name so the outer loop variable is not shadowed.
                  account_category = account['category']
                  try:
                      timestamp = int(account['latest_timestamp'].timestamp())
                  except Exception:
                      # latest_timestamp is missing or not datetime-like
                      # (e.g. None): fall back to the fixed start timestamp.
                      timestamp = DEFAULT_LATEST_TIMESTAMP
                  wxCategory.updateEachAccountArticles(
                      gh_id=gh_id,
                      category=account_category,
                      latest_time_stamp=timestamp
                  )
                  print("success")
              except Exception as e:
                  print("fail because of {}".format(e))