import asyncio
from typing import List, Dict

from spiders.authorspider import AuthorSpider


class XiaoniangaoAuthor(AuthorSpider):
    """AuthorSpider subclass whose user list comes from ``db_service.get_xng_mid()``."""

    async def fetch_user_list(self) -> List[Dict]:
        """Fetch the list of users to crawl (from the database).

        Returns:
            Whatever ``self.db_service.get_xng_mid()`` resolves to; annotated
            as a list of dicts.
        """
        return await self.db_service.get_xng_mid()


async def main():
    """Build a XiaoniangaoAuthor from hard-coded sample arguments and run it."""
    # NOTE(review): presumably a filter on an author's video count; the exact
    # semantics are defined by AuthorSpider -- confirm there.
    rules = {"videos_cnt": {"min": 1500}}
    authors = [
        {"uid": 58527261, "link": "116311065", "nick_name": "像我这样"},
        {"uid": 58527262, "link": "104703232", "nick_name": "不再厌倦"},
        {"uid": 58527263, "link": "609255292", "nick_name": "好演技"},
    ]
    trace = "1321"

    # Arguments are passed positionally, in the same order as before.
    spider = XiaoniangaoAuthor(rules, authors, trace)
    await spider.run()


if __name__ == "__main__":
    asyncio.run(main())  # async entry point