# hurry_up.py
import json
from collections import defaultdict
from concurrent.futures.thread import ThreadPoolExecutor
from datetime import datetime, timedelta

from tqdm import tqdm

from applications import AdMySQL, PQMySQL, WeixinSpider
  6. class DailyDataManager(object):
  7. """
  8. daily 数据每日更新
  9. """
  10. ad_mysql = AdMySQL()
  11. pq_mysql = PQMySQL()
  12. wx_spider = WeixinSpider()
  13. @classmethod
  14. def getPublishedArticles(cls):
  15. """
  16. 获取已经发布的文章的信息
  17. :return:
  18. """
  19. sql2 = f"""
  20. select ContentUrl, wx_sn, createTime from official_articles_v2 where createTime > 1719763200 and accountName in (
  21. select distinct account_name from account_avg_info_v2);
  22. """
  23. result_list = cls.pq_mysql.select(sql2)
  24. return result_list
  25. @classmethod
  26. def getRootSourceIds(cls, data_info):
  27. """
  28. 通过抓取接口获取 data_info
  29. :return:
  30. """
  31. url = data_info[0]
  32. article_detail = cls.wx_spider.get_article_text(url)
  33. print(url)
  34. print(article_detail)
  35. mini_info = article_detail['data']['data']['mini_program']
  36. return data_info[1].decode(), mini_info, data_info[2]
  37. @classmethod
  38. def getMinigramInfo(cls, rootSourceId):
  39. """
  40. :param rootIdTuple:
  41. :return:
  42. """
  43. sql = f"""
  44. select type, machinecode, create_time, first_level_dt
  45. from changwen_data_base_v2
  46. where rootsourceid = '{rootSourceId}';
  47. """
  48. result_list = cls.ad_mysql.select(sql)
  49. def summarize(values):
  50. """
  51. :param values:
  52. :return:
  53. """
  54. L = {}
  55. first_level = {}
  56. fission_level = {}
  57. for line in values:
  58. # 先统计首层
  59. if line[0] == '首层':
  60. try:
  61. dt = str(line[-1])
  62. key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
  63. if first_level.get(key_dt):
  64. first_level[key_dt].add(line[1])
  65. else:
  66. first_level[key_dt] = {line[1]}
  67. except Exception as e:
  68. continue
  69. else:
  70. try:
  71. dt = str(line[-1])
  72. first_level_dt = datetime.strptime(dt, '%Y%m%d')
  73. create_level_dt = line[-2]
  74. delta = create_level_dt - first_level_dt
  75. days = int(delta.days)
  76. key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
  77. if fission_level.get(key_dt):
  78. fission_level[key_dt].append((line[1], days))
  79. else:
  80. fission_level[key_dt] = [(line[1], days)]
  81. except Exception as e:
  82. continue
  83. # print("first level dt is NULL")
  84. tt = {}
  85. for key in fission_level:
  86. detail_list = fission_level[key]
  87. temp = {}
  88. for item in detail_list:
  89. mid, days = item
  90. if temp.get(days):
  91. temp[days].add(mid)
  92. else:
  93. temp[days] = {mid}
  94. final = {}
  95. for sub_key in temp:
  96. length = len(temp[sub_key])
  97. final[sub_key] = length
  98. tt[key] = final
  99. for key in first_level:
  100. temp = [len(first_level[key]), tt.get(key, {}).get(0, 0), tt.get(key, {}).get(1, 0), tt.get(key, {}).get(2, 0)]
  101. L[key] = temp
  102. return L
  103. return summarize(result_list)
  104. @classmethod
  105. def getArticleInfo(cls, trace_id):
  106. """
  107. 通过 trace_id来获取文章信息
  108. :param trace_id:
  109. :return:
  110. """
  111. sql = f"""
  112. SELECT account_name, article_title
  113. FROM long_articles_video
  114. WHERE trace_id = '{trace_id}';
  115. """
  116. info = cls.pq_mysql.select(sql)
  117. return info[0]
  118. @classmethod
  119. def updateDetail(cls):
  120. """
  121. :return:
  122. """
  123. sql = f"""
  124. select distinct root_source_id
  125. from long_articles_detail_info;
  126. """
  127. source_id_list = cls.pq_mysql.select(sql)
  128. for item in tqdm(source_id_list):
  129. s_id = item[0]
  130. try:
  131. result = cls.getMinigramInfo(s_id)
  132. for key in result:
  133. recall_dt = key
  134. first_level = result[key][0]
  135. fission_0 = result[key][1]
  136. fission_1 = result[key][2]
  137. fission_2 = result[key][3]
  138. print(key, first_level, fission_0, fission_1, fission_2)
  139. update_sql = f"""
  140. UPDATE long_articles_detail_info
  141. set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
  142. where root_source_id = %s and recall_dt = %s;
  143. """
  144. try:
  145. cls.pq_mysql.update(
  146. sql=update_sql,
  147. params=(
  148. first_level, fission_0, fission_1, fission_2, s_id, recall_dt
  149. )
  150. )
  151. except Exception as e:
  152. print("insert error", e)
  153. except Exception as e:
  154. print(e)
  155. if __name__ == '__main__':
  156. DM = DailyDataManager()
  157. # result = DM.getMinigramInfo("longArticles_d409f27d9d64501d6811b47a3779d2d7")
  158. # print(result)
  159. # DM.updateDetail()