connector.go 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037
  1. package datahub
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "reflect"
  8. "strconv"
  9. )
  10. type AuthMode string
  11. const (
  12. AK AuthMode = "ak"
  13. STS AuthMode = "sts"
  14. )
  15. type ConnectorType string
  16. const (
  17. SinkOdps ConnectorType = "sink_odps"
  18. SinkOss ConnectorType = "sink_oss"
  19. SinkEs ConnectorType = "sink_es"
  20. SinkAds ConnectorType = "sink_ads"
  21. SinkMysql ConnectorType = "sink_mysql"
  22. SinkFc ConnectorType = "sink_fc"
  23. SinkOts ConnectorType = "sink_ots"
  24. SinkDatahub ConnectorType = "sink_datahub"
  25. SinkHologres ConnectorType = "sink_hologres"
  26. )
  27. func (ct *ConnectorType) String() string {
  28. return string(*ct)
  29. }
  30. func validateConnectorType(ct ConnectorType) bool {
  31. switch ct {
  32. case SinkOdps, SinkOss, SinkEs, SinkAds, SinkMysql, SinkFc, SinkOts, SinkDatahub, SinkHologres:
  33. return true
  34. default:
  35. return false
  36. }
  37. }
  38. type ConnectorState string
  39. const (
  40. ConnectorStopped ConnectorState = "CONNECTOR_STOPPED"
  41. ConnectorRunning ConnectorState = "CONNECTOR_RUNNING"
  42. )
  43. func validateConnectorState(ct ConnectorState) bool {
  44. switch ct {
  45. case ConnectorStopped, ConnectorRunning:
  46. return true
  47. default:
  48. return false
  49. }
  50. }
  51. type ConnectorTimestampUnit string
  52. const (
  53. ConnectorMicrosecond ConnectorTimestampUnit = "MICROSECOND"
  54. ConnectorMillisecond ConnectorTimestampUnit = "MILLISECOND"
  55. ConnectorSecond ConnectorTimestampUnit = "SECOND"
  56. )
  57. type ConnectorConfig struct {
  58. TimestampUnit ConnectorTimestampUnit `json:"TimestampUnit"`
  59. }
  60. type PartitionMode string
  61. const (
  62. UserDefineMode PartitionMode = "USER_DEFINE"
  63. SystemTimeMode PartitionMode = "SYSTEM_TIME"
  64. EventTimeMode PartitionMode = "EVENT_TIME"
  65. )
  66. func (pm *PartitionMode) String() string {
  67. return string(*pm)
  68. }
  69. func NewPartitionConfig() *PartitionConfig {
  70. pc := &PartitionConfig{
  71. ConfigMap: make([]map[string]string, 0, 0),
  72. }
  73. return pc
  74. }
  75. type PartitionConfig struct {
  76. ConfigMap []map[string]string
  77. }
  78. func (pc *PartitionConfig) AddConfig(key, value string) {
  79. m := map[string]string{
  80. key: value,
  81. }
  82. pc.ConfigMap = append(pc.ConfigMap, m)
  83. }
  84. func (pc *PartitionConfig) MarshalJSON() ([]byte, error) {
  85. if pc == nil || len(pc.ConfigMap) == 0 {
  86. return nil, nil
  87. }
  88. buf := &bytes.Buffer{}
  89. buf.Write([]byte{'{'})
  90. length := len(pc.ConfigMap)
  91. for i, m := range pc.ConfigMap {
  92. for k, v := range m {
  93. if _, err := fmt.Fprintf(buf, "\"%s\":\"%s\"", k, v); err != nil {
  94. return nil, errors.New(fmt.Sprintf("partition config is invalid"))
  95. }
  96. }
  97. if i < length-1 {
  98. buf.WriteByte(',')
  99. }
  100. }
  101. buf.WriteByte('}')
  102. return buf.Bytes(), nil
  103. }
  104. func (pc *PartitionConfig) UnmarshalJSON(data []byte) error {
  105. //the data is "xxxxxx",should convert to xxxx, remove the ""
  106. var str *string = new(string)
  107. if err := json.Unmarshal(data, str); err != nil {
  108. return err
  109. }
  110. confParser := make([]map[string]string, 0)
  111. if err := json.Unmarshal([]byte(*str), &confParser); err != nil {
  112. return err
  113. }
  114. confMap := make([]map[string]string, len(confParser))
  115. //convert {"key":"ds","value":"%Y%m%d",...} to {"ds":"%Y%m%d",...}
  116. for i, m := range confParser {
  117. confMap[i] = map[string]string{
  118. m["key"]: m["value"],
  119. }
  120. }
  121. pc.ConfigMap = confMap
  122. return nil
  123. }
  124. /** ODPS CONFIG **/
  125. type SinkOdpsConfig struct {
  126. ConnectorConfig
  127. Endpoint string `json:"OdpsEndpoint"`
  128. Project string `json:"Project"`
  129. Table string `json:"Table"`
  130. AccessId string `json:"AccessId"`
  131. AccessKey string `json:"AccessKey"`
  132. TimeRange int `json:"TimeRange"`
  133. TimeZone string `json:"TimeZone,omitempty"`
  134. PartitionMode PartitionMode `json:"PartitionMode"`
  135. PartitionConfig PartitionConfig `json:"PartitionConfig"`
  136. TunnelEndpoint string `json:"TunnelEndpoint,omitempty"`
  137. SplitKey string `json:"SplitKey,omitempty"`
  138. Base64Encode bool `json:"Base64Encode,omitempty"`
  139. }
  140. func marshalCreateOdpsConnector(ccr *CreateConnectorRequest) ([]byte, error) {
  141. soConf, ok := ccr.Config.(SinkOdpsConfig)
  142. if !ok {
  143. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOdpsConfig", reflect.TypeOf(ccr.Config)))
  144. }
  145. // set default value
  146. if soConf.TimestampUnit == "" {
  147. soConf.TimestampUnit = ConnectorMicrosecond
  148. }
  149. ct := &struct {
  150. Action string `json:"Action"`
  151. Type string `json:"Type"`
  152. SinkStartTime int64 `json:"SinkStartTime"`
  153. ColumnFields []string `json:"ColumnFields"`
  154. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  155. Config SinkOdpsConfig `json:"Config"`
  156. }{
  157. Action: ccr.Action,
  158. Type: ccr.Type.String(),
  159. SinkStartTime: ccr.SinkStartTime,
  160. ColumnFields: ccr.ColumnFields,
  161. ColumnNameMap: ccr.ColumnNameMap,
  162. Config: soConf,
  163. }
  164. return json.Marshal(ct)
  165. }
  166. func unmarshalGetOdpsConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
  167. //the api return TimeRange is string, so need to convert to int64
  168. type SinkOdpsConfigHelper struct {
  169. SinkOdpsConfig
  170. TimeRange string `json:"TimeRange"`
  171. }
  172. ct := &struct {
  173. GetConnectorResult
  174. Config SinkOdpsConfigHelper `json:"Config"`
  175. }{}
  176. if err := json.Unmarshal(data, ct); err != nil {
  177. return nil, err
  178. }
  179. conf := ct.Config.SinkOdpsConfig
  180. t, err := strconv.Atoi(ct.Config.TimeRange)
  181. if err != nil {
  182. return nil, err
  183. }
  184. conf.TimeRange = t
  185. ret := &ct.GetConnectorResult
  186. ret.Config = conf
  187. ret.CommonResponseResult = *commonResp
  188. return ret, nil
  189. }
  190. // no config update
  191. func marshalUpdateConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
  192. ct := &struct {
  193. Action string `json:"Action"`
  194. ColumnFields []string `json:"ColumnFields,omitempty"`
  195. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  196. }{
  197. Action: ucr.Action,
  198. ColumnFields: ucr.ColumnFields,
  199. ColumnNameMap: ucr.ColumnNameMap,
  200. }
  201. return json.Marshal(ct)
  202. }
  203. func marshalUpdateOdpsConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
  204. soConf, ok := ucr.Config.(SinkOdpsConfig)
  205. if !ok {
  206. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOdpsConfig", reflect.TypeOf(ucr.Config)))
  207. }
  208. // set default value
  209. if soConf.TimestampUnit == "" {
  210. soConf.TimestampUnit = ConnectorMicrosecond
  211. }
  212. ct := &struct {
  213. Action string `json:"Action"`
  214. ColumnFields []string `json:"ColumnFields,omitempty"`
  215. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  216. Config SinkOdpsConfig `json:"Config,omitempty"`
  217. }{
  218. Action: ucr.Action,
  219. ColumnFields: ucr.ColumnFields,
  220. ColumnNameMap: ucr.ColumnNameMap,
  221. Config: soConf,
  222. }
  223. return json.Marshal(ct)
  224. }
  225. /* Oss Config */
  226. type SinkOssConfig struct {
  227. ConnectorConfig
  228. Endpoint string `json:"Endpoint"`
  229. Bucket string `json:"Bucket"`
  230. Prefix string `json:"Prefix"`
  231. TimeFormat string `json:"TimeFormat"`
  232. TimeRange int `json:"TimeRange"`
  233. AuthMode AuthMode `json:"AuthMode"`
  234. AccessId string `json:"AccessId,omitempty"`
  235. AccessKey string `json:"AccessKey,omitempty"`
  236. MaxFileSize int64 `json:"MaxFileSize,omitempty"`
  237. }
  238. func marshalCreateOssConnector(ccr *CreateConnectorRequest) ([]byte, error) {
  239. soConf, ok := ccr.Config.(SinkOssConfig)
  240. if !ok {
  241. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOssConfig", reflect.TypeOf(ccr.Config)))
  242. }
  243. // set default value
  244. if soConf.TimestampUnit == "" {
  245. soConf.TimestampUnit = ConnectorMicrosecond
  246. }
  247. ct := &struct {
  248. Action string `json:"Action"`
  249. Type ConnectorType `json:"Type"`
  250. SinkStartTime int64 `json:"SinkStartTime"`
  251. ColumnFields []string `json:"ColumnFields"`
  252. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  253. Config SinkOssConfig `json:"Config"`
  254. }{
  255. Action: "create",
  256. Type: ccr.Type,
  257. SinkStartTime: ccr.SinkStartTime,
  258. ColumnFields: ccr.ColumnFields,
  259. ColumnNameMap: ccr.ColumnNameMap,
  260. Config: soConf,
  261. }
  262. return json.Marshal(ct)
  263. }
  264. func unmarshalGetOssConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
  265. type SinkOssConfigHelper struct {
  266. SinkOssConfig
  267. TimeRange string `json:"TimeRange"`
  268. }
  269. ct := &struct {
  270. GetConnectorResult
  271. Config SinkOssConfigHelper `json:"Config"`
  272. }{}
  273. if err := json.Unmarshal(data, ct); err != nil {
  274. return nil, err
  275. }
  276. soConf := ct.Config.SinkOssConfig
  277. t, err := strconv.Atoi(ct.Config.TimeRange)
  278. if err != nil {
  279. return nil, err
  280. }
  281. soConf.TimeRange = t
  282. ret := &ct.GetConnectorResult
  283. ret.Config = soConf
  284. ret.CommonResponseResult = *commonResp
  285. return ret, nil
  286. }
  287. func marshalUpdateOssConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
  288. soConf, ok := ucr.Config.(SinkOssConfig)
  289. if !ok {
  290. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOssConfig", reflect.TypeOf(ucr.Config)))
  291. }
  292. // set default value
  293. if soConf.TimestampUnit == "" {
  294. soConf.TimestampUnit = ConnectorMicrosecond
  295. }
  296. ct := &struct {
  297. Action string `json:"Action"`
  298. ColumnFields []string `json:"ColumnFields,omitempty"`
  299. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  300. Config SinkOssConfig `json:"Config,omitempty"`
  301. }{
  302. Action: "create",
  303. ColumnFields: ucr.ColumnFields,
  304. ColumnNameMap: ucr.ColumnNameMap,
  305. Config: soConf,
  306. }
  307. return json.Marshal(ct)
  308. }
  309. /* mysql Config */
  310. type SinkMysqlConfig struct {
  311. ConnectorConfig
  312. Host string `json:"Host"`
  313. Port string `json:"Port"`
  314. Database string `json:"Database"`
  315. Table string `json:"Table"`
  316. User string `json:"User"`
  317. Password string `json:"Password"`
  318. Ignore InsertMode `json:"Ignore"`
  319. }
  320. type InsertMode string
  321. const (
  322. IGNORE InsertMode = "true"
  323. OVERWRITE InsertMode = "false"
  324. )
  325. func marshalCreateMysqlConnector(ccr *CreateConnectorRequest) ([]byte, error) {
  326. soConf, ok := ccr.Config.(SinkMysqlConfig)
  327. if !ok {
  328. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkMysqlConfig", reflect.TypeOf(ccr.Config)))
  329. }
  330. // set default value
  331. if soConf.TimestampUnit == "" {
  332. soConf.TimestampUnit = ConnectorMicrosecond
  333. }
  334. // set default value
  335. if soConf.TimestampUnit == "" {
  336. soConf.TimestampUnit = ConnectorMicrosecond
  337. }
  338. ct := &struct {
  339. Action string `json:"Action"`
  340. Type ConnectorType `json:"Type"`
  341. SinkStartTime int64 `json:"SinkStartTime"`
  342. ColumnFields []string `json:"ColumnFields"`
  343. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  344. Config SinkMysqlConfig `json:"Config"`
  345. }{
  346. Action: "create",
  347. Type: ccr.Type,
  348. SinkStartTime: ccr.SinkStartTime,
  349. ColumnFields: ccr.ColumnFields,
  350. ColumnNameMap: ccr.ColumnNameMap,
  351. Config: soConf,
  352. }
  353. return json.Marshal(ct)
  354. }
  355. func unmarshalGetMysqlConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
  356. ct := &struct {
  357. GetConnectorResult
  358. Config SinkMysqlConfig `json:"Config"`
  359. }{}
  360. if err := json.Unmarshal(data, ct); err != nil {
  361. return nil, err
  362. }
  363. ret := &ct.GetConnectorResult
  364. ret.Config = ct.Config
  365. ret.CommonResponseResult = *commonResp
  366. return ret, nil
  367. }
  368. func marshalUpdateMysqlConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
  369. soConf, ok := ucr.Config.(SinkMysqlConfig)
  370. if !ok {
  371. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkMysqlConfig", reflect.TypeOf(ucr.Config)))
  372. }
  373. // set default value
  374. if soConf.TimestampUnit == "" {
  375. soConf.TimestampUnit = ConnectorMicrosecond
  376. }
  377. ct := &struct {
  378. Action string `json:"Action"`
  379. ColumnFields []string `json:"ColumnFields,omitempty"`
  380. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  381. Config SinkMysqlConfig `json:"Config,omitempty"`
  382. }{
  383. Action: "create",
  384. ColumnFields: ucr.ColumnFields,
  385. ColumnNameMap: ucr.ColumnNameMap,
  386. Config: soConf,
  387. }
  388. return json.Marshal(ct)
  389. }
  390. /* Ads Config */
  391. type SinkAdsConfig struct {
  392. SinkMysqlConfig
  393. }
  394. func marshalCreateAdsConnector(ccr *CreateConnectorRequest) ([]byte, error) {
  395. soConf, ok := ccr.Config.(SinkAdsConfig)
  396. if !ok {
  397. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkAdsConfig", reflect.TypeOf(ccr.Config)))
  398. }
  399. // set default value
  400. if soConf.TimestampUnit == "" {
  401. soConf.TimestampUnit = ConnectorMicrosecond
  402. }
  403. ct := &struct {
  404. Action string `json:"Action"`
  405. Type ConnectorType `json:"Type"`
  406. SinkStartTime int64 `json:"SinkStartTime"`
  407. ColumnFields []string `json:"ColumnFields"`
  408. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  409. Config SinkAdsConfig `json:"Config"`
  410. }{
  411. Action: "create",
  412. Type: ccr.Type,
  413. SinkStartTime: ccr.SinkStartTime,
  414. ColumnFields: ccr.ColumnFields,
  415. ColumnNameMap: ccr.ColumnNameMap,
  416. Config: soConf,
  417. }
  418. return json.Marshal(ct)
  419. }
  420. func unmarshalGetAdsConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
  421. ct := &struct {
  422. GetConnectorResult
  423. Config SinkMysqlConfig `json:"Config"`
  424. }{}
  425. if err := json.Unmarshal(data, ct); err != nil {
  426. return nil, err
  427. }
  428. ret := &ct.GetConnectorResult
  429. ret.Config = ct.Config
  430. ret.CommonResponseResult = *commonResp
  431. return ret, nil
  432. }
  433. func marshalUpdateAdsConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
  434. soConf, ok := ucr.Config.(SinkAdsConfig)
  435. if !ok {
  436. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkAdsConfig", reflect.TypeOf(ucr.Config)))
  437. }
  438. // set default value
  439. if soConf.TimestampUnit == "" {
  440. soConf.TimestampUnit = ConnectorMicrosecond
  441. }
  442. ct := &struct {
  443. Action string `json:"Action"`
  444. ColumnFields []string `json:"ColumnFields,omitempty"`
  445. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  446. Config SinkAdsConfig `json:"Config,omitempty"`
  447. }{
  448. Action: "create",
  449. ColumnFields: ucr.ColumnFields,
  450. ColumnNameMap: ucr.ColumnNameMap,
  451. Config: soConf,
  452. }
  453. return json.Marshal(ct)
  454. }
  455. /* datahub Config */
  456. type SinkDatahubConfig struct {
  457. ConnectorConfig
  458. Endpoint string `json:"Endpoint"`
  459. Project string `json:"Project"`
  460. Topic string `json:"Topic"`
  461. AuthMode AuthMode `json:"AuthMode"`
  462. AccessId string `json:"AccessId,omitempty"`
  463. AccessKey string `json:"AccessKey,omitempty"`
  464. }
  465. func marshalCreateDatahubConnector(ccr *CreateConnectorRequest) ([]byte, error) {
  466. soConf, ok := ccr.Config.(SinkDatahubConfig)
  467. if !ok {
  468. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkDatahubConfig", reflect.TypeOf(ccr.Config)))
  469. }
  470. // set default value
  471. if soConf.TimestampUnit == "" {
  472. soConf.TimestampUnit = ConnectorMicrosecond
  473. }
  474. ct := &struct {
  475. Action string `json:"Action"`
  476. Type ConnectorType `json:"Type"`
  477. SinkStartTime int64 `json:"SinkStartTime"`
  478. ColumnFields []string `json:"ColumnFields"`
  479. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  480. Config SinkDatahubConfig `json:"Config"`
  481. }{
  482. Action: "create",
  483. Type: ccr.Type,
  484. SinkStartTime: ccr.SinkStartTime,
  485. ColumnFields: ccr.ColumnFields,
  486. ColumnNameMap: ccr.ColumnNameMap,
  487. Config: soConf,
  488. }
  489. return json.Marshal(ct)
  490. }
  491. func unmarshalGetDatahubConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
  492. ct := &struct {
  493. GetConnectorResult
  494. Config SinkDatahubConfig `json:"Config"`
  495. }{}
  496. if err := json.Unmarshal(data, ct); err != nil {
  497. return nil, err
  498. }
  499. ret := &ct.GetConnectorResult
  500. ret.Config = ct.Config
  501. ret.CommonResponseResult = *commonResp
  502. return ret, nil
  503. }
  504. func marshalUpdateDatahubConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
  505. soConf, ok := ucr.Config.(SinkDatahubConfig)
  506. if !ok {
  507. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkDatahubConfig", reflect.TypeOf(ucr.Config)))
  508. }
  509. // set default value
  510. if soConf.TimestampUnit == "" {
  511. soConf.TimestampUnit = ConnectorMicrosecond
  512. }
  513. ct := &struct {
  514. Action string `json:"Action"`
  515. ColumnFields []string `json:"ColumnFields,omitempty"`
  516. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  517. Config SinkDatahubConfig `json:"Config,omitempty"`
  518. }{
  519. Action: "create",
  520. ColumnFields: ucr.ColumnFields,
  521. ColumnNameMap: ucr.ColumnNameMap,
  522. Config: soConf,
  523. }
  524. return json.Marshal(ct)
  525. }
  526. /* ES Config */
  527. type SinkEsConfig struct {
  528. ConnectorConfig
  529. Index string `json:"Index"`
  530. Endpoint string `json:"Endpoint"`
  531. User string `json:"User"`
  532. Password string `json:"Password"`
  533. IDFields []string `json:"IDFields"`
  534. TypeFields []string `json:"TypeFields"`
  535. RouterFields []string `json:"RouterFields"`
  536. ProxyMode bool `json:"ProxyMode"`
  537. }
  538. func marshalCreateEsConnector(ccr *CreateConnectorRequest) ([]byte, error) {
  539. soConf, ok := ccr.Config.(SinkEsConfig)
  540. if !ok {
  541. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkEsConfig", reflect.TypeOf(ccr.Config)))
  542. }
  543. // set default value
  544. if soConf.TimestampUnit == "" {
  545. soConf.TimestampUnit = ConnectorMicrosecond
  546. }
  547. // server need ProxyMode be string
  548. type SinkEsConfigHelper struct {
  549. SinkEsConfig
  550. ProxyMode string `json:"ProxyMode"`
  551. }
  552. confHelper := SinkEsConfigHelper{
  553. SinkEsConfig: soConf,
  554. ProxyMode: strconv.FormatBool(soConf.ProxyMode),
  555. }
  556. ct := &struct {
  557. Action string `json:"Action"`
  558. Type ConnectorType `json:"Type"`
  559. SinkStartTime int64 `json:"SinkStartTime"`
  560. ColumnFields []string `json:"ColumnFields"`
  561. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  562. Config SinkEsConfigHelper `json:"Config"`
  563. }{
  564. Action: "create",
  565. Type: ccr.Type,
  566. SinkStartTime: ccr.SinkStartTime,
  567. ColumnFields: ccr.ColumnFields,
  568. ColumnNameMap: ccr.ColumnNameMap,
  569. Config: confHelper,
  570. }
  571. return json.Marshal(ct)
  572. }
  573. func unmarshalGetEsConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
  574. type SinkEsConfigHelper struct {
  575. SinkEsConfig
  576. IDFields string `json:"IDFields"`
  577. TypeFields string `json:"TypeFields"`
  578. RouterFields string `json:"RouterFields"`
  579. ProxyMode string `json:"ProxyMode"`
  580. }
  581. ct := &struct {
  582. GetConnectorResult
  583. Config SinkEsConfigHelper `json:"Config"`
  584. }{}
  585. if err := json.Unmarshal(data, ct); err != nil {
  586. return nil, err
  587. }
  588. conf := ct.Config.SinkEsConfig
  589. if ct.Config.ProxyMode != "" {
  590. proxy, err := strconv.ParseBool(ct.Config.ProxyMode)
  591. if err != nil {
  592. return nil, err
  593. }
  594. conf.ProxyMode = proxy
  595. }
  596. idFields := make([]string, 0)
  597. if ct.Config.IDFields != "" {
  598. if err := json.Unmarshal([]byte(ct.Config.IDFields), &idFields); err != nil {
  599. return nil, err
  600. }
  601. }
  602. conf.IDFields = idFields
  603. typeFields := make([]string, 0)
  604. if ct.Config.TypeFields != "" {
  605. if err := json.Unmarshal([]byte(ct.Config.TypeFields), &typeFields); err != nil {
  606. return nil, err
  607. }
  608. conf.TypeFields = typeFields
  609. }
  610. conf.TypeFields = typeFields
  611. routerFields := make([]string, 0)
  612. if ct.Config.RouterFields != "" {
  613. if err := json.Unmarshal([]byte(ct.Config.RouterFields), &routerFields); err != nil {
  614. return nil, err
  615. }
  616. }
  617. conf.RouterFields = routerFields
  618. ret := &ct.GetConnectorResult
  619. ret.CommonResponseResult = *commonResp
  620. ret.Config = conf
  621. return ret, nil
  622. }
  623. func marshalUpdateEsConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
  624. soConf, ok := ucr.Config.(SinkEsConfig)
  625. if !ok {
  626. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkEsConfig", reflect.TypeOf(ucr.Config)))
  627. }
  628. // set default value
  629. if soConf.TimestampUnit == "" {
  630. soConf.TimestampUnit = ConnectorMicrosecond
  631. }
  632. ct := &struct {
  633. Action string `json:"Action"`
  634. ColumnFields []string `json:"ColumnFields,omitempty"`
  635. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  636. Config SinkEsConfig `json:"Config,omitempty"`
  637. }{
  638. Action: "create",
  639. ColumnFields: ucr.ColumnFields,
  640. ColumnNameMap: ucr.ColumnNameMap,
  641. Config: soConf,
  642. }
  643. return json.Marshal(ct)
  644. }
  645. type FcInvokeType string
  646. const (
  647. FcSync FcInvokeType = "sync"
  648. FcAsync FcInvokeType = "async"
  649. )
  650. /* FC Config */
  651. type SinkFcConfig struct {
  652. ConnectorConfig
  653. Endpoint string `json:"Endpoint"`
  654. Service string `json:"Service"`
  655. Function string `json:"Function"`
  656. AuthMode AuthMode `json:"AuthMode"`
  657. AccessId string `json:"AccessId,omitempty"`
  658. AccessKey string `json:"AccessKey,omitempty"`
  659. InvokeType FcInvokeType `json:"InvokeType"`
  660. }
  661. func marshalCreateFcConnector(ccr *CreateConnectorRequest) ([]byte, error) {
  662. soConf, ok := ccr.Config.(SinkFcConfig)
  663. if !ok {
  664. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkFcConfig", reflect.TypeOf(ccr.Config)))
  665. }
  666. // set default value
  667. if soConf.TimestampUnit == "" {
  668. soConf.TimestampUnit = ConnectorMicrosecond
  669. }
  670. if soConf.InvokeType == "" {
  671. soConf.InvokeType = FcSync
  672. }
  673. ct := &struct {
  674. Action string `json:"Action"`
  675. Type ConnectorType `json:"Type"`
  676. SinkStartTime int64 `json:"SinkStartTime"`
  677. ColumnFields []string `json:"ColumnFields"`
  678. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  679. Config SinkFcConfig `json:"Config"`
  680. }{
  681. Action: "create",
  682. Type: ccr.Type,
  683. SinkStartTime: ccr.SinkStartTime,
  684. ColumnFields: ccr.ColumnFields,
  685. ColumnNameMap: ccr.ColumnNameMap,
  686. Config: soConf,
  687. }
  688. return json.Marshal(ct)
  689. }
  690. func unmarshalGetFcConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
  691. ct := &struct {
  692. GetConnectorResult
  693. Config SinkFcConfig `json:"Config"`
  694. }{}
  695. if err := json.Unmarshal(data, ct); err != nil {
  696. return nil, err
  697. }
  698. ret := &ct.GetConnectorResult
  699. ret.Config = ct.Config
  700. ret.CommonResponseResult = *commonResp
  701. return ret, nil
  702. }
  703. func marshalUpdateFcConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
  704. soConf, ok := ucr.Config.(SinkFcConfig)
  705. if !ok {
  706. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkFcConfig", reflect.TypeOf(ucr.Config)))
  707. }
  708. // set default value
  709. if soConf.TimestampUnit == "" {
  710. soConf.TimestampUnit = ConnectorMicrosecond
  711. }
  712. ct := &struct {
  713. Action string `json:"Action"`
  714. ColumnFields []string `json:"ColumnFields,omitempty"`
  715. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  716. Config SinkFcConfig `json:"Config,omitempty"`
  717. }{
  718. Action: "create",
  719. ColumnFields: ucr.ColumnFields,
  720. ColumnNameMap: ucr.ColumnNameMap,
  721. Config: soConf,
  722. }
  723. return json.Marshal(ct)
  724. }
  725. type OtsWriteMode string
  726. const (
  727. OtsPut OtsWriteMode = "PUT"
  728. OtsUpdate OtsWriteMode = "UPDATE"
  729. )
  730. /* Ots Config */
  731. type SinkOtsConfig struct {
  732. ConnectorConfig
  733. Endpoint string `json:"Endpoint"`
  734. InstanceName string `json:"InstanceName"`
  735. TableName string `json:"TableName"`
  736. AuthMode AuthMode `json:"AuthMode"`
  737. AccessId string `json:"AccessId,omitempty"`
  738. AccessKey string `json:"AccessKey,omitempty"`
  739. WriteMode OtsWriteMode `json:"WriteMode"`
  740. }
  741. func marshalCreateOtsConnector(ccr *CreateConnectorRequest) ([]byte, error) {
  742. soConf, ok := ccr.Config.(SinkOtsConfig)
  743. if !ok {
  744. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkOtsConfig", reflect.TypeOf(ccr.Config)))
  745. }
  746. // set default value
  747. if soConf.TimestampUnit == "" {
  748. soConf.TimestampUnit = ConnectorMicrosecond
  749. }
  750. if soConf.WriteMode == "" {
  751. soConf.WriteMode = OtsPut
  752. }
  753. ct := &struct {
  754. Action string `json:"Action"`
  755. Type ConnectorType `json:"Type"`
  756. SinkStartTime int64 `json:"SinkStartTime"`
  757. ColumnFields []string `json:"ColumnFields"`
  758. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  759. Config SinkOtsConfig `json:"Config"`
  760. }{
  761. Action: "create",
  762. Type: ccr.Type,
  763. SinkStartTime: ccr.SinkStartTime,
  764. ColumnFields: ccr.ColumnFields,
  765. ColumnNameMap: ccr.ColumnNameMap,
  766. Config: soConf,
  767. }
  768. return json.Marshal(ct)
  769. }
  770. func unmarshalGetOtsConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
  771. ct := &struct {
  772. GetConnectorResult
  773. Config SinkOtsConfig `json:"Config"`
  774. }{}
  775. if err := json.Unmarshal(data, ct); err != nil {
  776. return nil, err
  777. }
  778. ret := &ct.GetConnectorResult
  779. ret.Config = ct.Config
  780. ret.CommonResponseResult = *commonResp
  781. return ret, nil
  782. }
  783. func marshalUpdateOtsConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
  784. soConf, ok := ucr.Config.(SinkOtsConfig)
  785. if !ok {
  786. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkMysqlConfig", reflect.TypeOf(ucr.Config)))
  787. }
  788. // set default value
  789. if soConf.TimestampUnit == "" {
  790. soConf.TimestampUnit = ConnectorMicrosecond
  791. }
  792. ct := &struct {
  793. Action string `json:"Action"`
  794. ColumnFields []string `json:"ColumnFields,omitempty"`
  795. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  796. Config SinkOtsConfig `json:"Config,omitempty"`
  797. }{
  798. Action: "create",
  799. ColumnFields: ucr.ColumnFields,
  800. ColumnNameMap: ucr.ColumnNameMap,
  801. Config: soConf,
  802. }
  803. return json.Marshal(ct)
  804. }
  805. /* datahub Config */
  806. type SinkHologresConfig struct {
  807. SinkDatahubConfig
  808. InstanceId string `json:"InstanceId,omitempty"`
  809. }
  810. func marshalCreateHologresConnector(ccr *CreateConnectorRequest) ([]byte, error) {
  811. soConf, ok := ccr.Config.(SinkHologresConfig)
  812. if !ok {
  813. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkHologresConfig", reflect.TypeOf(ccr.Config)))
  814. }
  815. // set default value
  816. if soConf.TimestampUnit == "" {
  817. soConf.TimestampUnit = ConnectorMicrosecond
  818. }
  819. ct := &struct {
  820. Action string `json:"Action"`
  821. Type ConnectorType `json:"Type"`
  822. SinkStartTime int64 `json:"SinkStartTime"`
  823. ColumnFields []string `json:"ColumnFields"`
  824. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  825. Config SinkHologresConfig `json:"Config"`
  826. }{
  827. Action: "create",
  828. Type: ccr.Type,
  829. SinkStartTime: ccr.SinkStartTime,
  830. ColumnFields: ccr.ColumnFields,
  831. ColumnNameMap: ccr.ColumnNameMap,
  832. Config: soConf,
  833. }
  834. return json.Marshal(ct)
  835. }
  836. func unmarshalGetHologresConnector(commonResp *CommonResponseResult, data []byte) (*GetConnectorResult, error) {
  837. ct := &struct {
  838. GetConnectorResult
  839. Config SinkHologresConfig `json:"Config"`
  840. }{}
  841. if err := json.Unmarshal(data, ct); err != nil {
  842. return nil, err
  843. }
  844. ret := &ct.GetConnectorResult
  845. ret.Config = ct.Config
  846. ret.CommonResponseResult = *commonResp
  847. return ret, nil
  848. }
  849. func marshalUpdateHologresConnector(ucr *UpdateConnectorRequest) ([]byte, error) {
  850. soConf, ok := ucr.Config.(SinkHologresConfig)
  851. if !ok {
  852. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("config type error,your input config type is %s,should be SinkHologresConfig", reflect.TypeOf(ucr.Config)))
  853. }
  854. // set default value
  855. if soConf.TimestampUnit == "" {
  856. soConf.TimestampUnit = ConnectorMicrosecond
  857. }
  858. ct := &struct {
  859. Action string `json:"Action"`
  860. ColumnFields []string `json:"ColumnFields,omitempty"`
  861. ColumnNameMap map[string]string `json:"ColumnNameMap,omitempty"`
  862. Config SinkHologresConfig `json:"Config,omitempty"`
  863. }{
  864. Action: "create",
  865. ColumnFields: ucr.ColumnFields,
  866. ColumnNameMap: ucr.ColumnNameMap,
  867. Config: soConf,
  868. }
  869. return json.Marshal(ct)
  870. }
  871. type ConnectorOffset struct {
  872. Timestamp int64 `json:"Timestamp"`
  873. Sequence int64 `json:"Sequence"`
  874. }
  875. type ConnectorShardState string
  876. // Deprecated, will be removed in a future version
  877. const (
  878. Created ConnectorShardState = "CONTEXT_PLANNED"
  879. Eexcuting ConnectorShardState = "CONTEXT_EXECUTING"
  880. Stopped ConnectorShardState = "CONTEXT_PAUSED"
  881. Finished ConnectorShardState = "CONTEXT_FINISHED"
  882. )
  883. const (
  884. ConnectorShardHang ConnectorShardState = "CONTEXT_HANG"
  885. ConnectorShardPlanned ConnectorShardState = "CONTEXT_PLANNED"
  886. ConnectorShardExecuting ConnectorShardState = "CONTEXT_EXECUTING"
  887. ConnectorShardStopped ConnectorShardState = "CONTEXT_STOPPED"
  888. ConnectorShardFinished ConnectorShardState = "CONTEXT_FINISHED"
  889. )
  890. type ConnectorShardStatusEntry struct {
  891. StartSequence int64 `json:"StartSequence"`
  892. EndSequence int64 `json:"EndSequence"`
  893. CurrentSequence int64 `json:"CurrentSequence"`
  894. CurrentTimestamp int64 `json:"CurrentTimestamp"`
  895. UpdateTime int64 `json:"UpdateTime"`
  896. State ConnectorShardState `json:"State"`
  897. LastErrorMessage string `json:"LastErrorMessage"`
  898. DiscardCount int64 `json:"DiscardCount"`
  899. DoneTime int64 `json:"DoneTime"`
  900. WorkerAddress string `json:"WorkerAddress"`
  901. }