"""Streamlit dashboard for WeChat Index (微信指数) keyword trends.

Data sources:
  * MySQL tables ``wx_trend_keywords`` / ``wx_trend_data`` (stored history,
    read through cached helpers);
  * a live HTTP API at ``settings.API_URL`` for ad-hoc "real-time" queries.
"""
import sys
import os
import json
from datetime import datetime, timedelta

import streamlit as st
import pandas as pd
from sqlalchemy import create_engine, text, bindparam
from dotenv import load_dotenv
import plotly.express as px

# ===== Project path =====
# Make the parent directory importable so `app.core.config` can be resolved.
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

# Load .env BEFORE importing settings, so any configuration that is read from
# the environment at import time can see these values. (load_dotenv does not
# override variables that are already set.)
load_dotenv(os.path.join(os.path.dirname(__file__), '../.env'))

from app.core.config import settings  # noqa: E402  (depends on sys.path / .env above)

# ===== Page configuration =====
st.set_page_config(
    page_title="微信指数趋势看板",
    page_icon="📈",
    layout="wide",
    initial_sidebar_state="expanded",
)

# ===== Field mapping =====
# channel_score JSON key -> display label used in the UI.
FIELD_MAPPING = {
    'total_score': '总指数',
    'finder_score': '视频号',
    'query_score': '搜一搜',
    'mpdoc_score': '公众号',
    'live_score': '直播',
    'miniapp_score': '小程序',
}


# ======================
# Data layer
# ======================
@st.cache_resource
def get_db_connection():
    """Return a cached SQLAlchemy engine using a synchronous MySQL driver.

    The configured URL uses the async ``aiomysql`` driver; this script runs
    synchronously, so the driver part is swapped for ``pymysql``.
    """
    return create_engine(settings.DATABASE_URL.replace('+aiomysql', '+pymysql'))


@st.cache_data(ttl=3600)
def get_keywords():
    """Return active keywords (``id``, ``keyword``) ordered by priority, highest first."""
    engine = get_db_connection()
    sql = """
        SELECT id, keyword
        FROM wx_trend_keywords
        WHERE is_active = 1
        ORDER BY priority DESC
    """
    with engine.connect() as conn:
        return pd.read_sql(sql, conn)


def add_keyword(keyword):
    """Add ``keyword``; if it already exists but is inactive, reactivate it.

    Returns:
        ``(changed, message)`` where ``changed`` is True when a row was
        inserted or reactivated, and False when the keyword was already active.
    """
    engine = get_db_connection()
    with engine.begin() as conn:
        existing = conn.execute(
            text("SELECT id, is_active FROM wx_trend_keywords WHERE keyword = :keyword"),
            {"keyword": keyword},
        ).fetchone()

        if existing is None:
            conn.execute(
                text(
                    "INSERT INTO wx_trend_keywords (keyword, is_active, priority) "
                    "VALUES (:keyword, 1, 0)"
                ),
                {"keyword": keyword},
            )
            return True, f"关键词 '{keyword}' 已成功添加"

        if existing[1] == 0:
            conn.execute(
                text("UPDATE wx_trend_keywords SET is_active = 1 WHERE id = :id"),
                {"id": existing[0]},
            )
            return True, f"关键词 '{keyword}' 已重新激活"

        return False, f"关键词 '{keyword}' 已存在且处于激活状态"


def delete_keyword(keyword_id):
    """Soft-delete a keyword by setting ``is_active = 0``.

    Returns:
        True if a row was updated, False otherwise.
    """
    engine = get_db_connection()
    with engine.begin() as conn:
        result = conn.execute(
            text("UPDATE wx_trend_keywords SET is_active = 0 WHERE id = :id"),
            {"id": keyword_id},
        )
        return result.rowcount > 0


