""" @author: luojunhui """ import datetime import json from tqdm import tqdm from applications import AIDTApi, DeNetMysql, PQMySQL, Functions, WeixinSpider from config import cateMap, coldPoolArticlesNum, accountBaseInfo from stratrgy.distribution import ArticleDistribution class ColdStartTask(object): """ 冷启分配任务 """ AidApi = AIDTApi() DeMysql = DeNetMysql() PqMysql = PQMySQL() Fun = Functions() Spider = WeixinSpider() D = ArticleDistribution() pool3 = "autoArticlePoolLevel1" @classmethod def generate_account_dict(cls): """ 生成account_list :return: """ account_dict = {} for key in accountBaseInfo: account_name = accountBaseInfo[key]['accountName'] account_gh_id = accountBaseInfo[key]['ghId'] account_dict[account_gh_id] = account_name return account_dict @classmethod def usedArticle(cls, content_id): """ 已经使用的文章,把文章状态改为0 :return: """ sql = f""" update cold_start_article_pool set status = %s where content_channel_id = %s; """ cls.PqMysql.update(sql=sql, params=(0, content_id)) @classmethod def badArticle(cls, content_id): """ 低分的文章,把文章状态改为2 :return: """ sql = f""" update cold_start_article_pool set status = %s where content_channel_id = %s; """ cls.PqMysql.update(sql=sql, params=(2, content_id)) @classmethod def getTopArticles(cls, category, limit_count): """ 获取高分享的文章list :return: """ @classmethod def splitCategoryToAccount(cls, cate_list): """ split articles to each account :return: """ account_index_info = { "gh_058e41145a0c": 30, "gh_0e4fd9e88386": 30, "gh_744cb16f6e16": 30, "gh_ac43eb24376d": 30, "gh_970460d9ccec": 30, "gh_56ca3dae948c": 30, "gh_c91b42649690": 30, "gh_6d205db62f04": 30, "gh_e24da99dc899": 30, "gh_4c058673c07e": 30, "gh_03d32e83122f": 30, "gh_c69776baf2cd": 30, "gh_30816d8adb52": 30, "gh_789a40fe7935": 30, "gh_95ed5ecf9363": 30, "gh_3e91f0624545": 30, "gh_57573f01b2ee": 30, "gh_9877c8541764": 30, "gh_6cfd1132df94": 30, "gh_008ef23062ee": 30, "gh_5ae65db96cb7": 30, "gh_be8c29139989": 30, "gh_51e4ad40466d": 30, "gh_d4dffc34ac39": 30, "gh_89ef4798d3ea": 30, "gh_b15de7c99912": 30, "gh_9f8dc5b0c74e": 30, "gh_7b4a5f86d68c": 30, "gh_c5cdf60d9ab4": 5, "gh_0c89e11f8bf3": 5, "gh_e0eb490115f5": 5, "gh_a2901d34f75b": 5, "gh_d5f935d0d1f2": 30 } account_dict = cls.generate_account_dict() account_list = list(account_index_info.keys()) title_list = [i['title'] for i in cate_list] L_map = {} for account_id in account_list: account_name = account_dict[account_id] score_list = cls.Fun.getTitleScore(title_list=title_list, account_name=account_name)[account_name][ 'score_list'] for index, score in enumerate(score_list): channel_content_id = cate_list[index]['id'] item = tuple([account_id, score]) if L_map.get(channel_content_id): L_map[channel_content_id].append(item) else: L_map[channel_content_id] = [item] for key in L_map: L_map[key] = sorted(L_map[key], reverse=True, key=lambda x: x[1]) content_account = [] for item in cate_list: content_id = item['id'] account_list = L_map[content_id] for account_tuple in account_list: gh_id, score = account_tuple[0], account_tuple[1] if account_index_info[gh_id] > 0: sub_item = tuple([content_id, gh_id, score]) content_account.append(sub_item) account_index_info[gh_id] -= 1 break # return content_account account_article_dict = {} for item in content_account: content_id, gh_id, score = item sub_i = tuple([content_id, score]) if account_article_dict.get(gh_id): account_article_dict[gh_id].append(sub_i) else: account_article_dict[gh_id] = [sub_i] for account in tqdm(account_article_dict): date_str = datetime.datetime.today().strftime("%Y-%m-%d") print(account, date_str, json.dumps(account_article_dict[account], ensure_ascii=False)) insert_sql = f""" INSERT INTO article_pre_distribute_account (gh_id, date, article_list) VALUES (%s, %s, %s); """ try: PQMySQL.update(sql=insert_sql, params=( account, date_str, json.dumps(account_article_dict[account], ensure_ascii=False))) except Exception as e: print("插入出现问题----{}".format(e)) print("成功更新完成") @classmethod def findCategoryArticlesDaily(cls): """ 和每个账号计算相关性分数 :return: """ category_list = [ "军事政法", # "健康养生", "宗教历史", # "情感生活", # "娱乐八卦", # "新闻媒体", ] L = [] for category in tqdm(category_list): print("{} is processing......".format(category)) category_total = coldPoolArticlesNum * cateMap.get(category, 0.1) category_count = 0 while category_count < category_total: article_list = cls.getTopArticles(category, 10) if len(article_list) == 0: print("{}: 该品类没有数据了!".format(category)) break title_list = [article[2] for article in article_list] score_list = cls.Fun.getTitleScore(title_list, "指尖奇文")['指尖奇文']['score_list'] for index, score in enumerate(score_list): content_id = article_list[index][0] if score >= 0.35: obj = { "id": article_list[index][0], "url": article_list[index][1], "title": article_list[index][2], "cate": category, "score": score } category_count += 1 cls.usedArticle(content_id=content_id) print("used_article") L.append(obj) else: cls.badArticle(content_id=content_id) print("bad article") print(category_count) return L @classmethod def findAssociationArticlesDaily(cls): """ 获取相关文章 :return: """ # target_num = int(0.8 * coldPoolArticlesNum) sql = f""" select id, publish_timestamp, title, link, title_score, url_md5 from association_articles where status = 1 and content_length > 500 order by publish_timestamp DESC limit 10000 offset 10000; """ temp_list = cls.PqMysql.select(sql) id_tuple = tuple([i[0] for i in temp_list]) update_sql = f""" update association_articles set status = %s where id in %s """ cls.PqMysql.update(sql=update_sql, params=(0, id_tuple)) # url_md5去重 L = {} for line in temp_list: key = line[-1] if L.get(key): L[key].append(list(line)) else: L[key] = [list(line)] LL = [] for key in L: value_list = L[key] sorted_k = sorted(value_list, reverse=True, key=lambda x: (x[1], x[4])) LL.append(sorted_k[0]) article_list = [] LL = sorted(LL, reverse=True, key=lambda x: x[1]) for i in tqdm(LL[:int(680 * 0.8)]): try: o = { "url": i[3], "title": i[2], "url_md5": i[5], # "id": i[3] "id": cls.Spider.get_article_text(i[3])['data']['data']['channel_content_id'] } except: o = { "url": i[3], "title": i[2], "url_md5": i[5], "id": i[3] } article_list.append(o) return article_list @classmethod def sendToColdPool(cls, plan_id=None): """ 把文章send至第三层 :return: """ # 获取6个品类的数据 # association_list = cls.findAssociationArticlesDaily() # cls.D.association_split(association_list) category_list = cls.findCategoryArticlesDaily() # d_list = category_list + association_list # # # 预分配账号 cls.splitCategoryToAccount(category_list) # # try: army = [i for i in category_list if i['cate'] == '军事政法'] cls.AidApi.updateArticleIntoCrawlerPlan( plan_id=plan_id, plan_name="军事政法类冷启", plan_tag=cls.pool3, url_list=[i['url'] for i in army] ) except Exception as e: print("error--{}".format(e)) try: history = [i for i in category_list if i['cate'] == '宗教历史'] cls.AidApi.updateArticleIntoCrawlerPlan( plan_id=plan_id, plan_name="宗教历史类冷启", plan_tag=cls.pool3, url_list=[i['url'] for i in history] ) except Exception as e: print("error--{}".format(e)) # # # # try: # # news = [i for i in category_list if i['cate'] == '新闻媒体'] # # cls.AidApi.updateArticleIntoCrawlerPlan( # # plan_id=plan_id, # # plan_name="新闻媒体类冷启", # # plan_tag=cls.pool3, # # url_list=[i['url'] for i in news] # # ) # # except Exception as e: # # print("error--{}".format(e)) # # # # try: # # life = [i for i in category_list if i['cate'] == '情感生活'] # # cls.AidApi.updateArticleIntoCrawlerPlan( # # plan_id=plan_id, # # plan_name="生活情感类冷启", # # plan_tag=cls.pool3, # # url_list=[i['url'] for i in life] # # ) # # except Exception as e: # # print("error--{}".format(e)) # # # # try: # # healthy = [i for i in category_list if i['cate'] == '健康养生'] # # cls.AidApi.updateArticleIntoCrawlerPlan( # # plan_id=plan_id, # # plan_name="健康养生类冷启", # # plan_tag=cls.pool3, # # url_list=[i['url'] for i in healthy] # # ) # # except Exception as e: # # print("error--{}".format(e)) # # # # try: # # fun = [i for i in category_list if i['cate'] == '娱乐八卦'] # # cls.AidApi.updateArticleIntoCrawlerPlan( # # plan_id=plan_id, # # plan_name="娱乐八卦类冷启", # # plan_tag=cls.pool3, # # url_list=[i['url'] for i in fun] # # ) # # except Exception as e: # # print("error--{}".format(e)) # # # cls.AidApi.updateArticleIntoCrawlerPlan( # plan_id=plan_id, # plan_name="文章账号联想冷启--0805", # plan_tag=cls.pool3, # url_list=[i['url'] for i in association_list] # ) if __name__ == '__main__': CT = ColdStartTask() CT.sendToColdPool()