| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
2772278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569 |
- """热点内容 MySQL 仓储。"""
- from __future__ import annotations
- import hashlib
- import json
- from datetime import date, datetime, timedelta
- from typing import Any
- try:
- import pymysql
- from pymysql.cursors import DictCursor
- except ImportError: # pragma: no cover - runtime dependency check
- pymysql = None
- DictCursor = None
- from app.hot_content.exceptions import HotContentFlowError
- from app.hot_content.status import ExecutionStatus, PostprocessStatus
- from app.hot_content.timezone import SHANGHAI_TZ
- from app.hot_content.types import MysqlConfig
- def _json_dumps(data: Any) -> str:
- return json.dumps(data, ensure_ascii=False, separators=(",", ":"))
- def _nullable_bool(value: Any) -> int | None:
- if value is None:
- return None
- return 1 if value else 0
- def _nullable_str(value: Any) -> str | None:
- if value is None:
- return None
- text = str(value).strip()
- return text if text else None
- def _nullable_json_dumps(value: Any) -> str | None:
- if value is None:
- return None
- return _json_dumps(value)
def _wxindex_word_record_row(payload: dict[str, Any]) -> tuple[Any, ...]:
    """Build the positional SQL parameters for one wxindex word record.

    The tuple order mirrors the 23-column list of
    ``_WXINDEX_WORD_RECORD_UPSERT_SQL`` / ``_WXINDEX_WORD_RECORD_INIT_UPSERT_SQL``.
    Boolean flags become 0/1/None, ``matched_demand`` is stripped (blank -> None),
    JSON payload fields are encoded (None -> NULL), and everything else is
    passed through unchanged.

    Raises:
        HotContentFlowError: if ``name`` or ``analyze_ymd`` is missing or blank.
    """
    get = payload.get
    name = str(get("name") or "").strip()
    analyze_ymd = str(get("analyze_ymd") or "").strip()
    if not (name and analyze_ymd):
        raise HotContentFlowError("invalid wxindex word record payload")

    # Keep this order in lock-step with the INSERT column list.
    row = (
        name,
        get("meta_id"),
        analyze_ymd,
        get("fetch_start_ymd"),
        get("fetch_end_ymd"),
        get("data_start_ymd"),
        get("data_end_ymd"),
        get("data_days"),
        _nullable_bool(get("is_sustained_high")),
        _nullable_bool(get("is_rising")),
        _nullable_bool(get("is_spike")),
        get("retain_reason"),
        _nullable_bool(get("is_internal_demand_matched")),
        _nullable_str(get("matched_demand")),
        get("demand_cache_run_id"),
        _nullable_json_dumps(get("internal_demand_match_json")),
        get("senior_fit_score"),
        _nullable_json_dumps(get("demand_senior_fit_json")),
        _nullable_bool(get("is_final_retained")),
        get("min_score"),
        get("max_score"),
        get("avg_score"),
        _nullable_json_dumps(get("detail_json")),
    )
    return row
# Full upsert for one wxindex word analysis row: 23 columns / 23 placeholders,
# with parameter order matching the tuple built by _wxindex_word_record_row.
# On a duplicate key every column except name and analyze_ymd is overwritten,
# which suggests (name, analyze_ymd) is the unique key. TODO: confirm against
# the table DDL.
# NOTE(review): VALUES(col) inside ON DUPLICATE KEY UPDATE is deprecated as of
# MySQL 8.0.20 (still functional); a row alias is the forward-compatible form.
_WXINDEX_WORD_RECORD_UPSERT_SQL = """
INSERT INTO hot_content_wxindex_word_records (
name,
meta_id,
analyze_ymd,
fetch_start_ymd,
fetch_end_ymd,
data_start_ymd,
data_end_ymd,
data_days,
is_sustained_high,
is_rising,
is_spike,
retain_reason,
is_internal_demand_matched,
matched_demand,
demand_cache_run_id,
internal_demand_match_json,
senior_fit_score,
demand_senior_fit_json,
is_final_retained,
min_score,
max_score,
avg_score,
detail_json
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
meta_id = VALUES(meta_id),
fetch_start_ymd = VALUES(fetch_start_ymd),
fetch_end_ymd = VALUES(fetch_end_ymd),
data_start_ymd = VALUES(data_start_ymd),
data_end_ymd = VALUES(data_end_ymd),
data_days = VALUES(data_days),
is_sustained_high = VALUES(is_sustained_high),
is_rising = VALUES(is_rising),
is_spike = VALUES(is_spike),
retain_reason = VALUES(retain_reason),
is_internal_demand_matched = VALUES(is_internal_demand_matched),
matched_demand = VALUES(matched_demand),
demand_cache_run_id = VALUES(demand_cache_run_id),
internal_demand_match_json = VALUES(internal_demand_match_json),
senior_fit_score = VALUES(senior_fit_score),
demand_senior_fit_json = VALUES(demand_senior_fit_json),
is_final_retained = VALUES(is_final_retained),
min_score = VALUES(min_score),
max_score = VALUES(max_score),
avg_score = VALUES(avg_score),
detail_json = VALUES(detail_json)
"""
# "Init" variant of the word-record upsert. It uses the same 23-column INSERT
# (parameter order matches _wxindex_word_record_row). On a duplicate key it
# refreshes only meta_id, the fetch window and demand_cache_run_id, so analysis
# and scoring columns already stored for that key are left untouched.
_WXINDEX_WORD_RECORD_INIT_UPSERT_SQL = """
INSERT INTO hot_content_wxindex_word_records (
name,
meta_id,
analyze_ymd,
fetch_start_ymd,
fetch_end_ymd,
data_start_ymd,
data_end_ymd,
data_days,
is_sustained_high,
is_rising,
is_spike,
retain_reason,
is_internal_demand_matched,
matched_demand,
demand_cache_run_id,
internal_demand_match_json,
senior_fit_score,
demand_senior_fit_json,
is_final_retained,
min_score,
max_score,
avg_score,
detail_json
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
meta_id = VALUES(meta_id),
fetch_start_ymd = VALUES(fetch_start_ymd),
fetch_end_ymd = VALUES(fetch_end_ymd),
demand_cache_run_id = VALUES(demand_cache_run_id)
"""
def _wxindex_word_stats_row(payload: dict[str, Any]) -> tuple[Any, ...]:
    """Build the positional SQL parameters for one wxindex word stats row.

    The tuple order mirrors the 13-column list of ``_WXINDEX_WORD_STATS_UPSERT_SQL``.
    Only ``detail_json`` is transformed (JSON-encoded, None -> NULL); every other
    value is passed through unchanged.

    Raises:
        HotContentFlowError: if ``name`` or ``analyze_ymd`` is missing or blank.
    """

    def required_text(key: str) -> str:
        return str(payload.get(key) or "").strip()

    name, analyze_ymd = required_text("name"), required_text("analyze_ymd")
    if not (name and analyze_ymd):
        raise HotContentFlowError("invalid wxindex word stats payload")

    passthrough_keys = (
        "wxindex_word_record_id",
        "retain_reason",
        "senior_fit_score",
        "data_start_ymd",
        "data_end_ymd",
        "data_days",
        "min_score",
        "max_score",
        "avg_score",
    )
    return (
        name,
        payload.get("meta_id"),
        analyze_ymd,
        *(payload.get(key) for key in passthrough_keys),
        _nullable_json_dumps(payload.get("detail_json")),
    )
# Upsert for per-word stats: 13 columns / 13 placeholders, with parameter order
# matching the tuple built by _wxindex_word_stats_row. On a duplicate key all
# columns except name and analyze_ymd are overwritten. Presumably
# (name, analyze_ymd) is the unique key; TODO: confirm against the table DDL.
_WXINDEX_WORD_STATS_UPSERT_SQL = """
INSERT INTO hot_content_wxindex_word_stats (
name,
meta_id,
analyze_ymd,
wxindex_word_record_id,
retain_reason,
senior_fit_score,
data_start_ymd,
data_end_ymd,
data_days,
min_score,
max_score,
avg_score,
detail_json
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
meta_id = VALUES(meta_id),
wxindex_word_record_id = VALUES(wxindex_word_record_id),
retain_reason = VALUES(retain_reason),
senior_fit_score = VALUES(senior_fit_score),
data_start_ymd = VALUES(data_start_ymd),
data_end_ymd = VALUES(data_end_ymd),
data_days = VALUES(data_days),
min_score = VALUES(min_score),
max_score = VALUES(max_score),
avg_score = VALUES(avg_score),
detail_json = VALUES(detail_json)
"""
- def _json_loads(value: Any) -> Any:
- if value is None:
- return None
- if isinstance(value, (dict, list)):
- return value
- if isinstance(value, (bytes, bytearray)):
- value = value.decode("utf-8")
- if isinstance(value, str):
- return json.loads(value)
- return value
- def _normalize_demand_names(demand_name_set: list[str]) -> list[str]:
- names: list[str] = []
- seen: set[str] = set()
- for item in demand_name_set:
- name = str(item).strip()
- if not name or name in seen:
- continue
- seen.add(name)
- names.append(name)
- return names
def unique_title_key(source: str, title: str) -> str:
    """Return the SHA-256 hex digest identifying a (source, title) pair.

    The two parts are joined with a newline and hashed as UTF-8.
    """
    digest = hashlib.sha256()
    digest.update("{}\n{}".format(source, title).encode("utf-8"))
    return digest.hexdigest()
- class HotContentRepository:
def __init__(self, config: MysqlConfig):
    """Open the MySQL connection described by *config*.

    The connection uses ``autocommit=True`` and ``DictCursor``, so each
    statement commits on its own and fetched rows are dicts.

    Raises:
        HotContentFlowError: if ``pymysql`` (or its ``DictCursor``) failed to import.
    """
    if any(dependency is None for dependency in (pymysql, DictCursor)):
        raise HotContentFlowError("missing dependency: pip install pymysql")
    connection_options = {
        "host": config.host,
        "port": config.port,
        "user": config.user,
        "password": config.password,
        "database": config.database,
        "charset": config.charset,
        "autocommit": True,
        "cursorclass": DictCursor,
    }
    self.conn = pymysql.connect(**connection_options)
def close(self) -> None:
    """Close the underlying MySQL connection."""
    connection = self.conn
    connection.close()
def upsert_record(self, *, source: str, title: str, rank: int | None) -> dict[str, Any]:
    """Insert a hot-list entry, or refresh the rank of an existing one.

    The row is keyed by ``unique_title_key(source, title)``. New rows start in
    ``ExecutionStatus.HOT_SAVED``. On a duplicate, only ``hot_rank`` and
    ``updated_at`` change, so the existing execution status is kept.
    ``id=LAST_INSERT_ID(id)`` makes ``cursor.lastrowid`` report the existing
    row's id on the duplicate path too.

    Returns:
        A snapshot of the row: id, unique_key, execution_status, article fields,
        and whether decode-request / contribution-point data is already present.

    Raises:
        HotContentFlowError: if the row cannot be read back after the upsert.
    """
    upsert_sql = """
INSERT INTO hot_content_records (
unique_key, source, title, hot_rank, execution_status, created_at, updated_at
)
VALUES (%s, %s, %s, %s, %s, NOW(), NOW())
ON DUPLICATE KEY UPDATE
id=LAST_INSERT_ID(id),
hot_rank=VALUES(hot_rank),
updated_at=NOW()
"""
    readback_sql = """
SELECT
id,
unique_key,
execution_status,
article_title,
article_body,
article_url,
decode_request_result IS NOT NULL AS has_decode_request,
contribution_points_json IS NOT NULL AS has_contribution_points
FROM hot_content_records
WHERE id = %s
"""
    upsert_params = (
        unique_title_key(source, title),
        source,
        title,
        rank,
        ExecutionStatus.HOT_SAVED,
    )
    with self.conn.cursor() as cursor:
        cursor.execute(upsert_sql, upsert_params)
        record_id = int(cursor.lastrowid)
        cursor.execute(readback_sql, (record_id,))
        snapshot = cursor.fetchone()
    if not snapshot:
        raise HotContentFlowError(f"missing hot_content_records id={record_id}")
    return {
        "id": int(snapshot["id"]),
        "unique_key": str(snapshot["unique_key"]),
        "execution_status": int(snapshot["execution_status"]),
        "article_title": snapshot.get("article_title"),
        "article_body": snapshot.get("article_body"),
        "article_url": snapshot.get("article_url"),
        "has_decode_request": bool(snapshot.get("has_decode_request")),
        "has_contribution_points": bool(snapshot.get("has_contribution_points")),
    }
def update_status(
    self,
    *,
    record_id: int,
    status: int,
    error_message: str | None = None,
) -> None:
    """Set a record's ``execution_status`` and ``error_reason`` and touch ``updated_at``.

    Passing ``error_message=None`` clears ``error_reason``.
    """
    params = (status, error_message, record_id)
    with self.conn.cursor() as cursor:
        cursor.execute(
            """
UPDATE hot_content_records
SET execution_status=%s, error_reason=%s, updated_at=NOW()
WHERE id=%s
""",
            params,
        )
