updateMinigramInfoDaily.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. """
  2. @author luojunhui
  3. @description Update Minigram Info Daily
  4. """
  5. import time
  6. from tqdm import tqdm
  7. from datetime import datetime, timedelta
  8. import schedule
  9. from applications import longArticlesMySQL, PQMySQL, WeixinSpider, Functions
  10. class DailyDataManager(object):
  11. """
  12. daily 数据每日更新
  13. """
  14. laMysql = longArticlesMySQL()
  15. pqMysql = PQMySQL()
  16. wxSpider = WeixinSpider()
  17. functions = Functions()
  18. @classmethod
  19. def getPublishedArticles(cls):
  20. """
  21. 获取已经发布的文章的信息, createTime 选择为前一天的 0 点并且转化为时间戳
  22. :return:
  23. """
  24. today = datetime.today()
  25. # 获取昨天的日期
  26. yesterday = today - timedelta(days=1)
  27. yesterday_midnight = datetime(year=yesterday.year, month=yesterday.month, day=yesterday.day)
  28. yesterday_timestamp = yesterday_midnight.timestamp()
  29. sql2 = f"""
  30. select ContentUrl, wx_sn, createTime
  31. from official_articles_v2
  32. where createTime >= {yesterday_timestamp}
  33. and accountName in (
  34. select distinct account_name from account_avg_info_v2
  35. );
  36. """
  37. result_list = cls.pqMysql.select(sql2)
  38. return result_list
  39. @classmethod
  40. def updateInfo(cls, line):
  41. """
  42. update info into mysql
  43. :return:
  44. """
  45. try:
  46. wx_sn, mini_info, create_time = cls.getRootSourceIds(line)
  47. dt_object = datetime.fromtimestamp(create_time)
  48. publish_dt = dt_object.strftime('%Y-%m-%d')
  49. one_day = timedelta(days=1)
  50. two_day = timedelta(days=2)
  51. next_day = dt_object + one_day
  52. next_next_day = dt_object + two_day
  53. recall_dt_list = [dt_object, next_day, next_next_day]
  54. recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
  55. for dt_str in recall_dt_str_list:
  56. for index, item in enumerate(mini_info, 1):
  57. image_url = item['image_url']
  58. nick_name = item['nike_name']
  59. root_source_id = item['path'].split("rootSourceId%3D")[-1]
  60. video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
  61. kimi_title = item['title']
  62. # print(image_url, nick_name, root_source_id, video_id, kimi_title)
  63. insert_sql = f"""
  64. INSERT INTO long_articles_detail_info
  65. (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
  66. values
  67. (%s, %s, %s, %s, %s, %s, %s, %s, %s);
  68. """
  69. cls.pqMysql.update(
  70. sql=insert_sql,
  71. params=(
  72. wx_sn,
  73. kimi_title,
  74. nick_name,
  75. image_url,
  76. index,
  77. root_source_id,
  78. video_id,
  79. publish_dt,
  80. dt_str
  81. )
  82. )
  83. except Exception as e:
  84. print(e)
  85. pass
  86. @classmethod
  87. def getRootSourceIds(cls, data_info):
  88. """
  89. 通过抓取接口获取 data_info
  90. :return:
  91. """
  92. url = data_info[0]
  93. article_detail = cls.wxSpider.get_article_text(url)
  94. mini_info = article_detail['data']['data']['mini_program']
  95. return data_info[1].decode(), mini_info, data_info[2]
  96. @classmethod
  97. def getMinigramInfo(cls, rootSourceId):
  98. """
  99. :param rootSourceId:
  100. :return:
  101. """
  102. sql = f"""
  103. select type, machinecode, create_time, first_level_dt
  104. from changwen_data_base_v2
  105. where rootsourceid = '{rootSourceId}';
  106. """
  107. result_list = cls.laMysql.select(sql)
  108. def summarize(values):
  109. """
  110. :param values:
  111. :return:
  112. """
  113. L = {}
  114. first_level = {}
  115. fission_level = {}
  116. for line in values:
  117. # 先统计首层
  118. if line[0] == '首层':
  119. try:
  120. dt = str(line[-1])
  121. key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
  122. if first_level.get(key_dt):
  123. first_level[key_dt].add(line[1])
  124. else:
  125. first_level[key_dt] = {line[1]}
  126. except Exception as e:
  127. continue
  128. else:
  129. try:
  130. dt = str(line[-1])
  131. first_level_dt = datetime.strptime(dt, '%Y%m%d')
  132. create_level_dt = line[-2]
  133. delta = create_level_dt - first_level_dt
  134. days = int(delta.days)
  135. key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
  136. if fission_level.get(key_dt):
  137. fission_level[key_dt].append((line[1], days))
  138. else:
  139. fission_level[key_dt] = [(line[1], days)]
  140. except Exception as e:
  141. continue
  142. # print("first level dt is NULL")
  143. tt = {}
  144. for key in fission_level:
  145. detail_list = fission_level[key]
  146. temp = {}
  147. for item in detail_list:
  148. mid, days = item
  149. if temp.get(days):
  150. temp[days].add(mid)
  151. else:
  152. temp[days] = {mid}
  153. final = {}
  154. for sub_key in temp:
  155. length = len(temp[sub_key])
  156. final[sub_key] = length
  157. tt[key] = final
  158. for key in first_level:
  159. temp = [len(first_level[key]), tt.get(key, {}).get(0, 0), tt.get(key, {}).get(1, 0), tt.get(key, {}).get(2, 0)]
  160. L[key] = temp
  161. return L
  162. return summarize(result_list)
  163. @classmethod
  164. def updateDetail(cls):
  165. """
  166. :return:
  167. """
  168. today = datetime.today()
  169. # 获取昨天的日期
  170. yesterday = today - timedelta(days=3)
  171. yesterday_str = yesterday.__str__().split(" ")[0]
  172. print(yesterday_str)
  173. sql = f"""
  174. select distinct root_source_id
  175. from long_articles_detail_info
  176. where publish_dt >= '{yesterday_str}';
  177. """
  178. source_id_list = cls.pqMysql.select(sql)
  179. for item in tqdm(source_id_list):
  180. s_id = item[0]
  181. try:
  182. result = cls.getMinigramInfo(s_id)
  183. for key in result:
  184. recall_dt = key
  185. first_level = result[key][0]
  186. fission_0 = result[key][1]
  187. fission_1 = result[key][2]
  188. fission_2 = result[key][3]
  189. # print(s_id, recall_dt, first_level, fission_0, fission_1, fission_2)
  190. update_sql = f"""
  191. UPDATE long_articles_detail_info
  192. set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
  193. where root_source_id = %s and recall_dt = %s;
  194. """
  195. try:
  196. cls.pqMysql.update(
  197. sql=update_sql,
  198. params=(
  199. first_level, fission_0, fission_1, fission_2, s_id, recall_dt
  200. )
  201. )
  202. except Exception as e:
  203. print("insert error", e)
  204. except Exception as e:
  205. print(e)
  206. def updateArticlesJob():
  207. """
  208. 更新文章数据
  209. :return:
  210. """
  211. DDM = DailyDataManager()
  212. article_list = DDM.getPublishedArticles()
  213. for article in tqdm(article_list):
  214. DDM.updateInfo(article)
  215. print("文章更新完成---{}".format(datetime.today().__str__()))
  216. def updateMinigramInfoJob():
  217. """
  218. 更新前三天小程序数据
  219. :return:
  220. """
  221. DDM = DailyDataManager()
  222. DDM.updateDetail()
  223. print("小程序更新完成---{}".format(datetime.today().__str__()))
  224. if __name__ == '__main__':
  225. # updateArticlesJob()
  226. # updateMinigramInfoJob()
  227. schedule.every().day.at("01:00").do(Functions().job_with_thread, updateArticlesJob)
  228. schedule.every().day.at("03:30").do(Functions().job_with_thread, updateMinigramInfoJob)
  229. while True:
  230. schedule.run_pending()
  231. time.sleep(1)