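"""
MQ consumer entry point (summary of the code below): one thread per topic pulls
crawl tasks, loads the task's user_list / rule_dict from MySQL, runs either
UniversalCrawler or a platform-specific crawler class named in the platform
config, and acks the message once the round has been handled.
"""
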
import importlib
import threading
import traceback
import json
import time
import uuid

from application.common import AliyunLogger, get_consumer, ack_message
from application.common.log import Local
from crawler_worker.universal_crawler import UniversalCrawler
from application.config import TopicGroup
from application.functions.mysql_service import MysqlService
from utils.config_loader import ConfigLoader


def generate_trace_id():
    """Build a trace id from a random hex string plus a millisecond timestamp."""
    return f"{uuid.uuid4().hex}{int(time.time() * 1000)}"


def import_custom_class(class_path):
    """
    Dynamically import a class from its dotted path,
    e.g. crawler_worker.universal_crawler.UniversalCrawler.
    """
    module_path, class_name = class_path.rsplit(".", 1)
    module = importlib.import_module(module_path)
    return getattr(module, class_name)
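
# Hypothetical example: a platform config entry such as
#   {"custom_class": "crawler_worker.my_platform.MyPlatformCrawler"}
# (module and class names invented for illustration) would resolve to
# MyPlatformCrawler here; handle_message() below falls back to
# UniversalCrawler when the key is absent.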


def handle_message(topic: str, mode: str):
    """Consume tasks from a single topic in a loop and dispatch each to a crawler."""
    consumer = get_consumer(topic_name=topic, group_id=topic)
    logger = AliyunLogger(platform=topic, mode=mode)
    platform_config = ConfigLoader().get_platform_config(topic)

    while True:
        try:
            messages = consumer.consume_message(wait_seconds=10, batch_size=1)
            if not messages:
                continue
            for message in messages:
                trace_id = generate_trace_id()
                body = message.message_body
                try:
                    payload = json.loads(body)
                    platform = payload["platform"]
                    mode = payload["mode"]
                    task_id = payload["id"]
                    mysql_service = MysqlService(task_id, mode, platform)
                    logger.logging(
                        1001,
                        "Starting a crawl round",
                        data=payload,
                        trace_id=trace_id,
                    )
                    Local.init_logger(platform, mode).info(
                        f"[trace_id={trace_id}] Received task: {body}"
                    )

                    # Load user_list and rule_dict for this task
                    user_list = mysql_service.get_user_list()
                    rule_dict = mysql_service.get_rule_dict()
                    custom_class = platform_config.get("custom_class")  # optional custom crawler class

                    try:
                        if custom_class:
                            CrawlerClass = import_custom_class(custom_class)
                        else:
                            CrawlerClass = UniversalCrawler
                        crawler = CrawlerClass(
                            platform_config=platform_config,  # pass the whole platform config through
                            rule_dict=rule_dict,
                            user_list=user_list,
                            trace_id=trace_id,
                        )
                        crawler.run()
                    except Exception as e:
                        print(f"[{topic}] crawler run error: {e}")

                    # Ack the message once this round has been handled
                    ack_message(mode, platform, message, consumer, trace_id=trace_id)
                    logger.logging(
                        code="1000",
                        message="Task finished and message acknowledged",
                        trace_id=trace_id,
                    )
                except Exception as e:
                    logger.logging(
                        code="9001",
                        message=f"Failed to handle message (not acked): {e}\n{traceback.format_exc()}",
                        trace_id=trace_id,
                        data=body,
                    )
                    # No ack: leave the message for redelivery on the next retry
        except Exception as err:
            logger.logging(code="9002", message=f"Consume failed: {err}\n{traceback.format_exc()}")


def main():
    """Start one consumer thread per configured topic and wait on all of them."""
    topic_list = TopicGroup()
    print(f"Listening on topics: {topic_list}")
    threads = []
    for topic in topic_list:
        # Topic naming convention: the second underscore-separated segment is the mode
        mode = topic.split("_")[1]
        t = threading.Thread(target=handle_message, args=(topic, mode))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()


if __name__ == '__main__':
    main()