import sys
import os
import json
from datetime import datetime, timedelta
import streamlit as st
import pandas as pd
from sqlalchemy import create_engine
from dotenv import load_dotenv
import plotly.express as px
# ===== 项目路径 =====
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from app.core.config import settings
# ===== Page configuration =====
st.set_page_config(
    page_title="微信指数趋势看板",
    page_icon="📈",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Load environment variables from the .env file one directory above this script.
load_dotenv(
    os.path.join(os.path.dirname(__file__), '../.env')
)
# ===== Field mapping =====
# channel_score key -> Chinese display label. Insertion order is also the
# order in which the metric options are offered in the sidebar.
FIELD_MAPPING = dict(
    total_score='总指数',
    finder_score='视频号',
    query_score='搜一搜',
    mpdoc_score='公众号',
    live_score='直播',
    miniapp_score='小程序',
)
# ======================
# 数据层
# ======================
@st.cache_resource
def get_db_connection():
    """Build the SQLAlchemy engine used by every query in this dashboard.

    Wrapped in ``st.cache_resource`` so a single engine is reused across
    reruns. The configured URL names the async ``aiomysql`` driver; this
    script queries synchronously, so the driver is swapped for ``pymysql``.
    """
    sync_url = settings.DATABASE_URL.replace('+aiomysql', '+pymysql')
    engine = create_engine(sync_url)
    return engine
@st.cache_data(ttl=3600)
def get_keywords():
    """Return active keywords as a DataFrame with ``id`` and ``keyword`` columns.

    Rows are ordered by ``priority`` (highest first). The result is cached
    for one hour; call ``get_keywords.clear()`` after changing the table.
    """
    query = (
        "SELECT id, keyword FROM wx_trend_keywords "
        "WHERE is_active = 1 ORDER BY priority DESC"
    )
    with get_db_connection().connect() as conn:
        keywords = pd.read_sql(query, conn)
    return keywords
def add_keyword(keyword):
    """Add a keyword, or re-activate it if it exists but is inactive.

    Runs inside one transaction (``engine.begin()``).

    Returns:
        tuple[bool, str]: ``(True, msg)`` when the keyword was inserted or
        re-activated; ``(False, msg)`` when it already exists and is active.
    """
    from sqlalchemy import text

    with get_db_connection().begin() as conn:
        existing = conn.execute(
            text("SELECT id, is_active FROM wx_trend_keywords WHERE keyword = :keyword"),
            {"keyword": keyword},
        ).fetchone()

        # Unknown keyword: insert it as active with default priority 0.
        if existing is None:
            conn.execute(
                text("INSERT INTO wx_trend_keywords (keyword, is_active, priority) VALUES (:keyword, 1, 0)"),
                {"keyword": keyword},
            )
            return True, f"关键词 '{keyword}' 已成功添加"

        row_id, active_flag = existing[0], existing[1]
        # Any is_active value other than 0 counts as "already active".
        if active_flag != 0:
            return False, f"关键词 '{keyword}' 已存在且处于激活状态"

        conn.execute(
            text("UPDATE wx_trend_keywords SET is_active = 1 WHERE id = :id"),
            {"id": row_id},
        )
        return True, f"关键词 '{keyword}' 已重新激活"
def delete_keyword(keyword_id):
    """Soft-delete a keyword by setting its ``is_active`` flag to 0.

    Returns:
        bool: True when the UPDATE reported at least one affected row.
    """
    from sqlalchemy import text

    statement = text("UPDATE wx_trend_keywords SET is_active = 0 WHERE id = :id")
    with get_db_connection().begin() as conn:
        affected = conn.execute(statement, {"id": keyword_id}).rowcount
    return affected > 0
def fetch_real_time_data(keyword):
    """Query the upstream index API for ``keyword`` in real time.

    The request window is the last ``settings.LIMIT_DAY`` days up to now,
    sent as ``YYYYMMDD`` strings to ``settings.API_URL``.

    Returns:
        tuple[bool, DataFrame | str]:
            ``(True, df)`` on success. ``df`` has ``keyword``, ``ymd`` and
            ``date`` columns plus one column per key of each item's
            ``channel_score``, with rows sorted by ``ymd``.
            ``(False, reason)`` on failure. ``reason`` carries no
            "查询失败" prefix, because callers add it themselves.
    """
    import httpx

    end_date = datetime.now()
    start_date = end_date - timedelta(days=settings.LIMIT_DAY)
    payload = {
        "keyword": keyword,
        "start_ymd": start_date.strftime("%Y%m%d"),
        "end_ymd": end_date.strftime("%Y%m%d"),
    }
    try:
        with httpx.Client(timeout=30.0) as client:
            resp = client.post(settings.API_URL, json=payload)
            resp.raise_for_status()
            data = resp.json()

        if data.get('code') != 0:
            return False, data.get('msg', 'API请求失败')

        # Either nesting level may come back as null; treat that as "no data"
        # instead of failing with an AttributeError on None.
        raw_list = (data.get('data') or {}).get('data') or []
        if not raw_list:
            return False, '未获取到数据'

        rows = []
        for item in sorted(raw_list, key=lambda x: x['ymd']):
            row = {
                'keyword': keyword,
                'ymd': item['ymd'],
                # str() so an integer ymd parses the same as a string one.
                'date': pd.to_datetime(str(item['ymd']), format='%Y%m%d'),
            }
            row.update(item['channel_score'])
            rows.append(row)
        return True, pd.DataFrame(rows)
    except Exception as e:  # UI boundary: report any failure as a message
        # No "查询失败" prefix here: every caller already adds one.
        return False, str(e) or type(e).__name__
@st.cache_data(ttl=3600)
def get_trend_data(keyword_ids, start_date, end_date):
    """Load stored daily index data for the given keywords and date range.

    Args:
        keyword_ids: ids from ``wx_trend_keywords``.
        start_date, end_date: inclusive bounds (date/datetime). They are
            formatted as ``YYYYMMDD`` strings and compared with ``d.ymd``.

    Returns:
        DataFrame with ``keyword``, ``ymd`` and ``date`` columns plus one
        column per key found in ``channel_score``, ordered by ``ymd``.
        Returns an empty DataFrame when there are no ids or no matching rows.
    """
    from sqlalchemy import bindparam, text

    if not keyword_ids:
        return pd.DataFrame()

    # Bound parameters instead of f-string SQL. The expanding "ids" parameter
    # renders as IN (...) for any list length, including a single id.
    sql = text(
        """
        SELECT
            k.keyword,
            d.ymd,
            d.channel_score
        FROM wx_trend_data d
        JOIN wx_trend_keywords k ON d.keyword_id = k.id
        WHERE d.keyword_id IN :ids
          AND d.ymd BETWEEN :s_date AND :e_date
        ORDER BY d.ymd ASC
        """
    ).bindparams(bindparam("ids", expanding=True))
    params = {
        "ids": [int(i) for i in keyword_ids],
        "s_date": start_date.strftime("%Y%m%d"),
        "e_date": end_date.strftime("%Y%m%d"),
    }

    with get_db_connection().connect() as conn:
        df = pd.read_sql(sql, conn, params=params)

    if df.empty:
        return df

    def _as_dict(value):
        # Each row may hold a JSON string or an already-decoded object.
        # Decode per row, and map NULL/non-object values to {} so that
        # json_normalize does not fail on them.
        if isinstance(value, (str, bytes, bytearray)):
            value = json.loads(value)
        return value if isinstance(value, dict) else {}

    score_df = pd.json_normalize([_as_dict(v) for v in df['channel_score']])
    result = pd.concat(
        [df[['keyword', 'ymd']].reset_index(drop=True), score_df],
        axis=1,
    )
    result['date'] = pd.to_datetime(result['ymd'], format='%Y%m%d')
    return result
# ======================
# Sidebar
# ======================
# Per-session list of recently queried realtime keywords (newest first).
st.session_state.setdefault('recent_keywords', [])
# ======================
# 实时查询
# ======================
with st.sidebar.expander("⚡ 实时查询", expanded=True):
    realtime_keyword = st.text_input("输入要实时查询的关键词", placeholder="例如:春节")

    if st.button("实时查询"):
        query_kw = realtime_keyword.strip()
        if not query_kw:
            st.warning("请输入关键词")
        else:
            with st.spinner(f"正在查询 '{realtime_keyword}' 的实时数据..."):
                success, result = fetch_real_time_data(query_kw)
            if not success:
                st.error(f"查询失败: {result}")
            else:
                # Keep the result in session state so the main panel can show it on later reruns.
                st.session_state['realtime_data'] = result
                st.session_state['realtime_keyword'] = query_kw
                # Record the keyword in the recent list (newest first, at most 10).
                history = st.session_state['recent_keywords']
                if query_kw not in history:
                    history.insert(0, query_kw)
                    st.session_state['recent_keywords'] = history[:10]
                st.success("实时查询成功")

    # One-click re-query buttons for the five most recent keywords.
    recent = st.session_state['recent_keywords']
    if recent:
        st.markdown("**最近查询**:")
        for idx, past_kw in enumerate(recent[:5]):
            if not st.button(past_kw, key=f"recent_{idx}", help="点击使用此关键词"):
                continue
            success, result = fetch_real_time_data(past_kw)
            if success:
                st.session_state['realtime_data'] = result
                st.session_state['realtime_keyword'] = past_kw
                st.success(f"已查询关键词: {past_kw}")
            else:
                st.error(f"查询失败: {result}")
# ======================
# 筛选条件
# ======================
with st.sidebar.expander("🔍 筛选条件", expanded=True):
    kw_df = get_keywords()
    keyword_options = kw_df['keyword'].tolist()

    # A case-insensitive substring search narrows the multiselect options.
    search_term = st.text_input("搜索关键词", placeholder="输入关键词搜索")
    filtered_keywords = keyword_options
    if search_term:
        needle = search_term.lower()
        filtered_keywords = [kw for kw in keyword_options if needle in kw.lower()]

    # Multi-select; by default the first three visible keywords are preselected.
    selected_keywords = st.multiselect(
        "选择关键词(可多选)",
        options=filtered_keywords,
        default=filtered_keywords[:3] if filtered_keywords else [],
        placeholder="请选择关键词",
    )

    # Optional free-text keyword; the main panel queries it live from the API.
    custom_keyword = ""
    if st.checkbox("输入自定义关键词", value=False):
        # Strip so that whitespace-only input does not trigger an API call.
        custom_keyword = st.text_input("自定义关键词", placeholder="例如:春节").strip()

    final_keywords = selected_keywords.copy()
    if custom_keyword:
        final_keywords.append(custom_keyword)

    # Map the selected names back to their DB ids, in selection order. The
    # multiselect only returns values drawn from keyword_options.
    selected_ids = [
        kid
        for name in selected_keywords
        for kid in kw_df.loc[kw_df['keyword'] == name, 'id'].tolist()
    ]

    date_range = st.date_input(
        "日期范围",
        value=(datetime.now() - timedelta(days=30), datetime.now()),
        max_value=datetime.now(),
    )

    selected_metrics = st.multiselect(
        "展示维度",
        options=list(FIELD_MAPPING.keys()),
        format_func=lambda x: FIELD_MAPPING[x],
        default=['total_score'],
    )

    # Quick actions
    col1, col2 = st.columns(2)
    with col1:
        if st.button("清空筛选"):
            # Clears only the stored realtime query result.
            st.session_state['realtime_data'] = None
            st.session_state['realtime_keyword'] = None
            st.success("筛选条件已清空")
    with col2:
        if st.button("刷新数据"):
            # Drop both caches so the keyword list AND the trend data are
            # re-read from the database (previously trend data stayed stale
            # for up to the one-hour TTL).
            get_keywords.clear()
            get_trend_data.clear()
            st.success("数据已刷新")
# ======================
# 关键词管理
# ======================
with st.sidebar.expander("📝 关键词管理", expanded=False):
    # --- Add (or re-activate) a keyword ---
    new_keyword = st.text_input("输入新关键词", placeholder="例如:人工智能")
    if st.button("添加关键词"):
        candidate = new_keyword.strip()
        if not candidate:
            st.warning("请输入关键词")
        else:
            success, message = add_keyword(candidate)
            if not success:
                st.warning(message)
            else:
                st.success(message)
                # Invalidate the cached list and reload it for the delete picker below.
                get_keywords.clear()
                kw_df = get_keywords()

    # --- Soft-delete a keyword ---
    if not kw_df.empty:
        id_by_keyword = dict(zip(kw_df['keyword'], kw_df['id'].tolist()))
        selected_del_keyword = st.selectbox(
            "选择要删除的关键词",
            options=list(id_by_keyword),
            index=None,
            placeholder="请选择关键词",
        )
        if st.button("删除关键词"):
            if not selected_del_keyword:
                st.warning("请选择要删除的关键词")
            elif delete_keyword(id_by_keyword[selected_del_keyword]):
                st.success(f"关键词 '{selected_del_keyword}' 已删除")
                # Invalidate the cached list and reload it.
                get_keywords.clear()
                kw_df = get_keywords()
            else:
                st.error("删除失败,请重试")
# ======================
# 数据导出
# ======================
# with st.sidebar.expander("💾 数据导出", expanded=False):
# st.markdown("**导出当前数据**:")
# if st.button("导出为CSV"):
# if 'df' in locals():
# import io
# csv = df.to_csv(index=False)
# st.download_button(
# label="下载CSV文件",
# data=csv,
# file_name=f"微信指数_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
# mime="text/csv"
# )
# else:
# st.warning("请先查询数据")
# if st.button("导出为Excel"):
# if 'df' in locals():
# try:
# import openpyxl
# import io
# output = io.BytesIO()
# with pd.ExcelWriter(output, engine='openpyxl') as writer:
# df.to_excel(writer, index=False, sheet_name='微信指数')
# output.seek(0)
# st.download_button(
# label="下载Excel文件",
# data=output,
# file_name=f"微信指数_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx",
# mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
# )
# except ImportError:
# st.warning("请先安装openpyxl库以支持Excel导出")
# else:
# st.warning("请先查询数据")
# ======================
# Main
# =====================
st.title("微信指数趋势")

# A realtime result stored from the sidebar persists across reruns until
# "清空筛选" resets it.
realtime_data = st.session_state.get('realtime_data')
realtime_keyword = st.session_state.get('realtime_keyword')

# Data-source priority: custom keyword (live API) > stored realtime result >
# DB keywords chosen in the filter. The stored realtime result no longer
# requires a keyword to be selected, so it stays visible when the DB
# keyword list is empty or everything has been deselected.
custom_query = custom_keyword.strip() if custom_keyword else ""
if custom_query:
    with st.spinner(f"正在实时查询 '{custom_query}' 的数据..."):
        success, result = fetch_real_time_data(custom_query)
    if not success:
        st.error(f"查询失败: {result}")
        st.stop()
    st.subheader(f"⚡ 实时查询结果:{custom_query}")
    df = result
elif realtime_data is not None:
    st.subheader(f"⚡ 实时查询结果:{realtime_keyword}")
    df = realtime_data
elif selected_ids:
    # While the user is still picking, date_input can return a single date.
    if len(date_range) != 2:
        st.warning("请选择完整的日期范围")
        st.stop()
    with st.spinner("正在加载数据..."):
        df = get_trend_data(selected_ids, date_range[0], date_range[1])
    if df.empty:
        st.warning("所选时间段内暂无数据")
        st.stop()
else:
    st.warning("请至少选择一个关键词" if final_keywords else "请选择或输入关键词")
    st.stop()
# ======================
# KPI 卡片
# ======================
st.markdown("### 💡 最新指数概览")

# One metric card per keyword, laid out in at most four columns.
unique_keywords = df['keyword'].unique().tolist()
cols = st.columns(max(1, min(len(unique_keywords), 4)))
for i, kw in enumerate(unique_keywords):
    kw_rows = df[df['keyword'] == kw].sort_values("date")
    if kw_rows.empty:
        continue

    latest_score = kw_rows['total_score'].iloc[-1]
    prev_score = kw_rows['total_score'].iloc[-2] if len(kw_rows) > 1 else None

    # Day-over-day change. Skip it when either value is missing (NaN, e.g.
    # a row without total_score) or the previous value is not positive, so
    # no "nan%"/"inf%" is shown.
    delta = ""
    if (
        prev_score is not None
        and pd.notna(prev_score)
        and pd.notna(latest_score)
        and prev_score > 0
    ):
        pct = (latest_score - prev_score) / prev_score * 100
        delta = f"{pct:+.2f}%"

    with cols[i % len(cols)]:
        st.metric(
            label=f"{kw}(总指数)",
            # int(NaN) would raise, so show a placeholder instead.
            value=f"{int(latest_score):,}" if pd.notna(latest_score) else "-",
            delta=delta,
        )
st.divider()
# ======================
# 趋势图(专业版)
# ======================
st.markdown("### 📈 趋势对比分析")

# Offer only metrics that exist in df, since realtime API payloads may omit
# channels. With nothing left, the selectbox would return None, and both
# px.line and FIELD_MAPPING[None] would fail.
chart_metrics = [m for m in selected_metrics if m in df.columns]
if not chart_metrics:
    st.info("请在侧边栏「展示维度」中选择至少一个可用指标")
else:
    compare_metric = st.selectbox(
        "对比指标",
        options=chart_metrics,
        format_func=lambda x: FIELD_MAPPING[x],
    )

    fig = px.line(
        df,
        x="date",
        y=compare_metric,
        color="keyword",
        title=f"各关键词【{FIELD_MAPPING[compare_metric]}】趋势对比",
    )

    fig.update_traces(
        mode="lines",
        line=dict(width=3),
        # With hovermode="x unified" the date already heads the tooltip, so
        # only keyword and value are listed. <extra></extra> suppresses the
        # secondary trace-name box.
        hovertemplate=(
            "关键词:%{data.name}<br>"
            "指数:%{y:,.0f}"
            "<extra></extra>"
        ),
    )

    fig.update_layout(
        hovermode="x unified",
        margin=dict(t=80, l=60, r=40, b=50),
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.05,
            xanchor="right",
            x=1,
            title="关键词(点击可隐藏)",
        ),
        xaxis=dict(
            title="日期",
            tickformat="%Y.%m.%d",
            showgrid=False,
            tickfont=dict(size=12),
        ),
        yaxis=dict(
            title="指数",
            showgrid=True,
            gridcolor="rgba(255,255,255,0.06)",
            tickfont=dict(size=12),
            tickformat=",d",
        ),
        hoverlabel=dict(
            font_size=12,
            font_family="Arial",
        ),
    )

    st.plotly_chart(fig, width='stretch')
# st.caption(
# "说明:微信指数为相对热度指标,用于趋势对比分析,不代表绝对搜索量。"
# )
# ======================
# 原始数据
# ======================
with st.expander("📄 查看原始数据"):
    # Skip selected metrics that df lacks (e.g. omitted by the realtime API)
    # instead of raising KeyError on the column selection.
    table_metrics = [m for m in selected_metrics if m in df.columns]
    display_df = df[['date', 'keyword'] + table_metrics].copy()
    display_df.columns = ['日期', '关键词'] + [FIELD_MAPPING[m] for m in table_metrics]
    display_df['日期'] = display_df['日期'].dt.strftime("%Y-%m-%d")
    # Newest dates first; keywords alphabetical within a date.
    st.dataframe(
        display_df.sort_values(['日期', '关键词'], ascending=[False, True]),
        width='stretch',
        hide_index=True,
    )