weixinCategoryCrawler.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
"""
@author: luojunhui
Crawl articles published by the global-category WeChat accounts.
"""
  5. import time
  6. from tqdm import tqdm
  7. from applications import WeixinSpider, Functions, llm_sensitivity
# Constants
# Value of `is_using` that an account row must have to be selected for crawling.
ACCOUNT_GOOD_STATUS = 1
# Fallback read count when the parsed ShowDesc has no "show_view_count".
DEFAULT_VIEW_COUNT = 0
# Fallback like count when the parsed ShowDesc has no "show_like_count".
DEFAULT_LIKE_COUNT = 0
# Value written to the `status` column of every newly inserted article row.
DEFAULT_ARTICLE_STATUS = 1
# Fallback "latest update" unix timestamp (seconds) used when an account's
# latest_update_time cannot be converted. 1704038400 is 2023-12-31T16:00:00Z,
# i.e. 2024-01-01 00:00:00 at UTC+8.
DEFAULT_TIMESTAMP = 1704038400
  14. class weixinCategory(object):
  15. """
  16. 微信全局品类账号抓取
  17. """
  18. def __init__(self, db_client):
  19. self.db_client_lam = db_client
  20. self.spider = WeixinSpider()
  21. self.function = Functions()
  22. def get_account_list(self, account_category):
  23. """
  24. 获取账号
  25. :param account_category 品类
  26. :return:
  27. """
  28. sql = f"""
  29. select gh_id, account_source, account_name, account_category, latest_update_time
  30. from long_articles_accounts
  31. where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS};
  32. """
  33. account_tuple = self.db_client_lam.select(sql)
  34. result = [
  35. {
  36. "gh_id": i[0],
  37. "platform": i[1],
  38. "account_name": i[2],
  39. "category": i[3],
  40. "latest_timestamp": i[4],
  41. }
  42. for i in account_tuple
  43. ]
  44. return result
  45. def insert_data_into_db(self, gh_id, category, article_list):
  46. """
  47. 将数据更新到数据库
  48. :return:
  49. """
  50. for article_obj in article_list:
  51. detail_article_list = article_obj["AppMsg"]["DetailInfo"]
  52. for obj in detail_article_list:
  53. try:
  54. show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
  55. show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
  56. show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
  57. insert_sql = f"""
  58. insert into crawler_meta_article
  59. (
  60. platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
  61. description, publish_time, crawler_time, status, unique_index, llm_sensitivity
  62. )
  63. VALUES
  64. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  65. """
  66. self.db_client_lam.update(
  67. sql=insert_sql,
  68. params=(
  69. "weixin",
  70. "account",
  71. category,
  72. gh_id,
  73. obj['ItemIndex'],
  74. obj["Title"],
  75. obj["ContentUrl"],
  76. show_view_count,
  77. show_like_count,
  78. obj["Digest"],
  79. obj["send_time"],
  80. int(time.time()),
  81. DEFAULT_ARTICLE_STATUS,
  82. self.function.generateGzhId(obj["ContentUrl"]),
  83. obj.get("llm_sensitivity", -1)
  84. ),
  85. )
  86. except Exception as e:
  87. print(e)
  88. def update_latest_account_timestamp(self, gh_id):
  89. """
  90. 更新账号的最新时间戳
  91. :return:
  92. """
  93. select_sql = f"""
  94. SELECT publish_time
  95. From crawler_meta_article
  96. WHERE out_account_id = '{gh_id}'
  97. ORDER BY publish_time DESC LIMIT 1;
  98. """
  99. result = self.db_client_lam.select(select_sql)
  100. time_stamp = result[0][0]
  101. dt_str = self.function.timestamp_to_str(time_stamp)
  102. update_sql = f"""
  103. update long_articles_accounts
  104. set latest_update_time = %s
  105. where gh_id = %s;
  106. """
  107. self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))
  108. def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
  109. """
  110. 更新账号文章
  111. :return:
  112. """
  113. response = self.spider.update_msg_list(ghId=gh_id, index=index)
  114. msg_list = response.get("data", {}).get("data")
  115. if msg_list:
  116. last_article_in_this_msg = msg_list[-1]
  117. article_titles = []
  118. for msg in msg_list:
  119. for article in msg['AppMsg']['DetailInfo']:
  120. article_titles.append(article['Title'])
  121. sensitive_results = llm_sensitivity.check_titles(article_titles, True)
  122. for msg in msg_list:
  123. for article in msg['AppMsg']['DetailInfo']:
  124. sensitive_hit = sensitive_results.get(article['Title'], None)
  125. if sensitive_hit:
  126. article['llm_sensitivity'] = sensitive_hit['hit_rule']
  127. self.insert_data_into_db(
  128. gh_id=gh_id, category=category, article_list=msg_list
  129. )
  130. last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
  131. if latest_time_stamp < last_time_stamp_in_this_msg:
  132. next_cursor = response["data"]["next_cursor"]
  133. return self.update_each_account(
  134. gh_id=gh_id,
  135. latest_time_stamp=latest_time_stamp,
  136. category=category,
  137. index=next_cursor,
  138. )
  139. else:
  140. # 更新最近抓取时间
  141. self.update_latest_account_timestamp(gh_id=gh_id)
  142. print("账号时间更新成功")
  143. else:
  144. print("No more data")
  145. def deal(self, category_list):
  146. """
  147. :param category_list:
  148. :return:
  149. """
  150. for category in category_list:
  151. account_list = self.get_account_list(category)
  152. for account in tqdm(account_list):
  153. try:
  154. gh_id = account['gh_id']
  155. category = account['category']
  156. try:
  157. timestamp = int(account['latest_timestamp'].timestamp())
  158. except Exception as e:
  159. timestamp = DEFAULT_TIMESTAMP
  160. self.update_each_account(
  161. gh_id=gh_id,
  162. category=category,
  163. latest_time_stamp=timestamp
  164. )
  165. print("success")
  166. except Exception as e:
  167. print("fail because of {}".format(e))
  168. def deal_accounts(self, account_list):
  169. """
  170. input account list
  171. :param account_list:
  172. :return:
  173. """
  174. account_tuple = tuple(account_list)
  175. sql = f"""
  176. SELECT gh_id, account_name, account_category, latest_update_time
  177. FROM long_articles_accounts
  178. WHERE account_name in {account_tuple};
  179. """
  180. response = self.db_client_lam.select(sql)
  181. for account in tqdm(response):
  182. try:
  183. gh_id = account[0]
  184. category = account[2]
  185. try:
  186. latest_timestamp = account[3].timestamp()
  187. except Exception as e:
  188. latest_timestamp = DEFAULT_TIMESTAMP
  189. self.update_each_account(
  190. gh_id=gh_id,
  191. category=category,
  192. latest_time_stamp=latest_timestamp
  193. )
  194. except Exception as e:
  195. print(e)