def update_article(
    self,
    *,
    record_id: int,
    article_title: str,
    article_body: str,
    url: str,
) -> None:
    """Store the fetched article for a record.

    Also moves the record to ``ExecutionStatus.CONTENT_OK`` and clears
    ``error_reason``.
    """
    statement = """
UPDATE hot_content_records
SET article_title=%s,
article_body=%s,
article_url=%s,
execution_status=%s,
error_reason=NULL,
updated_at=NOW()
WHERE id=%s
"""
    values = (article_title, article_body, url, ExecutionStatus.CONTENT_OK, record_id)
    with self.conn.cursor() as cursor:
        cursor.execute(statement, values)
def mark_no_valid_content(
    self,
    *,
    record_id: int,
    reason: str,
) -> None:
    """Mark a record as having no usable article content.

    Used when no article was found or it lacks a title/body. Only
    ``execution_status`` (set to ``ExecutionStatus.NO_VALID_CONTENT``) and
    ``error_reason`` are updated; category-filter columns are left untouched.

    A missing, empty or whitespace-only *reason* falls back to
    ``"no valid content"``, so ``error_reason`` is never stored blank.
    """
    # Strip before applying the fallback. Otherwise a whitespace-only reason
    # would be stored as "" (same order as update_category_filter_result).
    error_reason = str(reason or "").strip() or "no valid content"
    sql = """
UPDATE hot_content_records
SET execution_status=%s,
error_reason=%s,
updated_at=NOW()
WHERE id=%s
"""
    with self.conn.cursor() as cursor:
        cursor.execute(
            sql,
            (
                ExecutionStatus.NO_VALID_CONTENT,
                error_reason,
                record_id,
            ),
        )
def update_category_filter_result(
    self,
    *,
    record_id: int,
    passed: bool,
    result_json: dict[str, Any],
) -> None:
    """Persist the category-filter verdict for a record.

    Writes the pass flag (1/0), the stripped ``reason`` from *result_json*
    (blank -> NULL), and the full *result_json*. It also sets
    ``execution_status`` to PASSED/REJECTED. On rejection, ``error_reason``
    receives the reason (or a default message); on pass it is cleared.
    """
    self._ensure_category_filter_columns()
    reason = str(result_json.get("reason") or "").strip()
    if passed:
        status = ExecutionStatus.CATEGORY_FILTER_PASSED
        passed_flag = 1
        error_message = None
    else:
        status = ExecutionStatus.CATEGORY_FILTER_REJECTED
        passed_flag = 0
        error_message = reason or "category filter rejected"
    sql = """
UPDATE hot_content_records
SET execution_status=%s,
category_filter_passed=%s,
category_filter_reason=%s,
category_filter_json=%s,
error_reason=%s,
updated_at=NOW()
WHERE id=%s
"""
    params = (
        status,
        passed_flag,
        reason or None,
        _json_dumps(result_json),
        error_message,
        record_id,
    )
    with self.conn.cursor() as cursor:
        cursor.execute(sql, params)
def get_record_for_category_filter(self, record_id: int) -> dict[str, Any] | None:
    """Load the fields the category filter needs for one record.

    Returns None when no row has *record_id*. Text columns are coerced to
    ``str`` (NULL -> ""). ``category_filter_passed`` becomes ``True``/``False``,
    or ``None`` if never set. ``category_filter_json`` is decoded, and any
    non-dict result is replaced by ``{}``.
    """
    self._ensure_category_filter_columns()
    sql = """
SELECT
id,
source,
title,
article_title,
article_body,
article_url,
execution_status,
category_filter_passed,
category_filter_reason,
category_filter_json
FROM hot_content_records
WHERE id = %s
LIMIT 1
"""
    with self.conn.cursor() as cursor:
        cursor.execute(sql, (record_id,))
        row = cursor.fetchone()
    if not row:
        return None

    filter_payload = _json_loads(row.get("category_filter_json"))
    raw_flag = row.get("category_filter_passed")
    passed = None if raw_flag is None else bool(int(raw_flag))

    def as_text(column: str) -> str:
        return str(row.get(column) or "")

    return {
        "id": int(row["id"]),
        "source": as_text("source"),
        "title": as_text("title"),
        "article_title": as_text("article_title"),
        "article_body": as_text("article_body"),
        "article_url": as_text("article_url"),
        "execution_status": int(row.get("execution_status") or 0),
        "category_filter_passed": passed,
        "category_filter_reason": as_text("category_filter_reason"),
        "category_filter_json": filter_payload if isinstance(filter_payload, dict) else {},
    }
def get_category_filter_status(self, record_id: int) -> dict[str, Any] | None:
    """Summarize the category-filter outcome for one record.

    Returns None if the record does not exist. Otherwise returns id,
    pass flag, reason, ``matched_category`` (taken from the stored filter
    JSON when it is a dict, else None) and the current execution status.
    """
    record = self.get_record_for_category_filter(record_id)
    if not record:
        return None
    filter_payload = record.get("category_filter_json") or {}
    matched_category = (
        filter_payload.get("matched_category") if isinstance(filter_payload, dict) else None
    )
    return {
        "id": record["id"],
        "passed": record["category_filter_passed"],
        "reason": record["category_filter_reason"],
        "matched_category": matched_category,
        "execution_status": record["execution_status"],
    }
def update_decode_result(
    self,
    *,
    record_id: int,
    status: int,
    request_json: dict[str, Any],
    response_json: dict[str, Any] | None,
    error_message: str | None = None,
) -> None:
    """Store the decode request/response pair and the new execution status.

    The request and response are saved together as one JSON document
    ``{"request": ..., "response": ...}`` in ``decode_request_result``.
    ``error_reason`` is set to *error_message* (None clears it).
    """
    exchange = {"request": request_json, "response": response_json}
    sql = """
UPDATE hot_content_records
SET decode_request_result=%s,
execution_status=%s,
error_reason=%s,
updated_at=NOW()
WHERE id=%s
"""
    with self.conn.cursor() as cursor:
        cursor.execute(sql, (_json_dumps(exchange), status, error_message, record_id))
def list_decode_result_candidates(self, *, limit: int) -> list[dict[str, Any]]:
    """List records whose decode result still needs to be exported.

    Picks records in a decode-submitted/success/pending state that have no
    ``contribution_points_json`` yet, least recently updated first, capped
    at *limit*.
    """
    sql = """
SELECT id, unique_key
FROM hot_content_records
WHERE execution_status IN (%s, %s, %s)
AND contribution_points_json IS NULL
ORDER BY updated_at ASC, id ASC
LIMIT %s
"""
    pickup_statuses = (
        ExecutionStatus.DECODE_SUBMITTED,
        ExecutionStatus.DECODE_SUCCESS,
        ExecutionStatus.DECODE_PENDING,
    )
    with self.conn.cursor() as cursor:
        cursor.execute(sql, (*pickup_statuses, limit))
        fetched = cursor.fetchall()
    return [
        {"id": int(entry["id"]), "unique_key": str(entry["unique_key"])}
        for entry in fetched
    ]
def save_decode_result_export(
    self,
    *,
    record_id: int,
    decode_result_json: dict[str, Any],
    contribution_points_json: dict[str, Any],
) -> None:
    """Persist the exported decode result and extracted contribution points.

    Also moves the record to ``ExecutionStatus.CONTRIBUTION_EXTRACTED`` and
    clears ``error_reason``.
    """
    statement = """
UPDATE hot_content_records
SET decode_result_json=%s,
contribution_points_json=%s,
execution_status=%s,
error_reason=NULL,
updated_at=NOW()
WHERE id=%s
"""
    encoded_result = _json_dumps(decode_result_json)
    encoded_points = _json_dumps(contribution_points_json)
    with self.conn.cursor() as cursor:
        cursor.execute(
            statement,
            (encoded_result, encoded_points, ExecutionStatus.CONTRIBUTION_EXTRACTED, record_id),
        )
def get_demand_cache_by_hour(self, *, cache_hour: datetime) -> dict[str, Any] | None:
    """Fetch the cached demand-name set stored for *cache_hour*.

    Returns None if no cache row exists for that hour. ``demand_name_set`` is
    decoded from JSON. A non-list payload becomes ``[]``, and entries that are
    blank after stripping are dropped.
    """
    sql = """
SELECT
id,
cache_hour,
source_table,
partition_dt,
demand_name_set_json,
item_count,
updated_at
FROM demand_pool_hourly_cache
WHERE cache_hour=%s
LIMIT 1
"""
    with self.conn.cursor() as cursor:
        cursor.execute(sql, (cache_hour,))
        row = cursor.fetchone()
    if not row:
        return None
    decoded = _json_loads(row.get("demand_name_set_json")) or []
    raw_names = decoded if isinstance(decoded, list) else []
    cleaned_names = [text for text in (str(name).strip() for name in raw_names) if text]
    return {
        "id": int(row["id"]),
        "cache_hour": row.get("cache_hour"),
        "source_table": str(row["source_table"]),
        "partition_dt": row.get("partition_dt"),
        "demand_name_set": cleaned_names,
        "item_count": int(row.get("item_count") or 0),
        "updated_at": row.get("updated_at"),
    }
def save_demand_cache_set(
    self,
    *,
    cache_hour: datetime,
    source_table: str,
    partition_dt: str | None,
    excluded_strategy: str,
    top_n: int,
    demand_name_set: list[str],
) -> int:
    """Create or overwrite the demand-name cache row for *cache_hour*.

    Names are stripped, de-duplicated and stored as a JSON list, together
    with their count. Returns the cache row id; thanks to
    ``id=LAST_INSERT_ID(id)`` this is the existing id when the row is updated.
    """
    sql = """
INSERT INTO demand_pool_hourly_cache (
cache_hour,
source_table,
partition_dt,
excluded_strategy,
top_n,
demand_name_set_json,
item_count,
created_at,
updated_at
)
VALUES (%s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
ON DUPLICATE KEY UPDATE
id=LAST_INSERT_ID(id),
source_table=VALUES(source_table),
partition_dt=VALUES(partition_dt),
excluded_strategy=VALUES(excluded_strategy),
top_n=VALUES(top_n),
demand_name_set_json=VALUES(demand_name_set_json),
item_count=VALUES(item_count),
updated_at=NOW()
"""
    names = _normalize_demand_names(demand_name_set)
    params = (
        cache_hour,
        source_table,
        partition_dt,
        excluded_strategy,
        top_n,
        _json_dumps(names),
        len(names),
    )
    with self.conn.cursor() as cursor:
        cursor.execute(sql, params)
        cache_id = int(cursor.lastrowid)
    return cache_id
def list_postprocess_candidates(self, *, limit: int) -> list[dict[str, Any]]:
    """List records with contribution points whose post-processing is unfinished.

    Records in the PENDING, DEMAND_MATCHED, WXINDEX_DONE or FAILED
    post-process states are returned, least recently updated first, capped
    at *limit*. Stored JSON columns are decoded.
    """
    sql = """
SELECT
id,
unique_key,
source,
title,
created_at,
article_title,
article_body,
demand_cache_run_id,
postprocess_status,
decode_result_json,
contribution_points_json,
contribution_demand_match_json,
wxindex_trend_json,
demand_event_sense_json,
demand_senior_fit_json
FROM hot_content_records
WHERE contribution_points_json IS NOT NULL
AND postprocess_status IN (%s, %s, %s, %s)
ORDER BY updated_at ASC, id ASC
LIMIT %s
"""
    open_statuses = (
        PostprocessStatus.PENDING,
        PostprocessStatus.DEMAND_MATCHED,
        PostprocessStatus.WXINDEX_DONE,
        PostprocessStatus.FAILED,
    )
    json_columns = (
        "decode_result_json",
        "contribution_points_json",
        "contribution_demand_match_json",
        "wxindex_trend_json",
        "demand_event_sense_json",
        "demand_senior_fit_json",
    )
    with self.conn.cursor() as cursor:
        cursor.execute(sql, (*open_statuses, limit))
        fetched = cursor.fetchall()

    candidates: list[dict[str, Any]] = []
    for row in fetched:
        candidate: dict[str, Any] = {
            "id": int(row["id"]),
            "unique_key": str(row["unique_key"]),
            "source": str(row.get("source") or ""),
            "title": str(row.get("title") or ""),
            "created_at": row.get("created_at"),
            "article_title": row.get("article_title"),
            "article_body": row.get("article_body"),
            "demand_cache_run_id": row.get("demand_cache_run_id"),
            "postprocess_status": int(row.get("postprocess_status") or 0),
        }
        # JSON columns are appended last, keeping the original key order.
        for column in json_columns:
            candidate[column] = _json_loads(row.get(column))
        candidates.append(candidate)
    return candidates
def save_contribution_demand_match(
    self,
    *,
    record_id: int,
    demand_cache_run_id: int,
    match_json: dict[str, Any],
) -> None:
    """Store the contribution-to-demand match result for a record.

    Records which demand-cache run was used and advances
    ``postprocess_status`` to DEMAND_MATCHED, clearing the post-process error.
    """
    statement = """
UPDATE hot_content_records
SET demand_cache_run_id=%s,
contribution_demand_match_json=%s,
postprocess_status=%s,
postprocess_error_reason=NULL,
updated_at=NOW()
WHERE id=%s
"""
    values = (
        demand_cache_run_id,
        _json_dumps(match_json),
        PostprocessStatus.DEMAND_MATCHED,
        record_id,
    )
    with self.conn.cursor() as cursor:
        cursor.execute(statement, values)
