weixinCategoryCrawler.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. """
  2. @author: luojunhui
  3. 抓取全局品类文章
  4. """
  5. import time
  6. from tqdm import tqdm
  7. from applications import WeixinSpider, Functions, longArticlesMySQL
  8. # Constants
  9. ACCOUNT_GOOD_STATUS = 1  # is_using value required when selecting accounts by category
  10. DEFAULT_VIEW_COUNT = 0  # used when the parsed ShowDesc has no "show_view_count"
  11. DEFAULT_LIKE_COUNT = 0  # used when the parsed ShowDesc has no "show_like_count"
  12. DEFAULT_ARTICLE_STATUS = 1  # status written with every newly inserted article row
  13. DEFAULT_TIMESTAMP = 1704038400  # fallback crawl start when latest_update_time is unusable (2023-12-31 16:00:00 UTC)
  14. class weixinCategory(object):
  15. """
  16. 微信全局品类账号抓取
  17. """
  def __init__(self):
      """
      Create the collaborators used by the crawler.

      Instantiates the three objects imported from ``applications`` that the
      other methods of this class rely on.
      """
      # Database client; every select/update in this class goes through it.
      self.db_client_lam = longArticlesMySQL()
      # Client whose ``update_msg_list`` pages through an account's messages.
      self.spider = WeixinSpider()
      # Helpers: ShowDesc parsing, unique-index generation, time formatting.
      self.function = Functions()
  def get_account_list(self, account_category):
      """
      Fetch the accounts of one category that are flagged as in use.

      :param account_category: value matched against the ``account_category``
          column of ``long_articles_accounts``
      :return: list of dicts with the keys ``gh_id``, ``platform``,
          ``account_name``, ``category`` and ``latest_timestamp``
      """
      # NOTE(review): the category is interpolated straight into the SQL
      # text; switch to bound parameters if ``select`` supports them.
      sql = f"""
          select gh_id, account_source, account_name, account_category, latest_update_time
          from long_articles_accounts
          where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS};
      """
      rows = self.db_client_lam.select(sql)
      accounts = []
      for row in rows:
          # Column order follows the select list above.
          accounts.append(
              {
                  "gh_id": row[0],
                  "platform": row[1],
                  "account_name": row[2],
                  "category": row[3],
                  "latest_timestamp": row[4],
              }
          )
      return accounts
  def insert_data_into_db(self, gh_id, category, article_list):
      """
      Store every article contained in a list of messages.

      Each entry of a message's ``AppMsg.DetailInfo`` is written as one row
      of ``crawler_meta_article``. A failure while handling one article is
      printed and does not stop the remaining articles.

      :param gh_id: account id written to ``out_account_id``
      :param category: category written with every row
      :param article_list: list of message dicts, each holding
          ``["AppMsg"]["DetailInfo"]``
      :return: None
      """
      # The statement has no interpolated parts, so it is built once.
      insert_sql = """
          insert into crawler_meta_article
          (platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt, description, publish_time, crawler_time, status, unique_index)
          VALUES
          (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
      """
      for message in article_list:
          for article in message["AppMsg"]["DetailInfo"]:
              try:
                  stats = self.function.show_desc_to_sta(article["ShowDesc"])
                  view_count = stats.get("show_view_count", DEFAULT_VIEW_COUNT)
                  like_count = stats.get("show_like_count", DEFAULT_LIKE_COUNT)
                  row = (
                      "weixin",
                      "account",
                      category,
                      gh_id,
                      article['ItemIndex'],
                      article["Title"],
                      article["ContentUrl"],
                      view_count,
                      like_count,
                      article["Digest"],
                      article["send_time"],
                      int(time.time()),  # crawl time, in whole seconds
                      DEFAULT_ARTICLE_STATUS,
                      self.function.generateGzhId(article["ContentUrl"]),
                  )
                  self.db_client_lam.update(sql=insert_sql, params=row)
              except Exception as error:
                  # Best effort: report the problem and move on.
                  print(error)
  84. def update_latest_account_timestamp(self, gh_id):
  85. """
  86. 更新账号的最新时间戳
  87. :return:
  88. """
  89. select_sql = f"""
  90. SELECT publish_time
  91. From crawler_meta_article
  92. WHERE out_account_id = '{gh_id}'
  93. ORDER BY publish_time DESC LIMIT 1;
  94. """
  95. result = self.db_client_lam.select(select_sql)
  96. time_stamp = result[0][0]
  97. dt_str = self.function.time_stamp_to_str(time_stamp)
  98. update_sql = f"""
  99. update long_articles_accounts
  100. set latest_update_time = %s
  101. where gh_id = %s;
  102. """
  103. self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))
  104. def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
  105. """
  106. 更新账号文章
  107. :return:
  108. """
  109. response = self.spider.update_msg_list(ghId=gh_id, index=index)
  110. msg_list = response.get("data", {}).get("data")
  111. if msg_list:
  112. last_article_in_this_msg = msg_list[-1]
  113. self.insert_data_into_db(
  114. gh_id=gh_id, category=category, article_list=msg_list
  115. )
  116. last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
  117. if latest_time_stamp < last_time_stamp_in_this_msg:
  118. next_cursor = response["data"]["next_cursor"]
  119. return self.update_each_account(
  120. gh_id=gh_id,
  121. latest_time_stamp=latest_time_stamp,
  122. category=category,
  123. index=next_cursor,
  124. )
  125. else:
  126. # 更新最近抓取时间
  127. self.update_latest_account_timestamp(gh_id=gh_id)
  128. print("账号时间更新成功")
  129. else:
  130. print("No more data")
  def deal(self, category_list):
      """
      Crawl every in-use account of each given category.

      Accounts are loaded with ``get_account_list`` and crawled one by one
      with ``update_each_account``. A failing account is reported and the
      loop continues with the next one.

      :param category_list: iterable of category values
      :return: None
      """
      for requested_category in category_list:
          accounts = self.get_account_list(requested_category)
          for account in tqdm(accounts):
              try:
                  account_id = account['gh_id']
                  account_category = account['category']
                  try:
                      since = int(account['latest_timestamp'].timestamp())
                  except Exception:
                      # No usable latest_timestamp: start from the default.
                      since = DEFAULT_TIMESTAMP
                  self.update_each_account(
                      gh_id=account_id,
                      category=account_category,
                      latest_time_stamp=since,
                  )
                  print("success")
              except Exception as error:
                  print("fail because of {}".format(error))
  154. def deal_accounts(self, account_list):
  155. """
  156. input account list
  157. :param account_list:
  158. :return:
  159. """
  160. account_tuple = tuple(account_list)
  161. sql = f"""
  162. SELECT gh_id, account_name, account_category, latest_update_time
  163. FROM long_articles_accounts
  164. WHERE account_name in {account_tuple};
  165. """
  166. response = self.db_client_lam.select(sql)
  167. for account in tqdm(response):
  168. try:
  169. gh_id = account[0]
  170. category = account[2]
  171. try:
  172. latest_timestamp = account[3].timestamp()
  173. except Exception as e:
  174. latest_timestamp = DEFAULT_TIMESTAMP
  175. self.update_each_account(
  176. gh_id=gh_id,
  177. category=category,
  178. latest_time_stamp=latest_timestamp
  179. )
  180. except Exception as e:
  181. print(e)