""" @author: luojunhui 品类文章发布到aigc系统的冷启层 """ import datetime from pandas import DataFrame from applications import DeNetMysql, aiditApi class CategoryColdStartTask(object): """ 品类冷启动 """ CATEGORY_MAP = { "军事": "20240805154433785506170", "历史": "20240805154359027876170", "娱乐八卦": "20241016121719447880753", "情感生活": "20240731052727708334827", "健康养生": "20240731052921849744134", # "新闻媒体": "20240731052554397387407" } PUBLISHED_STATUS = 2 INIT_STATUS = 1 BAD_STATUS = 0 READ_THRESHOLD = 5000 READ_TIMES_THRESHOLD = 1.3 LIMIT_TITLE_LENGTH = 15 def __init__(self, db_client): """ :param db_client: """ self.db_client = db_client def get_articles_from_meta_table(self, category): """ 从长文 meta 库中获取冷启文章 :return: """ sql = f""" SELECT out_account_id, article_index, title, link, read_cnt FROM crawler_meta_article WHERE category = "{category}" and status = '{self.INIT_STATUS}'; """ article_list = self.db_client.select(sql) article_df = DataFrame(article_list, columns=['gh_id', 'position', 'title', 'link', 'read_cnt']) return article_df def change_article_status(self, category): """ 已经发布到生成计划中的 id, :return: """ plan_id = self.CATEGORY_MAP.get(category) if plan_id: sql = f""" SELECT account.wx_gh, content.title, content.content_link, content.view_count, content.like_count, from_unixtime(cprr.create_timestamp / 1000) AS 抓取时间, from_unixtime(content.publish_timestamp / 1000) AS 发布时间 FROM crawler_plan_result_rel cprr JOIN crawler_plan plan ON cprr.plan_id = plan.id JOIN crawler_content content ON cprr.channel_source_id = content.channel_content_id JOIN crawler_account account ON content.channel_account_id = account.channel_account_id WHERE plan_id IN ( SELECT input_source_value FROM produce_plan_input_source WHERE plan_id = '{plan_id}' ); """ article_list = self.db_client.select(sql) title_list = [i[1] for i in article_list] # update update_sql = f""" UPDATE crawler_meta_article SET status = %s WHERE title in %s and status = %s; """ self.db_client.update( sql=update_sql, 
params=(self.PUBLISHED_STATUS, tuple(title_list), self.INIT_STATUS) ) else: return def filter_articles(self, category, articles_df): """ 过滤文章 :param articles_df: :return: """ print(articles_df.size) articles_df['average_read'] = articles_df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean') articles_df['read_times'] = articles_df['read_cnt'] / articles_df['average_read'] filter_df = articles_df[ (articles_df['read_times'] >= self.READ_TIMES_THRESHOLD) & (articles_df['read_cnt'] >= self.READ_THRESHOLD) & (articles_df['title'].str.len() > 15) & (~articles_df['title'].str.contains('农历')) & (~articles_df['title'].str.contains('太极')) & (~articles_df['title'].str.contains('节')) & (~articles_df['title'].str.contains('早上好')) & (~articles_df['title'].str.contains('赖清德')) & (~articles_df['title'].str.contains('普京')) & (~articles_df['title'].str.contains('俄')) & (~articles_df['title'].str.contains('南海')) & (~articles_df['title'].str.contains('台海')) & (~articles_df['title'].str.contains('解放军')) & (~articles_df['title'].str.contains('蔡英文')) & (~articles_df['title'].str.contains('中国')) ] url_list = filter_df['link'].values.tolist() # title_list = filter_df['title'].values.tolist() # for line in title_list: # print(line + "\n") aiditApi.auto_create_crawler_task( plan_id=None, plan_name="{}--{}".format(category, datetime.date.today().__str__()), plan_tag="品类冷启动", url_list=url_list ) d = DeNetMysql() c = CategoryColdStartTask(d) for ca in c.CATEGORY_MAP.keys(): all_articles = c.get_articles_from_meta_table(category=ca) c.filter_articles(ca, all_articles)
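The filtering logic in `filter_articles` can be exercised in isolation. A minimal sketch with invented sample rows (the column names mirror the DataFrame built in `get_articles_from_meta_table`; the data, thresholds, and abbreviated keyword list are illustrative, not real values from the database):

```python
import pandas as pd

# Invented sample rows; columns mirror the crawler_meta_article DataFrame
df = pd.DataFrame({
    'gh_id':    ['a', 'a', 'a', 'b'],
    'position': [1, 1, 1, 1],
    'title':    [
        '一二三四五六七八九十一二三四五六',    # 16 chars: passes the length check
        '短标题',
        '另一个短标题',
        '中国相关的很长很长很长很长很长标题',  # blocked by the keyword filter
    ],
    'read_cnt': [9000, 3000, 3000, 9000],
})

READ_THRESHOLD = 5000
READ_TIMES_THRESHOLD = 1.3
LIMIT_TITLE_LENGTH = 15
EXCLUDE = ['农历', '太极', '节', '早上好', '中国']  # abbreviated keyword list

# Per (account, position) mean read count, broadcast back to every row
df['average_read'] = df.groupby(['gh_id', 'position'])['read_cnt'].transform('mean')
# How many times better than the account's average this article performed
df['read_times'] = df['read_cnt'] / df['average_read']

filtered = df[
    (df['read_times'] >= READ_TIMES_THRESHOLD)
    & (df['read_cnt'] >= READ_THRESHOLD)
    & (df['title'].str.len() > LIMIT_TITLE_LENGTH)
    & (~df['title'].str.contains('|'.join(EXCLUDE)))
]
print(len(filtered))  # 1: only the long, high-performing, keyword-free title survives
```

Account `a` averages 5000 reads, so its 9000-read article gets `read_times = 1.8` and passes; account `b`'s single article has `read_times = 1.0` and is dropped even before the keyword check.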