search_app.py 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. """
  2. @author: luojunhui
  3. 爬虫搜索服务
  4. 接受 search_keys, 然后在搜索,并且把执行结果发送到 ETL
  5. """
  6. import os
  7. import sys
  8. import json
  9. import asyncio
  10. from mq_http_sdk.mq_consumer import *
  11. from mq_http_sdk.mq_exception import MQExceptionBase
  12. sys.path.append(os.getcwd())
  13. from application.common import AliyunLogger, get_consumer, ack_message
  14. from application.common.log import Local
  15. from spider.crawler_search import *
  16. async def search(params):
  17. """
  18. 传入参数,然后根据参数执行爬虫代码
  19. :return: None
  20. """
  21. # await weixin_search(params)
  22. try:
  23. await weixin_search(params)
  24. except Exception as e:
  25. print(e)
  26. async def consume_search_message():
  27. """
  28. 消费单个消息,若消费成功则启动搜索爬虫
  29. """
  30. topic = "search_spider_prod"
  31. group = "search_spider_prod"
  32. mode = "search"
  33. platform = "search_platform"
  34. logger = AliyunLogger(platform=platform, mode=mode)
  35. consumer = get_consumer(topic, group)
  36. try:
  37. messages = consumer.consume_message(wait_seconds=10, batch_size=1)
  38. if messages:
  39. # 在这里消费消息,做一些数据处理分析
  40. for single_message in messages:
  41. Local.logger(platform, mode).info(
  42. "收到一条消息\t{}{}".format(single_message, single_message.message_body))
  43. ack_message(
  44. mode=mode, platform=platform, recv_msgs=messages, consumer=consumer
  45. )
  46. logger.logging(
  47. code=5000,
  48. message="successfully consumed message",
  49. data=single_message.message_body,
  50. )
  51. message_body = single_message.message_body
  52. params = json.loads(message_body)
  53. # 创建爬虫task
  54. await search(params)
  55. logger.logging(code=5001, message="successfully created task")
  56. else:
  57. logger.logging(code=5003, message="Messages Queue is Empty")
  58. except MQExceptionBase as err:
  59. # Topic中没有消息可消费。
  60. if err.type == "MessageNotExist":
  61. message = "No new message! RequestId:{}\n".format(err.req_id)
  62. logger.logging(code="5004", message=message)
  63. else:
  64. message = "Consume Message Fail! Exception:{}\n".format(err)
  65. logger.logging(code="5004", message=message)
  66. async def main():
  67. """
  68. 主函数
  69. 每隔一秒接受一次消息
  70. """
  71. while True:
  72. await consume_search_message()
  73. await asyncio.sleep(5)
  74. if __name__ == "__main__":
  75. # 运行主事件循环
  76. asyncio.run(main())