  1. """
  2. @author: luojunhui
  3. 品类文章发布到aigc系统的冷启层
  4. """
  5. import datetime
  6. import json
  7. import time
  8. import traceback
  9. from pandas import DataFrame
  10. from applications import aiditApi, log, bot
  11. from config import apolloConfig
  12. apollo = apolloConfig()
  13. DAILY_CRAWLER_MAX_NUM = 1000
  14. SIMILARITY_MIN_SCORE = 0.4
  15. TITLE_NOT_SENSITIVE = 0
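
# How the module-level constants are used below: DAILY_CRAWLER_MAX_NUM caps
# how many articles go into a single crawler plan; SIMILARITY_MIN_SCORE is the
# relevance floor applied in the WeChat funnel; TITLE_NOT_SENSITIVE selects
# rows whose titles already passed the upstream title-sensitivity check.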


class CategoryColdStartTask(object):
    """
    Category cold-start publishing task.
    """
    PUBLISHED_STATUS = 2
    INIT_STATUS = 1
    BAD_STATUS = 0

    def __init__(self, db_client):
        """
        :param db_client: database client exposing the select() and update() methods used below
        """
        self.db_client = db_client
        self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))
        self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold"))
        self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)
        self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
        self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
        self.TITLE_LENGTH_MAX = self.category_cold_start_threshold.get("TITLE_LENGTH_MAX", 50)
        log(
            task="category_publish_task",
            function="__init__",
            message="Database connection initialized and Apollo config loaded",
            data={
                "category": self.category_map,
                "threshold": self.category_cold_start_threshold
            }
        )
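
    # Both Apollo values are JSON strings. Their assumed shapes (the threshold
    # keys are taken from the .get() calls above; the map shape is inferred
    # from how self.category_map is used, everything else is illustrative):
    #   category_cold_start_map:       {"<category>": "<generate_plan_id>", ...}
    #   category_cold_start_threshold: {"READ_THRESHOLD": 5000,
    #                                   "READ_TIMES_THRESHOLD": 1.3,
    #                                   "LIMIT_TITLE_LENGTH": 15,
    #                                   "TITLE_LENGTH_MAX": 50}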

    def insert_into_db(self, crawler_plan_id, crawler_plan_name, create_timestamp):
        """
        Insert a crawler-plan record into the database.
        :param crawler_plan_id:
        :param crawler_plan_name:
        :param create_timestamp:
        :return:
        """
        insert_sql = """
            INSERT INTO article_crawler_plan
                (crawler_plan_id, name, create_timestamp)
            VALUES
                (%s, %s, %s)
        """
        try:
            self.db_client.update(
                sql=insert_sql,
                params=(crawler_plan_id, crawler_plan_name, create_timestamp)
            )
        except Exception as e:
            bot(
                title="Category cold-start task: failed to record crawler plan id",
                detail={
                    "error": str(e),
                    "error_msg": traceback.format_exc(),
                    "crawler_plan_id": crawler_plan_id,
                    "crawler_plan_name": crawler_plan_name
                }
            )
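
    # Assumed minimal schema of the target table (illustrative only; the real
    # DDL lives in the database, not in this repo):
    #   CREATE TABLE article_crawler_plan (
    #       crawler_plan_id  VARCHAR(64),
    #       name             VARCHAR(255),
    #       create_timestamp BIGINT  -- milliseconds, see publish_filter_articles
    #   );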

    def get_articles_from_meta_table(self, category, article_source):
        """
        Fetch cold-start candidate articles from the long-article meta table.
        :param category: article category
        :param article_source: source platform, e.g. "weixin" or "toutiao"
        :return: DataFrame of candidate articles
        """
        sql = f"""
            SELECT
                article_id, out_account_id, article_index, title, link, read_cnt, status, llm_sensitivity, score
            FROM
                crawler_meta_article
            WHERE
                category = "{category}" AND platform = "{article_source}" AND title_sensitivity = {TITLE_NOT_SENSITIVE}
            ORDER BY score DESC;
        """
        article_list = self.db_client.select(sql)
        log(
            task="category_publish_task",
            function="get_articles_from_meta_table",
            message="Total articles fetched for category",
            data={
                "total_articles": len(article_list),
                "category": category
            }
        )
        article_df = DataFrame(
            article_list,
            columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status',
                     'llm_sensitivity', 'score']
        )
        return article_df
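
    # Note the column renaming when rows are loaded into pandas:
    # out_account_id -> gh_id and article_index -> position. The WeChat funnel
    # below groups by (gh_id, position) to compute per-slot average reads.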

    def filter_each_category(self, category):
        """
        Mark articles already consumed by one category's generate plan as published.
        """
        plan_id = self.category_map.get(category)
        if plan_id:
            article_list = aiditApi.get_generated_article_list(plan_id)
            title_list = [i[1] for i in article_list]
            if title_list:
                # mark matching titles as published
                update_sql = """
                    UPDATE
                        crawler_meta_article
                    SET
                        status = %s
                    WHERE
                        title IN %s AND status = %s;
                """
                affected_rows = self.db_client.update(
                    sql=update_sql,
                    params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
                )
                print(affected_rows)
        else:
            print("No generate-plan id configured for category: {}".format(category))
        return
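
    # `tuple(title_list)` relies on the DB driver expanding a tuple parameter
    # into a parenthesized list for the IN clause (pymysql-style escaping is
    # assumed here). The `if title_list` guard above keeps the empty tuple out,
    # which would otherwise render as the invalid SQL `IN ()`.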

    def published_articles_title_filter(self):
        """
        Mark articles whose titles have already been published to a generate
        plan, so they are excluded from subsequent cold-start rounds.
        :return:
        """
        category_list = list(self.category_map.keys())
        for category in category_list:
            try:
                self.filter_each_category(category)
            except Exception as e:
                log(
                    task="category_publish_task",
                    function="published_articles_title_filter",
                    message="Failed to filter already-published articles",
                    data={
                        "error": str(e),
                        "error_msg": traceback.format_exc(),
                        "category": category
                    }
                )

    def change_article_status_while_publishing(self, article_id_list):
        """
        Flip candidate articles to the published status once they are bound
        to a crawler plan.
        :param article_id_list: unique article ids
        :return:
        """
        update_sql = """
            UPDATE
                crawler_meta_article
            SET
                status = %s
            WHERE
                article_id IN %s AND status = %s;
        """
        affected_rows = self.db_client.update(
            sql=update_sql,
            params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
        )
        if affected_rows != len(article_id_list):
            bot(
                title="Category cold-start task: failed to update the status of some articles",
                detail={
                    "affected_rows": affected_rows,
                    "task_rows": len(article_id_list)
                }
            )
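
    # Status lifecycle of crawler_meta_article rows, as used in this file:
    # INIT_STATUS (1) -> PUBLISHED_STATUS (2) once bound to a crawler plan;
    # BAD_STATUS (0) is defined above but never written by this task.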

    def filter_weixin_articles(self, articles_df, category):
        """
        Filtering funnel for articles crawled from WeChat.
        """
        articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
        articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
        total_length = articles_df.shape[0]
        # level 0: drop articles that have already been published
        filter_df = articles_df[articles_df['status'] == self.INIT_STATUS]
        length_level0 = filter_df.shape[0]
        # level 1: filter by multiple of the account/position average read count
        filter_df = filter_df[filter_df['read_times'] >= self.READ_TIMES_THRESHOLD]
        length_level1 = filter_df.shape[0]
        # level 2: filter by absolute read count
        filter_df = filter_df[
            filter_df['read_cnt'] >= self.READ_THRESHOLD
        ]
        length_level2 = filter_df.shape[0]
        # level 3: filter by title length
        filter_df = filter_df[
            (filter_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH)
            & (filter_df['title'].str.len() <= self.TITLE_LENGTH_MAX)
        ]
        length_level3 = filter_df.shape[0]
        # level 4: drop titles that contain any sensitive keyword
        sensitive_title_keywords = [
            '农历', '太极', '节', '早上好', '赖清德', '普京',
            '俄', '南海', '台海', '解放军', '蔡英文', '中国'
        ]
        filter_df = filter_df[
            ~filter_df['title'].str.contains('|'.join(sensitive_title_keywords))
        ]
        length_level4 = filter_df.shape[0]
        # level 5: filter by LLM sensitivity (rows with NaN sensitivity pass)
        filter_df = filter_df[
            ~(filter_df['llm_sensitivity'] > 0)
        ]
        length_level5 = filter_df.shape[0]
        # level 6: filter by relevance score
        filter_df = filter_df[filter_df['score'] > SIMILARITY_MIN_SCORE]
        length_level6 = filter_df.shape[0]
        log(
            task="category_publish_task",
            function="publish_filter_articles",
            message="Total articles left after filtering",
            data={
                "total_articles": length_level6,
                "category": category
            }
        )
        bot(
            title="Cold-start publish notification",
            detail={
                "total articles": total_length,
                "published-status filter": "removed: {} remaining: {}".format(
                    total_length - length_level0, length_level0),
                "read-times filter": "removed: {} remaining: {}".format(
                    length_level0 - length_level1, length_level1),
                "read-count filter": "removed: {} remaining: {}".format(
                    length_level1 - length_level2, length_level2),
                "title-length filter": "removed: {} remaining: {}".format(
                    length_level2 - length_level3, length_level3),
                "sensitive-keyword filter": "removed: {} remaining: {}".format(
                    length_level3 - length_level4, length_level4),
                "LLM-sensitivity filter": "removed: {} remaining: {}".format(
                    length_level4 - length_level5, length_level5),
                "relevance-score filter": "removed: {} remaining: {}".format(
                    length_level5 - length_level6, length_level6),
                "category": category,
                "read-times threshold": self.READ_TIMES_THRESHOLD,
                "read-count threshold": self.READ_THRESHOLD,
                "title-length threshold": self.LIMIT_TITLE_LENGTH
            },
            mention=False
        )
        return filter_df[:DAILY_CRAWLER_MAX_NUM]
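
    # Worked example for the read_times feature (hypothetical numbers): if an
    # account's position-1 slot averages 2,000 reads and one article there
    # gets 5,200 reads, read_times = 5200 / 2000 = 2.6, which clears the
    # default READ_TIMES_THRESHOLD of 1.3; the same article also needs
    # read_cnt >= 5000 to survive the level-2 absolute-read filter.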

    def filter_toutiao_articles(self, articles_df, category):
        """
        Filtering funnel for Toutiao articles.
        """
        total_length = articles_df.shape[0]
        # level 0: drop articles that have already been published
        zero_level_funnel_df = articles_df[articles_df['status'] == self.INIT_STATUS]
        zero_level_funnel_length = zero_level_funnel_df.shape[0]
        bot(
            title="Account cold start -- Toutiao feed publish",
            detail={
                "category": category,
                "total articles": total_length,
                "published-status filter": "removed: {} remaining: {}".format(
                    total_length - zero_level_funnel_length, zero_level_funnel_length),
            },
            mention=False
        )
        return zero_level_funnel_df
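
    # Unlike the WeChat funnel, the Toutiao funnel currently applies only the
    # published-status filter; read-count, title-length, keyword and
    # sensitivity thresholds are not enforced for this source.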

    def publish_filter_articles(self, category, articles_df, article_source):
        """
        Filter candidate articles and publish them as a crawler plan.
        :param category: article category
        :param articles_df: DataFrame of articles in this category
        :param article_source: article source platform
        :return:
        """
        match article_source:
            case "weixin":
                filtered_articles_df = self.filter_weixin_articles(articles_df, category)
                input_source_channel = 5
            case "toutiao":
                filtered_articles_df = self.filter_toutiao_articles(articles_df, category)
                input_source_channel = 6
            case _:
                return
        url_list = filtered_articles_df['link'].values.tolist()
        if url_list:
            # create a crawler plan for the filtered urls
            crawler_plan_response = aiditApi.auto_create_crawler_task(
                plan_id=None,
                plan_name="自动绑定-{}--{}--{}".format(category, str(datetime.date.today()), len(url_list)),
                plan_tag="品类冷启动",
                article_source=article_source,
                url_list=url_list
            )
            log(
                task="category_publish_task",
                function="publish_filter_articles",
                message="Crawler plan created",
                data=crawler_plan_response
            )
            # save the plan to the db (millisecond timestamp)
            create_timestamp = int(time.time()) * 1000
            crawler_plan_id = crawler_plan_response['data']['id']
            crawler_plan_name = crawler_plan_response['data']['name']
            self.insert_into_db(crawler_plan_id, crawler_plan_name, create_timestamp)
            # auto-bind the crawler plan to the category's generate plan
            new_crawler_task_list = [
                {
                    "contentType": 1,
                    "inputSourceType": 2,
                    "inputSourceSubType": None,
                    "fieldName": None,
                    "inputSourceValue": crawler_plan_id,
                    "inputSourceLabel": crawler_plan_name,
                    "inputSourceModal": 3,
                    "inputSourceChannel": input_source_channel
                }
            ]
            generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
                crawler_task_list=new_crawler_task_list,
                generate_task_id=self.category_map[category]
            )
            log(
                task="category_publish_task",
                function="publish_filter_articles",
                message="Bound crawler plan to generate plan",
                data=generate_plan_response
            )
            # mark the published articles so they are not re-selected
            article_id_list = filtered_articles_df['article_id'].values.tolist()
            self.change_article_status_while_publishing(article_id_list=article_id_list)
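
    # Assumed shape of the aiditApi response (only the two fields read above
    # are known from this code; everything else is illustrative):
    #   crawler_plan_response = {"data": {"id": "<plan_id>", "name": "<plan_name>", ...}, ...}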

    def do_job(self, article_source, category_list=None):
        """
        Run the cold-start publish task.
        :param article_source: article source platform
        :param category_list: categories to process; defaults to every configured category
        :return:
        """
        if not category_list:
            category_list = self.category_map.keys()
        log(
            task="category_publish_task",
            function="do_job",
            message="Started auto-creating category crawler plans",
            data={
                "category_list": list(category_list)
            }
        )
        for category in category_list:
            try:
                # de-duplicate against titles already published to generate plans
                self.published_articles_title_filter()
                category_df = self.get_articles_from_meta_table(category=category, article_source=article_source)
                self.publish_filter_articles(
                    category=category,
                    articles_df=category_df,
                    article_source=article_source
                )
            except Exception as e:
                bot(
                    title="Category cold-start task failed",
                    detail={
                        "category": category,
                        "error": str(e),
                        "function": "do_job",
                        "traceback": traceback.format_exc()
                    }
                )
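

if __name__ == '__main__':
    # Usage sketch (illustrative only): `MysqlClient` is a hypothetical
    # stand-in for whatever connector the project actually injects; any
    # object exposing the select(sql) and update(sql, params) methods used
    # above will work.
    from applications import MysqlClient  # hypothetical import

    task = CategoryColdStartTask(db_client=MysqlClient())
    task.do_job(article_source="weixin")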