weixinCategoryCrawler.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
"""
@author: luojunhui
Crawl articles published by WeChat accounts across the global categories.
"""
  5. import time
  6. from tqdm import tqdm
  7. from applications import WeixinSpider, Functions, llm_sensitivity, log
  8. from coldStartTasks.filter import article_crawler_duplicate_filter
  9. # 常量
  10. ACCOUNT_GOOD_STATUS = 1
  11. DEFAULT_VIEW_COUNT = 0
  12. DEFAULT_LIKE_COUNT = 0
  13. DEFAULT_ARTICLE_STATUS = 1
  14. DEFAULT_TIMESTAMP = 1704038400
  15. class weixinCategory(object):
  16. """
  17. 微信全局品类账号抓取
  18. """
  19. def __init__(self, db_client):
  20. self.db_client_lam = db_client
  21. self.spider = WeixinSpider()
  22. self.function = Functions()
  23. def get_account_list(self, account_category):
  24. """
  25. 获取账号
  26. :param account_category 品类
  27. :return:
  28. """
  29. sql = f"""
  30. select gh_id, account_source, account_name, account_category, latest_update_time
  31. from long_articles_accounts
  32. where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS};
  33. """
  34. account_tuple = self.db_client_lam.select(sql)
  35. result = [
  36. {
  37. "gh_id": i[0],
  38. "platform": i[1],
  39. "account_name": i[2],
  40. "category": i[3],
  41. "latest_timestamp": i[4],
  42. }
  43. for i in account_tuple
  44. ]
  45. return result
  46. def insert_data_into_db(self, gh_id, category, article_list):
  47. """
  48. 将数据更新到数据库
  49. :return:
  50. """
  51. success_records = []
  52. for article_obj in article_list:
  53. detail_article_list = article_obj["AppMsg"]["DetailInfo"]
  54. for obj in detail_article_list:
  55. try:
  56. # 判断文章是否存在相同的标题
  57. if article_crawler_duplicate_filter(
  58. new_article_title=obj["Title"], db_client=self.db_client_lam
  59. ):
  60. log(
  61. function="weixinCategory",
  62. task="weixinCategory",
  63. message="文章去重",
  64. data={"title": obj["Title"]}
  65. )
  66. continue
  67. show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
  68. show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
  69. show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
  70. unique_idx = self.function.generateGzhId(obj["ContentUrl"])
  71. insert_sql = f"""
  72. insert into crawler_meta_article
  73. (
  74. platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
  75. description, publish_time, crawler_time, status, unique_index, llm_sensitivity
  76. )
  77. VALUES
  78. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  79. """
  80. self.db_client_lam.update(
  81. sql=insert_sql,
  82. params=(
  83. "weixin",
  84. "account",
  85. category,
  86. gh_id,
  87. obj['ItemIndex'],
  88. obj["Title"],
  89. obj["ContentUrl"],
  90. show_view_count,
  91. show_like_count,
  92. obj["Digest"],
  93. obj["send_time"],
  94. int(time.time()),
  95. DEFAULT_ARTICLE_STATUS,
  96. unique_idx,
  97. obj.get("llm_sensitivity", -1)
  98. ),
  99. )
  100. success_records.append({
  101. 'unique_index': unique_idx, 'title': obj['Title']
  102. })
  103. except Exception as e:
  104. print(e)
  105. return success_records
  106. def update_article_sensitive_status(self, category, unique_index, status):
  107. """
  108. 更新文章敏感状态
  109. :return:
  110. """
  111. update_sql = f"""
  112. update crawler_meta_article
  113. set llm_sensitivity = %s
  114. where category = %s and unique_index = %s;
  115. """
  116. self.db_client_lam.update(sql=update_sql, params=(status, category, unique_index))
  117. def update_latest_account_timestamp(self, gh_id):
  118. """
  119. 更新账号的最新时间戳
  120. :return:
  121. """
  122. select_sql = f"""
  123. SELECT publish_time
  124. From crawler_meta_article
  125. WHERE out_account_id = '{gh_id}'
  126. ORDER BY publish_time DESC LIMIT 1;
  127. """
  128. result = self.db_client_lam.select(select_sql)
  129. time_stamp = result[0][0]
  130. dt_str = self.function.timestamp_to_str(time_stamp)
  131. update_sql = f"""
  132. update long_articles_accounts
  133. set latest_update_time = %s
  134. where gh_id = %s;
  135. """
  136. self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))
  137. def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
  138. """
  139. 更新账号文章
  140. :return:
  141. """
  142. response = self.spider.update_msg_list(ghId=gh_id, index=index)
  143. msg_list = response.get("data", {}).get("data")
  144. if msg_list:
  145. last_article_in_this_msg = msg_list[-1]
  146. success_records = self.insert_data_into_db(
  147. gh_id=gh_id, category=category, article_list=msg_list
  148. )
  149. last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
  150. if latest_time_stamp < last_time_stamp_in_this_msg:
  151. next_cursor = response["data"]["next_cursor"]
  152. return success_records + self.update_each_account(
  153. gh_id=gh_id,
  154. latest_time_stamp=latest_time_stamp,
  155. category=category,
  156. index=next_cursor,
  157. )
  158. else:
  159. # 更新最近抓取时间
  160. self.update_latest_account_timestamp(gh_id=gh_id)
  161. print("账号时间更新成功")
  162. return success_records
  163. else:
  164. print("No more data")
  165. return []
  166. def deal(self, category_list):
  167. """
  168. :param category_list:
  169. :return:
  170. """
  171. for category in category_list:
  172. success_records = []
  173. account_list = self.get_account_list(category)
  174. for account in tqdm(account_list):
  175. try:
  176. gh_id = account['gh_id']
  177. category = account['category']
  178. try:
  179. timestamp = int(account['latest_timestamp'].timestamp())
  180. except Exception as e:
  181. timestamp = DEFAULT_TIMESTAMP
  182. success_records += self.update_each_account(
  183. gh_id=gh_id,
  184. category=category,
  185. latest_time_stamp=timestamp
  186. )
  187. print("success")
  188. except Exception as e:
  189. print("fail because of {}".format(e))
  190. success_titles = [x['title'] for x in success_records]
  191. if success_titles:
  192. try:
  193. sensitive_results = llm_sensitivity.check_titles(success_titles)
  194. for record, sensitive_result in zip(success_records, sensitive_results):
  195. self.update_article_sensitive_status(
  196. category=category,
  197. unique_index=record['unique_index'],
  198. status=sensitive_result['hit_rule']
  199. )
  200. except Exception as e:
  201. print("failed to update sensitive status: {}".format(e))
  202. def deal_accounts(self, account_list):
  203. """
  204. input account list
  205. :param account_list:
  206. :return:
  207. """
  208. account_tuple = tuple(account_list)
  209. sql = f"""
  210. SELECT gh_id, account_name, account_category, latest_update_time
  211. FROM long_articles_accounts
  212. WHERE account_name in {account_tuple};
  213. """
  214. response = self.db_client_lam.select(sql)
  215. for account in tqdm(response):
  216. try:
  217. gh_id = account[0]
  218. category = account[2]
  219. try:
  220. latest_timestamp = account[3].timestamp()
  221. except Exception as e:
  222. latest_timestamp = DEFAULT_TIMESTAMP
  223. self.update_each_account(
  224. gh_id=gh_id,
  225. category=category,
  226. latest_time_stamp=latest_timestamp
  227. )
  228. except Exception as e:
  229. print(e)