|
@@ -1,109 +0,0 @@
|
|
|
-package com.tzld.piaoquan.recommend.model.produce.service;
|
|
|
-
|
|
|
-import com.aliyun.odps.Instance;
|
|
|
-import com.aliyun.odps.Odps;
|
|
|
-import com.aliyun.odps.OdpsException;
|
|
|
-import com.aliyun.odps.TableSchema;
|
|
|
-import com.aliyun.odps.account.Account;
|
|
|
-import com.aliyun.odps.account.AliyunAccount;
|
|
|
-import com.aliyun.odps.data.Record;
|
|
|
-import com.aliyun.odps.data.SimpleJsonValue;
|
|
|
-import com.aliyun.odps.task.SQLTask;
|
|
|
-import com.google.common.base.Joiner;
|
|
|
-import com.tzld.piaoquan.recommend.model.produce.util.CommonCollectionUtils;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.spark.aliyun.odps.OdpsOps;
|
|
|
-import org.apache.spark.api.java.JavaRDD;
|
|
|
-import org.apache.spark.api.java.JavaSparkContext;
|
|
|
-import org.apache.spark.api.java.function.Function2;
|
|
|
-
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-
|
|
|
-/**
|
|
|
- * https://help.aliyun.com/zh/maxcompute/user-guide/java-sdk-1/?spm=a2c4g.11174283.0.0.6d0111c1E15lI3
|
|
|
- *
|
|
|
- * @author dyp
|
|
|
- */
|
|
|
-@Slf4j
|
|
|
-public class ODPSService {
|
|
|
- private final String accessId = "LTAIWYUujJAm7CbH";
|
|
|
- private final String accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P";
|
|
|
- private final String odpsUrl = "http://service.odps.aliyun.com/api";
|
|
|
- private final String tunnelUrl = "http://dt.cn-hangzhou.maxcompute.aliyun.com";
|
|
|
- private final String sqlFormat = "select %s from %s where 1=1 %s ;";
|
|
|
- private final String countSqlFormat = "select count(1) as count from %s where 1=1 %s ;";
|
|
|
-
|
|
|
-
|
|
|
- public JavaRDD<Map<String, String>> read(JavaSparkContext jsc, String project, String table, String partition,
|
|
|
- int partitionNum) {
|
|
|
- OdpsOps odpsOps = new OdpsOps(jsc.sc(), accessId, accessKey, odpsUrl, tunnelUrl);
|
|
|
-
|
|
|
- JavaRDD<Map<String, String>> readData = odpsOps.readTableWithJava(project, table, partition,
|
|
|
- new RecordToMap(), partitionNum);
|
|
|
- return readData;
|
|
|
- }
|
|
|
-
|
|
|
- static class RecordToMap implements Function2<Record, TableSchema, Map<String, String>> {
|
|
|
- private List<String> cols;
|
|
|
-
|
|
|
- public RecordToMap(List<String> cols) {
|
|
|
- this.cols = cols;
|
|
|
- }
|
|
|
-
|
|
|
- public RecordToMap() {
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public Map<String, String> call(Record r, TableSchema schema) {
|
|
|
- Map<String, String> map = new HashMap<>();
|
|
|
- for (int i = 0; i < schema.getColumns().size(); i++) {
|
|
|
- if (cols == null || cols.contains(r.getColumns()[i].getName())) {
|
|
|
- Object obj = r.get(i);
|
|
|
- if (obj instanceof SimpleJsonValue) {
|
|
|
- map.put(r.getColumns()[i].getName(), ((SimpleJsonValue) obj).toString());
|
|
|
- } else if (obj instanceof Long) {
|
|
|
- map.put(r.getColumns()[i].getName(), ((Long) obj) + "");
|
|
|
- } else {
|
|
|
- map.put(r.getColumns()[i].getName(), r.getString(i));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- return map;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private List<Map<String, String>> read(String project,
|
|
|
- String table,
|
|
|
- List<String> colNames,
|
|
|
- String condition) {
|
|
|
- Account account = new AliyunAccount(accessId, accessKey);
|
|
|
- Odps odps = new Odps(account);
|
|
|
- odps.setEndpoint(odpsUrl);
|
|
|
- odps.setDefaultProject(project);
|
|
|
-
|
|
|
- String sql = String.format(sqlFormat, Joiner.on(",").join(colNames), table, condition);
|
|
|
-
|
|
|
- List<Record> records;
|
|
|
- try {
|
|
|
- Instance i = SQLTask.run(odps, sql);
|
|
|
- i.waitForSuccess();
|
|
|
- records = SQLTask.getResult(i);
|
|
|
- } catch (OdpsException e) {
|
|
|
- log.error("request odps error", e);
|
|
|
- return Collections.emptyList();
|
|
|
- }
|
|
|
-
|
|
|
- List<Map<String, String>> fieldValues = CommonCollectionUtils.toList(records, r -> {
|
|
|
- Map<String, String> map = new HashMap<>();
|
|
|
- for (int i = 0; i < r.getColumnCount(); i++) {
|
|
|
- map.put(r.getColumns()[i].getName(), r.getString(i));
|
|
|
- }
|
|
|
- return map;
|
|
|
- });
|
|
|
-
|
|
|
- return fieldValues;
|
|
|
- }
|
|
|
-}
|