# updateMinigramInfoDaily.py (5.8 KB)

  1. import json
  2. from concurrent.futures.thread import ThreadPoolExecutor
  3. from tqdm import tqdm
  4. from datetime import datetime, timedelta
  5. from applications import AdMySQL, PQMySQL, WeixinSpider
  6. class DailyDataManager(object):
  7. """
  8. daily 数据每日更新
  9. """
  10. ad_mysql = AdMySQL()
  11. pq_mysql = PQMySQL()
  12. wx_spider = WeixinSpider()
  13. @classmethod
  14. def getPublishedArticles(cls):
  15. """
  16. 获取已经发布的文章的信息
  17. :return:
  18. """
  19. sql2 = f"""
  20. select ContentUrl, wx_sn, createTime
  21. from official_articles_v2
  22. where createTime >= 1724774400
  23. and accountName in (
  24. select distinct account_name from account_avg_info_v2
  25. );
  26. """
  27. result_list = cls.pq_mysql.select(sql2)
  28. return result_list
  29. @classmethod
  30. def getRootSourceIds(cls, data_info):
  31. """
  32. 通过抓取接口获取 data_info
  33. :return:
  34. """
  35. url = data_info[0]
  36. article_detail = cls.wx_spider.get_article_text(url)
  37. print(url)
  38. print(article_detail)
  39. mini_info = article_detail['data']['data']['mini_program']
  40. return data_info[1].decode(), mini_info, data_info[2]
  41. @classmethod
  42. def getMinigramInfo(cls, rootSourceId):
  43. """
  44. :param rootIdTuple:
  45. :return:
  46. """
  47. sql = f"""
  48. select type, machinecode, create_time, first_level_dt
  49. from changwen_data_base_v2
  50. where rootsourceid = '{rootSourceId}';
  51. """
  52. result_list = cls.ad_mysql.select(sql)
  53. def summarize(values):
  54. """
  55. :param values:
  56. :return:
  57. """
  58. L = {}
  59. first_level = {}
  60. fission_level = {}
  61. for line in values:
  62. # 先统计首层
  63. if line[0] == '首层':
  64. try:
  65. dt = str(line[-1])
  66. key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
  67. if first_level.get(key_dt):
  68. first_level[key_dt].add(line[1])
  69. else:
  70. first_level[key_dt] = {line[1]}
  71. except Exception as e:
  72. continue
  73. else:
  74. try:
  75. dt = str(line[-1])
  76. first_level_dt = datetime.strptime(dt, '%Y%m%d')
  77. create_level_dt = line[-2]
  78. delta = create_level_dt - first_level_dt
  79. days = int(delta.days)
  80. key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
  81. if fission_level.get(key_dt):
  82. fission_level[key_dt].append((line[1], days))
  83. else:
  84. fission_level[key_dt] = [(line[1], days)]
  85. except Exception as e:
  86. continue
  87. # print("first level dt is NULL")
  88. tt = {}
  89. for key in fission_level:
  90. detail_list = fission_level[key]
  91. temp = {}
  92. for item in detail_list:
  93. mid, days = item
  94. if temp.get(days):
  95. temp[days].add(mid)
  96. else:
  97. temp[days] = {mid}
  98. final = {}
  99. for sub_key in temp:
  100. length = len(temp[sub_key])
  101. final[sub_key] = length
  102. tt[key] = final
  103. for key in first_level:
  104. temp = [len(first_level[key]), tt.get(key, {}).get(0, 0), tt.get(key, {}).get(1, 0), tt.get(key, {}).get(2, 0)]
  105. L[key] = temp
  106. return L
  107. return summarize(result_list)
  108. @classmethod
  109. def getArticleInfo(cls, trace_id):
  110. """
  111. 通过 trace_id来获取文章信息
  112. :param trace_id:
  113. :return:
  114. """
  115. sql = f"""
  116. SELECT account_name, article_title
  117. FROM long_articles_video
  118. WHERE trace_id = '{trace_id}';
  119. """
  120. info = cls.pq_mysql.select(sql)
  121. return info[0]
  122. @classmethod
  123. def updateDetail(cls):
  124. """
  125. :return:
  126. """
  127. sql = f"""
  128. select distinct root_source_id
  129. from long_articles_detail_info;
  130. """
  131. source_id_list = cls.pq_mysql.select(sql)
  132. for item in tqdm(source_id_list):
  133. s_id = item[0]
  134. try:
  135. result = cls.getMinigramInfo(s_id)
  136. for key in result:
  137. recall_dt = key
  138. first_level = result[key][0]
  139. fission_0 = result[key][1]
  140. fission_1 = result[key][2]
  141. fission_2 = result[key][3]
  142. print(key, first_level, fission_0, fission_1, fission_2)
  143. update_sql = f"""
  144. UPDATE long_articles_detail_info
  145. set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
  146. where root_source_id = %s and recall_dt = %s;
  147. """
  148. try:
  149. cls.pq_mysql.update(
  150. sql=update_sql,
  151. params=(
  152. first_level, fission_0, fission_1, fission_2, s_id, recall_dt
  153. )
  154. )
  155. except Exception as e:
  156. print("insert error", e)
  157. except Exception as e:
  158. print(e)
  159. if __name__ == '__main__':
  160. DM = DailyDataManager()
  161. # result = DM.getMinigramInfo("longArticles_d409f27d9d64501d6811b47a3779d2d7")
  162. # print(result)
  163. # DM.updateDetail()