"""
Created on January 4, 2024
@author: luojunhui
description: 票圈线上代码控制程序,分布式抓取消费核心
(Piaoquan online control program — core of the distributed crawl consumer:
pulls task messages from aliyun MQ and launches one spider subprocess per task.)
"""
import asyncio
import json
import os
import sys
import time
import traceback
import uuid

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

# FIX: `sys` and `os` were used here without being imported, which made this
# line raise NameError at import time. The append must stay ahead of the
# `application.*` imports below so they resolve from the working directory.
sys.path.append(os.getcwd())

from application.common import AliyunLogger, get_consumer, ack_message
from application.config import TopicGroup
from application.common.log import Local


async def run(task_id, mode, platform, trace_id):
    """
    Launch the spider for one task in a child process.

    :param task_id: 任务id — task id parsed from the MQ message body
    :param mode: 任务类型 — task type
    :param platform: 哪个抓取平台 — which platform to crawl
    :param trace_id: correlation id threaded through every log line
    :return: None
    """
    # Per-platform/mode aliyun log handle.
    logger = AliyunLogger(platform=platform, mode=mode)
    try:
        # NOTE(review): the command line is built by string formatting and run
        # through the shell; task_id/mode/platform originate from the MQ
        # message and are assumed to contain no shell metacharacters — confirm
        # upstream validation before exposing this to untrusted producers.
        process = await asyncio.create_subprocess_shell(
            "python3 scheduler/run_spider_online.py --task_id {} --mode {} --platform {}".format(
                task_id, mode, platform
            )
        )
        # returncode is None while the child is still running, i.e. it started
        # successfully; a non-None code here means it already exited.
        if process.returncode is None:
            logger.logging(
                code=1005,
                message="{}: 启动进程成功,进程ID:{}".format(platform, process.pid),
                trace_id=trace_id,
            )
        else:
            logger.logging(
                code=1006,
                message="{}: 启动进程失败".format(platform),
                trace_id=trace_id,
            )
    except Exception as e:
        logger.logging(
            code=1007,
            message=f"发生未知错误: {e}\n {traceback.format_exc()}",
            trace_id=trace_id,
        )


def generate_trace_id():
    """Return a unique trace id: a random UUID4 followed by a ms timestamp."""
    timestamp = str(int(time.time() * 1000))
    unique_id = str(uuid.uuid4())
    return f"{unique_id}{timestamp}"


async def consume_single_message(spider):
    """
    Consume one message from the spider's queue and, on success, launch the
    spider for the task it describes.

    :param spider: dict with "topic", "group", "platform" and "mode" keys
    """
    topic = spider["topic"]
    group = spider["group"]
    platform = spider["platform"]
    mode = spider["mode"]
    logger = AliyunLogger(platform=platform, mode=mode)
    consumer = get_consumer(topic, group)
    try:
        messages = consumer.consume_message(wait_seconds=10, batch_size=1)
        if messages:
            for single_message in messages:
                trace_id = generate_trace_id()
                message_id = single_message.message_id
                Local.logger(platform, mode).info(
                    "收到一条消息\t{}{},trace_id={},message_id = {}".format(
                        single_message, single_message.message_body, trace_id, message_id
                    )
                )
                # NOTE(review): the message is acked BEFORE the task is parsed
                # and the spider is started — if json.loads or run() fails the
                # message is lost, not redelivered. Looks intentional
                # (at-most-once); confirm with the queue's retry policy.
                ack_message(
                    mode=mode, platform=platform, recv_msgs=single_message, consumer=consumer
                )
                logger.logging(
                    code=5000,
                    message=f"successfully consumed message [message_id] ={message_id}",
                    data=single_message.message_body,
                    trace_id=trace_id,
                )
                message_body = single_message.message_body
                task_id = json.loads(message_body)["id"]
                Local.logger(platform, mode).info(
                    f"task_id=={task_id} trace_id={trace_id} message_id ={message_id}"
                )
                # FIX: was `await asyncio.create_task(run(...))` — wrapping a
                # coroutine in a task only to await it immediately is a no-op
                # wrapper; awaiting the coroutine directly is equivalent.
                await run(task_id, spider["mode"], spider["platform"], trace_id)
                logger.logging(
                    code=5001,
                    message=f"successfully created task [message_id] = {message_id}",
                    trace_id=trace_id,
                )
        else:
            logger.logging(code=5003, message="Messages Queue is Empty")
    except MQExceptionBase as err:
        # Covers all MQ client failures, including "MessageNotExist" when the
        # topic has nothing to consume.
        tb = traceback.format_exc()
        message = f"Consume Message Fail! Exception:{err}\n{tb}"
        # FIX: code was the string "5004"; every other call passes an int.
        logger.logging(code=5004, message=message)


async def main():
    """Poll every configured topic/group pair forever, 20 s between rounds."""
    spider_list = TopicGroup().produce()
    while spider_list:
        # One concurrent consume attempt per configured spider.
        tasks = [consume_single_message(spider) for spider in spider_list]
        await asyncio.gather(*tasks)
        await asyncio.sleep(20)


if __name__ == "__main__":
    # Run the main event loop.
    asyncio.run(main())