| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551 |
- import sys
- import os
- import json
- from datetime import datetime, timedelta
- import streamlit as st
- import pandas as pd
- from sqlalchemy import create_engine
- from dotenv import load_dotenv
- import plotly.express as px
# ===== Project path =====
# Make the repository root importable so `app.core.config` resolves when this
# page is launched directly (e.g. `streamlit run`).
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
# Load ../.env BEFORE importing the settings module. The settings object is
# created at import time; if it reads environment variables then (assumed —
# app.core.config is not visible here), loading .env afterwards is too late.
load_dotenv(os.path.join(os.path.dirname(__file__), '../.env'))
from app.core.config import settings
# ===== Page config =====
st.set_page_config(
    page_title="微信指数趋势看板",
    page_icon="📈",
    layout="wide",
    initial_sidebar_state="expanded",
)
# ===== Field mapping =====
# Metric column name (a key of the `channel_score` JSON) -> Chinese display
# label used in widgets, chart titles and table headers. Order is the order
# the metrics are offered in the UI.
FIELD_MAPPING = dict(
    total_score='总指数',
    finder_score='视频号',
    query_score='搜一搜',
    mpdoc_score='公众号',
    live_score='直播',
    miniapp_score='小程序',
)
- # ======================
- # 数据层
- # ======================
@st.cache_resource
def get_db_connection():
    """Return a shared synchronous SQLAlchemy engine for the app database.

    The configured DATABASE_URL names the async ``aiomysql`` driver; this page
    uses a synchronous engine, so the driver is swapped for ``pymysql``.
    ``st.cache_resource`` keeps a single engine across reruns.
    """
    sync_url = settings.DATABASE_URL.replace('+aiomysql', '+pymysql')
    return create_engine(sync_url)
@st.cache_data(ttl=3600)
def get_keywords():
    """Return active keywords as a DataFrame with ``id`` and ``keyword`` columns.

    Rows are ordered by ``priority`` descending. The result is cached for one
    hour; call ``get_keywords.clear()`` after changing the keyword table.
    """
    query = (
        "SELECT id, keyword FROM wx_trend_keywords "
        "WHERE is_active = 1 "
        "ORDER BY priority DESC"
    )
    with get_db_connection().connect() as conn:
        frame = pd.read_sql(query, conn)
    return frame
def add_keyword(keyword):
    """Add *keyword*, or reactivate it if it exists but is inactive.

    Runs inside a single transaction.

    Returns:
        ``(changed, message)`` — ``changed`` is True when a row was inserted or
        reactivated, False when the keyword already exists and is active.
    """
    from sqlalchemy import text

    with get_db_connection().begin() as conn:
        existing = conn.execute(
            text("SELECT id, is_active FROM wx_trend_keywords WHERE keyword = :keyword"),
            {"keyword": keyword},
        ).fetchone()

        # Unknown keyword: insert it as active with default priority 0.
        if existing is None:
            conn.execute(
                text("INSERT INTO wx_trend_keywords (keyword, is_active, priority) VALUES (:keyword, 1, 0)"),
                {"keyword": keyword},
            )
            return True, f"关键词 '{keyword}' 已成功添加"

        row_id, is_active = existing[0], existing[1]
        if is_active != 0:
            return False, f"关键词 '{keyword}' 已存在且处于激活状态"

        # Previously soft-deleted: flip it back on.
        conn.execute(
            text("UPDATE wx_trend_keywords SET is_active = 1 WHERE id = :id"),
            {"id": row_id},
        )
        return True, f"关键词 '{keyword}' 已重新激活"
def delete_keyword(keyword_id):
    """Soft-delete the keyword with id *keyword_id* by setting ``is_active = 0``.

    Returns True when the UPDATE reported at least one affected row.
    """
    from sqlalchemy import text

    statement = text("UPDATE wx_trend_keywords SET is_active = 0 WHERE id = :id")
    with get_db_connection().begin() as conn:
        affected = conn.execute(statement, {"id": keyword_id}).rowcount
    return affected > 0
