Browse Source

Merge branch 'master' of https://git.yishihui.com/linfan/OffLineRec

baichongyang 2 years ago
parent
commit
82e43126a2
10 changed files with 334 additions and 7 deletions
  1. cut_title.py (+47, -0)
  2. extract_user_action.py (+54, -0)
  3. extract_video_info.py (+53, -0)
  4. get_sim_k.py (+54, -0)
  5. predict.py (+55, -0)
  6. run.sh (+7, -5)
  7. run_ctr.sh (+3, -2)
  8. run_extract_tag.sh (+32, -0)
  9. stopwords (+1, -0)
  10. word2vec.py (+28, -0)

+ 47 - 0
cut_title.py

@@ -0,0 +1,47 @@
+# coding: utf-8
+import sys
+import jieba 
+import os
+
+if __name__=="__main__":
+    # build the stopword set from every .txt file under the directory given as argv[1]
+    stop_words = set()
+    path = sys.argv[1]
+    files_dir = os.listdir(path)
+    #print(files_dir)
+    for file_name in files_dir:
+        if file_name.find('.txt')>-1:
+            f1 = open(path+"/"+file_name)
+            while True:
+                file_line = f1.readline()
+                if not file_line:
+                    break
+                file_line = file_line.strip()
+                stop_words.add(file_line)
+            f1.close()
+    #print(len(stop_words))
+    f = open(sys.argv[2])        # input: "vid \t title" per line
+    f3 = open(sys.argv[3], 'w')  # output: "vid \t space-joined tokens"
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        line = line.strip()
+        items = line.split("\t")
+        if len(items)<2:
+            continue
+        vid = items[0]
+        title = items[1] 
+        cut_info = jieba.lcut(title)
+        cut_arr = []
+        for cut_item in cut_info:
+            #print("cut_item:", cut_item)
+            if cut_item==' ':
+                continue
+            if cut_item in stop_words:
+                continue
+            cut_arr.append(cut_item)
+        vid_info = vid+'\t'+" ".join(cut_arr)
+        f3.write(vid_info.strip()+"\n")
+    f.close()
+    f3.close()
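
For reference, a minimal sketch of the tokenization step above; the title and the stopword set here are made-up examples:

import jieba

stop_words = {"的", "了"}            # hypothetical stopword set
line = "12345\t美丽的乡村风景"        # hypothetical "vid \t title" input row
vid, title = line.split("\t")[:2]
tokens = [t for t in jieba.lcut(title) if t != ' ' and t not in stop_words]
print(vid + "\t" + " ".join(tokens))  # e.g. "12345\t美丽 乡村 风景"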

+ 54 - 0
extract_user_action.py

@@ -0,0 +1,54 @@
+# coding: utf-8
+from odps import ODPS
+from config import set_config
+import datetime
+import pandas as pd
+from collections import defaultdict
+import sys
+
+config_ = set_config()
+
+odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project="loghubods",
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'])
+
+
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    从odps获取数据
+    :param date: 日期 type-string '%Y%m%d'
+    :param project: type-string
+    :param table: 表名 type-string
+    :param connect_timeout: 连接超时设置
+    :param read_timeout: 读取超时设置
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+def exe_sql(sql):
+    data = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        d = defaultdict(list)  # maps each column name to the list of its values
+        for record in reader:
+            for res in record:
+                d[res[0]].append(res[1])  # each record yields (column, value) tuples; group values by column
+        data = pd.DataFrame.from_dict(d, orient='index').T  # one row per column name, then transpose so columns become columns
+    return data
+
+
+if __name__=="__main__":
+    project = 'loghubods'
+    last7day=sys.argv[1]
+    now_date=sys.argv[2]
+    print("now date:", now_date)
+    table = 'user_action_log_base'
+    sql = "select  mid, videoid, businesstype, clienttimestamp, return from loghubods.user_action_log_base where dt between '"+last7day+"' and '"+now_date+"' and businesstype in ('videoShareFriend');"
+    print(sql)
+    data = exe_sql(sql)
+    data.to_csv("./data/user_action_"+now_date, sep='\t') 

+ 53 - 0
extract_video_info.py

@@ -0,0 +1,53 @@
+# coding: utf-8
+from odps import ODPS
+from config import set_config
+import datetime
+import pandas as pd
+from collections import defaultdict
+import sys
+
+config_ = set_config()
+
+odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project="loghubods",
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'])
+
+
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    从odps获取数据
+    :param date: 日期 type-string '%Y%m%d'
+    :param project: type-string
+    :param table: 表名 type-string
+    :param connect_timeout: 连接超时设置
+    :param read_timeout: 读取超时设置
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+def exe_sql(sql):
+    data = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        d = defaultdict(list)  # maps each column name to the list of its values
+        for record in reader:
+            for res in record:
+                d[res[0]].append(res[1])  # each record yields (column, value) tuples; group values by column
+        data = pd.DataFrame.from_dict(d, orient='index').T  # one row per column name, then transpose so columns become columns
+    return data
+
+
+if __name__=="__main__":
+    project = 'loghubods'
+    now_date=sys.argv[1]
+    print("now date:", now_date)
+    table = 'video_data_each_hour_dataset_24h_total_apptype'
+    sql = "select id, title, video_path, cover_img_path,self_cover_img_path,play_count, share_count, reported_count, favoriteds, total_time, tag_count,stage_recommend_examine_status, sensitive_status, new_share_image_path from videoods.wx_video_per1h where status=1 and examine_status=1 ";
+    print(sql)
+    data = exe_sql(sql)
+    data.to_csv("./data/video_data_info_"+now_date, sep='\t', index=None) 

+ 54 - 0
get_sim_k.py

@@ -0,0 +1,54 @@
+# coding: utf-8
+import sys
+import pandas as pd
+import numpy as np
+import faiss
+
+
+def gen_i2i(index_item, embeddings, i2i):
+    fw = open(i2i, "w")
+    embed_matrix = np.array(embeddings).astype('float32')
+    index = faiss.IndexFlatL2(100)  # L2 index over 100-dim vectors
+    index.add(embed_matrix)
+    # The candidate matrix and the search matrix are the same embed_matrix.
+    # If a query vector is in the candidate matrix, the first returned idx is the query itself;
+    # if it is not, the first returned idx is simply its nearest candidate.
+    distance_matrix, recall_list = index.search(embed_matrix, 20)  # top-20 neighbours per vector
+    for idx, rec_arr in enumerate(recall_list):
+        origin_item = str(index_item[idx])
+        recall_str = ""
+        # rec_arr looks like [0 6 3 8 7 1]; skip the first hit (the query itself)
+        for re_id in rec_arr[1:]:
+            recall_idstr = str(index_item[re_id])
+            recall_str = recall_str + "," + recall_idstr
+        fw.write(origin_item + "\t" + recall_str[1:] + "\n")
+    fw.close()
+
+if __name__ == '__main__':
+    f = open(sys.argv[1])
+    index = 0
+    index_dict = {}
+    index_arr = []
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        line = line.strip()
+        #print(eval(line))
+        items = line.split(" ")
+        try:
+            vid = int(items[0])
+            vid_vec = eval(" ".join(items[1:]))  # the rest of the line is a Python list literal, as printed by predict.py
+            index_arr.append(vid_vec)
+            #index +=1
+            index_dict[index] = vid
+            index +=1
+            #print(index_arr)
+        except Exception:  # skip malformed lines
+            continue
+    f.close()
+    print(len(index_arr))
+    gen_i2i(index_dict, index_arr, "i2i_result")

+ 55 - 0
predict.py

@@ -0,0 +1,55 @@
+# coding: utf-8
+import sys
+from gensim import models
+import numpy as np
+
+if __name__=="__main__":
+    #model = models.word2vec.Word2Vec.load('word2vec.txt')
+    #print(model.wx)
+    f1 = open('word2vec.txt')
+    word_dict = {}
+    while True:
+        line = f1.readline()
+        if not line:
+            break
+        items = line.strip().split(" ")
+        if len(items)<100:  # skips the "vocab_size vector_size" header row
+            continue
+        arr = []
+        for w in items[1:]:
+            arr.append(float(w))
+        word_dict[items[0]] = arr
+    #print(word_dict)
+    f  = open(sys.argv[1])
+    num = 0
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        num = num+1
+        if num == 1:
+            continue
+        items = line.split("\t")
+        if len(items)<2:
+            continue
+        vid = items[0]
+        title_arr = items[1].split(" ")
+        title_info = np.zeros(100)
+        word_len = 0
+        for word in title_arr:
+            if word in word_dict:
+                 #print(title_info)
+                 #print(word)
+                 word_vec = word_dict[word]
+                 #print(word_vec)
+                 title_info = np.add(title_info, word_vec)
+                 word_len +=1
+        if word_len<=0:
+            continue
+        # average the summed word vectors to get the title embedding
+        title_info_list = []
+        for j in title_info:
+            title_info_list.append(j/word_len)
+        print(vid, title_info_list)
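
The title embedding above is just the mean of the title's word vectors; with numpy the same computation can be written as follows (word_dict and title_arr are toy inputs):

import numpy as np

word_dict = {"乡村": np.ones(100), "风景": np.zeros(100)}  # hypothetical word vectors
title_arr = ["乡村", "风景", "某未登录词"]

vecs = [word_dict[w] for w in title_arr if w in word_dict]
if vecs:
    title_vec = np.mean(vecs, axis=0)  # same result as the add-then-divide loop above
    print(title_vec[:5])               # [0.5 0.5 0.5 0.5 0.5]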

+ 7 - 5
run.sh

@@ -1,9 +1,9 @@
 #!/bin/bash
 source ~/.bash_profile
 source ~/.bashrc
-conda activate base 
+conda activate python36 
 
-cd /data/rec_project/OffLineRec
+cd /home/rec/project/git_project/OffLineRec
 
 #1. download data
 nowday=`date  +"%Y%m%d" -d -0days`
@@ -11,12 +11,14 @@ last7day=`date  +"%Y%m%d" -d -8days`
 echo ${nowday} 
 echo ${last7day}
 mkdir -p ./data/
+mkdir -p ./logs/
+#conda activate py36
 
 python extract_share_log.py ${last7day} ${nowday}
 if [ $? -ne 0 ];
 then
     msg = "[ERROR] simrecall extract_share_log"
-    #sh sendmsg.sh  $nowday  $msg
+    sh sendmsg.sh  $nowday  $msg
     echo "[ERROR] echo 'extract_share_log"
     exit 255
 fi
@@ -26,7 +28,7 @@ python calI2I.py ${nowday}
 if [ $? -ne 0 ];
 then
     msg = "[ERROR] simrecall calI2I.py"
-    #sh sendmsg.sh $nowday $msg
+    sh sendmsg.sh $nowday $msg
     echo $msg
     exit -1
 fi
@@ -36,7 +38,7 @@ python import_redist.py "./data/rec_result_"${nowday}".json"  "./data/redis_cls_
 if [ $? -ne 0 ];
 then
     msg = "[ERROR] simhot recall import_redist.py"
-    #sh sendmsg.sh  $nowday  $msg
+    sh sendmsg.sh  $nowday  $msg
     echo $msg
     exit -1
 fi

+ 3 - 2
run_ctr.sh

@@ -2,8 +2,9 @@
 source ~/.bash_profile
 source ~/.bashrc
 
-conda activate base 
-cd /data/rec_project/OffLineRec
+conda activate python36 
+cd /home/rec/project/git_project/OffLineRec 
+#cd /data/rec_project/OffLineRec
 #1. download data
 nowday=`date  +"%Y%m%d%H" -d -0days`
 echo ${nowday} 

+ 32 - 0
run_extract_tag.sh

@@ -0,0 +1,32 @@
+#!/bin/bash
+source ~/.bash_profile
+source ~/.bashrc
+
+conda activate python36 
+
+#1. download data
+nowday=`date  +"%Y%m%d" -d -0days`
+last7day=`date  +"%Y%m%d" -d -1days`
+echo ${nowday} 
+#3.import res
+mkdir -p ./data/
+
+#python extract_user_action.py  ${last7day} ${nowday}
+#if [ $? -ne 0 ];
+#then
+#    msg = "[ERROR] sorted extract_vid_log"
+#    sh sendmsg.sh  $nowday  $msg
+#    echo "[ERROR] echo 'extract_vid.py"
+#    exit 255
+#fi
+
+python extract_video_info.py ${nowday}
+#if [ $? -ne 0 ];
+#then
+#    msg = "[ERROR] cal ctr "
+#    sh sendmsg.sh  $nowday  $msg
+#    echo "[ERROR] echo 'calCtr.py"
+#    exit 255
+#fi
+#echo "finish sorted"
+

+ 1 - 0
stopwords

@@ -0,0 +1 @@
+Subproject commit fbc0150f746e2757324ef67b15e3b347079a7e9d

+ 28 - 0
word2vec.py

@@ -0,0 +1,28 @@
+# coding: utf-8
+import sys
+from gensim.models import word2vec
+
+
+if __name__=="__main__":
+    f = open(sys.argv[1])
+    arr = []
+    num = 0
+    while True:
+        line = f.readline()
+        if not line:
+            break
+        num = num+1
+        if num == 1:  # skip the header line
+            continue
+        items = line.strip().split("\t")
+        if len(items)<2:
+            continue
+        arr.append(items[1].split(" "))  # one tokenized title per line
+    f.close()
+    model = word2vec.Word2Vec(arr, vector_size=100, min_count=1, sg=1)
+    model.wv.save_word2vec_format('word2vec.txt', binary=False)
+    #model.save('word2vec.model')
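
A short sketch of the train-then-load round trip this script and predict.py rely on (the corpus is a toy example; the gensim 4.x API is assumed, matching the vector_size keyword used above):

from gensim.models import word2vec

corpus = [["美丽", "乡村"], ["乡村", "风景"]]  # toy tokenized titles
model = word2vec.Word2Vec(corpus, vector_size=100, min_count=1, sg=1)
model.wv.save_word2vec_format('word2vec.txt', binary=False)

# predict.py re-reads word2vec.txt line by line; the first line is the
# "vocab_size vector_size" header, which its len(items)<100 check skips
print(len(model.wv["乡村"]))  # 100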