def save_wxindex_trend(
    self,
    *,
    record_id: int,
    trend_json: dict[str, Any],
) -> None:
    """Store the wxindex trend result and advance ``postprocess_status`` to WXINDEX_DONE.

    Any previous post-process error is cleared.
    """
    statement = """
UPDATE hot_content_records
SET wxindex_trend_json=%s,
postprocess_status=%s,
postprocess_error_reason=NULL,
updated_at=NOW()
WHERE id=%s
"""
    encoded_trend = _json_dumps(trend_json)
    with self.conn.cursor() as cursor:
        cursor.execute(statement, (encoded_trend, PostprocessStatus.WXINDEX_DONE, record_id))
def save_demand_quality(
    self,
    *,
    record_id: int,
    event_sense_json: dict[str, Any],
    senior_fit_json: dict[str, Any],
    update_status: bool = True,
) -> None:
    """Store the event-sense and senior-fit quality judgments for a record.

    With ``update_status=True`` (the default), it also sets
    ``postprocess_status`` to QUALITY_DONE and clears the post-process
    error. With ``False``, only the two JSON columns and ``updated_at`` are
    written.
    """
    self._ensure_record_quality_columns()
    encoded_event_sense = _json_dumps(event_sense_json)
    encoded_senior_fit = _json_dumps(senior_fit_json)
    if not update_status:
        sql = """
UPDATE hot_content_records
SET demand_event_sense_json=%s,
demand_senior_fit_json=%s,
updated_at=NOW()
WHERE id=%s
"""
        params = (encoded_event_sense, encoded_senior_fit, record_id)
    else:
        sql = """
UPDATE hot_content_records
SET demand_event_sense_json=%s,
demand_senior_fit_json=%s,
postprocess_status=%s,
postprocess_error_reason=NULL,
updated_at=NOW()
WHERE id=%s
"""
        params = (
            encoded_event_sense,
            encoded_senior_fit,
            PostprocessStatus.QUALITY_DONE,
            record_id,
        )
    with self.conn.cursor() as cursor:
        cursor.execute(sql, params)
def update_postprocess_status(
    self,
    *,
    record_id: int,
    status: int,
    error_message: str | None = None,
) -> None:
    """Set ``postprocess_status`` and ``postprocess_error_reason`` for a record.

    Passing ``error_message=None`` clears the stored error.
    """
    params = (status, error_message, record_id)
    with self.conn.cursor() as cursor:
        cursor.execute(
            """
UPDATE hot_content_records
SET postprocess_status=%s,
postprocess_error_reason=%s,
updated_at=NOW()
WHERE id=%s
""",
            params,
        )
def replace_demand_export_rows(
    self,
    *,
    record_id: int,
    source: str,
    hot_title: str,
    article_title: str,
    rows: list[dict[str, Any]],
) -> None:
    """Replace all ``hot_content_demand_exports`` rows belonging to one record.

    The record's existing export rows are deleted. Then one row is inserted
    per entry of *rows* that has a non-blank ``item_type`` and ``item_text``;
    other entries are skipped.

    The connection runs with ``autocommit=True``, so the DELETE and the INSERT
    batch are wrapped in an explicit transaction. If anything fails, the
    delete is rolled back and the record keeps its previous export rows
    instead of being left with none.

    Raises:
        Whatever the value conversion or the driver raises. The transaction
        is rolled back before the exception propagates.
    """
    # CREATE TABLE IF NOT EXISTS is DDL and implicitly commits in MySQL, so it
    # must run before the transaction is opened.
    self._ensure_demand_export_table()
    delete_sql = "DELETE FROM hot_content_demand_exports WHERE record_id=%s"
    insert_sql = """
INSERT INTO hot_content_demand_exports (
record_id,
source,
hot_title,
article_title,
item_type,
item_text,
point_category,
matched_demand,
contribution_score,
wxindex_keyword,
all_hot_keywords,
wxindex_latest_score,
wxindex_trend,
is_as_demand,
event_sense_score,
senior_fit_score,
created_at,
updated_at
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
"""
    # Build the parameters before touching the table, so a bad value
    # (e.g. a non-numeric score for float()) fails without deleting anything.
    insert_rows = [
        (
            record_id,
            source,
            hot_title,
            article_title,
            str(item.get("item_type") or ""),
            str(item.get("item_text") or ""),
            str(item.get("point_category") or ""),
            str(item.get("matched_demand") or ""),
            item.get("contribution_score"),
            str(item.get("wxindex_keyword") or ""),
            str(item.get("all_hot_keywords") or ""),
            float(item.get("wxindex_latest_score") or 0),
            str(item.get("wxindex_trend") or ""),
            int(item.get("is_as_demand") or 0),
            item.get("event_sense_score"),
            item.get("senior_fit_score"),
        )
        for item in rows
        if str(item.get("item_type") or "").strip()
        and str(item.get("item_text") or "").strip()
    ]
    self.conn.begin()
    try:
        with self.conn.cursor() as cursor:
            cursor.execute(delete_sql, (record_id,))
            if insert_rows:
                cursor.executemany(insert_sql, insert_rows)
    except BaseException:
        # Undo the delete so the previous export rows survive the failure.
        self.conn.rollback()
        raise
    self.conn.commit()
def list_odps_sync_records(self, *, partition_dt: str | None = None) -> list[dict[str, Any]]:
    """Load quality-checked records created on one day, for ODPS sync.

    The day window is based on ``created_at``, not ``updated_at``.
    *partition_dt* is a ``YYYYMMDD`` string; when omitted, the current day in
    ``SHANGHAI_TZ`` is used, as a naive datetime. Only rows in
    ``PostprocessStatus.QUALITY_DONE`` with a non-blank
    ``contribution_demand_match_json`` are returned, ordered by id, with
    their JSON columns decoded.
    """
    self._ensure_record_quality_columns()
    if partition_dt:
        window_start = datetime.strptime(partition_dt, "%Y%m%d")
    else:
        local_now = datetime.now(SHANGHAI_TZ)
        window_start = local_now.replace(
            hour=0, minute=0, second=0, microsecond=0, tzinfo=None
        )
    window_end = window_start + timedelta(days=1)
    sql = """
SELECT
id,
contribution_points_json,
contribution_demand_match_json,
wxindex_trend_json,
demand_event_sense_json,
demand_senior_fit_json
FROM hot_content_records
WHERE created_at >= %s
AND created_at < %s
AND postprocess_status = %s
AND contribution_demand_match_json IS NOT NULL
AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
ORDER BY id ASC
"""
    json_columns = (
        "contribution_points_json",
        "contribution_demand_match_json",
        "wxindex_trend_json",
        "demand_event_sense_json",
        "demand_senior_fit_json",
    )
    with self.conn.cursor() as cursor:
        cursor.execute(sql, (window_start, window_end, PostprocessStatus.QUALITY_DONE))
        fetched = cursor.fetchall()
    return [
        {
            "id": int(row["id"]),
            **{column: _json_loads(row.get(column)) for column in json_columns},
        }
        for row in fetched
    ]
def list_demand_export_groups(
    self,
    *,
    partition_dt: str | None = None,
) -> list[dict[str, Any]]:
    """Return demand-export rows grouped per record for a single creation day.

    Only export rows whose parent ``hot_content_records`` row has
    ``created_at`` inside ``[day_start, day_start + 1 day)`` are read, so the
    result never spans more than one ODPS partition day.

    Args:
        partition_dt: Day to read, formatted ``YYYYMMDD``. ``None``, empty or
            whitespace-only keeps the historical behaviour of using today in
            Shanghai time. Passing a value lets a caller read the same day
            window that ``list_odps_sync_records(partition_dt=...)`` reads,
            for example when back-filling an older partition.

    Returns:
        ``[{"record_id": int, "export_rows": [...]}, ...]`` ordered by record
        id. Each export row has item_type, item_text, point_category and
        matched_demand as strings, and wxindex_latest_score as a float
        (0 when NULL).

    Raises:
        ValueError: if a non-blank ``partition_dt`` is not ``YYYYMMDD``.
    """
    self._ensure_demand_export_table()
    normalized_partition_dt = str(partition_dt or "").strip()
    if normalized_partition_dt:
        day_start = datetime.strptime(normalized_partition_dt, "%Y%m%d")
    else:
        # Naive midnight in Shanghai time.
        day_start = datetime.now(SHANGHAI_TZ).replace(
            hour=0,
            minute=0,
            second=0,
            microsecond=0,
            tzinfo=None,
        )
    day_end = day_start + timedelta(days=1)
    sql = """
        SELECT
            e.record_id,
            e.item_type,
            e.item_text,
            e.point_category,
            e.matched_demand,
            e.wxindex_latest_score
        FROM hot_content_demand_exports e
        INNER JOIN hot_content_records r ON r.id = e.record_id
        WHERE r.created_at >= %s
          AND r.created_at < %s
        ORDER BY e.record_id ASC, e.id ASC
    """
    with self.conn.cursor() as cursor:
        cursor.execute(sql, (day_start, day_end))
        rows = cursor.fetchall()
    # dict preserves insertion order, so groups keep the SQL record_id order.
    grouped: dict[int, list[dict[str, Any]]] = {}
    for row in rows:
        record_id = int(row["record_id"])
        grouped.setdefault(record_id, []).append(
            {
                "item_type": str(row.get("item_type") or ""),
                "item_text": str(row.get("item_text") or ""),
                "point_category": str(row.get("point_category") or ""),
                "matched_demand": str(row.get("matched_demand") or ""),
                "wxindex_latest_score": float(row.get("wxindex_latest_score") or 0),
            }
        )
    return [
        {"record_id": record_id, "export_rows": export_rows}
        for record_id, export_rows in grouped.items()
    ]
def _ensure_demand_export_table(self) -> None:
    """Create ``hot_content_demand_exports`` if absent, then add late columns.

    Columns introduced after the original table layout are listed in
    ``late_columns`` and each is passed to ``_ensure_demand_export_column``.
    That helper runs the ALTER only when the column is missing. The order
    matters: several ALTERs place their column ``AFTER`` a column that an
    earlier entry may have just added (e.g. contribution_score after
    matched_demand, senior_fit_score after event_sense_score).
    """
    create_table_sql = """
        CREATE TABLE IF NOT EXISTS hot_content_demand_exports (
            id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
            record_id BIGINT UNSIGNED NOT NULL,
            source VARCHAR(64) NOT NULL DEFAULT '',
            hot_title VARCHAR(1024) NOT NULL DEFAULT '',
            article_title VARCHAR(1024) NOT NULL DEFAULT '',
            item_type VARCHAR(32) NOT NULL COMMENT '元素/短语',
            item_text VARCHAR(1024) NOT NULL,
            point_category VARCHAR(32) NOT NULL DEFAULT '' COMMENT '点类型:灵感点/目的点/关键点',
            matched_demand VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '匹配到的需求',
            contribution_score DOUBLE NULL COMMENT '贡献分,仅元素有值',
            wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '获取微信指数的元素',
            all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '全部热点词',
            wxindex_latest_score DOUBLE NOT NULL DEFAULT 0,
            wxindex_trend VARCHAR(32) NOT NULL DEFAULT '' COMMENT '微信指数趋势',
            is_as_demand TINYINT NOT NULL DEFAULT 0 COMMENT '是否作为需求:0否 1是',
            created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
            updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            PRIMARY KEY (id),
            KEY idx_record_id (record_id),
            KEY idx_source_type (source, item_type)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
    """
    late_columns = (
        (
            "matched_demand",
            """
            ALTER TABLE hot_content_demand_exports
            ADD COLUMN matched_demand VARCHAR(1024) NOT NULL DEFAULT ''
            COMMENT '匹配到的需求'
            AFTER item_text
            """,
        ),
        (
            "point_category",
            """
            ALTER TABLE hot_content_demand_exports
            ADD COLUMN point_category VARCHAR(32) NOT NULL DEFAULT ''
            COMMENT '点类型:灵感点/目的点/关键点'
            AFTER item_text
            """,
        ),
        (
            "contribution_score",
            """
            ALTER TABLE hot_content_demand_exports
            ADD COLUMN contribution_score DOUBLE NULL
            COMMENT '贡献分,仅词有值'
            AFTER matched_demand
            """,
        ),
        (
            "wxindex_trend",
            """
            ALTER TABLE hot_content_demand_exports
            ADD COLUMN wxindex_trend VARCHAR(32) NOT NULL DEFAULT ''
            COMMENT '微信指数趋势'
            AFTER wxindex_latest_score
            """,
        ),
        (
            "wxindex_keyword",
            """
            ALTER TABLE hot_content_demand_exports
            ADD COLUMN wxindex_keyword VARCHAR(1024) NOT NULL DEFAULT ''
            COMMENT '获取微信指数的词'
            AFTER contribution_score
            """,
        ),
        (
            "all_hot_keywords",
            """
            ALTER TABLE hot_content_demand_exports
            ADD COLUMN all_hot_keywords VARCHAR(1024) NOT NULL DEFAULT ''
            COMMENT '全部热点词'
            AFTER wxindex_keyword
            """,
        ),
        (
            "is_as_demand",
            """
            ALTER TABLE hot_content_demand_exports
            ADD COLUMN is_as_demand TINYINT NOT NULL DEFAULT 0
            COMMENT '是否作为需求:0否 1是'
            AFTER wxindex_trend
            """,
        ),
        (
            "event_sense_score",
            """
            ALTER TABLE hot_content_demand_exports
            ADD COLUMN event_sense_score DOUBLE NULL
            COMMENT '事件性得分 0-10'
            AFTER is_as_demand
            """,
        ),
        (
            "senior_fit_score",
            """
            ALTER TABLE hot_content_demand_exports
            ADD COLUMN senior_fit_score DOUBLE NULL
            COMMENT '老年性得分 0-10'
            AFTER event_sense_score
            """,
        ),
    )
    with self.conn.cursor() as cursor:
        cursor.execute(create_table_sql)
        for column_name, alter_sql in late_columns:
            self._ensure_demand_export_column(cursor, column_name, alter_sql)