def fetch_real_time_data(keyword):
    """Query the WeChat-index API directly for *keyword*, bypassing the database.

    The request covers the last ``settings.LIMIT_DAY`` days up to now, sent as
    ``start_ymd`` / ``end_ymd`` in ``YYYYMMDD`` form.

    Returns:
        ``(True, DataFrame)`` on success: one row per day sorted by ``ymd``,
        with ``keyword``, ``ymd``, a parsed ``date`` column and one column per
        key of the item's ``channel_score`` mapping.
        ``(False, str)`` on failure: a bare error message. Every caller in this
        page already prefixes it with "查询失败: ", so none is added here.
    """
    import httpx

    end_date = datetime.now()
    start_date = end_date - timedelta(days=settings.LIMIT_DAY)
    payload = {
        "keyword": keyword,
        "start_ymd": start_date.strftime("%Y%m%d"),
        "end_ymd": end_date.strftime("%Y%m%d"),
    }

    # Broad catch is deliberate: this is a UI boundary, and any failure should
    # be reported to the user instead of crashing the page.
    try:
        with httpx.Client(timeout=30.0) as client:
            resp = client.post(settings.API_URL, json=payload)
            resp.raise_for_status()
            data = resp.json()

        if data.get('code') != 0:
            return False, data.get('msg', 'API请求失败')

        # Rows are nested under data.data; tolerate either level being
        # missing or null instead of failing with an AttributeError.
        raw_list = (data.get('data') or {}).get('data') or []
        if not raw_list:
            return False, '未获取到数据'

        rows = [
            {
                'keyword': keyword,
                'ymd': item['ymd'],
                'date': pd.to_datetime(item['ymd'], format='%Y%m%d'),
                # channel_score keys become metric columns (cf. FIELD_MAPPING).
                **item['channel_score'],
            }
            for item in sorted(raw_list, key=lambda x: x['ymd'])
        ]
        return True, pd.DataFrame(rows)

    except Exception as e:
        # str() of some exceptions is empty; fall back to the type name.
        return False, str(e) or type(e).__name__
@st.cache_data(ttl=3600)
def get_trend_data(keyword_ids, start_date, end_date):
    """Load stored daily trend rows for *keyword_ids* between two dates.

    Args:
        keyword_ids: ids from ``wx_trend_keywords``. An empty sequence returns
            an empty DataFrame without touching the database.
        start_date, end_date: inclusive bounds (anything with ``strftime``),
            compared against ``ymd`` as ``YYYYMMDD`` strings.

    Returns:
        DataFrame ordered by ``ymd`` with ``keyword``, ``ymd``, one column per
        key found in ``channel_score`` and a parsed ``date`` column. Cached by
        Streamlit for one hour; ``get_trend_data.clear()`` drops the cache.
    """
    from sqlalchemy import bindparam, text

    if not keyword_ids:
        return pd.DataFrame()

    # Fully parameterized query. `expanding=True` renders one placeholder per
    # id, replacing the f-string-built "IN (...)" and its single-id special case.
    sql = text(
        """
        SELECT k.keyword, d.ymd, d.channel_score
        FROM wx_trend_data d
        JOIN wx_trend_keywords k ON d.keyword_id = k.id
        WHERE d.keyword_id IN :ids
          AND d.ymd BETWEEN :s_date AND :e_date
        ORDER BY d.ymd ASC
        """
    ).bindparams(bindparam("ids", expanding=True))
    params = {
        # Normalize to plain Python ints (e.g. if numpy integers are passed).
        "ids": [int(kid) for kid in keyword_ids],
        "s_date": start_date.strftime("%Y%m%d"),
        "e_date": end_date.strftime("%Y%m%d"),
    }
    with get_db_connection().connect() as conn:
        df = pd.read_sql(sql, conn, params=params)
    if df.empty:
        return df

    def _as_dict(value):
        # channel_score may come back as JSON text or already decoded; decide
        # per row (not from the first row only) and map NULL to {} so
        # json_normalize produces NaNs instead of failing.
        if isinstance(value, (str, bytes, bytearray)):
            return json.loads(value)
        return {} if value is None else value

    score_df = pd.json_normalize([_as_dict(v) for v in df['channel_score']])
    result = pd.concat([df[['keyword', 'ymd']], score_df], axis=1)
    result['date'] = pd.to_datetime(result['ymd'], format='%Y%m%d')
    return result
