resultmodel.go 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174
  1. package datahub
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "github.com/golang/protobuf/proto"
  7. "net/http"
  8. "github.com/aliyun/aliyun-datahub-sdk-go/datahub/pbmodel"
  9. "github.com/aliyun/aliyun-datahub-sdk-go/datahub/util"
  10. )
  11. // for the common response and detect error
  12. type CommonResponseResult struct {
  13. // StatusCode http return code
  14. StatusCode int
  15. // RequestId examples request id return by server
  16. RequestId string
  17. }
  18. func newCommonResponseResult(code int, header *http.Header, body []byte) (*CommonResponseResult, error) {
  19. result := &CommonResponseResult{
  20. StatusCode: code,
  21. RequestId: header.Get(httpHeaderRequestId),
  22. }
  23. var err error
  24. switch {
  25. case code >= 400:
  26. var datahubErr DatahubClientError
  27. if err = json.Unmarshal(body, &datahubErr); err != nil {
  28. return nil, err
  29. }
  30. err = errorHandler(code, result.RequestId, datahubErr.Code, datahubErr.Message)
  31. default:
  32. err = nil
  33. }
  34. return result, err
  35. }
  36. // the result of ListProject
  37. type ListProjectResult struct {
  38. CommonResponseResult
  39. ProjectNames []string `json:"ProjectNames"`
  40. }
  41. // convert the response body to ListProjectResult
  42. func NewListProjectResult(data []byte, commonResp *CommonResponseResult) (*ListProjectResult, error) {
  43. lpr := &ListProjectResult{
  44. CommonResponseResult: *commonResp,
  45. ProjectNames: make([]string, 0, 0),
  46. }
  47. if err := json.Unmarshal(data, lpr); err != nil {
  48. return nil, err
  49. }
  50. return lpr, nil
  51. }
  52. type CreateProjectResult struct {
  53. CommonResponseResult
  54. }
  55. func NewCreateProjectResult(commonResp *CommonResponseResult) (*CreateProjectResult, error) {
  56. cpr := &CreateProjectResult{
  57. CommonResponseResult: *commonResp,
  58. }
  59. return cpr, nil
  60. }
  61. type UpdateProjectResult struct {
  62. CommonResponseResult
  63. }
  64. func NewUpdateProjectResult(commonResp *CommonResponseResult) (*UpdateProjectResult, error) {
  65. upr := &UpdateProjectResult{
  66. CommonResponseResult: *commonResp,
  67. }
  68. return upr, nil
  69. }
  70. type DeleteProjectResult struct {
  71. CommonResponseResult
  72. }
  73. func NewDeleteProjectResult(commonResp *CommonResponseResult) (*DeleteProjectResult, error) {
  74. dpr := &DeleteProjectResult{
  75. CommonResponseResult: *commonResp,
  76. }
  77. return dpr, nil
  78. }
  79. // the result of GetProject
  80. type GetProjectResult struct {
  81. CommonResponseResult
  82. ProjectName string
  83. CreateTime int64 `json:"CreateTime"`
  84. LastModifyTime int64 `json:"LastModifyTime"`
  85. Comment string `json"Comment"`
  86. }
  87. // convert the response body to GetProjectResult
  88. func NewGetProjectResult(data []byte, commonResp *CommonResponseResult) (*GetProjectResult, error) {
  89. gpr := &GetProjectResult{
  90. CommonResponseResult: *commonResp,
  91. }
  92. if err := json.Unmarshal(data, gpr); err != nil {
  93. return nil, err
  94. }
  95. return gpr, nil
  96. }
  97. type UpdateProjectVpcWhitelistResult struct {
  98. CommonResponseResult
  99. }
  100. func NewUpdateProjectVpcWhitelistResult(commonResp *CommonResponseResult) (*UpdateProjectVpcWhitelistResult, error) {
  101. upvw := &UpdateProjectVpcWhitelistResult{
  102. CommonResponseResult: *commonResp,
  103. }
  104. return upvw, nil
  105. }
  106. type ListTopicResult struct {
  107. CommonResponseResult
  108. TopicNames [] string `json:"TopicNames"`
  109. }
  110. func NewListTopicResult(data []byte, commonResp *CommonResponseResult) (*ListTopicResult, error) {
  111. lt := &ListTopicResult{
  112. CommonResponseResult: *commonResp,
  113. }
  114. if err := json.Unmarshal(data, lt); err != nil {
  115. return nil, err
  116. }
  117. return lt, nil
  118. }
  119. type CreateBlobTopicResult struct {
  120. CommonResponseResult
  121. }
  122. func NewCreateBlobTopicResult(commonResp *CommonResponseResult) (*CreateBlobTopicResult, error) {
  123. cbrt := &CreateBlobTopicResult{
  124. CommonResponseResult: *commonResp,
  125. }
  126. return cbrt, nil
  127. }
  128. type CreateTupleTopicResult struct {
  129. CommonResponseResult
  130. }
  131. func NewCreateTupleTopicResult(commonResp *CommonResponseResult) (*CreateTupleTopicResult, error) {
  132. cttr := &CreateTupleTopicResult{
  133. CommonResponseResult: *commonResp,
  134. }
  135. return cttr, nil
  136. }
  137. type CreateTopicWithParaResult struct {
  138. CommonResponseResult
  139. }
  140. func NewCreateTopicWithParaResult(commonResp *CommonResponseResult) (*CreateTopicWithParaResult, error) {
  141. ctwp := &CreateTopicWithParaResult{
  142. CommonResponseResult: *commonResp,
  143. }
  144. return ctwp, nil
  145. }
  146. type UpdateTopicResult struct {
  147. CommonResponseResult
  148. }
  149. func NewUpdateTopicResult(commonResp *CommonResponseResult) (*UpdateTopicResult, error) {
  150. utr := &UpdateTopicResult{
  151. CommonResponseResult: *commonResp,
  152. }
  153. return utr, nil
  154. }
  155. type DeleteTopicResult struct {
  156. CommonResponseResult
  157. }
  158. func NewDeleteTopicResult(commonResp *CommonResponseResult) (*DeleteTopicResult, error) {
  159. dtr := &DeleteTopicResult{
  160. CommonResponseResult: *commonResp,
  161. }
  162. return dtr, nil
  163. }
  164. type GetTopicResult struct {
  165. CommonResponseResult
  166. ProjectName string
  167. TopicName string
  168. ShardCount int `json:"ShardCount"`
  169. LifeCycle int `json:"LifeCycle"`
  170. RecordType RecordType `json:"RecordType"`
  171. RecordSchema *RecordSchema `json:"RecordSchema"`
  172. Comment string `json:"Comment"`
  173. CreateTime int64 `json:"CreateTime"`
  174. LastModifyTime int64 `json:"LastModifyTime"`
  175. TopicStatus TopicStatus `json:"Status"`
  176. ExpandMode ExpandMode `json:"ExpandMode"`
  177. }
  178. // for deserialize the RecordSchema
  179. func (gtr *GetTopicResult) UnmarshalJSON(data []byte) error {
  180. msg := &struct {
  181. ShardCount int `json:"ShardCount"`
  182. LifeCycle int `json:"LifeCycle"`
  183. RecordType RecordType `json:"RecordType"`
  184. RecordSchema string `json:"RecordSchema"`
  185. Comment string `json:"Comment"`
  186. CreateTime int64 `json:"CreateTime"`
  187. LastModifyTime int64 `json:"LastModifyTime"`
  188. TopicStatus TopicStatus `json:"Status"`
  189. ExpandMode ExpandMode `json:"ExpandMode"`
  190. }{}
  191. if err := json.Unmarshal(data, msg); err != nil {
  192. return err
  193. }
  194. gtr.ShardCount = msg.ShardCount
  195. gtr.LifeCycle = msg.LifeCycle
  196. gtr.RecordType = msg.RecordType
  197. gtr.Comment = msg.Comment
  198. gtr.CreateTime = msg.CreateTime
  199. gtr.LastModifyTime = msg.LastModifyTime
  200. gtr.TopicStatus = msg.TopicStatus
  201. gtr.ExpandMode = msg.ExpandMode
  202. if msg.RecordType == TUPLE {
  203. rs := &RecordSchema{}
  204. if err := json.Unmarshal([]byte(msg.RecordSchema), rs); err != nil {
  205. return err
  206. }
  207. for idx := range rs.Fields {
  208. rs.Fields[idx].AllowNull = !rs.Fields[idx].AllowNull
  209. }
  210. gtr.RecordSchema = rs
  211. }
  212. return nil
  213. }
  214. func NewGetTopicResult(data []byte, commonResp *CommonResponseResult) (*GetTopicResult, error) {
  215. gr := &GetTopicResult{
  216. CommonResponseResult: *commonResp,
  217. }
  218. if err := json.Unmarshal(data, gr); err != nil {
  219. return nil, err
  220. }
  221. return gr, nil
  222. }
  223. type ListShardResult struct {
  224. CommonResponseResult
  225. Shards []ShardEntry `json:"Shards"`
  226. Protocol string `json:"Protocol"`
  227. IntervalMs int64 `json:"Interval"`
  228. }
  229. func NewListShardResult(data []byte, commonResp *CommonResponseResult) (*ListShardResult, error) {
  230. lsr := &ListShardResult{
  231. CommonResponseResult: *commonResp,
  232. }
  233. if err := json.Unmarshal(data, lsr); err != nil {
  234. return nil, err
  235. }
  236. return lsr, nil
  237. }
  238. type SplitShardResult struct {
  239. CommonResponseResult
  240. NewShards []ShardEntry `json:"NewShards"`
  241. }
  242. func NewSplitShardResult(data []byte, commonResp *CommonResponseResult) (*SplitShardResult, error) {
  243. ssr := &SplitShardResult{
  244. CommonResponseResult: *commonResp,
  245. }
  246. if err := json.Unmarshal(data, ssr); err != nil {
  247. return nil, err
  248. }
  249. return ssr, nil
  250. }
  251. type MergeShardResult struct {
  252. CommonResponseResult
  253. ShardId string `json:"ShardId"`
  254. BeginHashKey string `json:"BeginHashKey"`
  255. EndHashKey string `json:"EndHashKey"`
  256. }
  257. func NewMergeShardResult(data []byte, commonResp *CommonResponseResult) (*MergeShardResult, error) {
  258. ssr := &MergeShardResult{
  259. CommonResponseResult: *commonResp,
  260. }
  261. if err := json.Unmarshal(data, ssr); err != nil {
  262. return nil, err
  263. }
  264. return ssr, nil
  265. }
  266. type ExtendShardResult struct {
  267. CommonResponseResult
  268. }
  269. func NewExtendShardResult(commonResp *CommonResponseResult) (*ExtendShardResult, error) {
  270. esr := &ExtendShardResult{
  271. CommonResponseResult: *commonResp,
  272. }
  273. return esr, nil
  274. }
  275. type GetCursorResult struct {
  276. CommonResponseResult
  277. Cursor string `json:"Cursor"`
  278. RecordTime int64 `json:"RecordTime"`
  279. Sequence int64 `json:"Sequence"`
  280. }
  281. func NewGetCursorResult(data []byte, commonResp *CommonResponseResult) (*GetCursorResult, error) {
  282. gcr := &GetCursorResult{
  283. CommonResponseResult: *commonResp,
  284. }
  285. if err := json.Unmarshal(data, gcr); err != nil {
  286. return nil, err
  287. }
  288. return gcr, nil
  289. }
  290. type PutRecordsResult struct {
  291. CommonResponseResult
  292. FailedRecordCount int `json:"FailedRecordCount"`
  293. FailedRecords []FailedRecord `json:"FailedRecords"`
  294. }
  295. func NewPutRecordsResult(data []byte, commonResp *CommonResponseResult) (*PutRecordsResult, error) {
  296. prr := &PutRecordsResult{
  297. CommonResponseResult: *commonResp,
  298. }
  299. if err := json.Unmarshal(data, prr); err != nil {
  300. return nil, err
  301. }
  302. return prr, nil
  303. }
  304. func NewPutPBRecordsResult(data []byte, commonResp *CommonResponseResult) (*PutRecordsResult, error) {
  305. pr := &PutRecordsResult{
  306. CommonResponseResult: *commonResp,
  307. }
  308. data, err := util.UnwrapMessage(data)
  309. if err != nil {
  310. return nil, err
  311. }
  312. prr := &pbmodel.PutRecordsResponse{}
  313. if err := proto.Unmarshal(data, prr); err != nil {
  314. return nil, err
  315. }
  316. pr.FailedRecordCount = int(*prr.FailedCount)
  317. if pr.FailedRecordCount > 0 {
  318. records := make([]FailedRecord, pr.FailedRecordCount)
  319. for idx, v := range prr.FailedRecords {
  320. records[idx].ErrorCode = *v.ErrorCode
  321. records[idx].ErrorMessage = *v.ErrorMessage
  322. records[idx].Index = int(*v.Index)
  323. }
  324. pr.FailedRecords = records
  325. }
  326. return pr, nil
  327. }
  328. type PutRecordsByShardResult struct {
  329. CommonResponseResult
  330. }
  331. func NewPutRecordsByShardResult(commonResp *CommonResponseResult) (*PutRecordsByShardResult, error) {
  332. prbs := &PutRecordsByShardResult{
  333. CommonResponseResult: *commonResp,
  334. }
  335. return prbs, nil
  336. }
  337. type GetRecordsResult struct {
  338. CommonResponseResult
  339. NextCursor string `json:"NextCursor"`
  340. RecordCount int `json:"RecordCount"`
  341. StartSequence int64 `json:"StartSeq"`
  342. LatestSequence int64 `json:"LatestSeq"`
  343. LatestTime int64 `json:"LatestTime"`
  344. Records []IRecord `json:"Records"`
  345. RecordSchema *RecordSchema `json:"-"`
  346. }
  347. func (grr *GetRecordsResult) UnmarshalJSON(data []byte) error {
  348. msg := &struct {
  349. NextCursor string `json:"NextCursor"`
  350. RecordCount int `json:"RecordCount"`
  351. StartSequence int64 `json:"StartSeq"`
  352. LatestSequence int64 `json:"LatestSeq"`
  353. LatestTime int64 `json:"LatestTime"`
  354. Records []*struct {
  355. SystemTime int64 `json:"SystemTime"`
  356. NextCursor string `json:"NextCursor"`
  357. CurrentCursor string `json:"Cursor"`
  358. Sequence int64 `json:"Sequence"`
  359. Attributes map[string]interface{} `json:"Attributes"`
  360. Data interface{} `json:"Data"`
  361. } `json:"Records"`
  362. }{}
  363. err := json.Unmarshal(data, msg)
  364. if err != nil {
  365. return err
  366. }
  367. grr.NextCursor = msg.NextCursor
  368. grr.RecordCount = msg.RecordCount
  369. grr.StartSequence = msg.StartSequence
  370. grr.LatestSequence = msg.LatestSequence
  371. grr.LatestTime = msg.LatestTime
  372. grr.Records = make([]IRecord, len(msg.Records))
  373. for idx, record := range msg.Records {
  374. if record.Data == nil {
  375. return errors.New("invalid record response, record data is nil")
  376. }
  377. switch dt := record.Data.(type) {
  378. case []interface{}, []string:
  379. if grr.RecordSchema == nil {
  380. return errors.New("tuple record type must set record schema")
  381. }
  382. grr.Records[idx] = NewTupleRecord(grr.RecordSchema, record.SystemTime)
  383. case string:
  384. grr.Records[idx] = NewBlobRecord([]byte(dt), record.SystemTime)
  385. default:
  386. return errors.New(fmt.Sprintf("illegal record data type[%T]", dt))
  387. }
  388. if err := grr.Records[idx].FillData(record.Data); err != nil {
  389. return err
  390. }
  391. for key, val := range record.Attributes {
  392. grr.Records[idx].SetAttribute(key, val)
  393. }
  394. br := BaseRecord{
  395. SystemTime: msg.Records[idx].SystemTime,
  396. NextCursor: msg.Records[idx].NextCursor,
  397. Cursor: msg.Records[idx].CurrentCursor,
  398. Sequence: msg.Records[idx].Sequence,
  399. Attributes: msg.Records[idx].Attributes,
  400. }
  401. grr.Records[idx].SetBaseRecord(br)
  402. }
  403. return nil
  404. }
  405. func NewGetRecordsResult(data []byte, schema *RecordSchema, commonResp *CommonResponseResult) (*GetRecordsResult, error) {
  406. grr := &GetRecordsResult{
  407. CommonResponseResult: *commonResp,
  408. RecordSchema: schema,
  409. }
  410. if err := json.Unmarshal(data, grr); err != nil {
  411. return nil, err
  412. }
  413. return grr, nil
  414. }
  415. func NewGetPBRecordsResult(data []byte, schema *RecordSchema, commonResp *CommonResponseResult) (*GetRecordsResult, error) {
  416. data, err := util.UnwrapMessage(data)
  417. if err != nil {
  418. return nil, err
  419. }
  420. grr := &pbmodel.GetRecordsResponse{}
  421. if err := proto.Unmarshal(data, grr); err != nil {
  422. return nil, err
  423. }
  424. result := &GetRecordsResult{
  425. CommonResponseResult: *commonResp,
  426. RecordSchema: schema,
  427. }
  428. if grr.NextCursor != nil {
  429. result.NextCursor = *(grr.NextCursor)
  430. }
  431. if grr.StartSequence != nil {
  432. result.StartSequence = *grr.StartSequence
  433. }
  434. if grr.LatestSequence != nil {
  435. result.LatestSequence = *grr.LatestSequence
  436. }
  437. if grr.LatestTime != nil {
  438. result.LatestTime = *grr.LatestTime
  439. }
  440. if grr.RecordCount != nil {
  441. result.RecordCount = int(*grr.RecordCount)
  442. if result.RecordCount > 0 {
  443. result.Records = make([]IRecord, result.RecordCount)
  444. for idx, record := range grr.Records {
  445. //Tuple topic
  446. if result.RecordSchema != nil {
  447. tr := NewTupleRecord(result.RecordSchema, *record.SystemTime)
  448. if err := fillTupleData(tr, record); err != nil {
  449. return nil, err
  450. }
  451. result.Records[idx] = tr
  452. } else {
  453. br := NewBlobRecord(record.Data.Data[0].Value, *record.SystemTime)
  454. if err := fillBlobData(br, record); err != nil {
  455. return nil, err
  456. }
  457. result.Records[idx] = br
  458. }
  459. }
  460. }
  461. }
  462. return result, nil
  463. }
  464. func fillTupleData(tr *TupleRecord, recordEntry *pbmodel.RecordEntry) error {
  465. if recordEntry.ShardId != nil {
  466. tr.ShardId = *recordEntry.ShardId
  467. }
  468. if recordEntry.HashKey != nil {
  469. tr.HashKey = *recordEntry.HashKey
  470. }
  471. if recordEntry.PartitionKey != nil {
  472. tr.Sequence = *recordEntry.Sequence
  473. }
  474. if recordEntry.Cursor != nil {
  475. tr.Cursor = *recordEntry.Cursor
  476. }
  477. if recordEntry.NextCursor != nil {
  478. tr.NextCursor = *recordEntry.NextCursor
  479. }
  480. if recordEntry.Sequence != nil {
  481. tr.Sequence = *recordEntry.Sequence
  482. }
  483. if recordEntry.SystemTime != nil {
  484. tr.SystemTime = *recordEntry.SystemTime
  485. }
  486. if recordEntry.Attributes != nil {
  487. for _, pair := range recordEntry.Attributes.Attributes {
  488. tr.Attributes[*pair.Key] = *pair.Value
  489. }
  490. }
  491. data := recordEntry.Data.Data
  492. for idx, v := range data {
  493. if v.Value != nil {
  494. tv, err := castValueFromString(string(v.Value), tr.RecordSchema.Fields[idx].Type)
  495. if err != nil {
  496. return err
  497. }
  498. tr.Values[idx] = tv
  499. }
  500. }
  501. return nil
  502. }
  503. func fillBlobData(br *BlobRecord, recordEntry *pbmodel.RecordEntry) error {
  504. if recordEntry.ShardId != nil {
  505. br.ShardId = *recordEntry.ShardId
  506. }
  507. if recordEntry.HashKey != nil {
  508. br.HashKey = *recordEntry.HashKey
  509. }
  510. if recordEntry.PartitionKey != nil {
  511. br.Sequence = *recordEntry.Sequence
  512. }
  513. if recordEntry.Cursor != nil {
  514. br.Cursor = *recordEntry.Cursor
  515. }
  516. if recordEntry.NextCursor != nil {
  517. br.NextCursor = *recordEntry.NextCursor
  518. }
  519. if recordEntry.Sequence != nil {
  520. br.Sequence = *recordEntry.Sequence
  521. }
  522. if recordEntry.SystemTime != nil {
  523. br.SystemTime = *recordEntry.SystemTime
  524. }
  525. if recordEntry.Attributes != nil {
  526. for _, pair := range recordEntry.Attributes.Attributes {
  527. br.Attributes[*pair.Key] = *pair.Value
  528. }
  529. }
  530. br.RawData = recordEntry.Data.Data[0].Value
  531. return nil
  532. }
  533. func NewGetBatchRecordsResult(data []byte, schema *RecordSchema, commonResp *CommonResponseResult, deserializer *batchDeserializer) (*GetRecordsResult, error) {
  534. data, err := util.UnwrapMessage(data)
  535. if err != nil {
  536. return nil, err
  537. }
  538. gbr := &pbmodel.GetBinaryRecordsResponse{}
  539. if err := proto.Unmarshal(data, gbr); err != nil {
  540. return nil, err
  541. }
  542. result := &GetRecordsResult{
  543. CommonResponseResult: *commonResp,
  544. RecordSchema: schema,
  545. }
  546. if gbr.NextCursor != nil {
  547. result.NextCursor = *(gbr.NextCursor)
  548. }
  549. if gbr.StartSequence != nil {
  550. result.StartSequence = *gbr.StartSequence
  551. }
  552. if gbr.LatestSequence != nil {
  553. result.LatestSequence = *gbr.LatestSequence
  554. }
  555. if gbr.LatestTime != nil {
  556. result.LatestTime = *gbr.LatestTime
  557. }
  558. // 这里的RecordCount不是record数量,而是batch的数量
  559. if gbr.RecordCount != nil {
  560. if *gbr.RecordCount > 0 {
  561. result.Records = make([]IRecord, 0, *gbr.RecordCount)
  562. for _, record := range gbr.Records {
  563. meta := &respMeta{
  564. cursor: record.GetCursor(),
  565. nextCursor: record.GetNextCursor(),
  566. sequence: record.GetSequence(),
  567. systemTime: record.GetSystemTime(),
  568. serial: int64(record.GetSerial()),
  569. }
  570. recordList, err := deserializer.deserialize(record.Data, meta)
  571. if err != nil {
  572. return nil, err
  573. }
  574. result.Records = append(result.Records, recordList...)
  575. }
  576. }
  577. }
  578. result.RecordCount = len(result.Records)
  579. return result, nil
  580. }
  581. type AppendFieldResult struct {
  582. CommonResponseResult
  583. }
  584. func NewAppendFieldResult(commonResp *CommonResponseResult) (*AppendFieldResult, error) {
  585. afr := &AppendFieldResult{
  586. CommonResponseResult: *commonResp,
  587. }
  588. return afr, nil
  589. }
  590. type GetMeterInfoResult struct {
  591. CommonResponseResult
  592. ActiveTime int64 `json:"ActiveTime"`
  593. Storage int64 `json:"Storage"`
  594. }
  595. func NewGetMeterInfoResult(data []byte, commonResp *CommonResponseResult) (*GetMeterInfoResult, error) {
  596. gmir := &GetMeterInfoResult{
  597. CommonResponseResult: *commonResp,
  598. }
  599. if err := json.Unmarshal(data, gmir); err != nil {
  600. return nil, err
  601. }
  602. return gmir, nil
  603. }
  604. type ListConnectorResult struct {
  605. CommonResponseResult
  606. ConnectorIds []string `json:"Connectors"`
  607. }
  608. func NewListConnectorResult(data []byte, commonResp *CommonResponseResult) (*ListConnectorResult, error) {
  609. lcr := &ListConnectorResult{
  610. CommonResponseResult: *commonResp,
  611. }
  612. if err := json.Unmarshal(data, lcr); err != nil {
  613. return nil, err
  614. }
  615. return lcr, nil
  616. }
  617. type CreateConnectorResult struct {
  618. CommonResponseResult
  619. ConnectorId string `json:"ConnectorId"`
  620. }
  621. func NewCreateConnectorResult(data []byte, commonResp *CommonResponseResult) (*CreateConnectorResult, error) {
  622. ccr := &CreateConnectorResult{
  623. CommonResponseResult: *commonResp,
  624. }
  625. if err := json.Unmarshal(data, ccr); err != nil {
  626. return nil, err
  627. }
  628. return ccr, nil
  629. }
  630. type UpdateConnectorResult struct {
  631. CommonResponseResult
  632. }
  633. func NewUpdateConnectorResult(commonResp *CommonResponseResult) (*UpdateConnectorResult, error) {
  634. ucr := &UpdateConnectorResult{
  635. CommonResponseResult: *commonResp,
  636. }
  637. return ucr, nil
  638. }
  639. type DeleteConnectorResult struct {
  640. CommonResponseResult
  641. }
  642. func NewDeleteConnectorResult(commonResp *CommonResponseResult) (*DeleteConnectorResult, error) {
  643. dcr := &DeleteConnectorResult{
  644. CommonResponseResult: *commonResp,
  645. }
  646. return dcr, nil
  647. }
  648. type GetConnectorResult struct {
  649. CommonResponseResult
  650. CreateTime int64 `json:"CreateTime"`
  651. LastModifyTime int64 `json:"LastModifyTime"`
  652. ConnectorId string `json:"ConnectorId"`
  653. ClusterAddress string `json:"ClusterAddress"`
  654. Type ConnectorType `json:"Type"`
  655. State ConnectorState `json:"State"`
  656. ColumnFields []string `json:"ColumnFields"`
  657. ExtraConfig map[string]string `json:"ExtraInfo"`
  658. Creator string `json:"Creator"`
  659. Owner string `json:"Owner"`
  660. Config interface{} `json:"Config"`
  661. }
  662. func NewGetConnectorResult(data []byte, commonResp *CommonResponseResult) (*GetConnectorResult, error) {
  663. cType := &struct {
  664. Type ConnectorType `json:"Type"`
  665. }{}
  666. if err := json.Unmarshal(data, cType); err != nil {
  667. return nil, err
  668. }
  669. switch cType.Type {
  670. case SinkOdps:
  671. return unmarshalGetOdpsConnector(commonResp, data)
  672. case SinkOss:
  673. return unmarshalGetOssConnector(commonResp, data)
  674. case SinkEs:
  675. return unmarshalGetEsConnector(commonResp, data)
  676. case SinkAds:
  677. return unmarshalGetAdsConnector(commonResp, data)
  678. case SinkMysql:
  679. return unmarshalGetMysqlConnector(commonResp, data)
  680. case SinkFc:
  681. return unmarshalGetFcConnector(commonResp, data)
  682. case SinkOts:
  683. return unmarshalGetOtsConnector(commonResp, data)
  684. case SinkDatahub:
  685. return unmarshalGetDatahubConnector(commonResp, data)
  686. case SinkHologres:
  687. return unmarshalGetHologresConnector(commonResp, data)
  688. default:
  689. return nil, errors.New(fmt.Sprintf("not support connector type %s", cType.Type.String()))
  690. }
  691. }
  692. type GetConnectorDoneTimeResult struct {
  693. CommonResponseResult
  694. DoneTime int64 `json:"DoneTime"`
  695. TimeZone string `json:"TimeZone"`
  696. TimeWindow int `json:"TimeWindow"`
  697. }
  698. func NewGetConnectorDoneTimeResult(data []byte, commonResp *CommonResponseResult) (*GetConnectorDoneTimeResult, error) {
  699. gcdt := &GetConnectorDoneTimeResult{
  700. CommonResponseResult: *commonResp,
  701. }
  702. if err := json.Unmarshal(data, gcdt); err != nil {
  703. return nil, err
  704. }
  705. return gcdt, nil
  706. }
  707. type GetConnectorShardStatusResult struct {
  708. CommonResponseResult
  709. ShardStatus map[string]ConnectorShardStatusEntry `json:"ShardStatusInfos"`
  710. }
  711. func NewGetConnectorShardStatusResult(data []byte, commonResp *CommonResponseResult) (*GetConnectorShardStatusResult, error) {
  712. gcss := &GetConnectorShardStatusResult{
  713. CommonResponseResult: *commonResp,
  714. }
  715. if err := json.Unmarshal(data, gcss); err != nil {
  716. return nil, err
  717. }
  718. return gcss, nil
  719. }
  720. type GetConnectorShardStatusByShardResult struct {
  721. CommonResponseResult
  722. ConnectorShardStatusEntry
  723. }
  724. func NewGetConnectorShardStatusByShardResult(data []byte, commonResp *CommonResponseResult) (*GetConnectorShardStatusByShardResult, error) {
  725. csse := &ConnectorShardStatusEntry{}
  726. if err := json.Unmarshal(data, csse); err != nil {
  727. return nil, err
  728. }
  729. gcss := &GetConnectorShardStatusByShardResult{
  730. CommonResponseResult: *commonResp,
  731. ConnectorShardStatusEntry: *csse,
  732. }
  733. return gcss, nil
  734. }
  735. type ReloadConnectorResult struct {
  736. CommonResponseResult
  737. }
  738. func NewReloadConnectorResult(commonResp *CommonResponseResult) (*ReloadConnectorResult, error) {
  739. rcr := &ReloadConnectorResult{
  740. CommonResponseResult: *commonResp,
  741. }
  742. return rcr, nil
  743. }
  744. type ReloadConnectorByShardResult struct {
  745. CommonResponseResult
  746. }
  747. func NewReloadConnectorByShardResult(commonResp *CommonResponseResult) (*ReloadConnectorByShardResult, error) {
  748. rcsr := &ReloadConnectorByShardResult{
  749. CommonResponseResult: *commonResp,
  750. }
  751. return rcsr, nil
  752. }
  753. type UpdateConnectorStateResult struct {
  754. CommonResponseResult
  755. }
  756. func NewUpdateConnectorStateResult(commonResp *CommonResponseResult) (*UpdateConnectorStateResult, error) {
  757. ucsr := &UpdateConnectorStateResult{
  758. CommonResponseResult: *commonResp,
  759. }
  760. return ucsr, nil
  761. }
  762. type UpdateConnectorOffsetResult struct {
  763. CommonResponseResult
  764. }
  765. func NewUpdateConnectorOffsetResult(commonResp *CommonResponseResult) (*UpdateConnectorOffsetResult, error) {
  766. ucor := &UpdateConnectorOffsetResult{
  767. CommonResponseResult: *commonResp,
  768. }
  769. return ucor, nil
  770. }
  771. type AppendConnectorFieldResult struct {
  772. CommonResponseResult
  773. }
  774. func NewAppendConnectorFieldResult(commonResp *CommonResponseResult) (*AppendConnectorFieldResult, error) {
  775. acfr := &AppendConnectorFieldResult{
  776. CommonResponseResult: *commonResp,
  777. }
  778. return acfr, nil
  779. }
  780. type ListSubscriptionResult struct {
  781. CommonResponseResult
  782. TotalCount int64 `json:"TotalCount"`
  783. Subscriptions []SubscriptionEntry `json:"Subscriptions"`
  784. }
  785. func NewListSubscriptionResult(data []byte, commonResp *CommonResponseResult) (*ListSubscriptionResult, error) {
  786. lsr := &ListSubscriptionResult{
  787. CommonResponseResult: *commonResp,
  788. }
  789. if err := json.Unmarshal(data, lsr); err != nil {
  790. return nil, err
  791. }
  792. return lsr, nil
  793. }
  794. type CreateSubscriptionResult struct {
  795. CommonResponseResult
  796. SubId string `json:"SubId"`
  797. }
  798. func NewCreateSubscriptionResult(data []byte, commonResp *CommonResponseResult) (*CreateSubscriptionResult, error) {
  799. csr := &CreateSubscriptionResult{
  800. CommonResponseResult: *commonResp,
  801. }
  802. if err := json.Unmarshal(data, csr); err != nil {
  803. return nil, err
  804. }
  805. return csr, nil
  806. }
  807. type UpdateSubscriptionResult struct {
  808. CommonResponseResult
  809. }
  810. func NewUpdateSubscriptionResult(commonResp *CommonResponseResult) (*UpdateSubscriptionResult, error) {
  811. usr := &UpdateSubscriptionResult{
  812. CommonResponseResult: *commonResp,
  813. }
  814. return usr, nil
  815. }
  // DeleteSubscriptionResult embeds CommonResponseResult and declares no fields
  // of its own. Presumably the response to a delete-subscription request
  // (inferred from the name).
  type DeleteSubscriptionResult struct {
  	CommonResponseResult
  }
  819. func NewDeleteSubscriptionResult(commonResp *CommonResponseResult) (*DeleteSubscriptionResult, error) {
  820. dsr := &DeleteSubscriptionResult{
  821. CommonResponseResult: *commonResp,
  822. }
  823. return dsr, nil
  824. }
  // GetSubscriptionResult embeds both CommonResponseResult and
  // SubscriptionEntry, so the fields of each are promoted onto this type.
  type GetSubscriptionResult struct {
  	CommonResponseResult
  	SubscriptionEntry
  }
  829. func NewGetSubscriptionResult(data []byte, commonResp *CommonResponseResult) (*GetSubscriptionResult, error) {
  830. gsr := &GetSubscriptionResult{
  831. CommonResponseResult: *commonResp,
  832. }
  833. if err := json.Unmarshal(data, gsr); err != nil {
  834. return nil, err
  835. }
  836. return gsr, nil
  837. }
  // UpdateSubscriptionStateResult embeds CommonResponseResult and declares no
  // fields of its own. Presumably the response to an update-subscription-state
  // request (inferred from the name).
  type UpdateSubscriptionStateResult struct {
  	CommonResponseResult
  }
  841. func NewUpdateSubscriptionStateResult(commonResp *CommonResponseResult) (*UpdateSubscriptionStateResult, error) {
  842. ussr := &UpdateSubscriptionStateResult{
  843. CommonResponseResult: *commonResp,
  844. }
  845. return ussr, nil
  846. }
  // OpenSubscriptionSessionResult embeds CommonResponseResult and adds the map
  // decoded from the "Offsets" JSON key.
  type OpenSubscriptionSessionResult struct {
  	CommonResponseResult
  	// Offsets is read from the "Offsets" key; the string keys are presumably
  	// shard identifiers — confirm against the service API.
  	Offsets map[string]SubscriptionOffset `json:"Offsets"`
  }
  851. func NewOpenSubscriptionSessionResult(data []byte, commonResp *CommonResponseResult) (*OpenSubscriptionSessionResult, error) {
  852. ossr := &OpenSubscriptionSessionResult{
  853. CommonResponseResult: *commonResp,
  854. }
  855. if err := json.Unmarshal(data, ossr); err != nil {
  856. return nil, err
  857. }
  858. return ossr, nil
  859. }
  // GetSubscriptionOffsetResult embeds CommonResponseResult and adds the map
  // decoded from the "Offsets" JSON key.
  type GetSubscriptionOffsetResult struct {
  	CommonResponseResult
  	// Offsets is read from the "Offsets" key; the string keys are presumably
  	// shard identifiers — confirm against the service API.
  	Offsets map[string]SubscriptionOffset `json:"Offsets"`
  }
  864. func NewGetSubscriptionOffsetResult(data []byte, commonResp *CommonResponseResult) (*GetSubscriptionOffsetResult, error) {
  865. gsor := &GetSubscriptionOffsetResult{
  866. CommonResponseResult: *commonResp,
  867. }
  868. if err := json.Unmarshal(data, gsor); err != nil {
  869. return nil, err
  870. }
  871. return gsor, nil
  872. }
  // CommitSubscriptionOffsetResult embeds CommonResponseResult and declares no
  // fields of its own. Presumably the response to a commit-subscription-offset
  // request (inferred from the name).
  type CommitSubscriptionOffsetResult struct {
  	CommonResponseResult
  }
  876. func NewCommitSubscriptionOffsetResult(commonResp *CommonResponseResult) (*CommitSubscriptionOffsetResult, error) {
  877. csor := &CommitSubscriptionOffsetResult{
  878. CommonResponseResult: *commonResp,
  879. }
  880. return csor, nil
  881. }
  // ResetSubscriptionOffsetResult embeds CommonResponseResult and declares no
  // fields of its own. Presumably the response to a reset-subscription-offset
  // request (inferred from the name).
  type ResetSubscriptionOffsetResult struct {
  	CommonResponseResult
  }
  885. func NewResetSubscriptionOffsetResult(commonResp *CommonResponseResult) (*ResetSubscriptionOffsetResult, error) {
  886. rsor := &ResetSubscriptionOffsetResult{
  887. CommonResponseResult: *commonResp,
  888. }
  889. return rsor, nil
  890. }
  // HeartbeatResult embeds CommonResponseResult and adds the fields decoded
  // from the "PlanVersion", "ShardList" and "TotalPlan" JSON keys.
  //
  // NOTE(review): the meaning of each field is not visible in this file;
  // presumably they describe a consumer-group shard assignment — confirm
  // against the service API.
  type HeartbeatResult struct {
  	CommonResponseResult
  	// PlanVersion is read from the "PlanVersion" key.
  	PlanVersion int64 `json:"PlanVersion"`
  	// ShardList is read from the "ShardList" key.
  	ShardList []string `json:"ShardList"`
  	// TotalPlan is read from the "TotalPlan" key and kept as an opaque string.
  	TotalPlan string `json:"TotalPlan"`
  }
  897. func NewHeartbeatResult(data []byte, commonResp *CommonResponseResult) (*HeartbeatResult, error) {
  898. hr := &HeartbeatResult{
  899. CommonResponseResult: *commonResp,
  900. }
  901. if err := json.Unmarshal(data, hr); err != nil {
  902. return nil, err
  903. }
  904. return hr, nil
  905. }
  // JoinGroupResult embeds CommonResponseResult and adds the fields decoded
  // from the "ConsumerId", "VersionId" and "SessionTimeout" JSON keys.
  type JoinGroupResult struct {
  	CommonResponseResult
  	// ConsumerId is read from the "ConsumerId" key.
  	ConsumerId string `json:"ConsumerId"`
  	// VersionId is read from the "VersionId" key.
  	VersionId int64 `json:"VersionId"`
  	// SessionTimeout is read from the "SessionTimeout" key; the unit is not
  	// visible here — TODO confirm against the service API.
  	SessionTimeout int64 `json:"SessionTimeout"`
  }
  912. func NewJoinGroupResult(data []byte, commonResp *CommonResponseResult) (*JoinGroupResult, error) {
  913. jgr := &JoinGroupResult{
  914. CommonResponseResult: *commonResp,
  915. }
  916. if err := json.Unmarshal(data, jgr); err != nil {
  917. return nil, err
  918. }
  919. return jgr, nil
  920. }
  // SyncGroupResult embeds CommonResponseResult and declares no fields of its
  // own. Presumably the response to a sync-group request (inferred from the
  // name).
  type SyncGroupResult struct {
  	CommonResponseResult
  }
  924. func NewSyncGroupResult(commonResp *CommonResponseResult) (*SyncGroupResult, error) {
  925. sgr := &SyncGroupResult{
  926. CommonResponseResult: *commonResp,
  927. }
  928. return sgr, nil
  929. }
  // LeaveGroupResult embeds CommonResponseResult and declares no fields of its
  // own. Presumably the response to a leave-group request (inferred from the
  // name).
  type LeaveGroupResult struct {
  	CommonResponseResult
  }
  933. func NewLeaveGroupResult(commonResp *CommonResponseResult) (*LeaveGroupResult, error) {
  934. lgr := &LeaveGroupResult{
  935. CommonResponseResult: *commonResp,
  936. }
  937. return lgr, nil
  938. }
  // ListTopicSchemaResult embeds CommonResponseResult and adds the list of
  // schema entries found under the "RecordSchemaList" JSON key.
  //
  // Decoding does not use the struct tags directly: the type's UnmarshalJSON
  // method reads each entry's schema as a JSON-encoded string and decodes it a
  // second time.
  type ListTopicSchemaResult struct {
  	CommonResponseResult
  	SchemaInfoList []RecordSchemaInfo `json:"RecordSchemaList"`
  }
  943. // for deserialize the RecordSchema
  944. func (gtr *ListTopicSchemaResult) UnmarshalJSON(data []byte) error {
  945. type RecordSchemaInfoHelper struct {
  946. VersionId int `json:"VersionId"`
  947. RecordSchema string `json:"RecordSchema"`
  948. }
  949. msg := &struct {
  950. SchemaInfoList []RecordSchemaInfoHelper `json:"RecordSchemaList"`
  951. }{}
  952. if err := json.Unmarshal(data, msg); err != nil {
  953. return err
  954. }
  955. for _, info := range msg.SchemaInfoList {
  956. schema := &RecordSchema{}
  957. if err := json.Unmarshal([]byte(info.RecordSchema), schema); err != nil {
  958. return err
  959. }
  960. for idx := range schema.Fields {
  961. schema.Fields[idx].AllowNull = !schema.Fields[idx].AllowNull
  962. }
  963. schemaInfo := RecordSchemaInfo{
  964. VersionId: info.VersionId,
  965. RecordSchema: *schema,
  966. }
  967. gtr.SchemaInfoList = append(gtr.SchemaInfoList, schemaInfo)
  968. }
  969. return nil
  970. }
  971. func NewListTopicSchemaResult(data []byte, commonResp *CommonResponseResult) (*ListTopicSchemaResult, error) {
  972. ret := &ListTopicSchemaResult{
  973. CommonResponseResult: *commonResp,
  974. }
  975. if err := json.Unmarshal(data, ret); err != nil {
  976. return nil, err
  977. }
  978. return ret, nil
  979. }
  // GetTopicSchemaResult embeds CommonResponseResult and adds a schema version
  // together with the schema itself.
  //
  // Decoding does not use the RecordSchema tag directly: the type's
  // UnmarshalJSON method reads "RecordSchema" as a JSON-encoded string and
  // decodes it a second time.
  type GetTopicSchemaResult struct {
  	CommonResponseResult
  	// VersionId is read from the "VersionId" key.
  	VersionId int `json:"VersionId"`
  	// RecordSchema is the schema decoded from the "RecordSchema" key.
  	RecordSchema RecordSchema `json:"RecordSchema"`
  }
  985. func (gtr *GetTopicSchemaResult) UnmarshalJSON(data []byte) error {
  986. msg := &struct {
  987. VersionId int `json:"VersionId"`
  988. RecordSchema string `json:"RecordSchema"`
  989. }{}
  990. if err := json.Unmarshal(data, msg); err != nil {
  991. return err
  992. }
  993. schema := &RecordSchema{}
  994. if err := json.Unmarshal([]byte(msg.RecordSchema), schema); err != nil {
  995. return err
  996. }
  997. for idx := range schema.Fields {
  998. schema.Fields[idx].AllowNull = !schema.Fields[idx].AllowNull
  999. }
  1000. gtr.VersionId = msg.VersionId
  1001. gtr.RecordSchema = *schema
  1002. return nil
  1003. }
  1004. func NewGetTopicSchemaResult(data []byte, commonResp *CommonResponseResult) (*GetTopicSchemaResult, error) {
  1005. ret := &GetTopicSchemaResult{
  1006. CommonResponseResult: *commonResp,
  1007. }
  1008. if err := json.Unmarshal(data, ret); err != nil {
  1009. return nil, err
  1010. }
  1011. return ret, nil
  1012. }
  // RegisterTopicSchemaResult embeds CommonResponseResult and adds the value
  // decoded from the "VersionId" JSON key.
  type RegisterTopicSchemaResult struct {
  	CommonResponseResult
  	// VersionId is read from the "VersionId" key; presumably the version
  	// assigned to the registered schema (inferred from the names).
  	VersionId int `json:"VersionId"`
  }
  1017. func NewRegisterTopicSchemaResult(data []byte, commonResp *CommonResponseResult) (*RegisterTopicSchemaResult, error) {
  1018. ret := &RegisterTopicSchemaResult{
  1019. CommonResponseResult: *commonResp,
  1020. }
  1021. if err := json.Unmarshal(data, ret); err != nil {
  1022. return nil, err
  1023. }
  1024. return ret, nil
  1025. }
  // DeleteTopicSchemaResult embeds CommonResponseResult and declares no fields
  // of its own. Presumably the response to a delete-topic-schema request
  // (inferred from the name).
  type DeleteTopicSchemaResult struct {
  	CommonResponseResult
  }
  1029. func NewDeleteTopicSchemaResult(commonResp *CommonResponseResult) (*DeleteTopicSchemaResult, error) {
  1030. ret := &DeleteTopicSchemaResult{
  1031. CommonResponseResult: *commonResp,
  1032. }
  1033. return ret, nil
  1034. }