def _ensure_record_quality_columns(self) -> None:
    """Add the LLM quality-score JSON columns to ``hot_content_records`` when absent.

    Each column is looked up in ``information_schema.COLUMNS`` for the current
    database and ALTERed in only if missing. Once the schema is current,
    repeated calls issue only the existence checks. The second column is
    placed AFTER the first, so the mapping order below must be kept.
    """
    quality_columns = {
        "demand_event_sense_json": """
            ALTER TABLE hot_content_records
            ADD COLUMN demand_event_sense_json JSON NULL
            COMMENT '需求事件性 LLM 评分结果'
            AFTER wxindex_trend_json
        """,
        "demand_senior_fit_json": """
            ALTER TABLE hot_content_records
            ADD COLUMN demand_senior_fit_json JSON NULL
            COMMENT '需求老年性 LLM 评分结果'
            AFTER demand_event_sense_json
        """,
    }
    column_exists_sql = """
        SELECT COUNT(*) AS cnt
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = 'hot_content_records'
          AND COLUMN_NAME = %s
    """
    with self.conn.cursor() as cursor:
        for column, ddl in quality_columns.items():
            cursor.execute(column_exists_sql, (column,))
            found = cursor.fetchone() or {}
            if not int(found.get("cnt") or 0):
                cursor.execute(ddl)
def _ensure_category_filter_columns(self) -> None:
    """Add the category-filter columns to ``hot_content_records`` when absent.

    Adds the filter result JSON, the pass flag (1 pass / 0 fail / NULL not
    yet filtered, per its column comment) and the reason text. Each column
    is checked in ``information_schema.COLUMNS`` first. Every ALTER places
    its column AFTER the previous one, so the order below must be kept.
    """
    filter_columns = {
        "category_filter_json": """
            ALTER TABLE hot_content_records
            ADD COLUMN category_filter_json JSON NULL
            COMMENT '老年人兴趣分类筛选 LLM 结果'
            AFTER article_url
        """,
        "category_filter_passed": """
            ALTER TABLE hot_content_records
            ADD COLUMN category_filter_passed TINYINT NULL
            COMMENT '分类筛选是否通过:1通过 0不通过 NULL未筛选'
            AFTER category_filter_json
        """,
        "category_filter_reason": """
            ALTER TABLE hot_content_records
            ADD COLUMN category_filter_reason TEXT NULL
            COMMENT '分类筛选原因'
            AFTER category_filter_passed
        """,
    }
    column_exists_sql = """
        SELECT COUNT(*) AS cnt
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = 'hot_content_records'
          AND COLUMN_NAME = %s
    """
    with self.conn.cursor() as cursor:
        for column, ddl in filter_columns.items():
            cursor.execute(column_exists_sql, (column,))
            found = cursor.fetchone() or {}
            if not int(found.get("cnt") or 0):
                cursor.execute(ddl)
def _ensure_demand_export_column(
    self,
    cursor: Any,
    column_name: str,
    alter_sql: str,
) -> None:
    """Run *alter_sql* on *cursor* unless ``hot_content_demand_exports`` already has *column_name*.

    The existence check queries ``information_schema.COLUMNS`` for the
    current database. The caller owns *cursor* and supplies the ALTER text.
    """
    probe_sql = """
        SELECT COUNT(*) AS cnt
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = 'hot_content_demand_exports'
          AND COLUMN_NAME = %s
    """
    cursor.execute(probe_sql, (column_name,))
    existing = cursor.fetchone() or {}
    column_missing = int(existing.get("cnt") or 0) == 0
    if column_missing:
        cursor.execute(alter_sql)
def list_synced_odps_demand_ids(
    self,
    *,
    partition_dt: str,
) -> set[str]:
    """Return the demand_ids already logged in ``hot_content_odps_sync_log`` for one partition day.

    Used to de-duplicate across flows. A blank ``partition_dt`` returns an
    empty set without touching the database. Returned ids are
    whitespace-stripped and blank ids are dropped.
    """
    partition_key = str(partition_dt or "").strip()
    if not partition_key:
        return set()
    self._ensure_odps_sync_log_table()
    with self.conn.cursor() as cursor:
        cursor.execute(
            """
            SELECT demand_id
            FROM hot_content_odps_sync_log
            WHERE partition_dt = %s
            """,
            (partition_key,),
        )
        fetched = cursor.fetchall()
    synced: set[str] = set()
    for entry in fetched:
        demand_id = str(entry.get("demand_id") or "").strip()
        if demand_id:
            synced.add(demand_id)
    return synced
def count_odps_sync_log_rows(self, *, partition_dt: str) -> int:
    """Count the rows already written to ``hot_content_odps_sync_log`` for one partition day.

    Feeds the daily-quota calculation. A blank ``partition_dt`` returns 0
    without touching the database.
    """
    partition_key = str(partition_dt or "").strip()
    if not partition_key:
        return 0
    self._ensure_odps_sync_log_table()
    count_sql = """
        SELECT COUNT(*) AS cnt
        FROM hot_content_odps_sync_log
        WHERE partition_dt = %s
    """
    with self.conn.cursor() as cursor:
        cursor.execute(count_sql, (partition_key,))
        result = cursor.fetchone()
    return int((result or {}).get("cnt") or 0)
def save_odps_sync_logs(self, rows: list[dict[str, Any]]) -> int:
    """Upsert ODPS sync-log entries into ``hot_content_odps_sync_log``.

    Entries whose ``demand_id`` is missing or blank are skipped. The input is
    filtered before any database work. A batch with nothing valid therefore
    returns 0 without the schema check or an empty ``executemany``.
    On a duplicate key, demand_name, demand_type, record_id, weight and
    extend are overwritten and ``synced_at`` is reset to CURRENT_TIMESTAMP.

    Args:
        rows: Dicts carrying partition_dt, strategy, demand_id, demand_name,
            demand_type, record_id, weight and extend. A ``None`` weight is
            stored as NULL; a blank extend becomes ``"{}"``. ``None`` is
            treated like an empty list.

    Returns:
        Number of entries sent to the database.
    """
    insert_rows = [
        (
            str(item.get("partition_dt") or ""),
            str(item.get("strategy") or ""),
            str(item.get("demand_id") or ""),
            str(item.get("demand_name") or ""),
            str(item.get("demand_type") or ""),
            int(item.get("record_id") or 0),
            float(item["weight"]) if item.get("weight") is not None else None,
            str(item.get("extend") or "{}"),
        )
        for item in rows or []
        if str(item.get("demand_id") or "").strip()
    ]
    if not insert_rows:
        # Nothing valid to write: skip the schema check and the round-trip.
        return 0
    self._ensure_odps_sync_log_table()
    sql = """
        INSERT INTO hot_content_odps_sync_log (
            partition_dt,
            strategy,
            demand_id,
            demand_name,
            demand_type,
            record_id,
            weight,
            extend
        )
        VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            demand_name = VALUES(demand_name),
            demand_type = VALUES(demand_type),
            record_id = VALUES(record_id),
            weight = VALUES(weight),
            extend = VALUES(extend),
            synced_at = CURRENT_TIMESTAMP
    """
    with self.conn.cursor() as cursor:
        cursor.executemany(sql, insert_rows)
    return len(insert_rows)
def _ensure_odps_sync_log_weight_column(self, cursor: Any) -> None:
    """Add the nullable ``weight`` column to ``hot_content_odps_sync_log`` if it is missing.

    Uses the caller's *cursor*. The column is placed AFTER ``record_id``.
    """
    probe_sql = """
        SELECT COUNT(*) AS cnt
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = 'hot_content_odps_sync_log'
          AND COLUMN_NAME = 'weight'
    """
    cursor.execute(probe_sql)
    existing = cursor.fetchone() or {}
    if int(existing.get("cnt") or 0) != 0:
        return
    cursor.execute(
        """
        ALTER TABLE hot_content_odps_sync_log
        ADD COLUMN weight DOUBLE NULL DEFAULT NULL
        COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)'
        AFTER record_id
        """
    )
def _ensure_odps_sync_log_extend_column(self, cursor: Any) -> None:
    """Add the nullable ``extend`` TEXT column to ``hot_content_odps_sync_log`` if it is missing.

    Uses the caller's *cursor*. The column is placed AFTER ``weight``, so
    that column must already exist.
    """
    probe_sql = """
        SELECT COUNT(*) AS cnt
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = 'hot_content_odps_sync_log'
          AND COLUMN_NAME = 'extend'
    """
    cursor.execute(probe_sql)
    existing = cursor.fetchone() or {}
    if int(existing.get("cnt") or 0) != 0:
        return
    cursor.execute(
        """
        ALTER TABLE hot_content_odps_sync_log
        ADD COLUMN extend TEXT NULL DEFAULT NULL
        COMMENT 'ODPS extend 扩展字段 JSON'
        AFTER weight
        """
    )
def list_wxindex_word_scores(self, name: str) -> list[dict[str, Any]]:
    """Return every stored daily WeChat-index score for *name*, oldest ``dt`` first.

    Each item is ``{"ymd": dt, "total_score": float}``. Rows with a blank
    ``dt`` or a ``total_score`` that cannot be converted to float are
    skipped. A blank *name* returns ``[]`` without querying.
    """
    keyword = str(name or "").strip()
    if not keyword:
        return []
    self._ensure_wxindex_words_table()
    with self.conn.cursor() as cursor:
        cursor.execute(
            """
            SELECT dt, total_score
            FROM hot_content_wxindex_words
            WHERE name = %s
            ORDER BY dt ASC
            """,
            (keyword,),
        )
        fetched = cursor.fetchall()
    points: list[dict[str, Any]] = []
    for entry in fetched:
        ymd = str(entry.get("dt") or "").strip()
        if not ymd:
            continue
        try:
            score = float(entry["total_score"])
        except (TypeError, ValueError, KeyError):
            continue
        points.append({"ymd": ymd, "total_score": score})
    return points
def list_wxindex_word_scores_in_range(
    self,
    name: str,
    *,
    start_ymd: str,
    end_ymd: str,
) -> list[dict[str, Any]]:
    """Return *name*'s daily WeChat-index scores with ``start_ymd <= dt <= end_ymd``, oldest first.

    Both bounds are inclusive. If any argument is blank after stripping, the
    result is ``[]`` and nothing is queried. Rows with a blank ``dt`` or a
    non-numeric ``total_score`` are skipped. Each item is
    ``{"ymd": dt, "total_score": float}``.
    """
    keyword, lower, upper = (
        str(value or "").strip() for value in (name, start_ymd, end_ymd)
    )
    if not (keyword and lower and upper):
        return []
    self._ensure_wxindex_words_table()
    with self.conn.cursor() as cursor:
        cursor.execute(
            """
            SELECT dt, total_score
            FROM hot_content_wxindex_words
            WHERE name = %s
              AND dt >= %s
              AND dt <= %s
            ORDER BY dt ASC
            """,
            (keyword, lower, upper),
        )
        fetched = cursor.fetchall()
    points: list[dict[str, Any]] = []
    for entry in fetched:
        ymd = str(entry.get("dt") or "").strip()
        if not ymd:
            continue
        try:
            score = float(entry["total_score"])
        except (TypeError, ValueError, KeyError):
            continue
        points.append({"ymd": ymd, "total_score": score})
    return points
def list_active_wxindex_word_meta(
    self,
    *,
    today: date | None = None,
) -> list[dict[str, Any]]:
    """Return word meta rows whose fetch window is still open on *today*.

    A row is active when ``fetch_end_ymd > today``, where today is rendered
    as a ``YYYYMMDD`` string. *today* defaults to the current Shanghai date.
    Rows for which ``_normalize_wxindex_word_meta_row`` returns None are
    dropped. Results follow ascending meta id.
    """
    as_of = today or datetime.now(SHANGHAI_TZ).date()
    as_of_ymd = as_of.strftime("%Y%m%d")
    self._ensure_wxindex_word_meta_table()
    with self.conn.cursor() as cursor:
        cursor.execute(
            """
            SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd
            FROM hot_content_wxindex_word_meta
            WHERE fetch_end_ymd > %s
            ORDER BY id ASC
            """,
            (as_of_ymd,),
        )
        fetched = cursor.fetchall()
    return [
        meta
        for entry in fetched
        if (meta := self._normalize_wxindex_word_meta_row(entry)) is not None
    ]
