# updateMinigramInfoDaily.py
"""
@author luojunhui
@description Update Minigram Info Daily
"""
import sys
import time
from argparse import ArgumentParser
from datetime import datetime, timedelta

import schedule
from tqdm import tqdm

from applications import longArticlesMySQL, PQMySQL, WeixinSpider, Functions, log, bot
  12. def get_yesterday():
  13. yesterday = datetime.today() - timedelta(1)
  14. return yesterday
  15. class DailyDataManager(object):
  16. """
  17. daily 数据每日更新
  18. """
  19. laMysql = longArticlesMySQL()
  20. pqMysql = PQMySQL()
  21. wxSpider = WeixinSpider()
  22. functions = Functions()
  23. @classmethod
  24. def getPublishedArticles(cls, biz_date):
  25. """
  26. 获取已经发布的文章的信息, createTime 选择为前一天的 0 点并且转化为时间戳
  27. :return:
  28. """
  29. biz_date_midnight = datetime(year=biz_date.year, month=biz_date.month, day=biz_date.day)
  30. biz_date_ts = biz_date_midnight.timestamp()
  31. biz_date_end_ts = biz_date_ts + 24 * 60 * 60 - 1
  32. sql2 = f"""
  33. select ContentUrl, wx_sn, createTime
  34. from official_articles_v2
  35. where createTime between {biz_date_ts} and {biz_date_end_ts};
  36. -- and accountName in (
  37. -- select distinct account_name from account_avg_info_v2
  38. -- );
  39. """
  40. result_list = cls.pqMysql.select(sql2)
  41. log(
  42. task="updateMinigramInfoDaily",
  43. function="getPublishedArticles",
  44. message="一共获取 {} 篇文章数据".format(len(result_list))
  45. )
  46. return result_list
  47. @classmethod
  48. def updateInfo(cls, line):
  49. """
  50. update info into mysql
  51. :return:
  52. """
  53. try:
  54. wx_sn, mini_info, create_time = cls.getRootSourceIds(line)
  55. dt_object = datetime.fromtimestamp(create_time)
  56. publish_dt = dt_object.strftime('%Y-%m-%d')
  57. one_day = timedelta(days=1)
  58. two_day = timedelta(days=2)
  59. next_day = dt_object + one_day
  60. next_next_day = dt_object + two_day
  61. recall_dt_list = [dt_object, next_day, next_next_day]
  62. recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
  63. for dt_str in recall_dt_str_list:
  64. for index, item in enumerate(mini_info, 1):
  65. image_url = item['image_url']
  66. nick_name = item['nike_name']
  67. root_source_id = item['path'].split("rootSourceId%3D")[-1]
  68. video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
  69. kimi_title = item['title']
  70. # print(image_url, nick_name, root_source_id, video_id, kimi_title)
  71. insert_sql = f"""
  72. INSERT INTO long_articles_detail_info
  73. (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
  74. values
  75. (%s, %s, %s, %s, %s, %s, %s, %s, %s);
  76. """
  77. cls.pqMysql.update(
  78. sql=insert_sql,
  79. params=(
  80. wx_sn,
  81. kimi_title,
  82. nick_name,
  83. image_url,
  84. index,
  85. root_source_id,
  86. video_id,
  87. publish_dt,
  88. dt_str
  89. )
  90. )
  91. log(
  92. task="updateMinigramInfoDaily",
  93. function="updateInfo",
  94. message="插入数据成功, video_id 是: {}".format(video_id)
  95. )
  96. except Exception as e:
  97. log(
  98. task="updateMinigramInfoDaily",
  99. function="updateInfo",
  100. status="fail",
  101. message="插入数据失败, 失败原因是".format(e)
  102. )
  103. @classmethod
  104. def getRootSourceIds(cls, data_info):
  105. """
  106. 通过抓取接口获取 data_info
  107. :return:
  108. """
  109. url = data_info[0]
  110. try:
  111. article_detail = cls.wxSpider.get_article_text(url)
  112. mini_info = article_detail['data']['data']['mini_program']
  113. log(
  114. task="updateMinigramInfoDaily",
  115. function="getRootSourceIds",
  116. message="获取文章链接对应的 rootSourceId 成功",
  117. data={
  118. "ContentUrl": url,
  119. "wxSn": data_info[1].decode(),
  120. "createTime": data_info[2],
  121. "miniInfo": mini_info
  122. }
  123. )
  124. return data_info[1].decode(), mini_info, data_info[2]
  125. except Exception as e:
  126. log(
  127. task="updateMinigramInfoDaily",
  128. function="getRootSourceIds",
  129. status="fail",
  130. message="获取文章链接对应的 rootSourceId失败, 报错信息是: {}".format(e),
  131. data={
  132. "ContentUrl": url
  133. }
  134. )
  135. return
  136. @classmethod
  137. def getMinigramInfo(cls, rootSourceId):
  138. """
  139. :param rootSourceId:
  140. :return:
  141. """
  142. sql = f"""
  143. select type, machinecode, create_time, first_level_dt
  144. from changwen_data_base_v2
  145. where rootsourceid = '{rootSourceId}';
  146. """
  147. result_list = cls.laMysql.select(sql)
  148. def summarize(values):
  149. """
  150. :param values:
  151. :return:
  152. """
  153. L = {}
  154. first_level = {}
  155. fission_level = {}
  156. for line in values:
  157. # 先统计首层
  158. if line[0] == '首层':
  159. try:
  160. dt = str(line[-1])
  161. key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
  162. if first_level.get(key_dt):
  163. first_level[key_dt].add(line[1])
  164. else:
  165. first_level[key_dt] = {line[1]}
  166. except Exception as e:
  167. continue
  168. else:
  169. try:
  170. dt = str(line[-1])
  171. first_level_dt = datetime.strptime(dt, '%Y%m%d')
  172. create_level_dt = line[-2]
  173. delta = create_level_dt - first_level_dt
  174. days = int(delta.days)
  175. key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
  176. if fission_level.get(key_dt):
  177. fission_level[key_dt].append((line[1], days))
  178. else:
  179. fission_level[key_dt] = [(line[1], days)]
  180. except Exception as e:
  181. continue
  182. # print("first level dt is NULL")
  183. tt = {}
  184. for key in fission_level:
  185. detail_list = fission_level[key]
  186. temp = {}
  187. for item in detail_list:
  188. mid, days = item
  189. if temp.get(days):
  190. temp[days].add(mid)
  191. else:
  192. temp[days] = {mid}
  193. final = {}
  194. for sub_key in temp:
  195. length = len(temp[sub_key])
  196. final[sub_key] = length
  197. tt[key] = final
  198. for key in first_level:
  199. temp = [len(first_level[key]), tt.get(key, {}).get(0, 0), tt.get(key, {}).get(1, 0), tt.get(key, {}).get(2, 0)]
  200. L[key] = temp
  201. return L
  202. try:
  203. response = summarize(result_list)
  204. log(
  205. task="updateMinigramInfoDaily",
  206. function="getMinigramInfo",
  207. message="计算source_id信息成功",
  208. data=response
  209. )
  210. return response
  211. except Exception as e:
  212. log(
  213. task="updateMinigramInfoDaily",
  214. function="getMinigramInfo",
  215. message="获取 source_id信息失败, 报错信息是: {}".format(e),
  216. status="fail"
  217. )
  218. return None
  219. @classmethod
  220. def updateDetail(cls, biz_date):
  221. """
  222. :return:
  223. """
  224. today = datetime.today()
  225. # 获取三天前的日期
  226. stats_date = biz_date - timedelta(days=3)
  227. stats_date_str = stats_date.__str__().split(" ")[0]
  228. sql = f"""
  229. select distinct root_source_id
  230. from long_articles_detail_info
  231. where publish_dt >= '{stats_date_str}';
  232. """
  233. source_id_list = cls.pqMysql.select(sql)
  234. log(
  235. task="updateMinigramInfoDaily",
  236. function="updateDetail",
  237. message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(source_id_list))
  238. )
  239. for item in tqdm(source_id_list):
  240. s_id = item[0]
  241. try:
  242. result = cls.getMinigramInfo(s_id)
  243. for key in result:
  244. recall_dt = key
  245. first_level = result[key][0]
  246. fission_0 = result[key][1]
  247. fission_1 = result[key][2]
  248. fission_2 = result[key][3]
  249. # print(s_id, recall_dt, first_level, fission_0, fission_1, fission_2)
  250. update_sql = f"""
  251. UPDATE long_articles_detail_info
  252. set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
  253. where root_source_id = %s and recall_dt = %s;
  254. """
  255. try:
  256. cls.pqMysql.update(
  257. sql=update_sql,
  258. params=(
  259. first_level, fission_0, fission_1, fission_2, s_id, recall_dt
  260. )
  261. )
  262. except Exception as e:
  263. log(
  264. task="updateMinigramInfoDaily",
  265. function="updateDetail",
  266. status="fail",
  267. message="mysql 更新失败, 报错信息是 {}".format(e)
  268. )
  269. except Exception as e:
  270. log(
  271. task="updateMinigramInfoDaily",
  272. function="updateDetail",
  273. status="fail",
  274. message="更新单条数据失败, 报错信息是 {}".format(e)
  275. )
  276. def updateArticlesJob(biz_date=None):
  277. """
  278. 更新文章数据
  279. :return:
  280. """
  281. if not biz_date:
  282. biz_date = get_yesterday()
  283. DDM = DailyDataManager()
  284. article_list = DDM.getPublishedArticles(biz_date)
  285. for article in tqdm(article_list):
  286. DDM.updateInfo(article)
  287. log(
  288. task="updateMinigramInfoDaily",
  289. function="updateArticlesJob",
  290. message="文章更新完成---{}".format(biz_date.__str__())
  291. )
  292. def updateMinigramInfoJob(biz_date=None):
  293. """
  294. 更新前三天小程序数据
  295. :return:
  296. """
  297. if not biz_date:
  298. biz_date = get_yesterday()
  299. DDM = DailyDataManager()
  300. try:
  301. DDM.updateDetail(biz_date)
  302. log(
  303. task="updateMinigramInfoDaily",
  304. function="updateArticlesJob",
  305. message="小程序更新完成---{}".format(biz_date.__str__())
  306. )
  307. except Exception as e:
  308. log(
  309. task="updateMinigramInfoDaily",
  310. function="updateArticlesJob",
  311. status="fail",
  312. message="小程序更新失败---{}, 报错信息是: {}".format(biz_date.__str__(), e)
  313. )
  314. def main():
  315. parser = ArgumentParser()
  316. parser.add_argument("--run-date",
  317. help="Run only once for date in format of %Y%m%d. \
  318. If no specified, run as daily jobs.")
  319. args = parser.parse_args()
  320. if args.run_date:
  321. biz_date = datetime.strptime(args.run_date, "%Y%m%d")
  322. print("Run in manual mode. Date: {}".format(args.run_date))
  323. updateArticlesJob(biz_date)
  324. updateMinigramInfoJob(biz_date)
  325. return
  326. else:
  327. print("Run in daily mode.")
  328. schedule.every().day.at("01:30").do(Functions().job_with_thread, updateArticlesJob)
  329. schedule.every().day.at("03:30").do(Functions().job_with_thread, updateMinigramInfoJob)
  330. while True:
  331. schedule.run_pending()
  332. time.sleep(1)
  333. # log(
  334. # task="updateMinigramInfoDaily",
  335. # function="main",
  336. # message="更新文章小程序信息任务正常执行"
  337. # )
  338. if __name__ == '__main__':
  339. main()