app.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. import time
  2. import schedule
  3. import datetime
  4. from functions.odps_function import OdpsFunction
  5. from functions.feishu import Feishu
  6. from functions.config import column_map, alg_map, yesterday_columns
  7. from functions.get_yesterday_data import find_yesterday_data
  8. def read_odps_data(date_info):
  9. """
  10. 从 odps 读取数据
  11. :return:
  12. """
  13. o = OdpsFunction()
  14. sql = f"""select * from loghubods.okr_kd where dt = '{date_info}';"""
  15. data = o.select(sql)
  16. return data
  17. def process_data(data_obj):
  18. """
  19. 把对象数据转化为 list
  20. :param data_obj:
  21. :return:
  22. """
  23. keys = ["c{}".format(i) for i in range(1, 350)]
  24. temp = {}
  25. for key in keys:
  26. try:
  27. temp[key] = round(float(data_obj[key]))
  28. except:
  29. temp[key] = data_obj[key]
  30. # 获取昨天的数据并且更新到 temp 中
  31. for column in yesterday_columns:
  32. yes_d = find_yesterday_data(column)
  33. if type(yes_d) != str:
  34. temp[column_map[column]] = yes_d
  35. # if temp[column_map[column]] != "-" or temp[column_map[column]] != "--":
  36. result = []
  37. for key in temp:
  38. if alg_map.get(key):
  39. obj = {
  40. "type": "formula",
  41. "text": alg_map[key],
  42. "number_setting": {"format": "percentage", "decimal_count": 2},
  43. }
  44. result.append(obj)
  45. else:
  46. result.append(temp[key])
  47. return result
  48. def main():
  49. """
  50. main函数
  51. 0. 插入一行空行,用于写入数据
  52. 1. 先从大数据 BI 表读取数据,
  53. 2. 从需要读取前一天的数据中把数据读取出来,更新到 data_list 中
  54. 3. 把数据插入飞书表,并且把数据备份到MySQL数据表中
  55. 4. 定时任务更新 MySQL 表
  56. :return: None
  57. """
  58. F = Feishu(document_token="C1Qrsa4HWh6bzEtv7aocrFlAnad")
  59. F.prepend_value(sheet_id="Zi7oYW", values=[[0]], ranges="A5:A5")
  60. # 读数据
  61. date_info = datetime.date.today().__str__().replace("-", "")
  62. ori_data = read_odps_data(date_info)
  63. # 处理元数据
  64. data_list = process_data(ori_data)
  65. # 插入飞书表
  66. F.insert_value(sheet_id="Zi7oYW", values=[data_list[:100]], ranges="A5:CV5")
  67. F.insert_value(sheet_id="Zi7oYW", values=[data_list[100:200]], ranges="CW5:GR5")
  68. F.insert_value(sheet_id="Zi7oYW", values=[data_list[200:300]], ranges="GS5:KN5")
  69. F.insert_value(sheet_id="Zi7oYW", values=[data_list[300:]], ranges="KO5:MK5")
  70. if __name__ == "__main__":
  71. schedule.every().day.at("10:00").do(main)
  72. while True:
  73. schedule.run_pending()
  74. time.sleep(1)
  75. # main()