# article_pool_cold_start.py

from __future__ import annotations

import time
import datetime
import traceback
from typing import List, Dict

from pandas import DataFrame

from applications.api import task_apollo, feishu_robot
from applications.api import auto_create_crawler_task
from applications.api import auto_bind_crawler_task_to_generate_task
from applications.utils import get_titles_from_produce_plan


class ArticlePoolColdStartConst:
    """Constants shared by the article-pool cold-start task."""

    # maximum number of articles kept per run
    DAILY_ARTICLE_NUM = 1000
    # minimum relevance score required to keep an article
    SIMILARITY_SCORE_THRESHOLD = 0.5

    # title sensitivity flags
    TITLE_NOT_SENSITIVE = 0
    TITLE_SENSITIVE = 1

    # crawler_meta_article status values
    PUBLISHED_STATUS = 2
    INIT_STATUS = 1
    BAD_STATUS = 0

    # read filters: minimum read_cnt / read_avg ratio and minimum absolute read count
    READ_TIMES_THRESHOLD = 1.3
    READ_THRESHOLD = 5000

    # title length bounds (characters): minimum and maximum
    TITLE_LENGTH_LIMIT = 15
    TITLE_LENGTH_MAX = 50

    DEFAULT_CRAWLER_METHODS = ["1030-手动挑号", "account_association"]
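
# Status lifecycle note (an inferred summary): rows start at INIT_STATUS (1) and are
# moved to PUBLISHED_STATUS (2) either when their titles already exist in the produce
# plan (filter_published_titles) or after they are bound to a generate plan
# (change_article_status_while_publishing). BAD_STATUS (0) is defined but never
# assigned in this module.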


class ArticlePoolColdStart(ArticlePoolColdStartConst):
    """Cold-start pipeline: pull candidate articles, filter them, and create crawler plans."""

    def __init__(self, pool, log_client, trace_id):
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    async def get_article_from_meta_table(
        self, platform: str, crawl_method: str, strategy: str
    ) -> DataFrame:
        """
        @param platform: platform the articles were crawled from
        @param crawl_method: crawl method of the articles
        @param strategy: supply strategy
        """
        match platform:
            case "weixin":
                article_list = await self.get_weixin_cold_start_articles(
                    crawl_method, strategy
                )
            case "toutiao":
                article_list = await self.get_toutiao_cold_start_articles(
                    crawl_method, strategy
                )
            case _:
                raise ValueError("Invalid platform")
        return DataFrame(
            article_list,
            columns=[
                "article_id",
                "title",
                "link",
                "llm_sensitivity",
                "score",
                "category_by_ai",
            ],
        )

    async def get_weixin_cold_start_articles(
        self, crawl_method: str, strategy: str
    ) -> List[Dict]:
        match strategy:
            case "strategy_v1":
                query = """
                    select
                        article_id, title, link, llm_sensitivity, score, category_by_ai
                    from crawler_meta_article t1
                    join crawler_meta_article_accounts_read_avg t2
                        on t1.out_account_id = t2.gh_id and t1.article_index = t2.position
                    where category = %s
                        and platform = %s
                        and title_sensitivity = %s
                        and t1.status = %s
                        and t1.read_cnt / t2.read_avg >= %s
                        and t1.read_cnt >= %s
                        and t2.status = %s
                    order by score desc;
                """
                return await self.pool.async_fetch(
                    query=query,
                    params=(
                        crawl_method,
                        "weixin",
                        self.TITLE_NOT_SENSITIVE,
                        self.INIT_STATUS,
                        self.READ_TIMES_THRESHOLD,
                        self.READ_THRESHOLD,
                        self.INIT_STATUS,
                    ),
                )
            case _:
                raise ValueError("Invalid strategy")

    async def get_toutiao_cold_start_articles(
        self, crawl_method: str, strategy: str
    ) -> List[Dict]:
        match strategy:
            case "strategy_v1":
                query = """
                    select article_id, title, link, llm_sensitivity, score, category_by_ai
                    from crawler_meta_article
                    where category = %s
                        and platform = %s
                        and status = %s;
                """
                return await self.pool.async_fetch(
                    query=query, params=(crawl_method, "toutiao", self.INIT_STATUS)
                )
            case _:
                raise ValueError("Invalid strategy")

    async def filter_published_titles(self, plan_id):
        """
        Filter out titles that have already been added to the aigc produce plan.
        """
        published_title_tuple = await get_titles_from_produce_plan(self.pool, plan_id)
        # guard against an empty IN () clause, which is invalid SQL
        if not published_title_tuple:
            return 0
        update_query = """
            update crawler_meta_article
            set status = %s
            where title in %s and status = %s;
        """
        changed_rows = await self.pool.async_save(
            query=update_query,
            params=(self.PUBLISHED_STATUS, published_title_tuple, self.INIT_STATUS),
        )
        return changed_rows

    async def filter_weixin_articles(self, dataframe, crawl_method):
        """WeChat filtering funnel."""
        total_length: int = dataframe.shape[0]

        # layer 1: filter by title length
        filter_df = dataframe[
            (dataframe["title"].str.len() <= self.TITLE_LENGTH_MAX)
            & (dataframe["title"].str.len() >= self.TITLE_LENGTH_LIMIT)
        ]
        length_level1 = filter_df.shape[0]

        # layer 2: filter by sensitive keywords
        sensitive_keywords = [
            "农历",
            "太极",
            "节",
            "早上好",
            "赖清德",
            "普京",
            "俄",
            "南海",
            "台海",
            "解放军",
            "蔡英文",
            "中国",
        ]
        # build a regex where "|" joins the keywords as alternatives (OR)
        pattern = "|".join(sensitive_keywords)
        filter_df = filter_df[~filter_df["title"].str.contains(pattern, na=False)]
        length_level2 = filter_df.shape[0]

        # layer 3: filter by LLM sensitivity
        filter_df = filter_df[~(filter_df["llm_sensitivity"] > 0)]
        length_level3 = filter_df.shape[0]

        # layer 4: filter by relevance score
        filter_df = filter_df[filter_df["score"] > self.SIMILARITY_SCORE_THRESHOLD]
        length_level4 = filter_df.shape[0]

        await feishu_robot.bot(
            title="冷启任务发布通知",
            detail={
                "总文章数量": total_length,
                "通过标题长度过滤": "过滤数量: {} 剩余数量: {}".format(
                    total_length - length_level1, length_level1
                ),
                "通过敏感词过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level1 - length_level2, length_level2
                ),
                "通过LLM敏感度过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level2 - length_level3, length_level3
                ),
                "通过相关性分数过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level3 - length_level4, length_level4
                ),
                "渠道": crawl_method,
                "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
                "阅读量阈值": self.READ_THRESHOLD,
                "标题长度阈值": self.TITLE_LENGTH_LIMIT,
            },
            mention=False,
        )
        return filter_df[: self.DAILY_ARTICLE_NUM]

    async def filter_toutiao_articles(self, dataframe, crawl_method):
        """Toutiao filtering funnel: relevance score only."""
        total_length = dataframe.shape[0]
        filter_df = dataframe[dataframe["score"] > self.SIMILARITY_SCORE_THRESHOLD]
        await feishu_robot.bot(
            title="冷启动创建抓取计划",
            detail={
                "渠道": crawl_method,
                "总文章数量": total_length,
                "相关性分数过滤": filter_df.shape[0],
            },
            mention=False,
        )
        return filter_df[: self.DAILY_ARTICLE_NUM]

    async def insert_crawler_plan_into_database(
        self, crawler_plan_id, crawler_plan_name, create_timestamp
    ):
        query = """
            insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
            values (%s, %s, %s)
        """
        try:
            await self.pool.async_save(
                query=query,
                params=(crawler_plan_id, crawler_plan_name, create_timestamp),
            )
        except Exception as e:
            await feishu_robot.bot(
                title="品类冷启任务,记录抓取计划id失败",
                detail={
                    "error": str(e),
                    "error_msg": traceback.format_exc(),
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name,
                },
            )

    async def change_article_status_while_publishing(self, article_id_list):
        """
        :param article_id_list: unique ids of the articles being published
        :return: number of affected rows
        """
        query = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affect_rows = await self.pool.async_save(
            query=query,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS),
        )
        return affect_rows

    async def create_cold_start_plan(
        self, platform, crawl_method, plan_id, strategy="strategy_v1"
    ):
        """Create a crawler plan per AI category and bind it to the generate plan."""
        article_dataframe = await self.get_article_from_meta_table(
            platform, crawl_method, strategy
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "获取文章成功",
                "data": {"article_length": article_dataframe.shape[0]},
            }
        )
        match platform:
            case "weixin":
                input_source_channel = 5
                filter_article_df = await self.filter_weixin_articles(
                    article_dataframe, crawl_method
                )
            case "toutiao":
                input_source_channel = 6
                filter_article_df = await self.filter_toutiao_articles(
                    article_dataframe, crawl_method
                )
            case _:
                raise ValueError("Invalid platform")

        # split articles by AI category
        category_list = await task_apollo.get_config_value(key="category_list")
        for ai_category in category_list:
            filter_category_df = filter_article_df[
                filter_article_df["category_by_ai"] == ai_category
            ]
            url_list = filter_category_df["link"].values.tolist()
            if not url_list:
                continue

            # create crawler plan
            crawler_plan_response = await auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-{}-{}-{}--{}".format(
                    crawl_method,
                    ai_category,
                    str(datetime.date.today()),
                    len(url_list),
                ),
                plan_tag="品类冷启动",
                platform=platform,
                url_list=url_list,
            )

            # save crawler plan to db (timestamp in milliseconds)
            create_timestamp = int(time.time()) * 1000
            crawler_plan_id = crawler_plan_response["data"]["id"]
            crawler_plan_name = crawler_plan_response["data"]["name"]
            await self.insert_crawler_plan_into_database(
                crawler_plan_id, crawler_plan_name, create_timestamp
            )

            # auto bind the crawler plan to the generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": input_source_channel,
                }
            ]
            generate_plan_response = await auto_bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list, generate_task_id=plan_id
            )
            await self.log_client.log(
                contents={
                    "task": "article_pool_cold_start",
                    "platform": platform,
                    "crawl_method": crawl_method,
                    "status": "success",
                    "trace_id": self.trace_id,
                    "message": "绑定至生成计划成功",
                    "data": generate_plan_response,
                }
            )

            # mark the bound articles as published
            article_id_list = filter_category_df["article_id"].values.tolist()
            await self.change_article_status_while_publishing(
                article_id_list=article_id_list
            )

    async def deal(self, platform: str, crawl_methods: List[str]) -> None:
        """Entry point: run the cold-start pipeline for each crawl method."""
        if not crawl_methods:
            crawl_methods = self.DEFAULT_CRAWLER_METHODS
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_methods": crawl_methods,
                "status": "success",
                "trace_id": self.trace_id,
            }
        )
        crawl_methods_map = await task_apollo.get_config_value(
            key="category_cold_start_map"
        )
        for crawl_method in crawl_methods:
            try:
                plan_id = crawl_methods_map[crawl_method]
                affected_rows = await self.filter_published_titles(plan_id)
                await self.log_client.log(
                    contents={
                        "task": "article_pool_cold_start",
                        "platform": platform,
                        "crawl_method": crawl_method,
                        "status": "success",
                        "trace_id": self.trace_id,
                        "message": "通过已抓取标题修改文章状态",
                        "data": {"affected_rows": affected_rows},
                    }
                )
                await self.create_cold_start_plan(platform, crawl_method, plan_id)
            except Exception as e:
                await feishu_robot.bot(
                    title="文章冷启动异常",
                    detail={
                        "crawl_method": crawl_method,
                        "error": str(e),
                        "function": "deal",
                        "traceback": traceback.format_exc(),
                    },
                )
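

# ---------------------------------------------------------------------------
# Usage sketch (assumption, not part of the original module): how the task is
# presumably driven from an async scheduler. The pool and log client below are
# hypothetical stand-ins; any objects exposing async_fetch / async_save and an
# async log(contents=...) method should work with this class.
# ---------------------------------------------------------------------------
# import asyncio
# import uuid
#
# async def run_article_pool_cold_start(pool, log_client):
#     task = ArticlePoolColdStart(pool, log_client, trace_id=str(uuid.uuid4()))
#     # an empty crawl_methods list falls back to DEFAULT_CRAWLER_METHODS
#     await task.deal(platform="weixin", crawl_methods=[])
#
# asyncio.run(run_article_pool_cold_start(pool=..., log_client=...))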