publishCategoryArticles.py

  1. """
  2. @author: luojunhui
  3. 品类文章发布到aigc系统的冷启层
  4. """
  5. import datetime
  6. import json
  7. import time
  8. import traceback
  9. from pandas import DataFrame
  10. from applications import aiditApi, log, bot, llm_sensitivity
  11. from config import apolloConfig
  12. apollo = apolloConfig()
  13. DAILY_CRAWLER_MAX_NUM = 1000
  14. SIMILARITY_MIN_SCORE = 0.4
  15. TITLE_NOT_SENSITIVE = 0
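
# How the module-level constants are used below:
#   DAILY_CRAWLER_MAX_NUM  - cap on how many articles go into one crawler plan per run
#   SIMILARITY_MIN_SCORE   - minimum relevance score an article must exceed (funnel level 6)
#   TITLE_NOT_SENSITIVE    - sensitivity value meaning a title passed the check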


class CategoryColdStartTask(object):
    """
    Category cold-start publishing task.
    """
    # Lifecycle of a crawler_meta_article row: INIT (1) -> PUBLISHED (2); BAD (0) marks unusable rows.
    PUBLISHED_STATUS = 2
    INIT_STATUS = 1
    BAD_STATUS = 0

    def __init__(self, db_client):
        """
        :param db_client: database client exposing select(sql) and update(sql, params)
        """
        self.db_client = db_client
        self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))
        self.category_cold_start_threshold = json.loads(
            apollo.getConfigValue("category_cold_start_threshold")
        )
        self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)
        self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
        self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
        self.TITLE_LENGTH_MAX = self.category_cold_start_threshold.get("TITLE_LENGTH_MAX", 50)
        log(
            task="category_publish_task",
            function="__init__",
            message="数据库初始化连接完成,apollo配置获取完成",
            data={
                "category": self.category_map,
                "threshold": self.category_cold_start_threshold
            }
        )
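
    # The two Apollo values are JSON strings. A plausible shape, inferred from how
    # they are read in __init__ (the example keys/values are assumptions, not copied
    # from any real config):
    #   category_cold_start_map:        {"<category>": "<generate_plan_id>", ...}
    #   category_cold_start_threshold:  {"READ_THRESHOLD": 5000, "READ_TIMES_THRESHOLD": 1.3,
    #                                    "LIMIT_TITLE_LENGTH": 15, "TITLE_LENGTH_MAX": 50}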

    def insert_into_db(self, crawler_plan_id, crawler_plan_name, create_timestamp):
        """
        Record a created crawler plan in the database.
        :param create_timestamp: creation time in milliseconds
        :param crawler_plan_id:
        :param crawler_plan_name:
        :return:
        """
        insert_sql = """
            INSERT INTO article_crawler_plan
                (crawler_plan_id, name, create_timestamp)
            VALUES
                (%s, %s, %s)
        """
        try:
            self.db_client.update(
                sql=insert_sql,
                params=(crawler_plan_id, crawler_plan_name, create_timestamp)
            )
        except Exception as e:
            bot(
                title="品类冷启任务,记录抓取计划id失败",
                detail={
                    "error": str(e),
                    "error_msg": traceback.format_exc(),
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name
                }
            )

    def get_articles_from_meta_table(self, category, article_source):
        """
        Fetch cold-start candidate articles from the long-article meta table.
        :param category: article category
        :param article_source: source platform, e.g. "weixin" or "toutiao"
        :return: DataFrame of candidate articles, highest score first
        """
        sql = f"""
            SELECT
                article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score
            FROM
                crawler_meta_article
            WHERE
                category = "{category}" AND platform = "{article_source}" AND title_sensitivity = {TITLE_NOT_SENSITIVE}
            ORDER BY score DESC;
        """
        article_list = self.db_client.select(sql)
        log(
            task="category_publish_task",
            function="get_articles_from_meta_table",
            message="获取品类文章总数",
            data={
                "total_articles": len(article_list),
                "category": category
            }
        )
        article_df = DataFrame(
            article_list,
            columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status',
                     'llm_sensitivity', 'score']
        )
        return article_df
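
    # Note: `category` and `article_source` are interpolated directly into the SQL
    # above. Both come from internal config rather than user input, but switching to
    # bound parameters would be safer if that ever changes.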

    def filter_each_category(self, category):
        """
        Mark articles already published by one category's generate plan.
        """
        plan_id = self.category_map.get(category)
        if plan_id:
            print(category, "getting title list")
            article_list = aiditApi.get_generated_article_list(plan_id)
            print("success")
            title_list = [i[1] for i in article_list]
            if title_list:
                # flip matching INIT rows to PUBLISHED so they are not crawled again
                update_sql = """
                    UPDATE
                        crawler_meta_article
                    SET
                        status = %s
                    WHERE
                        title IN %s AND status = %s;
                """
                affected_rows = self.db_client.update(
                    sql=update_sql,
                    params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
                )
                print(affected_rows)
        else:
            print("未获取到计划id")
        return
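
    # aiditApi.get_generated_article_list is assumed (from the i[1] access above) to
    # return row tuples whose second field is the article title.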

    def published_articles_title_filter(self):
        """
        Mark every article whose title already appears in a generate plan as published.
        :return:
        """
        category_list = list(self.category_map.keys())
        for category in category_list:
            try:
                self.filter_each_category(category)
            except Exception as e:
                log(
                    task="category_publish_task",
                    function="published_articles_title_filter",
                    message="过滤已发布文章失败",
                    data={
                        "error": str(e),
                        "error_msg": traceback.format_exc(),
                        "category": category
                    }
                )

    def change_article_status_while_publishing(self, article_id_list):
        """
        Flip the given articles from INIT to PUBLISHED once they are handed to a crawler plan.
        :param article_id_list: unique article ids
        :return:
        """
        update_sql = """
            UPDATE
                crawler_meta_article
            SET
                status = %s
            WHERE
                article_id IN %s AND status = %s;
        """
        affected_rows = self.db_client.update(
            sql=update_sql,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
        )
        if affected_rows != len(article_id_list):
            bot(
                title="品类冷启任务中,出现更新文章状态失败异常",
                detail={
                    "affected_rows": affected_rows,
                    "task_rows": len(article_id_list)
                }
            )

    def filter_weixin_articles(self, articles_df, category):
        """
        Filter funnel for articles crawled from WeChat.
        """
        articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
        articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
        total_length = articles_df.shape[0]
        # level 0: drop articles that are already published
        filter_df = articles_df[articles_df['status'] == self.INIT_STATUS]
        length_level0 = filter_df.shape[0]
        # level 1: keep articles whose read count beats their account/position average
        filter_df = filter_df[filter_df['read_times'] >= self.READ_TIMES_THRESHOLD]
        length_level1 = filter_df.shape[0]
        # level 2: keep articles above the absolute read-count threshold
        filter_df = filter_df[
            filter_df['read_cnt'] >= self.READ_THRESHOLD
        ]
        length_level2 = filter_df.shape[0]
        # level 3: keep titles within the allowed length range
        filter_df = filter_df[
            (filter_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH)
            & (filter_df['title'].str.len() <= self.TITLE_LENGTH_MAX)
        ]
        length_level3 = filter_df.shape[0]
        # level 4: drop titles containing blocked keywords
        filter_df = filter_df[
            (~filter_df['title'].str.contains('农历'))
            & (~filter_df['title'].str.contains('太极'))
            & (~filter_df['title'].str.contains('节'))
            & (~filter_df['title'].str.contains('早上好'))
            & (~filter_df['title'].str.contains('赖清德'))
            & (~filter_df['title'].str.contains('普京'))
            & (~filter_df['title'].str.contains('俄'))
            & (~filter_df['title'].str.contains('南海'))
            & (~filter_df['title'].str.contains('台海'))
            & (~filter_df['title'].str.contains('解放军'))
            & (~filter_df['title'].str.contains('蔡英文'))
            & (~filter_df['title'].str.contains('中国'))
        ]
        length_level4 = filter_df.shape[0]
        # level 5: drop titles the LLM flagged as sensitive
        filter_df = filter_df[
            ~(filter_df['llm_sensitivity'] > 0)
        ]
        length_level5 = filter_df.shape[0]
        # level 6: keep articles above the relevance-score threshold
        filter_df = filter_df[filter_df['score'] > SIMILARITY_MIN_SCORE]
        length_level6 = filter_df.shape[0]
        log(
            task="category_publish_task",
            function="filter_weixin_articles",
            message="过滤后文章总数",
            data={
                "total_articles": length_level6,
                "category": category
            }
        )
        bot(
            title="冷启任务发布通知",
            detail={
                "总文章数量": total_length,
                "通过已经发布状态过滤": "过滤数量: {} 剩余数量: {}".format(
                    total_length - length_level0, length_level0),
                "通过阅读均值倍数过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level0 - length_level1, length_level1),
                "通过阅读量过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level1 - length_level2, length_level2),
                "通过标题长度过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level2 - length_level3, length_level3),
                "通过敏感词过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level3 - length_level4, length_level4),
                "通过LLM敏感度过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level4 - length_level5, length_level5),
                "通过相关性分数过滤": "过滤数量: {} 剩余数量: {}".format(
                    length_level5 - length_level6, length_level6),
                "品类": category,
                "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
                "阅读量阈值": self.READ_THRESHOLD,
                "标题长度阈值": self.LIMIT_TITLE_LENGTH
            },
            mention=False
        )
        return filter_df[:DAILY_CRAWLER_MAX_NUM]
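
    # Worked example of the funnel with the default thresholds (illustrative numbers
    # only): an article with read_cnt=8000 whose (gh_id, position) group averages 5000
    # reads has read_times=1.6, so it passes level 1 (1.6 >= 1.3) and level 2
    # (8000 >= 5000); it then survives only if its title is 15-50 characters long,
    # contains none of the blocked keywords, has llm_sensitivity <= 0, and score > 0.4.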

    def filter_toutiao_articles(self, articles_df, category):
        """
        Filter funnel for Toutiao articles (currently status filtering only).
        """
        total_length = articles_df.shape[0]
        # level 0: drop articles that are already published
        zero_level_funnel_df = articles_df[articles_df['status'] == self.INIT_STATUS]
        zero_level_funnel_length = zero_level_funnel_df.shape[0]
        bot(
            title="账号冷启动---头条推荐流发布",
            detail={
                "category": category,
                "总文章数量": total_length,
                "通过已经发布状态过滤": "过滤数量: {} 剩余数量: {}".format(
                    total_length - zero_level_funnel_length, zero_level_funnel_length),
            },
            mention=False
        )
        return zero_level_funnel_df

    def update_article_sensitive_status(self, article_id, status):
        """
        Persist the LLM sensitivity verdict for one article.
        :param article_id: unique article id
        :param status: hit_rule value returned by the sensitivity check
        :return:
        """
        update_sql = """
            UPDATE crawler_meta_article
            SET llm_sensitivity = %s
            WHERE article_id = %s;
        """
        self.db_client.update(sql=update_sql, params=(status, article_id))
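
    # llm_sensitivity.check_titles is assumed (from the loop in publish_filter_articles
    # below) to return one dict per title, aligned with the input order, e.g.
    # [{"hit_rule": 0}, {"hit_rule": 2}, ...]; hit_rule > TITLE_NOT_SENSITIVE means the
    # title was flagged.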

    def publish_filter_articles(self, category, articles_df, article_source):
        """
        Filter one category's articles and hand the survivors to a crawler plan.
        :param category: article category
        :param articles_df: DataFrame of articles in this category
        :param article_source: article source, "weixin" or "toutiao"
        :return:
        """
        match article_source:
            case "weixin":
                filtered_articles_df = self.filter_weixin_articles(articles_df, category)
                input_source_channel = 5
            case "toutiao":
                filtered_articles_df = self.filter_toutiao_articles(articles_df, category)
                input_source_channel = 6
            case _:
                return

        # re-check the surviving titles with the LLM and drop anything newly flagged
        success_titles = filtered_articles_df['title'].values.tolist()
        article_id_list = filtered_articles_df['article_id'].values.tolist()
        if success_titles:
            try:
                sensitive_results = llm_sensitivity.check_titles(success_titles)
                for article_id, sensitive_result in zip(article_id_list, sensitive_results):
                    self.update_article_sensitive_status(
                        article_id=article_id,
                        status=sensitive_result['hit_rule']
                    )
                    if sensitive_result['hit_rule'] > TITLE_NOT_SENSITIVE:
                        filtered_articles_df = filtered_articles_df[
                            filtered_articles_df['article_id'] != article_id
                        ]
            except Exception as e:
                print("failed to update sensitive status: {}".format(e))

        url_list = filtered_articles_df['link'].values.tolist()
        if url_list:
            # create a crawler plan for the surviving urls
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-{}--{}--{}".format(category, str(datetime.date.today()), len(url_list)),
                plan_tag="品类冷启动",
                article_source=article_source,
                url_list=url_list
            )
            log(
                task="category_publish_task",
                function="publish_filter_articles",
                message="成功创建抓取计划",
                data=crawler_plan_response
            )
            # record the crawler plan (timestamp in milliseconds)
            create_timestamp = int(time.time()) * 1000
            crawler_plan_id = crawler_plan_response['data']['id']
            crawler_plan_name = crawler_plan_response['data']['name']
            self.insert_into_db(crawler_plan_id, crawler_plan_name, create_timestamp)
            # bind the crawler plan to this category's generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": input_source_channel  # 5 = weixin, 6 = toutiao
                }
            ]
            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id=self.category_map[category]
            )
            log(
                task="category_publish_task",
                function="publish_filter_articles",
                message="成功绑定到生成计划",
                data=generate_plan_response
            )
            # mark the handed-off articles as published
            article_id_list = filtered_articles_df['article_id'].values.tolist()
            self.change_article_status_while_publishing(article_id_list=article_id_list)

    def do_job(self, article_source, category_list=None):
        """
        Run the cold-start publish task.
        :param article_source: article source, "weixin" or "toutiao"
        :param category_list: categories to process; defaults to every configured category
        :return:
        """
        if not category_list:
            category_list = self.category_map.keys()
        log(
            task="category_publish_task",
            function="do_job",
            message="开始自动创建品类文章抓取计划",
            data={
                "category_list": list(category_list)
            }
        )
        for category in category_list:
            try:
                # deduplicate against titles that already sit in generate plans
                self.published_articles_title_filter()
                category_df = self.get_articles_from_meta_table(category=category, article_source=article_source)
                self.publish_filter_articles(
                    category=category,
                    articles_df=category_df,
                    article_source=article_source
                )
            except Exception as e:
                bot(
                    title="品类冷启任务报错",
                    detail={
                        "category": category,
                        "error": str(e),
                        "function": "do_job",
                        "traceback": traceback.format_exc()
                    }
                )
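

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module). `build_db_client` is a
# hypothetical factory: the real project injects its own client, which only
# needs to expose select(sql) and update(sql, params).
#
#   db_client = build_db_client()
#   task = CategoryColdStartTask(db_client)
#   task.do_job(article_source="weixin")                           # all configured categories
#   task.do_job(article_source="toutiao", category_list=["<category>"])  # one category
# ---------------------------------------------------------------------------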