# server.py
"""
Content-finding service.

Provides:
1. API endpoint: POST /api/tasks — trigger a content-finding task.
2. Scheduling: on startup, first resume demand_find_task rows with
   status=running; afterwards poll at a fixed interval. When no task is
   running, pick from demand_content the row for today (dt=YYYYMMDD) with
   the highest score that has no task record yet.
   NOTE(review): this line says "category-agnostic" while scheduled_tick's
   own docstring describes category-layered rotation — confirm which the
   DB query actually implements.
   The constant SCHEDULE_DISPATCH_NOT_BEFORE_HOUR below only allows
   dispatching at/after that local hour (SCHEDULER_TIMEZONE); 8 is commonly
   used to align with upstream daily quotas counted in UTC days
   (Beijing 08:00 == UTC day boundary).
3. Concurrency control: cap the number of concurrent tasks; the scheduler
   skips a tick when no slot is free.
4. A single finding run may take at most 25 minutes; on timeout it is
   recorded as failed and written back to demand_find_task.
"""
import asyncio
import json
import logging
import os
import sys
import uuid  # NOTE(review): appears unused in this file — confirm before removing
from datetime import datetime, timedelta
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
from typing import Optional
# Make the repository root importable when this file is run as a script.
sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from zoneinfo import ZoneInfo
from dotenv import load_dotenv
# Load .env before importing project modules that read env vars at import time.
load_dotenv()
import core
from db import (
    create_task_record,
    get_first_running_task,
    get_latest_demand_task_oprate_is_open,
    get_one_today_unprocessed_demand,
    update_task_status,
    update_task_on_complete,
)
from db.schedule import (
    STATUS_RUNNING,
    STATUS_SUCCESS,
    STATUS_FAILED,
    get_latest_day_limit_coast,
    get_total_token_coast_between,
)
# Logging: write to .cache/server.log next to this file and to stdout.
log_dir = Path(__file__).parent / '.cache'
log_dir.mkdir(exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_dir / 'server.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
# FastAPI application
app = FastAPI(
    title="内容寻找服务",
    version="1.0.0",
    description="抖音内容寻找 Agent 服务"
)
# Scheduler (defaults to the China timezone so a container running in UTC
# does not miss the intended local dispatch times).
SCHEDULER_TIMEZONE = os.getenv("SCHEDULER_TIMEZONE", os.getenv("TZ", "Asia/Shanghai"))
SCHEDULER_TZ = ZoneInfo(SCHEDULER_TIMEZONE)
scheduler = AsyncIOScheduler(timezone=SCHEDULER_TZ)
# Concurrency control: one shared semaphore caps in-flight tasks (API + scheduled).
MAX_CONCURRENT_TASKS = int(os.getenv("MAX_CONCURRENT_TASKS", "1"))
task_semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
# Scheduling: dispatch interval (seconds) and per-task timeout (seconds;
# the 1500s default is 25 minutes, matching the module docstring — an earlier
# comment claiming "15 minutes" was wrong).
# To avoid dispatching several tasks at once on startup (risking duplicate
# processing), each tick dispatches at most 1 demand; successive ticks fill
# the concurrency slots up to MAX_CONCURRENT_TASKS.
SCHEDULE_DISPATCH_INTERVAL_SECONDS = int(os.getenv("SCHEDULE_DISPATCH_INTERVAL_SECONDS", "30"))
TASK_TIMEOUT_SECONDS = int(os.getenv("SCHEDULE_TASK_TIMEOUT_SECONDS", "1500"))
# Earliest local full hour (0-23) at which scheduled dispatch may run,
# inclusive through end of day, in SCHEDULER_TIMEZONE. None = no restriction.
# Use 8 to align with upstream daily quotas counted in UTC days (e.g. OpenRouter).
SCHEDULE_DISPATCH_NOT_BEFORE_HOUR: Optional[int] = 8
# In-process counters, exposed via /health.
stats = {
    "total_tasks": 0,       # all tasks started (API-triggered + scheduled)
    "completed_tasks": 0,
    "failed_tasks": 0,
    "scheduled_tasks": 0    # subset of total_tasks started by the scheduler
}
# ============ Data models ============
class TaskRequest(BaseModel):
    """Request body for POST /api/tasks; every field is optional."""
    query: Optional[str] = None       # search query; server falls back to core.DEFAULT_QUERY
    demand_id: Optional[int] = None   # demand_content.id to associate with the run
    suggestion: Optional[str] = None  # extra hint text forwarded to the agent prompt
class TaskResponse(BaseModel):
    """Response body returned by POST /api/tasks."""
    trace_id: str  # trace id of the launched run (error_<timestamp> on startup failure)
    status: str    # "started" when the background task was launched
    query: str     # the query actually used (after default substitution)
    message: str   # human-readable hint including the trace output path
# ============ Core functions ============
  95. def _load_token_coast_from_meta(trace_id: str) -> Optional[Decimal]:
  96. """
  97. 从 TRACE_DIR/{trace_id}/meta.json 读取本次任务的 token 费用,并转成两位小数的 Decimal。
  98. 优先读取 total_cost 字段,兼容 total_coast;读取或解析失败返回 None。
  99. """
  100. trace_dir = Path(os.getenv("TRACE_DIR", ".cache/traces"))
  101. meta_path = trace_dir / trace_id / "meta.json"
  102. if not meta_path.exists():
  103. logger.warning("未找到 meta.json,trace_id=%s, path=%s", trace_id, meta_path)
  104. return None
  105. try:
  106. with meta_path.open("r", encoding="utf-8") as f:
  107. data = json.load(f)
  108. except Exception as e:
  109. logger.warning("读取 meta.json 失败: trace_id=%s, error=%s", trace_id, e)
  110. return None
  111. raw_cost = data.get("total_cost")
  112. if raw_cost is None:
  113. raw_cost = data.get("total_coast")
  114. if raw_cost is None:
  115. logger.warning("meta.json 中未找到 total_cost/total_coast 字段: trace_id=%s", trace_id)
  116. return None
  117. try:
  118. cost_decimal = Decimal(str(raw_cost)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
  119. return cost_decimal
  120. except Exception as e:
  121. logger.warning("解析 token 费用失败: trace_id=%s, raw=%s, error=%s", trace_id, raw_cost, e)
  122. return None
  123. def _update_scheduled_task_complete(demand_id: int, trace_id: str, status: int) -> None:
  124. """
  125. 定时任务完成时更新 trace_id、status 以及 token_coast(若能从 meta.json 成功解析)。
  126. 静默处理异常,不影响整体调度流程。
  127. """
  128. try:
  129. token_coast: Optional[Decimal] = None
  130. if trace_id:
  131. token_coast = _load_token_coast_from_meta(trace_id)
  132. update_task_on_complete(demand_id, trace_id, status, token_coast)
  133. except Exception as e:
  134. logger.warning("更新任务状态或 token_coast 失败: %s", e)
async def execute_task(
    query: str,
    demand_id: Optional[int] = None,
    suggestion: str = "",
    task_type: str = "api",
):
    """
    Run one content-finding task under the shared concurrency semaphore.

    Args:
        query: Query text for the agent.
        demand_id: demand_content.id (links the run to the demand_content row).
        suggestion: Extra hint text (for scheduled tasks this equals
            demand_content.suggestion).
        task_type: "api" or "scheduled". Only scheduled runs with a demand_id
            write status/token cost back to demand_find_task.
    """
    async with task_semaphore:
        # Occupied slots = capacity - free permits; +1 counts this task.
        # (Reads the semaphore's private _value; used for logging only.)
        current_concurrent = MAX_CONCURRENT_TASKS - task_semaphore._value + 1
        logger.info(f"任务开始 [{task_type}]: query={query[:50]}..., 当前并发={current_concurrent}/{MAX_CONCURRENT_TASKS}")
        start_time = datetime.now(SCHEDULER_TZ)
        stats["total_tasks"] += 1
        if task_type == "scheduled":
            stats["scheduled_tasks"] += 1
        # Best-effort: mark the DB row as running before the agent starts.
        if task_type == "scheduled" and demand_id is not None:
            try:
                update_task_status("", demand_id, STATUS_RUNNING)
            except Exception as e:
                logger.warning(f"更新任务状态为执行中失败: {e}")
        try:
            # Hard cap on a single run; asyncio.TimeoutError marks it failed.
            result = await asyncio.wait_for(
                core.run_agent(
                    query,
                    demand_id=demand_id,
                    suggestion=suggestion or None,
                    stream_output=False,
                    log_assistant_text=True,
                ),
                timeout=float(TASK_TIMEOUT_SECONDS),
            )
            duration = (datetime.now(SCHEDULER_TZ) - start_time).total_seconds()
            if result["status"] == "completed":
                stats["completed_tasks"] += 1
                logger.info(f"任务完成 [{task_type}]: trace_id={result['trace_id']}, 耗时={duration:.1f}s")
                if task_type == "scheduled" and demand_id is not None:
                    _update_scheduled_task_complete(demand_id, result["trace_id"], STATUS_SUCCESS)
            else:
                stats["failed_tasks"] += 1
                logger.error(f"任务失败 [{task_type}]: trace_id={result.get('trace_id')}, 错误={result.get('error')}, 耗时={duration:.1f}s")
                if task_type == "scheduled" and demand_id is not None:
                    _update_scheduled_task_complete(demand_id, result.get("trace_id") or "", STATUS_FAILED)
        except asyncio.TimeoutError:
            stats["failed_tasks"] += 1
            duration = (datetime.now(SCHEDULER_TZ) - start_time).total_seconds()
            logger.error(
                f"任务超时 [{task_type}]: 超过 {TASK_TIMEOUT_SECONDS}s,记为失败, 耗时={duration:.1f}s"
            )
            # No trace_id is available on timeout; record the failure with "".
            if task_type == "scheduled" and demand_id is not None:
                _update_scheduled_task_complete(demand_id, "", STATUS_FAILED)
        except Exception as e:
            stats["failed_tasks"] += 1
            duration = (datetime.now(SCHEDULER_TZ) - start_time).total_seconds()
            logger.error(f"任务异常 [{task_type}]: {e}, 耗时={duration:.1f}s", exc_info=True)
            if task_type == "scheduled" and demand_id is not None:
                _update_scheduled_task_complete(demand_id, "", STATUS_FAILED)
  197. def _today_dt_int() -> int:
  198. """当天 demand_content.dt 约定为 YYYYMMDD 整数(如 20260402),与定时器时区一致。"""
  199. return int(datetime.now(SCHEDULER_TZ).strftime("%Y%m%d"))
  200. def _has_running_content_task() -> bool:
  201. """
  202. 本进程内是否有内容寻找任务正在执行(占用并发槽)。
  203. 与 execute_task 共用 task_semaphore,含 API 触发与定时触发。
  204. """
  205. return task_semaphore._value != MAX_CONCURRENT_TASKS
async def scheduled_tick():
    """
    Dispatch-loop body, fired every SCHEDULE_DISPATCH_INTERVAL_SECONDS.

    When a concurrency slot is free, picks one demand_content row for today
    (dt = today, not yet present in demand_find_task) and launches it.
    Selection order is category-layered rotation: top1 of each category first
    (higher score first within a layer), then top2, and so on.
    """
    # Guard 1: respect the earliest local dispatch hour, if configured.
    if SCHEDULE_DISPATCH_NOT_BEFORE_HOUR is not None:
        now = datetime.now(SCHEDULER_TZ)
        if now.hour < SCHEDULE_DISPATCH_NOT_BEFORE_HOUR:
            logger.info(
                "定时任务跳过:未到本地派发窗口(需 %02d:00 %s 及之后,当前 %s)",
                SCHEDULE_DISPATCH_NOT_BEFORE_HOUR,
                SCHEDULER_TIMEZONE,
                now.strftime("%H:%M"),
            )
            return
    logger.info("定时任务触发(scheduled_tick)")
    # Guard 2: demand_task_oprate — latest row with is_open=0 disables
    # dispatch; with no row at all, dispatch continues (backward compatible
    # with an unconfigured table).
    is_open = get_latest_demand_task_oprate_is_open()
    if is_open == 0:
        logger.info("定时任务跳过:demand_task_oprate 最新记录 is_open=0")
        return
    # Guard 3: skip when today's accumulated token_coast already reached the
    # daily budget (day boundary taken in SCHEDULER_TZ local time).
    day_limit_coast = get_latest_day_limit_coast()
    if day_limit_coast is not None:
        now_local = datetime.now(SCHEDULER_TZ)
        day_start = now_local.replace(hour=0, minute=0, second=0, microsecond=0)
        day_end = day_start + timedelta(days=1)
        used_today = get_total_token_coast_between(day_start, day_end)
        if used_today >= day_limit_coast:
            logger.info(
                "定时任务跳过:当日 token_coast 已达上限,used=%s, limit=%s",
                used_today,
                day_limit_coast,
            )
            return
    # Guard 4: no free concurrency slot -> do not dispatch. Keep the tick
    # fast-returning so the scheduler is never blocked.
    # (Private _value: asyncio.Semaphore exposes no public gauge.)
    if task_semaphore._value <= 0:
        logger.info("定时任务跳过:无空闲并发槽")
        return
    dt = _today_dt_int()
    item = get_one_today_unprocessed_demand(dt=dt)
    if not item:
        logger.info(f"定时任务跳过:无待处理需求(dt={dt} 或均已建任务)")
        return
    demand_content_id = item.get("demand_content_id")
    query = (item.get("query") or "").strip()
    suggestion = (item.get("suggestion") or "").strip()
    if demand_content_id is None or not query:
        logger.info("定时任务跳过:查询结果无效")
        return
    # The remaining fields are logged for observability only.
    score = item.get("score")
    merge_leve2 = (item.get("merge_leve2") or "").strip()
    category_rank = item.get("category_rank")
    logger.info(
        "定时任务领取(品类分层轮转):demand_content_id=%s, dt=%s, score=%s, merge_leve2=%s, category_rank=%s",
        demand_content_id,
        dt,
        score,
        merge_leve2 or "<EMPTY>",
        category_rank,
    )
    # Record the claim first so later ticks do not pick the same demand again.
    create_task_record(demand_content_id)
    # Run in the background; execute_task's internal semaphore accounts for
    # the concurrency slot.
    asyncio.create_task(
        execute_task(
            query=query,
            demand_id=demand_content_id,
            suggestion=suggestion,
            task_type="scheduled",
        )
    )
  278. async def run_startup_resume():
  279. """
  280. 启动后先执行 demand_find_task 中 status=执行中(1) 的任务(理论上仅一条)。
  281. """
  282. try:
  283. row = get_first_running_task()
  284. if not row:
  285. logger.info("启动恢复:无执行中(status=1)的 demand_find_task")
  286. return
  287. demand_content_id = row.get("demand_content_id")
  288. query = (row.get("query") or "").strip()
  289. suggestion = (row.get("suggestion") or "").strip()
  290. if demand_content_id is None or not query:
  291. logger.warning("启动恢复:执行中任务数据不完整,跳过")
  292. return
  293. logger.info(f"启动恢复:执行 demand_find_task status=1, demand_content_id={demand_content_id}")
  294. await execute_task(
  295. query=query,
  296. demand_id=int(demand_content_id),
  297. suggestion=suggestion,
  298. task_type="scheduled",
  299. )
  300. except Exception as e:
  301. logger.error(f"启动恢复失败: {e}", exc_info=True)
  302. # ============ API 接口 ============
  303. @app.post("/api/tasks", response_model=TaskResponse)
  304. async def create_task(request: TaskRequest):
  305. """
  306. 创建内容寻找任务
  307. Args:
  308. request.query: 查询内容(可选,不传则使用默认值)
  309. Returns:
  310. {
  311. "trace_id": "20260317_103046_xyz789",
  312. "status": "started",
  313. "query": "...",
  314. "message": "任务已启动,结果将保存到 .cache/traces/xxx/"
  315. }
  316. """
  317. # 获取 query、demand_id、suggestion(API 显式传入;与库表字段同名便于对齐)
  318. query = request.query or core.DEFAULT_QUERY
  319. demand_id = request.demand_id
  320. suggestion_str = (request.suggestion or "").strip()
  321. # 用 Event 等待 trace_id
  322. trace_id_ready = asyncio.Event()
  323. trace_id_holder = {"id": None}
  324. async def run_and_capture():
  325. try:
  326. # 获取第一个 Trace 对象来获取 trace_id
  327. from agent import Trace
  328. async with task_semaphore:
  329. # 重新构建 runner 来获取 trace_id
  330. from agent import AgentRunner, RunConfig, FileSystemTraceStore
  331. from agent.llm import create_openrouter_llm_call
  332. from agent.llm.prompts import SimplePrompt
  333. from agent.tools.builtin.knowledge import KnowledgeConfig
  334. prompt_path = Path(__file__).parent / "content_finder.md"
  335. prompt = SimplePrompt(prompt_path)
  336. trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
  337. demand_id_str = str(demand_id) if demand_id is not None else ""
  338. messages = prompt.build_messages(
  339. query=query,
  340. suggestion=suggestion_str,
  341. trace_dir=trace_dir,
  342. demand_id=demand_id_str,
  343. )
  344. api_key = os.getenv("OPEN_ROUTER_API_KEY")
  345. model_name = prompt.config.get("model", "sonnet-4.6")
  346. model = os.getenv("MODEL", f"anthropic/claude-{model_name}")
  347. temperature = float(prompt.config.get("temperature", 0.3))
  348. max_iterations = int(os.getenv("MAX_ITERATIONS", "30"))
  349. trace_dir = os.getenv("TRACE_DIR", ".cache/traces")
  350. skills_dir = str(Path(__file__).parent / "skills")
  351. Path(trace_dir).mkdir(parents=True, exist_ok=True)
  352. store = FileSystemTraceStore(base_path=trace_dir)
  353. allowed_tools = [
  354. "douyin_search",
  355. "douyin_search_tikhub",
  356. "douyin_user_videos",
  357. "get_content_fans_portrait",
  358. "get_account_fans_portrait",
  359. "batch_fetch_portraits",
  360. "store_results_mysql",
  361. "exec_summary",
  362. ]
  363. runner = AgentRunner(
  364. llm_call=create_openrouter_llm_call(model=model),
  365. trace_store=store,
  366. skills_dir=skills_dir,
  367. )
  368. config = RunConfig(
  369. name="内容寻找",
  370. model=model,
  371. temperature=temperature,
  372. max_iterations=max_iterations,
  373. tools=allowed_tools,
  374. extra_llm_params={"max_tokens": 8192},
  375. knowledge=KnowledgeConfig(
  376. enable_extraction=True,
  377. enable_completion_extraction=True,
  378. enable_injection=True,
  379. owner="content_finder_agent",
  380. default_tags={"project": "content_finder"},
  381. default_scopes=["com.piaoquantv.supply"],
  382. default_search_types=["tool", "usecase", "definition"],
  383. default_search_owner="content_finder_agent"
  384. )
  385. )
  386. async for item in runner.run(messages=messages, config=config):
  387. if isinstance(item, Trace):
  388. if not trace_id_holder["id"]:
  389. trace_id_holder["id"] = item.trace_id
  390. trace_id_ready.set()
  391. logger.info(f"任务启动 [api]: trace_id={item.trace_id}")
  392. if item.status == "completed":
  393. stats["completed_tasks"] += 1
  394. logger.info(f"任务完成 [api]: trace_id={item.trace_id}")
  395. break
  396. elif item.status == "failed":
  397. stats["failed_tasks"] += 1
  398. logger.error(f"任务失败 [api]: trace_id={item.trace_id}, 错误={item.error_message}")
  399. break
  400. except Exception as e:
  401. stats["failed_tasks"] += 1
  402. logger.error(f"任务异常 [api]: {e}", exc_info=True)
  403. if not trace_id_holder["id"]:
  404. trace_id_holder["id"] = f"error_{datetime.now(SCHEDULER_TZ).strftime('%Y%m%d_%H%M%S')}"
  405. trace_id_ready.set()
  406. # 启动后台任务
  407. stats["total_tasks"] += 1
  408. asyncio.create_task(run_and_capture())
  409. # 等待 trace_id(最多 5 秒)
  410. try:
  411. await asyncio.wait_for(trace_id_ready.wait(), timeout=5.0)
  412. except asyncio.TimeoutError:
  413. logger.error("获取 trace_id 超时")
  414. raise HTTPException(status_code=500, detail="任务启动超时")
  415. trace_id = trace_id_holder["id"]
  416. return TaskResponse(
  417. trace_id=trace_id,
  418. status="started",
  419. query=query,
  420. message=f"任务已启动,结果将保存到 .cache/traces/{trace_id}/"
  421. )
  422. @app.get("/health")
  423. async def health_check():
  424. """健康检查"""
  425. return {
  426. "status": "ok",
  427. "max_concurrent_tasks": MAX_CONCURRENT_TASKS,
  428. "current_tasks": MAX_CONCURRENT_TASKS - task_semaphore._value,
  429. "scheduler_running": scheduler.running,
  430. "stats": stats
  431. }
  432. @app.get("/")
  433. async def root():
  434. """根路径"""
  435. return {
  436. "service": "内容寻找服务",
  437. "version": "1.0.0",
  438. "endpoints": {
  439. "create_task": "POST /api/tasks",
  440. "health": "GET /health"
  441. }
  442. }
# ============ Startup events ============
@app.on_event("startup")
async def startup():
    """
    Initialize on service start: log the effective configuration, kick off
    the resume task, and register the interval dispatch job.

    NOTE(review): @app.on_event is deprecated in recent FastAPI versions in
    favor of lifespan handlers — confirm the pinned FastAPI version before
    migrating.
    """
    logger.info("=" * 60)
    logger.info("内容寻找服务启动中...")
    logger.info(f"最大并发任务数: {MAX_CONCURRENT_TASKS}")
    logger.info(f"定时器时区: {SCHEDULER_TIMEZONE}")
    window_desc = (
        f";本地派发不早于 {SCHEDULE_DISPATCH_NOT_BEFORE_HOUR:02d}:00({SCHEDULER_TIMEZONE})"
        if SCHEDULE_DISPATCH_NOT_BEFORE_HOUR is not None
        else ""
    )
    logger.info(
        f"定时策略:每 {SCHEDULE_DISPATCH_INTERVAL_SECONDS} 秒尝试派发 1 条(有并发空槽才派发)"
        f"{window_desc};单次任务超时 {TASK_TIMEOUT_SECONDS}s"
    )
    # Resume any row left in status=running before periodic dispatch begins.
    asyncio.create_task(run_startup_resume())
    job = scheduler.add_job(
        scheduled_tick,
        "interval",
        seconds=SCHEDULE_DISPATCH_INTERVAL_SECONDS,
        misfire_grace_time=300,  # still run a missed tick if within 5 minutes
        coalesce=True,           # collapse a backlog of missed runs into one
        max_instances=1,         # never overlap two ticks
    )
    scheduler.start()
    logger.info(f"定时任务已注册: id={job.id}, next_run_time={job.next_run_time}")
    logger.info("服务启动完成")
    logger.info("=" * 60)
  473. @app.on_event("shutdown")
  474. async def shutdown():
  475. """服务关闭时清理"""
  476. logger.info("服务关闭中...")
  477. if scheduler.running:
  478. scheduler.shutdown()
  479. logger.info("服务已关闭")
  480. # ============ 主函数 ============
  481. if __name__ == "__main__":
  482. import uvicorn
  483. port = int(os.getenv("PORT", "8080"))
  484. host = os.getenv("HOST", "0.0.0.0")
  485. logger.info(f"启动服务: http://{host}:{port}")
  486. uvicorn.run(app, host=host, port=port)