# publishCategoryArticles.py
  1. """
  2. @author: luojunhui
  3. 品类文章发布到aigc系统的冷启层
  4. """
  5. import datetime
  6. import json
  7. import time
  8. import traceback
  9. from pandas import DataFrame
  10. from applications import log, bot
  11. from applications.api import aidit_api
  12. from config import apolloConfig
  13. apollo = apolloConfig()
  14. class CategoryColdStartTask(object):
  15. """
  16. 品类冷启动发布任务
  17. """
  18. PUBLISHED_STATUS = 2
  19. INIT_STATUS = 1
  20. BAD_STATUS = 0
  21. def __init__(self, db_client):
  22. """
  23. :param db_client:
  24. """
  25. self.db_client = db_client
  26. self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))
  27. self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold"))
  28. self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)
  29. self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
  30. self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
  31. log(
  32. task="category_publish_task",
  33. function="__init__",
  34. message="数据库初始化连接完成,apollo配置获取完成",
  35. data={
  36. "category": self.category_map,
  37. "threshold": self.category_cold_start_threshold
  38. }
  39. )
  40. def insert_into_db(self, crawler_plan_id, crawler_plan_name, create_timestamp):
  41. """
  42. 插入抓取计划到数据库中
  43. :param create_timestamp:
  44. :param crawler_plan_id:
  45. :param crawler_plan_name:
  46. :return:
  47. """
  48. insert_sql = f"""
  49. INSERT INTO article_crawler_plan
  50. (crawler_plan_id, name, create_timestamp)
  51. values
  52. (%s, %s, %s)
  53. """
  54. try:
  55. self.db_client.update(
  56. sql=insert_sql,
  57. params=(crawler_plan_id, crawler_plan_name, create_timestamp)
  58. )
  59. except Exception as e:
  60. bot(
  61. title="品类冷启任务,记录抓取计划id失败",
  62. detail={
  63. "error": str(e),
  64. "error_msg": traceback.format_exc(),
  65. "crawler_plan_id": crawler_plan_id,
  66. "crawler_plan_name": crawler_plan_name
  67. }
  68. )
  69. def get_articles_from_meta_table(self, category):
  70. """
  71. 从长文 meta 库中获取冷启文章
  72. :return:
  73. """
  74. sql = f"""
  75. SELECT
  76. article_id, out_account_id, article_index, title, link, read_cnt, status
  77. FROM
  78. crawler_meta_article
  79. WHERE
  80. category = "{category}";
  81. """
  82. article_list = self.db_client.select(sql)
  83. log(
  84. task="category_publish_task",
  85. function="get_articles_from_meta_table",
  86. message="获取品类文章总数",
  87. data={
  88. "total_articles": len(article_list),
  89. "category": category
  90. }
  91. )
  92. article_df = DataFrame(article_list, columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt', 'status'])
  93. return article_df
  94. def change_article_status(self, category):
  95. """
  96. 已经发布到生成计划中的 id,
  97. :return:
  98. """
  99. plan_id = self.category_map.get(category)
  100. if plan_id:
  101. article_list = aidit_api.get_generated_article_list(plan_id)
  102. title_list = [i[1] for i in article_list]
  103. if title_list:
  104. # update
  105. update_sql = f"""
  106. UPDATE
  107. crawler_meta_article
  108. SET
  109. status = %s
  110. WHERE
  111. title in %s and status = %s;
  112. """
  113. self.db_client.update(
  114. sql=update_sql,
  115. params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
  116. )
  117. else:
  118. return
  119. def change_article_status_while_publishing(self, article_id_list):
  120. """
  121. :param: article_id_list: 文章的唯一 id
  122. :return:
  123. """
  124. update_sql = f"""
  125. UPDATE
  126. crawler_meta_article
  127. SET
  128. status = %s
  129. WHERE
  130. article_id in %s and status = %s;
  131. """
  132. affect_rows = self.db_client.update(
  133. sql=update_sql,
  134. params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
  135. )
  136. if affect_rows != len(article_id_list):
  137. bot(
  138. title="品类冷启任务中,出现更新状文章状态失败异常",
  139. detail={
  140. "affected_rows": affect_rows,
  141. "task_rows": len(article_id_list)
  142. }
  143. )
  144. def publish_filter_articles(self, category, articles_df):
  145. """
  146. 过滤文章
  147. :param category:
  148. :param articles_df:
  149. :return:
  150. """
  151. articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
  152. articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
  153. total_length = articles_df.shape[0]
  154. # 第0层过滤已经发布的文章
  155. zero_level_funnel_df = articles_df[articles_df['status'] == self.INIT_STATUS]
  156. zero_level_funnel_length = zero_level_funnel_df.shape[0]
  157. # 第一层漏斗通过阅读均值倍数过滤
  158. first_level_funnel_df = zero_level_funnel_df[zero_level_funnel_df['read_times'] >= self.READ_TIMES_THRESHOLD]
  159. first_level_funnel_length = first_level_funnel_df.shape[0]
  160. # 第二层漏斗通过阅读量过滤
  161. second_level_funnel_df = first_level_funnel_df[
  162. first_level_funnel_df['read_cnt'] >= self.READ_THRESHOLD
  163. ]
  164. second_level_funnel_length = second_level_funnel_df.shape[0]
  165. # 第三层漏斗通过标题长度过滤
  166. third_level_funnel_df = second_level_funnel_df[
  167. second_level_funnel_df['title'].str.len() >= self.LIMIT_TITLE_LENGTH
  168. ]
  169. third_level_funnel_length = third_level_funnel_df.shape[0]
  170. # 最后一层通过敏感词过滤
  171. filter_df = third_level_funnel_df[
  172. (~third_level_funnel_df['title'].str.contains('农历'))
  173. & (~third_level_funnel_df['title'].str.contains('太极'))
  174. & (~third_level_funnel_df['title'].str.contains('节'))
  175. & (~third_level_funnel_df['title'].str.contains('早上好'))
  176. & (~third_level_funnel_df['title'].str.contains('赖清德'))
  177. & (~third_level_funnel_df['title'].str.contains('普京'))
  178. & (~third_level_funnel_df['title'].str.contains('俄'))
  179. & (~third_level_funnel_df['title'].str.contains('南海'))
  180. & (~third_level_funnel_df['title'].str.contains('台海'))
  181. & (~third_level_funnel_df['title'].str.contains('解放军'))
  182. & (~third_level_funnel_df['title'].str.contains('蔡英文'))
  183. & (~third_level_funnel_df['title'].str.contains('中国'))
  184. ]
  185. final_length = filter_df.shape[0]
  186. url_list = filter_df['link'].values.tolist()
  187. log(
  188. task="category_publish_task",
  189. function="publish_filter_articles",
  190. message="过滤后文章总数",
  191. data={
  192. "total_articles": final_length,
  193. "category": category
  194. }
  195. )
  196. bot(
  197. title="冷启任务发布通知",
  198. detail={
  199. "总文章数量": total_length,
  200. "通过已经发布状态过滤": "过滤数量: {} 剩余数量: {}".format(total_length - zero_level_funnel_length, zero_level_funnel_length),
  201. "通过阅读均值倍数过滤": "过滤数量: {} 剩余数量: {}".format(zero_level_funnel_length - first_level_funnel_length, first_level_funnel_length),
  202. "通过阅读量过滤": "过滤数量: {} 剩余数量: {}".format(first_level_funnel_length - second_level_funnel_length, second_level_funnel_length),
  203. "通过标题长度过滤": "过滤数量: {} 剩余数量: {}".format(second_level_funnel_length - third_level_funnel_length, third_level_funnel_length),
  204. "通过敏感词过滤": "过滤数量: {} 剩余数量: {}".format(third_level_funnel_length - final_length, final_length),
  205. "品类": category,
  206. "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
  207. "阅读量阈值": self.READ_THRESHOLD,
  208. "标题长度阈值": self.LIMIT_TITLE_LENGTH
  209. },
  210. mention=False
  211. )
  212. if url_list:
  213. # create_crawler_plan
  214. crawler_plan_response = aidit_api.auto_create_crawler_task(
  215. plan_id=None,
  216. plan_name="自动绑定-{}--{}--{}".format(category, datetime.date.today().__str__(), len(url_list)),
  217. plan_tag="品类冷启动",
  218. url_list=url_list
  219. )
  220. log(
  221. task="category_publish_task",
  222. function="publish_filter_articles",
  223. message="成功创建抓取计划",
  224. data=crawler_plan_response
  225. )
  226. # save to db
  227. create_timestamp = int(time.time()) * 1000
  228. crawler_plan_id = crawler_plan_response['data']['id']
  229. crawler_plan_name = crawler_plan_response['data']['name']
  230. self.insert_into_db(crawler_plan_id, crawler_plan_name, create_timestamp)
  231. # auto bind to generate plan
  232. new_crawler_task_list = [
  233. {
  234. "contentType": 1,
  235. "inputSourceType": 2,
  236. "inputSourceSubType": None,
  237. "fieldName": None,
  238. "inputSourceValue": crawler_plan_response['data']['id'],
  239. "inputSourceLabel": crawler_plan_response['data']['name'],
  240. "inputSourceModal": 3,
  241. "inputSourceChannel": 5
  242. }
  243. ]
  244. generate_plan_response = aidit_api.bind_crawler_task_to_generate_task(
  245. crawler_task_list=new_crawler_task_list,
  246. generate_task_id=self.category_map[category]
  247. )
  248. log(
  249. task="category_publish_task",
  250. function="publish_filter_articles",
  251. message="成功绑定到生成计划",
  252. data=generate_plan_response
  253. )
  254. # change article status
  255. article_id_list = filter_df['article_id'].values.tolist()
  256. self.change_article_status_while_publishing(article_id_list=article_id_list)
  257. def do_job(self, category_list=None):
  258. """
  259. 执行任务
  260. :return:
  261. """
  262. if not category_list:
  263. category_list = self.category_map.keys()
  264. log(
  265. task="category_publish_task",
  266. function="do_job",
  267. message="开始自动创建品类文章抓取计划",
  268. data={
  269. "category_list": list(category_list)
  270. }
  271. )
  272. for category in category_list:
  273. try:
  274. category_df = self.get_articles_from_meta_table(category=category)
  275. self.publish_filter_articles(
  276. category=category,
  277. articles_df=category_df
  278. )
  279. except Exception as e:
  280. bot(
  281. title="品类冷启任务报错",
  282. detail={
  283. "category": category,
  284. "error": str(e),
  285. "function": "do_job"
  286. }
  287. )