- # ======================
- # Sidebar
- # ======================
# Per-session state: recently used realtime keywords, most recent first.
st.session_state.setdefault('recent_keywords', [])
# ======================
# Realtime query
# ======================
def _remember_recent_keyword(kw, limit=10):
    """Move *kw* to the front of the session's recent-keyword list (max *limit*)."""
    others = [k for k in st.session_state.get('recent_keywords', []) if k != kw]
    st.session_state['recent_keywords'] = [kw] + others[: limit - 1]


def _run_realtime_query(kw):
    """Fetch realtime data for *kw*; on success store it in session state.

    Returns ``(success, result)`` exactly as ``fetch_real_time_data`` does.
    """
    with st.spinner(f"正在查询 '{kw}' 的实时数据..."):
        success, result = fetch_real_time_data(kw)
    if success:
        st.session_state['realtime_data'] = result
        st.session_state['realtime_keyword'] = kw
        _remember_recent_keyword(kw)
    return success, result


with st.sidebar.expander("⚡ 实时查询", expanded=True):
    realtime_keyword = st.text_input("输入要实时查询的关键词", placeholder="例如:春节")
    if st.button("实时查询"):
        query_kw = realtime_keyword.strip()
        if not query_kw:
            st.warning("请输入关键词")
        else:
            success, result = _run_realtime_query(query_kw)
            if success:
                st.success("实时查询成功")
            else:
                st.error(f"查询失败: {result}")

    # Recently queried keywords (top 5): one click re-runs the realtime query.
    if st.session_state['recent_keywords']:
        st.markdown("**最近查询**:")
        # Iterate a slice copy: a click reorders the list inside the loop.
        for i, kw in enumerate(st.session_state['recent_keywords'][:5]):
            if st.button(kw, key=f"recent_{i}", help="点击使用此关键词"):
                success, result = _run_realtime_query(kw)
                if success:
                    st.success(f"已查询关键词: {kw}")
                else:
                    st.error(f"查询失败: {result}")
# ======================
# Filters
# ======================
with st.sidebar.expander("🔍 筛选条件", expanded=True):
    kw_df = get_keywords()
    keyword_options = kw_df['keyword'].tolist()

    # Case-insensitive substring search narrows the multiselect options.
    search_term = st.text_input("搜索关键词", placeholder="输入关键词搜索")
    filtered_keywords = keyword_options
    if search_term:
        needle = search_term.lower()
        filtered_keywords = [kw for kw in keyword_options if needle in kw.lower()]

    # Stored keywords, multi-select; first three matches preselected.
    selected_keywords = st.multiselect(
        "选择关键词(可多选)",
        options=filtered_keywords,
        default=filtered_keywords[:3],
        placeholder="请选择关键词",
    )

    # Optional free-text keyword, queried live from the API by the main area.
    # Stripped so a whitespace-only entry does not trigger an API query.
    custom_keyword = ""
    if st.checkbox("输入自定义关键词", value=False):
        custom_keyword = st.text_input("自定义关键词", placeholder="例如:春节").strip()

    final_keywords = selected_keywords + ([custom_keyword] if custom_keyword else [])

    # DB ids of the selected stored keywords (custom keywords have none).
    selected_ids = kw_df.loc[kw_df['keyword'].isin(selected_keywords), 'id'].tolist()

    today = datetime.now()
    date_range = st.date_input(
        "日期范围",
        value=(today - timedelta(days=30), today),
        max_value=today,
    )

    selected_metrics = st.multiselect(
        "展示维度",
        options=list(FIELD_MAPPING.keys()),
        format_func=lambda x: FIELD_MAPPING[x],
        default=['total_score'],
    )

    # Quick actions
    col1, col2 = st.columns(2)
    with col1:
        if st.button("清空筛选"):
            # Drops the stored realtime result so the main area falls back to
            # the selected keywords.
            st.session_state['realtime_data'] = None
            st.session_state['realtime_keyword'] = None
            st.success("筛选条件已清空")
    with col2:
        if st.button("刷新数据"):
            # Clear BOTH caches; clearing only the keyword list left cached
            # trend rows stale for up to an hour.
            get_keywords.clear()
            get_trend_data.clear()
            st.success("数据已刷新")