def fetch_real_time_data(keyword):
    """Query the live API for ``keyword`` over the last ``settings.LIMIT_DAY`` days.

    Returns:
        ``(True, DataFrame)`` with columns ``keyword``, ``ymd``, ``date`` plus
        one column per key of each item's ``channel_score``, sorted by ``ymd``;
        or ``(False, message)`` on any failure (the caller adds its own
        "查询失败" prefix when displaying it).
    """
    import httpx  # only needed for live queries

    end_date = datetime.now()
    start_date = end_date - timedelta(days=settings.LIMIT_DAY)
    payload = {
        "keyword": keyword,
        "start_ymd": start_date.strftime("%Y%m%d"),
        "end_ymd": end_date.strftime("%Y%m%d"),
    }

    try:
        with httpx.Client(timeout=30.0) as client:
            resp = client.post(settings.API_URL, json=payload)
            resp.raise_for_status()
            data = resp.json()

        if data.get('code') != 0:
            return False, data.get('msg', 'API请求失败')

        # Tolerate a null/missing "data" envelope instead of crashing on .get().
        raw_list = (data.get('data') or {}).get('data') or []
        if not raw_list:
            return False, '未获取到数据'

        rows = [
            {
                'keyword': keyword,
                'ymd': item['ymd'],
                'date': pd.to_datetime(item['ymd'], format='%Y%m%d'),
                # Score keys are merged last, as the original row.update() did.
                **(item.get('channel_score') or {}),
            }
            for item in sorted(raw_list, key=lambda x: x['ymd'])
        ]
        return True, pd.DataFrame(rows)
    except Exception as e:  # UI boundary: report any failure as a message
        return False, str(e) or type(e).__name__


def _parse_channel_score(value):
    """Decode one ``channel_score`` cell (JSON text or already-decoded) to a dict."""
    if isinstance(value, (bytes, bytearray)):
        value = value.decode('utf-8')
    if isinstance(value, str):
        value = json.loads(value)
    # NULL / JSON null -> empty dict so json_normalize keeps the row.
    return value or {}


@st.cache_data(ttl=3600)
def get_trend_data(keyword_ids, start_date, end_date):
    """Load stored daily scores for ``keyword_ids`` with ``ymd`` in [start_date, end_date].

    Returns:
        DataFrame with ``keyword``, ``ymd``, ``date`` and one column per key
        found in the ``channel_score`` JSON; empty if nothing matches.
    """
    if not keyword_ids:
        return pd.DataFrame()

    # Bound parameters (with an expanding IN list) instead of f-string SQL.
    sql = text(
        """
        SELECT k.keyword, d.ymd, d.channel_score
        FROM wx_trend_data d
        JOIN wx_trend_keywords k ON d.keyword_id = k.id
        WHERE d.keyword_id IN :ids
          AND d.ymd BETWEEN :start_ymd AND :end_ymd
        ORDER BY d.ymd ASC
        """
    ).bindparams(bindparam("ids", expanding=True))
    params = {
        "ids": [int(i) for i in keyword_ids],
        "start_ymd": start_date.strftime("%Y%m%d"),
        "end_ymd": end_date.strftime("%Y%m%d"),
    }

    engine = get_db_connection()
    with engine.connect() as conn:
        df = pd.read_sql(sql, conn, params=params)

    if df.empty:
        return df

    # Decode per row: the column may hold JSON strings or decoded objects.
    scores = df['channel_score'].apply(_parse_channel_score).tolist()
    score_df = pd.json_normalize(scores)
    result = pd.concat([df[['keyword', 'ymd']].reset_index(drop=True), score_df], axis=1)
    result['date'] = pd.to_datetime(result['ymd'], format='%Y%m%d')
    return result


def _ensure_metric_columns(df):
    """Return ``df`` with every FIELD_MAPPING column present (missing ones filled with NaN).

    Neither the live API nor stored JSON is guaranteed to contain every score
    key; downstream charting/KPI code indexes these columns directly.
    """
    missing = [col for col in FIELD_MAPPING if col not in df.columns]
    if not missing:
        return df
    df = df.copy()  # never mutate a DataFrame held in session state / cache
    for col in missing:
        df[col] = float('nan')
    return df


def _store_realtime_result(keyword, data):
    """Keep a successful live-query result in session state across reruns."""
    st.session_state['realtime_data'] = data
    st.session_state['realtime_keyword'] = keyword


# ======================
# Sidebar
# ======================
if 'recent_keywords' not in st.session_state:
    st.session_state['recent_keywords'] = []