def save_wxindex_word_record(self, payload: dict[str, Any]) -> int:
    """Upsert one word trace record and return its row id.

    The row tuple comes from ``_wxindex_word_record_row(payload)``. Element 0
    is the word name and element 2 is ``analyze_ymd``. Those two are used to
    read the id back through the (name, analyze_ymd) unique key. Returns 0
    if the row cannot be found after the write.
    """
    record_values = _wxindex_word_record_row(payload)
    word_name = str(record_values[0])
    ymd = str(record_values[2])
    self._ensure_wxindex_word_records_table()
    lookup_sql = """
        SELECT id
        FROM hot_content_wxindex_word_records
        WHERE name = %s
          AND analyze_ymd = %s
    """
    with self.conn.cursor() as cursor:
        cursor.execute(_WXINDEX_WORD_RECORD_UPSERT_SQL, record_values)
        cursor.execute(lookup_sql, (word_name, ymd))
        found = cursor.fetchone()
    return int((found or {}).get("id") or 0)
def init_wxindex_word_records(
    self,
    payloads: list[dict[str, Any]],
) -> dict[str, int]:
    """Batch-initialise word trace records and return ``{name: record id}``.

    All rows are written with one ``executemany`` using
    ``_WXINDEX_WORD_RECORD_INIT_UPSERT_SQL``. Per the original design note,
    re-running only refreshes the meta / fetch-window fields. Names are then
    grouped by their own ``analyze_ymd`` before ids are looked up, so a batch
    spanning several analysis days maps every word, not just the words that
    share the last payload's day. If the same name appears under more than
    one day, the later-looked-up day's id overwrites the earlier one.

    Returns ``{}`` for an empty batch without touching the database.
    """
    if not payloads:
        return {}
    rows: list[tuple[Any, ...]] = []
    # analyze_ymd -> names, in first-seen order of days.
    names_by_analyze_ymd: dict[str, list[str]] = {}
    for payload in payloads:
        row = _wxindex_word_record_row(payload)
        rows.append(row)
        names_by_analyze_ymd.setdefault(str(row[2]), []).append(str(row[0]))
    self._ensure_wxindex_word_records_table()
    with self.conn.cursor() as cursor:
        cursor.executemany(_WXINDEX_WORD_RECORD_INIT_UPSERT_SQL, rows)
    record_ids: dict[str, int] = {}
    for analyze_ymd, names in names_by_analyze_ymd.items():
        record_ids.update(
            self.map_wxindex_word_record_ids(analyze_ymd=analyze_ymd, names=names)
        )
    return record_ids
def map_wxindex_word_record_ids(
    self,
    *,
    analyze_ymd: str,
    names: list[str],
) -> dict[str, int]:
    """Look up record ids for *names* on one analysis day.

    Names are whitespace-stripped, blanks are dropped and duplicates removed
    (first occurrence kept) before the ``IN (...)`` list is built. Like the
    other record readers, the table is ensured before querying. Returns
    ``{name: id}`` for names that have a row; names without a row are absent.
    A blank day or an empty/``None`` name list returns ``{}`` without
    touching the database.
    """
    normalized_analyze_ymd = str(analyze_ymd or "").strip()
    stripped_names = (str(name or "").strip() for name in (names or []))
    # dict.fromkeys de-duplicates while preserving order.
    normalized_names = list(dict.fromkeys(name for name in stripped_names if name))
    if not normalized_analyze_ymd or not normalized_names:
        return {}
    self._ensure_wxindex_word_records_table()
    placeholders = ", ".join(["%s"] * len(normalized_names))
    sql = f"""
        SELECT id, name
        FROM hot_content_wxindex_word_records
        WHERE analyze_ymd = %s
          AND name IN ({placeholders})
    """
    with self.conn.cursor() as cursor:
        cursor.execute(sql, [normalized_analyze_ymd, *normalized_names])
        rows = cursor.fetchall()
    return {
        str(row.get("name") or "").strip(): int(row.get("id") or 0)
        for row in rows
        if str(row.get("name") or "").strip()
    }
- def list_wxindex_word_records_by_analyze_ymd(
- self,
- *,
- analyze_ymd: str,
- names: list[str] | None = None,
- ) -> dict[str, dict[str, Any]]:
- """按 analyze_ymd 批量读取追溯记录,供同日重跑跳过已完成阶段。"""
- normalized_analyze_ymd = str(analyze_ymd or "").strip()
- if not normalized_analyze_ymd:
- return {}
- normalized_names = [
- str(name or "").strip() for name in (names or []) if str(name or "").strip()
- ]
- self._ensure_wxindex_word_records_table()
- params: list[Any] = [normalized_analyze_ymd]
- name_filter = ""
- if normalized_names:
- placeholders = ", ".join(["%s"] * len(normalized_names))
- name_filter = f" AND name IN ({placeholders})"
- params.extend(normalized_names)
- sql = f"""
- SELECT
- id,
- name,
- meta_id,
- analyze_ymd,
- fetch_start_ymd,
- fetch_end_ymd,
- data_start_ymd,
- data_end_ymd,
- data_days,
- is_sustained_high,
- is_rising,
- is_spike,
- retain_reason,
- is_internal_demand_matched,
- matched_demand,
- demand_cache_run_id,
- internal_demand_match_json,
- senior_fit_score,
- demand_senior_fit_json,
- is_final_retained,
- min_score,
- max_score,
- avg_score,
- detail_json
- FROM hot_content_wxindex_word_records
- WHERE analyze_ymd = %s{name_filter}
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql, params)
- rows = cursor.fetchall()
- result: dict[str, dict[str, Any]] = {}
- for row in rows or []:
- name = str(row.get("name") or "").strip()
- if not name:
- continue
- record = dict(row)
- record["internal_demand_match_json"] = _json_loads(
- record.get("internal_demand_match_json")
- )
- record["demand_senior_fit_json"] = _json_loads(
- record.get("demand_senior_fit_json")
- )
- record["detail_json"] = _json_loads(record.get("detail_json"))
- result[name] = record
- return result
- def list_wxindex_word_stats_names(
- self,
- *,
- analyze_ymd: str,
- names: list[str] | None = None,
- ) -> set[str]:
- """返回指定 analyze_ymd 下已写入 stats 表的词名集合。"""
- normalized_analyze_ymd = str(analyze_ymd or "").strip()
- if not normalized_analyze_ymd:
- return set()
- normalized_names = [
- str(name or "").strip() for name in (names or []) if str(name or "").strip()
- ]
- self._ensure_wxindex_word_stats_table()
- params: list[Any] = [normalized_analyze_ymd]
- name_filter = ""
- if normalized_names:
- placeholders = ", ".join(["%s"] * len(normalized_names))
- name_filter = f" AND name IN ({placeholders})"
- params.extend(normalized_names)
- sql = f"""
- SELECT name
- FROM hot_content_wxindex_word_stats
- WHERE analyze_ymd = %s{name_filter}
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql, params)
- rows = cursor.fetchall()
- return {
- str(row.get("name") or "").strip()
- for row in rows or []
- if str(row.get("name") or "").strip()
- }
def save_wxindex_word_stats_batch(
    self,
    payloads: list[dict[str, Any]],
) -> int:
    """Write stats for words that passed the heat and senior-fit filters.

    Each payload is converted with ``_wxindex_word_stats_row``. The whole
    batch goes through ``_WXINDEX_WORD_STATS_UPSERT_SQL`` in a single
    ``executemany``. Returns the batch size; an empty batch returns 0
    without touching the database.
    """
    if not payloads:
        return 0
    stats_rows = list(map(_wxindex_word_stats_row, payloads))
    self._ensure_wxindex_word_stats_table()
    with self.conn.cursor() as cursor:
        cursor.executemany(_WXINDEX_WORD_STATS_UPSERT_SQL, stats_rows)
    return len(stats_rows)
