# publishCategoryArticles.py
  1. """
  2. @author: luojunhui
  3. 品类文章发布到aigc系统的冷启层
  4. """
  5. import datetime
  6. import json
  7. from pandas import DataFrame
  8. from applications import aiditApi, log, bot, DeNetMysql
  9. from config import apolloConfig
  10. apollo = apolloConfig()
  11. class CategoryColdStartTask(object):
  12. """
  13. 品类冷启动发布任务
  14. """
  15. PUBLISHED_STATUS = 2
  16. INIT_STATUS = 1
  17. BAD_STATUS = 0
  18. def __init__(self, db_client):
  19. """
  20. :param db_client:
  21. """
  22. self.db_client = db_client
  23. self.category_map = json.loads(apollo.getConfigValue("category_cold_start_map"))
  24. self.category_cold_start_threshold = json.loads(apollo.getConfigValue("category_cold_start_threshold"))
  25. self.READ_THRESHOLD = self.category_cold_start_threshold.get("READ_THRESHOLD", 5000)
  26. self.READ_TIMES_THRESHOLD = self.category_cold_start_threshold.get("READ_TIMES_THRESHOLD", 1.3)
  27. self.LIMIT_TITLE_LENGTH = self.category_cold_start_threshold.get("LIMIT_TITLE_LENGTH", 15)
  28. log(
  29. task="category_publish_task",
  30. function="__init__",
  31. message="数据库初始化连接完成,apollo配置获取完成",
  32. status="success",
  33. data={
  34. "category": self.category_map,
  35. "threshold": self.category_cold_start_threshold
  36. }
  37. )
  38. def get_articles_from_meta_table(self, category):
  39. """
  40. 从长文 meta 库中获取冷启文章
  41. :return:
  42. """
  43. sql = f"""
  44. SELECT
  45. article_id, out_account_id, article_index, title, link, read_cnt
  46. FROM
  47. crawler_meta_article
  48. WHERE
  49. category = "{category}" and status = '{self.INIT_STATUS}';
  50. """
  51. article_list = self.db_client.select(sql)
  52. log(
  53. task="category_publish_task",
  54. function="get_articles_from_meta_table",
  55. message="获取品类文章总数",
  56. status="success",
  57. data={
  58. "total_articles": len(article_list),
  59. "category": category
  60. }
  61. )
  62. article_df = DataFrame(article_list, columns=['article_id', 'gh_id', 'position', 'title', 'link', 'read_cnt'])
  63. return article_df
  64. def change_article_status(self, category):
  65. """
  66. 已经发布到生成计划中的 id,
  67. :return:
  68. """
  69. plan_id = self.category_map.get(category)
  70. if plan_id:
  71. sql = f"""
  72. SELECT
  73. account.wx_gh,
  74. content.title,
  75. content.content_link,
  76. content.view_count,
  77. content.like_count,
  78. from_unixtime(cprr.create_timestamp / 1000) AS 抓取时间,
  79. from_unixtime(content.publish_timestamp / 1000) AS 发布时间
  80. FROM crawler_plan_result_rel cprr
  81. JOIN crawler_plan plan ON cprr.plan_id = plan.id
  82. JOIN crawler_content content ON cprr.channel_source_id = content.channel_content_id
  83. JOIN crawler_account account ON content.channel_account_id = account.channel_account_id
  84. WHERE plan_id IN (
  85. SELECT
  86. input_source_value
  87. FROM
  88. produce_plan_input_source
  89. WHERE plan_id = '{plan_id}'
  90. );
  91. """
  92. article_list = self.db_client.select(sql)
  93. title_list = [i[1] for i in article_list]
  94. if title_list:
  95. # update
  96. update_sql = f"""
  97. UPDATE
  98. crawler_meta_article
  99. SET
  100. status = %s
  101. WHERE
  102. title in %s and status = %s;
  103. """
  104. self.db_client.update(
  105. sql=update_sql,
  106. params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
  107. )
  108. else:
  109. return
  110. def change_article_status_while_publishing(self, article_id_list):
  111. """
  112. :param: article_id_list: 文章的唯一 id
  113. :return:
  114. """
  115. update_sql = f"""
  116. UPDATE
  117. crawler_meta_article
  118. SET
  119. status = %s
  120. WHERE
  121. article_id in %s and status = %s;
  122. """
  123. affect_rows = self.db_client.update(
  124. sql=update_sql,
  125. params=(self.PUBLISHED_STATUS, tuple(article_id_list), self.INIT_STATUS)
  126. )
  127. if affect_rows != len(article_id_list):
  128. bot(
  129. title="品类冷启任务中,出现更新状文章状态失败异常",
  130. detail={
  131. "affected_rows": affect_rows,
  132. "task_rows": len(article_id_list)
  133. }
  134. )
  135. def publish_filter_articles(self, category, articles_df):
  136. """
  137. 过滤文章
  138. :param category:
  139. :param articles_df:
  140. :return:
  141. """
  142. articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
  143. articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
  144. filter_df = articles_df[
  145. (articles_df['read_times'] >= self.READ_TIMES_THRESHOLD)
  146. & (articles_df['read_cnt'] >= self.READ_THRESHOLD)
  147. & (articles_df['title'].str.len() > self.LIMIT_TITLE_LENGTH)
  148. & (~articles_df['title'].str.contains('农历'))
  149. & (~articles_df['title'].str.contains('太极'))
  150. & (~articles_df['title'].str.contains('节'))
  151. & (~articles_df['title'].str.contains('早上好'))
  152. & (~articles_df['title'].str.contains('赖清德'))
  153. & (~articles_df['title'].str.contains('普京'))
  154. & (~articles_df['title'].str.contains('俄'))
  155. & (~articles_df['title'].str.contains('南海'))
  156. & (~articles_df['title'].str.contains('台海'))
  157. & (~articles_df['title'].str.contains('解放军'))
  158. & (~articles_df['title'].str.contains('蔡英文'))
  159. & (~articles_df['title'].str.contains('中国'))
  160. ]
  161. url_list = filter_df['link'].values.tolist()
  162. log(
  163. task="category_publish_task",
  164. function="publish_filter_articles",
  165. message="过滤后文章总数",
  166. status="success",
  167. data={
  168. "total_articles": len(url_list),
  169. "category": category
  170. }
  171. )
  172. if url_list:
  173. crawler_plan_response = aiditApi.auto_create_crawler_task(
  174. plan_id=None,
  175. plan_name="自动绑定-{}--{}--{}".format(category, datetime.date.today().__str__(), len(url_list)),
  176. plan_tag="品类冷启动",
  177. url_list=url_list
  178. )
  179. log(
  180. task="category_publish_task",
  181. function="publish_filter_articles",
  182. message="成功创建抓取计划",
  183. status="success",
  184. data=crawler_plan_response
  185. )
  186. # auto bind to generate plan
  187. new_crawler_task_list = [
  188. {
  189. "contentType": 1,
  190. "inputSourceType": 2,
  191. "inputSourceSubType": None,
  192. "fieldName": None,
  193. "inputSourceValue": crawler_plan_response['data']['id'],
  194. "inputSourceLabel": crawler_plan_response['data']['name'],
  195. "inputSourceModal": 3,
  196. "inputSourceChannel": 5
  197. }
  198. ]
  199. generate_plan_response = aiditApi.bind_crawler_task_to_generate_task(
  200. crawler_task_list=new_crawler_task_list,
  201. generate_task_id=self.category_map[category]
  202. )
  203. log(
  204. task="category_publish_task",
  205. function="publish_filter_articles",
  206. message="成功绑定到生成计划",
  207. status="success",
  208. data=generate_plan_response
  209. )
  210. article_id_list = filter_df['article_id'].values.tolist()
  211. self.change_article_status_while_publishing(article_id_list=article_id_list)
  212. def do_job(self):
  213. """
  214. 执行任务
  215. :return:
  216. """
  217. category_list = self.category_map.keys()
  218. log(
  219. task="category_publish_task",
  220. function="do_job",
  221. message="开始自动创建品类文章抓取计划",
  222. status="success",
  223. data={
  224. "category_list": list(category_list)
  225. }
  226. )
  227. for category in category_list:
  228. try:
  229. category_df = self.get_articles_from_meta_table(category=category)
  230. self.publish_filter_articles(
  231. category=category,
  232. articles_df=category_df
  233. )
  234. except Exception as e:
  235. bot(
  236. title="品类冷启任务报错",
  237. detail={
  238. "category": category,
  239. "error": str(e),
  240. "function": "do_job"
  241. }
  242. )