"""
@author: luojunhui
Crawler search service: receives search_keys, runs the search, and sends the
execution results to ETL.
"""
import os
import sys
import json
import asyncio

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

# Add the current working directory to the import path; this must run before
# the project-local imports below.
sys.path.append(os.getcwd())

from application.common import AliyunLogger, get_consumer, ack_message
from application.common.log import Local
from spider.crawler_search import *


async def search(params):
    """
    Run the crawler with the given parameters.

    Any exception raised by the crawler is printed and swallowed, so a failed
    search never propagates to the caller.

    :param params: decoded message body, passed unchanged to ``weixin_search``
    :return: None
    """
    try:
        # NOTE(review): weixin_search is presumably provided by the wildcard
        # import from spider.crawler_search -- confirm.
        await weixin_search(params)
    except Exception as e:
        print(e)


async def consume_search_message():
    """
    Consume a single message and, if one was received, start the search crawler.

    Each received message is logged, acknowledged, decoded from JSON and
    handed to ``search``. A body that cannot be decoded is logged as a
    failure and skipped instead of raising out of the polling loop.

    :return: None
    """
    topic = "search_spider_prod"
    group = "search_spider_prod"
    mode = "search"
    platform = "search_platform"
    logger = AliyunLogger(platform=platform, mode=mode)
    consumer = get_consumer(topic, group)
    try:
        messages = consumer.consume_message(wait_seconds=10, batch_size=1)
        if not messages:
            logger.logging(code=5003, message="Messages Queue is Empty")
            return

        for single_message in messages:
            message_body = single_message.message_body
            Local.logger(platform, mode).info(
                "收到一条消息\t{}{}".format(single_message, message_body))
            # The message is acknowledged before the crawler runs, so a
            # failed search is not redelivered. The whole batch is passed,
            # which with batch_size=1 is just this message.
            ack_message(
                mode=mode, platform=platform, recv_msgs=messages, consumer=consumer
            )
            logger.logging(
                code=5000,
                message="successfully consumed message",
                data=message_body,
            )
            try:
                params = json.loads(message_body)
            except (TypeError, ValueError) as err:
                # A malformed body must not escape: only MQExceptionBase is
                # handled below, so it would terminate main()'s loop.
                logger.logging(
                    code="5004",
                    message="Consume Message Fail! Exception:{}\n".format(err),
                    data=message_body,
                )
                continue
            # Run the crawler for this message.
            await search(params)
            logger.logging(code=5001, message="successfully created task")
    except MQExceptionBase as err:
        if err.type == "MessageNotExist":
            # The topic has no message available to consume.
            message = "No new message! RequestId:{}\n".format(err.req_id)
            logger.logging(code="5004", message=message)
        else:
            message = "Consume Message Fail! Exception:{}\n".format(err)
            logger.logging(code="5004", message=message)


async def main():
    """
    Main coroutine: try to consume a message, then sleep 5 seconds, forever.
    """
    while True:
        await consume_search_message()
        await asyncio.sleep(5)


if __name__ == "__main__":
    # Run the main event loop.
    asyncio.run(main())