# ======================
# Keyword management
# ======================
with st.sidebar.expander("📝 关键词管理", expanded=False):
    # --- Add (or reactivate) a keyword ---
    new_keyword = st.text_input("输入新关键词", placeholder="例如:人工智能").strip()
    if st.button("添加关键词"):
        if not new_keyword:
            st.warning("请输入关键词")
        else:
            success, message = add_keyword(new_keyword)
            if success:
                st.success(message)
                # Invalidate the cached list and reload it so the delete picker
                # below already includes the new keyword in this run.
                get_keywords.clear()
                kw_df = get_keywords()
            else:
                st.warning(message)

    # --- Soft-delete a keyword ---
    if not kw_df.empty:
        # keyword -> id. A separate name, so the filter section's
        # `keyword_options` list is not overwritten with a dict; .tolist()
        # yields plain Python ints for the DB driver.
        delete_choices = dict(zip(kw_df['keyword'].tolist(), kw_df['id'].tolist()))
        selected_del_keyword = st.selectbox(
            "选择要删除的关键词",
            options=list(delete_choices),
            index=None,
            placeholder="请选择关键词",
        )
        if st.button("删除关键词"):
            if not selected_del_keyword:
                st.warning("请选择要删除的关键词")
            elif delete_keyword(delete_choices[selected_del_keyword]):
                st.success(f"关键词 '{selected_del_keyword}' 已删除")
                get_keywords.clear()
                kw_df = get_keywords()
            else:
                st.error("删除失败,请重试")
- # ======================
- # 数据导出
- # ======================
- # with st.sidebar.expander("💾 数据导出", expanded=False):
- # st.markdown("**导出当前数据**:")
- # if st.button("导出为CSV"):
- # if 'df' in locals():
- # import io
- # csv = df.to_csv(index=False)
- # st.download_button(
- # label="下载CSV文件",
- # data=csv,
- # file_name=f"微信指数_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
- # mime="text/csv"
- # )
- # else:
- # st.warning("请先查询数据")
-
- # if st.button("导出为Excel"):
- # if 'df' in locals():
- # try:
- # import openpyxl
- # import io
- # output = io.BytesIO()
- # with pd.ExcelWriter(output, engine='openpyxl') as writer:
- # df.to_excel(writer, index=False, sheet_name='微信指数')
- # output.seek(0)
- # st.download_button(
- # label="下载Excel文件",
- # data=output,
- # file_name=f"微信指数_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx",
- # mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
- # )
- # except ImportError:
- # st.warning("请先安装openpyxl库以支持Excel导出")
- # else:
- # st.warning("请先查询数据")
# ======================
# Main
# ======================
st.title("微信指数趋势")
# Result of the last successful sidebar realtime query (survives reruns).
realtime_data = st.session_state.get('realtime_data')
realtime_keyword = st.session_state.get('realtime_keyword')

# Data-source precedence: custom keyword (live API) > stored sidebar realtime
# result > database rows for the selected keywords. The realtime result is not
# gated on `final_keywords`, so it is shown even when no keyword is selected.
if custom_keyword:
    # Streamlit reruns the whole script on every widget interaction. Memoize
    # the live query per keyword and calendar day so that, for example,
    # switching the chart metric does not call the API again.
    memo_key = (custom_keyword, datetime.now().strftime("%Y%m%d"))
    memo = st.session_state.get('_custom_query_memo')
    if memo is not None and memo[0] == memo_key:
        df = memo[1]
    else:
        with st.spinner(f"正在实时查询 '{custom_keyword}' 的数据..."):
            success, result = fetch_real_time_data(custom_keyword)
        if not success:
            st.error(f"查询失败: {result}")
            st.stop()
        df = result
        st.session_state['_custom_query_memo'] = (memo_key, df)
    st.subheader(f"⚡ 实时查询结果:{custom_keyword}")
elif realtime_data is not None:
    st.subheader(f"⚡ 实时查询结果:{realtime_keyword}")
    df = realtime_data
elif final_keywords:
    # Database path (supports several keywords).
    if not selected_ids:
        st.warning("请至少选择一个关键词")
        st.stop()
    # date_input yields a 1-tuple while only the start date is picked.
    if len(date_range) != 2:
        st.warning("请选择完整的日期范围")
        st.stop()
    with st.spinner("正在加载数据..."):
        df = get_trend_data(selected_ids, date_range[0], date_range[1])
    if df.empty:
        st.warning("所选时间段内暂无数据")
        st.stop()
else:
    st.warning("请选择或输入关键词")
    st.stop()
