"""
@author: luojunhui
"""
import datetime
import json
from collections import defaultdict

from tqdm import tqdm

from applications import AIDTApi, DeNetMysql, PQMySQL, Functions, WeixinSpider
from config import cateMap, coldPoolArticlesNum, accountBaseInfo
from stratrgy.distribution import ArticleDistribution
class ColdStartTask(object):
    """
    Cold-start distribution task.

    Picks articles per category, pre-assigns them to accounts by title
    score, and pushes their URLs into crawler plans. Every method is a
    classmethod and works on the shared class-level helpers below.
    """

    # Shared client/helper instances, created once at class-definition time.
    AidApi = AIDTApi()
    DeMysql = DeNetMysql()
    PqMysql = PQMySQL()
    Fun = Functions()
    Spider = WeixinSpider()
    D = ArticleDistribution()
    # Plan tag passed to ``AidApi.updateArticleIntoCrawlerPlan``.
    pool3 = "autoArticlePoolLevel1"

    @classmethod
    def generate_account_dict(cls):
        """
        Build a lookup from account ``ghId`` to account name.

        :return: dict mapping every ``ghId`` found in ``accountBaseInfo``
            to the matching ``accountName``.
        """
        return {
            accountBaseInfo[key]['ghId']: accountBaseInfo[key]['accountName']
            for key in accountBaseInfo
        }

    @classmethod
    def _set_article_status(cls, content_id, status):
        """
        Set ``status`` on the ``cold_start_article_pool`` row(s) whose
        ``content_channel_id`` equals ``content_id``.

        :param content_id: value matched against ``content_channel_id``.
        :param status: new status value to store.
        :return: None
        """
        sql = """
            update cold_start_article_pool
            set status = %s
            where content_channel_id = %s;
        """
        cls.PqMysql.update(sql=sql, params=(status, content_id))

    @classmethod
    def usedArticle(cls, content_id):
        """
        Mark an article as used by setting its status to 0.

        :param content_id: ``content_channel_id`` of the article.
        :return: None
        """
        cls._set_article_status(content_id, 0)

    @classmethod
    def badArticle(cls, content_id):
        """
        Mark a low-scoring article by setting its status to 2.

        :param content_id: ``content_channel_id`` of the article.
        :return: None
        """
        cls._set_article_status(content_id, 2)

    @classmethod
    def getTopArticles(cls, category, limit_count):
        """
        Get the list of highly shared articles for a category.

        NOTE(review): no implementation is present in this file, so the
        method currently returns ``None``. ``findCategoryArticlesDaily``
        reads each returned row as ``(id, url, title)`` by index and treats
        a falsy result as "no more data".

        :param category: category name to fetch articles for.
        :param limit_count: maximum number of articles requested.
        :return: currently ``None`` (see note above).
        """

    @classmethod
    def splitCategoryToAccount(cls, cate_list):
        """
        Split articles across accounts and store the pre-distribution.

        Every title is scored against each account in the quota table via
        ``Fun.getTitleScore``. Each article is then given to the
        highest-scoring account that still has quota left. The resulting
        ``[content_id, score]`` lists are inserted per account, as JSON,
        into ``article_pre_distribute_account`` with today's date.

        :param cate_list: list of dicts carrying at least ``'id'`` and
            ``'title'``.
        :return: None
        :raises KeyError: if an account in the quota table is missing from
            ``accountBaseInfo``.
        """
        if not cate_list:
            # Nothing to distribute; skip the per-account scoring calls.
            return

        # Remaining number of articles each account may still receive.
        account_index_info = {
            "gh_058e41145a0c": 30,
            "gh_0e4fd9e88386": 30,
            "gh_744cb16f6e16": 30,
            "gh_ac43eb24376d": 30,
            "gh_970460d9ccec": 30,
            "gh_56ca3dae948c": 30,
            "gh_c91b42649690": 30,
            "gh_6d205db62f04": 30,
            "gh_e24da99dc899": 30,
            "gh_4c058673c07e": 30,
            "gh_03d32e83122f": 30,
            "gh_c69776baf2cd": 30,
            "gh_30816d8adb52": 30,
            "gh_789a40fe7935": 30,
            "gh_95ed5ecf9363": 30,
            "gh_3e91f0624545": 30,
            "gh_57573f01b2ee": 30,
            "gh_9877c8541764": 30,
            "gh_6cfd1132df94": 30,
            "gh_008ef23062ee": 30,
            "gh_5ae65db96cb7": 30,
            "gh_be8c29139989": 30,
            "gh_51e4ad40466d": 30,
            "gh_d4dffc34ac39": 30,
            "gh_89ef4798d3ea": 30,
            "gh_b15de7c99912": 30,
            "gh_9f8dc5b0c74e": 30,
            "gh_7b4a5f86d68c": 30,
            "gh_c5cdf60d9ab4": 5,
            "gh_0c89e11f8bf3": 5,
            "gh_e0eb490115f5": 5,
            "gh_a2901d34f75b": 5,
            "gh_d5f935d0d1f2": 30
        }
        account_dict = cls.generate_account_dict()
        title_list = [i['title'] for i in cate_list]

        # content id -> [(gh_id, score), ...], one entry per scored account.
        scores_by_content = defaultdict(list)
        for account_id in account_index_info:
            account_name = account_dict[account_id]
            score_list = cls.Fun.getTitleScore(
                title_list=title_list, account_name=account_name
            )[account_name]['score_list']
            for index, score in enumerate(score_list):
                channel_content_id = cate_list[index]['id']
                scores_by_content[channel_content_id].append((account_id, score))

        # Best-scoring account first for every article.
        for candidates in scores_by_content.values():
            candidates.sort(reverse=True, key=lambda x: x[1])

        # gh_id -> [(content_id, score), ...] in cate_list order.
        account_article_dict = defaultdict(list)
        for item in cate_list:
            content_id = item['id']
            for gh_id, score in scores_by_content.get(content_id, []):
                if account_index_info[gh_id] > 0:
                    account_article_dict[gh_id].append((content_id, score))
                    account_index_info[gh_id] -= 1
                    break

        date_str = datetime.datetime.today().strftime("%Y-%m-%d")
        insert_sql = """
            INSERT INTO article_pre_distribute_account
            (gh_id, date, article_list)
            VALUES
            (%s, %s, %s);
        """
        for account in tqdm(account_article_dict):
            article_json = json.dumps(account_article_dict[account], ensure_ascii=False)
            print(account, date_str, article_json)
            try:
                # Use the shared client instance, like every other query here.
                cls.PqMysql.update(
                    sql=insert_sql,
                    params=(account, date_str, article_json)
                )
            except Exception as e:
                # Best-effort: one failed account must not stop the others.
                print("插入出现问题----{}".format(e))
        print("成功更新完成")

    @classmethod
    def findCategoryArticlesDaily(cls):
        """
        Collect today's cold-start articles for each enabled category.

        For every category, batches of 10 articles are fetched with
        ``getTopArticles`` and their titles are scored against the
        "指尖奇文" account. Articles scoring at least 0.35 are kept and
        marked used (status 0); the rest are marked bad (status 2). A
        category stops once ``coldPoolArticlesNum * cateMap[category]``
        articles are kept (ratio defaults to 0.1) or no articles come back.

        :return: list of dicts with keys ``id``, ``url``, ``title``,
            ``cate`` and ``score``.
        """
        category_list = [
            "军事政法",
            # "健康养生",
            "宗教历史",
            # "情感生活",
            # "娱乐八卦",
            # "新闻媒体",
        ]
        selected = []
        for category in tqdm(category_list):
            print("{} is processing......".format(category))
            category_total = coldPoolArticlesNum * cateMap.get(category, 0.1)
            category_count = 0
            while category_count < category_total:
                article_list = cls.getTopArticles(category, 10)
                # Falsy check also covers a ``None`` result, not only ``[]``.
                if not article_list:
                    print("{}: 该品类没有数据了!".format(category))
                    break
                title_list = [article[2] for article in article_list]
                score_list = cls.Fun.getTitleScore(title_list, "指尖奇文")['指尖奇文']['score_list']
                for article, score in zip(article_list, score_list):
                    content_id = article[0]
                    if score >= 0.35:
                        obj = {
                            "id": article[0],
                            "url": article[1],
                            "title": article[2],
                            "cate": category,
                            "score": score
                        }
                        category_count += 1
                        cls.usedArticle(content_id=content_id)
                        print("used_article")
                        selected.append(obj)
                    else:
                        cls.badArticle(content_id=content_id)
                        print("bad article")
                print(category_count)
        return selected

    @classmethod
    def findAssociationArticlesDaily(cls):
        """
        Fetch association articles and prepare them for the cold pool.

        Selects rows from ``association_articles`` (status 1, content
        length above 500), sets their status to 0, de-duplicates them by
        ``url_md5`` and keeps the newest ``int(680 * 0.8)`` entries. Each
        kept URL is resolved to a channel content id through the spider.

        :return: list of dicts with keys ``url``, ``title``, ``url_md5``
            and ``id``; an empty list when the query returns no rows.
        """
        # target_num = int(0.8 * coldPoolArticlesNum)
        sql = """
            select id, publish_timestamp, title, link, title_score, url_md5
            from association_articles
            where status = 1 and content_length > 500
            order by publish_timestamp
            DESC limit 10000 offset 10000;
        """
        temp_list = cls.PqMysql.select(sql)
        if not temp_list:
            # An empty id tuple cannot form a valid ``IN (...)`` list.
            return []

        id_tuple = tuple(i[0] for i in temp_list)
        update_sql = """
            update association_articles
            set status = %s
            where id in %s
        """
        cls.PqMysql.update(sql=update_sql, params=(0, id_tuple))

        # De-duplicate on url_md5 (last selected column).
        rows_by_md5 = defaultdict(list)
        for line in temp_list:
            rows_by_md5[line[-1]].append(list(line))

        # Per url_md5 keep the row with the greatest
        # (publish_timestamp, title_score); ties keep the first row seen.
        unique_rows = [
            max(rows, key=lambda x: (x[1], x[4]))
            for rows in rows_by_md5.values()
        ]
        unique_rows.sort(reverse=True, key=lambda x: x[1])

        article_list = []
        for row in tqdm(unique_rows[:int(680 * 0.8)]):
            url = row[3]
            try:
                content_id = cls.Spider.get_article_text(url)['data']['data']['channel_content_id']
            except Exception:
                # Best-effort: fall back to the URL as the id when the
                # spider call or the response lookup fails.
                content_id = url
            article_list.append({
                "url": url,
                "title": row[2],
                "url_md5": row[5],
                "id": content_id
            })
        return article_list

    @classmethod
    def sendToColdPool(cls, plan_id=None):
        """
        Send the selected articles to the third-level pool.

        Collects the category articles, pre-distributes them to accounts,
        then pushes each category's URLs into its own crawler plan tagged
        with ``pool3``. A failure in one plan is printed and does not stop
        the remaining plans.

        :param plan_id: forwarded unchanged to
            ``AidApi.updateArticleIntoCrawlerPlan``.
        :return: None
        """
        # The association-article flow is currently disabled:
        # association_list = cls.findAssociationArticlesDaily()
        # cls.D.association_split(association_list)
        # It was published under the plan name "文章账号联想冷启--0805".
        category_list = cls.findCategoryArticlesDaily()
        # Pre-assign articles to accounts.
        cls.splitCategoryToAccount(category_list)

        # (category, crawler plan name)
        category_plans = (
            ("军事政法", "军事政法类冷启"),
            ("宗教历史", "宗教历史类冷启"),
            # Disabled categories, kept for reference:
            # ("新闻媒体", "新闻媒体类冷启"),
            # ("情感生活", "生活情感类冷启"),
            # ("健康养生", "健康养生类冷启"),
            # ("娱乐八卦", "娱乐八卦类冷启"),
        )
        for cate, plan_name in category_plans:
            try:
                url_list = [i['url'] for i in category_list if i['cate'] == cate]
                cls.AidApi.updateArticleIntoCrawlerPlan(
                    plan_id=plan_id,
                    plan_name=plan_name,
                    plan_tag=cls.pool3,
                    url_list=url_list
                )
            except Exception as e:
                print("error--{}".format(e))
if __name__ == '__main__':
    # Script entry point: run one full cold-start distribution pass.
    cold_start_task = ColdStartTask()
    cold_start_task.sendToColdPool()
|