# ======================
# Real-time query
# ======================
with st.sidebar.expander("⚡ 实时查询", expanded=True):
    realtime_input = st.text_input("输入要实时查询的关键词", placeholder="例如:春节").strip()
    if st.button("实时查询"):
        if realtime_input:
            with st.spinner(f"正在查询 '{realtime_input}' 的实时数据..."):
                success, result = fetch_real_time_data(realtime_input)
            if success:
                _store_realtime_result(realtime_input, result)
                recent = st.session_state['recent_keywords']
                if realtime_input not in recent:
                    recent.insert(0, realtime_input)
                    del recent[10:]  # keep only the 10 most recent keywords
                st.success("实时查询成功")
            else:
                st.error(f"查询失败: {result}")
        else:
            st.warning("请输入关键词")

    # Recently queried keywords (first 5 shown as one-click buttons).
    if st.session_state['recent_keywords']:
        st.markdown("**最近查询**:")
        for i, kw in enumerate(st.session_state['recent_keywords'][:5]):
            if st.button(kw, key=f"recent_{i}", help="点击使用此关键词"):
                with st.spinner(f"正在查询 '{kw}' 的实时数据..."):
                    success, result = fetch_real_time_data(kw)
                if success:
                    _store_realtime_result(kw, result)
                    st.success(f"已查询关键词: {kw}")
                else:
                    st.error(f"查询失败: {result}")

# ======================
# Filters
# ======================
with st.sidebar.expander("🔍 筛选条件", expanded=True):
    kw_df = get_keywords()
    keyword_options = kw_df['keyword'].tolist()

    # Case-insensitive substring search over the stored keywords.
    search_term = st.text_input("搜索关键词", placeholder="输入关键词搜索")
    if search_term:
        needle = search_term.lower()
        filtered_keywords = [kw for kw in keyword_options if needle in kw.lower()]
    else:
        filtered_keywords = keyword_options

    selected_keywords = st.multiselect(
        "选择关键词(可多选)",
        options=filtered_keywords,
        default=filtered_keywords[:3],
        placeholder="请选择关键词",
    )

    # Optional free-text keyword, queried live instead of from the database.
    custom_keyword = ""
    if st.checkbox("输入自定义关键词", value=False):
        custom_keyword = st.text_input("自定义关键词", placeholder="例如:春节").strip()

    # Database ids of the selected (stored) keywords.
    selected_ids = (
        kw_df.loc[kw_df['keyword'].isin(selected_keywords), 'id'].astype(int).tolist()
    )

    date_range = st.date_input(
        "日期范围",
        value=(datetime.now() - timedelta(days=30), datetime.now()),
        max_value=datetime.now(),
    )

    selected_metrics = st.multiselect(
        "展示维度",
        options=list(FIELD_MAPPING.keys()),
        format_func=lambda x: FIELD_MAPPING[x],
        default=['total_score'],
    )

    # Quick actions
    col1, col2 = st.columns(2)
    with col1:
        if st.button("清空筛选"):
            # Drop the stored live-query result so the DB view is shown again.
            st.session_state['realtime_data'] = None
            st.session_state['realtime_keyword'] = None
            st.success("筛选条件已清空")
    with col2:
        if st.button("刷新数据"):
            # Clear BOTH caches; otherwise trend data stays stale for up to the TTL.
            get_keywords.clear()
            get_trend_data.clear()
            st.success("数据已刷新")

# ======================
# Keyword management
# ======================
with st.sidebar.expander("📝 关键词管理", expanded=False):
    new_keyword = st.text_input("输入新关键词", placeholder="例如:人工智能").strip()
    if st.button("添加关键词"):
        if new_keyword:
            success, message = add_keyword(new_keyword)
            if success:
                st.success(message)
                get_keywords.clear()
                kw_df = get_keywords()
            else:
                st.warning(message)
        else:
            st.warning("请输入关键词")

    if not kw_df.empty:
        keyword_to_id = dict(zip(kw_df['keyword'], kw_df['id']))
        selected_del_keyword = st.selectbox(
            "选择要删除的关键词",
            options=list(keyword_to_id),
            index=None,
            placeholder="请选择关键词",
        )
        if st.button("删除关键词"):
            if selected_del_keyword:
                if delete_keyword(int(keyword_to_id[selected_del_keyword])):
                    st.success(f"关键词 '{selected_del_keyword}' 已删除")
                    get_keywords.clear()
                    kw_df = get_keywords()
                else:
                    st.error("删除失败,请重试")
            else:
                st.warning("请选择要删除的关键词")

# TODO: a CSV/Excel export panel was drafted here and disabled. If reinstated,
# render it after `df` is loaded (e.g. at the bottom of the page): the sidebar
# section runs before the main section assigns `df`, so it cannot see it here.

# ======================
# Main
# ======================
st.title("微信指数趋势")

