"""
@author: luojunhui
品类文章发布到aigc系统的冷启层
"""
import datetime

from pandas import DataFrame

from applications import DeNetMysql, aiditApi
  8. class CategoryColdStartTask(object):
  9. """
  10. 品类冷启动
  11. """
  12. CATEGORY_MAP = {
  13. "军事": "20240805154433785506170",
  14. "历史": "20240805154359027876170",
  15. "娱乐八卦": "20241016121719447880753",
  16. "情感生活": "20240731052727708334827",
  17. "健康养生": "20240731052921849744134",
  18. # "新闻媒体": "20240731052554397387407"
  19. }
  20. PUBLISHED_STATUS = 2
  21. INIT_STATUS = 1
  22. BAD_STATUS = 0
  23. READ_THRESHOLD = 5000
  24. READ_TIMES_THRESHOLD = 1.3
  25. LIMIT_TITLE_LENGTH = 15
  26. def __init__(self, db_client):
  27. """
  28. :param db_client:
  29. """
  30. self.db_client = db_client
  31. def get_articles_from_meta_table(self, category):
  32. """
  33. 从长文 meta 库中获取冷启文章
  34. :return:
  35. """
  36. sql = f"""
  37. SELECT
  38. out_account_id, article_index, title, link, read_cnt
  39. FROM
  40. crawler_meta_article
  41. WHERE
  42. category = "{category}" and status = '{self.INIT_STATUS}';
  43. """
  44. article_list = self.db_client.select(sql)
  45. article_df = DataFrame(article_list, columns=['gh_id', 'position', 'title', 'link', 'read_cnt'])
  46. return article_df
  47. def change_article_status(self, category):
  48. """
  49. 已经发布到生成计划中的 id,
  50. :return:
  51. """
  52. plan_id = self.CATEGORY_MAP.get(category)
  53. if plan_id:
  54. sql = f"""
  55. SELECT
  56. account.wx_gh,
  57. content.title,
  58. content.content_link,
  59. content.view_count,
  60. content.like_count,
  61. from_unixtime(cprr.create_timestamp / 1000) AS 抓取时间,
  62. from_unixtime(content.publish_timestamp / 1000) AS 发布时间
  63. FROM crawler_plan_result_rel cprr
  64. JOIN crawler_plan plan ON cprr.plan_id = plan.id
  65. JOIN crawler_content content ON cprr.channel_source_id = content.channel_content_id
  66. JOIN crawler_account account ON content.channel_account_id = account.channel_account_id
  67. WHERE plan_id IN (
  68. SELECT
  69. input_source_value
  70. FROM
  71. produce_plan_input_source
  72. WHERE plan_id = '{plan_id}'
  73. );
  74. """
  75. article_list = self.db_client.select(sql)
  76. title_list = [i[1] for i in article_list]
  77. # update
  78. update_sql = f"""
  79. UPDATE
  80. crawler_meta_article
  81. SET
  82. status = %s
  83. WHERE
  84. title in %s and status = %s;
  85. """
  86. self.db_client.update(
  87. sql=update_sql,
  88. params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS)
  89. )
  90. else:
  91. return
  92. def filter_articles(self, category, articles_df):
  93. """
  94. 过滤文章
  95. :param articles_df:
  96. :return:
  97. """
  98. print(articles_df.size)
  99. articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
  100. articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read']
  101. filter_df = articles_df[
  102. (articles_df['read_times'] >= self.READ_TIMES_THRESHOLD)
  103. & (articles_df['read_cnt'] >= self.READ_THRESHOLD)
  104. & (articles_df['title'].str.len() > 15)
  105. & (~articles_df['title'].str.contains('农历'))
  106. & (~articles_df['title'].str.contains('太极'))
  107. & (~articles_df['title'].str.contains('节'))
  108. & (~articles_df['title'].str.contains('早上好'))
  109. & (~articles_df['title'].str.contains('赖清德'))
  110. & (~articles_df['title'].str.contains('普京'))
  111. & (~articles_df['title'].str.contains('俄'))
  112. & (~articles_df['title'].str.contains('南海'))
  113. & (~articles_df['title'].str.contains('台海'))
  114. & (~articles_df['title'].str.contains('解放军'))
  115. & (~articles_df['title'].str.contains('蔡英文'))
  116. & (~articles_df['title'].str.contains('中国'))
  117. ]
  118. url_list = filter_df['link'].values.tolist()
  119. # title_list = filter_df['title'].values.tolist()
  120. # for line in title_list:
  121. # print(line + "\n")
  122. aiditApi.auto_create_crawler_task(
  123. plan_id=None,
  124. plan_name="{}--{}".format(category, datetime.date.today().__str__()),
  125. plan_tag="品类冷启动",
  126. url_list=url_list
  127. )
  128. d = DeNetMysql()
  129. c = CategoryColdStartTask(d)
  130. for ca in c.CATEGORY_MAP.keys():
  131. all_articles = c.get_articles_from_meta_table(category=ca)
  132. c.filter_articles(ca, all_articles)