weixinCategoryCrawler.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. """
  2. @author: luojunhui
  3. 抓取全局品类文章
  4. """
  5. import time
  6. from tqdm import tqdm
  7. from applications import WeixinSpider, Functions
# Constants

# Value of the `is_using` column that get_account_list filters on when selecting accounts.
ACCOUNT_GOOD_STATUS = 1
# Fallback read count used when the parsed ShowDesc stats have no "show_view_count" key.
DEFAULT_VIEW_COUNT = 0
# Fallback like count used when the parsed ShowDesc stats have no "show_like_count" key.
DEFAULT_LIKE_COUNT = 0
# Value written to the `status` column of every newly inserted crawler_meta_article row.
DEFAULT_ARTICLE_STATUS = 1
# Fallback Unix timestamp (seconds) used when an account's latest_update_time cannot be
# converted; 1704038400 == 2023-12-31T16:00:00Z, i.e. 2024-01-01 00:00 in UTC+8.
DEFAULT_TIMESTAMP = 1704038400
  14. class weixinCategory(object):
  15. """
  16. 微信全局品类账号抓取
  17. """
  18. def __init__(self, db_client):
  19. self.db_client_lam = db_client
  20. self.spider = WeixinSpider()
  21. self.function = Functions()
  22. def get_account_list(self, account_category):
  23. """
  24. 获取账号
  25. :param account_category 品类
  26. :return:
  27. """
  28. sql = f"""
  29. select gh_id, account_source, account_name, account_category, latest_update_time
  30. from long_articles_accounts
  31. where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS}
  32. and init_date = '2024-12-31';
  33. """
  34. account_tuple = self.db_client_lam.select(sql)
  35. result = [
  36. {
  37. "gh_id": i[0],
  38. "platform": i[1],
  39. "account_name": i[2],
  40. "category": i[3],
  41. "latest_timestamp": i[4],
  42. }
  43. for i in account_tuple
  44. ]
  45. return result
  46. def insert_data_into_db(self, gh_id, category, article_list):
  47. """
  48. 将数据更新到数据库
  49. :return:
  50. """
  51. for article_obj in article_list:
  52. detail_article_list = article_obj["AppMsg"]["DetailInfo"]
  53. for obj in detail_article_list:
  54. try:
  55. show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
  56. show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
  57. show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
  58. insert_sql = f"""
  59. insert into crawler_meta_article
  60. (platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt, description, publish_time, crawler_time, status, unique_index)
  61. VALUES
  62. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  63. """
  64. self.db_client_lam.update(
  65. sql=insert_sql,
  66. params=(
  67. "weixin",
  68. "account",
  69. category,
  70. gh_id,
  71. obj['ItemIndex'],
  72. obj["Title"],
  73. obj["ContentUrl"],
  74. show_view_count,
  75. show_like_count,
  76. obj["Digest"],
  77. obj["send_time"],
  78. int(time.time()),
  79. DEFAULT_ARTICLE_STATUS,
  80. self.function.generateGzhId(obj["ContentUrl"]),
  81. ),
  82. )
  83. except Exception as e:
  84. print(e)
  85. def update_latest_account_timestamp(self, gh_id):
  86. """
  87. 更新账号的最新时间戳
  88. :return:
  89. """
  90. select_sql = f"""
  91. SELECT publish_time
  92. From crawler_meta_article
  93. WHERE out_account_id = '{gh_id}'
  94. ORDER BY publish_time DESC LIMIT 1;
  95. """
  96. result = self.db_client_lam.select(select_sql)
  97. time_stamp = result[0][0]
  98. dt_str = self.function.timestamp_to_str(time_stamp)
  99. update_sql = f"""
  100. update long_articles_accounts
  101. set latest_update_time = %s
  102. where gh_id = %s;
  103. """
  104. self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))
  105. def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
  106. """
  107. 更新账号文章
  108. :return:
  109. """
  110. response = self.spider.update_msg_list(ghId=gh_id, index=index)
  111. msg_list = response.get("data", {}).get("data")
  112. if msg_list:
  113. # last_article_in_this_msg = msg_list[-1]
  114. self.insert_data_into_db(
  115. gh_id=gh_id, category=category, article_list=msg_list
  116. )
  117. # last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
  118. # if latest_time_stamp < last_time_stamp_in_this_msg:
  119. # next_cursor = response["data"]["next_cursor"]
  120. # return self.update_each_account(
  121. # gh_id=gh_id,
  122. # latest_time_stamp=latest_time_stamp,
  123. # category=category,
  124. # index=next_cursor,
  125. # )
  126. # else:
  127. # 更新最近抓取时间
  128. self.update_latest_account_timestamp(gh_id=gh_id)
  129. print("账号时间更新成功")
  130. else:
  131. print("No more data")
  132. def deal(self, category_list):
  133. """
  134. :param category_list:
  135. :return:
  136. """
  137. for category in category_list:
  138. account_list = self.get_account_list(category)
  139. for account in tqdm(account_list):
  140. try:
  141. gh_id = account['gh_id']
  142. category = account['category']
  143. try:
  144. timestamp = int(account['latest_timestamp'].timestamp())
  145. except Exception as e:
  146. timestamp = DEFAULT_TIMESTAMP
  147. self.update_each_account(
  148. gh_id=gh_id,
  149. category=category,
  150. latest_time_stamp=timestamp
  151. )
  152. print("success")
  153. except Exception as e:
  154. print("fail because of {}".format(e))
  155. def deal_accounts(self, account_list):
  156. """
  157. input account list
  158. :param account_list:
  159. :return:
  160. """
  161. account_tuple = tuple(account_list)
  162. sql = f"""
  163. SELECT gh_id, account_name, account_category, latest_update_time
  164. FROM long_articles_accounts
  165. WHERE account_name in {account_tuple};
  166. """
  167. response = self.db_client_lam.select(sql)
  168. for account in tqdm(response):
  169. try:
  170. gh_id = account[0]
  171. category = account[2]
  172. try:
  173. latest_timestamp = account[3].timestamp()
  174. except Exception as e:
  175. latest_timestamp = DEFAULT_TIMESTAMP
  176. self.update_each_account(
  177. gh_id=gh_id,
  178. category=category,
  179. latest_time_stamp=latest_timestamp
  180. )
  181. except Exception as e:
  182. print(e)