Browse Source

Merge branch 'master' of https://git.yishihui.com/linfan/OffLineRec

linfan 1 year ago
parent
commit
dd304d1298
7 changed files with 264 additions and 34 deletions
  1. calCtr.py (+1, -1)
  2. calHourCtr.py (+80, -0)
  3. clean.sh (+20, -0)
  4. compose_score.py (+57, -0)
  5. export_hour_vid.py (+53, -0)
  6. run_ctr.sh (+29, -1)
  7. test.sh (+24, -32)

+ 1 - 1
calCtr.py

@@ -67,7 +67,7 @@ if __name__=="__main__":
         share_score = float(share_pv)/(float(view_pv)+5)
         backrate = float(return_users)/(float(share_pv)+5)
         #print(k, k_score, share_score*backrate, share_score, backrate) 
-        score_info = [k_score, share_score*backrate, share_score, backrate]
+        score_info = [share_score, share_score*backrate, share_score, backrate]
         k = "k_p:"+k
         score_info = json.dumps(score_info)
         info_dict[k] = score_info

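The one-line change above swaps k_score (returns per viewing user) out of the first slot of score_info in favor of share_score, which afterwards sits at both index 0 and index 2. A minimal sketch of the smoothed ratios this hunk emits, with illustrative inputs; the +5 in each denominator is additive smoothing that keeps low-traffic videos from producing inflated ratios:

# Minimal sketch of the ratios computed in the hunk above (inputs are
# illustrative; the +5 is additive smoothing against low-traffic noise).
import json

def build_score_info(view_pv, share_pv, return_users):
    share_score = float(share_pv) / (float(view_pv) + 5)    # shares per view
    backrate = float(return_users) / (float(share_pv) + 5)  # returns per share
    # after this commit, share_score occupies both slot 0 and slot 2
    return json.dumps([share_score, share_score * backrate, share_score, backrate])

print(build_score_info(view_pv=1000, share_pv=40, return_users=12))
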
+ 80 - 0
calHourCtr.py

@@ -0,0 +1,80 @@
+# coding: utf-8
+import sys
+from operator import itemgetter
+import json
+import pandas as pd
+#from db_help import RedisHelper
+
+if __name__=="__main__":
+    #1.load data
+    nowdate=sys.argv[1]
+    vlog='0'
+    love_live = 4
+    data_path = "./data/hour_video_data_"+nowdate
+    f = open(data_path)
+    #data = pd.read_csv(data_path, encoding="utf-8", sep='\t')
+    #print(data)
+    index = 0
+    data_dict = {}
+    #redis_helper = RedisHelper()
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        if index==0:
+            index += 1
+            continue
+        index +=1
+        items = line.strip().split("\t")
+        #print(items)
+        if len(items)<9:
+            continue
+        vid = items[1]
+        view_users = items[2] 
+        view_pv = items[3]
+        play_users = items[4]
+        play_pv = items[5]
+        share_users = items[6]
+        share_pv = items[7]
+        return_users = items[8]
+        #print(items)
+        if vid not in data_dict:
+            data_dict[vid] = (view_users, view_pv, play_users, play_pv, share_users, share_pv, return_users)
+        else:
+            item_info = data_dict[vid]
+            # accumulate numerically; the fields parsed above are strings
+            view_users = float(item_info[0]) + float(view_users)
+            view_pv = float(item_info[1]) + float(view_pv)
+            play_users = float(item_info[2]) + float(play_users)
+            play_pv = float(item_info[3]) + float(play_pv)
+            share_users = float(item_info[4]) + float(share_users)
+            share_pv = float(item_info[5]) + float(share_pv)
+            return_users = float(item_info[6]) + float(return_users)
+            data_dict[vid] = (view_users, view_pv, play_users, play_pv, share_users, share_pv, return_users)
+    #print(data_dict.items())
+    f.close()
+    info_dict = {}
+    hour_data_path = "./data/sorted_hour_data_"+nowdate
+    f = open(hour_data_path, 'w')
+    for k, v in data_dict.items():
+        #print(v)
+        return_users = v[6]
+        #print(return_users)
+        view_users = v[0]
+        view_pv = v[1]
+        share_pv = v[5]
+        share_users = v[4]
+        play_users = v[2]
+        #print("return_users:", return_users) 
+        k_score = float(return_users)/(float(view_users)+5)
+        #k_score2 = float(return_users)/(float(share_pv)+5)
+        #print(k_score)
+        share_score = float(share_pv)/(float(view_pv)+5)
+        backrate = float(return_users)/(float(share_pv)+5)
+        #print(k, k_score, share_score*backrate, share_score, backrate) 
+        score_info = [share_score, share_score*backrate, share_score, backrate]
+        score_info = json.dumps(score_info)
+        info_dict[k] = score_info
+        f.write(k+"\t"+score_info+"\n")
+    #redis_helper.update_batch_setnx_key(info_dict, 60*60*24*15) 
+    f.close()
+
+     

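calHourCtr.py imports pandas but never uses it; the same per-video rollup can be written as a groupby, assuming the nine TSV columns line up with items[0..8] read in the loop (a sketch; the date in the path is illustrative):

# Sketch: the hourly aggregation as a pandas groupby, assuming the TSV
# column order matches the indices used in calHourCtr.py.
import pandas as pd

cols = ["apptype", "vid", "view_users", "view_pv", "play_users",
        "play_pv", "share_users", "share_pv", "return_users"]
df = pd.read_csv("./data/hour_video_data_2023051814", sep="\t",
                 header=0, names=cols)
agg = df.groupby("vid")[cols[2:]].sum()              # numeric per-video sums
agg["share_score"] = agg["share_pv"] / (agg["view_pv"] + 5)
agg["backrate"] = agg["return_users"] / (agg["share_pv"] + 5)
print(agg.head())
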
+ 20 - 0
clean.sh

@@ -2,3 +2,23 @@
 source ~/.bash_profile
 source ~/.bashrc
 
+last3day=`date  +"%Y%m%d" -d -3days`
+
+merge_path=./data/merge_score_${last3day}'*'
+video_data_path=./data/video_data_${last3day}'*'
+sorted_path=./data/sorted_data_${last3day}'*'
+cls_path=./data/redis_cls_${last3day}'*'
+hour_video_path=./data/hour_video_data_'*'
+sorted_hour_path=./data/sorted_hour_data_'*'
+rec_path=./data/rec_result_'*'
+echo ${merge_path}
+echo ${video_data_path}
+echo ${cls_path}
+
+rm -rf ${merge_path}
+rm -rf ${video_data_path}
+rm -rf ${sorted_path}
+rm -rf ${cls_path}
+rm -rf ${hour_video_path}
+rm -rf ${sorted_hour_path}
+rm -rf ${rec_path}

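The dated globs here only match artifacts stamped with the date three days back, while the hourly and rec-result globs carry no date suffix and clear those files wholesale. The same cleanup as a Python sketch (globs mirror the script; paths are illustrative):

# Sketch of clean.sh's cleanup in Python; globs mirror the script.
import datetime
import glob
import os

last3day = (datetime.date.today() - datetime.timedelta(days=3)).strftime("%Y%m%d")
patterns = [
    "./data/merge_score_" + last3day + "*",
    "./data/video_data_" + last3day + "*",
    "./data/sorted_data_" + last3day + "*",
    "./data/redis_cls_" + last3day + "*",
    "./data/hour_video_data_*",
    "./data/sorted_hour_data_*",
    "./data/rec_result_*",
]
for pat in patterns:
    for path in glob.glob(pat):
        os.remove(path)
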
+ 57 - 0
compose_score.py

@@ -0,0 +1,57 @@
+# coding: utf-8
+import sys
+import json
+from db_help import RedisHelper
+
+if __name__=="__main__":
+    nowdate = sys.argv[1]
+    f1 = open("./data/sorted_hour_data_"+nowdate)
+    f2 = open("./data/sorted_data_"+nowdate)
+    data_dict = {}
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.strip().split("\t")
+        if len(items)<2:
+            continue
+        kid = items[0]
+        #print(items[1])
+        item_info = json.loads(items[1])
+        data_dict[kid] = item_info
+    f1.close()
+    f3 = open("./data/merge_score_"+nowdate, 'w')
+    info_dict = {}
+    while True:
+        line = f2.readline()
+        if not line:
+            break
+        items = line.split("\t")
+        if len(items)<2:
+            continue
+        kid = items[0].replace("k_p:", "")
+        kid2 = "k_p2:"+kid
+        #print(kid)
+        d_item_info = json.loads(items[1])
+        if kid in data_dict:
+            item_info = data_dict[kid]
+            #print("h:",item_info)
+            #print("d:",d_item_info)
+            total_info = []
+            for i in range(len(item_info)):
+                total_info.append(0.3*item_info[i]+0.7*d_item_info[i])
+            total_item_info = json.dumps(total_info)
+            f3.write(kid2+"\t"+total_item_info+"\n")
+            info_dict[kid2] = total_item_info
+            #print("m:",total_item_info)
+        else:
+            total_info = []
+            for i in range(len(d_item_info)):
+                total_info.append(0.7*d_item_info[i])
+            total_item_info = json.dumps(total_info)
+            f3.write(kid2+"\t"+total_item_info+"\n")
+            info_dict[kid2] = total_item_info
+    print(info_dict)
+    redis_helper = RedisHelper()
+    redis_helper.update_batch_setnx_key(info_dict, 60*60*24*15)
+    f2.close()
+    f3.close()

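compose_score.py writes under the new key prefix k_p2: and blends the hourly and daily score vectors elementwise at 0.3/0.7; a video absent from the hourly file keeps only 0.7 of its daily scores, i.e. its hourly vector is implicitly zero. A compact sketch of that rule (the function name is illustrative):

# Sketch of the 0.3/0.7 blend; a missing hourly vector counts as zeros.
def blend(hour_scores, day_scores, w_hour=0.3, w_day=0.7):
    if hour_scores is None:
        hour_scores = [0.0] * len(day_scores)
    return [w_hour * h + w_day * d for h, d in zip(hour_scores, day_scores)]

print(blend([0.2, 0.1, 0.2, 0.5], [0.1, 0.05, 0.1, 0.4]))  # both sources present
print(blend(None, [0.1, 0.05, 0.1, 0.4]))                  # daily-only fallback
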
+ 53 - 0
export_hour_vid.py

@@ -0,0 +1,53 @@
+# coding: utf-8
+from odps import ODPS
+from config import set_config
+import datetime
+import pandas as pd
+from collections import defaultdict
+import sys
+
+config_ = set_config()
+
+odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project="loghubods",
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'])
+
+
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    Fetch data from ODPS.
+    :param date: partition date, type-string '%Y%m%d'
+    :param project: type-string
+    :param table: table name, type-string
+    :param connect_timeout: connection timeout
+    :param read_timeout: read timeout
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+def exe_sql(sql):    
+    data = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        d = defaultdict(list)  # map each column name to a list of values
+        for record in reader:
+            for res in record:
+                d[res[0]].append(res[1])  # each record yields (column, value) pairs; group values by column
+        data = pd.DataFrame.from_dict(d, orient='index').T  # build a DataFrame and transpose so columns run across
+    return data
+
+
+if __name__=="__main__":
+    project = 'loghubods'
+    now_date=sys.argv[1]
+    print("now date:", now_date)
+    table = 'video_data_each_hour_dataset_24h_total_apptype'
+    sql = "select apptype, videoid, lastonehour_view, lastonehour_view_total, lastonehour_play, lastonehour_play_total,lastonehour_share, lastonehour_share_total, lastonehour_return from loghubods.video_each_hour_update_province_apptype where dt="+now_date
+    print(sql)
+    data = exe_sql(sql)
+    data.to_csv("./data/hour_video_data_"+now_date, sep='\t', index=None) 

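exe_sql gathers the reader output column by column into a defaultdict and rebuilds a DataFrame, so the exported TSV should carry exactly the nine columns named in the SELECT. A quick sanity check one might run on the export (the date in the path is illustrative):

# Sketch: sanity-check the export before calHourCtr.py consumes it.
import pandas as pd

df = pd.read_csv("./data/hour_video_data_2023051814", sep="\t")
assert df.shape[1] == 9, "expected the 9 columns selected in the SQL"
print(df.shape, list(df.columns))
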
+ 29 - 1
run_ctr.sh

@@ -19,14 +19,42 @@ then
     echo "[ERROR] echo 'extract_vid.py"
     exit 255
 fi
+python export_hour_vid.py ${nowday}
+if [ $? -ne 0 ];
+then
+    msg="[ERROR] sorted extract_hour_log"
+    sh sendmsg.sh $nowday "$msg"
+    echo "[ERROR] export_hour_vid.py"
+    exit 255
+fi
+
 
 python calCtr.py ${nowday}
 if [ $? -ne 0 ];
 then
-    msg = "[ERROR] cal ctr "
+    msg="[ERROR] cal ctr"
+    sh sendmsg.sh $nowday "$msg"
+    echo "[ERROR] calCtr.py"
+    exit 255
+fi
+
+python calHourCtr.py ${nowday}
+if [ $? -ne 0 ];
+then
+    msg="[ERROR] cal hour ctr"
     sh sendmsg.sh  $nowday  $msg
     echo "[ERROR] echo 'calCtr.py"
     exit 255
 fi
+python compose_score.py ${nowday}
+if [ $? -ne 0 ];
+then
+    msg="[ERROR] cal compose_score"
+    sh sendmsg.sh $nowday "$msg"
+    echo "[ERROR] compose_score.py"
+    exit 255
+fi
+
 echo "finish sorted"
 
+

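Each stage added to run_ctr.sh repeats the same guard: run the step, test $?, alert through sendmsg.sh, exit 255. For reference, the same loop as a Python sketch (run_step is an illustrative helper; the step list mirrors the script):

# Sketch of run_ctr.sh's run-and-alert pattern in Python.
import subprocess
import sys

def run_step(script, nowday):
    ret = subprocess.run(["python", script, nowday]).returncode
    if ret != 0:
        msg = "[ERROR] " + script
        subprocess.run(["sh", "sendmsg.sh", nowday, msg])  # alert, as the script does
        sys.exit(255)

for step in ["export_hour_vid.py", "calCtr.py", "calHourCtr.py", "compose_score.py"]:
    run_step(step, sys.argv[1])
print("finish sorted")
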
+ 24 - 32
test.sh

@@ -3,41 +3,33 @@ source ~/.bash_profile
 source ~/.bashrc
 
 conda activate base 
-
+cd /data/rec_project/OffLineRec
 #1. download data
-nowday=`date  +"%Y%m%d" -d -0days`
-last7day=`date  +"%Y%m%d" -d -8days`
+nowday=`date  +"%Y%m%d%H" -d -0days`
 echo ${nowday} 
-echo ${last7day}
-mkdir -p ./data/
+#3.import res
+#mkdir -p ./data/
 
-#python extract_share_log.py ${last7day} ${nowday}
-#if [ $? -ne 0 ];
-#then
-#    msg = "[ERROR] simrecall extract_share_log"
-#    sh sendmsg.sh  $nowday  $msg
-#    echo "[ERROR] echo 'extract_share_log"
-#    exit 255
-#fi
-#nowday='20230505'
-#2. cal i2i result
-#python calI2I.py ${nowday}
-#if [ $? -ne 0 ];
-#then
-#    msg = "[ERROR] simrecall calI2I.py"
-#    sh sendmsg.sh $nowday $msg
-#    echo $msg
-#    exit -1
-#fi
+nowday='2023051814'
+#python export_hour_vid.py ${nowday}
+if [ $? -ne 0 ];
+then
+    msg="[ERROR] sorted extract_hour_log"
+    #sh sendmsg.sh $nowday "$msg"
+    echo "[ERROR] export_hour_vid.py"
+    exit 255
+fi
+#python calHourCtr.py ${nowday}
 
-#3.import res
-python import_redist.py "./data/rec_result_"${nowday}".json"  "./data/redis_cls_"${nowday}".json"
-#python test.py "./data/rec_result_"${nowday}".json"  './data/redis_cls_'${nowday}".json"
-#if [ $? -ne 0 ];
-#then
-#    msg = "[ERROR] simhot recall import_redist.py"
-#    sh sendmsg.sh  $nowday  $msg
-#    echo $msg
-    exit -1
+python compose_score.py ${nowday} 
+exit
+python calCtr.py ${nowday}
+if [ $? -ne 0 ];
+then
+    msg="[ERROR] cal ctr"
+    sh sendmsg.sh $nowday "$msg"
+    echo "[ERROR] calCtr.py"
+    exit 255
 fi
 echo "finish sorted"
+