# hurry_up.py

import json

import pandas as pd
from tqdm import tqdm
from datetime import datetime, timedelta

from applications import AdMySQL, PQMySQL, WeixinSpider

class DailyDataManager(object):
    """
    Daily update of per-article data
    """
    ad_mysql = AdMySQL()
    pq_mysql = PQMySQL()
    wx_spider = WeixinSpider()

    @classmethod
    def getPublishedArticles(cls):
        """
        Fetch the articles that have already been published
        :return:
        """
        # sql = f"""
        # SELECT ContentUrl, wx_sn, createTime
        # FROM official_articles_v2
        # WHERE createTime > 1719763200;
        # """
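        # 1719763200 == 2024-07-01 00:00:00 (UTC+8), i.e. only articles
        # published since July 2024; the subquery restricts the result to
        # accounts tracked in account_avg_info_v2.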
        sql2 = """
            SELECT ContentUrl, wx_sn, createTime
            FROM official_articles_v2
            WHERE createTime > 1719763200
              AND accountName IN (
                SELECT DISTINCT account_name
                FROM account_avg_info_v2
              );
        """
        result_list = cls.pq_mysql.select(sql2)
        return result_list

    @classmethod
    def getRootSourceIds(cls, data_info):
        """
        Fetch the mini-program info of a published article via the spider API
        :param data_info: (ContentUrl, wx_sn, createTime) row
        :return: (wx_sn, mini-program info list, createTime)
        """
        url = data_info[0]
        article_detail = cls.wx_spider.get_article_text(url)
        print(url)
        print(article_detail)
        # print(json.dumps(article_detail, ensure_ascii=False, indent=4))
        mini_info = article_detail['data']['data']['mini_program']
        # wx_sn comes back as bytes, hence the decode()
        return data_info[1].decode(), mini_info, data_info[2]

    @classmethod
    def getMinigramInfo(cls, rootSourceId):
        """
        :param rootSourceId:
        :return: {recall_dt: [first_level uv, fission uv on day 0, day 1, day 2]}
        """
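        # NOTE: rootSourceId is interpolated straight into the SQL text; if the
        # id could ever contain a quote, a parameterized query would be safer.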
        sql = f"""
            SELECT type, machinecode, create_time, first_level_dt
            FROM changwen_data_base
            WHERE rootsourceid = '{rootSourceId}';
        """
        result_list = cls.ad_mysql.select(sql)
        def summarize(values):
            """
            Aggregate raw rows into per-day unique-visitor counts.
            :param values: rows of (type, machinecode, create_time, first_level_dt)
            :return: {recall_dt: [first_level uv, fission uv on day 0, day 1, day 2]}
            """
            daily_stats = {}
            first_level = {}    # recall_dt -> set of first-level machine codes
            fission_level = {}  # recall_dt -> [(machinecode, day offset), ...]
            for line in values:
                # Tally the first-level ("首层") visits first
                if line[0] == '首层':
                    c_time = str(line[-2]).split(" ")[0]
                    if first_level.get(c_time):
                        first_level[c_time].add(line[1])
                    else:
                        first_level[c_time] = {line[1]}
                else:
                    # Fission rows: bucket by first_level_dt, keeping the day
                    # offset between the visit and the first-level date
                    dt = str(line[-1])
                    first_level_dt = datetime.strptime(dt, '%Y%m%d')
                    create_level_dt = line[-2]
                    delta = create_level_dt - first_level_dt
                    days = int(delta.days)
                    key_dt = first_level_dt.strftime('%Y-%m-%d')
                    if fission_level.get(key_dt):
                        fission_level[key_dt].append((line[1], days))
                    else:
                        fission_level[key_dt] = [(line[1], days)]
            # Deduplicate fission machine codes per (recall_dt, day offset)
            tt = {}
            for key in fission_level:
                detail_list = fission_level[key]
                temp = {}
                for item in detail_list:
                    mid, days = item
                    if temp.get(days):
                        temp[days].add(mid)
                    else:
                        temp[days] = {mid}
                final = {}
                for sub_key in temp:
                    final[sub_key] = len(temp[sub_key])
                tt[key] = final
            for key in first_level:
                daily_stats[key] = [
                    len(first_level[key]),
                    tt.get(key, {}).get(0, 0),
                    tt.get(key, {}).get(1, 0),
                    tt.get(key, {}).get(2, 0)
                ]
            return daily_stats
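        # Illustration (made-up data): two first-level visitors on 2024-08-01
        # plus one fission visitor a day later summarize to
        #     {'2024-08-01': [2, 0, 1, 0]}
        # i.e. [first_level uv, fission uv on day 0, day 1, day 2].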
        return summarize(result_list)

    @classmethod
    def getArticleInfo(cls, trace_id):
        """
        Look up the account name and article title for a trace_id
        :param trace_id:
        :return:
        """
        sql = f"""
            SELECT account_name, article_title
            FROM long_articles_video
            WHERE trace_id = '{trace_id}';
        """
        info = cls.pq_mysql.select(sql)
        return info[0]

    @classmethod
    def updateDetail(cls):
        """
        Backfill first-level / fission uv counts for every known root_source_id
        :return:
        """
        sql = """
            SELECT DISTINCT root_source_id
            FROM long_articles_detail_info;
        """
        source_id_list = cls.pq_mysql.select(sql)
        for item in tqdm(source_id_list):
            s_id = item[0]
            try:
                result = cls.getMinigramInfo(s_id)
                for key in result:
                    recall_dt = key
                    first_level, fission_0, fission_1, fission_2 = result[key]
                    print(key, first_level, fission_0, fission_1, fission_2)
                    update_sql = """
                        UPDATE long_articles_detail_info
                        SET first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
                        WHERE root_source_id = %s AND recall_dt = %s;
                    """
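                    # Unlike the interpolated SELECTs above, these %s
                    # placeholders are (presumably) bound by the driver via
                    # params, so the values are escaped safely.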
                    try:
                        cls.pq_mysql.update(
                            sql=update_sql,
                            params=(
                                first_level, fission_0, fission_1, fission_2, s_id, recall_dt
                            )
                        )
                    except Exception as e:
                        print("update error", e)
            except Exception as e:
                print(e)
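

# Entry point: pull the published articles, fetch the mini-program cards for
# each one, and insert one detail row per (video, recall day) combination.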
if __name__ == '__main__':
    DM = DailyDataManager()
    # DM.updateDetail()
    publishArticles = DM.getPublishedArticles()
    print(len(publishArticles))
    for line in tqdm(publishArticles):
        try:
            wx_sn, mini_info, create_time = DM.getRootSourceIds(line)
            dt_object = datetime.fromtimestamp(create_time)
            publish_dt = dt_object.strftime('%Y-%m-%d')
            # Each article is recalled on its publish day and the two days after
            one_day = timedelta(days=1)
            two_day = timedelta(days=2)
            next_day = dt_object + one_day
            next_next_day = dt_object + two_day
            recall_dt_list = [dt_object, next_day, next_next_day]
            recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
            for dt_str in recall_dt_str_list:
                for index, item in enumerate(mini_info, 1):
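                    # The mini-program 'path' is URL-encoded (%3F = '?',
                    # %3D = '=', %26 = '&'), so the splits below recover the
                    # video id and rootSourceId from its query string.
                    # 'nike_name' (sic) is the key as returned by the spider.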
                    image_url = item['image_url']
                    nick_name = item['nike_name']
                    root_source_id = item['path'].split("rootSourceId%3D")[-1]
                    video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
                    kimi_title = item['title']
                    insert_sql = """
                        INSERT INTO long_articles_detail_info
                        (wx_sn, mini_title, mini_name, cover_url, video_index,
                         root_source_id, video_id, publish_dt, recall_dt)
                        VALUES
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    DM.pq_mysql.update(
                        sql=insert_sql,
                        params=(
                            wx_sn,
                            kimi_title,
                            nick_name,
                            image_url,
                            index,
                            root_source_id,
                            video_id,
                            publish_dt,
                            dt_str
                        )
                    )
        except Exception as e:
            print(e)
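
# --- Earlier ad-hoc analyses, kept commented out for reference ---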
# for line in DailyIds:
#     try:
#         source_id_tuple = DM.getRootSourceIds(trace_id=line)
#         result = DM.getMinigramInfo(source_id_tuple)
#         print(line)
#         print(result)
#         print("\n")
#     except Exception as e:
#         print(e)
#         print(line)

# L = {}
# trace_id = "search-a9bb246a-57fa-49f4-88d7-eec575813130-1723608633"
# source_id_tuple = DM.getRootSourceIds(trace_id=trace_id)
# result = DM.getMinigramInfo(source_id_tuple)
# print(result)

# for t_id in tqdm(DailyIds):
#     s_tuple = DM.getRootSourceIds(trace_id=t_id)
#     first_, fission_ = DM.getMinigramInfo(s_tuple)
#     obj = {
#         "first_": first_,
#         "fission_": fission_,
#         "rate": fission_ / first_ if first_ > 0 else 0
#     }
#     L[t_id] = obj

# Df = []
# with open("t.json", encoding="utf-8") as f:
#     L = json.loads(f.read())
# for key in L:
#     print(key)
#     value = L[key]
#     result = DM.getArticleInfo(trace_id=key)
#     account_name, article_title = result
#     temp = [
#         account_name,
#         article_title,
#         value['first_'],
#         value['fission_'],
#         value['rate']
#     ]
#     Df.append(temp)
# df = pd.DataFrame(Df, columns=['account_name', 'article_title', 'first_', 'fission_', 'rate0'])
# df.to_excel("0825.xlsx", index=False)