| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105 |
- """热点内容 MySQL 仓储。"""
- from __future__ import annotations
- import hashlib
- import json
- from datetime import datetime, timedelta
- from typing import Any
- try:
- import pymysql
- from pymysql.cursors import DictCursor
- except ImportError: # pragma: no cover - runtime dependency check
- pymysql = None
- DictCursor = None
- from app.hot_content.exceptions import HotContentFlowError
- from app.hot_content.status import ExecutionStatus, PostprocessStatus
- from app.hot_content.timezone import SHANGHAI_TZ
- from app.hot_content.types import MysqlConfig
- def _json_dumps(data: Any) -> str:
- return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
- def _json_loads(value: Any) -> Any:
- if value is None:
- return None
- if isinstance(value, (dict, list)):
- return value
- if isinstance(value, (bytes, bytearray)):
- value = value.decode("utf-8")
- if isinstance(value, str):
- return json.loads(value)
- return value
- def _normalize_demand_names(demand_name_set: list[str]) -> list[str]:
- names: list[str] = []
- seen: set[str] = set()
- for item in demand_name_set:
- name = str(item).strip()
- if not name or name in seen:
- continue
- seen.add(name)
- names.append(name)
- return names
- def unique_title_key(source: str, title: str) -> str:
- return hashlib.sha256(f"{source}\n{title}".encode("utf-8")).hexdigest()
- class HotContentRepository:
- def __init__(self, config: MysqlConfig):
- if pymysql is None or DictCursor is None:
- raise HotContentFlowError("missing dependency: pip install pymysql")
- self.conn = pymysql.connect(
- host=config.host,
- port=config.port,
- user=config.user,
- password=config.password,
- database=config.database,
- charset=config.charset,
- autocommit=True,
- cursorclass=DictCursor,
- )
- def close(self) -> None:
- self.conn.close()
- def upsert_record(self, *, source: str, title: str, rank: int | None) -> dict[str, Any]:
- key = unique_title_key(source, title)
- sql = """
- INSERT INTO hot_content_records (
- unique_key, source, title, hot_rank, execution_status, created_at, updated_at
- )
- VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
- ON DUPLICATE KEY UPDATE
- id=LAST_INSERT_ID(id),
- hot_rank=VALUES(hot_rank),
- updated_at=NOW()
- """
- with self.conn.cursor() as cursor:
- cursor.execute(
- sql,
- (
- key,
- source,
- title,
- rank,
- ExecutionStatus.HOT_SAVED,
- ),
- )
- record_id = int(cursor.lastrowid)
- cursor.execute(
- """
- SELECT
- id,
- unique_key,
- execution_status,
- article_title,
- article_body,
- article_url,
- decode_request_result IS NOT NULL AS has_decode_request,
- contribution_points_json IS NOT NULL AS has_contribution_points
- FROM hot_content_records
- WHERE id = %s
- """,
- (record_id,),
- )
- row = cursor.fetchone()
- if not row:
- raise HotContentFlowError(f"missing hot_content_records id={record_id}")
- return {
- "id": int(row["id"]),
- "unique_key": str(row["unique_key"]),
- "execution_status": int(row["execution_status"]),
- "article_title": row.get("article_title"),
- "article_body": row.get("article_body"),
- "article_url": row.get("article_url"),
- "has_decode_request": bool(row.get("has_decode_request")),
- "has_contribution_points": bool(row.get("has_contribution_points")),
- }
- def update_status(
- self,
- *,
- record_id: int,
- status: int,
- error_message: str | None = None,
- ) -> None:
- sql = """
- UPDATE hot_content_records
- SET execution_status=%s, error_reason=%s, updated_at=NOW()
- WHERE id=%s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql, (status, error_message, record_id))
- def update_article(
- self,
- *,
- record_id: int,
- article_title: str,
- article_body: str,
- url: str,
- ) -> None:
- sql = """
- UPDATE hot_content_records
- SET article_title=%s,
- article_body=%s,
- article_url=%s,
- execution_status=%s,
- error_reason=NULL,
- updated_at=NOW()
- WHERE id=%s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(
- sql,
- (
- article_title,
- article_body,
- url,
- ExecutionStatus.CONTENT_OK,
- record_id,
- ),
- )
- def mark_no_valid_content(
- self,
- *,
- record_id: int,
- reason: str,
- ) -> None:
- """搜不到文章或缺标题/正文:仅更新 execution_status + error_reason,不动分类字段。"""
- sql = """
- UPDATE hot_content_records
- SET execution_status=%s,
- error_reason=%s,
- updated_at=NOW()
- WHERE id=%s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(
- sql,
- (
- ExecutionStatus.NO_VALID_CONTENT,
- str(reason or "no valid content").strip(),
- record_id,
- ),
- )
- def update_category_filter_result(
- self,
- *,
- record_id: int,
- passed: bool,
- result_json: dict[str, Any],
- ) -> None:
- self._ensure_category_filter_columns()
- status = (
- ExecutionStatus.CATEGORY_FILTER_PASSED
- if passed
- else ExecutionStatus.CATEGORY_FILTER_REJECTED
- )
- reason = str(result_json.get("reason") or "").strip()
- error_message = None if passed else (reason or "category filter rejected")
- sql = """
- UPDATE hot_content_records
- SET execution_status=%s,
- category_filter_passed=%s,
- category_filter_reason=%s,
- category_filter_json=%s,
- error_reason=%s,
- updated_at=NOW()
- WHERE id=%s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(
- sql,
- (
- status,
- 1 if passed else 0,
- reason or None,
- _json_dumps(result_json),
- error_message,
- record_id,
- ),
- )
- def get_record_for_category_filter(self, record_id: int) -> dict[str, Any] | None:
- self._ensure_category_filter_columns()
- sql = """
- SELECT
- id,
- source,
- title,
- article_title,
- article_body,
- article_url,
- execution_status,
- category_filter_passed,
- category_filter_reason,
- category_filter_json
- FROM hot_content_records
- WHERE id = %s
- LIMIT 1
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql, (record_id,))
- row = cursor.fetchone()
- if not row:
- return None
- category_filter_json = _json_loads(row.get("category_filter_json"))
- passed_raw = row.get("category_filter_passed")
- passed: bool | None
- if passed_raw is None:
- passed = None
- else:
- passed = bool(int(passed_raw))
- return {
- "id": int(row["id"]),
- "source": str(row.get("source") or ""),
- "title": str(row.get("title") or ""),
- "article_title": str(row.get("article_title") or ""),
- "article_body": str(row.get("article_body") or ""),
- "article_url": str(row.get("article_url") or ""),
- "execution_status": int(row.get("execution_status") or 0),
- "category_filter_passed": passed,
- "category_filter_reason": str(row.get("category_filter_reason") or ""),
- "category_filter_json": (
- category_filter_json if isinstance(category_filter_json, dict) else {}
- ),
- }
- def get_category_filter_status(self, record_id: int) -> dict[str, Any] | None:
- record = self.get_record_for_category_filter(record_id)
- if not record:
- return None
- matched_category = None
- category_filter_json = record.get("category_filter_json") or {}
- if isinstance(category_filter_json, dict):
- matched_category = category_filter_json.get("matched_category")
- return {
- "id": record["id"],
- "passed": record["category_filter_passed"],
- "reason": record["category_filter_reason"],
- "matched_category": matched_category,
- "execution_status": record["execution_status"],
- }
- def update_decode_result(
- self,
- *,
- record_id: int,
- status: int,
- request_json: dict[str, Any],
- response_json: dict[str, Any] | None,
- error_message: str | None = None,
- ) -> None:
- decode_request_result = {
- "request": request_json,
- "response": response_json,
- }
- sql = """
- UPDATE hot_content_records
- SET decode_request_result=%s,
- execution_status=%s,
- error_reason=%s,
- updated_at=NOW()
- WHERE id=%s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(
- sql,
- (
- _json_dumps(decode_request_result),
- status,
- error_message,
- record_id,
- ),
- )
- def list_decode_result_candidates(self, *, limit: int) -> list[dict[str, Any]]:
- sql = """
- SELECT id, unique_key
- FROM hot_content_records
- WHERE execution_status IN (%s, %s, %s)
- AND contribution_points_json IS NULL
- ORDER BY updated_at ASC, id ASC
- LIMIT %s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(
- sql,
- (
- ExecutionStatus.DECODE_SUBMITTED,
- ExecutionStatus.DECODE_SUCCESS,
- ExecutionStatus.DECODE_PENDING,
- limit,
- ),
- )
- rows = cursor.fetchall()
- return [
- {
- "id": int(row["id"]),
- "unique_key": str(row["unique_key"]),
- }
- for row in rows
- ]
- def save_decode_result_export(
- self,
- *,
- record_id: int,
- decode_result_json: dict[str, Any],
- contribution_points_json: dict[str, Any],
- ) -> None:
- sql = """
- UPDATE hot_content_records
- SET decode_result_json=%s,
- contribution_points_json=%s,
- execution_status=%s,
- error_reason=NULL,
- updated_at=NOW()
- WHERE id=%s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(
- sql,
- (
- _json_dumps(decode_result_json),
- _json_dumps(contribution_points_json),
- ExecutionStatus.CONTRIBUTION_EXTRACTED,
- record_id,
- ),
- )
- def get_demand_cache_by_hour(self, *, cache_hour: datetime) -> dict[str, Any] | None:
- sql = """
- SELECT
- id,
- cache_hour,
- source_table,
- partition_dt,
- demand_name_set_json,
- item_count,
- updated_at
- FROM demand_pool_hourly_cache
- WHERE cache_hour=%s
- LIMIT 1
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql, (cache_hour,))
- row = cursor.fetchone()
- if not row:
- return None
- demand_name_set = _json_loads(row.get("demand_name_set_json")) or []
- if not isinstance(demand_name_set, list):
- demand_name_set = []
- return {
- "id": int(row["id"]),
- "cache_hour": row.get("cache_hour"),
- "source_table": str(row["source_table"]),
- "partition_dt": row.get("partition_dt"),
- "demand_name_set": [
- str(name).strip()
- for name in demand_name_set
- if str(name).strip()
- ],
- "item_count": int(row.get("item_count") or 0),
- "updated_at": row.get("updated_at"),
- }
- def save_demand_cache_set(
- self,
- *,
- cache_hour: datetime,
- source_table: str,
- partition_dt: str | None,
- excluded_strategy: str,
- top_n: int,
- demand_name_set: list[str],
- ) -> int:
- sql = """
- INSERT INTO demand_pool_hourly_cache (
- cache_hour,
- source_table,
- partition_dt,
- excluded_strategy,
- top_n,
- demand_name_set_json,
- item_count,
- created_at,
- updated_at
- )
- VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- ON DUPLICATE KEY UPDATE
- id=LAST_INSERT_ID(id),
- source_table=VALUES(source_table),
- partition_dt=VALUES(partition_dt),
- excluded_strategy=VALUES(excluded_strategy),
- top_n=VALUES(top_n),
- demand_name_set_json=VALUES(demand_name_set_json),
- item_count=VALUES(item_count),
- updated_at=NOW()
- """
- normalized_names = _normalize_demand_names(demand_name_set)
- with self.conn.cursor() as cursor:
- cursor.execute(
- sql,
- (
- cache_hour,
- source_table,
- partition_dt,
- excluded_strategy,
- top_n,
- _json_dumps(normalized_names),
- len(normalized_names),
- ),
- )
- return int(cursor.lastrowid)
- def list_postprocess_candidates(self, *, limit: int) -> list[dict[str, Any]]:
- sql = """
- SELECT
- id,
- unique_key,
- source,
- title,
- article_title,
- article_body,
- demand_cache_run_id,
- decode_result_json,
- contribution_points_json,
- contribution_demand_match_json
- FROM hot_content_records
- WHERE contribution_points_json IS NOT NULL
- AND postprocess_status IN (%s, %s, %s)
- ORDER BY updated_at ASC, id ASC
- LIMIT %s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(
- sql,
- (
- PostprocessStatus.PENDING,
- PostprocessStatus.DEMAND_MATCHED,
- PostprocessStatus.FAILED,
- limit,
- ),
- )
- rows = cursor.fetchall()
- return [
- {
- "id": int(row["id"]),
- "unique_key": str(row["unique_key"]),
- "source": str(row.get("source") or ""),
- "title": str(row.get("title") or ""),
- "article_title": row.get("article_title"),
- "article_body": row.get("article_body"),
- "demand_cache_run_id": row.get("demand_cache_run_id"),
- "decode_result_json": _json_loads(row.get("decode_result_json")),
- "contribution_points_json": _json_loads(row.get("contribution_points_json")),
- "contribution_demand_match_json": _json_loads(
- row.get("contribution_demand_match_json")
- ),
- }
- for row in rows
- ]
- def save_contribution_demand_match(
- self,
- *,
- record_id: int,
- demand_cache_run_id: int,
- match_json: dict[str, Any],
- ) -> None:
- sql = """
- UPDATE hot_content_records
- SET demand_cache_run_id=%s,
- contribution_demand_match_json=%s,
- postprocess_status=%s,
- postprocess_error_reason=NULL,
- updated_at=NOW()
- WHERE id=%s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(
- sql,
- (
- demand_cache_run_id,
- _json_dumps(match_json),
- PostprocessStatus.DEMAND_MATCHED,
- record_id,
- ),
- )
- def save_wxindex_trend(
- self,
- *,
- record_id: int,
- trend_json: dict[str, Any],
- ) -> None:
- sql = """
- UPDATE hot_content_records
- SET wxindex_trend_json=%s,
- postprocess_status=%s,
- postprocess_error_reason=NULL,
- updated_at=NOW()
- WHERE id=%s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(
- sql,
- (
- _json_dumps(trend_json),
- PostprocessStatus.WXINDEX_DONE,
- record_id,
- ),
- )
- def save_demand_quality(
- self,
- *,
- record_id: int,
- event_sense_json: dict[str, Any],
- senior_fit_json: dict[str, Any],
- update_status: bool = True,
- ) -> None:
- self._ensure_record_quality_columns()
- if update_status:
- sql = """
- UPDATE hot_content_records
- SET demand_event_sense_json=%s,
- demand_senior_fit_json=%s,
- postprocess_status=%s,
- postprocess_error_reason=NULL,
- updated_at=NOW()
- WHERE id=%s
- """
- params = (
- _json_dumps(event_sense_json),
- _json_dumps(senior_fit_json),
- PostprocessStatus.QUALITY_DONE,
- record_id,
- )
- else:
- sql = """
- UPDATE hot_content_records
- SET demand_event_sense_json=%s,
- demand_senior_fit_json=%s,
- updated_at=NOW()
- WHERE id=%s
- """
- params = (
- _json_dumps(event_sense_json),
- _json_dumps(senior_fit_json),
- record_id,
- )
- with self.conn.cursor() as cursor:
- cursor.execute(sql, params)
- def update_postprocess_status(
- self,
- *,
- record_id: int,
- status: int,
- error_message: str | None = None,
- ) -> None:
- sql = """
- UPDATE hot_content_records
- SET postprocess_status=%s,
- postprocess_error_reason=%s,
- updated_at=NOW()
- WHERE id=%s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql, (status, error_message, record_id))
- def replace_demand_export_rows(
- self,
- *,
- record_id: int,
- source: str,
- hot_title: str,
- article_title: str,
- rows: list[dict[str, Any]],
- ) -> None:
- self._ensure_demand_export_table()
- delete_sql = "DELETE FROM hot_content_demand_exports WHERE record_id=%s"
- insert_sql = """
- INSERT INTO hot_content_demand_exports (
- record_id,
- source,
- hot_title,
- article_title,
- item_type,
- item_text,
- point_category,
- matched_demand,
- contribution_score,
- wxindex_keyword,
- all_hot_keywords,
- wxindex_latest_score,
- wxindex_trend,
- is_as_demand,
- event_sense_score,
- senior_fit_score,
- created_at,
- updated_at
- )
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
- """
- with self.conn.cursor() as cursor:
- cursor.execute(delete_sql, (record_id,))
- insert_rows = [
- (
- record_id,
- source,
- hot_title,
- article_title,
- str(item.get("item_type") or ""),
- str(item.get("item_text") or ""),
- str(item.get("point_category") or ""),
- str(item.get("matched_demand") or ""),
- item.get("contribution_score"),
- str(item.get("wxindex_keyword") or ""),
- str(item.get("all_hot_keywords") or ""),
- float(item.get("wxindex_latest_score") or 0),
- str(item.get("wxindex_trend") or ""),
- int(item.get("is_as_demand") or 0),
- item.get("event_sense_score"),
- item.get("senior_fit_score"),
- )
- for item in rows
- if str(item.get("item_type") or "").strip()
- and str(item.get("item_text") or "").strip()
- ]
- if insert_rows:
- cursor.executemany(insert_sql, insert_rows)
- def list_odps_sync_records(self) -> list[dict[str, Any]]:
- """读取当天创建且已完成质量判断的新记录,供 ODPS 同步(不处理历史数据)。"""
- self._ensure_record_quality_columns()
- today_start = datetime.now(SHANGHAI_TZ).replace(
- hour=0,
- minute=0,
- second=0,
- microsecond=0,
- tzinfo=None,
- )
- today_end = today_start + timedelta(days=1)
- sql = """
- SELECT
- id,
- contribution_points_json,
- contribution_demand_match_json,
- wxindex_trend_json,
- demand_event_sense_json,
- demand_senior_fit_json
- FROM hot_content_records
- WHERE created_at >= %s
- AND created_at < %s
- AND postprocess_status = %s
- AND contribution_demand_match_json IS NOT NULL
- AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
- ORDER BY id ASC
- """
- with self.conn.cursor() as cursor:
- cursor.execute(
- sql,
- (today_start, today_end, PostprocessStatus.QUALITY_DONE),
- )
- rows = cursor.fetchall()
- records: list[dict[str, Any]] = []
- for row in rows:
- records.append(
- {
- "id": int(row["id"]),
- "contribution_points_json": _json_loads(
- row.get("contribution_points_json")
- ),
- "contribution_demand_match_json": _json_loads(
- row.get("contribution_demand_match_json")
- ),
- "wxindex_trend_json": _json_loads(row.get("wxindex_trend_json")),
- "demand_event_sense_json": _json_loads(
- row.get("demand_event_sense_json")
- ),
- "demand_senior_fit_json": _json_loads(
- row.get("demand_senior_fit_json")
- ),
- }
- )
- return records
- def list_demand_export_groups(self) -> list[dict[str, Any]]:
- """读取主表当天创建的 record 对应导出分组,仅供 ODPS 当天分区同步(不跨天)。"""
- self._ensure_demand_export_table()
- today_start = datetime.now(SHANGHAI_TZ).replace(
- hour=0,
- minute=0,
- second=0,
- microsecond=0,
- tzinfo=None,
- )
- today_end = today_start + timedelta(days=1)
- sql = """
- SELECT
- e.record_id,
- e.item_type,
- e.item_text,
- e.point_category,
- e.matched_demand,
- e.wxindex_latest_score
- FROM hot_content_demand_exports e
- INNER JOIN hot_content_records r ON r.id = e.record_id
- WHERE r.created_at >= %s
- AND r.created_at < %s
- ORDER BY e.record_id ASC, e.id ASC
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql, (today_start, today_end))
- rows = cursor.fetchall()
- grouped: dict[int, list[dict[str, Any]]] = {}
- for row in rows:
- record_id = int(row["record_id"])
- grouped.setdefault(record_id, []).append(
- {
- "item_type": str(row.get("item_type") or ""),
- "item_text": str(row.get("item_text") or ""),
- "point_category": str(row.get("point_category") or ""),
- "matched_demand": str(row.get("matched_demand") or ""),
- "wxindex_latest_score": float(row.get("wxindex_latest_score") or 0),
- }
- )
- return [
- {"record_id": record_id, "export_rows": export_rows}
- for record_id, export_rows in grouped.items()
- ]
- def _ensure_demand_export_table(self) -> None:
- sql = """
- CREATE TABLE IF NOT EXISTS hot_content_demand_exports (
- id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
- record_id BIGINT UNSIGNED NOT NULL,
- source VARCHAR(64) NOT NULL DEFAULT '',
- hot_title VARCHAR(1024) NOT NULL DEFAULT '',
- article_title VARCHAR(1024) NOT NULL DEFAULT '',
- item_type VARCHAR(32) NOT NULL COMMENT '元素/短语',
- item_text VARCHAR(1024) NOT NULL,
- point_category VARCHAR(32) NOT NULL DEFAULT '' COMMENT '点类型:灵感点/目的点/关键点',
- matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求',
- contribution_score DOUBLE NULL COMMENT '贡献分,仅元素有值',
- wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的元素',
- all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '全部热点词',
- wxindex_latest_score DOUBLE NOT NULL DEFAULT 0,
- wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势',
- is_as_demand TINYINT NOT NULL DEFAULT 0 COMMENT '是否作为需求:0否 1是',
- created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
- updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
- PRIMARY KEY (id),
- KEY idx_record_id (record_id),
- KEY idx_source_type (source, item_type)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql)
- self._ensure_demand_export_column(
- cursor,
- "matched_demand",
- """
- ALTER TABLE hot_content_demand_exports
- ADD COLUMN matched_demand VARCHAR(1024) NOT NULL DEFAULT ''
- COMMENT '匹配到的需求'
- AFTER item_text
- """,
- )
- self._ensure_demand_export_column(
- cursor,
- "point_category",
- """
- ALTER TABLE hot_content_demand_exports
- ADD COLUMN point_category VARCHAR(32) NOT NULL DEFAULT ''
- COMMENT '点类型:灵感点/目的点/关键点'
- AFTER item_text
- """,
- )
- self._ensure_demand_export_column(
- cursor,
- "contribution_score",
- """
- ALTER TABLE hot_content_demand_exports
- ADD COLUMN contribution_score DOUBLE NULL
- COMMENT '贡献分,仅词有值'
- AFTER matched_demand
- """,
- )
- self._ensure_demand_export_column(
- cursor,
- "wxindex_trend",
- """
- ALTER TABLE hot_content_demand_exports
- ADD COLUMN wxindex_trend VARCHAR(32) NOT NULL DEFAULT ''
- COMMENT '微信指数趋势'
- AFTER wxindex_latest_score
- """,
- )
- self._ensure_demand_export_column(
- cursor,
- "wxindex_keyword",
- """
- ALTER TABLE hot_content_demand_exports
- ADD COLUMN wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT ''
- COMMENT '获取微信指数的词'
- AFTER contribution_score
- """,
- )
- self._ensure_demand_export_column(
- cursor,
- "all_hot_keywords",
- """
- ALTER TABLE hot_content_demand_exports
- ADD COLUMN all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT ''
- COMMENT '全部热点词'
- AFTER wxindex_keyword
- """,
- )
- self._ensure_demand_export_column(
- cursor,
- "is_as_demand",
- """
- ALTER TABLE hot_content_demand_exports
- ADD COLUMN is_as_demand TINYINT NOT NULL DEFAULT 0
- COMMENT '是否作为需求:0否 1是'
- AFTER wxindex_trend
- """,
- )
- self._ensure_demand_export_column(
- cursor,
- "event_sense_score",
- """
- ALTER TABLE hot_content_demand_exports
- ADD COLUMN event_sense_score DOUBLE NULL
- COMMENT '事件性得分 0-10'
- AFTER is_as_demand
- """,
- )
- self._ensure_demand_export_column(
- cursor,
- "senior_fit_score",
- """
- ALTER TABLE hot_content_demand_exports
- ADD COLUMN senior_fit_score DOUBLE NULL
- COMMENT '老年性得分 0-10'
- AFTER event_sense_score
- """,
- )
- def _ensure_record_quality_columns(self) -> None:
- with self.conn.cursor() as cursor:
- for column_name, alter_sql in (
- (
- "demand_event_sense_json",
- """
- ALTER TABLE hot_content_records
- ADD COLUMN demand_event_sense_json JSON NULL
- COMMENT '需求事件性 LLM 评分结果'
- AFTER wxindex_trend_json
- """,
- ),
- (
- "demand_senior_fit_json",
- """
- ALTER TABLE hot_content_records
- ADD COLUMN demand_senior_fit_json JSON NULL
- COMMENT '需求老年性 LLM 评分结果'
- AFTER demand_event_sense_json
- """,
- ),
- ):
- cursor.execute(
- """
- SELECT COUNT(*) AS cnt
- FROM information_schema.COLUMNS
- WHERE TABLE_SCHEMA = DATABASE()
- AND TABLE_NAME = 'hot_content_records'
- AND COLUMN_NAME = %s
- """,
- (column_name,),
- )
- if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
- cursor.execute(alter_sql)
- def _ensure_category_filter_columns(self) -> None:
- with self.conn.cursor() as cursor:
- for column_name, alter_sql in (
- (
- "category_filter_json",
- """
- ALTER TABLE hot_content_records
- ADD COLUMN category_filter_json JSON NULL
- COMMENT '老年人兴趣分类筛选 LLM 结果'
- AFTER article_url
- """,
- ),
- (
- "category_filter_passed",
- """
- ALTER TABLE hot_content_records
- ADD COLUMN category_filter_passed TINYINT NULL
- COMMENT '分类筛选是否通过:1通过 0不通过 NULL未筛选'
- AFTER category_filter_json
- """,
- ),
- (
- "category_filter_reason",
- """
- ALTER TABLE hot_content_records
- ADD COLUMN category_filter_reason TEXT NULL
- COMMENT '分类筛选原因'
- AFTER category_filter_passed
- """,
- ),
- ):
- cursor.execute(
- """
- SELECT COUNT(*) AS cnt
- FROM information_schema.COLUMNS
- WHERE TABLE_SCHEMA = DATABASE()
- AND TABLE_NAME = 'hot_content_records'
- AND COLUMN_NAME = %s
- """,
- (column_name,),
- )
- if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
- cursor.execute(alter_sql)
- def _ensure_demand_export_column(
- self,
- cursor: Any,
- column_name: str,
- alter_sql: str,
- ) -> None:
- cursor.execute(
- """
- SELECT COUNT(*) AS cnt
- FROM information_schema.COLUMNS
- WHERE TABLE_SCHEMA = DATABASE()
- AND TABLE_NAME = 'hot_content_demand_exports'
- AND COLUMN_NAME = %s
- """,
- (column_name,),
- )
- if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
- cursor.execute(alter_sql)
- def list_synced_odps_demand_ids(
- self,
- *,
- partition_dt: str,
- strategy: str,
- ) -> set[str]:
- self._ensure_odps_sync_log_table()
- sql = """
- SELECT demand_id
- FROM hot_content_odps_sync_log
- WHERE partition_dt = %s
- AND strategy = %s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql, (partition_dt, strategy))
- rows = cursor.fetchall()
- return {
- str(row.get("demand_id") or "").strip()
- for row in rows
- if str(row.get("demand_id") or "").strip()
- }
- def save_odps_sync_logs(self, rows: list[dict[str, Any]]) -> int:
- if not rows:
- return 0
- self._ensure_odps_sync_log_table()
- sql = """
- INSERT INTO hot_content_odps_sync_log (
- partition_dt,
- strategy,
- demand_id,
- demand_name,
- demand_type,
- record_id,
- weight
- )
- VALUES (%s, %s, %s, %s, %s, %s, %s)
- ON DUPLICATE KEY UPDATE
- demand_name = VALUES(demand_name),
- demand_type = VALUES(demand_type),
- record_id = VALUES(record_id),
- weight = VALUES(weight),
- synced_at = CURRENT_TIMESTAMP
- """
- insert_rows = [
- (
- str(item.get("partition_dt") or ""),
- str(item.get("strategy") or ""),
- str(item.get("demand_id") or ""),
- str(item.get("demand_name") or ""),
- str(item.get("demand_type") or ""),
- int(item.get("record_id") or 0),
- float(item["weight"]) if item.get("weight") is not None else None,
- )
- for item in rows
- if str(item.get("demand_id") or "").strip()
- ]
- with self.conn.cursor() as cursor:
- cursor.executemany(sql, insert_rows)
- return len(insert_rows)
- def _ensure_odps_sync_log_weight_column(self, cursor: Any) -> None:
- cursor.execute(
- """
- SELECT COUNT(*) AS cnt
- FROM information_schema.COLUMNS
- WHERE TABLE_SCHEMA = DATABASE()
- AND TABLE_NAME = 'hot_content_odps_sync_log'
- AND COLUMN_NAME = 'weight'
- """,
- )
- if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
- cursor.execute(
- """
- ALTER TABLE hot_content_odps_sync_log
- ADD COLUMN weight DOUBLE NULL DEFAULT NULL
- COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)'
- AFTER record_id
- """
- )
- def _ensure_odps_sync_log_table(self) -> None:
- sql = """
- CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log (
- id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
- partition_dt VARCHAR(8) NOT NULL COMMENT 'ODPS 分区 dt',
- strategy VARCHAR(128) NOT NULL COMMENT '需求 strategy',
- demand_id CHAR(32) NOT NULL COMMENT 'ODPS demand_id',
- demand_name VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '需求名',
- demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语',
- record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id',
- weight DOUBLE NULL DEFAULT NULL COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)',
- synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间',
- PRIMARY KEY (id),
- UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id),
- KEY idx_record_partition (record_id, partition_dt),
- KEY idx_partition_strategy (partition_dt, strategy)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql)
- self._ensure_odps_sync_log_weight_column(cursor)
|