"""Fetch last hour's rows for one channel from ODPS and print/return a random half.

Running the module as a script calls ``OdpsDataCount.main()``.
"""
import json
import datetime
import math
import random

from odps import ODPS

# ODPS service configuration.
# SECURITY NOTE(review): the access key pair below is committed in plain text.
# Anyone with read access to this file can use it; it should be rotated and
# loaded from the environment or a secret store instead. Values are left
# unchanged here so existing behaviour is preserved.
ODPS_CONFIG = {
    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
    'ACCESSID': 'LTAIWYUujJAm7CbH',
    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
    'PROJECT': 'loghubods'
}


class OdpsDataCount:
    """Query helpers for ``loghubods.transport_spider_recommend_video_hour``."""

    # Characters that would terminate or escape the double-quoted SQL literal
    # that ``dt`` is interpolated into.
    _UNSAFE_DT_CHARS = '"\\'

    @classmethod
    def _create_client(cls):
        """Build an ODPS client from the module-level ``ODPS_CONFIG``."""
        return ODPS(
            access_id=ODPS_CONFIG['ACCESSID'],
            secret_access_key=ODPS_CONFIG['ACCESSKEY'],
            project=ODPS_CONFIG['PROJECT'],
            endpoint=ODPS_CONFIG['ENDPOINT']
        )

    @classmethod
    def get_data_count(cls, dt):
        """Return the rows of partition ``dt`` as a list of JSON strings.

        Args:
            dt: Partition value matched against the table's ``dt`` column.
                ``main`` passes a ``'%Y%m%d%H'`` string.

        Returns:
            A list of JSON-encoded objects with the keys ``videoid``, ``time``,
            ``type``, ``channel`` and ``dt``. Query or serialization failures
            are not raised: the error is printed and whatever rows were
            collected before the failure (possibly none) are returned.
        """
        odps = cls._create_client()
        dt_text = str(dt)
        data_values = []
        try:
            # The query is assembled by string interpolation, so refuse values
            # that could break out of the quoted literal. Raising inside the
            # ``try`` keeps the existing "print and return" error contract.
            if any(ch in dt_text for ch in cls._UNSAFE_DT_CHARS):
                raise ValueError(f"unsafe characters in dt: {dt_text!r}")
            sql = f'SELECT videoid,time,type,channel FROM loghubods.transport_spider_recommend_video_hour WHERE dt = "{dt_text}" and channel = "搬运工具"'
            with odps.execute_sql(sql).open_reader() as reader:
                # Append row by row (rather than building the list in one
                # expression) so rows read before a mid-stream failure are
                # still returned, as before.
                for row in reader:
                    record = {
                        "videoid": row[0],
                        "time": row[1],
                        "type": row[2],
                        "channel": row[3],
                        "dt": dt_text,
                    }
                    data_values.append(json.dumps(record, ensure_ascii=False))
        except Exception as e:
            # Deliberately best-effort: report the problem and fall through to
            # the single return below.
            print(f"An error occurred: {e}")
        return data_values

    @classmethod
    def main(cls):
        """Sample half (rounded up) of the previous hour's rows.

        Prints the size of the sample and returns the sampled JSON strings.
        """
        # NOTE(review): ``datetime.now()`` is naive local time; this presumably
        # matches how the ``dt`` partition is written - confirm.
        previous_hour = datetime.datetime.now() - datetime.timedelta(hours=1)
        dt = previous_hour.strftime('%Y%m%d%H')
        data_count = cls.get_data_count(dt=dt)
        # ceil() keeps a single-row result from being sampled down to nothing;
        # an empty result yields a sample size of 0, which random.sample accepts.
        sample_size = math.ceil(len(data_count) / 2)
        random_selection = random.sample(data_count, sample_size)
        print(len(random_selection))
        return random_selection


if __name__ == '__main__':
    OdpsDataCount.main()