12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788 |
- """
- @author: luojunhui
- 爬虫搜索服务
- 接受 search_keys, 然后在搜索,并且把执行结果发送到 ETL
- """
- import os
- import sys
- import json
- import asyncio
- from mq_http_sdk.mq_consumer import *
- from mq_http_sdk.mq_exception import MQExceptionBase
- sys.path.append(os.getcwd())
- from application.common import AliyunLogger, get_consumer, ack_message
- from application.common.log import Local
- from spider.crawler_search import *
async def search(params):
    """
    Run the search spider for one decoded MQ message, best-effort.

    :param params: the JSON-decoded message body; it is forwarded unchanged to
        ``weixin_search`` (its schema is defined there — presumably it carries
        the search keys mentioned in the module docstring).
    :return: None. Errors raised by the spider are never propagated: the full
        traceback is printed to stderr so the consumer loop keeps running while
        the failure stays diagnosable (the old ``print(e)`` dropped the
        exception type and stack).
    """
    import traceback

    try:
        await weixin_search(params)
    except Exception:
        # Deliberate best-effort boundary: one failed search must not stop the
        # consumer, but the failure must be visible with its stack trace.
        traceback.print_exc()
async def consume_search_message():
    """
    Pull at most one message from the search topic and run the search spider.

    Flow for each received message:
      1. log receipt locally,
      2. ack exactly that message (the ack happens before the search runs, so
         — assuming the usual ack semantics of the MQ SDK — a failed search is
         not retried through redelivery),
      3. JSON-decode the body and await ``search`` with the result.

    A body that is not valid JSON is logged and skipped instead of raising out
    of the polling loop. MQ SDK errors (including "no message") are logged via
    the Aliyun logger with code "5004".
    """
    topic = "search_spider_prod"
    group = "search_spider_prod"
    mode = "search"
    platform = "search_platform"
    logger = AliyunLogger(platform=platform, mode=mode)
    consumer = get_consumer(topic, group)
    try:
        # Long-poll for up to 10 seconds for a single message.
        messages = consumer.consume_message(wait_seconds=10, batch_size=1)
        if not messages:
            logger.logging(code=5003, message="Messages Queue is Empty")
            return
        for single_message in messages:
            message_body = single_message.message_body
            local_logger = Local.logger(platform, mode)
            local_logger.info(
                "收到一条消息\t{}{}".format(single_message, message_body))
            # Ack only the current message: passing the whole batch here would
            # re-ack earlier receipt handles on every iteration once
            # batch_size > 1.
            ack_message(
                mode=mode,
                platform=platform,
                recv_msgs=[single_message],
                consumer=consumer,
            )
            logger.logging(
                code=5000,
                message="successfully consumed message",
                data=message_body,
            )
            try:
                params = json.loads(message_body)
            except json.JSONDecodeError as err:
                # The message is already acked; record it and move on rather
                # than letting the error escape and stop the consumer loop.
                local_logger.error(
                    "invalid message body, skipped\t{}\t{}".format(
                        message_body, err))
                continue
            # Run the search spider for this message.
            await search(params)
            logger.logging(code=5001, message="successfully created task")
    except MQExceptionBase as err:
        # The topic had no message available to consume.
        if err.type == "MessageNotExist":
            message = "No new message! RequestId:{}\n".format(err.req_id)
            logger.logging(code="5004", message=message)
        else:
            message = "Consume Message Fail! Exception:{}\n".format(err)
            logger.logging(code="5004", message=message)
async def main():
    """
    Entry coroutine: poll the search topic forever.

    Each iteration awaits ``consume_search_message()`` and then sleeps 5
    seconds before polling again. An unexpected exception from a single poll
    is printed with its traceback and the loop continues, so one bad iteration
    does not terminate the long-running consumer. Task cancellation is still
    propagated.
    """
    import traceback

    while True:
        try:
            await consume_search_message()
        except asyncio.CancelledError:
            # Re-raise explicitly: on Python 3.7 CancelledError subclasses
            # Exception and would otherwise be swallowed below.
            raise
        except Exception:
            # Top-level boundary of a daemon loop: report and keep polling.
            traceback.print_exc()
        await asyncio.sleep(5)
if __name__ == "__main__":
    # Build the consumer coroutine and drive it on a fresh event loop;
    # this blocks until the process is stopped.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)
|