def _ensure_wxindex_word_stats_table(self) -> None:
    """Create ``hot_content_wxindex_word_stats`` if it does not exist yet.

    One row per (name, analyze_ymd), enforced by ``uk_name_analyze_ymd``.
    Secondary indexes exist on analyze_ymd and retain_reason.
    """
    with self.conn.cursor() as cursor:
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_stats (
                id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
                name VARCHAR(256) NOT NULL COMMENT '词',
                meta_id BIGINT UNSIGNED NULL COMMENT '关联 meta.id',
                analyze_ymd VARCHAR(8) NOT NULL COMMENT '分析日期 yyyymmdd',
                wxindex_word_record_id BIGINT UNSIGNED NULL COMMENT '关联 records.id',
                retain_reason VARCHAR(64) NULL COMMENT '热度保留原因',
                senior_fit_score DOUBLE NULL COMMENT '老年性得分 0-10',
                data_start_ymd VARCHAR(8) NULL COMMENT '分析数据起始日',
                data_end_ymd VARCHAR(8) NULL COMMENT '分析数据结束日',
                data_days INT NULL COMMENT '分析天数',
                min_score DOUBLE NULL COMMENT '区间最低热度',
                max_score DOUBLE NULL COMMENT '区间最高热度',
                avg_score DOUBLE NULL COMMENT '区间平均热度',
                detail_json JSON NULL COMMENT '扩展详情',
                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
                    ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
                PRIMARY KEY (id),
                UNIQUE KEY uk_name_analyze_ymd (name, analyze_ymd),
                KEY idx_analyze_ymd (analyze_ymd),
                KEY idx_retain_reason (retain_reason)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """
        )
def _ensure_wxindex_word_records_table(self) -> None:
    """Create ``hot_content_wxindex_word_records`` if needed, then migrate older layouts.

    After ``CREATE TABLE IF NOT EXISTS``, three migrations run on the same
    cursor, in this order:

    1. add the retain / internal-demand columns;
    2. add the senior-fit columns, which are positioned after
       ``internal_demand_match_json`` and so must follow step 1;
    3. relax legacy NOT NULL columns to NULL.
    """
    with self.conn.cursor() as cursor:
        cursor.execute(
            """
            CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_records (
                id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
                name VARCHAR(256) NOT NULL COMMENT '词',
                meta_id BIGINT UNSIGNED NULL COMMENT '关联 meta.id',
                analyze_ymd VARCHAR(8) NOT NULL COMMENT '分析日期 yyyymmdd',
                fetch_start_ymd VARCHAR(8) NULL COMMENT 'meta 抓取起始日',
                fetch_end_ymd VARCHAR(8) NULL COMMENT 'meta 抓取结束日',
                data_start_ymd VARCHAR(8) NULL COMMENT '实际分析数据起始日',
                data_end_ymd VARCHAR(8) NULL COMMENT '实际分析数据结束日',
                data_days INT NULL COMMENT '实际分析天数',
                is_sustained_high TINYINT(1) NULL COMMENT '持续热度>1000万',
                is_rising TINYINT(1) NULL COMMENT '热度持续上涨',
                is_spike TINYINT(1) NULL COMMENT '最近3天突然暴涨',
                retain_reason VARCHAR(64) NULL COMMENT '保留原因(按2->3->1优先级)',
                is_internal_demand_matched TINYINT(1) NULL COMMENT '是否匹配票圈内部需求',
                matched_demand VARCHAR(1024) NULL COMMENT '匹配到的内部需求',
                demand_cache_run_id BIGINT UNSIGNED NULL COMMENT '需求池缓存ID',
                internal_demand_match_json JSON NULL COMMENT '内部需求匹配详情',
                senior_fit_score DOUBLE NULL COMMENT '老年性得分 0-10',
                demand_senior_fit_json JSON NULL COMMENT '老年性 LLM 评分结果',
                is_final_retained TINYINT(1) NULL COMMENT '老年性达标且最终保留',
                min_score DOUBLE NULL COMMENT '区间最低热度',
                max_score DOUBLE NULL COMMENT '区间最高热度',
                avg_score DOUBLE NULL COMMENT '区间平均热度',
                detail_json JSON NULL COMMENT '分析详情',
                created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
                updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
                    ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
                PRIMARY KEY (id),
                UNIQUE KEY uk_name_analyze_ymd (name, analyze_ymd),
                KEY idx_analyze_ymd (analyze_ymd),
                KEY idx_patterns (is_sustained_high, is_rising, is_spike),
                KEY idx_retain_reason (retain_reason)
            ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
            """
        )
        for migrate in (
            self._ensure_wxindex_word_records_retain_columns,
            self._ensure_wxindex_word_records_senior_columns,
            self._ensure_wxindex_word_records_nullable_columns,
        ):
            migrate(cursor)
def _ensure_wxindex_word_records_nullable_columns(self, cursor: Any) -> None:
    """Relax legacy NOT NULL columns of ``hot_content_wxindex_word_records`` to NULL.

    Reads IS_NULLABLE for the listed columns from information_schema with one
    query. It then issues a single ALTER TABLE containing one MODIFY clause
    per column currently reported as ``NO``. Columns that are already
    nullable, or that do not exist, are left untouched. Nothing is executed
    when no change is needed.
    """
    column_definitions = {
        "fetch_start_ymd": "VARCHAR(8) NULL COMMENT 'meta 抓取起始日'",
        "fetch_end_ymd": "VARCHAR(8) NULL COMMENT 'meta 抓取结束日'",
        "data_start_ymd": "VARCHAR(8) NULL COMMENT '实际分析数据起始日'",
        "data_end_ymd": "VARCHAR(8) NULL COMMENT '实际分析数据结束日'",
        "data_days": "INT NULL COMMENT '实际分析天数'",
        "is_sustained_high": "TINYINT(1) NULL COMMENT '持续热度>1000万'",
        "is_rising": "TINYINT(1) NULL COMMENT '热度持续上涨'",
        "is_spike": "TINYINT(1) NULL COMMENT '最近3天突然暴涨'",
        "matched_demand": "VARCHAR(1024) NULL COMMENT '匹配到的内部需求'",
        "is_final_retained": "TINYINT(1) NULL COMMENT '老年性达标且最终保留'",
    }
    wanted = list(column_definitions)
    in_list = ", ".join("%s" for _ in wanted)
    cursor.execute(
        f"""
        SELECT COLUMN_NAME, IS_NULLABLE
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = 'hot_content_wxindex_word_records'
          AND COLUMN_NAME IN ({in_list})
        """,
        wanted,
    )
    is_nullable: dict[str, str] = {}
    for info in cursor.fetchall() or []:
        is_nullable[str(info.get("COLUMN_NAME") or "")] = str(
            info.get("IS_NULLABLE") or ""
        ).upper()
    modify_clauses = []
    for column, definition in column_definitions.items():
        if is_nullable.get(column) == "NO":
            modify_clauses.append(f"MODIFY COLUMN {column} {definition}")
    if not modify_clauses:
        return
    cursor.execute(
        "ALTER TABLE hot_content_wxindex_word_records " + ", ".join(modify_clauses)
    )
def _ensure_wxindex_word_records_senior_columns(self, cursor: Any) -> None:
    """Add the senior-fit columns to ``hot_content_wxindex_word_records`` when missing.

    The existing column names are read with ONE information_schema query,
    mirroring the nullable-columns migration, instead of one query per
    column. This helper runs on every table-ensure. Missing columns are then
    ALTERed in mapping order. That order matters: each ALTER positions its
    column AFTER the previous entry, which may have just been added.
    """
    columns = {
        "senior_fit_score": (
            "ALTER TABLE hot_content_wxindex_word_records "
            "ADD COLUMN senior_fit_score DOUBLE NULL "
            "COMMENT '老年性得分 0-10' "
            "AFTER internal_demand_match_json"
        ),
        "demand_senior_fit_json": (
            "ALTER TABLE hot_content_wxindex_word_records "
            "ADD COLUMN demand_senior_fit_json JSON NULL "
            "COMMENT '老年性 LLM 评分结果' "
            "AFTER senior_fit_score"
        ),
        "is_final_retained": (
            "ALTER TABLE hot_content_wxindex_word_records "
            "ADD COLUMN is_final_retained TINYINT(1) NULL "
            "COMMENT '老年性达标且最终保留' "
            "AFTER demand_senior_fit_json"
        ),
    }
    placeholders = ", ".join(["%s"] * len(columns))
    cursor.execute(
        f"""
        SELECT COLUMN_NAME AS column_name
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = 'hot_content_wxindex_word_records'
          AND COLUMN_NAME IN ({placeholders})
        """,
        list(columns),
    )
    # MySQL column names are case-insensitive; compare lower-cased.
    existing = {
        str(row.get("column_name") or "").lower()
        for row in (cursor.fetchall() or [])
    }
    for column_name, alter_sql in columns.items():
        if column_name.lower() not in existing:
            cursor.execute(alter_sql)
def _ensure_wxindex_word_records_retain_columns(self, cursor: Any) -> None:
    """Add the retain / internal-demand columns to ``hot_content_wxindex_word_records`` when missing.

    The existing column names are read with ONE information_schema query,
    mirroring the nullable-columns migration, instead of five per-column
    queries on every table-ensure. Missing columns are then ALTERed in
    mapping order. That order matters: each ALTER after the first positions
    its column AFTER the previous entry, which may have just been added.
    """
    columns = {
        "retain_reason": (
            "ALTER TABLE hot_content_wxindex_word_records "
            "ADD COLUMN retain_reason VARCHAR(64) NULL "
            "COMMENT '保留原因(按2->3->1优先级)' "
            "AFTER is_spike"
        ),
        "is_internal_demand_matched": (
            "ALTER TABLE hot_content_wxindex_word_records "
            "ADD COLUMN is_internal_demand_matched TINYINT(1) NULL "
            "COMMENT '是否匹配票圈内部需求' "
            "AFTER retain_reason"
        ),
        "matched_demand": (
            "ALTER TABLE hot_content_wxindex_word_records "
            "ADD COLUMN matched_demand VARCHAR(1024) NULL "
            "COMMENT '匹配到的内部需求' "
            "AFTER is_internal_demand_matched"
        ),
        "demand_cache_run_id": (
            "ALTER TABLE hot_content_wxindex_word_records "
            "ADD COLUMN demand_cache_run_id BIGINT UNSIGNED NULL "
            "COMMENT '需求池缓存ID' "
            "AFTER matched_demand"
        ),
        "internal_demand_match_json": (
            "ALTER TABLE hot_content_wxindex_word_records "
            "ADD COLUMN internal_demand_match_json JSON NULL "
            "COMMENT '内部需求匹配详情' "
            "AFTER demand_cache_run_id"
        ),
    }
    placeholders = ", ".join(["%s"] * len(columns))
    cursor.execute(
        f"""
        SELECT COLUMN_NAME AS column_name
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = 'hot_content_wxindex_word_records'
          AND COLUMN_NAME IN ({placeholders})
        """,
        list(columns),
    )
    # MySQL column names are case-insensitive; compare lower-cased.
    existing = {
        str(row.get("column_name") or "").lower()
        for row in (cursor.fetchall() or [])
    }
    for column_name, alter_sql in columns.items():
        if column_name.lower() not in existing:
            cursor.execute(alter_sql)
- def list_stale_wxindex_words(
- self,
- *,
- end_ymd: str,
- update_window_days: int = 7,
- today: date | None = None,
- ) -> list[dict[str, Any]]:
- """返回更新窗口内、仍缺近 7 日区间数据的词。"""
- target_end = str(end_ymd or "").strip()
- if not target_end:
- return []
- current = today or datetime.now(SHANGHAI_TZ).date()
- active_since = current - timedelta(days=max(update_window_days, 0))
- self._ensure_wxindex_word_meta_table()
- self._ensure_wxindex_words_table()
- sql = """
- SELECT
- m.name,
- m.event_created_at,
- m.fetch_start_ymd,
- MIN(w.dt) AS earliest_dt,
- MAX(w.dt) AS latest_dt
- FROM hot_content_wxindex_word_meta m
- INNER JOIN hot_content_wxindex_words w ON w.name = m.name
- WHERE DATE(m.event_created_at) >= %s
- GROUP BY m.name, m.event_created_at, m.fetch_start_ymd
- HAVING MAX(w.dt) < %s OR MIN(w.dt) > m.fetch_start_ymd
- ORDER BY m.name ASC
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql, (active_since, target_end))
- rows = cursor.fetchall()
- stale_words: list[dict[str, Any]] = []
- for row in rows:
- name = str(row.get("name") or "").strip()
- fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
- earliest_dt = str(row.get("earliest_dt") or "").strip()
- latest_dt = str(row.get("latest_dt") or "").strip()
- event_created_at = row.get("event_created_at")
- if name and fetch_start_ymd and earliest_dt and latest_dt and event_created_at:
- stale_words.append(
- {
- "name": name,
- "event_created_at": event_created_at,
- "fetch_start_ymd": fetch_start_ymd,
- "earliest_dt": earliest_dt,
- "latest_dt": latest_dt,
- }
- )
- return stale_words
def list_word_earliest_event_times(
    self,
    *,
    since_dt: datetime,
) -> dict[str, datetime]:
    """Map each search word to the earliest record that mentions it since *since_dt*.

    Words are collected from ``wxindex_trend_json`` of records with
    ``created_at >= since_dt``. Two sources are used: every
    ``$.wxindex_searches[*].keyword`` and the single ``$.llm_selected_word``.
    Each is trimmed and blanks are ignored. For every word, the minimum
    record ``created_at`` is returned. Rows whose aggregated time is not a
    ``datetime`` are dropped.
    """
    # NOTE(review): this adds the demand_* quality columns, which the query
    # below does not read — confirm whether the call is still needed.
    self._ensure_record_quality_columns()
    # JSON_UNQUOTE of a JSON null yields the *string* 'null', which would
    # otherwise be reported as a word. The JSON_TYPE guard filters it out.
    # A missing key gives SQL NULL there, which is excluded just as before.
    sql = """
        SELECT
            word_name,
            MIN(event_created_at) AS event_created_at
        FROM (
            SELECT
                TRIM(searches.keyword) AS word_name,
                r.created_at AS event_created_at
            FROM hot_content_records r
            JOIN JSON_TABLE(
                r.wxindex_trend_json,
                '$.wxindex_searches[*]' COLUMNS (
                    keyword VARCHAR(256) PATH '$.keyword'
                )
            ) AS searches
            WHERE r.created_at >= %s
              AND r.wxindex_trend_json IS NOT NULL
              AND TRIM(searches.keyword) <> ''
            UNION ALL
            SELECT
                TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) AS word_name,
                r.created_at AS event_created_at
            FROM hot_content_records r
            WHERE r.created_at >= %s
              AND r.wxindex_trend_json IS NOT NULL
              AND JSON_TYPE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word')) <> 'NULL'
              AND TRIM(JSON_UNQUOTE(JSON_EXTRACT(r.wxindex_trend_json, '$.llm_selected_word'))) <> ''
        ) AS word_events
        WHERE word_name IS NOT NULL
          AND word_name <> ''
        GROUP BY word_name
    """
    with self.conn.cursor() as cursor:
        cursor.execute(sql, (since_dt, since_dt))
        rows = cursor.fetchall()
    event_map: dict[str, datetime] = {}
    for row in rows:
        name = str(row.get("word_name") or "").strip()
        event_created_at = row.get("event_created_at")
        if name and isinstance(event_created_at, datetime):
            event_map[name] = event_created_at
    return event_map
def list_wxindex_word_bounds_without_meta(self) -> list[dict[str, Any]]:
    """Return words that have daily score rows but no meta row, with their earliest data.

    For each such word, ``earliest_dt`` is ``MIN(dt)`` and
    ``first_created_at`` is ``MIN(created_at)`` over its rows. Results are
    ordered by name. Words with a blank name or blank earliest_dt are
    skipped; ``first_created_at`` is passed through as-is and may be None.
    """
    self._ensure_wxindex_word_meta_table()
    self._ensure_wxindex_words_table()
    orphan_sql = """
        SELECT
            w.name,
            MIN(w.dt) AS earliest_dt,
            MIN(w.created_at) AS first_created_at
        FROM hot_content_wxindex_words w
        LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
        WHERE m.name IS NULL
        GROUP BY w.name
        ORDER BY w.name ASC
    """
    with self.conn.cursor() as cursor:
        cursor.execute(orphan_sql)
        fetched = cursor.fetchall()
    bounds: list[dict[str, Any]] = []
    for entry in fetched:
        word = str(entry.get("name") or "").strip()
        first_dt = str(entry.get("earliest_dt") or "").strip()
        if not (word and first_dt):
            continue
        bounds.append(
            {
                "name": word,
                "earliest_dt": first_dt,
                "first_created_at": entry.get("first_created_at"),
            }
        )
    return bounds
def list_wxindex_word_names_without_meta(self) -> list[str]:
    """Return distinct word names that have daily score rows but no meta row.

    Names are stripped and blanks are dropped. Results are ordered by name.
    """
    self._ensure_wxindex_word_meta_table()
    self._ensure_wxindex_words_table()
    with self.conn.cursor() as cursor:
        cursor.execute(
            """
            SELECT DISTINCT w.name
            FROM hot_content_wxindex_words w
            LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
            WHERE m.name IS NULL
            ORDER BY w.name ASC
            """
        )
        fetched = cursor.fetchall()
    orphan_names: list[str] = []
    for entry in fetched:
        word = str(entry.get("name") or "").strip()
        if word:
            orphan_names.append(word)
    return orphan_names
- def get_wxindex_word_first_row_created_at(self, name: str) -> datetime | None:
- word = str(name or "").strip()
- if not word:
- return None
- self._ensure_wxindex_words_table()
- sql = """
- SELECT MIN(created_at) AS first_created_at
- FROM hot_content_wxindex_words
- WHERE name = %s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql, (word,))
- row = cursor.fetchone() or {}
- first_created_at = row.get("first_created_at")
- return first_created_at if isinstance(first_created_at, datetime) else None
def list_all_wxindex_word_meta(self) -> list[dict[str, Any]]:
    """Return every word meta row in ascending id order.

    Each row goes through ``_normalize_wxindex_word_meta_row``; rows for
    which it returns None are dropped.
    """
    self._ensure_wxindex_word_meta_table()
    with self.conn.cursor() as cursor:
        cursor.execute(
            """
            SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd
            FROM hot_content_wxindex_word_meta
            ORDER BY id ASC
            """
        )
        fetched = cursor.fetchall()
    return [
        meta
        for entry in fetched
        if (meta := self._normalize_wxindex_word_meta_row(entry)) is not None
    ]
def update_wxindex_word_meta_fetch_start(
    self,
    *,
    name: str,
    fetch_start_ymd: str,
) -> None:
    """Overwrite ``fetch_start_ymd`` on the meta row for *name*.

    Both arguments are whitespace-stripped. If either ends up blank,
    HotContentFlowError is raised before the database is touched. The row
    count is not checked, so an unknown name is a silent no-op.
    """
    word = str(name or "").strip()
    new_start = str(fetch_start_ymd or "").strip()
    if not (word and new_start):
        raise HotContentFlowError("invalid wxindex word meta fetch_start_ymd payload")
    self._ensure_wxindex_word_meta_table()
    with self.conn.cursor() as cursor:
        cursor.execute(
            """
            UPDATE hot_content_wxindex_word_meta
            SET fetch_start_ymd = %s
            WHERE name = %s
            """,
            (new_start, word),
        )
def update_wxindex_word_meta(
    self,
    *,
    name: str,
    event_created_at: datetime,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
) -> None:
    """Overwrite event time and fetch window on the existing meta row for *name*.

    String arguments are whitespace-stripped. A blank name or window bound
    raises HotContentFlowError before the database is touched. A
    timezone-aware *event_created_at* is converted to Shanghai time and
    stored naive. The row count is not checked, so an unknown name is a
    silent no-op.
    """
    word, window_start, window_end = (
        str(value or "").strip() for value in (name, fetch_start_ymd, fetch_end_ymd)
    )
    if not (word and window_start and window_end):
        raise HotContentFlowError("invalid wxindex word meta payload")
    self._ensure_wxindex_word_meta_table()
    naive_event_at = (
        event_created_at
        if event_created_at.tzinfo is None
        else event_created_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None)
    )
    with self.conn.cursor() as cursor:
        cursor.execute(
            """
            UPDATE hot_content_wxindex_word_meta
            SET event_created_at = %s,
                fetch_start_ymd = %s,
                fetch_end_ymd = %s
            WHERE name = %s
            """,
            (naive_event_at, window_start, window_end, word),
        )
- def get_wxindex_word_meta(self, name: str) -> dict[str, Any] | None:
- word = str(name or "").strip()
- if not word:
- return None
- self._ensure_wxindex_word_meta_table()
- sql = """
- SELECT id, name, event_created_at, fetch_start_ymd, fetch_end_ymd
- FROM hot_content_wxindex_word_meta
- WHERE name = %s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql, (word,))
- row = cursor.fetchone()
- if not row:
- return None
- return self._normalize_wxindex_word_meta_row(row)
def ensure_wxindex_word_meta(
    self,
    *,
    name: str,
    event_created_at: datetime,
    fetch_start_ymd: str,
    fetch_end_ymd: str,
) -> dict[str, Any]:
    """Upsert the meta row for ``name`` and return it as read back.

    If a row with the same name already exists (unique key ``uk_name``), its
    event time and both window boundaries are overwritten. The row is then
    re-read through ``get_wxindex_word_meta``.

    A timezone-aware ``event_created_at`` is converted to ``SHANGHAI_TZ`` and
    made naive; a naive value is stored unchanged.

    Raises:
        HotContentFlowError: if name, start or end is empty after stripping,
            or if the row cannot be read back after the write.
    """
    word = str(name or "").strip()
    start_ymd = str(fetch_start_ymd or "").strip()
    end_ymd = str(fetch_end_ymd or "").strip()
    if not (word and start_ymd and end_ymd):
        raise HotContentFlowError("invalid wxindex word meta payload")
    self._ensure_wxindex_word_meta_table()
    is_aware = event_created_at.tzinfo is not None
    event_at = (
        event_created_at.astimezone(SHANGHAI_TZ).replace(tzinfo=None)
        if is_aware
        else event_created_at
    )
    upsert = """
        INSERT INTO hot_content_wxindex_word_meta (
            name,
            event_created_at,
            fetch_start_ymd,
            fetch_end_ymd
        )
        VALUES (%s, %s, %s, %s)
        ON DUPLICATE KEY UPDATE
            event_created_at = VALUES(event_created_at),
            fetch_start_ymd = VALUES(fetch_start_ymd),
            fetch_end_ymd = VALUES(fetch_end_ymd)
    """
    with self.conn.cursor() as cur:
        cur.execute(upsert, (word, event_at, start_ymd, end_ymd))
    stored = self.get_wxindex_word_meta(word)
    if stored is None:
        raise HotContentFlowError(f"failed to persist wxindex word meta: {word}")
    return stored
