"""热点内容 MySQL 仓储。""" from __future__ import annotations import hashlib import json from datetime import date, datetime, timedelta from typing import Any try: import pymysql from pymysql.cursors import DictCursor except ImportError: # pragma: no cover - runtime dependency check pymysql = None DictCursor = None from app.hot_content.exceptions import HotContentFlowError from app.hot_content.status import ExecutionStatus, PostprocessStatus from app.hot_content.timezone import SHANGHAI_TZ from app.hot_content.types import MysqlConfig def _json_dumps(data: Any) -> str: return json.dumps(data, ensure_ascii=False, separators=(",", ":")) def _nullable_bool(value: Any) -> int | None: if value is None: return None return 1 if value else 0 def _nullable_str(value: Any) -> str | None: if value is None: return None text = str(value).strip() return text if text else None def _nullable_json_dumps(value: Any) -> str | None: if value is None: return None return _json_dumps(value) def _wxindex_word_record_row(payload: dict[str, Any]) -> tuple[Any, ...]: name = str(payload.get("name") or "").strip() analyze_ymd = str(payload.get("analyze_ymd") or "").strip() if not name or not analyze_ymd: raise HotContentFlowError("invalid wxindex word record payload") return ( name, payload.get("meta_id"), analyze_ymd, payload.get("fetch_start_ymd"), payload.get("fetch_end_ymd"), payload.get("data_start_ymd"), payload.get("data_end_ymd"), payload.get("data_days"), _nullable_bool(payload.get("is_sustained_high")), _nullable_bool(payload.get("is_rising")), _nullable_bool(payload.get("is_spike")), payload.get("retain_reason"), _nullable_bool(payload.get("is_internal_demand_matched")), _nullable_str(payload.get("matched_demand")), payload.get("demand_cache_run_id"), _nullable_json_dumps(payload.get("internal_demand_match_json")), payload.get("senior_fit_score"), _nullable_json_dumps(payload.get("demand_senior_fit_json")), _nullable_bool(payload.get("is_final_retained")), payload.get("min_score"), payload.get("max_score"), payload.get("avg_score"), _nullable_json_dumps(payload.get("detail_json")), ) _WXINDEX_WORD_RECORD_UPSERT_SQL = """ INSERT INTO hot_content_wxindex_word_records ( name, meta_id, analyze_ymd, fetch_start_ymd, fetch_end_ymd, data_start_ymd, data_end_ymd, data_days, is_sustained_high, is_rising, is_spike, retain_reason, is_internal_demand_matched, matched_demand, demand_cache_run_id, internal_demand_match_json, senior_fit_score, demand_senior_fit_json, is_final_retained, min_score, max_score, avg_score, detail_json ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE meta_id = VALUES(meta_id), fetch_start_ymd = VALUES(fetch_start_ymd), fetch_end_ymd = VALUES(fetch_end_ymd), data_start_ymd = VALUES(data_start_ymd), data_end_ymd = VALUES(data_end_ymd), data_days = VALUES(data_days), is_sustained_high = VALUES(is_sustained_high), is_rising = VALUES(is_rising), is_spike = VALUES(is_spike), retain_reason = VALUES(retain_reason), is_internal_demand_matched = VALUES(is_internal_demand_matched), matched_demand = VALUES(matched_demand), demand_cache_run_id = VALUES(demand_cache_run_id), internal_demand_match_json = VALUES(internal_demand_match_json), senior_fit_score = VALUES(senior_fit_score), demand_senior_fit_json = VALUES(demand_senior_fit_json), is_final_retained = VALUES(is_final_retained), min_score = VALUES(min_score), max_score = VALUES(max_score), avg_score = VALUES(avg_score), detail_json = VALUES(detail_json) """ _WXINDEX_WORD_RECORD_INIT_UPSERT_SQL = """ INSERT INTO hot_content_wxindex_word_records ( name, meta_id, analyze_ymd, fetch_start_ymd, fetch_end_ymd, data_start_ymd, data_end_ymd, data_days, is_sustained_high, is_rising, is_spike, retain_reason, is_internal_demand_matched, matched_demand, demand_cache_run_id, internal_demand_match_json, senior_fit_score, demand_senior_fit_json, is_final_retained, min_score, max_score, avg_score, detail_json ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE meta_id = VALUES(meta_id), fetch_start_ymd = VALUES(fetch_start_ymd), fetch_end_ymd = VALUES(fetch_end_ymd), demand_cache_run_id = VALUES(demand_cache_run_id) """ def _wxindex_word_stats_row(payload: dict[str, Any]) -> tuple[Any, ...]: name = str(payload.get("name") or "").strip() analyze_ymd = str(payload.get("analyze_ymd") or "").strip() if not name or not analyze_ymd: raise HotContentFlowError("invalid wxindex word stats payload") return ( name, payload.get("meta_id"), analyze_ymd, payload.get("wxindex_word_record_id"), payload.get("retain_reason"), payload.get("senior_fit_score"), payload.get("data_start_ymd"), payload.get("data_end_ymd"), payload.get("data_days"), payload.get("min_score"), payload.get("max_score"), payload.get("avg_score"), _nullable_json_dumps(payload.get("detail_json")), ) _WXINDEX_WORD_STATS_UPSERT_SQL = """ INSERT INTO hot_content_wxindex_word_stats ( name, meta_id, analyze_ymd, wxindex_word_record_id, retain_reason, senior_fit_score, data_start_ymd, data_end_ymd, data_days, min_score, max_score, avg_score, detail_json ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE meta_id = VALUES(meta_id), wxindex_word_record_id = VALUES(wxindex_word_record_id), retain_reason = VALUES(retain_reason), senior_fit_score = VALUES(senior_fit_score), data_start_ymd = VALUES(data_start_ymd), data_end_ymd = VALUES(data_end_ymd), data_days = VALUES(data_days), min_score = VALUES(min_score), max_score = VALUES(max_score), avg_score = VALUES(avg_score), detail_json = VALUES(detail_json) """ def _json_loads(value: Any) -> Any: if value is None: return None if isinstance(value, (dict, list)): return value if isinstance(value, (bytes, bytearray)): value = value.decode("utf-8") if isinstance(value, str): return json.loads(value) return value def _normalize_demand_names(demand_name_set: list[str]) -> list[str]: names: list[str] = [] seen: set[str] = set() for item in demand_name_set: name = str(item).strip() if not name or name in seen: continue seen.add(name) names.append(name) return names def unique_title_key(source: str, title: str) -> str: return hashlib.sha256(f"{source}\n{title}".encode("utf-8")).hexdigest() class HotContentRepository: def __init__(self, config: MysqlConfig): if pymysql is None or DictCursor is None: raise HotContentFlowError("missing dependency: pip install pymysql") self.conn = pymysql.connect( host=config.host, port=config.port, user=config.user, password=config.password, database=config.database, charset=config.charset, autocommit=True, cursorclass=DictCursor, ) def close(self) -> None: self.conn.close() def upsert_record(self, *, source: str, title: str, rank: int | None) -> dict[str, Any]: key = unique_title_key(source, title) sql = """ INSERT INTO hot_content_records ( unique_key, source, title, hot_rank, execution_status, created_at, updated_at ) VALUES (%s, %s, %s, %s, %s, NOW(), NOW()) ON DUPLICATE KEY UPDATE id=LAST_INSERT_ID(id), hot_rank=VALUES(hot_rank), updated_at=NOW() """ with self.conn.cursor() as cursor: cursor.execute( sql, ( key, source, title, rank, ExecutionStatus.HOT_SAVED, ), ) record_id = int(cursor.lastrowid) cursor.execute( """ SELECT id, unique_key, execution_status, article_title, article_body, article_url, decode_request_result IS NOT NULL AS has_decode_request, contribution_points_json IS NOT NULL AS has_contribution_points FROM hot_content_records WHERE id = %s """, (record_id,), ) row = cursor.fetchone() if not row: raise HotContentFlowError(f"missing hot_content_records id={record_id}") return { "id": int(row["id"]), "unique_key": str(row["unique_key"]), "execution_status": int(row["execution_status"]), "article_title": row.get("article_title"), "article_body": row.get("article_body"), "article_url": row.get("article_url"), "has_decode_request": bool(row.get("has_decode_request")), "has_contribution_points": bool(row.get("has_contribution_points")), } def update_status( self, *, record_id: int, status: int, error_message: str | None = None, ) -> None: sql = """ UPDATE hot_content_records SET execution_status=%s, error_reason=%s, updated_at=NOW() WHERE id=%s """ with self.conn.cursor() as cursor: cursor.execute(sql, (status, error_message, record_id)) def update_article( self, *, record_id: int, article_title: str, article_body: str, url: str, ) -> None: sql = """ UPDATE hot_content_records SET article_title=%s, article_body=%s, article_url=%s, execution_status=%s, error_reason=NULL, updated_at=NOW() WHERE id=%s """ with self.conn.cursor() as cursor: cursor.execute( sql, ( article_title, article_body, url, ExecutionStatus.CONTENT_OK, record_id, ), ) def mark_no_valid_content( self, *, record_id: int, reason: str, ) -> None: """搜不到文章或缺标题/正文:仅更新 execution_status + error_reason,不动分类字段。""" sql = """ UPDATE hot_content_records SET execution_status=%s, error_reason=%s, updated_at=NOW() WHERE id=%s """ with self.conn.cursor() as cursor: cursor.execute( sql, ( ExecutionStatus.NO_VALID_CONTENT, str(reason or "no valid content").strip(), record_id, ), ) def update_category_filter_result( self, *, record_id: int, passed: bool, result_json: dict[str, Any], ) -> None: self._ensure_category_filter_columns() status = ( ExecutionStatus.CATEGORY_FILTER_PASSED if passed else ExecutionStatus.CATEGORY_FILTER_REJECTED ) reason = str(result_json.get("reason") or "").strip() error_message = None if passed else (reason or "category filter rejected") sql = """ UPDATE hot_content_records SET execution_status=%s, category_filter_passed=%s, category_filter_reason=%s, category_filter_json=%s, error_reason=%s, updated_at=NOW() WHERE id=%s """ with self.conn.cursor() as cursor: cursor.execute( sql, ( status, 1 if passed else 0, reason or None, _json_dumps(result_json), error_message, record_id, ), ) def get_record_for_category_filter(self, record_id: int) -> dict[str, Any] | None: self._ensure_category_filter_columns() sql = """ SELECT id, source, title, article_title, article_body, article_url, execution_status, category_filter_passed, category_filter_reason, category_filter_json FROM hot_content_records WHERE id = %s LIMIT 1 """ with self.conn.cursor() as cursor: cursor.execute(sql, (record_id,)) row = cursor.fetchone() if not row: return None category_filter_json = _json_loads(row.get("category_filter_json")) passed_raw = row.get("category_filter_passed") passed: bool | None if passed_raw is None: passed = None else: passed = bool(int(passed_raw)) return { "id": int(row["id"]), "source": str(row.get("source") or ""), "title": str(row.get("title") or ""), "article_title": str(row.get("article_title") or ""), "article_body": str(row.get("article_body") or ""), "article_url": str(row.get("article_url") or ""), "execution_status": int(row.get("execution_status") or 0), "category_filter_passed": passed, "category_filter_reason": str(row.get("category_filter_reason") or ""), "category_filter_json": ( category_filter_json if isinstance(category_filter_json, dict) else {} ), } def get_category_filter_status(self, record_id: int) -> dict[str, Any] | None: record = self.get_record_for_category_filter(record_id) if not record: return None matched_category = None category_filter_json = record.get("category_filter_json") or {} if isinstance(category_filter_json, dict): matched_category = category_filter_json.get("matched_category") return { "id": record["id"], "passed": record["category_filter_passed"], "reason": record["category_filter_reason"], "matched_category": matched_category, "execution_status": record["execution_status"], } def update_decode_result( self, *, record_id: int, status: int, request_json: dict[str, Any], response_json: dict[str, Any] | None, error_message: str | None = None, ) -> None: decode_request_result = { "request": request_json, "response": response_json, } sql = """ UPDATE hot_content_records SET decode_request_result=%s, execution_status=%s, error_reason=%s, updated_at=NOW() WHERE id=%s """ with self.conn.cursor() as cursor: cursor.execute( sql, ( _json_dumps(decode_request_result), status, error_message, record_id, ), ) def list_decode_result_candidates(self, *, limit: int) -> list[dict[str, Any]]: sql = """ SELECT id, unique_key FROM hot_content_records WHERE execution_status IN (%s, %s, %s) AND contribution_points_json IS NULL ORDER BY updated_at ASC, id ASC LIMIT %s """ with self.conn.cursor() as cursor: cursor.execute( sql, ( ExecutionStatus.DECODE_SUBMITTED, ExecutionStatus.DECODE_SUCCESS, ExecutionStatus.DECODE_PENDING, limit, ), ) rows = cursor.fetchall() return [ { "id": int(row["id"]), "unique_key": str(row["unique_key"]), } for row in rows ] def save_decode_result_export( self, *, record_id: int, decode_result_json: dict[str, Any], contribution_points_json: dict[str, Any], ) -> None: sql = """ UPDATE hot_content_records SET decode_result_json=%s, contribution_points_json=%s, execution_status=%s, error_reason=NULL, updated_at=NOW() WHERE id=%s """ with self.conn.cursor() as cursor: cursor.execute( sql, ( _json_dumps(decode_result_json), _json_dumps(contribution_points_json), ExecutionStatus.CONTRIBUTION_EXTRACTED, record_id, ), ) def get_demand_cache_by_hour(self, *, cache_hour: datetime) -> dict[str, Any] | None: sql = """ SELECT id, cache_hour, source_table, partition_dt, demand_name_set_json, item_count, updated_at FROM demand_pool_hourly_cache WHERE cache_hour=%s LIMIT 1 """ with self.conn.cursor() as cursor: cursor.execute(sql, (cache_hour,)) row = cursor.fetchone() if not row: return None demand_name_set = _json_loads(row.get("demand_name_set_json")) or [] if not isinstance(demand_name_set, list): demand_name_set = [] return { "id": int(row["id"]), "cache_hour": row.get("cache_hour"), "source_table": str(row["source_table"]), "partition_dt": row.get("partition_dt"), "demand_name_set": [ str(name).strip() for name in demand_name_set if str(name).strip() ], "item_count": int(row.get("item_count") or 0), "updated_at": row.get("updated_at"), } def save_demand_cache_set( self, *, cache_hour: datetime, source_table: str, partition_dt: str | None, excluded_strategy: str, top_n: int, demand_name_set: list[str], ) -> int: sql = """ INSERT INTO demand_pool_hourly_cache ( cache_hour, source_table, partition_dt, excluded_strategy, top_n, demand_name_set_json, item_count, created_at, updated_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW()) ON DUPLICATE KEY UPDATE id=LAST_INSERT_ID(id), source_table=VALUES(source_table), partition_dt=VALUES(partition_dt), excluded_strategy=VALUES(excluded_strategy), top_n=VALUES(top_n), demand_name_set_json=VALUES(demand_name_set_json), item_count=VALUES(item_count), updated_at=NOW() """ normalized_names = _normalize_demand_names(demand_name_set) with self.conn.cursor() as cursor: cursor.execute( sql, ( cache_hour, source_table, partition_dt, excluded_strategy, top_n, _json_dumps(normalized_names), len(normalized_names), ), ) return int(cursor.lastrowid) def list_postprocess_candidates(self, *, limit: int) -> list[dict[str, Any]]: sql = """ SELECT id, unique_key, source, title, created_at, article_title, article_body, demand_cache_run_id, postprocess_status, decode_result_json, contribution_points_json, contribution_demand_match_json, wxindex_trend_json, demand_event_sense_json, demand_senior_fit_json FROM hot_content_records WHERE contribution_points_json IS NOT NULL AND postprocess_status IN (%s, %s, %s, %s) ORDER BY updated_at ASC, id ASC LIMIT %s """ with self.conn.cursor() as cursor: cursor.execute( sql, ( PostprocessStatus.PENDING, PostprocessStatus.DEMAND_MATCHED, PostprocessStatus.WXINDEX_DONE, PostprocessStatus.FAILED, limit, ), ) rows = cursor.fetchall() return [ { "id": int(row["id"]), "unique_key": str(row["unique_key"]), "source": str(row.get("source") or ""), "title": str(row.get("title") or ""), "created_at": row.get("created_at"), "article_title": row.get("article_title"), "article_body": row.get("article_body"), "demand_cache_run_id": row.get("demand_cache_run_id"), "postprocess_status": int(row.get("postprocess_status") or 0), "decode_result_json": _json_loads(row.get("decode_result_json")), "contribution_points_json": _json_loads(row.get("contribution_points_json")), "contribution_demand_match_json": _json_loads( row.get("contribution_demand_match_json") ), "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")), "demand_event_sense_json": _json_loads( row.get("demand_event_sense_json") ), "demand_senior_fit_json": _json_loads(row.get("demand_senior_fit_json")), } for row in rows ] def save_contribution_demand_match( self, *, record_id: int, demand_cache_run_id: int, match_json: dict[str, Any], ) -> None: sql = """ UPDATE hot_content_records SET demand_cache_run_id=%s, contribution_demand_match_json=%s, postprocess_status=%s, postprocess_error_reason=NULL, updated_at=NOW() WHERE id=%s """ with self.conn.cursor() as cursor: cursor.execute( sql, ( demand_cache_run_id, _json_dumps(match_json), PostprocessStatus.DEMAND_MATCHED, record_id, ), ) def save_wxindex_trend( self, *, record_id: int, trend_json: dict[str, Any], ) -> None: sql = """ UPDATE hot_content_records SET wxindex_trend_json=%s, postprocess_status=%s, postprocess_error_reason=NULL, updated_at=NOW() WHERE id=%s """ with self.conn.cursor() as cursor: cursor.execute( sql, ( _json_dumps(trend_json), PostprocessStatus.WXINDEX_DONE, record_id, ), ) def save_demand_quality( self, *, record_id: int, event_sense_json: dict[str, Any], senior_fit_json: dict[str, Any], update_status: bool = True, ) -> None: self._ensure_record_quality_columns() if update_status: sql = """ UPDATE hot_content_records SET demand_event_sense_json=%s, demand_senior_fit_json=%s, postprocess_status=%s, postprocess_error_reason=NULL, updated_at=NOW() WHERE id=%s """ params = ( _json_dumps(event_sense_json), _json_dumps(senior_fit_json), PostprocessStatus.QUALITY_DONE, record_id, ) else: sql = """ UPDATE hot_content_records SET demand_event_sense_json=%s, demand_senior_fit_json=%s, updated_at=NOW() WHERE id=%s """ params = ( _json_dumps(event_sense_json), _json_dumps(senior_fit_json), record_id, ) with self.conn.cursor() as cursor: cursor.execute(sql, params) def update_postprocess_status( self, *, record_id: int, status: int, error_message: str | None = None, ) -> None: sql = """ UPDATE hot_content_records SET postprocess_status=%s, postprocess_error_reason=%s, updated_at=NOW() WHERE id=%s """ with self.conn.cursor() as cursor: cursor.execute(sql, (status, error_message, record_id)) def replace_demand_export_rows( self, *, record_id: int, source: str, hot_title: str, article_title: str, rows: list[dict[str, Any]], ) -> None: self._ensure_demand_export_table() delete_sql = "DELETE FROM hot_content_demand_exports WHERE record_id=%s" insert_sql = """ INSERT INTO hot_content_demand_exports ( record_id, source, hot_title, article_title, item_type, item_text, point_category, matched_demand, contribution_score, wxindex_keyword, all_hot_keywords, wxindex_latest_score, wxindex_trend, is_as_demand, event_sense_score, senior_fit_score, created_at, updated_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW()) """ with self.conn.cursor() as cursor: cursor.execute(delete_sql, (record_id,)) insert_rows = [ ( record_id, source, hot_title, article_title, str(item.get("item_type") or ""), str(item.get("item_text") or ""), str(item.get("point_category") or ""), str(item.get("matched_demand") or ""), item.get("contribution_score"), str(item.get("wxindex_keyword") or ""), str(item.get("all_hot_keywords") or ""), float(item.get("wxindex_latest_score") or 0), str(item.get("wxindex_trend") or ""), int(item.get("is_as_demand") or 0), item.get("event_sense_score"), item.get("senior_fit_score"), ) for item in rows if str(item.get("item_type") or "").strip() and str(item.get("item_text") or "").strip() ] if insert_rows: cursor.executemany(insert_sql, insert_rows) def list_odps_sync_records(self, *, partition_dt: str | None = None) -> list[dict[str, Any]]: """读取指定创建日已完成质量判断的记录,供 ODPS 同步(按 created_at,不按 updated_at)。""" self._ensure_record_quality_columns() if partition_dt: created_day_start = datetime.strptime(partition_dt, "%Y%m%d") else: created_day_start = datetime.now(SHANGHAI_TZ).replace( hour=0, minute=0, second=0, microsecond=0, tzinfo=None, ) created_day_end = created_day_start + timedelta(days=1) sql = """ SELECT id, contribution_points_json, contribution_demand_match_json, wxindex_trend_json, demand_event_sense_json, demand_senior_fit_json FROM hot_content_records WHERE created_at >= %s AND created_at < %s AND postprocess_status = %s AND contribution_demand_match_json IS NOT NULL AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> '' ORDER BY id ASC """ with self.conn.cursor() as cursor: cursor.execute( sql, (created_day_start, created_day_end, PostprocessStatus.QUALITY_DONE), ) rows = cursor.fetchall() records: list[dict[str, Any]] = [] for row in rows: records.append( { "id": int(row["id"]), "contribution_points_json": _json_loads( row.get("contribution_points_json") ), "contribution_demand_match_json": _json_loads( row.get("contribution_demand_match_json") ), "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")), "demand_event_sense_json": _json_loads( row.get("demand_event_sense_json") ), "demand_senior_fit_json": _json_loads( row.get("demand_senior_fit_json") ), } ) return records def list_demand_export_groups(self) -> list[dict[str, Any]]: """读取主表当天创建的 record 对应导出分组,仅供 ODPS 当天分区同步(不跨天)。""" self._ensure_demand_export_table() today_start = datetime.now(SHANGHAI_TZ).replace( hour=0, minute=0, second=0, microsecond=0, tzinfo=None, ) today_end = today_start + timedelta(days=1) sql = """ SELECT e.record_id, e.item_type, e.item_text, e.point_category, e.matched_demand, e.wxindex_latest_score FROM hot_content_demand_exports e INNER JOIN hot_content_records r ON r.id = e.record_id WHERE r.created_at >= %s AND r.created_at < %s ORDER BY e.record_id ASC, e.id ASC """ with self.conn.cursor() as cursor: cursor.execute(sql, (today_start, today_end)) rows = cursor.fetchall() grouped: dict[int, list[dict[str, Any]]] = {} for row in rows: record_id = int(row["record_id"]) grouped.setdefault(record_id, []).append( { "item_type": str(row.get("item_type") or ""), "item_text": str(row.get("item_text") or ""), "point_category": str(row.get("point_category") or ""), "matched_demand": str(row.get("matched_demand") or ""), "wxindex_latest_score": float(row.get("wxindex_latest_score") or 0), } ) return [ {"record_id": record_id, "export_rows": export_rows} for record_id, export_rows in grouped.items() ] def _ensure_demand_export_table(self) -> None: sql = """ CREATE TABLE IF NOT EXISTS hot_content_demand_exports ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, record_id BIGINT UNSIGNED NOT NULL, source VARCHAR(64) NOT NULL DEFAULT '', hot_title VARCHAR(1024) NOT NULL DEFAULT '', article_title VARCHAR(1024) NOT NULL DEFAULT '', item_type VARCHAR(32) NOT NULL COMMENT '元素/短语', item_text VARCHAR(1024) NOT NULL, point_category VARCHAR(32) NOT NULL DEFAULT '' COMMENT '点类型:灵感点/目的点/关键点', matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求', contribution_score DOUBLE NULL COMMENT '贡献分,仅元素有值', wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的元素', all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '全部热点词', wxindex_latest_score DOUBLE NOT NULL DEFAULT 0, wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势', is_as_demand TINYINT NOT NULL DEFAULT 0 COMMENT '是否作为需求:0否 1是', created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, PRIMARY KEY (id), KEY idx_record_id (record_id), KEY idx_source_type (source, item_type) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """ with self.conn.cursor() as cursor: cursor.execute(sql) self._ensure_demand_export_column( cursor, "matched_demand", """ ALTER TABLE hot_content_demand_exports ADD COLUMN matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求' AFTER item_text """, ) self._ensure_demand_export_column( cursor, "point_category", """ ALTER TABLE hot_content_demand_exports ADD COLUMN point_category VARCHAR(32) NOT NULL DEFAULT '' COMMENT '点类型:灵感点/目的点/关键点' AFTER item_text """, ) self._ensure_demand_export_column( cursor, "contribution_score", """ ALTER TABLE hot_content_demand_exports ADD COLUMN contribution_score DOUBLE NULL COMMENT '贡献分,仅词有值' AFTER matched_demand """, ) self._ensure_demand_export_column( cursor, "wxindex_trend", """ ALTER TABLE hot_content_demand_exports ADD COLUMN wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势' AFTER wxindex_latest_score """, ) self._ensure_demand_export_column( cursor, "wxindex_keyword", """ ALTER TABLE hot_content_demand_exports ADD COLUMN wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的词' AFTER contribution_score """, ) self._ensure_demand_export_column( cursor, "all_hot_keywords", """ ALTER TABLE hot_content_demand_exports ADD COLUMN all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '全部热点词' AFTER wxindex_keyword """, ) self._ensure_demand_export_column( cursor, "is_as_demand", """ ALTER TABLE hot_content_demand_exports ADD COLUMN is_as_demand TINYINT NOT NULL DEFAULT 0 COMMENT '是否作为需求:0否 1是' AFTER wxindex_trend """, ) self._ensure_demand_export_column( cursor, "event_sense_score", """ ALTER TABLE hot_content_demand_exports ADD COLUMN event_sense_score DOUBLE NULL COMMENT '事件性得分 0-10' AFTER is_as_demand """, ) self._ensure_demand_export_column( cursor, "senior_fit_score", """ ALTER TABLE hot_content_demand_exports ADD COLUMN senior_fit_score DOUBLE NULL COMMENT '老年性得分 0-10' AFTER event_sense_score """, ) def _ensure_record_quality_columns(self) -> None: with self.conn.cursor() as cursor: for column_name, alter_sql in ( ( "demand_event_sense_json", """ ALTER TABLE hot_content_records ADD COLUMN demand_event_sense_json JSON NULL COMMENT '需求事件性 LLM 评分结果' AFTER wxindex_trend_json """, ), ( "demand_senior_fit_json", """ ALTER TABLE hot_content_records ADD COLUMN demand_senior_fit_json JSON NULL COMMENT '需求老年性 LLM 评分结果' AFTER demand_event_sense_json """, ), ): cursor.execute( """ SELECT COUNT(*) AS cnt FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'hot_content_records' AND COLUMN_NAME = %s """, (column_name,), ) if int((cursor.fetchone() or {}).get("cnt") or 0) == 0: cursor.execute(alter_sql) def _ensure_category_filter_columns(self) -> None: with self.conn.cursor() as cursor: for column_name, alter_sql in ( ( "category_filter_json", """ ALTER TABLE hot_content_records ADD COLUMN category_filter_json JSON NULL COMMENT '老年人兴趣分类筛选 LLM 结果' AFTER article_url """, ), ( "category_filter_passed", """ ALTER TABLE hot_content_records ADD COLUMN category_filter_passed TINYINT NULL COMMENT '分类筛选是否通过:1通过 0不通过 NULL未筛选' AFTER category_filter_json """, ), ( "category_filter_reason", """ ALTER TABLE hot_content_records ADD COLUMN category_filter_reason TEXT NULL COMMENT '分类筛选原因' AFTER category_filter_passed """, ), ): cursor.execute( """ SELECT COUNT(*) AS cnt FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'hot_content_records' AND COLUMN_NAME = %s """, (column_name,), ) if int((cursor.fetchone() or {}).get("cnt") or 0) == 0: cursor.execute(alter_sql) def _ensure_demand_export_column( self, cursor: Any, column_name: str, alter_sql: str, ) -> None: cursor.execute( """ SELECT COUNT(*) AS cnt FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'hot_content_demand_exports' AND COLUMN_NAME = %s """, (column_name,), ) if int((cursor.fetchone() or {}).get("cnt") or 0) == 0: cursor.execute(alter_sql) def list_synced_odps_demand_ids( self, *, partition_dt: str, ) -> set[str]: """返回 hot_content_odps_sync_log 中指定分区日已有的 demand_id(跨流程去重用)。""" normalized_partition_dt = str(partition_dt or "").strip() if not normalized_partition_dt: return set() self._ensure_odps_sync_log_table() sql = """ SELECT demand_id FROM hot_content_odps_sync_log WHERE partition_dt = %s """ with self.conn.cursor() as cursor: cursor.execute(sql, (normalized_partition_dt,)) rows = cursor.fetchall() return { str(row.get("demand_id") or "").strip() for row in rows if str(row.get("demand_id") or "").strip() } def count_odps_sync_log_rows(self, *, partition_dt: str) -> int: """统计 hot_content_odps_sync_log 指定分区日已写入条数(供日限额计算)。""" normalized_partition_dt = str(partition_dt or "").strip() if not normalized_partition_dt: return 0 self._ensure_odps_sync_log_table() sql = """ SELECT COUNT(*) AS cnt FROM hot_content_odps_sync_log WHERE partition_dt = %s """ with self.conn.cursor() as cursor: cursor.execute(sql, (normalized_partition_dt,)) row = cursor.fetchone() or {} return int(row.get("cnt") or 0) def save_odps_sync_logs(self, rows: list[dict[str, Any]]) -> int: if not rows: return 0 self._ensure_odps_sync_log_table() sql = """ INSERT INTO hot_content_odps_sync_log ( partition_dt, strategy, demand_id, demand_name, demand_type, record_id, weight, extend ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) ON DUPLICATE KEY UPDATE demand_name = VALUES(demand_name), demand_type = VALUES(demand_type), record_id = VALUES(record_id), weight = VALUES(weight), extend = VALUES(extend), synced_at = CURRENT_TIMESTAMP """ insert_rows = [ ( str(item.get("partition_dt") or ""), str(item.get("strategy") or ""), str(item.get("demand_id") or ""), str(item.get("demand_name") or ""), str(item.get("demand_type") or ""), int(item.get("record_id") or 0), float(item["weight"]) if item.get("weight") is not None else None, str(item.get("extend") or "{}"), ) for item in rows if str(item.get("demand_id") or "").strip() ] with self.conn.cursor() as cursor: cursor.executemany(sql, insert_rows) return len(insert_rows) def _ensure_odps_sync_log_weight_column(self, cursor: Any) -> None: cursor.execute( """ SELECT COUNT(*) AS cnt FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'hot_content_odps_sync_log' AND COLUMN_NAME = 'weight' """, ) if int((cursor.fetchone() or {}).get("cnt") or 0) == 0: cursor.execute( """ ALTER TABLE hot_content_odps_sync_log ADD COLUMN weight DOUBLE NULL DEFAULT NULL COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)' AFTER record_id """ ) def _ensure_odps_sync_log_extend_column(self, cursor: Any) -> None: cursor.execute( """ SELECT COUNT(*) AS cnt FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'hot_content_odps_sync_log' AND COLUMN_NAME = 'extend' """, ) if int((cursor.fetchone() or {}).get("cnt") or 0) == 0: cursor.execute( """ ALTER TABLE hot_content_odps_sync_log ADD COLUMN extend TEXT NULL DEFAULT NULL COMMENT 'ODPS extend 扩展字段 JSON' AFTER weight """ ) def list_wxindex_word_scores(self, name: str) -> list[dict[str, Any]]: word = str(name or "").strip() if not word: return [] self._ensure_wxindex_words_table() sql = """ SELECT dt, total_score FROM hot_content_wxindex_words WHERE name = %s ORDER BY dt ASC """ with self.conn.cursor() as cursor: cursor.execute(sql, (word,)) rows = cursor.fetchall() scores: list[dict[str, Any]] = [] for row in rows: dt = str(row.get("dt") or "").strip() if not dt: continue try: total_score = float(row["total_score"]) except (TypeError, ValueError, KeyError): continue scores.append({"ymd": dt, "total_score": total_score}) return scores def list_wxindex_word_scores_in_range( self, name: str, *, start_ymd: str, end_ymd: str, ) -> list[dict[str, Any]]: word = str(name or "").strip() start = str(start_ymd or "").strip() end = str(end_ymd or "").strip() if not word or not start or not end: return [] self._ensure_wxindex_words_table() sql = """ SELECT dt, total_score FROM hot_content_wxindex_words WHERE name = %s AND dt >= %s AND dt <= %s ORDER BY dt ASC """ with self.conn.cursor() as cursor: cursor.execute(sql, (word, start, end)) rows = cursor.fetchall() scores: list[dict[str, Any]] = [] for row in rows: dt = str(row.get("dt") or "").strip() if not dt: continue try: total_score = float(row["total_score"]) except (TypeError, ValueError, KeyError): continue scores.append({"ymd": dt, "total_score": total_score}) return scores def list_active_wxindex_word_meta( self, *, today: date | None = None, ) -> list[dict[str, Any]]: """返回当天仍在抓取窗口内的 meta(today < fetch_end_ymd)。""" current = today or datetime.now(SHANGHAI_TZ).date() today_ymd = current.strftime("%Y%m%d") self._ensure_wxindex_word_meta_table() sql = """ SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd FROM hot_content_wxindex_word_meta WHERE fetch_end_ymd > %s ORDER BY id ASC """ with self.conn.cursor() as cursor: cursor.execute(sql, (today_ymd,)) rows = cursor.fetchall() result: list[dict[str, Any]] = [] for row in rows: meta = self._normalize_wxindex_word_meta_row(row) if meta is not None: result.append(meta) return result def save_wxindex_word_record(self, payload: dict[str, Any]) -> int: row = _wxindex_word_record_row(payload) name = str(row[0]) analyze_ymd = str(row[2]) self._ensure_wxindex_word_records_table() with self.conn.cursor() as cursor: cursor.execute(_WXINDEX_WORD_RECORD_UPSERT_SQL, row) cursor.execute( """ SELECT id FROM hot_content_wxindex_word_records WHERE name = %s AND analyze_ymd = %s """, (name, analyze_ymd), ) record_row = cursor.fetchone() or {} return int(record_row.get("id") or 0) def init_wxindex_word_records( self, payloads: list[dict[str, Any]], ) -> dict[str, int]: """批量 init 追溯记录:一次 executemany,重复跑只更新 meta/抓取窗口。""" if not payloads: return {} rows: list[tuple[Any, ...]] = [] analyze_ymd = "" names: list[str] = [] for payload in payloads: row = _wxindex_word_record_row(payload) rows.append(row) analyze_ymd = str(row[2]) names.append(str(row[0])) self._ensure_wxindex_word_records_table() with self.conn.cursor() as cursor: cursor.executemany(_WXINDEX_WORD_RECORD_INIT_UPSERT_SQL, rows) return self.map_wxindex_word_record_ids(analyze_ymd=analyze_ymd, names=names) def map_wxindex_word_record_ids( self, *, analyze_ymd: str, names: list[str], ) -> dict[str, int]: normalized_analyze_ymd = str(analyze_ymd or "").strip() normalized_names = [ str(name or "").strip() for name in names if str(name or "").strip() ] if not normalized_analyze_ymd or not normalized_names: return {} placeholders = ", ".join(["%s"] * len(normalized_names)) sql = f""" SELECT id, name FROM hot_content_wxindex_word_records WHERE analyze_ymd = %s AND name IN ({placeholders}) """ with self.conn.cursor() as cursor: cursor.execute(sql, [normalized_analyze_ymd, *normalized_names]) rows = cursor.fetchall() return { str(row.get("name") or "").strip(): int(row.get("id") or 0) for row in rows if str(row.get("name") or "").strip() } def list_wxindex_word_records_by_analyze_ymd( self, *, analyze_ymd: str, names: list[str] | None = None, ) -> dict[str, dict[str, Any]]: """按 analyze_ymd 批量读取追溯记录,供同日重跑跳过已完成阶段。""" normalized_analyze_ymd = str(analyze_ymd or "").strip() if not normalized_analyze_ymd: return {} normalized_names = [ str(name or "").strip() for name in (names or []) if str(name or "").strip() ] self._ensure_wxindex_word_records_table() params: list[Any] = [normalized_analyze_ymd] name_filter = "" if normalized_names: placeholders = ", ".join(["%s"] * len(normalized_names)) name_filter = f" AND name IN ({placeholders})" params.extend(normalized_names) sql = f""" SELECT id, name, meta_id, analyze_ymd, fetch_start_ymd, fetch_end_ymd, data_start_ymd, data_end_ymd, data_days, is_sustained_high, is_rising, is_spike, retain_reason, is_internal_demand_matched, matched_demand, demand_cache_run_id, internal_demand_match_json, senior_fit_score, demand_senior_fit_json, is_final_retained, min_score, max_score, avg_score, detail_json FROM hot_content_wxindex_word_records WHERE analyze_ymd = %s{name_filter} """ with self.conn.cursor() as cursor: cursor.execute(sql, params) rows = cursor.fetchall() result: dict[str, dict[str, Any]] = {} for row in rows or []: name = str(row.get("name") or "").strip() if not name: continue record = dict(row) record["internal_demand_match_json"] = _json_loads( record.get("internal_demand_match_json") ) record["demand_senior_fit_json"] = _json_loads( record.get("demand_senior_fit_json") ) record["detail_json"] = _json_loads(record.get("detail_json")) result[name] = record return result def list_wxindex_word_stats_names( self, *, analyze_ymd: str, names: list[str] | None = None, ) -> set[str]: """返回指定 analyze_ymd 下已写入 stats 表的词名集合。""" normalized_analyze_ymd = str(analyze_ymd or "").strip() if not normalized_analyze_ymd: return set() normalized_names = [ str(name or "").strip() for name in (names or []) if str(name or "").strip() ] self._ensure_wxindex_word_stats_table() params: list[Any] = [normalized_analyze_ymd] name_filter = "" if normalized_names: placeholders = ", ".join(["%s"] * len(normalized_names)) name_filter = f" AND name IN ({placeholders})" params.extend(normalized_names) sql = f""" SELECT name FROM hot_content_wxindex_word_stats WHERE analyze_ymd = %s{name_filter} """ with self.conn.cursor() as cursor: cursor.execute(sql, params) rows = cursor.fetchall() return { str(row.get("name") or "").strip() for row in rows or [] if str(row.get("name") or "").strip() } def save_wxindex_word_stats_batch( self, payloads: list[dict[str, Any]], ) -> int: """批量写入通过热度+老年性筛选的词统计。""" if not payloads: return 0 rows = [_wxindex_word_stats_row(payload) for payload in payloads] self._ensure_wxindex_word_stats_table() with self.conn.cursor() as cursor: cursor.executemany(_WXINDEX_WORD_STATS_UPSERT_SQL, rows) return len(rows) def _ensure_wxindex_word_stats_table(self) -> None: sql = """ CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_stats ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键', name VARCHAR(256) NOT NULL COMMENT '词', meta_id BIGINT UNSIGNED NULL COMMENT '关联 meta.id', analyze_ymd VARCHAR(8) NOT NULL COMMENT '分析日期 yyyymmdd', wxindex_word_record_id BIGINT UNSIGNED NULL COMMENT '关联 records.id', retain_reason VARCHAR(64) NULL COMMENT '热度保留原因', senior_fit_score DOUBLE NULL COMMENT '老年性得分 0-10', data_start_ymd VARCHAR(8) NULL COMMENT '分析数据起始日', data_end_ymd VARCHAR(8) NULL COMMENT '分析数据结束日', data_days INT NULL COMMENT '分析天数', min_score DOUBLE NULL COMMENT '区间最低热度', max_score DOUBLE NULL COMMENT '区间最高热度', avg_score DOUBLE NULL COMMENT '区间平均热度', detail_json JSON NULL COMMENT '扩展详情', created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (id), UNIQUE KEY uk_name_analyze_ymd (name, analyze_ymd), KEY idx_analyze_ymd (analyze_ymd), KEY idx_retain_reason (retain_reason) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """ with self.conn.cursor() as cursor: cursor.execute(sql) def _ensure_wxindex_word_records_table(self) -> None: sql = """ CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_records ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键', name VARCHAR(256) NOT NULL COMMENT '词', meta_id BIGINT UNSIGNED NULL COMMENT '关联 meta.id', analyze_ymd VARCHAR(8) NOT NULL COMMENT '分析日期 yyyymmdd', fetch_start_ymd VARCHAR(8) NULL COMMENT 'meta 抓取起始日', fetch_end_ymd VARCHAR(8) NULL COMMENT 'meta 抓取结束日', data_start_ymd VARCHAR(8) NULL COMMENT '实际分析数据起始日', data_end_ymd VARCHAR(8) NULL COMMENT '实际分析数据结束日', data_days INT NULL COMMENT '实际分析天数', is_sustained_high TINYINT(1) NULL COMMENT '持续热度>1000万', is_rising TINYINT(1) NULL COMMENT '热度持续上涨', is_spike TINYINT(1) NULL COMMENT '最近3天突然暴涨', retain_reason VARCHAR(64) NULL COMMENT '保留原因(按2->3->1优先级)', is_internal_demand_matched TINYINT(1) NULL COMMENT '是否匹配票圈内部需求', matched_demand VARCHAR(1024) NULL COMMENT '匹配到的内部需求', demand_cache_run_id BIGINT UNSIGNED NULL COMMENT '需求池缓存ID', internal_demand_match_json JSON NULL COMMENT '内部需求匹配详情', senior_fit_score DOUBLE NULL COMMENT '老年性得分 0-10', demand_senior_fit_json JSON NULL COMMENT '老年性 LLM 评分结果', is_final_retained TINYINT(1) NULL COMMENT '老年性达标且最终保留', min_score DOUBLE NULL COMMENT '区间最低热度', max_score DOUBLE NULL COMMENT '区间最高热度', avg_score DOUBLE NULL COMMENT '区间平均热度', detail_json JSON NULL COMMENT '分析详情', created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', PRIMARY KEY (id), UNIQUE KEY uk_name_analyze_ymd (name, analyze_ymd), KEY idx_analyze_ymd (analyze_ymd), KEY idx_patterns (is_sustained_high, is_rising, is_spike), KEY idx_retain_reason (retain_reason) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """ with self.conn.cursor() as cursor: cursor.execute(sql) self._ensure_wxindex_word_records_retain_columns(cursor) self._ensure_wxindex_word_records_senior_columns(cursor) self._ensure_wxindex_word_records_nullable_columns(cursor) def _ensure_wxindex_word_records_nullable_columns(self, cursor: Any) -> None: nullable_specs = { "fetch_start_ymd": ( "MODIFY COLUMN fetch_start_ymd VARCHAR(8) NULL " "COMMENT 'meta 抓取起始日'" ), "fetch_end_ymd": ( "MODIFY COLUMN fetch_end_ymd VARCHAR(8) NULL " "COMMENT 'meta 抓取结束日'" ), "data_start_ymd": ( "MODIFY COLUMN data_start_ymd VARCHAR(8) NULL " "COMMENT '实际分析数据起始日'" ), "data_end_ymd": ( "MODIFY COLUMN data_end_ymd VARCHAR(8) NULL " "COMMENT '实际分析数据结束日'" ), "data_days": ( "MODIFY COLUMN data_days INT NULL COMMENT '实际分析天数'" ), "is_sustained_high": ( "MODIFY COLUMN is_sustained_high TINYINT(1) NULL " "COMMENT '持续热度>1000万'" ), "is_rising": ( "MODIFY COLUMN is_rising TINYINT(1) NULL " "COMMENT '热度持续上涨'" ), "is_spike": ( "MODIFY COLUMN is_spike TINYINT(1) NULL " "COMMENT '最近3天突然暴涨'" ), "matched_demand": ( "MODIFY COLUMN matched_demand VARCHAR(1024) NULL " "COMMENT '匹配到的内部需求'" ), "is_final_retained": ( "MODIFY COLUMN is_final_retained TINYINT(1) NULL " "COMMENT '老年性达标且最终保留'" ), } column_names = list(nullable_specs.keys()) placeholders = ", ".join(["%s"] * len(column_names)) cursor.execute( f""" SELECT COLUMN_NAME, IS_NULLABLE FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'hot_content_wxindex_word_records' AND COLUMN_NAME IN ({placeholders}) """, column_names, ) nullable_map = { str(row.get("COLUMN_NAME") or ""): str(row.get("IS_NULLABLE") or "").upper() for row in (cursor.fetchall() or []) } alters = [ sql for column_name, sql in nullable_specs.items() if nullable_map.get(column_name) == "NO" ] if alters: cursor.execute( "ALTER TABLE hot_content_wxindex_word_records " + ", ".join(alters) ) def _ensure_wxindex_word_records_senior_columns(self, cursor: Any) -> None: columns = { "senior_fit_score": ( "ALTER TABLE hot_content_wxindex_word_records " "ADD COLUMN senior_fit_score DOUBLE NULL " "COMMENT '老年性得分 0-10' " "AFTER internal_demand_match_json" ), "demand_senior_fit_json": ( "ALTER TABLE hot_content_wxindex_word_records " "ADD COLUMN demand_senior_fit_json JSON NULL " "COMMENT '老年性 LLM 评分结果' " "AFTER senior_fit_score" ), "is_final_retained": ( "ALTER TABLE hot_content_wxindex_word_records " "ADD COLUMN is_final_retained TINYINT(1) NULL " "COMMENT '老年性达标且最终保留' " "AFTER demand_senior_fit_json" ), } for column_name, alter_sql in columns.items(): cursor.execute( """ SELECT COUNT(*) AS cnt FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'hot_content_wxindex_word_records' AND COLUMN_NAME = %s """, (column_name,), ) if int((cursor.fetchone() or {}).get("cnt") or 0) == 0: cursor.execute(alter_sql) def _ensure_wxindex_word_records_retain_columns(self, cursor: Any) -> None: columns = { "retain_reason": ( "ALTER TABLE hot_content_wxindex_word_records " "ADD COLUMN retain_reason VARCHAR(64) NULL " "COMMENT '保留原因(按2->3->1优先级)' " "AFTER is_spike" ), "is_internal_demand_matched": ( "ALTER TABLE hot_content_wxindex_word_records " "ADD COLUMN is_internal_demand_matched TINYINT(1) NULL " "COMMENT '是否匹配票圈内部需求' " "AFTER retain_reason" ), "matched_demand": ( "ALTER TABLE hot_content_wxindex_word_records " "ADD COLUMN matched_demand VARCHAR(1024) NULL " "COMMENT '匹配到的内部需求' " "AFTER is_internal_demand_matched" ), "demand_cache_run_id": ( "ALTER TABLE hot_content_wxindex_word_records " "ADD COLUMN demand_cache_run_id BIGINT UNSIGNED NULL " "COMMENT '需求池缓存ID' " "AFTER matched_demand" ), "internal_demand_match_json": ( "ALTER TABLE hot_content_wxindex_word_records " "ADD COLUMN internal_demand_match_json JSON NULL " "COMMENT '内部需求匹配详情' " "AFTER demand_cache_run_id" ), } for column_name, alter_sql in columns.items(): cursor.execute( """ SELECT COUNT(*) AS cnt FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'hot_content_wxindex_word_records' AND COLUMN_NAME = %s """, (column_name,), ) if int((cursor.fetchone() or {}).get("cnt") or 0) == 0: cursor.execute(alter_sql) def list_stale_wxindex_words( self, *, end_ymd: str, update_window_days: int = 7, today: date | None = None, ) -> list[dict[str, Any]]: """返回更新窗口内、仍缺近 7 日区间数据的词。""" target_end = str(end_ymd or "").strip() if not target_end: return [] current = today or datetime.now(SHANGHAI_TZ).date() active_since = current - timedelta(days=max(update_window_days, 0)) self._ensure_wxindex_word_meta_table() self._ensure_wxindex_words_table() sql = """ SELECT m.name, m.event_created_at, m.fetch_start_ymd, MIN(w.dt) AS earliest_dt, MAX(w.dt) AS latest_dt FROM hot_content_wxindex_word_meta m INNER JOIN hot_content_wxindex_words w ON w.name = m.name WHERE DATE(m.event_created_at) >= %s GROUP BY m.name, m.event_created_at, m.fetch_start_ymd HAVING MAX(w.dt) < %s OR MIN(w.dt) > m.fetch_start_ymd ORDER BY m.name ASC """ with self.conn.cursor() as cursor: cursor.execute(sql, (active_since, target_end)) rows = cursor.fetchall() stale_words: list[dict[str, Any]] = [] for row in rows: name = str(row.get("name") or "").strip() fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip() earliest_dt = str(row.get("earliest_dt") or "").strip() latest_dt = str(row.get("latest_dt") or "").strip() event_created_at = row.get("event_created_at") if name and fetch_start_ymd and earliest_dt and latest_dt and event_created_at: stale_words.append( { "name": name, "event_created_at": event_created_at, "fetch_start_ymd": fetch_start_ymd, "earliest_dt": earliest_dt, "latest_dt": latest_dt, } ) return stale_words def list_word_earliest_event_times( self, *, since_dt: datetime, ) -> dict[str, datetime]: """从 wxindex_trend_json 汇总近期间每个检索词的最早事件时间。""" self._ensure_record_quality_columns() sql = """ SELECT word_name, MIN(event_created_at) AS event_created_at FROM ( SELECT TRIM(searches.keyword) AS word_name, r.created_at AS event_created_at FROM hot_content_records r JOIN JSON_TABLE( r.wxindex_trend_json, '$.wxindex_searches[*]' COLUMNS ( keyword VARCHAR(256) PATH '$.keyword' ) ) AS searches WHERE r.created_at >= %s AND r.wxindex_trend_json IS NOT NULL AND TRIM(searches.keyword) <> '' UNION ALL SELECT TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) AS word_name, r.created_at AS event_created_at FROM hot_content_records r WHERE r.created_at >= %s AND r.wxindex_trend_json IS NOT NULL AND TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) <> '' ) AS word_events WHERE word_name IS NOT NULL AND word_name <> '' GROUP BY word_name """ with self.conn.cursor() as cursor: cursor.execute(sql, (since_dt, since_dt)) rows = cursor.fetchall() event_map: dict[str, datetime] = {} for row in rows: name = str(row.get("word_name") or "").strip() event_created_at = row.get("event_created_at") if name and isinstance(event_created_at, datetime): event_map[name] = event_created_at return event_map def list_wxindex_word_bounds_without_meta(self) -> list[dict[str, Any]]: self._ensure_wxindex_word_meta_table() self._ensure_wxindex_words_table() sql = """ SELECT w.name, MIN(w.dt) AS earliest_dt, MIN(w.created_at) AS first_created_at FROM hot_content_wxindex_words w LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name WHERE m.name IS NULL GROUP BY w.name ORDER BY w.name ASC """ with self.conn.cursor() as cursor: cursor.execute(sql) rows = cursor.fetchall() bounds: list[dict[str, Any]] = [] for row in rows: name = str(row.get("name") or "").strip() earliest_dt = str(row.get("earliest_dt") or "").strip() first_created_at = row.get("first_created_at") if name and earliest_dt: bounds.append( { "name": name, "earliest_dt": earliest_dt, "first_created_at": first_created_at, } ) return bounds def list_wxindex_word_names_without_meta(self) -> list[str]: self._ensure_wxindex_word_meta_table() self._ensure_wxindex_words_table() sql = """ SELECT DISTINCT w.name FROM hot_content_wxindex_words w LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name WHERE m.name IS NULL ORDER BY w.name ASC """ with self.conn.cursor() as cursor: cursor.execute(sql) rows = cursor.fetchall() return [ str(row.get("name") or "").strip() for row in rows if str(row.get("name") or "").strip() ] def get_wxindex_word_first_row_created_at(self, name: str) -> datetime | None: word = str(name or "").strip() if not word: return None self._ensure_wxindex_words_table() sql = """ SELECT MIN(created_at) AS first_created_at FROM hot_content_wxindex_words WHERE name = %s """ with self.conn.cursor() as cursor: cursor.execute(sql, (word,)) row = cursor.fetchone() or {} first_created_at = row.get("first_created_at") return first_created_at if isinstance(first_created_at, datetime) else None def list_all_wxindex_word_meta(self) -> list[dict[str, Any]]: self._ensure_wxindex_word_meta_table() sql = """ SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd FROM hot_content_wxindex_word_meta ORDER BY id ASC """ with self.conn.cursor() as cursor: cursor.execute(sql) rows = cursor.fetchall() result: list[dict[str, Any]] = [] for row in rows: meta = self._normalize_wxindex_word_meta_row(row) if meta is not None: result.append(meta) return result def update_wxindex_word_meta_fetch_start( self, *, name: str, fetch_start_ymd: str, ) -> None: word = str(name or "").strip() target_start = str(fetch_start_ymd or "").strip() if not word or not target_start: raise HotContentFlowError("invalid wxindex word meta fetch_start_ymd payload") self._ensure_wxindex_word_meta_table() sql = """ UPDATE hot_content_wxindex_word_meta SET fetch_start_ymd = %s WHERE name = %s """ with self.conn.cursor() as cursor: cursor.execute(sql, (target_start, word)) def update_wxindex_word_meta( self, *, name: str, event_created_at: datetime, fetch_start_ymd: str, fetch_end_ymd: str, ) -> None: word = str(name or "").strip() target_start = str(fetch_start_ymd or "").strip() target_end = str(fetch_end_ymd or "").strip() if not word or not target_start or not target_end: raise HotContentFlowError("invalid wxindex word meta payload") self._ensure_wxindex_word_meta_table() event_at = event_created_at if event_at.tzinfo is not None: event_at = event_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None) sql = """ UPDATE hot_content_wxindex_word_meta SET event_created_at = %s, fetch_start_ymd = %s, fetch_end_ymd = %s WHERE name = %s """ with self.conn.cursor() as cursor: cursor.execute(sql, (event_at, target_start, target_end, word)) def get_wxindex_word_meta(self, name: str) -> dict[str, Any] | None: word = str(name or "").strip() if not word: return None self._ensure_wxindex_word_meta_table() sql = """ SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd FROM hot_content_wxindex_word_meta WHERE name = %s """ with self.conn.cursor() as cursor: cursor.execute(sql, (word,)) row = cursor.fetchone() if not row: return None return self._normalize_wxindex_word_meta_row(row) def ensure_wxindex_word_meta( self, *, name: str, event_created_at: datetime, fetch_start_ymd: str, fetch_end_ymd: str, ) -> dict[str, Any]: word = str(name or "").strip() target_start = str(fetch_start_ymd or "").strip() target_end = str(fetch_end_ymd or "").strip() if not word or not target_start or not target_end: raise HotContentFlowError("invalid wxindex word meta payload") self._ensure_wxindex_word_meta_table() event_at = event_created_at if event_at.tzinfo is not None: event_at = event_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None) sql = """ INSERT INTO hot_content_wxindex_word_meta ( name, event_created_at, fetch_start_ymd, fetch_end_ymd ) VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE event_created_at = VALUES(event_created_at), fetch_start_ymd = VALUES(fetch_start_ymd), fetch_end_ymd = VALUES(fetch_end_ymd) """ with self.conn.cursor() as cursor: cursor.execute(sql, (word, event_at, target_start, target_end)) meta = self.get_wxindex_word_meta(word) if meta is None: raise HotContentFlowError(f"failed to persist wxindex word meta: {word}") return meta def delete_wxindex_word_meta_by_names(self, names: list[str]) -> int: words = [str(name or "").strip() for name in names] words = [name for name in words if name] if not words: return 0 self._ensure_wxindex_word_meta_table() placeholders = ", ".join(["%s"] * len(words)) sql = f"DELETE FROM hot_content_wxindex_word_meta WHERE name IN ({placeholders})" with self.conn.cursor() as cursor: cursor.execute(sql, tuple(words)) return int(cursor.rowcount or 0) def list_low_max_wxindex_words( self, *, min_max_score: float, ) -> list[dict[str, Any]]: """按 name 聚合,返回最大值低于阈值的词。""" self._ensure_wxindex_words_table() sql = """ SELECT name, MAX(total_score) AS max_score, COUNT(*) AS row_count FROM hot_content_wxindex_words GROUP BY name HAVING MAX(total_score) < %s ORDER BY name ASC """ with self.conn.cursor() as cursor: cursor.execute(sql, (min_max_score,)) rows = cursor.fetchall() low_words: list[dict[str, Any]] = [] for row in rows: name = str(row.get("name") or "").strip() if not name: continue try: max_score = float(row["max_score"]) row_count = int(row["row_count"]) except (TypeError, ValueError, KeyError): continue low_words.append( { "name": name, "max_score": max_score, "row_count": row_count, } ) return low_words def count_wxindex_words_outside_event_window( self, *, window_days: int = 7, ) -> int: self._ensure_wxindex_word_meta_table() self._ensure_wxindex_words_table() sql = """ SELECT COUNT(*) AS row_count FROM hot_content_wxindex_words w INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name WHERE w.dt < m.fetch_start_ymd OR w.dt > m.fetch_end_ymd """ with self.conn.cursor() as cursor: cursor.execute(sql) row = cursor.fetchone() or {} return int(row.get("row_count") or 0) def list_wxindex_words_outside_event_window_samples( self, *, window_days: int = 7, limit: int = 20, ) -> list[dict[str, Any]]: self._ensure_wxindex_word_meta_table() self._ensure_wxindex_words_table() sql = """ SELECT w.name, w.dt, m.event_created_at, m.fetch_start_ymd AS start_ymd, m.fetch_end_ymd AS end_ymd FROM hot_content_wxindex_words w INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name WHERE w.dt < m.fetch_start_ymd OR w.dt > m.fetch_end_ymd ORDER BY w.name ASC, w.dt ASC LIMIT %s """ with self.conn.cursor() as cursor: cursor.execute(sql, (limit,)) rows = cursor.fetchall() samples: list[dict[str, Any]] = [] for row in rows: name = str(row.get("name") or "").strip() dt = str(row.get("dt") or "").strip() if name and dt: samples.append( { "name": name, "dt": dt, "event_created_at": row.get("event_created_at"), "start_ymd": str(row.get("start_ymd") or "").strip(), "end_ymd": str(row.get("end_ymd") or "").strip(), } ) return samples def delete_wxindex_words_outside_event_window( self, *, window_days: int = 7, ) -> int: self._ensure_wxindex_word_meta_table() self._ensure_wxindex_words_table() sql = """ DELETE w FROM hot_content_wxindex_words w INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name WHERE w.dt < m.fetch_start_ymd OR w.dt > m.fetch_end_ymd """ with self.conn.cursor() as cursor: cursor.execute(sql) return int(cursor.rowcount or 0) def delete_wxindex_words_without_meta(self) -> int: self._ensure_wxindex_word_meta_table() self._ensure_wxindex_words_table() sql = """ DELETE w FROM hot_content_wxindex_words w LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name WHERE m.name IS NULL """ with self.conn.cursor() as cursor: cursor.execute(sql) return int(cursor.rowcount or 0) def count_wxindex_words_without_meta(self) -> int: self._ensure_wxindex_word_meta_table() self._ensure_wxindex_words_table() sql = """ SELECT COUNT(*) AS row_count FROM hot_content_wxindex_words w LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name WHERE m.name IS NULL """ with self.conn.cursor() as cursor: cursor.execute(sql) row = cursor.fetchone() or {} return int(row.get("row_count") or 0) def delete_wxindex_words_by_names(self, names: list[str]) -> int: cleaned = [str(name or "").strip() for name in names if str(name or "").strip()] if not cleaned: return 0 self._ensure_wxindex_words_table() placeholders = ", ".join(["%s"] * len(cleaned)) sql = f""" DELETE FROM hot_content_wxindex_words WHERE name IN ({placeholders}) """ with self.conn.cursor() as cursor: cursor.execute(sql, tuple(cleaned)) return int(cursor.rowcount or 0) def list_wxindex_word_names_with_dt( self, names: list[str], *, dt: str, ) -> set[str]: """返回在 hot_content_wxindex_words 中存在指定日期数据的词名集合。""" target_dt = str(dt or "").strip() normalized_names = [ str(name or "").strip() for name in names if str(name or "").strip() ] if not target_dt or not normalized_names: return set() self._ensure_wxindex_words_table() placeholders = ", ".join(["%s"] * len(normalized_names)) sql = f""" SELECT DISTINCT name FROM hot_content_wxindex_words WHERE dt = %s AND name IN ({placeholders}) """ with self.conn.cursor() as cursor: cursor.execute(sql, [target_dt, *normalized_names]) rows = cursor.fetchall() return { str(row.get("name") or "").strip() for row in rows or [] if str(row.get("name") or "").strip() } def has_wxindex_word(self, name: str) -> bool: return self.get_wxindex_word_latest_dt(name) is not None def get_wxindex_word_latest_dt(self, name: str) -> str | None: word = str(name or "").strip() if not word: return None self._ensure_wxindex_words_table() sql = """ SELECT MAX(dt) AS latest_dt FROM hot_content_wxindex_words WHERE name = %s """ with self.conn.cursor() as cursor: cursor.execute(sql, (word,)) row = cursor.fetchone() or {} latest_dt = str(row.get("latest_dt") or "").strip() return latest_dt or None def save_wxindex_daily_scores( self, *, name: str, scores: list[dict[str, Any]], ) -> tuple[int, int]: """按词+日期写入每日指数,重复行跳过。返回 (inserted, skipped)。""" word = str(name or "").strip() if not word or not scores: return 0, 0 self._ensure_wxindex_words_table() sql = """ INSERT IGNORE INTO hot_content_wxindex_words ( name, dt, total_score ) VALUES (%s, %s, %s) """ rows: list[tuple[str, str, float]] = [] seen: set[tuple[str, str]] = set() for item in scores: if not isinstance(item, dict): continue dt = str(item.get("ymd") or item.get("dt") or "").strip() if not dt: continue try: total_score = float(item["total_score"]) except (TypeError, ValueError, KeyError): continue key = (word, dt) if key in seen: continue seen.add(key) rows.append((word, dt, total_score)) if not rows: return 0, 0 inserted = 0 with self.conn.cursor() as cursor: for row in rows: cursor.execute(sql, row) inserted += int(cursor.rowcount or 0) skipped = len(rows) - inserted return inserted, skipped def list_records_with_wxindex_trend_after( self, *, after_created_at: datetime, ) -> list[dict[str, Any]]: sql = """ SELECT id, created_at, wxindex_trend_json FROM hot_content_records WHERE created_at > %s AND wxindex_trend_json IS NOT NULL AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> '' ORDER BY id ASC """ with self.conn.cursor() as cursor: cursor.execute(sql, (after_created_at,)) rows = cursor.fetchall() records: list[dict[str, Any]] = [] for row in rows: records.append( { "id": int(row["id"]), "created_at": row.get("created_at"), "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")), } ) return records def list_records_with_wxindex_trend( self, *, since_dt: datetime, ) -> list[dict[str, Any]]: sql = """ SELECT id, created_at, wxindex_trend_json FROM hot_content_records WHERE created_at >= %s AND wxindex_trend_json IS NOT NULL AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> '' ORDER BY id ASC """ with self.conn.cursor() as cursor: cursor.execute(sql, (since_dt,)) rows = cursor.fetchall() records: list[dict[str, Any]] = [] for row in rows: records.append( { "id": int(row["id"]), "created_at": row.get("created_at"), "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")), } ) return records def _ensure_wxindex_word_meta_table(self) -> None: sql = """ CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_meta ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键', name VARCHAR(256) NOT NULL COMMENT '词', event_created_at DATETIME NOT NULL COMMENT '首次关联热点事件创建时间', fetch_start_ymd VARCHAR(8) NOT NULL COMMENT '数据窗口左边界:事件创建日往前7天', fetch_end_ymd VARCHAR(8) NOT NULL DEFAULT '' COMMENT '数据窗口右边界:事件创建日后7天', meta_created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '元数据创建时间', PRIMARY KEY (id), UNIQUE KEY uk_name (name), KEY idx_event_created_at (event_created_at) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """ with self.conn.cursor() as cursor: cursor.execute(sql) self._ensure_wxindex_word_meta_id_column(cursor) self._ensure_wxindex_word_meta_fetch_end_column(cursor) def _ensure_wxindex_word_meta_fetch_end_column(self, cursor: Any) -> None: cursor.execute( """ SELECT COUNT(*) AS cnt FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'hot_content_wxindex_word_meta' AND COLUMN_NAME = 'fetch_end_ymd' """ ) if int((cursor.fetchone() or {}).get("cnt") or 0) == 0: cursor.execute( """ ALTER TABLE hot_content_wxindex_word_meta ADD COLUMN fetch_end_ymd VARCHAR(8) NOT NULL DEFAULT '' COMMENT '数据窗口右边界:事件创建日后7天' AFTER fetch_start_ymd """ ) cursor.execute( """ UPDATE hot_content_wxindex_word_meta SET fetch_end_ymd = DATE_FORMAT( DATE_ADD(DATE(event_created_at), INTERVAL 7 DAY), '%Y%m%d' ) WHERE fetch_end_ymd IS NULL OR TRIM(fetch_end_ymd) = '' """ ) def _ensure_wxindex_word_meta_id_column(self, cursor: Any) -> None: cursor.execute( """ SELECT COUNT(*) AS cnt FROM information_schema.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = 'hot_content_wxindex_word_meta' AND COLUMN_NAME = 'id' """ ) if int((cursor.fetchone() or {}).get("cnt") or 0) > 0: return cursor.execute( """ ALTER TABLE hot_content_wxindex_word_meta DROP PRIMARY KEY, ADD COLUMN id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY FIRST, ADD UNIQUE KEY uk_name (name) """ ) @staticmethod def _normalize_wxindex_word_meta_row(row: dict[str, Any]) -> dict[str, Any] | None: name = str(row.get("name") or "").strip() fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip() fetch_end_ymd = str(row.get("fetch_end_ymd") or "").strip() event_created_at = row.get("event_created_at") if not name or not fetch_start_ymd or event_created_at is None: return None if not fetch_end_ymd and isinstance(event_created_at, datetime): event_date = event_created_at.date() fetch_end_ymd = (event_date + timedelta(days=7)).strftime("%Y%m%d") if not fetch_end_ymd: return None try: meta_id = int(row.get("id")) except (TypeError, ValueError): return None return { "id": meta_id, "name": name, "event_created_at": event_created_at, "fetch_start_ymd": fetch_start_ymd, "fetch_end_ymd": fetch_end_ymd, } def _ensure_wxindex_words_table(self) -> None: sql = """ CREATE TABLE IF NOT EXISTS hot_content_wxindex_words ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, name VARCHAR(256) NOT NULL COMMENT '词', dt VARCHAR(8) NOT NULL COMMENT '日期 yyyymmdd', total_score DOUBLE NOT NULL COMMENT '微信指数', created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', PRIMARY KEY (id), UNIQUE KEY uk_name_dt (name, dt), KEY idx_name (name), KEY idx_dt (dt) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """ with self.conn.cursor() as cursor: cursor.execute(sql) def _ensure_odps_sync_log_table(self) -> None: sql = """ CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log ( id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, partition_dt VARCHAR(8) NOT NULL COMMENT 'ODPS 分区 dt', strategy VARCHAR(128) NOT NULL COMMENT '需求 strategy', demand_id CHAR(32) NOT NULL COMMENT 'ODPS demand_id', demand_name VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '需求名', demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语', record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id', weight DOUBLE NULL DEFAULT NULL COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)', extend TEXT NULL DEFAULT NULL COMMENT 'ODPS extend 扩展字段 JSON', synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间', PRIMARY KEY (id), UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id), KEY idx_record_partition (record_id, partition_dt), KEY idx_partition_strategy (partition_dt, strategy) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """ with self.conn.cursor() as cursor: cursor.execute(sql) self._ensure_odps_sync_log_weight_column(cursor) self._ensure_odps_sync_log_extend_column(cursor)