updateMinigramInfoDaily.py 15 KB


  1. """
  2. @author luojunhui
  3. @description Update Minigram Info Daily
  4. """
  5. import time
  6. import sys
  7. import traceback
  8. from tqdm import tqdm
  9. from datetime import datetime, timedelta
  10. import schedule
  11. from argparse import ArgumentParser
  12. from applications import longArticlesMySQL, PQMySQL, WeixinSpider, Functions, log, bot
  13. TASK_NAME = "updateMinigramInfoDaily"
  14. SPIDER_SUCCESS_STATUS = 0
  15. def get_yesterday():
  16. """
  17. get yesterday date
  18. :return:
  19. """
  20. yesterday = datetime.today() - timedelta(1)
  21. return yesterday
  22. class DailyDataManager(object):
  23. """
  24. daily 数据每日更新
  25. """
  26. long_articles_db = longArticlesMySQL()
  27. pq_db = PQMySQL()
  28. wx_spider = WeixinSpider()
  29. functions = Functions()
  30. @classmethod
  31. def get_published_articles(cls, biz_date):
  32. """
  33. 获取已经发布的文章的信息, updateTime 选择为前一天的 0 点并且转化为时间戳
  34. :return:
  35. """
  36. biz_date_midnight = datetime(year=biz_date.year, month=biz_date.month, day=biz_date.day)
  37. biz_date_ts = biz_date_midnight.timestamp()
  38. biz_date_end_ts = biz_date_ts + 24 * 60 * 60 - 1
  39. sql2 = f"""
  40. select ContentUrl, wx_sn, publish_timestamp, accountName, title
  41. from official_articles_v2
  42. where publish_timestamp between {biz_date_ts} and {biz_date_end_ts};
  43. -- and accountName in (
  44. -- select distinct account_name from account_avg_info_v2
  45. -- );
  46. """
  47. result_list = cls.pq_db.select(sql2)
  48. log(
  49. task=TASK_NAME,
  50. function="get_published_articles",
  51. message="一共获取 {} 篇文章数据".format(len(result_list))
  52. )
  53. return result_list
  54. @classmethod
  55. def update_article_info(cls, line):
  56. """
  57. update info into mysql
  58. :return:
  59. """
  60. url = line[0]
  61. update_time = line[2]
  62. wx_sn = line[1].decode()
  63. article_detail = cls.get_root_source_ids(line)
  64. if article_detail:
  65. response_code = article_detail['code']
  66. if response_code == SPIDER_SUCCESS_STATUS:
  67. mini_info = article_detail['data']['data']['mini_program']
  68. if mini_info:
  69. log(
  70. task=TASK_NAME,
  71. function="get_root_source_ids",
  72. message="获取文章链接对应的 rootSourceId 成功",
  73. data={
  74. "ContentUrl": url,
  75. "wxSn": wx_sn,
  76. "updateTime": update_time,
  77. "miniInfo": mini_info
  78. }
  79. )
  80. try:
  81. dt_object = datetime.fromtimestamp(update_time)
  82. publish_dt = dt_object.strftime('%Y-%m-%d')
  83. one_day = timedelta(days=1)
  84. two_day = timedelta(days=2)
  85. next_day = dt_object + one_day
  86. next_next_day = dt_object + two_day
  87. recall_dt_list = [dt_object, next_day, next_next_day]
  88. recall_dt_str_list = [i.strftime('%Y-%m-%d') for i in recall_dt_list]
  89. for dt_str in recall_dt_str_list:
  90. for index, item in enumerate(mini_info, 1):
  91. image_url = item['image_url']
  92. nick_name = item['nike_name']
  93. root_source_id = item['path'].split("rootSourceId%3D")[-1]
  94. video_id = item['path'].split("videos%3Fid%3D")[1].split("%26su%3D")[0]
  95. kimi_title = item['title']
  96. # print(image_url, nick_name, root_source_id, video_id, kimi_title)
  97. insert_sql = f"""
  98. INSERT INTO long_articles_detail_info
  99. (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
  100. values
  101. (%s, %s, %s, %s, %s, %s, %s, %s, %s);
  102. """
  103. cls.pq_db.update(
  104. sql=insert_sql,
  105. params=(
  106. wx_sn,
  107. kimi_title,
  108. nick_name,
  109. image_url,
  110. index,
  111. root_source_id,
  112. video_id,
  113. publish_dt,
  114. dt_str
  115. )
  116. )
  117. log(
  118. task=TASK_NAME,
  119. function="update_article_info",
  120. message="插入数据成功, video_id 是: {}".format(video_id)
  121. )
  122. except Exception as e:
  123. error_msg = traceback.format_exc()
  124. log(
  125. task=TASK_NAME,
  126. function="update_article_info",
  127. status="fail",
  128. message="插入数据失败, 失败原因是{}--{}".format(e, error_msg)
  129. )
  130. return None
  131. else:
  132. return line
  133. else:
  134. return line
  135. @classmethod
  136. def get_root_source_ids(cls, data_info):
  137. """
  138. 通过抓取接口获取 data_info
  139. :return:
  140. """
  141. url = data_info[0]
  142. try:
  143. article_detail = cls.wx_spider.get_article_text(url)
  144. return article_detail
  145. except Exception as e:
  146. log(
  147. task=TASK_NAME,
  148. function="get_root_source_ids",
  149. status="fail",
  150. message="获取文章链接对应的 rootSourceId失败, 报错信息是: {}".format(e),
  151. data={
  152. "ContentUrl": url
  153. }
  154. )
  155. return False
  156. @classmethod
  157. def get_minigram_info(cls, rootSourceId):
  158. """
  159. :param rootSourceId:
  160. :return:
  161. """
  162. sql = f"""
  163. select type, machinecode, create_time, first_level_dt
  164. from changwen_data_base_v2
  165. where rootsourceid = '{rootSourceId}';
  166. """
  167. result_list = cls.long_articles_db.select(sql)
  168. def summarize(values):
  169. """
  170. :param values:
  171. :return:
  172. """
  173. L = {}
  174. first_level = {}
  175. fission_level = {}
  176. for line in values:
  177. # 先统计首层
  178. if line[0] == '首层':
  179. try:
  180. dt = str(line[-1])
  181. key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
  182. if first_level.get(key_dt):
  183. first_level[key_dt].add(line[1])
  184. else:
  185. first_level[key_dt] = {line[1]}
  186. except Exception as e:
  187. continue
  188. else:
  189. try:
  190. dt = str(line[-1])
  191. first_level_dt = datetime.strptime(dt, '%Y%m%d')
  192. create_level_dt = line[-2]
  193. delta = create_level_dt - first_level_dt
  194. days = int(delta.days)
  195. key_dt = datetime.strptime(dt, '%Y%m%d').strftime('%Y-%m-%d')
  196. if fission_level.get(key_dt):
  197. fission_level[key_dt].append((line[1], days))
  198. else:
  199. fission_level[key_dt] = [(line[1], days)]
  200. except Exception as e:
  201. continue
  202. # print("first level dt is NULL")
  203. tt = {}
  204. for key in fission_level:
  205. detail_list = fission_level[key]
  206. temp = {}
  207. for item in detail_list:
  208. mid, days = item
  209. if temp.get(days):
  210. temp[days].add(mid)
  211. else:
  212. temp[days] = {mid}
  213. final = {}
  214. for sub_key in temp:
  215. length = len(temp[sub_key])
  216. final[sub_key] = length
  217. tt[key] = final
  218. for key in first_level:
  219. temp = [len(first_level[key]), tt.get(key, {}).get(0, 0), tt.get(key, {}).get(1, 0),
  220. tt.get(key, {}).get(2, 0)]
  221. L[key] = temp
  222. return L
  223. try:
  224. response = summarize(result_list)
  225. log(
  226. task=TASK_NAME,
  227. function="get_minigram_info",
  228. message="计算source_id信息成功",
  229. data=response
  230. )
  231. return response
  232. except Exception as e:
  233. log(
  234. task=TASK_NAME,
  235. function="get_minigram_info",
  236. message="获取 source_id信息失败, 报错信息是: {}".format(e),
  237. status="fail"
  238. )
  239. return None
  240. @classmethod
  241. def update_minigram_detail(cls, biz_date):
  242. """
  243. :return:
  244. """
  245. # 获取三天前的日期
  246. date_begin = biz_date - timedelta(days=3)
  247. datestr_begin = date_begin.strftime("%Y-%m-%d")
  248. datestr_end = biz_date.strftime("%Y-%m-%d")
  249. sql = f"""
  250. select distinct root_source_id
  251. from long_articles_detail_info
  252. where publish_dt between '{datestr_begin}' and '{datestr_end}';
  253. """
  254. source_id_list = cls.pq_db.select(sql)
  255. log(
  256. task=TASK_NAME,
  257. function="update_minigram_detail",
  258. message="获取前三天的 rootSourceId, 一共有 {} 条记录".format(len(source_id_list))
  259. )
  260. fail_count = 0
  261. for item in tqdm(source_id_list):
  262. s_id = item[0]
  263. try:
  264. result = cls.get_minigram_info(s_id)
  265. for key in result:
  266. recall_dt = key
  267. first_level = result[key][0]
  268. fission_0 = result[key][1]
  269. fission_1 = result[key][2]
  270. fission_2 = result[key][3]
  271. # print(s_id, recall_dt, first_level, fission_0, fission_1, fission_2)
  272. update_sql = f"""
  273. UPDATE long_articles_detail_info
  274. set first_level = %s, fission_0 = %s, fission_1 = %s, fission_2 = %s
  275. where root_source_id = %s and recall_dt = %s;
  276. """
  277. try:
  278. cls.pq_db.update(
  279. sql=update_sql,
  280. params=(
  281. first_level, fission_0, fission_1, fission_2, s_id, recall_dt
  282. )
  283. )
  284. except Exception as e:
  285. log(
  286. task=TASK_NAME,
  287. function="update_minigram_detail",
  288. status="fail",
  289. message="mysql 更新失败, 报错信息是 {}".format(e)
  290. )
  291. except Exception as e:
  292. log(
  293. task=TASK_NAME,
  294. function="update_minigram_detail",
  295. status="fail",
  296. message="更新单条数据失败, 报错信息是 {}".format(e),
  297. data={"error_msg": traceback.format_exc()}
  298. )
  299. fail_count += 1
  300. if fail_count:
  301. bot(
  302. title="{} fail because of lam db error".format(TASK_NAME),
  303. detail={
  304. "fail_count": fail_count
  305. }
  306. )
  307. def updateArticlesJob(biz_date=None):
  308. """
  309. 更新文章数据
  310. :return:
  311. """
  312. if not biz_date:
  313. biz_date = get_yesterday()
  314. data_manager = DailyDataManager()
  315. article_list = data_manager.get_published_articles(biz_date)
  316. failed_article_list = []
  317. for article in tqdm(article_list):
  318. failed_article = data_manager.update_article_info(article)
  319. if failed_article:
  320. failed_article_list.append(failed_article)
  321. # 重试
  322. second_try_fail_article_list = []
  323. if failed_article_list:
  324. for article in tqdm(failed_article_list):
  325. second_failed_article = data_manager.update_article_info(article)
  326. if second_failed_article:
  327. second_try_fail_article_list.append(second_failed_article)
  328. log(
  329. task=TASK_NAME,
  330. function="updateArticlesJob",
  331. message="文章更新完成---{}".format(biz_date.__str__())
  332. )
  333. bot(
  334. title="更新文章任务完成",
  335. detail={
  336. "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  337. },
  338. mention=False
  339. )
  340. if second_try_fail_article_list:
  341. bot(
  342. title="更新文章任务存在文章抓取失败",
  343. detail=[
  344. {
  345. "account": line[3],
  346. "title": line[4],
  347. "url": line[0]
  348. }
  349. for line in second_try_fail_article_list
  350. ]
  351. )
  352. def updateMinigramInfoJob(biz_date=None):
  353. """
  354. 更新前三天小程序数据
  355. :return:
  356. """
  357. if not biz_date:
  358. biz_date = get_yesterday()
  359. data_manager = DailyDataManager()
  360. try:
  361. data_manager.update_minigram_detail(biz_date)
  362. log(
  363. task=TASK_NAME,
  364. function="updateMinigramInfoJob",
  365. message="小程序更新完成---{}".format(biz_date.__str__())
  366. )
  367. except Exception as e:
  368. log(
  369. task=TASK_NAME,
  370. function="updateMinigramInfoJob",
  371. status="fail",
  372. message="小程序更新失败---{}, 报错信息是: {}".format(biz_date.__str__(), e)
  373. )
  374. bot(
  375. title="更新小程序信息任务完成",
  376. detail={
  377. "finish_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  378. },
  379. mention=False
  380. )
  381. def main():
  382. """
  383. main function
  384. :return:
  385. """
  386. parser = ArgumentParser()
  387. parser.add_argument("--run-date",
  388. help="Run only once for date in format of %Y%m%d. \
  389. If no specified, run as daily jobs.")
  390. args = parser.parse_args()
  391. if args.run_date:
  392. biz_date = datetime.strptime(args.run_date, "%Y%m%d")
  393. print("Run in manual mode. Date: {}".format(args.run_date))
  394. updateArticlesJob(biz_date)
  395. updateMinigramInfoJob(biz_date)
  396. return
  397. else:
  398. print("Run in daily mode.")
  399. schedule.every().day.at("01:30").do(Functions().job_with_thread, updateArticlesJob)
  400. schedule.every().day.at("03:30").do(Functions().job_with_thread, updateMinigramInfoJob)
  401. while True:
  402. schedule.run_pending()
  403. time.sleep(1)
  404. if __name__ == '__main__':
  405. main()