cd examples/content_finder
python demo.py
这个演示展示了完整的工作流程:
content_finder/
├── demo.py # ✅ 可运行的演示脚本(独立版本)
├── agent.py # ContentFinderAgent 完整实现
├── run.py # 框架集成版本(需要完整框架)
├── tools/ # 工具层
│ ├── crawler.py # 爬虫工具(抖音、快手)
│ ├── content_eval.py # 内容评估工具
│ └── feedback.py # 反馈收集工具
├── memory/ # 记忆层
│ ├── search_history.py # 搜索历史管理
│ ├── content_perf.py # 内容表现数据
│ └── operator_pref.py # 运营偏好数据
├── skills/ # 技能层
│ └── content_finder.md # Agent技能定义
├── README.md # 项目概述
└── ARCHITECTURE.md # 架构设计文档
特点:
运行:
python demo.py
特点:
运行(需要框架环境):
# 从项目根目录运行
python -m examples.content_finder.run
# Programmatic usage: drive the demo agent directly from Python.
from demo import ContentFinderAgent, MemoryManager, SearchRequest

memory = MemoryManager(".cache/content_finder")
agent = ContentFinderAgent(memory)

request = SearchRequest(
    keywords=["美食探店", "美食推荐"],
    tags=["美食", "探店"],
    platforms=["douyin", "kuaishou"],
    filters={"min_views": 10000},
    max_results=20,
)
# NOTE(review): top-level `await` — run this inside an async function,
# a notebook, or `asyncio.run(...)`; it is not valid in a plain script body.
results = await agent.search_content(request)

# Operators rate a piece of content.
agent.collect_feedback(
    content_id="dy_123",
    rating="excellent",  # excellent / good / poor
    notes="内容质量很高,符合平台调性"
)

# Record how the content actually performed on the platform.
agent.update_performance(
    content_id="dy_123",
    internal_views=5000,
    engagement=0.15
)

# Analyse accumulated operator feedback and surface insights.
agent.analyze_insights()
所有数据存储在 .cache/content_finder/ 目录:
.cache/content_finder/
├── feedbacks.jsonl # 运营反馈数据
└── performances.jsonl # 内容表现数据
{
"content_id": "dy_123",
"rating": "excellent",
"notes": "内容质量很高",
"timestamp": "2026-03-06T10:00:00"
}
{
"content_id": "dy_123",
"internal_views": 5000,
"engagement": 0.15,
"timestamp": "2026-03-06T10:00:00"
}
在 demo.py 中添加新的搜索函数:
async def bilibili_search(keywords: str, max_results: int = 20) -> List[ContentItem]:
    """Search Bilibili for *keywords* (stub — to be implemented).

    Should return at most *max_results* ContentItem results; follow the
    pattern of the existing douyin_search / kuaishou_search functions.
    """
    # TODO: implement the actual search logic.
    pass
然后在 search_content 方法中添加平台支持:
elif platform == "bilibili":
tasks.append(bilibili_search(keyword, request.max_results))
修改 evaluate_content 函数中的评分逻辑:
def evaluate_content(items: List[ContentItem], keywords: List[str]) -> List[ContentItem]:
    """Score every item in place with the custom algorithm and return the list.

    *keywords* is accepted for parity with the real implementation; plug your
    own scoring function in where indicated.
    """
    for entry in items:
        # Replace this with your custom scoring algorithm.
        entry.quality_score = your_custom_scoring_logic(entry)
    return items
在 MemoryManager 类中添加新方法:
def save_custom_data(self, data: Dict):
    """Append one custom record as a JSON line to ``self.custom_file``.

    Non-ASCII characters are written verbatim (``ensure_ascii=False``) so
    the JSONL file stays human-readable.
    """
    line = json.dumps(data, ensure_ascii=False)
    with open(self.custom_file, "a", encoding="utf-8") as f:
        f.write(f"{line}\n")
将 demo.py 中的模拟函数替换为真实爬虫调用:
async def douyin_search(keywords: str, max_results: int = 20) -> List[ContentItem]:
    """Real Douyin search: query the crawler API and map hits to ContentItem.

    NOTE(review): ``douyin_api`` must be supplied by the crawler integration;
    it is not defined in this file.
    """
    # Call the Douyin crawler API.
    response = await douyin_api.search(keywords, limit=max_results)
    # Parse the raw payload into structured results.
    return [
        ContentItem(
            content_id=item['id'],
            platform='douyin',
            title=item['title'],
            # ... remaining ContentItem fields
        )
        for item in response['data']
    ]
将 JSONL 文件存储替换为数据库:
class DatabaseMemoryManager:
    """Persist operator feedback in a relational database instead of JSONL files."""

    def __init__(self, db_connection):
        # Any DB-API style connection/cursor exposing execute().
        self.db = db_connection

    def save_feedback(self, content_id: str, rating: str, notes: str):
        """Insert one feedback row, stamped with the current local time."""
        sql = (
            "INSERT INTO feedbacks (content_id, rating, notes, created_at)"
            " VALUES (?, ?, ?, ?)"
        )
        self.db.execute(sql, (content_id, rating, notes, datetime.now()))
使用 FastAPI 提供 HTTP 接口:
# Expose the agent over HTTP with FastAPI.
from fastapi import FastAPI

app = FastAPI()
# NOTE(review): `memory` must be constructed (e.g. MemoryManager(...)) before this line.
agent = ContentFinderAgent(memory)

@app.post("/search")
async def search(request: SearchRequest):
    """Run a content search and return the matched items."""
    results = await agent.search_content(request)
    return {"results": results}

@app.post("/feedback")
async def feedback(content_id: str, rating: str, notes: str):
    """Record an operator rating for one piece of content."""
    agent.collect_feedback(content_id, rating, notes)
    return {"status": "ok"}
已实现多平台并行搜索:
tasks = []
for platform in request.platforms:
tasks.append(search_platform(platform, keywords))
results = await asyncio.gather(*tasks)
添加缓存机制避免重复搜索:
from functools import lru_cache
@lru_cache(maxsize=100)
def get_cached_results(keywords: str, platform: str):
    """Return cached search results for (keywords, platform) — stub to implement.

    lru_cache keys on the exact argument values; maxsize=100 bounds the
    cache so the oldest entries are evicted first.
    """
    # TODO: return the cached results here.
    pass
批量更新表现数据:
def batch_update_performance(self, updates: List[Dict]):
    """Append many performance records to the JSONL file in a single open.

    Each element of *updates* becomes one JSON line; non-ASCII text is kept
    verbatim (``ensure_ascii=False``).
    """
    with open(self.performance_file, "a", encoding="utf-8") as f:
        f.writelines(
            json.dumps(record, ensure_ascii=False) + "\n" for record in updates
        )
A: 需要先解决框架依赖问题(dbutils),或者使用 demo.py 独立版本。
A: 参考 douyin_search 和 kuaishou_search 的实现模式,添加新的搜索函数。
A: 默认存储在 .cache/content_finder/ 目录,可以通过修改 MemoryManager 的初始化参数更改。
A: 修改 evaluate_content 函数中的 quality_score 计算逻辑。