Browse Source

feat:添加UDF

zhaohaipeng 8 months ago
parent
commit
9830c47a27
3 changed files with 64 additions and 0 deletions
  1. 16 0
      helper/RedisHelper.py
  2. 48 0
      udf/UDF.py
  3. 0 0
      udf/__init__.py

+ 16 - 0
helper/RedisHelper.py

@@ -20,3 +20,19 @@ class RedisHelper(object):
     def set_expire(self, key: str, expire: int):
         logger.info(f"Redis Expire: {key} ---> {expire}")
         self.redis_conn.expire(key, expire)
+
+    def m_get_value(self, keys):
+        self.redis_conn.mget(keys)
+
+    def get_value(self, key: str):
+        # logger.info(f"Redis Get Value: {key}")
+        return self.redis_conn.get(key)
+
+    def m_get_pipeline(self, keys):
+        pipeline = self.redis_conn.pipeline()
+        for key in keys:
+            pipeline.get(key)
+
+        v = pipeline.execute()
+
+        return v

+ 48 - 0
udf/UDF.py

@@ -0,0 +1,48 @@
+# coding:utf-8
+from odps.udf import annotate
+import json
+
+
+@annotate("string,string,string->bigint")
+class kv_in_json_array_position(object):
+    def evaluate(self, json_array_str, key, value):
+
+        if json_array_str is None or json_array_str == '':
+            return -1
+        index = 0
+        json_array = json.loads(json_array_str)
+        for json_item in json_array:
+            if key in json_item and str(json_item[key]) == value:
+                return index
+            index += 1
+        return -1
+
+
+@annotate("string,string,string->bigint")
+class kv_in_json_array_item(object):
+    def evaluate(self, json_array_str, key, value):
+
+        if json_array_str is None or json_array_str == '':
+            return ""
+        json_array = json.loads(json_array_str)
+        for json_item in json_array:
+            if key in json_item and str(json_item[key]) == value:
+                return json.dumps(json_item)
+        return ""
+
+
+@annotate("string->string")
+class json_array_path(object):
+
+    def evaluate(self, json_str):
+        if json_str is None or json_str == '':
+            return None
+        try:
+            arr = []
+            json_array = json.loads(json_str)
+            for data in json_array:
+                if 'creativeId' in data:
+                    arr.append(str(data['creativeId']))
+            return ",".join(arr)
+        except Exception:
+            return "Internal error"

+ 0 - 0
udf/__init__.py