weixinCategoryCrawler.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. """
  2. @author: luojunhui
  3. 抓取全局品类文章
  4. """
  5. import time
  6. from tqdm import tqdm
  7. from applications import WeixinSpider, Functions, llm_sensitivity
  8. # 常量
  9. ACCOUNT_GOOD_STATUS = 1
  10. DEFAULT_VIEW_COUNT = 0
  11. DEFAULT_LIKE_COUNT = 0
  12. DEFAULT_ARTICLE_STATUS = 1
  13. DEFAULT_TIMESTAMP = 1704038400
  14. class weixinCategory(object):
  15. """
  16. 微信全局品类账号抓取
  17. """
  18. def __init__(self, db_client):
  19. self.db_client_lam = db_client
  20. self.spider = WeixinSpider()
  21. self.function = Functions()
  22. def get_account_list(self, account_category):
  23. """
  24. 获取账号
  25. :param account_category 品类
  26. :return:
  27. """
  28. sql = f"""
  29. select gh_id, account_source, account_name, account_category, latest_update_time
  30. from long_articles_accounts
  31. where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS};
  32. """
  33. account_tuple = self.db_client_lam.select(sql)
  34. result = [
  35. {
  36. "gh_id": i[0],
  37. "platform": i[1],
  38. "account_name": i[2],
  39. "category": i[3],
  40. "latest_timestamp": i[4],
  41. }
  42. for i in account_tuple
  43. ]
  44. return result
  45. def insert_data_into_db(self, gh_id, category, article_list):
  46. """
  47. 将数据更新到数据库
  48. :return:
  49. """
  50. success_records = []
  51. for article_obj in article_list:
  52. detail_article_list = article_obj["AppMsg"]["DetailInfo"]
  53. for obj in detail_article_list:
  54. try:
  55. show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
  56. show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
  57. show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
  58. unique_idx = self.function.generateGzhId(obj["ContentUrl"])
  59. insert_sql = f"""
  60. insert into crawler_meta_article
  61. (
  62. platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
  63. description, publish_time, crawler_time, status, unique_index, llm_sensitivity
  64. )
  65. VALUES
  66. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  67. """
  68. self.db_client_lam.update(
  69. sql=insert_sql,
  70. params=(
  71. "weixin",
  72. "account",
  73. category,
  74. gh_id,
  75. obj['ItemIndex'],
  76. obj["Title"],
  77. obj["ContentUrl"],
  78. show_view_count,
  79. show_like_count,
  80. obj["Digest"],
  81. obj["send_time"],
  82. int(time.time()),
  83. DEFAULT_ARTICLE_STATUS,
  84. unique_idx,
  85. obj.get("llm_sensitivity", -1)
  86. ),
  87. )
  88. success_records.append({
  89. 'unique_index': unique_idx, 'title': obj['Title']
  90. })
  91. except Exception as e:
  92. print(e)
  93. return success_records
  94. def update_article_sensitive_status(self, category, unique_index, status):
  95. """
  96. 更新文章敏感状态
  97. :return:
  98. """
  99. update_sql = f"""
  100. update crawler_meta_article
  101. set llm_sensitivity = %s
  102. where category = %s and unique_index = %s;
  103. """
  104. self.db_client_lam.update(sql=update_sql, params=(status, category, unique_index))
  105. def update_latest_account_timestamp(self, gh_id):
  106. """
  107. 更新账号的最新时间戳
  108. :return:
  109. """
  110. select_sql = f"""
  111. SELECT publish_time
  112. From crawler_meta_article
  113. WHERE out_account_id = '{gh_id}'
  114. ORDER BY publish_time DESC LIMIT 1;
  115. """
  116. result = self.db_client_lam.select(select_sql)
  117. time_stamp = result[0][0]
  118. dt_str = self.function.timestamp_to_str(time_stamp)
  119. update_sql = f"""
  120. update long_articles_accounts
  121. set latest_update_time = %s
  122. where gh_id = %s;
  123. """
  124. self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))
  125. def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
  126. """
  127. 更新账号文章
  128. :return:
  129. """
  130. response = self.spider.update_msg_list(ghId=gh_id, index=index)
  131. msg_list = response.get("data", {}).get("data")
  132. if msg_list:
  133. last_article_in_this_msg = msg_list[-1]
  134. success_records = self.insert_data_into_db(
  135. gh_id=gh_id, category=category, article_list=msg_list
  136. )
  137. last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
  138. if latest_time_stamp < last_time_stamp_in_this_msg:
  139. next_cursor = response["data"]["next_cursor"]
  140. return success_records + self.update_each_account(
  141. gh_id=gh_id,
  142. latest_time_stamp=latest_time_stamp,
  143. category=category,
  144. index=next_cursor,
  145. )
  146. else:
  147. # 更新最近抓取时间
  148. self.update_latest_account_timestamp(gh_id=gh_id)
  149. print("账号时间更新成功")
  150. return success_records
  151. else:
  152. print("No more data")
  153. return []
  154. def deal(self, category_list):
  155. """
  156. :param category_list:
  157. :return:
  158. """
  159. for category in category_list:
  160. account_list = self.get_account_list(category)
  161. for account in tqdm(account_list):
  162. try:
  163. gh_id = account['gh_id']
  164. category = account['category']
  165. try:
  166. timestamp = int(account['latest_timestamp'].timestamp())
  167. except Exception as e:
  168. timestamp = DEFAULT_TIMESTAMP
  169. success_records = self.update_each_account(
  170. gh_id=gh_id,
  171. category=category,
  172. latest_time_stamp=timestamp
  173. )
  174. success_titles = [x['title'] for x in success_records]
  175. if success_titles:
  176. sensitive_results = llm_sensitivity.check_titles(success_titles)
  177. for record, sensitive_result in zip(success_records, sensitive_results):
  178. self.update_article_sensitive_status(
  179. category=category,
  180. unique_index=record['unique_index'],
  181. status=sensitive_result['hit_rule']
  182. )
  183. print("success")
  184. except Exception as e:
  185. print("fail because of {}".format(e))
  186. def deal_accounts(self, account_list):
  187. """
  188. input account list
  189. :param account_list:
  190. :return:
  191. """
  192. account_tuple = tuple(account_list)
  193. sql = f"""
  194. SELECT gh_id, account_name, account_category, latest_update_time
  195. FROM long_articles_accounts
  196. WHERE account_name in {account_tuple};
  197. """
  198. response = self.db_client_lam.select(sql)
  199. for account in tqdm(response):
  200. try:
  201. gh_id = account[0]
  202. category = account[2]
  203. try:
  204. latest_timestamp = account[3].timestamp()
  205. except Exception as e:
  206. latest_timestamp = DEFAULT_TIMESTAMP
  207. self.update_each_account(
  208. gh_id=gh_id,
  209. category=category,
  210. latest_time_stamp=latest_timestamp
  211. )
  212. except Exception as e:
  213. print(e)