# ======================
# KPI cards
# ======================
st.markdown("### 💡 最新指数概览")
# One card per keyword: latest total index plus day-over-day change.
unique_keywords = df['keyword'].unique().tolist()
# st.columns(0) is invalid, and realtime rows may lack total_score; skip the
# cards in either case instead of crashing the page.
if unique_keywords and 'total_score' in df.columns:
    cols = st.columns(min(len(unique_keywords), 4))
    for i, kw in enumerate(unique_keywords):
        # Separate name: `kw_df` holds the keyword table elsewhere in the page.
        kw_rows = df[df['keyword'] == kw].sort_values("date")
        if kw_rows.empty:
            continue
        latest_score = kw_rows['total_score'].iloc[-1]
        prev_score = kw_rows['total_score'].iloc[-2] if len(kw_rows) > 1 else None
        delta = None
        # NaN compares False with `> 0`, so a missing previous score gives no delta.
        if prev_score is not None and pd.notna(latest_score) and prev_score > 0:
            pct = (latest_score - prev_score) / prev_score * 100
            delta = f"{pct:+.2f}%"
        with cols[i % 4]:
            st.metric(
                label=f"{kw}(总指数)",
                # int(NaN) raises ValueError; show a dash for a missing score.
                value="-" if pd.isna(latest_score) else f"{int(latest_score):,}",
                delta=delta,
            )
st.divider()
# ======================
# Trend chart
# ======================
st.markdown("### 📈 趋势对比分析")
# With no metric selected the selectbox returns None and FIELD_MAPPING[None]
# would raise KeyError.
if not selected_metrics:
    st.warning("请至少选择一个展示维度")
    st.stop()
compare_metric = st.selectbox(
    "对比指标",
    options=selected_metrics,
    format_func=lambda x: FIELD_MAPPING[x],
)
# Realtime API data carries whatever channel keys the API returned; don't let
# plotly fail on a column that is not there.
if compare_metric not in df.columns:
    st.warning(f"当前数据中没有【{FIELD_MAPPING[compare_metric]}】指标")
    st.stop()
fig = px.line(
    df,
    x="date",
    y=compare_metric,
    color="keyword",
    title=f"各关键词【{FIELD_MAPPING[compare_metric]}】趋势对比",
)
# Line styling and hover text (one entry per keyword).
fig.update_traces(
    mode="lines",
    line=dict(width=3),
    hovertemplate=(
        "关键词:%{data.name}<br>"
        "指数:%{y:,.0f}"
        "<extra></extra>"
    ),
)
# Unified x hover, horizontal legend above the plot, dated x axis.
fig.update_layout(
    hovermode="x unified",
    margin=dict(t=80, l=60, r=40, b=50),
    legend=dict(
        orientation="h",
        yanchor="bottom",
        y=1.05,
        xanchor="right",
        x=1,
        title="关键词(点击可隐藏)",
    ),
    xaxis=dict(
        title="日期",
        tickformat="%Y.%m.%d",
        showgrid=False,
        tickfont=dict(size=12),
    ),
    yaxis=dict(
        title="指数",
        showgrid=True,
        gridcolor="rgba(255,255,255,0.06)",
        tickfont=dict(size=12),
        tickformat=",d",
    ),
    hoverlabel=dict(
        font_size=12,
        font_family="Arial",
    ),
)
st.plotly_chart(fig, width='stretch')
# st.caption(
#     "说明:微信指数为相对热度指标,用于趋势对比分析,不代表绝对搜索量。"
# )
# ======================
# Raw data
# ======================
with st.expander("📄 查看原始数据"):
    # Only show metrics that exist in df: realtime API data contains whatever
    # channel keys the API returned, and selecting a missing column raised KeyError.
    shown_metrics = [m for m in selected_metrics if m in df.columns]
    display_df = df[['date', 'keyword'] + shown_metrics].rename(
        columns={'date': '日期', 'keyword': '关键词', **{m: FIELD_MAPPING[m] for m in shown_metrics}}
    )
    display_df['日期'] = display_df['日期'].dt.strftime("%Y-%m-%d")
    # ISO date strings sort chronologically; newest first, keywords A→Z.
    st.dataframe(
        display_df.sort_values(['日期', '关键词'], ascending=[False, True]),
        width='stretch',
        hide_index=True,
    )
|