app.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. """
  2. Created on Wednesday, February 28
  3. @author: 罗俊辉
  4. """
  5. import time
  6. import schedule
  7. from datetime import datetime, timedelta
  8. from functions.odps_function import OdpsFunction
  9. from functions.feishu import Feishu
  10. from functions.config import column_map, alg_map, yesterday_columns
  11. from functions.get_yesterday_data import find_yesterday_data
  12. def get_yesterday_str():
  13. """获取昨天的日期字符"""
  14. today = datetime.now()
  15. # 计算昨天的日期
  16. yesterday = today - timedelta(days=1)
  17. return yesterday.strftime('%Y%m%d')
  18. def read_odps_data(date_info):
  19. """
  20. 从 odps 读取数据
  21. :return:
  22. """
  23. o = OdpsFunction()
  24. sql = f"""select * from loghubods.okr_kd where dt = '{date_info}';"""
  25. data = o.select(sql)
  26. return data
  27. def process_data(data_obj):
  28. """
  29. 把对象数据转化为 list
  30. :param data_obj:
  31. :return:
  32. """
  33. keys = ["c{}".format(i) for i in range(1, 351)]
  34. temp = {}
  35. for key in keys:
  36. try:
  37. temp[key] = int(data_obj[key])
  38. except:
  39. temp[key] = data_obj[key]
  40. # 获取昨天的数据并且更新到 temp 中
  41. for column in yesterday_columns:
  42. yes_d = find_yesterday_data(column)
  43. if type(yes_d) != str:
  44. temp[column_map[column]] = yes_d
  45. result = []
  46. for key in temp:
  47. if alg_map.get(key):
  48. obj = {
  49. "type": "formula",
  50. "text": alg_map[key],
  51. "number_setting": {"format": "percentage", "decimal_count": 2},
  52. }
  53. result.append(obj)
  54. else:
  55. result.append(temp[key])
  56. return result
  57. def get_data(date_info):
  58. """
  59. 从大数据表获取昨天的数据
  60. :return:
  61. """
  62. for i in range(3):
  63. ori_data = read_odps_data(date_info)
  64. if ori_data:
  65. return ori_data
  66. print("defeat to read data")
  67. time.sleep(30 * 60)
  68. return []
  69. def main():
  70. """
  71. main函数
  72. 0. 插入一行空行,用于写入数据
  73. 1. 先从大数据 BI 表读取数据,
  74. 2. 从需要读取前一天的数据中把数据读取出来,更新到 data_list 中, 如果没有读取到数据,则重复 3 次, 每次重复间隔为 30 mins
  75. 3. 把数据插入飞书表,并且把数据备份到MySQL数据表中
  76. 4. 定时任务更新 MySQL 表
  77. :return: None
  78. PBaWRy
  79. """
  80. sheet_id = "eb6d24"
  81. F = Feishu(document_token="ZYNBsZ5lPhsKFltb6ghclfJqngb")
  82. F.prepend_value(sheet_id=sheet_id, values=[["fail to read data"]], ranges="A5:A5")
  83. # 读数据
  84. date_info = get_yesterday_str()
  85. # date_info = "20240310"
  86. ori_data = get_data(date_info)
  87. if not ori_data:
  88. return
  89. # 处理元数据
  90. print("正在处理数据")
  91. data_list = process_data(ori_data)
  92. print("数据处理完成, 开始插入")
  93. # 插入飞书表
  94. F.insert_value(sheet_id=sheet_id, values=[data_list[:100]], ranges="A5:CV5")
  95. F.insert_value(sheet_id=sheet_id, values=[data_list[100:200]], ranges="CW5:GR5")
  96. F.insert_value(sheet_id=sheet_id, values=[data_list[200:300]], ranges="GS5:KN5")
  97. F.insert_value(sheet_id=sheet_id, values=[data_list[300:]], ranges="KO5:ML5")
  98. if __name__ == "__main__":
  99. schedule.every().day.at("10:00").do(main)
  100. while True:
  101. schedule.run_pending()
  102. time.sleep(1)
  103. # ii = "PBaWRy"
  104. # main(ii)
  105. # print(get_yesterday_str())