|
|
@@ -887,17 +887,20 @@ class HotContentRepository:
|
|
|
if insert_rows:
|
|
|
cursor.executemany(insert_sql, insert_rows)
|
|
|
|
|
|
- def list_odps_sync_records(self) -> list[dict[str, Any]]:
|
|
|
- """读取当天完成 postprocess 且质量判断完成的记录,供 ODPS 同步。"""
|
|
|
+ def list_odps_sync_records(self, *, partition_dt: str | None = None) -> list[dict[str, Any]]:
|
|
|
+ """读取指定创建日已完成质量判断的记录,供 ODPS 同步(按 created_at,不按 updated_at)。"""
|
|
|
self._ensure_record_quality_columns()
|
|
|
- today_start = datetime.now(SHANGHAI_TZ).replace(
|
|
|
- hour=0,
|
|
|
- minute=0,
|
|
|
- second=0,
|
|
|
- microsecond=0,
|
|
|
- tzinfo=None,
|
|
|
- )
|
|
|
- today_end = today_start + timedelta(days=1)
|
|
|
+ if partition_dt:
|
|
|
+ created_day_start = datetime.strptime(partition_dt, "%Y%m%d")
|
|
|
+ else:
|
|
|
+ created_day_start = datetime.now(SHANGHAI_TZ).replace(
|
|
|
+ hour=0,
|
|
|
+ minute=0,
|
|
|
+ second=0,
|
|
|
+ microsecond=0,
|
|
|
+ tzinfo=None,
|
|
|
+ )
|
|
|
+ created_day_end = created_day_start + timedelta(days=1)
|
|
|
sql = """
|
|
|
SELECT
|
|
|
id,
|
|
|
@@ -907,8 +910,8 @@ class HotContentRepository:
|
|
|
demand_event_sense_json,
|
|
|
demand_senior_fit_json
|
|
|
FROM hot_content_records
|
|
|
- WHERE updated_at >= %s
|
|
|
- AND updated_at < %s
|
|
|
+ WHERE created_at >= %s
|
|
|
+ AND created_at < %s
|
|
|
AND postprocess_status = %s
|
|
|
AND contribution_demand_match_json IS NOT NULL
|
|
|
AND TRIM(CAST(contribution_demand_match_json AS CHAR)) <> ''
|
|
|
@@ -917,7 +920,7 @@ class HotContentRepository:
|
|
|
with self.conn.cursor() as cursor:
|
|
|
cursor.execute(
|
|
|
sql,
|
|
|
- (today_start, today_end, PostprocessStatus.QUALITY_DONE),
|
|
|
+ (created_day_start, created_day_end, PostprocessStatus.QUALITY_DONE),
|
|
|
)
|
|
|
rows = cursor.fetchall()
|
|
|
|