# weixinCategoryCrawler.py
"""
@author: luojunhui
抓取全局品类文章 (crawl articles for all global categories)
"""
import time
from tqdm import tqdm
from applications import WeixinSpider, Functions, llm_sensitivity, log
from coldStartTasks.filter import article_crawler_duplicate_filter
# Constants
ACCOUNT_GOOD_STATUS = 1        # value of long_articles_accounts.is_using for active accounts
ACCOUNT_DAILY_SCRAPE = 1       # value of long_articles_accounts.daily_scrape for accounts crawled daily
DEFAULT_VIEW_COUNT = 0         # fallback read count when ShowDesc parsing yields nothing
DEFAULT_LIKE_COUNT = 0         # fallback like count when ShowDesc parsing yields nothing
DEFAULT_ARTICLE_STATUS = 1     # status written for every newly crawled article row
DEFAULT_TIMESTAMP = 1704038400  # 2024-01-01 00:00:00 (UTC+8); fallback when an account has no latest_update_time
  16. class weixinCategory(object):
  17. """
  18. 微信全局品类账号抓取
  19. """
  20. def __init__(self, db_client):
  21. self.db_client_lam = db_client
  22. self.spider = WeixinSpider()
  23. self.function = Functions()
  24. def get_account_list(self, account_category):
  25. """
  26. 获取账号
  27. :param account_category 品类
  28. :return:
  29. """
  30. sql = f"""
  31. select gh_id, account_source, account_name, account_category, latest_update_time
  32. from long_articles_accounts
  33. where account_category = '{account_category}' and is_using = {ACCOUNT_GOOD_STATUS} and daily_scrape = {ACCOUNT_DAILY_SCRAPE};
  34. """
  35. account_tuple = self.db_client_lam.select(sql)
  36. result = [
  37. {
  38. "gh_id": i[0],
  39. "platform": i[1],
  40. "account_name": i[2],
  41. "category": i[3],
  42. "latest_timestamp": i[4],
  43. }
  44. for i in account_tuple
  45. ]
  46. return result
  47. def insert_data_into_db(self, gh_id, category, article_list):
  48. """
  49. 将数据更新到数据库
  50. :return:
  51. """
  52. success_records = []
  53. for article_obj in article_list:
  54. detail_article_list = article_obj["AppMsg"]["DetailInfo"]
  55. for obj in detail_article_list:
  56. try:
  57. # 判断文章是否存在相同的标题
  58. if article_crawler_duplicate_filter(
  59. new_article_title=obj["Title"], db_client=self.db_client_lam
  60. ):
  61. log(
  62. function="weixinCategory",
  63. task="weixinCategory",
  64. message="文章去重",
  65. data={"title": obj["Title"]}
  66. )
  67. continue
  68. show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
  69. show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
  70. show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
  71. unique_idx = self.function.generateGzhId(obj["ContentUrl"])
  72. insert_sql = f"""
  73. insert into crawler_meta_article
  74. (
  75. platform, mode, category, out_account_id, article_index, title, link, read_cnt, like_cnt,
  76. description, publish_time, crawler_time, status, unique_index, llm_sensitivity
  77. )
  78. VALUES
  79. (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
  80. """
  81. self.db_client_lam.update(
  82. sql=insert_sql,
  83. params=(
  84. "weixin",
  85. "account",
  86. category,
  87. gh_id,
  88. obj['ItemIndex'],
  89. obj["Title"],
  90. obj["ContentUrl"],
  91. show_view_count,
  92. show_like_count,
  93. obj["Digest"],
  94. obj["send_time"],
  95. int(time.time()),
  96. DEFAULT_ARTICLE_STATUS,
  97. unique_idx,
  98. obj.get("llm_sensitivity", -1)
  99. ),
  100. )
  101. success_records.append({
  102. 'unique_index': unique_idx, 'title': obj['Title']
  103. })
  104. except Exception as e:
  105. print(e)
  106. return success_records
  107. def update_article_sensitive_status(self, category, unique_index, status):
  108. """
  109. 更新文章敏感状态
  110. :return:
  111. """
  112. update_sql = f"""
  113. update crawler_meta_article
  114. set llm_sensitivity = %s
  115. where category = %s and unique_index = %s;
  116. """
  117. self.db_client_lam.update(sql=update_sql, params=(status, category, unique_index))
  118. def update_latest_account_timestamp(self, gh_id):
  119. """
  120. 更新账号的最新时间戳
  121. :return:
  122. """
  123. select_sql = f"""
  124. SELECT publish_time
  125. From crawler_meta_article
  126. WHERE out_account_id = '{gh_id}'
  127. ORDER BY publish_time DESC LIMIT 1;
  128. """
  129. result = self.db_client_lam.select(select_sql)
  130. time_stamp = result[0][0]
  131. dt_str = self.function.timestamp_to_str(time_stamp)
  132. update_sql = f"""
  133. update long_articles_accounts
  134. set latest_update_time = %s
  135. where gh_id = %s;
  136. """
  137. self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))
  138. def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
  139. """
  140. 更新账号文章
  141. :return:
  142. """
  143. response = self.spider.update_msg_list(ghId=gh_id, index=index)
  144. msg_list = response.get("data", {}).get("data")
  145. if msg_list:
  146. last_article_in_this_msg = msg_list[-1]
  147. success_records = self.insert_data_into_db(
  148. gh_id=gh_id, category=category, article_list=msg_list
  149. )
  150. last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
  151. if latest_time_stamp < last_time_stamp_in_this_msg:
  152. next_cursor = response["data"]["next_cursor"]
  153. return success_records + self.update_each_account(
  154. gh_id=gh_id,
  155. latest_time_stamp=latest_time_stamp,
  156. category=category,
  157. index=next_cursor,
  158. )
  159. else:
  160. # 更新最近抓取时间
  161. self.update_latest_account_timestamp(gh_id=gh_id)
  162. print("账号时间更新成功")
  163. return success_records
  164. else:
  165. print("No more data")
  166. return []
  167. def deal(self, category_list):
  168. """
  169. :param category_list:
  170. :return:
  171. """
  172. for category in category_list:
  173. success_records = []
  174. account_list = self.get_account_list(category)
  175. for account in tqdm(account_list):
  176. try:
  177. gh_id = account['gh_id']
  178. category = account['category']
  179. try:
  180. timestamp = int(account['latest_timestamp'].timestamp())
  181. except Exception as e:
  182. timestamp = DEFAULT_TIMESTAMP
  183. success_records += self.update_each_account(
  184. gh_id=gh_id,
  185. category=category,
  186. latest_time_stamp=timestamp
  187. )
  188. print("success")
  189. except Exception as e:
  190. print("fail because of {}".format(e))
  191. success_titles = [x['title'] for x in success_records]
  192. if success_titles:
  193. try:
  194. sensitive_results = llm_sensitivity.check_titles(success_titles)
  195. for record, sensitive_result in zip(success_records, sensitive_results):
  196. self.update_article_sensitive_status(
  197. category=category,
  198. unique_index=record['unique_index'],
  199. status=sensitive_result['hit_rule']
  200. )
  201. except Exception as e:
  202. print("failed to update sensitive status: {}".format(e))
  203. def deal_accounts(self, account_list):
  204. """
  205. input account list
  206. :param account_list:
  207. :return:
  208. """
  209. account_tuple = tuple(account_list)
  210. sql = f"""
  211. SELECT gh_id, account_name, account_category, latest_update_time
  212. FROM long_articles_accounts
  213. WHERE account_name in {account_tuple};
  214. """
  215. response = self.db_client_lam.select(sql)
  216. for account in tqdm(response):
  217. try:
  218. gh_id = account[0]
  219. category = account[2]
  220. try:
  221. latest_timestamp = account[3].timestamp()
  222. except Exception as e:
  223. latest_timestamp = DEFAULT_TIMESTAMP
  224. self.update_each_account(
  225. gh_id=gh_id,
  226. category=category,
  227. latest_time_stamp=latest_timestamp
  228. )
  229. except Exception as e:
  230. print(e)