requestmodel.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635
  1. package datahub
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/golang/protobuf/proto"
  7. "github.com/aliyun/aliyun-datahub-sdk-go/datahub/pbmodel"
  8. "github.com/aliyun/aliyun-datahub-sdk-go/datahub/util"
  9. "reflect"
  10. )
  11. // handel the http request
  12. type RequestModel interface {
  13. // serialize the requestModel and maybe need add some message on http header
  14. requestBodyEncode() ([]byte, error)
  15. }
  16. // empty request
  17. type EmptyRequest struct {
  18. }
  19. func (br *EmptyRequest) requestBodyEncode() ([]byte, error) {
  20. return nil, nil
  21. }
  22. type CreateProjectRequest struct {
  23. Comment string `json:"Comment"`
  24. }
  25. func (cpr *CreateProjectRequest) requestBodyEncode() ([]byte, error) {
  26. return json.Marshal(cpr)
  27. }
  28. type UpdateProjectRequest struct {
  29. Comment string `json:"Comment"`
  30. }
  31. func (upr *UpdateProjectRequest) requestBodyEncode() ([]byte, error) {
  32. return json.Marshal(upr)
  33. }
  34. type UpdateProjectVpcWhitelistRequest struct {
  35. VpcIds string `json:"VpcIds"`
  36. }
  37. func (upv *UpdateProjectVpcWhitelistRequest) requestBodyEncode() ([]byte, error) {
  38. return json.Marshal(upv)
  39. }
  40. type CreateTopicRequest struct {
  41. Action string `json:"Action"`
  42. ShardCount int `json:"ShardCount"`
  43. Lifecycle int `json:"Lifecycle"`
  44. RecordType RecordType `json:"RecordType"`
  45. RecordSchema *RecordSchema `json:"RecordSchema,omitempty"`
  46. Comment string `json:"Comment"`
  47. ExpandMode ExpandMode `json:"ExpandMode"`
  48. }
  49. func (ctr *CreateTopicRequest) MarshalJSON() ([]byte, error) {
  50. msg := &struct {
  51. Action string `json:"Action"`
  52. ShardCount int `json:"ShardCount"`
  53. Lifecycle int `json:"Lifecycle"`
  54. RecordType RecordType `json:"RecordType"`
  55. RecordSchema string `json:"RecordSchema,omitempty"`
  56. Comment string `json:"Comment"`
  57. ExpandMode ExpandMode `json:"ExpandMode"`
  58. }{
  59. Action: ctr.Action,
  60. ShardCount: ctr.ShardCount,
  61. Lifecycle: ctr.Lifecycle,
  62. RecordType: ctr.RecordType,
  63. Comment: ctr.Comment,
  64. ExpandMode: ctr.ExpandMode,
  65. }
  66. switch ctr.RecordType {
  67. case TUPLE:
  68. msg.RecordSchema = ctr.RecordSchema.String()
  69. default:
  70. msg.RecordSchema = ""
  71. }
  72. return json.Marshal(msg)
  73. }
  74. func (ctr *CreateTopicRequest) requestBodyEncode() ([]byte, error) {
  75. return json.Marshal(ctr)
  76. }
  77. type UpdateTopicRequest struct {
  78. Comment string `json:"Comment,omitempty"`
  79. Lifecycle int `json:"Lifecycle,omitempty"`
  80. }
  81. func (utr *UpdateTopicRequest) requestBodyEncode() ([]byte, error) {
  82. return json.Marshal(utr)
  83. }
  84. type SplitShardRequest struct {
  85. Action string `json:"Action"`
  86. ShardId string `json:"ShardId"`
  87. SplitKey string `json:"SplitKey,omitempty"`
  88. }
  89. func (ssr *SplitShardRequest) requestBodyEncode() ([]byte, error) {
  90. return json.Marshal(ssr)
  91. }
  92. type MergeShardRequest struct {
  93. Action string `json:"Action"`
  94. ShardId string `json:"ShardId"`
  95. AdjacentShardId string `json:"AdjacentShardId"`
  96. }
  97. func (msr *MergeShardRequest) requestBodyEncode() ([]byte, error) {
  98. return json.Marshal(msr)
  99. }
  100. type ExtendShardRequest struct {
  101. Action string `json:"Action"`
  102. ExtendMode string `json:"ExtendMode"`
  103. ShardCount int `json:"ShardNumber"`
  104. }
  105. func (esr *ExtendShardRequest) requestBodyEncode() ([]byte, error) {
  106. return json.Marshal(esr)
  107. }
  108. type GetCursorRequest struct {
  109. Action string `json:"Action"`
  110. CursorType CursorType `json:"Type"`
  111. SystemTime int64 `json:"SystemTime"`
  112. Sequence int64 `json:"Sequence"`
  113. }
  114. func (gcr *GetCursorRequest) requestBodyEncode() ([]byte, error) {
  115. type ReqMsg struct {
  116. Action string `json:"Action"`
  117. Type CursorType `json:"Type"`
  118. }
  119. reqMsg := ReqMsg{
  120. Action: gcr.Action,
  121. Type: gcr.CursorType,
  122. }
  123. switch gcr.CursorType {
  124. case OLDEST, LATEST:
  125. return json.Marshal(reqMsg)
  126. case SYSTEM_TIME:
  127. return json.Marshal(struct {
  128. ReqMsg
  129. SystemTime int64 `json:"SystemTime"`
  130. }{
  131. ReqMsg: reqMsg,
  132. SystemTime: gcr.SystemTime,
  133. })
  134. case SEQUENCE:
  135. return json.Marshal(struct {
  136. ReqMsg
  137. Sequence int64 `json:"Sequence"`
  138. }{
  139. ReqMsg: reqMsg,
  140. Sequence: gcr.Sequence,
  141. })
  142. default:
  143. return nil, errors.New(fmt.Sprintf("Cursor not support type %s", gcr.CursorType))
  144. }
  145. }
  146. type PutRecordsRequest struct {
  147. Action string `json:"Action"`
  148. Records []IRecord `json:"Records"`
  149. }
  150. func (prr *PutRecordsRequest) requestBodyEncode() ([]byte, error) {
  151. return json.Marshal(prr)
  152. }
  153. func (ptr *PutRecordsRequest) MarshalJSON() ([]byte, error) {
  154. msg := &struct {
  155. Action string `json:"Action"`
  156. Records []RecordEntry `json:"Records"`
  157. }{
  158. Action: ptr.Action,
  159. Records: make([]RecordEntry, len(ptr.Records)),
  160. }
  161. for idx, val := range ptr.Records {
  162. msg.Records[idx].Data = val.GetData()
  163. msg.Records[idx].BaseRecord = val.GetBaseRecord()
  164. }
  165. return json.Marshal(msg)
  166. }
  167. type GetRecordRequest struct {
  168. Action string `json:"Action"`
  169. Cursor string `json:"Cursor"`
  170. Limit int `json:"Limit"`
  171. }
  172. func (grr *GetRecordRequest) requestBodyEncode() ([]byte, error) {
  173. return json.Marshal(grr)
  174. }
  175. type AppendFieldRequest struct {
  176. Action string `json:"Action"`
  177. FieldName string `json:"FieldName"`
  178. FieldType FieldType `json:"FieldType"`
  179. }
  180. func (afr *AppendFieldRequest) requestBodyEncode() ([]byte, error) {
  181. return json.Marshal(afr)
  182. }
  183. type GetMeterInfoRequest struct {
  184. Action string `json:"Action"`
  185. }
  186. func (gmir *GetMeterInfoRequest) requestBodyEncode() ([]byte, error) {
  187. return json.Marshal(gmir)
  188. }
  189. type CreateConnectorRequest struct {
  190. Action string `json:"Action"`
  191. Type ConnectorType `json:"Type"`
  192. SinkStartTime int64 `json:"SinkStartTime"`
  193. ColumnFields []string `json:"ColumnFields"`
  194. ColumnNameMap map[string]string `json:"ColumnNameMap"`
  195. Config interface{} `json:"Config"`
  196. }
  197. func (ccr *CreateConnectorRequest) requestBodyEncode() ([]byte, error) {
  198. switch ccr.Type {
  199. case SinkOdps:
  200. return marshalCreateOdpsConnector(ccr)
  201. case SinkOss:
  202. return marshalCreateOssConnector(ccr)
  203. case SinkEs:
  204. return marshalCreateEsConnector(ccr)
  205. case SinkAds:
  206. return marshalCreateAdsConnector(ccr)
  207. case SinkMysql:
  208. return marshalCreateMysqlConnector(ccr)
  209. case SinkFc:
  210. return marshalCreateFcConnector(ccr)
  211. case SinkOts:
  212. return marshalCreateOtsConnector(ccr)
  213. case SinkDatahub:
  214. return marshalCreateDatahubConnector(ccr)
  215. case SinkHologres:
  216. return marshalCreateHologresConnector(ccr)
  217. default:
  218. return nil, errors.New(fmt.Sprintf("not support connector type config: %s", ccr.Type.String()))
  219. }
  220. }
  221. type UpdateConnectorRequest struct {
  222. Action string `json:"Action"`
  223. ColumnFields []string `json:"ColumnFields"`
  224. ColumnNameMap map[string]string `json:"ColumnNameMap"`
  225. Config interface{} `json:"Config"`
  226. }
  227. func (ucr *UpdateConnectorRequest) requestBodyEncode() ([]byte, error) {
  228. if ucr.Config == nil {
  229. return marshalUpdateConnector(ucr)
  230. }
  231. switch ucr.Config.(type) {
  232. case SinkOdpsConfig:
  233. return marshalUpdateOdpsConnector(ucr)
  234. case SinkOssConfig:
  235. return marshalUpdateOssConnector(ucr)
  236. case SinkEsConfig:
  237. return marshalUpdateEsConnector(ucr)
  238. case SinkAdsConfig:
  239. return marshalUpdateAdsConnector(ucr)
  240. case SinkMysqlConfig:
  241. return marshalUpdateMysqlConnector(ucr)
  242. case SinkFcConfig:
  243. return marshalUpdateFcConnector(ucr)
  244. case SinkOtsConfig:
  245. return marshalUpdateOtsConnector(ucr)
  246. case SinkDatahubConfig:
  247. return marshalUpdateDatahubConnector(ucr)
  248. case SinkHologresConfig:
  249. return marshalUpdateHologresConnector(ucr)
  250. default:
  251. return nil, errors.New(fmt.Sprintf("this connector type not support, %t", reflect.TypeOf(ucr.Config)))
  252. }
  253. }
  254. type ReloadConnectorRequest struct {
  255. Action string `json:"Action"`
  256. ShardId string `json:"ShardId,omitempty"`
  257. }
  258. func (rcr *ReloadConnectorRequest) requestBodyEncode() ([]byte, error) {
  259. return json.Marshal(rcr)
  260. }
  261. type UpdateConnectorStateRequest struct {
  262. Action string `json:"Action"`
  263. State ConnectorState `json:"State"`
  264. }
  265. func (ucsr *UpdateConnectorStateRequest) requestBodyEncode() ([]byte, error) {
  266. return json.Marshal(ucsr)
  267. }
  268. type UpdateConnectorOffsetRequest struct {
  269. Action string `json:"Action"`
  270. ShardId string `json:"ShardId"`
  271. Timestamp int64 `json:"CurrentTime"`
  272. Sequence int64 `json:"CurrentSequence"`
  273. }
  274. func (ucor *UpdateConnectorOffsetRequest) requestBodyEncode() ([]byte, error) {
  275. return json.Marshal(ucor)
  276. }
  277. type GetConnectorShardStatusRequest struct {
  278. Action string `json:"Action"`
  279. ShardId string `json:"ShardId,omitempty"`
  280. }
  281. func (gcss *GetConnectorShardStatusRequest) requestBodyEncode() ([]byte, error) {
  282. return json.Marshal(gcss)
  283. }
  284. type AppendConnectorFieldRequest struct {
  285. Action string `json:"Action"`
  286. FieldName string `json:"FieldName"`
  287. }
  288. func (acfr *AppendConnectorFieldRequest) requestBodyEncode() ([]byte, error) {
  289. return json.Marshal(acfr)
  290. }
  291. type CreateSubscriptionRequest struct {
  292. Action string `json:"Action"`
  293. Comment string `json:"Comment"`
  294. }
  295. func (csr *CreateSubscriptionRequest) requestBodyEncode() ([]byte, error) {
  296. return json.Marshal(csr)
  297. }
  298. type ListSubscriptionRequest struct {
  299. Action string `json:"Action"`
  300. PageIndex int `json:"PageIndex"`
  301. PageSize int `json:"PageSize"`
  302. }
  303. func (lsr *ListSubscriptionRequest) requestBodyEncode() ([]byte, error) {
  304. return json.Marshal(lsr)
  305. }
  306. type UpdateSubscriptionRequest struct {
  307. //Action string `json:"Action"`
  308. Comment string `json:"Comment"`
  309. }
  310. func (usr *UpdateSubscriptionRequest) requestBodyEncode() ([]byte, error) {
  311. return json.Marshal(usr)
  312. }
  313. type UpdateSubscriptionStateRequest struct {
  314. State SubscriptionState `json:"State"`
  315. }
  316. func (ussr *UpdateSubscriptionStateRequest) requestBodyEncode() ([]byte, error) {
  317. return json.Marshal(ussr)
  318. }
  319. type OpenSubscriptionSessionRequest struct {
  320. Action string `json:"Action"`
  321. ShardIds []string `json:"ShardIds"`
  322. }
  323. func (ossr *OpenSubscriptionSessionRequest) requestBodyEncode() ([]byte, error) {
  324. return json.Marshal(ossr)
  325. }
  326. type GetSubscriptionOffsetRequest struct {
  327. Action string `json:"Action"`
  328. ShardIds []string `json:"ShardIds"`
  329. }
  330. func (gsor *GetSubscriptionOffsetRequest) requestBodyEncode() ([]byte, error) {
  331. return json.Marshal(gsor)
  332. }
  333. type CommitSubscriptionOffsetRequest struct {
  334. Action string `json:"Action"`
  335. Offsets map[string]SubscriptionOffset `json:"Offsets"`
  336. }
  337. func (csor *CommitSubscriptionOffsetRequest) requestBodyEncode() ([]byte, error) {
  338. return json.Marshal(csor)
  339. }
  340. type ResetSubscriptionOffsetRequest struct {
  341. Action string `json:"Action"`
  342. Offsets map[string]SubscriptionOffset `json:"Offsets"`
  343. }
  344. func (rsor *ResetSubscriptionOffsetRequest) requestBodyEncode() ([]byte, error) {
  345. return json.Marshal(rsor)
  346. }
  347. type HeartbeatRequest struct {
  348. Action string `json:"Action"`
  349. ConsumerId string `json:"ConsumerId"`
  350. VersionId int64 `json:"VersionId"`
  351. HoldShardList []string `json:"HoldShardList,omitempty"`
  352. ReadEndShardList [] string `json:"ReadEndShardList,omitempty"`
  353. }
  354. func (hr *HeartbeatRequest) requestBodyEncode() ([]byte, error) {
  355. return json.Marshal(hr)
  356. }
  357. type JoinGroupRequest struct {
  358. Action String `json:"Action"`
  359. SessionTimeout int64 `json:"SessionTimeout"`
  360. }
  361. func (jgr *JoinGroupRequest) requestBodyEncode() ([]byte, error) {
  362. return json.Marshal(jgr)
  363. }
  364. type SyncGroupRequest struct {
  365. Action string `json:"Action"`
  366. ConsumerId string `json:"ConsumerId"`
  367. VersionId int64 `json:"VersionId"`
  368. ReleaseShardList []string `json:"ReleaseShardList,omitempty"`
  369. ReadEndShardList []string `json:"ReadEndShardList,omitempty"`
  370. }
  371. func (sgr *SyncGroupRequest) requestBodyEncode() ([]byte, error) {
  372. return json.Marshal(sgr)
  373. }
  374. type LeaveGroupRequest struct {
  375. Action string `json:"Action"`
  376. ConsumerId string `json:"ConsumerId"`
  377. VersionId int64 `json:"VersionId"`
  378. }
  379. func (lgr *LeaveGroupRequest) requestBodyEncode() ([]byte, error) {
  380. return json.Marshal(lgr)
  381. }
  382. type ListTopicSchemaRequest struct {
  383. Action string `json:"Action"`
  384. }
  385. func (lts *ListTopicSchemaRequest) requestBodyEncode() ([]byte, error) {
  386. return json.Marshal(lts)
  387. }
  388. type GetTopicSchemaRequest struct {
  389. Action string `json:"Action"`
  390. VersionId int `json:"VersionId"`
  391. RecordSchema *RecordSchema `json:"RecordSchema,omitempty"`
  392. }
  393. func (gts *GetTopicSchemaRequest) requestBodyEncode() ([]byte, error) {
  394. msg := &struct {
  395. Action string `json:"Action"`
  396. VersionId int `json:"VersionId"`
  397. RecordSchema string `json:"RecordSchema,omitempty"`
  398. }{
  399. Action: gts.Action,
  400. VersionId: gts.VersionId,
  401. }
  402. if gts.RecordSchema != nil {
  403. msg.RecordSchema = gts.RecordSchema.String()
  404. }
  405. return json.Marshal(msg)
  406. }
  407. type RegisterTopicSchemaRequest struct {
  408. Action string `json:"Action"`
  409. RecordSchema *RecordSchema `json:"RecordSchema"`
  410. }
  411. func (rts *RegisterTopicSchemaRequest) requestBodyEncode() ([]byte, error) {
  412. msg := &struct {
  413. Action string `json:"Action"`
  414. RecordSchema string `json:"RecordSchema,omitempty"`
  415. }{
  416. Action: rts.Action,
  417. }
  418. if rts.RecordSchema != nil {
  419. msg.RecordSchema = rts.RecordSchema.String()
  420. }
  421. return json.Marshal(msg)
  422. }
  423. type DeleteTopicSchemaRequest struct {
  424. Action string `json:"Action"`
  425. VersionId int `json:"VersionId"`
  426. }
  427. func (lgr *DeleteTopicSchemaRequest) requestBodyEncode() ([]byte, error) {
  428. return json.Marshal(lgr)
  429. }
  430. type PutPBRecordsRequest struct {
  431. Records []IRecord `json:"Records"`
  432. }
  433. func (pr *PutPBRecordsRequest) requestBodyEncode() ([]byte, error) {
  434. res := make([]*pbmodel.RecordEntry, len(pr.Records))
  435. for idx, val := range pr.Records {
  436. bRecord := val.GetBaseRecord()
  437. data := val.GetData()
  438. fds := make([]*pbmodel.FieldData, 0)
  439. switch data.(type) {
  440. case []byte:
  441. fd := &pbmodel.FieldData{
  442. Value: data.([]byte),
  443. }
  444. fds = append(fds, fd)
  445. default:
  446. v, ok := data.([]interface{})
  447. if !ok {
  448. return nil, errors.New("data format is invalid")
  449. }
  450. for _, str := range v {
  451. fd := &pbmodel.FieldData{}
  452. if str == nil {
  453. fd.Value = nil
  454. } else {
  455. fd.Value = []byte(fmt.Sprintf("%s", str))
  456. }
  457. fds = append(fds, fd)
  458. }
  459. }
  460. rd := &pbmodel.RecordData{
  461. Data: fds,
  462. }
  463. recordEntry := &pbmodel.RecordEntry{
  464. ShardId: proto.String(bRecord.ShardId),
  465. Data: rd,
  466. }
  467. if len(bRecord.Attributes) > 0 {
  468. sps := make([]*pbmodel.StringPair, len(bRecord.Attributes))
  469. index := 0
  470. for k, v := range bRecord.Attributes {
  471. strv := fmt.Sprintf("%v", v)
  472. sp := &pbmodel.StringPair{
  473. Key: proto.String(k),
  474. Value: proto.String(strv),
  475. }
  476. sps[index] = sp
  477. index++
  478. }
  479. ra := &pbmodel.RecordAttributes{
  480. Attributes: sps,
  481. }
  482. recordEntry.Attributes = ra
  483. }
  484. res[idx] = recordEntry
  485. }
  486. prr := &pbmodel.PutRecordsRequest{
  487. Records: res,
  488. }
  489. buf, err := proto.Marshal(prr)
  490. if err != nil {
  491. return nil, err
  492. }
  493. x := util.WrapMessage(buf)
  494. return x, nil
  495. }
  496. type GetPBRecordRequest struct {
  497. Cursor string `json:"Cursor"`
  498. Limit int `json:"Limit"`
  499. }
  500. func (gpr *GetPBRecordRequest) requestBodyEncode() ([]byte, error) {
  501. limit := int32(gpr.Limit)
  502. grr := &pbmodel.GetRecordsRequest{
  503. Cursor: &gpr.Cursor,
  504. Limit: &limit,
  505. }
  506. buf, err := proto.Marshal(grr)
  507. if err != nil {
  508. return nil, err
  509. }
  510. wBuf := util.WrapMessage(buf)
  511. return wBuf, nil
  512. }
  513. type PutBatchRecordsRequest struct {
  514. serializer *batchSerializer
  515. Records []IRecord
  516. }
  517. func (pbr *PutBatchRecordsRequest) requestBodyEncode() ([]byte, error) {
  518. batchBuf, err := pbr.serializer.serialize(pbr.Records)
  519. if err != nil {
  520. return nil, err
  521. }
  522. entry := &pbmodel.BinaryRecordEntry{
  523. Data: batchBuf,
  524. }
  525. protoReq := &pbmodel.PutBinaryRecordsRequest{
  526. Records: []*pbmodel.BinaryRecordEntry{entry},
  527. }
  528. buf, err := proto.Marshal(protoReq)
  529. if err != nil {
  530. return nil, err
  531. }
  532. return util.WrapMessage(buf), nil
  533. }
  534. type GetBatchRecordRequest struct {
  535. GetPBRecordRequest
  536. }