realtime_data = st.session_state.get('realtime_data')
realtime_keyword = st.session_state.get('realtime_keyword')

if custom_keyword:
    # NOTE: not cached; runs on every rerun while the custom keyword is set.
    with st.spinner(f"正在实时查询 '{custom_keyword}' 的数据..."):
        success, result = fetch_real_time_data(custom_keyword)
    if not success:
        st.error(f"查询失败: {result}")
        st.stop()
    st.subheader(f"⚡ 实时查询结果:{custom_keyword}")
    df = result
elif realtime_data is not None:
    # A stored live-query result takes precedence until "清空筛选" is clicked,
    # independently of whether any stored keyword is selected.
    st.subheader(f"⚡ 实时查询结果:{realtime_keyword}")
    df = realtime_data
elif selected_ids:
    if len(date_range) != 2:
        # date_input yields a 1-tuple while the user is still picking the range.
        st.warning("请选择完整的日期范围")
        st.stop()
    with st.spinner("正在加载数据..."):
        df = get_trend_data(selected_ids, date_range[0], date_range[1])
    if df.empty:
        st.warning("所选时间段内暂无数据")
        st.stop()
else:
    st.warning("请选择或输入关键词")
    st.stop()

df = _ensure_metric_columns(df)

# ======================
# KPI cards
# ======================
st.markdown("### 💡 最新指数概览")

unique_keywords = df['keyword'].unique().tolist()
cols = st.columns(max(1, min(len(unique_keywords), 4)))

for i, kw in enumerate(unique_keywords):
    kw_trend = df[df['keyword'] == kw].sort_values("date")
    latest_score = kw_trend.iloc[-1]['total_score']

    # Day-over-day change; None means "no delta shown".
    delta = None
    if len(kw_trend) > 1:
        prev_score = kw_trend.iloc[-2]['total_score']
        if pd.notna(latest_score) and pd.notna(prev_score) and prev_score > 0:
            delta = f"{(latest_score - prev_score) / prev_score * 100:+.2f}%"

    with cols[i % len(cols)]:
        st.metric(
            label=f"{kw}(总指数)",
            value=f"{int(latest_score):,}" if pd.notna(latest_score) else "-",
            delta=delta,
        )

st.divider()

# ======================
# Trend chart
# ======================
st.markdown("### 📈 趋势对比分析")

if not selected_metrics:
    # selectbox over an empty list would return None and break the chart.
    st.info("请在侧边栏至少选择一个展示维度")
    st.stop()

compare_metric = st.selectbox(
    "对比指标",
    options=selected_metrics,
    format_func=lambda x: FIELD_MAPPING[x],
)

fig = px.line(
    df,
    x="date",
    y=compare_metric,
    color="keyword",
    title=f"各关键词【{FIELD_MAPPING[compare_metric]}】趋势对比",
)

fig.update_traces(
    mode="lines",
    line=dict(width=3),
    # The date is already the header of the unified hover box, so each trace
    # shows only its keyword and value; <extra></extra> suppresses Plotly's
    # secondary trace-name box.
    hovertemplate="关键词:%{data.name}<br>指数:%{y:,.0f}<extra></extra>",
)

fig.update_layout(
    hovermode="x unified",
    margin=dict(t=80, l=60, r=40, b=50),
    legend=dict(
        orientation="h",
        yanchor="bottom",
        y=1.05,
        xanchor="right",
        x=1,
        title="关键词(点击可隐藏)",
    ),
    xaxis=dict(
        title="日期",
        tickformat="%Y.%m.%d",
        showgrid=False,
        tickfont=dict(size=12),
    ),
    yaxis=dict(
        title="指数",
        showgrid=True,
        gridcolor="rgba(255,255,255,0.06)",
        tickfont=dict(size=12),
        tickformat=",d",
    ),
    hoverlabel=dict(
        font_size=12,
        font_family="Arial",
    ),
)

st.plotly_chart(fig, width='stretch')

# ======================
# Raw data
# ======================
with st.expander("📄 查看原始数据"):
    display_df = df[['date', 'keyword'] + selected_metrics].rename(
        columns={'date': '日期', 'keyword': '关键词', **FIELD_MAPPING}
    )
    display_df['日期'] = display_df['日期'].dt.strftime("%Y-%m-%d")
    st.dataframe(
        display_df.sort_values(['日期', '关键词'], ascending=[False, True]),
        width='stretch',
        hide_index=True,
    )