def delete_wxindex_word_meta_by_names(self, names: list[str]) -> int:
    """Delete the meta rows whose name is in ``names``; return the affected row count.

    Names are stringified and stripped, and blank or ``None`` entries are
    ignored. Returns 0 without touching the database when nothing is left.
    """
    targets = [word for word in (str(raw or "").strip() for raw in names) if word]
    if not targets:
        return 0
    self._ensure_wxindex_word_meta_table()
    placeholders = ", ".join("%s" for _ in targets)
    statement = f"DELETE FROM hot_content_wxindex_word_meta WHERE name IN ({placeholders})"
    with self.conn.cursor() as cur:
        cur.execute(statement, tuple(targets))
        affected = cur.rowcount
    return int(affected or 0)
def list_low_max_wxindex_words(
    self,
    *,
    min_max_score: float,
) -> list[dict[str, Any]]:
    """Group daily-index rows by name and return words whose maximum score is below a threshold.

    Args:
        min_max_score: Words whose ``MAX(total_score)`` is strictly less than
            this value are returned.

    Returns:
        ``{"name", "max_score", "row_count"}`` dicts ordered by name. Rows with
        a blank name or with a max score or count that cannot be converted to
        float/int are dropped.
    """
    self._ensure_wxindex_words_table()
    query = """
        SELECT
            name,
            MAX(total_score) AS max_score,
            COUNT(*) AS row_count
        FROM hot_content_wxindex_words
        GROUP BY name
        HAVING MAX(total_score) < %s
        ORDER BY name ASC
    """
    with self.conn.cursor() as cur:
        cur.execute(query, (min_max_score,))
        fetched = cur.fetchall()
    found: list[dict[str, Any]] = []
    for record in fetched:
        word = str(record.get("name") or "").strip()
        if not word:
            continue
        try:
            entry = {
                "name": word,
                "max_score": float(record["max_score"]),
                "row_count": int(record["row_count"]),
            }
        except (TypeError, ValueError, KeyError):
            continue
        found.append(entry)
    return found
def count_wxindex_words_outside_event_window(
    self,
    *,
    window_days: int = 7,
) -> int:
    """Count daily-index rows whose ``dt`` lies outside their word's fetch window.

    The window is read per word from ``hot_content_wxindex_word_meta``: a row is
    inside when ``fetch_start_ymd <= dt <= fetch_end_ymd`` (string comparison
    of ``yyyymmdd`` values). Words without a meta row are not counted. When a
    meta row has a blank ``fetch_end_ymd`` (the column default), only the left
    boundary is checked.

    Args:
        window_days: Ignored. The window comes from each word's meta row; the
            parameter is kept so existing callers keep working.

    Returns:
        Number of out-of-window rows.
    """
    self._ensure_wxindex_word_meta_table()
    self._ensure_wxindex_words_table()
    # ``w.dt > ''`` is true for every row, so an unset right boundary must not
    # be compared against; otherwise all of the word's data counts as outside.
    sql = """
        SELECT COUNT(*) AS row_count
        FROM hot_content_wxindex_words w
        INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
        WHERE w.dt < m.fetch_start_ymd
           OR (m.fetch_end_ymd <> '' AND w.dt > m.fetch_end_ymd)
    """
    with self.conn.cursor() as cursor:
        cursor.execute(sql)
        row = cursor.fetchone() or {}
    return int(row.get("row_count") or 0)
def list_wxindex_words_outside_event_window_samples(
    self,
    *,
    window_days: int = 7,
    limit: int = 20,
) -> list[dict[str, Any]]:
    """Return up to ``limit`` daily-index rows whose ``dt`` lies outside their word's window.

    The window is read per word from ``hot_content_wxindex_word_meta``: a row is
    inside when ``fetch_start_ymd <= dt <= fetch_end_ymd`` (string comparison
    of ``yyyymmdd`` values). Words without a meta row are never returned. When
    a meta row has a blank ``fetch_end_ymd`` (the column default), only the left
    boundary is checked.

    Args:
        window_days: Ignored. The window comes from each word's meta row; the
            parameter is kept so existing callers keep working.
        limit: Maximum number of rows fetched from the database.

    Returns:
        Dicts with ``name``, ``dt``, ``event_created_at`` (as returned by the
        driver), ``start_ymd`` and ``end_ymd``, ordered by name then dt. Rows
        with a blank name or dt are omitted.
    """
    self._ensure_wxindex_word_meta_table()
    self._ensure_wxindex_words_table()
    # ``w.dt > ''`` is true for every row, so an unset right boundary must not
    # be compared against; otherwise all of the word's data is reported.
    sql = """
        SELECT
            w.name,
            w.dt,
            m.event_created_at,
            m.fetch_start_ymd AS start_ymd,
            m.fetch_end_ymd AS end_ymd
        FROM hot_content_wxindex_words w
        INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
        WHERE w.dt < m.fetch_start_ymd
           OR (m.fetch_end_ymd <> '' AND w.dt > m.fetch_end_ymd)
        ORDER BY w.name ASC, w.dt ASC
        LIMIT %s
    """
    with self.conn.cursor() as cursor:
        cursor.execute(sql, (limit,))
        rows = cursor.fetchall()
    samples: list[dict[str, Any]] = []
    for row in rows:
        name = str(row.get("name") or "").strip()
        dt = str(row.get("dt") or "").strip()
        if name and dt:
            samples.append(
                {
                    "name": name,
                    "dt": dt,
                    "event_created_at": row.get("event_created_at"),
                    "start_ymd": str(row.get("start_ymd") or "").strip(),
                    "end_ymd": str(row.get("end_ymd") or "").strip(),
                }
            )
    return samples
def delete_wxindex_words_outside_event_window(
    self,
    *,
    window_days: int = 7,
) -> int:
    """Delete daily-index rows whose ``dt`` lies outside their word's fetch window.

    The window is read per word from ``hot_content_wxindex_word_meta``: a row is
    kept when ``fetch_start_ymd <= dt <= fetch_end_ymd`` (string comparison of
    ``yyyymmdd`` values). Words without a meta row are untouched. When a meta
    row has a blank ``fetch_end_ymd`` (the column default), only the left
    boundary is enforced, so rows are never deleted against an unknown
    right boundary.

    Args:
        window_days: Ignored. The window comes from each word's meta row; the
            parameter is kept so existing callers keep working.

    Returns:
        Number of deleted rows as reported by ``cursor.rowcount``.
    """
    self._ensure_wxindex_word_meta_table()
    self._ensure_wxindex_words_table()
    # ``w.dt > ''`` is true for every row; without the guard a meta row with a
    # blank right boundary would wipe all of that word's data.
    sql = """
        DELETE w
        FROM hot_content_wxindex_words w
        INNER JOIN hot_content_wxindex_word_meta m ON m.name = w.name
        WHERE w.dt < m.fetch_start_ymd
           OR (m.fetch_end_ymd <> '' AND w.dt > m.fetch_end_ymd)
    """
    with self.conn.cursor() as cursor:
        cursor.execute(sql)
        return int(cursor.rowcount or 0)
def delete_wxindex_words_without_meta(self) -> int:
    """Delete daily-index rows whose word has no meta row; return the affected row count."""
    self._ensure_wxindex_word_meta_table()
    self._ensure_wxindex_words_table()
    # LEFT JOIN + IS NULL selects index rows with no matching meta name.
    statement = """
        DELETE w
        FROM hot_content_wxindex_words w
        LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
        WHERE m.name IS NULL
    """
    with self.conn.cursor() as cur:
        cur.execute(statement)
        affected = cur.rowcount
    return int(affected or 0)
def count_wxindex_words_without_meta(self) -> int:
    """Count daily-index rows whose word has no meta row (0 if the query returns nothing)."""
    self._ensure_wxindex_word_meta_table()
    self._ensure_wxindex_words_table()
    query = """
        SELECT COUNT(*) AS row_count
        FROM hot_content_wxindex_words w
        LEFT JOIN hot_content_wxindex_word_meta m ON m.name = w.name
        WHERE m.name IS NULL
    """
    with self.conn.cursor() as cur:
        cur.execute(query)
        first = cur.fetchone()
    counted = (first or {}).get("row_count")
    return int(counted or 0)
def delete_wxindex_words_by_names(self, names: list[str]) -> int:
    """Delete every daily-index row for the given words; return the affected row count.

    Names are stringified and stripped, and blank or ``None`` entries are
    ignored. Returns 0 without touching the database when nothing is left.
    """
    stripped = (str(raw or "").strip() for raw in names)
    targets = [word for word in stripped if word]
    if not targets:
        return 0
    self._ensure_wxindex_words_table()
    marks = ", ".join("%s" for _ in targets)
    statement = f"""
        DELETE FROM hot_content_wxindex_words
        WHERE name IN ({marks})
    """
    with self.conn.cursor() as cur:
        cur.execute(statement, tuple(targets))
        affected = cur.rowcount
    return int(affected or 0)
