task2.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. """
  2. @author: luojunhui
  3. """
  4. import datetime
  5. import json
  6. from tqdm import tqdm
  7. from applications import AIDTApi, DeNetMysql, PQMySQL, Functions, WeixinSpider
  8. from config import cateMap, coldPoolArticlesNum, accountBaseInfo
  9. from stratrgy.distribution import ArticleDistribution
  10. class ColdStartTask(object):
  11. """
  12. 冷启分配任务
  13. """
  14. AidApi = AIDTApi()
  15. DeMysql = DeNetMysql()
  16. PqMysql = PQMySQL()
  17. Fun = Functions()
  18. Spider = WeixinSpider()
  19. D = ArticleDistribution()
  20. pool3 = "autoArticlePoolLevel1"
  21. @classmethod
  22. def generate_account_dict(cls):
  23. """
  24. 生成account_list
  25. :return:
  26. """
  27. account_dict = {}
  28. for key in accountBaseInfo:
  29. account_name = accountBaseInfo[key]['accountName']
  30. account_gh_id = accountBaseInfo[key]['ghId']
  31. account_dict[account_gh_id] = account_name
  32. return account_dict
  33. @classmethod
  34. def usedArticle(cls, content_id):
  35. """
  36. 已经使用的文章,把文章状态改为0
  37. :return:
  38. """
  39. sql = f"""
  40. update cold_start_article_pool
  41. set status = %s
  42. where content_channel_id = %s;
  43. """
  44. cls.PqMysql.update(sql=sql, params=(0, content_id))
  45. @classmethod
  46. def badArticle(cls, content_id):
  47. """
  48. 低分的文章,把文章状态改为2
  49. :return:
  50. """
  51. sql = f"""
  52. update cold_start_article_pool
  53. set status = %s
  54. where content_channel_id = %s;
  55. """
  56. cls.PqMysql.update(sql=sql, params=(2, content_id))
  57. @classmethod
  58. def getTopArticles(cls, category, limit_count):
  59. """
  60. 获取高分享的文章list
  61. :return:
  62. """
  63. @classmethod
  64. def splitCategoryToAccount(cls, cate_list):
  65. """
  66. split articles to each account
  67. :return:
  68. """
  69. account_index_info = {
  70. "gh_058e41145a0c": 30,
  71. "gh_0e4fd9e88386": 30,
  72. "gh_744cb16f6e16": 30,
  73. "gh_ac43eb24376d": 30,
  74. "gh_970460d9ccec": 30,
  75. "gh_56ca3dae948c": 30,
  76. "gh_c91b42649690": 30,
  77. "gh_6d205db62f04": 30,
  78. "gh_e24da99dc899": 30,
  79. "gh_4c058673c07e": 30,
  80. "gh_03d32e83122f": 30,
  81. "gh_c69776baf2cd": 30,
  82. "gh_30816d8adb52": 30,
  83. "gh_789a40fe7935": 30,
  84. "gh_95ed5ecf9363": 30,
  85. "gh_3e91f0624545": 30,
  86. "gh_57573f01b2ee": 30,
  87. "gh_9877c8541764": 30,
  88. "gh_6cfd1132df94": 30,
  89. "gh_008ef23062ee": 30,
  90. "gh_5ae65db96cb7": 30,
  91. "gh_be8c29139989": 30,
  92. "gh_51e4ad40466d": 30,
  93. "gh_d4dffc34ac39": 30,
  94. "gh_89ef4798d3ea": 30,
  95. "gh_b15de7c99912": 30,
  96. "gh_9f8dc5b0c74e": 30,
  97. "gh_7b4a5f86d68c": 30,
  98. "gh_c5cdf60d9ab4": 5,
  99. "gh_0c89e11f8bf3": 5,
  100. "gh_e0eb490115f5": 5,
  101. "gh_a2901d34f75b": 5,
  102. "gh_d5f935d0d1f2": 30
  103. }
  104. account_dict = cls.generate_account_dict()
  105. account_list = list(account_index_info.keys())
  106. title_list = [i['title'] for i in cate_list]
  107. L_map = {}
  108. for account_id in account_list:
  109. account_name = account_dict[account_id]
  110. score_list = cls.Fun.getTitleScore(title_list=title_list, account_name=account_name)[account_name][
  111. 'score_list']
  112. for index, score in enumerate(score_list):
  113. channel_content_id = cate_list[index]['id']
  114. item = tuple([account_id, score])
  115. if L_map.get(channel_content_id):
  116. L_map[channel_content_id].append(item)
  117. else:
  118. L_map[channel_content_id] = [item]
  119. for key in L_map:
  120. L_map[key] = sorted(L_map[key], reverse=True, key=lambda x: x[1])
  121. content_account = []
  122. for item in cate_list:
  123. content_id = item['id']
  124. account_list = L_map[content_id]
  125. for account_tuple in account_list:
  126. gh_id, score = account_tuple[0], account_tuple[1]
  127. if account_index_info[gh_id] > 0:
  128. sub_item = tuple([content_id, gh_id, score])
  129. content_account.append(sub_item)
  130. account_index_info[gh_id] -= 1
  131. break
  132. # return content_account
  133. account_article_dict = {}
  134. for item in content_account:
  135. content_id, gh_id, score = item
  136. sub_i = tuple([content_id, score])
  137. if account_article_dict.get(gh_id):
  138. account_article_dict[gh_id].append(sub_i)
  139. else:
  140. account_article_dict[gh_id] = [sub_i]
  141. for account in tqdm(account_article_dict):
  142. date_str = datetime.datetime.today().strftime("%Y-%m-%d")
  143. print(account, date_str, json.dumps(account_article_dict[account], ensure_ascii=False))
  144. insert_sql = f"""
  145. INSERT INTO article_pre_distribute_account
  146. (gh_id, date, article_list)
  147. VALUES
  148. (%s, %s, %s);
  149. """
  150. try:
  151. PQMySQL.update(sql=insert_sql, params=(
  152. account, date_str, json.dumps(account_article_dict[account], ensure_ascii=False)))
  153. except Exception as e:
  154. print("插入出现问题----{}".format(e))
  155. print("成功更新完成")
  156. @classmethod
  157. def findCategoryArticlesDaily(cls):
  158. """
  159. 和每个账号计算相关性分数
  160. :return:
  161. """
  162. category_list = [
  163. "军事政法",
  164. # "健康养生",
  165. "宗教历史",
  166. # "情感生活",
  167. # "娱乐八卦",
  168. # "新闻媒体",
  169. ]
  170. L = []
  171. for category in tqdm(category_list):
  172. print("{} is processing......".format(category))
  173. category_total = coldPoolArticlesNum * cateMap.get(category, 0.1)
  174. category_count = 0
  175. while category_count < category_total:
  176. article_list = cls.getTopArticles(category, 10)
  177. if len(article_list) == 0:
  178. print("{}: 该品类没有数据了!".format(category))
  179. break
  180. title_list = [article[2] for article in article_list]
  181. score_list = cls.Fun.getTitleScore(title_list, "指尖奇文")['指尖奇文']['score_list']
  182. for index, score in enumerate(score_list):
  183. content_id = article_list[index][0]
  184. if score >= 0.35:
  185. obj = {
  186. "id": article_list[index][0],
  187. "url": article_list[index][1],
  188. "title": article_list[index][2],
  189. "cate": category,
  190. "score": score
  191. }
  192. category_count += 1
  193. cls.usedArticle(content_id=content_id)
  194. print("used_article")
  195. L.append(obj)
  196. else:
  197. cls.badArticle(content_id=content_id)
  198. print("bad article")
  199. print(category_count)
  200. return L
  201. @classmethod
  202. def findAssociationArticlesDaily(cls):
  203. """
  204. 获取相关文章
  205. :return:
  206. """
  207. # target_num = int(0.8 * coldPoolArticlesNum)
  208. sql = f"""
  209. select id, publish_timestamp, title, link, title_score, url_md5
  210. from association_articles
  211. where status = 1 and content_length > 500
  212. order by publish_timestamp
  213. DESC limit 10000 offset 10000;
  214. """
  215. temp_list = cls.PqMysql.select(sql)
  216. id_tuple = tuple([i[0] for i in temp_list])
  217. update_sql = f"""
  218. update association_articles
  219. set status = %s
  220. where id in %s
  221. """
  222. cls.PqMysql.update(sql=update_sql, params=(0, id_tuple))
  223. # url_md5去重
  224. L = {}
  225. for line in temp_list:
  226. key = line[-1]
  227. if L.get(key):
  228. L[key].append(list(line))
  229. else:
  230. L[key] = [list(line)]
  231. LL = []
  232. for key in L:
  233. value_list = L[key]
  234. sorted_k = sorted(value_list, reverse=True, key=lambda x: (x[1], x[4]))
  235. LL.append(sorted_k[0])
  236. article_list = []
  237. LL = sorted(LL, reverse=True, key=lambda x: x[1])
  238. for i in tqdm(LL[:int(680 * 0.8)]):
  239. try:
  240. o = {
  241. "url": i[3],
  242. "title": i[2],
  243. "url_md5": i[5],
  244. # "id": i[3]
  245. "id": cls.Spider.get_article_text(i[3])['data']['data']['channel_content_id']
  246. }
  247. except:
  248. o = {
  249. "url": i[3],
  250. "title": i[2],
  251. "url_md5": i[5],
  252. "id": i[3]
  253. }
  254. article_list.append(o)
  255. return article_list
  256. @classmethod
  257. def sendToColdPool(cls, plan_id=None):
  258. """
  259. 把文章send至第三层
  260. :return:
  261. """
  262. # 获取6个品类的数据
  263. # association_list = cls.findAssociationArticlesDaily()
  264. # cls.D.association_split(association_list)
  265. category_list = cls.findCategoryArticlesDaily()
  266. # d_list = category_list + association_list
  267. # # # 预分配账号
  268. cls.splitCategoryToAccount(category_list)
  269. # #
  270. try:
  271. army = [i for i in category_list if i['cate'] == '军事政法']
  272. cls.AidApi.updateArticleIntoCrawlerPlan(
  273. plan_id=plan_id,
  274. plan_name="军事政法类冷启",
  275. plan_tag=cls.pool3,
  276. url_list=[i['url'] for i in army]
  277. )
  278. except Exception as e:
  279. print("error--{}".format(e))
  280. try:
  281. history = [i for i in category_list if i['cate'] == '宗教历史']
  282. cls.AidApi.updateArticleIntoCrawlerPlan(
  283. plan_id=plan_id,
  284. plan_name="宗教历史类冷启",
  285. plan_tag=cls.pool3,
  286. url_list=[i['url'] for i in history]
  287. )
  288. except Exception as e:
  289. print("error--{}".format(e))
  290. # #
  291. # # try:
  292. # # news = [i for i in category_list if i['cate'] == '新闻媒体']
  293. # # cls.AidApi.updateArticleIntoCrawlerPlan(
  294. # # plan_id=plan_id,
  295. # # plan_name="新闻媒体类冷启",
  296. # # plan_tag=cls.pool3,
  297. # # url_list=[i['url'] for i in news]
  298. # # )
  299. # # except Exception as e:
  300. # # print("error--{}".format(e))
  301. # #
  302. # # try:
  303. # # life = [i for i in category_list if i['cate'] == '情感生活']
  304. # # cls.AidApi.updateArticleIntoCrawlerPlan(
  305. # # plan_id=plan_id,
  306. # # plan_name="生活情感类冷启",
  307. # # plan_tag=cls.pool3,
  308. # # url_list=[i['url'] for i in life]
  309. # # )
  310. # # except Exception as e:
  311. # # print("error--{}".format(e))
  312. # #
  313. # # try:
  314. # # healthy = [i for i in category_list if i['cate'] == '健康养生']
  315. # # cls.AidApi.updateArticleIntoCrawlerPlan(
  316. # # plan_id=plan_id,
  317. # # plan_name="健康养生类冷启",
  318. # # plan_tag=cls.pool3,
  319. # # url_list=[i['url'] for i in healthy]
  320. # # )
  321. # # except Exception as e:
  322. # # print("error--{}".format(e))
  323. # #
  324. # # try:
  325. # # fun = [i for i in category_list if i['cate'] == '娱乐八卦']
  326. # # cls.AidApi.updateArticleIntoCrawlerPlan(
  327. # # plan_id=plan_id,
  328. # # plan_name="娱乐八卦类冷启",
  329. # # plan_tag=cls.pool3,
  330. # # url_list=[i['url'] for i in fun]
  331. # # )
  332. # # except Exception as e:
  333. # # print("error--{}".format(e))
  334. # #
  335. # cls.AidApi.updateArticleIntoCrawlerPlan(
  336. # plan_id=plan_id,
  337. # plan_name="文章账号联想冷启--0805",
  338. # plan_tag=cls.pool3,
  339. # url_list=[i['url'] for i in association_list]
  340. # )
  341. if __name__ == '__main__':
  342. CT = ColdStartTask()
  343. CT.sendToColdPool()