""" Created on Wednesday, February 28 @author: 罗俊辉 """ import time import schedule from datetime import datetime, timedelta from functions.odps_function import OdpsFunction from functions.feishu import Feishu from functions.config import column_map, alg_map, yesterday_columns from functions.get_yesterday_data import find_yesterday_data def get_yesterday_str(): """获取昨天的日期字符""" today = datetime.now() # 计算昨天的日期 yesterday = today - timedelta(days=1) return yesterday.strftime('%Y%m%d') def read_odps_data(date_info): """ 从 odps 读取数据 :return: """ o = OdpsFunction() sql = f"""select * from loghubods.okr_kd where dt = '{date_info}';""" data = o.select(sql) return data def process_data(data_obj): """ 把对象数据转化为 list :param data_obj: :return: """ keys = ["c{}".format(i) for i in range(1, 351)] temp = {} for key in keys: try: temp[key] = int(data_obj[key]) except: temp[key] = data_obj[key] # 获取昨天的数据并且更新到 temp 中 for column in yesterday_columns: yes_d = find_yesterday_data(column) if type(yes_d) != str: temp[column_map[column]] = yes_d result = [] for key in temp: if alg_map.get(key): obj = { "type": "formula", "text": alg_map[key], "number_setting": {"format": "percentage", "decimal_count": 2}, } result.append(obj) else: result.append(temp[key]) return result def get_data(date_info): """ 从大数据表获取昨天的数据 :return: """ for i in range(3): ori_data = read_odps_data(date_info) if ori_data: return ori_data print("defeat to read data") time.sleep(30 * 60) return [] def main(): """ main函数 0. 插入一行空行,用于写入数据 1. 先从大数据 BI 表读取数据, 2. 从需要读取前一天的数据中把数据读取出来,更新到 data_list 中, 如果没有读取到数据,则重复 3 次, 每次重复间隔为 30 mins 3. 把数据插入飞书表,并且把数据备份到MySQL数据表中 4. 定时任务更新 MySQL 表 :return: None PBaWRy """ sheet_id = "eb6d24" F = Feishu(document_token="ZYNBsZ5lPhsKFltb6ghclfJqngb") F.prepend_value(sheet_id=sheet_id, values=[["fail to read data"]], ranges="A5:A5") # 读数据 date_info = get_yesterday_str() # date_info = "20240310" ori_data = get_data(date_info) if not ori_data: return # 处理元数据 print("正在处理数据") data_list = process_data(ori_data) print("数据处理完成, 开始插入") # 插入飞书表 F.insert_value(sheet_id=sheet_id, values=[data_list[:100]], ranges="A5:CV5") F.insert_value(sheet_id=sheet_id, values=[data_list[100:200]], ranges="CW5:GR5") F.insert_value(sheet_id=sheet_id, values=[data_list[200:300]], ranges="GS5:KN5") F.insert_value(sheet_id=sheet_id, values=[data_list[300:]], ranges="KO5:ML5") if __name__ == "__main__": schedule.every().day.at("10:00").do(main) while True: schedule.run_pending() time.sleep(1) # ii = "PBaWRy" # main(ii) # print(get_yesterday_str())