123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- """
- Created on Wednesday, February 28
- @author: 罗俊辉
- """
- import time
- import schedule
- from datetime import datetime, timedelta
- from functions.odps_function import OdpsFunction
- from functions.feishu import Feishu
- from functions.config import column_map, alg_map, yesterday_columns
- from functions.get_yesterday_data import find_yesterday_data
- def get_yesterday_str():
- """获取昨天的日期字符"""
- today = datetime.now()
- # 计算昨天的日期
- yesterday = today - timedelta(days=1)
- return yesterday.strftime('%Y%m%d')
- def read_odps_data(date_info):
- """
- 从 odps 读取数据
- :return:
- """
- o = OdpsFunction()
- sql = f"""select * from loghubods.okr_kd where dt = '{date_info}';"""
- data = o.select(sql)
- return data
- def process_data(data_obj):
- """
- 把对象数据转化为 list
- :param data_obj:
- :return:
- """
- keys = ["c{}".format(i) for i in range(1, 351)]
- temp = {}
- for key in keys:
- try:
- temp[key] = int(data_obj[key])
- except:
- temp[key] = data_obj[key]
- # 获取昨天的数据并且更新到 temp 中
- for column in yesterday_columns:
- yes_d = find_yesterday_data(column)
- if type(yes_d) != str:
- temp[column_map[column]] = yes_d
- result = []
- for key in temp:
- if alg_map.get(key):
- obj = {
- "type": "formula",
- "text": alg_map[key],
- "number_setting": {"format": "percentage", "decimal_count": 2},
- }
- result.append(obj)
- else:
- result.append(temp[key])
- return result
- def get_data(date_info):
- """
- 从大数据表获取昨天的数据
- :return:
- """
- for i in range(3):
- ori_data = read_odps_data(date_info)
- if ori_data:
- return ori_data
- print("defeat to read data")
- time.sleep(30 * 60)
- return []
- def main():
- """
- main函数
- 0. 插入一行空行,用于写入数据
- 1. 先从大数据 BI 表读取数据,
- 2. 从需要读取前一天的数据中把数据读取出来,更新到 data_list 中, 如果没有读取到数据,则重复 3 次, 每次重复间隔为 30 mins
- 3. 把数据插入飞书表,并且把数据备份到MySQL数据表中
- 4. 定时任务更新 MySQL 表
- :return: None
- PBaWRy
- """
- sheet_id = "eb6d24"
- F = Feishu(document_token="ZYNBsZ5lPhsKFltb6ghclfJqngb")
- F.prepend_value(sheet_id=sheet_id, values=[["fail to read data"]], ranges="A5:A5")
- # 读数据
- date_info = get_yesterday_str()
- # date_info = "20240310"
- ori_data = get_data(date_info)
- if not ori_data:
- return
- # 处理元数据
- print("正在处理数据")
- data_list = process_data(ori_data)
- print("数据处理完成, 开始插入")
- # 插入飞书表
- F.insert_value(sheet_id=sheet_id, values=[data_list[:100]], ranges="A5:CV5")
- F.insert_value(sheet_id=sheet_id, values=[data_list[100:200]], ranges="CW5:GR5")
- F.insert_value(sheet_id=sheet_id, values=[data_list[200:300]], ranges="GS5:KN5")
- F.insert_value(sheet_id=sheet_id, values=[data_list[300:]], ranges="KO5:ML5")
- if __name__ == "__main__":
- schedule.every().day.at("10:00").do(main)
- while True:
- schedule.run_pending()
- time.sleep(1)
- # ii = "PBaWRy"
- # main(ii)
- # print(get_yesterday_str())
|