article_pool_cold_start.py

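"""
Article-pool cold start task: load filtered candidate articles from
crawler_meta_article, create crawler plans in AIGC, bind them to the matching
produce (generate) plans, mark the used articles as published, and report the
run to Feishu. (Summary added by the editor based on the code below.)
"""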
from __future__ import annotations

import time
import datetime
import traceback
from typing import List

from pandas import DataFrame
from tqdm.asyncio import tqdm

from applications.api import task_apollo, feishu_robot
from applications.api import auto_create_crawler_task
from applications.api import auto_bind_crawler_task_to_generate_task
from applications.config import category_config, input_source_map
from applications.utils import get_titles_from_produce_plan
from applications.tasks.cold_start_tasks.article_pool import (
    ArticlePoolColdStartStrategy,
    ArticlePoolFilterStrategy,
)


class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrategy):
    def __init__(self, pool, log_client, trace_id):
        super().__init__(pool, log_client, trace_id)

    async def get_article_from_meta_table(
        self, platform: str, crawl_method: str, strategy: str, category: str | None
    ) -> DataFrame:
        """
        @param platform: article crawl platform ("weixin" or "toutiao")
        @param crawl_method: article crawl method
        @param strategy: supply strategy
        @param category: optional category filter
        """
        match platform:
            case "weixin":
                article_list = await self.get_weixin_cold_start_articles(
                    crawl_method, strategy, category
                )
            case "toutiao":
                article_list = await self.get_toutiao_cold_start_articles(
                    crawl_method, strategy, category
                )
            case _:
                raise ValueError("Invalid platform")
        return DataFrame(
            article_list,
            columns=[
                "article_id",
                "title",
                "link",
                "llm_sensitivity",
                "score",
                "category_by_ai",
            ],
        )

    async def filter_published_titles(self, plan_id):
        """
        Filter out titles already added to the AIGC produce plan by marking the
        matching crawler_meta_article rows as published.
        """
        published_title_tuple = await get_titles_from_produce_plan(self.pool, plan_id)
        if not published_title_tuple:
            return 0
        update_query = """
            update crawler_meta_article set status = %s where title in %s and status = %s;
        """
        changed_rows = await self.pool.async_save(
            query=update_query,
            params=(self.PUBLISHED_STATUS, published_title_tuple, self.INIT_STATUS),
        )
        return changed_rows

    async def insert_crawler_plan_into_database(
        self, crawler_plan_id, crawler_plan_name, create_timestamp
    ):
        """Persist the crawler plan id; alert Feishu if the insert fails."""
        query = """
            insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
            values (%s, %s, %s)
        """
        try:
            await self.pool.async_save(
                query=query,
                params=(crawler_plan_id, crawler_plan_name, create_timestamp),
            )
        except Exception as e:
            await feishu_robot.bot(
                title="品类冷启任务,记录抓取计划id失败",
                detail={
                    "error": str(e),
                    "error_msg": traceback.format_exc(),
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name,
                },
            )

    async def change_article_status_while_publishing(self, article_id_list):
        """
        :param article_id_list: unique ids of the articles being published
        :return: number of affected rows
        """
        query = """
            update crawler_meta_article
            set status = %s
            where article_id in %s and status = %s;
        """
        affected_rows = await self.pool.async_save(
            query=query,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS),
        )
        return affected_rows

    async def create_crawler_plan_and_bind_to_produce_plan(
        self,
        strategy: str,
        crawl_method: str,
        category: str,
        platform: str,
        url_list: List[str],
        plan_id: str,
    ):
        """Create a crawler plan for url_list and bind it to the produce (generate) plan."""
        # create crawler plan
        crawler_plan_response = await auto_create_crawler_task(
            plan_id=None,
            plan_name=f"冷启动-{strategy}-{category}-{datetime.date.today()}-{len(url_list)}",
            plan_tag="品类冷启动",
            platform=platform,
            url_list=url_list,
        )
        # save the crawler plan to the database (millisecond timestamp)
        create_timestamp = int(time.time()) * 1000
        crawler_plan_id = crawler_plan_response["data"]["id"]
        crawler_plan_name = crawler_plan_response["data"]["name"]
        await self.insert_crawler_plan_into_database(
            crawler_plan_id, crawler_plan_name, create_timestamp
        )
        # auto bind to generate plan
        new_crawler_task_list = [
            {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceSubType": None,
                "fieldName": None,
                "inputSourceValue": crawler_plan_id,
                "inputSourceLabel": crawler_plan_name,
                "inputSourceModal": 3,
                "inputSourceChannel": input_source_map[platform],
            }
        ]
        generate_plan_response = await auto_bind_crawler_task_to_generate_task(
            crawler_task_list=new_crawler_task_list, generate_task_id=plan_id
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "绑定至生成计划成功",
                "data": generate_plan_response,
            }
        )

    async def create_cold_start_plan(
        self,
        platform,
        plan_id,
        strategy="strategy_v1",
        category=None,
        crawl_method=None,
    ):
        # get the candidate articles from the meta table as a DataFrame
        article_dataframe = await self.get_article_from_meta_table(
            platform, crawl_method, strategy, category
        )
        await self.log_client.log(
            contents={
                "task": "article_pool_cold_start",
                "platform": platform,
                "crawl_method": crawl_method,
                "status": "success",
                "trace_id": self.trace_id,
                "message": "获取文章成功",
                "data": {"article_length": article_dataframe.shape[0]},
            }
        )
        filter_article_df = await self.article_pool_filter(
            strategy, platform, article_dataframe, crawl_method, category
        )
        match strategy:
            case "strategy_v1":
                # split the articles by AI category and create one plan per category
                category_list = await task_apollo.get_config_value(key="category_list")
                for ai_category in category_list:
                    filter_category_df = filter_article_df[
                        filter_article_df["category_by_ai"] == ai_category
                    ]
                    url_list = filter_category_df["link"].values.tolist()
                    if url_list:
                        await self.create_crawler_plan_and_bind_to_produce_plan(
                            strategy, crawl_method, ai_category, platform, url_list, plan_id
                        )
                        # mark the published articles so they are not reused
                        article_id_list = filter_category_df["article_id"].values.tolist()
                        await self.change_article_status_while_publishing(
                            article_id_list=article_id_list
                        )
            case "strategy_v2":
                url_list = filter_article_df["link"].values.tolist()
                await self.create_crawler_plan_and_bind_to_produce_plan(
                    strategy, crawl_method, category, platform, url_list, plan_id
                )
                # mark the published articles so they are not reused
                article_id_list = filter_article_df["article_id"].values.tolist()
                await self.change_article_status_while_publishing(
                    article_id_list=article_id_list
                )

    async def deal(
        self,
        platform: str,
        strategy="strategy_v1",
        crawl_methods=None,
        category_list=None,
    ) -> None:
        """Execute the cold start task under the given strategy."""
        match strategy:
            case "strategy_v1":
                if not crawl_methods:
                    crawl_methods = self.DEFAULT_CRAWLER_METHODS
                await self.log_client.log(
                    contents={
                        "task": "article_pool_cold_start",
                        "platform": platform,
                        "crawl_methods": crawl_methods,
                        "status": "success",
                        "trace_id": self.trace_id,
                    }
                )
                crawl_methods_map = await task_apollo.get_config_value(
                    key="category_cold_start_map"
                )
                for crawl_method in crawl_methods:
                    try:
                        plan_id = crawl_methods_map[crawl_method]
                        affected_rows = await self.filter_published_titles(plan_id)
                        await self.log_client.log(
                            contents={
                                "task": "article_pool_cold_start",
                                "platform": platform,
                                "crawl_method": crawl_method,
                                "status": "success",
                                "trace_id": self.trace_id,
                                "message": "通过已抓取标题修改文章状态",
                                "data": {"affected_rows": affected_rows},
                            }
                        )
                        await self.create_cold_start_plan(
                            platform=platform,
                            plan_id=plan_id,
                            crawl_method=crawl_method,
                        )
                    except Exception as e:
                        await feishu_robot.bot(
                            title="文章冷启动异常",
                            detail={
                                "crawl_method": crawl_method,
                                "error": str(e),
                                "function": "deal",
                                "traceback": traceback.format_exc(),
                            },
                        )
  261. case "strategy_v2":
  262. if not category_list:
  263. category_list = list(category_config.keys())
  264. for category in tqdm(category_list):
  265. try:
  266. plan_id = category_config[category]
  267. affected_rows = await self.filter_published_titles(plan_id)
  268. await self.log_client.log(
  269. contents={
  270. "task": "article_pool_cold_start",
  271. "platform": platform,
  272. "category": category,
  273. "status": "success",
  274. "trace_id": self.trace_id,
  275. "message": "通过已抓取标题修改文章状态",
  276. "data": {"affected_rows": affected_rows},
  277. }
  278. )
  279. await self.create_cold_start_plan(
  280. platform=platform,
  281. strategy=strategy,
  282. plan_id=plan_id,
  283. category=category,
  284. )
  285. except Exception as e:
  286. await feishu_robot.bot(
  287. title="文章冷启动异常",
  288. detail={
  289. "category": category,
  290. "strategy": strategy,
  291. "error": str(e),
  292. "function": "deal",
  293. "traceback": traceback.format_exc(),
  294. },
  295. )
                # send a summary table of this cold start run to Feishu
                if self.cold_start_records:
                    columns = [
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="plain_text",
                            sheet_name="category",
                            display_name="文章品类",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="cold_start_num",
                            display_name="本次冷启数量",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="total_length",
                            display_name="总文章剩余数量",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_title_length",
                            display_name="标题长度过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_sensitivity",
                            display_name="敏感词过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_llm_sensitity",
                            display_name="经过大模型判断敏感过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="filter_by_score",
                            display_name="经过相关性分过滤",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="read_avg_threshold",
                            display_name="阅读均值倍数阈值",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="read_threshold",
                            display_name="阅读量阈值",
                        ),
                        feishu_robot.create_feishu_columns_sheet(
                            sheet_type="number",
                            sheet_name="title_length_threshold",
                            display_name="标题长度阈值",
                        ),
                    ]
                    await feishu_robot.bot(
                        title="长文文章路冷启动发布",
                        detail={
                            "columns": columns,
                            "rows": self.cold_start_records,
                        },
                        table=True,
                        mention=False,
                    )
            case _:
                raise Exception(f"unknown strategy: {strategy}")
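

# ---------------------------------------------------------------------------
# Usage sketch (editor's addition, not part of the original module), assuming
# the surrounding project supplies a database pool and log client compatible
# with the constructor above; the objects and trace id here are hypothetical.
#
#     import asyncio
#
#     async def run_cold_start(pool, log_client):
#         task = ArticlePoolColdStart(pool, log_client, trace_id="local-debug")
#         # strategy_v1 keys plans by crawl method (Apollo "category_cold_start_map");
#         # strategy_v2 keys plans by category (applications.config.category_config).
#         await task.deal(platform="weixin", strategy="strategy_v2")
#
#     asyncio.run(run_cold_start(pool, log_client))
# ---------------------------------------------------------------------------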