weixinCategoryCrawler.py

  1. """
  2. @author: luojunhui
  3. 抓取全局品类文章
  4. """
  5. import json
  6. import time
  7. from tqdm import tqdm
  8. from pymysql.cursors import DictCursor
  9. from applications import WeixinSpider, Functions, llm_sensitivity, log
  10. from coldStartTasks.filter import article_crawler_duplicate_filter
  11. from config import apolloConfig
  12. # 常量
  13. ACCOUNT_GOOD_STATUS = 1
  14. # 账号是否每日抓取
  15. ACCOUNT_DAILY_SCRAPE = 1
  16. ACCOUNT_NOT_DAILY_SCRAPE = 0
  17. # 默认值
  18. DEFAULT_VIEW_COUNT = 0
  19. DEFAULT_LIKE_COUNT = 0
  20. DEFAULT_ARTICLE_STATUS = 1
  21. DEFAULT_TIMESTAMP = 1717171200
  22. # 标题sensitivity
  23. TITLE_SENSITIVE = 1
  24. TITLE_NOT_SENSITIVE = 0
  25. config = apolloConfig()
  26. sensitive_word_list = json.loads(config.getConfigValue("sensitive_word_list"))


def whether_title_sensitive(title: str) -> bool:
    """
    Check whether the title contains any configured sensitive word.
    :param title: article title
    :return: True if a sensitive word is found
    """
    for word in sensitive_word_list:
        if word in title:
            return True
    return False
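
# Illustrative usage (hypothetical words; the real list is loaded from Apollo):
#   sensitive_word_list = ["forbidden"]
#   whether_title_sensitive("a forbidden title")   -> True
#   whether_title_sensitive("an ordinary title")   -> False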


class weixinCategory(object):
    """
    Crawler for WeChat global category accounts.
    """

    def __init__(self, db_client):
        self.db_client_lam = db_client
        self.spider = WeixinSpider()
        self.function = Functions()

    def get_account_list(self, account_category):
        """
        Get all active, daily-scraped accounts of a category.
        :param account_category: category name
        :return: list of account dicts
        """
        sql = f"""
            select gh_id, account_source, account_name, account_category, latest_update_time
            from long_articles_accounts
            where account_category = '{account_category}'
              and is_using = {ACCOUNT_GOOD_STATUS}
              and daily_scrape = {ACCOUNT_DAILY_SCRAPE};
        """
        account_tuple = self.db_client_lam.select(sql)
        result = [
            {
                "gh_id": i[0],
                "platform": i[1],
                "account_name": i[2],
                "category": i[3],
                "latest_timestamp": i[4],
            }
            for i in account_tuple
        ]
        return result

    def get_association_account_list(self, date_str):
        """
        Get today's batch of account-association accounts for round-robin polling.
        """
        group_id = date_str[-1]
        sql = f"""
            select account_id, gh_id, account_name, latest_update_time
            from long_articles_accounts
            where account_category = 'account_association'
              and is_using = {ACCOUNT_GOOD_STATUS}
              and daily_scrape = {ACCOUNT_NOT_DAILY_SCRAPE};
        """
        account_list = self.db_client_lam.select(sql, cursor_type=DictCursor)
        today_crawler_account_list = [
            i for i in account_list if str(i["account_id"])[-1] == group_id
        ]
        return today_crawler_account_list
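
    # Rotation example: deal() passes date_str as "YYYY-MM-DD", so for
    # "2024-06-01" group_id is "1" and only accounts whose account_id ends
    # in 1 are polled that day -- a simple 10-way daily rotation over the pool.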

    def insert_data_into_db(self, gh_id, category, article_list):
        """
        Write crawled articles into the database.
        :return: records inserted successfully
        """
        success_records = []
        for article_obj in article_list:
            detail_article_list = article_obj["AppMsg"]["DetailInfo"]
            for obj in detail_article_list:
                try:
                    # Skip articles whose title already exists in the库
                    if article_crawler_duplicate_filter(
                        new_article_title=obj["Title"], db_client=self.db_client_lam
                    ):
                        log(
                            function="weixinCategory",
                            task="weixinCategory",
                            message="duplicate title, skipped",
                            data={"title": obj["Title"]}
                        )
                        continue

                    # Check whether the title contains a sensitive word
                    title_sensitivity = (
                        TITLE_SENSITIVE
                        if whether_title_sensitive(obj["Title"])
                        else TITLE_NOT_SENSITIVE
                    )
                    show_stat = self.function.show_desc_to_sta(obj["ShowDesc"])
                    show_view_count = show_stat.get("show_view_count", DEFAULT_VIEW_COUNT)
                    show_like_count = show_stat.get("show_like_count", DEFAULT_LIKE_COUNT)
                    unique_idx = self.function.generateGzhId(obj["ContentUrl"])
                    insert_sql = """
                        insert into crawler_meta_article
                        (
                            platform, mode, category, out_account_id, article_index,
                            title, link, read_cnt, like_cnt, description, publish_time,
                            crawler_time, status, unique_index, llm_sensitivity, title_sensitivity
                        )
                        VALUES
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    self.db_client_lam.update(
                        sql=insert_sql,
                        params=(
                            "weixin",
                            "account",
                            category,
                            gh_id,
                            obj["ItemIndex"],
                            obj["Title"],
                            obj["ContentUrl"],
                            show_view_count,
                            show_like_count,
                            obj["Digest"],
                            obj["send_time"],
                            int(time.time()),
                            DEFAULT_ARTICLE_STATUS,
                            unique_idx,
                            obj.get("llm_sensitivity", -1),
                            title_sensitivity,
                        ),
                    )
                    success_records.append(
                        {"unique_index": unique_idx, "title": obj["Title"]}
                    )
                except Exception as e:
                    print(e)
        return success_records

    def update_article_sensitive_status(self, category, unique_index, status):
        """
        Update the LLM sensitivity status of an article.
        :return:
        """
        update_sql = """
            update crawler_meta_article
            set llm_sensitivity = %s
            where category = %s and unique_index = %s;
        """
        self.db_client_lam.update(sql=update_sql, params=(status, category, unique_index))

    def update_latest_account_timestamp(self, gh_id):
        """
        Update an account's latest publish timestamp.
        :return:
        """
        select_sql = f"""
            SELECT publish_time
            FROM crawler_meta_article
            WHERE out_account_id = '{gh_id}'
            ORDER BY publish_time DESC LIMIT 1;
        """
        result = self.db_client_lam.select(select_sql)
        time_stamp = result[0][0]
        dt_str = self.function.timestamp_to_str(time_stamp)
        update_sql = """
            update long_articles_accounts
            set latest_update_time = %s
            where gh_id = %s;
        """
        self.db_client_lam.update(sql=update_sql, params=(dt_str, gh_id))

    def update_each_account(self, gh_id, category, latest_time_stamp, index=None):
        """
        Crawl one account's article list, page by page.
        :return: records inserted successfully
        """
        response = self.spider.update_msg_list(ghId=gh_id, index=index)
        msg_list = response.get("data", {}).get("data")
        if msg_list:
            last_article_in_this_msg = msg_list[-1]
            success_records = self.insert_data_into_db(
                gh_id=gh_id, category=category, article_list=msg_list
            )
            last_time_stamp_in_this_msg = last_article_in_this_msg["AppMsg"]["BaseInfo"]["UpdateTime"]
            if latest_time_stamp < last_time_stamp_in_this_msg:
                # The oldest article on this page is still newer than the last
                # crawl, so recurse into the next page.
                next_cursor = response["data"]["next_cursor"]
                return success_records + self.update_each_account(
                    gh_id=gh_id,
                    latest_time_stamp=latest_time_stamp,
                    category=category,
                    index=next_cursor,
                )
            else:
                # Caught up: record this account's latest crawl time.
                self.update_latest_account_timestamp(gh_id=gh_id)
                print("account timestamp updated")
            return success_records
        else:
            print("No more data")
            return []
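
    # Shape of the spider payload consumed above, reconstructed from the field
    # accesses in this file (a sketch, not the authoritative API contract):
    #
    # response = {
    #     "data": {
    #         "data": [  # msg_list
    #             {
    #                 "AppMsg": {
    #                     "BaseInfo": {"UpdateTime": 1717171200},
    #                     "DetailInfo": [
    #                         {
    #                             "Title": "...",
    #                             "ContentUrl": "...",
    #                             "ShowDesc": "...",
    #                             "ItemIndex": 1,
    #                             "Digest": "...",
    #                             "send_time": 1717171200,
    #                         }
    #                     ],
    #                 }
    #             }
    #         ],
    #         "next_cursor": "...",
    #     }
    # }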

    def crawler_each_category(self, account_list, category):
        """
        Crawl every account in a category, then batch-check title sensitivity.
        :return:
        """
        success_records = []
        for account in tqdm(account_list, desc="crawler_each_category"):
            try:
                gh_id = account["gh_id"]
                try:
                    timestamp = int(account["latest_timestamp"].timestamp())
                except Exception:
                    timestamp = DEFAULT_TIMESTAMP
                success_records += self.update_each_account(
                    gh_id=gh_id,
                    category=category,
                    latest_time_stamp=timestamp
                )
                print("success")
            except Exception as e:
                print("fail because of {}".format(e))

        success_titles = [x["title"] for x in success_records]
        if success_titles:
            try:
                sensitive_results = llm_sensitivity.check_titles(success_titles)
                for record, sensitive_result in zip(success_records, sensitive_results):
                    self.update_article_sensitive_status(
                        category=category,
                        unique_index=record["unique_index"],
                        status=sensitive_result["hit_rule"]
                    )
            except Exception as e:
                print("failed to update sensitive status: {}".format(e))

    def deal(self, category_list, date_str):
        """
        :param category_list: categories to crawl daily
        :param date_str: YYYY-MM-DD
        :return:
        """
        # Daily crawl of category accounts
        for category in category_list:
            account_list = self.get_account_list(category)
            self.crawler_each_category(account_list=account_list, category=category)

        # Round-robin crawl of account-association accounts
        association_account_list = self.get_association_account_list(date_str)
        self.crawler_each_category(account_list=association_account_list, category="association")

    def deal_accounts(self, account_list):
        """
        Crawl the given accounts by name (only the latest page of each).
        :param account_list: list of account names
        :return:
        """
        # Build the IN clause manually: str(tuple(...)) would leave a trailing
        # comma for a single-element list, e.g. ('name',), which MySQL rejects.
        in_clause = ", ".join("'{}'".format(name) for name in account_list)
        sql = f"""
            SELECT gh_id, account_name, account_category, latest_update_time
            FROM long_articles_accounts
            WHERE account_name in ({in_clause});
        """
        response = self.db_client_lam.select(sql)
        for account in tqdm(response):
            try:
                gh_id = account[0]
                category = account[2]
                try:
                    latest_timestamp = account[3].timestamp()
                except Exception as e:
                    print(e)
                    latest_timestamp = DEFAULT_TIMESTAMP
                self.update_each_account(
                    gh_id=gh_id,
                    category=category,
                    latest_time_stamp=latest_timestamp
                )
            except Exception as e:
                print(e)
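

# Minimal usage sketch, assuming a PyMySQL-backed client exposing the
# select()/update() interface used above. `SimpleDbClient` is a hypothetical
# stand-in for the project's real db_client; connection parameters and
# category names are placeholders, not values from this repo.
if __name__ == "__main__":
    import pymysql

    class SimpleDbClient:
        """Hypothetical minimal client matching the interface used above."""

        def __init__(self):
            self.connection = pymysql.connect(
                host="localhost", user="root", password="", database="long_articles"
            )

        def select(self, sql, cursor_type=None):
            # cursor_type may be e.g. pymysql.cursors.DictCursor
            with self.connection.cursor(cursor_type) as cursor:
                cursor.execute(sql)
                return cursor.fetchall()

        def update(self, sql, params):
            with self.connection.cursor() as cursor:
                affected = cursor.execute(sql, params)
            self.connection.commit()
            return affected

    task = weixinCategory(db_client=SimpleDbClient())
    task.deal(category_list=["history", "health"], date_str="2024-06-01")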