# article_pool_cold_start.py

from __future__ import annotations

import time
import datetime
import traceback
from typing import List, Dict

from pandas import DataFrame

from applications.api import task_apollo, feishu_robot
from applications.api import auto_create_crawler_task
from applications.api import auto_bind_crawler_task_to_generate_task
from applications.utils import get_titles_from_produce_plan


class ArticlePoolColdStartConst:
    # article volume and relevance thresholds
    DAILY_ARTICLE_NUM = 1000
    SIMILARITY_SCORE_THRESHOLD = 0.5

    # title sensitivity flags (title_sensitivity column)
    TITLE_NOT_SENSITIVE = 0
    TITLE_SENSITIVE = 1

    # crawler_meta_article status values
    PUBLISHED_STATUS = 2
    INIT_STATUS = 1
    BAD_STATUS = 0

    # read-count thresholds used by the weixin cold-start query
    READ_TIMES_THRESHOLD = 1.3
    # READ_THRESHOLD = 5000
    READ_THRESHOLD = 1000

    # title length bounds (in characters)
    TITLE_LENGTH_LIMIT = 15
    TITLE_LENGTH_MAX = 50

    DEFAULT_CRAWLER_METHODS = ["1030-手动挑号", "account_association"]


class ArticlePoolColdStartStrategy(ArticlePoolColdStartConst):
    def __init__(self, pool, log_client, trace_id):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    async def get_weixin_cold_start_articles(
        self, crawl_method: str, strategy: str
    ) -> List[Dict]:
        match strategy:
            case "strategy_v1":
                query = """
                    select article_id, title, link, llm_sensitivity, score, category_by_ai
                    from crawler_meta_article t1
                    join crawler_meta_article_accounts_read_avg t2
                        on t1.out_account_id = t2.gh_id and t1.article_index = t2.position
                    where category = %s
                        and platform = %s
                        and title_sensitivity = %s
                        and t1.status = %s
                        and t1.read_cnt / t2.read_avg >= %s
                        and t1.read_cnt >= %s
                        and t2.status = %s
                    order by score desc;
                """
                article_list = await self.pool.async_fetch(
                    query=query,
                    params=(
                        crawl_method,
                        "weixin",
                        self.TITLE_NOT_SENSITIVE,
                        self.INIT_STATUS,
                        self.READ_TIMES_THRESHOLD,
                        self.READ_THRESHOLD,
                        self.INIT_STATUS,
                    ),
                )
                return article_list
            case _:
                raise ValueError("Invalid strategy")

    async def get_toutiao_cold_start_articles(
        self, crawl_method: str, strategy: str
    ) -> List[Dict]:
        match strategy:
            case "strategy_v1":
                query = """
                    select article_id, title, link, llm_sensitivity, score, category_by_ai
                    from crawler_meta_article
                    where category = %s
                        and platform = %s
                        and status = %s;
                """
                article_list = await self.pool.async_fetch(
                    query=query, params=(crawl_method, "toutiao", self.INIT_STATUS)
                )
                return article_list
            case _:
                raise ValueError("Invalid strategy")


class ArticlePoolFilterStrategy(ArticlePoolColdStartConst):
    async def filter_weixin_articles(self, dataframe, crawl_method):
        """Weixin filtering funnel"""
        total_length: int = dataframe.shape[0]

        # layer 1: filter by title length
        filter_df = dataframe[
            (dataframe["title"].str.len() <= self.TITLE_LENGTH_MAX)
            & (dataframe["title"].str.len() >= self.TITLE_LENGTH_LIMIT)
        ]
        length_level1 = filter_df.shape[0]

        # layer 2: filter by sensitive keywords
        sensitive_keywords = [
            "农历",
            "太极",
            "节",
            "早上好",
            "赖清德",
            "普京",
            "俄",
            "南海",
            "台海",
            "解放军",
            "蔡英文",
            "中国",
        ]
        # build a regex pattern; keywords are joined with "|" (logical OR)
        pattern = "|".join(sensitive_keywords)
        filter_df = filter_df[~filter_df["title"].str.contains(pattern, na=False)]
        # row count after keyword filtering
        length_level2 = filter_df.shape[0]

        # layer 3: filter by LLM sensitivity
        filter_df = filter_df[~(filter_df["llm_sensitivity"] > 0)]
        length_level3 = filter_df.shape[0]

        # layer 4: filter by similarity score
        filter_df = filter_df[filter_df["score"] > self.SIMILARITY_SCORE_THRESHOLD]
        length_level4 = filter_df.shape[0]

        await feishu_robot.bot(
            title="冷启任务发布通知",
            detail={
                "总文章数量": total_length,
                "通过标题长度过滤": "过滤数量: {} 剩余数量: {}".format(
                    total_length - length_level1, length_level1
                ),
                "通过敏感词过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level1 - length_level2, length_level2
                ),
                "通过LLM敏感度过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level2 - length_level3, length_level3
                ),
                "通过相关性分数过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level3 - length_level4, length_level4
                ),
                "渠道": crawl_method,
                "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
                "阅读量阈值": self.READ_THRESHOLD,
                "标题长度阈值": self.TITLE_LENGTH_LIMIT,
            },
            mention=False,
        )
        return filter_df[: self.DAILY_ARTICLE_NUM]

    async def filter_toutiao_articles(self, dataframe, crawl_method):
        """Toutiao filtering funnel: similarity score only"""
        total_length = dataframe.shape[0]
        filter_df = dataframe[dataframe["score"] > self.SIMILARITY_SCORE_THRESHOLD]
        await feishu_robot.bot(
            title="冷启动创建抓取计划",
            detail={
                "渠道": crawl_method,
                "总文章数量": total_length,
                "相关性分数过滤剩余": filter_df.shape[0],
            },
            mention=False,
        )
        return filter_df[: self.DAILY_ARTICLE_NUM]


class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def get_article_from_meta_table(
        self, platform: str, crawl_method: str, strategy: str
    ) -> DataFrame:
        """
        @param platform: article source platform ("weixin" or "toutiao")
        @param crawl_method: article crawl method
        @param strategy: supply strategy
        """
        match platform:
            case "weixin":
                article_list = await self.get_weixin_cold_start_articles(
                    crawl_method, strategy
                )
            case "toutiao":
                article_list = await self.get_toutiao_cold_start_articles(
                    crawl_method, strategy
                )
            case _:
                raise ValueError("Invalid platform")
        return DataFrame(
            article_list,
            columns=[
                "article_id",
                "title",
                "link",
                "llm_sensitivity",
                "score",
                "category_by_ai",
            ],
        )

    async def filter_published_titles(self, plan_id):
        """
        Filter out titles that have already been added to the AIGC produce plan.
        """
        published_title_tuple = await get_titles_from_produce_plan(self.pool, plan_id)
        # an empty tuple would render as "in ()" and break the SQL, so skip the update
        if not published_title_tuple:
            return 0
        update_query = """
            update crawler_meta_article set status = %s where title in %s and status = %s;
        """
        changed_rows = await self.pool.async_save(
            query=update_query,
            params=(self.PUBLISHED_STATUS, published_title_tuple, self.INIT_STATUS),
        )
        return changed_rows

    async def insert_crawler_plan_into_database(
        self, crawler_plan_id, crawler_plan_name, create_timestamp
    ):
        query = """
            insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
            values (%s, %s, %s)
        """
        try:
            await self.pool.async_save(
                query=query,
                params=(crawler_plan_id, crawler_plan_name, create_timestamp),
            )
        except Exception as e:
            await feishu_robot.bot(
                title="品类冷启任务,记录抓取计划id失败",
                detail={
                    "error": str(e),
                    "error_msg": traceback.format_exc(),
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name,
                },
            )

    async def change_article_status_while_publishing(self, article_id_list):
        """
        :param article_id_list: unique article ids to mark as published
        :return: number of affected rows
        """
        query = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affect_rows = await self.pool.async_save(
            query=query,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS),
        )
        return affect_rows

    async def create_cold_start_plan(
        self, platform, crawl_method, plan_id, strategy="strategy_v1"
    ):
        article_dataframe = await self.get_article_from_meta_table(
            platform, crawl_method, strategy
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "获取文章成功",
                "data": {"article_length": article_dataframe.shape[0]},
            }
        )
        # AIGC input source channel: 5 = weixin, 6 = toutiao
        match platform:
            case "weixin":
                input_source_channel = 5
                filter_article_df = await self.filter_weixin_articles(
                    article_dataframe, crawl_method
                )
            case "toutiao":
                input_source_channel = 6
                filter_article_df = await self.filter_toutiao_articles(
                    article_dataframe, crawl_method
                )
            case _:
                raise ValueError("Invalid platform")

        # split articles by AI category and create one crawler plan per category
        category_list = await task_apollo.get_config_value(key="category_list")
        for ai_category in category_list:
            filter_category_df = filter_article_df[
                filter_article_df["category_by_ai"] == ai_category
            ]
            url_list = filter_category_df["link"].values.tolist()
            if url_list:
                # create crawler plan
                crawler_plan_response = await auto_create_crawler_task(
                    plan_id=None,
                    plan_name="自动绑定strategyV2-{}-{}-{}--{}".format(
                        crawl_method,
                        ai_category,
                        str(datetime.date.today()),
                        len(url_list),
                    ),
                    plan_tag="品类冷启动",
                    platform=platform,
                    url_list=url_list,
                )

                # save to db
                create_timestamp = int(time.time()) * 1000
                crawler_plan_id = crawler_plan_response["data"]["id"]
                crawler_plan_name = crawler_plan_response["data"]["name"]
                await self.insert_crawler_plan_into_database(
                    crawler_plan_id, crawler_plan_name, create_timestamp
                )

                # auto bind to generate plan
                new_crawler_task_list = [
                    {
                        "contentType": 1,
                        "inputSourceType": 2,
                        "inputSourceSubType": None,
                        "fieldName": None,
                        "inputSourceValue": crawler_plan_id,
                        "inputSourceLabel": crawler_plan_name,
                        "inputSourceModal": 3,
                        "inputSourceChannel": input_source_channel,
                    }
                ]
                generate_plan_response = await auto_bind_crawler_task_to_generate_task(
                    crawler_task_list=new_crawler_task_list, generate_task_id=plan_id
                )
                await self.log_client.log(
                    contents={
                        "task": "article_pool_cold_start",
                        "platform": platform,
                        "crawl_method": crawl_method,
                        "status": "success",
                        "trace_id": self.trace_id,
                        "message": "绑定至生成计划成功",
                        "data": generate_plan_response,
                    }
                )

                # change article status
                article_id_list = filter_category_df["article_id"].values.tolist()
                await self.change_article_status_while_publishing(
                    article_id_list=article_id_list
                )

    async def deal(self, platform: str, crawl_methods: List[str]) -> None:
        """Entry point: run cold start for each crawl method of the given platform."""
        if not crawl_methods:
            crawl_methods = self.DEFAULT_CRAWLER_METHODS
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_methods": crawl_methods,
                "status": "success",
                "trace_id": self.trace_id,
            }
        )
        crawl_methods_map = await task_apollo.get_config_value(
            key="category_cold_start_map"
        )
        for crawl_method in crawl_methods:
            try:
                plan_id = crawl_methods_map[crawl_method]
                affected_rows = await self.filter_published_titles(plan_id)
                await self.log_client.log(
                    contents={
                        "task": "article_pool_cold_start",
                        "platform": platform,
                        "crawl_method": crawl_method,
                        "status": "success",
                        "trace_id": self.trace_id,
                        "message": "通过已抓取标题修改文章状态",
                        "data": {"affected_rows": affected_rows},
                    }
                )
                await self.create_cold_start_plan(platform, crawl_method, plan_id)
            except Exception as e:
                await feishu_robot.bot(
                    title="文章冷启动异常",
                    detail={
                        "crawl_method": crawl_method,
                        "error": str(e),
                        "function": "deal",
                        "traceback": traceback.format_exc(),
                    },
                )
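

# Usage sketch (an assumption, not part of the original module): how the cold-start
# task might be driven from an async entry point. `AsyncMySQLPool` and `LogClient`
# are hypothetical stand-ins for whatever connection pool (exposing async_fetch /
# async_save) and structured log client (exposing log()) the surrounding application
# provides; only the ArticlePoolColdStart interface above comes from this file.
#
# import asyncio
# import uuid
#
# async def run_cold_start():
#     pool = AsyncMySQLPool(...)        # hypothetical async DB pool
#     log_client = LogClient(...)       # hypothetical log client
#     task = ArticlePoolColdStart(
#         pool=pool,
#         log_client=log_client,
#         trace_id=str(uuid.uuid4()),
#     )
#     # passing an empty crawl_methods list falls back to DEFAULT_CRAWLER_METHODS
#     await task.deal(platform="weixin", crawl_methods=[])
#
# if __name__ == "__main__":
#     asyncio.run(run_cold_start())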