"""
@author: luojunhui
Spider search service.

Receives search_keys from the message queue, runs the search crawler with
them, and sends the execution results on to ETL.
"""
import os
import sys
import json
import asyncio

from mq_http_sdk.mq_consumer import *
from mq_http_sdk.mq_exception import MQExceptionBase

# The current working directory must be on sys.path before the project-local
# imports below are resolved, so this statement has to stay above them.
sys.path.append(os.getcwd())

from application.common import AliyunLogger, get_consumer, ack_message
from application.common.log import Local
from spider.crawler_search import *


async def search(params):
    """
    Run the search crawler with the given parameters.

    Any exception raised by the crawler is printed and swallowed, so a single
    failed search never propagates to the caller.

    :param params: value decoded from the message body; passed unchanged to
        ``weixin_search``
    :return: None
    """
    try:
        await weixin_search(params)
    except Exception as e:
        # Deliberate best-effort: report the failure and keep the service alive.
        print(e)


async def consume_search_message():
    """
    Consume at most one message and, if one was received, run the search
    crawler with its JSON-decoded body.

    The message is acknowledged *before* the crawler runs, so a message whose
    body cannot be decoded (or whose crawl fails) is not redelivered by this
    code path. A body that is not valid JSON is logged and skipped instead of
    raising out of this coroutine.

    MQ errors raised while consuming are logged and swallowed.

    :return: None
    """
    topic = "search_spider_prod"
    group = "search_spider_prod"
    mode = "search"
    platform = "search_platform"
    logger = AliyunLogger(platform=platform, mode=mode)
    consumer = get_consumer(topic, group)
    try:
        messages = consumer.consume_message(wait_seconds=10, batch_size=1)
        if not messages:
            logger.logging(code=5003, message="Messages Queue is Empty")
            return

        # Consume the messages here and do the data processing.
        for single_message in messages:
            message_body = single_message.message_body
            Local.logger(platform, mode).info(
                "收到一条消息\t{}{}".format(single_message, message_body)
            )
            ack_message(
                mode=mode, platform=platform, recv_msgs=messages, consumer=consumer
            )
            logger.logging(
                code=5000,
                message="successfully consumed message",
                data=message_body,
            )
            try:
                params = json.loads(message_body)
            except (TypeError, ValueError) as err:
                # json.JSONDecodeError is a ValueError; TypeError covers a
                # body that is not str/bytes (e.g. None). Without this guard
                # the error would escape main() and stop the whole service.
                # NOTE(review): reuses the "5004" failure code already used
                # below for consume failures - confirm the intended code.
                logger.logging(
                    code="5004",
                    message="Invalid message body! Exception:{}\n".format(err),
                    data=message_body,
                )
                continue
            # Run the crawler for this message and wait for it to finish.
            await search(params)
            logger.logging(code=5001, message="successfully created task")
    except MQExceptionBase as err:
        if err.type == "MessageNotExist":
            # No message is available for consumption in the topic.
            message = "No new message! RequestId:{}\n".format(err.req_id)
        else:
            message = "Consume Message Fail! Exception:{}\n".format(err)
        logger.logging(code="5004", message=message)


async def main():
    """
    Main loop: try to consume one message, then sleep 5 seconds, forever.
    """
    while True:
        await consume_search_message()
        await asyncio.sleep(5)


if __name__ == "__main__":
    # Run the main event loop.
    asyncio.run(main())