weixinCategoryCrawler.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. """
  2. @author: luojunhui
  3. 抓取全局品类文章
  4. """
  5. import time
  6. from tqdm import tqdm
  7. from applications import WeixinSpider, Functions, llm_sensitivity
  8. # 常量
  9. ACCOUNT_GOOD_STATUS = 1
  10. DEFAULT_VIEW_COUNT = 0
  11. DEFAULT_LIKE_COUNT = 0
  12. DEFAULT_ARTICLE_STATUS = 1
  13. DEFAULT_TIMESTAMP = 1704038400
  14. class weixinCategory(object):
  15. """
  16. 微信全局品类账号抓取
  17. """
  18. def __init__(self, db_client):
  19. self.db_client_lam = db_client
  20. self.spider = WeixinSpider()
  21. self.function = Functions()
  22. def get_account_list(self, account_category):
  23. """
  24. 获取账号
  25. :param account_category 品类
  26. :return:
  27. """
  28. sql = f"""
  29. select gh_id, account_source, account_name, account_category, latest_update_time
  30. from long_articles_accounts
  31. where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS};
  32. """
  33. account_tuple = self.db_client_lam.select(sql)
  34. result = [
  35. {
  36. "gh_id": i[0],
  37. "platform": i[1],
  38. "account_name": i[2],
  39. "category": i[3],
  40. "latest_timestamp": i[4],
  41. }
  42. for i in account_tuple
  43. ]
  44. return result
  45. def insert_data_into_db(self, gh_id, category, article_list):
  46. """
  47. 将数据更新到数据库
  48. :return:
  49. """
  50. success_records = []
  51. for article_obj in article_list:
  52. detail_article_list = article_obj["AppMsg"]["DetailInfo"]
  53. for obj in detail_article_list:
  54. try:
  55. show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
  56. show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
  57. show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
  58. unique_idx = self.function.generateGzhId(obj["ContentUrl"])
  59. insert_sql = f"""
  60. insert into crawler_meta_article
  61. (
  62. platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
  63. description, publish_time, crawler_time, status, unique_index, llm_sensitivity
  64. )
  65. VALUES
  66. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  67. """
  68. self.db_client_lam.update(
  69. sql=insert_sql,
  70. params=(
  71. "weixin",
  72. "account",
  73. category,
  74. gh_id,
  75. obj['ItemIndex'],
  76. obj["Title"],
  77. obj["ContentUrl"],
  78. show_view_count,
  79. show_like_count,
  80. obj["Digest"],
  81. obj["send_time"],
  82. int(time.time()),
  83. DEFAULT_ARTICLE_STATUS,
  84. unique_idx,
  85. obj.get("llm_sensitivity", -1)
  86. ),
  87. )
  88. success_records.append({
  89. 'unique_index': unique_idx, 'title': obj['Title']
  90. })
  91. except Exception as e:
  92. print(e)
  93. return success_records
  94. def update_article_sensitive_status(self, category, unique_index, status):
  95. """
  96. 更新文章敏感状态
  97. :return:
  98. """
  99. update_sql = f"""
  100. update crawler_meta_article
  101. set llm_sensitivity = %s
  102. where category = %s and unique_index = %s;
  103. """
  104. self.db_client_lam.update(sql=update_sql, params=(status, category, unique_index))
  105. def update_latest_account_timestamp(self, gh_id):
  106. """
  107. 更新账号的最新时间戳
  108. :return:
  109. """
  110. select_sql = f"""
  111. SELECT publish_time
  112. From crawler_meta_article
  113. WHERE out_account_id = '{gh_id}'
  114. ORDER BY publish_time DESC LIMIT 1;
  115. """
  116. result = self.db_client_lam.select(select_sql)
  117. time_stamp = result[0][0]
  118. dt_str = self.function.timestamp_to_str(time_stamp)
  119. update_sql = f"""
  120. update long_articles_accounts
  121. set latest_update_time = %s
  122. where gh_id = %s;
  123. """
  124. self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))
  125. def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
  126. """
  127. 更新账号文章
  128. :return:
  129. """
  130. response = self.spider.update_msg_list(ghId=gh_id, index=index)
  131. msg_list = response.get("data", {}).get("data")
  132. if msg_list:
  133. last_article_in_this_msg = msg_list[-1]
  134. success_records = self.insert_data_into_db(
  135. gh_id=gh_id, category=category, article_list=msg_list
  136. )
  137. last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
  138. if latest_time_stamp < last_time_stamp_in_this_msg:
  139. next_cursor = response["data"]["next_cursor"]
  140. return success_records + self.update_each_account(
  141. gh_id=gh_id,
  142. latest_time_stamp=latest_time_stamp,
  143. category=category,
  144. index=next_cursor,
  145. )
  146. else:
  147. # 更新最近抓取时间
  148. self.update_latest_account_timestamp(gh_id=gh_id)
  149. print("账号时间更新成功")
  150. return success_records
  151. else:
  152. print("No more data")
  153. return []
  154. def deal(self, category_list):
  155. """
  156. :param category_list:
  157. :return:
  158. """
  159. for category in category_list:
  160. success_records = []
  161. account_list = self.get_account_list(category)
  162. for account in tqdm(account_list):
  163. try:
  164. gh_id = account['gh_id']
  165. category = account['category']
  166. try:
  167. timestamp = int(account['latest_timestamp'].timestamp())
  168. except Exception as e:
  169. timestamp = DEFAULT_TIMESTAMP
  170. success_records += self.update_each_account(
  171. gh_id=gh_id,
  172. category=category,
  173. latest_time_stamp=timestamp
  174. )
  175. print("success")
  176. except Exception as e:
  177. print("fail because of {}".format(e))
  178. success_titles = [x['title'] for x in success_records]
  179. if success_titles:
  180. try:
  181. sensitive_results = llm_sensitivity.check_titles(success_titles)
  182. for record, sensitive_result in zip(success_records, sensitive_results):
  183. self.update_article_sensitive_status(
  184. category=category,
  185. unique_index=record['unique_index'],
  186. status=sensitive_result['hit_rule']
  187. )
  188. except Exception as e:
  189. print("failed to update sensitive status: {}".format(e))
  190. def deal_accounts(self, account_list):
  191. """
  192. input account list
  193. :param account_list:
  194. :return:
  195. """
  196. account_tuple = tuple(account_list)
  197. sql = f"""
  198. SELECT gh_id, account_name, account_category, latest_update_time
  199. FROM long_articles_accounts
  200. WHERE account_name in {account_tuple};
  201. """
  202. response = self.db_client_lam.select(sql)
  203. for account in tqdm(response):
  204. try:
  205. gh_id = account[0]
  206. category = account[2]
  207. try:
  208. latest_timestamp = account[3].timestamp()
  209. except Exception as e:
  210. latest_timestamp = DEFAULT_TIMESTAMP
  211. self.update_each_account(
  212. gh_id=gh_id,
  213. category=category,
  214. latest_time_stamp=latest_timestamp
  215. )
  216. except Exception as e:
  217. print(e)