def list_wxindex_word_names_with_dt(
    self,
    names: list[str],
    *,
    dt: str,
) -> set[str]:
    """Return the subset of ``names`` that have a hot_content_wxindex_words row for date ``dt``.

    Names and ``dt`` are stringified and stripped. An empty set is returned,
    without querying, when ``dt`` is blank or no non-blank names remain.
    """
    target_dt = str(dt or "").strip()
    candidates = [word for word in (str(raw or "").strip() for raw in names) if word]
    if not (target_dt and candidates):
        return set()
    self._ensure_wxindex_words_table()
    marks = ", ".join("%s" for _ in candidates)
    query = f"""
        SELECT DISTINCT name
        FROM hot_content_wxindex_words
        WHERE dt = %s
          AND name IN ({marks})
    """
    with self.conn.cursor() as cur:
        cur.execute(query, [target_dt, *candidates])
        fetched = cur.fetchall()
    present: set[str] = set()
    for record in fetched or []:
        word = str(record.get("name") or "").strip()
        if word:
            present.add(word)
    return present
def has_wxindex_word(self, name: str) -> bool:
    """Return whether a latest daily-index date can be found for ``name``.

    Delegates to ``get_wxindex_word_latest_dt``; a ``None`` result (blank name
    or no rows for the word) means ``False``.
    """
    latest_dt = self.get_wxindex_word_latest_dt(name)
    return latest_dt is not None
- def get_wxindex_word_latest_dt(self, name: str) -> str | None:
- word = str(name or "").strip()
- if not word:
- return None
- self._ensure_wxindex_words_table()
- sql = """
- SELECT MAX(dt) AS latest_dt
- FROM hot_content_wxindex_words
- WHERE name = %s
- """
- with self.conn.cursor() as cursor:
- cursor.execute(sql, (word,))
- row = cursor.fetchone() or {}
- latest_dt = str(row.get("latest_dt") or "").strip()
- return latest_dt or None
def save_wxindex_daily_scores(
    self,
    *,
    name: str,
    scores: list[dict[str, Any]],
) -> tuple[int, int]:
    """Write per-day index values for one word, skipping rows that already exist.

    Each item of ``scores`` should be a dict carrying the date under ``"ymd"``
    (falling back to ``"dt"``) and a ``"total_score"`` convertible to float.
    Items are dropped before any SQL runs when any of these hold:

    - the item is not a dict;
    - it has no date;
    - its score cannot be converted to float;
    - its score is NaN or infinite.

    If the same date appears more than once, the first occurrence wins.

    Returns:
        ``(inserted, skipped)``. ``inserted`` sums ``cursor.rowcount`` over the
        ``INSERT IGNORE`` statements. ``skipped`` is the number of valid rows
        that were not inserted, e.g. because the (name, dt) pair was already
        stored. Returns ``(0, 0)`` when the word is blank, ``scores`` is empty,
        or no valid rows remain.
    """
    import math  # only needed for the finite-score check below

    word = str(name or "").strip()
    if not word or not scores:
        return 0, 0
    self._ensure_wxindex_words_table()
    sql = """
        INSERT IGNORE INTO hot_content_wxindex_words (
            name,
            dt,
            total_score
        )
        VALUES (%s, %s, %s)
    """
    rows: list[tuple[str, str, float]] = []
    seen: set[tuple[str, str]] = set()
    for item in scores:
        if not isinstance(item, dict):
            continue
        dt = str(item.get("ymd") or item.get("dt") or "").strip()
        if not dt:
            continue
        try:
            total_score = float(item["total_score"])
        except (TypeError, ValueError, KeyError):
            continue
        if not math.isfinite(total_score):
            # float() accepts "nan"/"inf", but MySQL DOUBLE cannot store them.
            # Executing such a row would fail mid-loop, after earlier rows
            # were already inserted.
            continue
        key = (word, dt)
        if key in seen:
            continue
        seen.add(key)
        rows.append((word, dt, total_score))
    if not rows:
        return 0, 0
    inserted = 0
    with self.conn.cursor() as cursor:
        for row in rows:
            cursor.execute(sql, row)
            inserted += int(cursor.rowcount or 0)
    skipped = len(rows) - inserted
    return inserted, skipped
def list_records_with_wxindex_trend_after(
    self,
    *,
    after_created_at: datetime,
) -> list[dict[str, Any]]:
    """Return records created strictly after ``after_created_at`` that carry a wxindex trend.

    Rows whose ``wxindex_trend_json`` is NULL or blank are excluded. Results
    are ordered by ``id``. Each item has ``id`` (int), ``created_at`` (as
    returned by the driver) and ``wxindex_trend_json`` decoded via
    ``_json_loads``.
    """
    query = """
        SELECT id, created_at, wxindex_trend_json
        FROM hot_content_records
        WHERE created_at > %s
          AND wxindex_trend_json IS NOT NULL
          AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> ''
        ORDER BY id ASC
    """
    with self.conn.cursor() as cur:
        cur.execute(query, (after_created_at,))
        fetched = cur.fetchall()
    return [
        {
            "id": int(record["id"]),
            "created_at": record.get("created_at"),
            "wxindex_trend_json": _json_loads(record.get("wxindex_trend_json")),
        }
        for record in fetched
    ]
def list_records_with_wxindex_trend(
    self,
    *,
    since_dt: datetime,
) -> list[dict[str, Any]]:
    """Return records created at or after ``since_dt`` that carry a wxindex trend.

    Rows whose ``wxindex_trend_json`` is NULL or blank are excluded. Results
    are ordered by ``id``. Each item has ``id`` (int), ``created_at`` (as
    returned by the driver) and ``wxindex_trend_json`` decoded via
    ``_json_loads``.
    """
    query = """
        SELECT id, created_at, wxindex_trend_json
        FROM hot_content_records
        WHERE created_at >= %s
          AND wxindex_trend_json IS NOT NULL
          AND TRIM(CAST(wxindex_trend_json AS CHAR)) <> ''
        ORDER BY id ASC
    """
    with self.conn.cursor() as cur:
        cur.execute(query, (since_dt,))
        fetched = cur.fetchall()
    return [
        {
            "id": int(record["id"]),
            "created_at": record.get("created_at"),
            "wxindex_trend_json": _json_loads(record.get("wxindex_trend_json")),
        }
        for record in fetched
    ]
def _ensure_wxindex_word_meta_table(self) -> None:
    """Create hot_content_wxindex_word_meta if missing, then run its column migrations.

    The word-meta methods call this before each query, so it must be safe to
    run repeatedly. The ``id`` and ``fetch_end_ymd`` migration helpers receive
    the same cursor, in that order.
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS hot_content_wxindex_word_meta (
            id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键',
            name VARCHAR(256) NOT NULL COMMENT '词',
            event_created_at DATETIME NOT NULL COMMENT '首次关联热点事件创建时间',
            fetch_start_ymd VARCHAR(8) NOT NULL COMMENT '数据窗口左边界:事件创建日往前7天',
            fetch_end_ymd VARCHAR(8) NOT NULL DEFAULT '' COMMENT '数据窗口右边界:事件创建日后7天',
            meta_created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '元数据创建时间',
            PRIMARY KEY (id),
            UNIQUE KEY uk_name (name),
            KEY idx_event_created_at (event_created_at)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
    """
    with self.conn.cursor() as cur:
        cur.execute(ddl)
        # Bring tables created without these columns up to the current layout.
        migrations = (
            self._ensure_wxindex_word_meta_id_column,
            self._ensure_wxindex_word_meta_fetch_end_column,
        )
        for migrate in migrations:
            migrate(cur)
def _ensure_wxindex_word_meta_fetch_end_column(self, cursor: Any) -> None:
    """Make sure ``hot_content_wxindex_word_meta`` has a filled ``fetch_end_ymd`` column.

    If the column is missing, it is added right after ``fetch_start_ymd`` as
    ``VARCHAR(8) NOT NULL DEFAULT ''``. Rows whose ``fetch_end_ymd`` is NULL or
    blank are then backfilled with ``DATE(event_created_at) + 7 days``
    formatted as ``YYYYMMDD``.

    Args:
        cursor: Open cursor; ``fetchone()`` must return a dict-like row
            (``.get`` is used on it).
    """
    # Check the current schema's metadata for the column.
    cursor.execute(
        """
        SELECT COUNT(*) AS cnt
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = 'hot_content_wxindex_word_meta'
          AND COLUMN_NAME = 'fetch_end_ymd'
        """
    )
    if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
        # Column missing: add it with an '' default so existing rows remain
        # valid; the backfill below fills them in.
        cursor.execute(
            """
            ALTER TABLE hot_content_wxindex_word_meta
            ADD COLUMN fetch_end_ymd VARCHAR(8) NOT NULL DEFAULT ''
            COMMENT '数据窗口右边界:事件创建日后7天'
            AFTER fetch_start_ymd
            """
        )
    # Backfill blank right boundaries as event date + 7 days.
    # The statement is executed without args, so '%Y%m%d' presumably reaches
    # MySQL unchanged (the driver is assumed to apply %-formatting only when
    # args are given) — confirm for the driver in use.
    # NOTE(review): this UPDATE is placed to run on every call, not only right
    # after the ALTER, so it re-checks the whole table each time; confirm this
    # placement matches the original intent.
    cursor.execute(
        """
        UPDATE hot_content_wxindex_word_meta
        SET fetch_end_ymd = DATE_FORMAT(
            DATE_ADD(DATE(event_created_at), INTERVAL 7 DAY),
            '%Y%m%d'
        )
        WHERE fetch_end_ymd IS NULL
           OR TRIM(fetch_end_ymd) = ''
        """
    )
def _ensure_wxindex_word_meta_id_column(self, cursor: Any) -> None:
    """Add the surrogate ``id`` primary key to a meta table that lacks it.

    Does nothing when an ``id`` column already exists. Otherwise it runs one
    ALTER statement that:

    - drops the current primary key;
    - adds ``id`` as an AUTO_INCREMENT primary key in first position;
    - creates the unique key ``uk_name`` on ``name``.

    Args:
        cursor: Open cursor; ``fetchone()`` must return a dict-like row.
    """
    probe = """
        SELECT COUNT(*) AS cnt
        FROM information_schema.COLUMNS
        WHERE TABLE_SCHEMA = DATABASE()
          AND TABLE_NAME = 'hot_content_wxindex_word_meta'
          AND COLUMN_NAME = 'id'
    """
    cursor.execute(probe)
    found = cursor.fetchone() or {}
    has_id_column = int(found.get("cnt") or 0) > 0
    if not has_id_column:
        cursor.execute(
            """
            ALTER TABLE hot_content_wxindex_word_meta
            DROP PRIMARY KEY,
            ADD COLUMN id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY FIRST,
            ADD UNIQUE KEY uk_name (name)
            """
        )
- @staticmethod
- def _normalize_wxindex_word_meta_row(row: dict[str, Any]) -> dict[str, Any] | None:
- name = str(row.get("name") or "").strip()
- fetch_start_ymd = str(row.get("fetch_start_ymd") or "").strip()
- fetch_end_ymd = str(row.get("fetch_end_ymd") or "").strip()
- event_created_at = row.get("event_created_at")
- if not name or not fetch_start_ymd or event_created_at is None:
- return None
- if not fetch_end_ymd and isinstance(event_created_at, datetime):
- event_date = event_created_at.date()
- fetch_end_ymd = (event_date + timedelta(days=7)).strftime("%Y%m%d")
- if not fetch_end_ymd:
- return None
- try:
- meta_id = int(row.get("id"))
- except (TypeError, ValueError):
- return None
- return {
- "id": meta_id,
- "name": name,
- "event_created_at": event_created_at,
- "fetch_start_ymd": fetch_start_ymd,
- "fetch_end_ymd": fetch_end_ymd,
- }
def _ensure_wxindex_words_table(self) -> None:
    """Create hot_content_wxindex_words if it does not exist yet.

    The table holds one ``total_score`` per (name, dt), as enforced by the
    unique key ``uk_name_dt``.
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS hot_content_wxindex_words (
            id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
            name VARCHAR(256) NOT NULL COMMENT '词',
            dt VARCHAR(8) NOT NULL COMMENT '日期 yyyymmdd',
            total_score DOUBLE NOT NULL COMMENT '微信指数',
            created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
            PRIMARY KEY (id),
            UNIQUE KEY uk_name_dt (name, dt),
            KEY idx_name (name),
            KEY idx_dt (dt)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
    """
    with self.conn.cursor() as cur:
        cur.execute(ddl)
def _ensure_odps_sync_log_table(self) -> None:
    """Create hot_content_odps_sync_log if missing, then run its column migrations.

    The unique key ``uk_odps_sync`` allows one row per (partition_dt,
    strategy, demand_id). After the CREATE, the ``weight`` and ``extend``
    column helpers run on the same cursor, in that order; presumably they add
    those columns to older tables (confirm in their definitions).
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log (
            id BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
            partition_dt VARCHAR(8) NOT NULL COMMENT 'ODPS 分区 dt',
            strategy VARCHAR(128) NOT NULL COMMENT '需求 strategy',
            demand_id CHAR(32) NOT NULL COMMENT 'ODPS demand_id',
            demand_name VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '需求名',
            demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语',
            record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id',
            weight DOUBLE NULL DEFAULT NULL COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)',
            extend TEXT NULL DEFAULT NULL COMMENT 'ODPS extend 扩展字段 JSON',
            synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间',
            PRIMARY KEY (id),
            UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id),
            KEY idx_record_partition (record_id, partition_dt),
            KEY idx_partition_strategy (partition_dt, strategy)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
    """
    with self.conn.cursor() as cur:
        cur.execute(ddl)
        for migrate in (
            self._ensure_odps_sync_log_weight_column,
            self._ensure_odps_sync_log_extend_column,
        ):
            migrate(cur)
|