|
@@ -867,13 +867,15 @@ class HotContentRepository:
|
|
|
demand_id,
|
|
demand_id,
|
|
|
demand_name,
|
|
demand_name,
|
|
|
demand_type,
|
|
demand_type,
|
|
|
- record_id
|
|
|
|
|
|
|
+ record_id,
|
|
|
|
|
+ weight
|
|
|
)
|
|
)
|
|
|
- VALUES (%s, %s, %s, %s, %s, %s)
|
|
|
|
|
|
|
+ VALUES (%s, %s, %s, %s, %s, %s, %s)
|
|
|
ON DUPLICATE KEY UPDATE
|
|
ON DUPLICATE KEY UPDATE
|
|
|
demand_name = VALUES(demand_name),
|
|
demand_name = VALUES(demand_name),
|
|
|
demand_type = VALUES(demand_type),
|
|
demand_type = VALUES(demand_type),
|
|
|
record_id = VALUES(record_id),
|
|
record_id = VALUES(record_id),
|
|
|
|
|
+ weight = VALUES(weight),
|
|
|
synced_at = CURRENT_TIMESTAMP
|
|
synced_at = CURRENT_TIMESTAMP
|
|
|
"""
|
|
"""
|
|
|
insert_rows = [
|
|
insert_rows = [
|
|
@@ -884,6 +886,7 @@ class HotContentRepository:
|
|
|
str(item.get("demand_name") or ""),
|
|
str(item.get("demand_name") or ""),
|
|
|
str(item.get("demand_type") or ""),
|
|
str(item.get("demand_type") or ""),
|
|
|
int(item.get("record_id") or 0),
|
|
int(item.get("record_id") or 0),
|
|
|
|
|
+ float(item["weight"]) if item.get("weight") is not None else None,
|
|
|
)
|
|
)
|
|
|
for item in rows
|
|
for item in rows
|
|
|
if str(item.get("demand_id") or "").strip()
|
|
if str(item.get("demand_id") or "").strip()
|
|
@@ -892,6 +895,26 @@ class HotContentRepository:
|
|
|
cursor.executemany(sql, insert_rows)
|
|
cursor.executemany(sql, insert_rows)
|
|
|
return len(insert_rows)
|
|
return len(insert_rows)
|
|
|
|
|
|
|
|
|
|
+ def _ensure_odps_sync_log_weight_column(self, cursor: Any) -> None:
|
|
|
|
|
+ cursor.execute(
|
|
|
|
|
+ """
|
|
|
|
|
+ SELECT COUNT(*) AS cnt
|
|
|
|
|
+ FROM information_schema.COLUMNS
|
|
|
|
|
+ WHERE TABLE_SCHEMA = DATABASE()
|
|
|
|
|
+ AND TABLE_NAME = 'hot_content_odps_sync_log'
|
|
|
|
|
+ AND COLUMN_NAME = 'weight'
|
|
|
|
|
+ """,
|
|
|
|
|
+ )
|
|
|
|
|
+ if int((cursor.fetchone() or {}).get("cnt") or 0) == 0:
|
|
|
|
|
+ cursor.execute(
|
|
|
|
|
+ """
|
|
|
|
|
+ ALTER TABLE hot_content_odps_sync_log
|
|
|
|
|
+ ADD COLUMN weight DOUBLE NULL DEFAULT NULL
|
|
|
|
|
+ COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)'
|
|
|
|
|
+ AFTER record_id
|
|
|
|
|
+ """
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
def _ensure_odps_sync_log_table(self) -> None:
|
|
def _ensure_odps_sync_log_table(self) -> None:
|
|
|
sql = """
|
|
sql = """
|
|
|
CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log (
|
|
CREATE TABLE IF NOT EXISTS hot_content_odps_sync_log (
|
|
@@ -902,6 +925,7 @@ class HotContentRepository:
|
|
|
demand_name VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '需求名',
|
|
demand_name VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '需求名',
|
|
|
demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语',
|
|
demand_type VARCHAR(32) NOT NULL DEFAULT '' COMMENT '特征点/短语',
|
|
|
record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id',
|
|
record_id BIGINT UNSIGNED NOT NULL DEFAULT 0 COMMENT '来源 hot_content_records.id',
|
|
|
|
|
+ weight DOUBLE NULL DEFAULT NULL COMMENT 'ODPS 需求权重(记录 wxindex 最高分 / 1000000)',
|
|
|
synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间',
|
|
synced_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '写入 ODPS 时间',
|
|
|
PRIMARY KEY (id),
|
|
PRIMARY KEY (id),
|
|
|
UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id),
|
|
UNIQUE KEY uk_odps_sync (partition_dt, strategy, demand_id),
|
|
@@ -911,3 +935,4 @@ class HotContentRepository:
|
|
|
"""
|
|
"""
|
|
|
with self.conn.cursor() as cursor:
|
|
with self.conn.cursor() as cursor:
|
|
|
cursor.execute(sql)
|
|
cursor.execute(sql)
|
|
|
|
|
+ self._ensure_odps_sync_log_weight_column(cursor)
|