# task2.py
"""
@author: luojunhui
"""
import datetime
import json
from collections import defaultdict

from tqdm import tqdm

from applications import AIDTApi, DeNetMysql, PQMySQL, Functions, WeixinSpider
from config import cateMap, coldPoolArticlesNum, accountBaseInfo
from stratrgy.distribution import ArticleDistribution
  10. class ColdStartTask(object):
  11. """
  12. 冷启分配任务
  13. """
  14. AidApi = AIDTApi()
  15. DeMysql = DeNetMysql()
  16. PqMysql = PQMySQL()
  17. Fun = Functions()
  18. Spider = WeixinSpider()
  19. D = ArticleDistribution()
  20. pool3 = "autoArticlePoolLevel3"
  21. @classmethod
  22. def generate_account_dict(cls):
  23. """
  24. 生成account_list
  25. :return:
  26. """
  27. account_dict = {}
  28. for key in accountBaseInfo:
  29. account_name = accountBaseInfo[key]['accountName']
  30. account_gh_id = accountBaseInfo[key]['ghId']
  31. account_dict[account_gh_id] = account_name
  32. return account_dict
  33. @classmethod
  34. def usedArticle(cls, content_id):
  35. """
  36. 已经使用的文章,把文章状态改为0
  37. :return:
  38. """
  39. sql = f"""
  40. update cold_start_article_pool
  41. set status = %s
  42. where content_channel_id = %s;
  43. """
  44. cls.PqMysql.update(sql=sql, params=(0, content_id))
  45. @classmethod
  46. def badArticle(cls, content_id):
  47. """
  48. 低分的文章,把文章状态改为2
  49. :return:
  50. """
  51. sql = f"""
  52. update cold_start_article_pool
  53. set status = %s
  54. where content_channel_id = %s;
  55. """
  56. cls.PqMysql.update(sql=sql, params=(2, content_id))
  57. @classmethod
  58. def getTopArticles(cls, category, limit_count):
  59. """
  60. 获取高分享的文章list
  61. :return:
  62. """
  63. sql = f"""
  64. select content_channel_id, content_link, title
  65. from cold_start_article_pool
  66. where category = '{category}' and status = 1
  67. order by view_count DESC, publish_time_stamp DESC
  68. limit {limit_count};
  69. """
  70. result = cls.PqMysql.select(sql)
  71. return result
  72. @classmethod
  73. def splitCategoryToAccount(cls, cate_list):
  74. """
  75. split articles to each account
  76. :return:
  77. """
  78. account_index_info = {
  79. "gh_058e41145a0c": 30,
  80. "gh_0e4fd9e88386": 30,
  81. "gh_744cb16f6e16": 30,
  82. "gh_ac43eb24376d": 30,
  83. "gh_970460d9ccec": 30,
  84. "gh_56ca3dae948c": 30,
  85. "gh_c91b42649690": 30,
  86. "gh_6d205db62f04": 30,
  87. "gh_e24da99dc899": 30,
  88. "gh_4c058673c07e": 30,
  89. "gh_03d32e83122f": 30,
  90. "gh_c69776baf2cd": 30,
  91. "gh_30816d8adb52": 30,
  92. "gh_789a40fe7935": 30,
  93. "gh_95ed5ecf9363": 30,
  94. "gh_3e91f0624545": 30,
  95. "gh_57573f01b2ee": 30,
  96. "gh_9877c8541764": 30,
  97. "gh_6cfd1132df94": 30,
  98. "gh_008ef23062ee": 30,
  99. "gh_5ae65db96cb7": 30,
  100. "gh_be8c29139989": 30,
  101. "gh_51e4ad40466d": 30,
  102. "gh_d4dffc34ac39": 30,
  103. "gh_89ef4798d3ea": 30,
  104. "gh_b15de7c99912": 30,
  105. "gh_9f8dc5b0c74e": 30,
  106. "gh_7b4a5f86d68c": 30,
  107. "gh_c5cdf60d9ab4": 5,
  108. "gh_0c89e11f8bf3": 5,
  109. "gh_e0eb490115f5": 5,
  110. "gh_a2901d34f75b": 5,
  111. "gh_d5f935d0d1f2": 30
  112. }
  113. account_dict = cls.generate_account_dict()
  114. account_list = list(account_index_info.keys())
  115. title_list = [i['title'] for i in cate_list]
  116. L_map = {}
  117. for account_id in account_list:
  118. account_name = account_dict[account_id]
  119. score_list = cls.Fun.getTitleScore(title_list=title_list, account_name=account_name)[account_name]['score_list']
  120. for index, score in enumerate(score_list):
  121. channel_content_id = cate_list[index]['id']
  122. item = tuple([account_id, score])
  123. if L_map.get(channel_content_id):
  124. L_map[channel_content_id].append(item)
  125. else:
  126. L_map[channel_content_id] = [item]
  127. for key in L_map:
  128. L_map[key] = sorted(L_map[key], reverse=True, key=lambda x: x[1])
  129. content_account = []
  130. for item in cate_list:
  131. content_id = item['id']
  132. account_list = L_map[content_id]
  133. for account_tuple in account_list:
  134. gh_id, score = account_tuple[0], account_tuple[1]
  135. if account_index_info[gh_id] > 0:
  136. sub_item = tuple([content_id, gh_id, score])
  137. content_account.append(sub_item)
  138. account_index_info[gh_id] -= 1
  139. break
  140. # return content_account
  141. account_article_dict = {}
  142. for item in content_account:
  143. content_id, gh_id, score = item
  144. sub_i = tuple([content_id, score])
  145. if account_article_dict.get(gh_id):
  146. account_article_dict[gh_id].append(sub_i)
  147. else:
  148. account_article_dict[gh_id] = [sub_i]
  149. for account in tqdm(account_article_dict):
  150. date_str = datetime.datetime.today().strftime("%Y-%m-%d")
  151. print(account, date_str, json.dumps(account_article_dict[account], ensure_ascii=False))
  152. # insert_sql = f"""
  153. # INSERT INTO article_pre_distribute_account
  154. # (gh_id, date, article_list)
  155. # VALUES
  156. # (%s, %s, %s);
  157. # """
  158. # try:
  159. # PQMySQL.update(sql=insert_sql, params=(account, date_str, json.dumps(account_article_dict[account], ensure_ascii=False)))
  160. # except Exception as e:
  161. # print("插入出现问题----{}".format(e))
  162. print("成功更新完成")
  163. @classmethod
  164. def findCategoryArticlesDaily(cls):
  165. """
  166. 和每个账号计算相关性分数
  167. :return:
  168. """
  169. category_list = [
  170. "军事政法",
  171. # "健康养生",
  172. "宗教历史",
  173. # "情感生活",
  174. # "娱乐八卦",
  175. # "新闻媒体",
  176. ]
  177. L = []
  178. for category in tqdm(category_list):
  179. print("{} is processing......".format(category))
  180. category_total = coldPoolArticlesNum * cateMap.get(category, 0.1)
  181. category_count = 0
  182. while category_count < category_total:
  183. article_list = cls.getTopArticles(category, 10)
  184. if len(article_list) == 0:
  185. print("{}: 该品类没有数据了!".format(category))
  186. break
  187. title_list = [article[2] for article in article_list]
  188. score_list = cls.Fun.getTitleScore(title_list, "指尖奇文")['指尖奇文']['score_list']
  189. for index, score in enumerate(score_list):
  190. content_id = article_list[index][0]
  191. if score >= 0.35:
  192. obj = {
  193. "id": article_list[index][0],
  194. "url": article_list[index][1],
  195. "title": article_list[index][2],
  196. "cate": category,
  197. "score": score
  198. }
  199. category_count += 1
  200. cls.usedArticle(content_id=content_id)
  201. print("used_article")
  202. L.append(obj)
  203. else:
  204. cls.badArticle(content_id=content_id)
  205. print("bad article")
  206. print(category_count)
  207. return L
  208. @classmethod
  209. def findAssociationArticlesDaily(cls):
  210. """
  211. 获取相关文章
  212. :return:
  213. """
  214. # target_num = int(0.8 * coldPoolArticlesNum)
  215. sql = f"""
  216. select id, publish_timestamp, title, link, title_score, url_md5
  217. from association_articles
  218. where status = 1 and content_length > 500
  219. order by publish_timestamp
  220. DESC limit 10000;
  221. """
  222. temp_list = cls.PqMysql.select(sql)
  223. id_tuple = tuple([i[0] for i in temp_list])
  224. update_sql = f"""
  225. update association_articles
  226. set status = %s
  227. where id in %s
  228. """
  229. cls.PqMysql.update(sql=update_sql, params=(0, id_tuple))
  230. # url_md5去重
  231. L = {}
  232. for line in temp_list:
  233. key = line[-1]
  234. if L.get(key):
  235. L[key].append(list(line))
  236. else:
  237. L[key] = [list(line)]
  238. LL = []
  239. for key in L:
  240. value_list = L[key]
  241. sorted_k = sorted(value_list, reverse=True, key=lambda x: (x[1], x[4]))
  242. LL.append(sorted_k[0])
  243. article_list = []
  244. LL = sorted(LL, reverse=True, key=lambda x: x[1])
  245. for i in tqdm(LL[:int(680 * 0.8)]):
  246. try:
  247. o = {
  248. "url": i[3],
  249. "title": i[2],
  250. "url_md5": i[5],
  251. "id": i[3]
  252. # "id": cls.Spider.get_article_text(i[3])['data']['data']['channel_content_id']
  253. }
  254. except:
  255. o = {
  256. "url": i[3],
  257. "title": i[2],
  258. "url_md5": i[5],
  259. "id": i[3]
  260. }
  261. article_list.append(o)
  262. return article_list
  263. @classmethod
  264. def sendToColdPool(cls, plan_id=None):
  265. """
  266. 把文章send至第三层
  267. :return:
  268. """
  269. # 获取6个品类的数据
  270. association_list = cls.findAssociationArticlesDaily()
  271. cls.D.association_split(association_list)
  272. # category_list = cls.findCategoryArticlesDaily()
  273. # d_list = category_list + association_list
  274. # # # 预分配账号
  275. # cls.splitCategoryToAccount(association_list)
  276. # #
  277. # try:
  278. # army = [i for i in category_list if i['cate'] == '军事政法']
  279. # cls.AidApi.updateArticleIntoCrawlerPlan(
  280. # plan_id=plan_id,
  281. # plan_name="军事政法类冷启",
  282. # plan_tag=cls.pool3,
  283. # url_list=[i['url'] for i in army]
  284. # )
  285. # except Exception as e:
  286. # print("error--{}".format(e))
  287. #
  288. # try:
  289. # history = [i for i in category_list if i['cate'] == '宗教历史']
  290. # cls.AidApi.updateArticleIntoCrawlerPlan(
  291. # plan_id=plan_id,
  292. # plan_name="宗教历史类冷启",
  293. # plan_tag=cls.pool3,
  294. # url_list=[i['url'] for i in history]
  295. # )
  296. # except Exception as e:
  297. # print("error--{}".format(e))
  298. # #
  299. # # try:
  300. # # news = [i for i in category_list if i['cate'] == '新闻媒体']
  301. # # cls.AidApi.updateArticleIntoCrawlerPlan(
  302. # # plan_id=plan_id,
  303. # # plan_name="新闻媒体类冷启",
  304. # # plan_tag=cls.pool3,
  305. # # url_list=[i['url'] for i in news]
  306. # # )
  307. # # except Exception as e:
  308. # # print("error--{}".format(e))
  309. # #
  310. # # try:
  311. # # life = [i for i in category_list if i['cate'] == '情感生活']
  312. # # cls.AidApi.updateArticleIntoCrawlerPlan(
  313. # # plan_id=plan_id,
  314. # # plan_name="生活情感类冷启",
  315. # # plan_tag=cls.pool3,
  316. # # url_list=[i['url'] for i in life]
  317. # # )
  318. # # except Exception as e:
  319. # # print("error--{}".format(e))
  320. # #
  321. # # try:
  322. # # healthy = [i for i in category_list if i['cate'] == '健康养生']
  323. # # cls.AidApi.updateArticleIntoCrawlerPlan(
  324. # # plan_id=plan_id,
  325. # # plan_name="健康养生类冷启",
  326. # # plan_tag=cls.pool3,
  327. # # url_list=[i['url'] for i in healthy]
  328. # # )
  329. # # except Exception as e:
  330. # # print("error--{}".format(e))
  331. # #
  332. # # try:
  333. # # fun = [i for i in category_list if i['cate'] == '娱乐八卦']
  334. # # cls.AidApi.updateArticleIntoCrawlerPlan(
  335. # # plan_id=plan_id,
  336. # # plan_name="娱乐八卦类冷启",
  337. # # plan_tag=cls.pool3,
  338. # # url_list=[i['url'] for i in fun]
  339. # # )
  340. # # except Exception as e:
  341. # # print("error--{}".format(e))
  342. # #
  343. # cls.AidApi.updateArticleIntoCrawlerPlan(
  344. # plan_id=plan_id,
  345. # plan_name="文章账号联想冷启",
  346. # plan_tag=cls.pool3,
  347. # url_list=[i['url'] for i in association_list]
  348. # )
  349. if __name__ == '__main__':
  350. CT = ColdStartTask()
  351. CT.sendToColdPool()