implement.go 61 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738
  1. package datahub
  2. import (
  3. "errors"
  4. "fmt"
  5. "github.com/aliyun/aliyun-datahub-sdk-go/datahub/util"
  6. "time"
  7. )
// DataHub is the receiver for the project, topic, shard, record and connector
// operations defined in this file. Each operation is sent through Client.
type DataHub struct {
	// Client issues the Get/Post/Put/Delete requests made by DataHub methods.
	Client *RestClient
	// for batch client
	// NOTE(review): cType and schemaClient are not read in the visible part of
	// this file; presumably they are consumed by the batch implementation —
	// confirm against the batch client code.
	cType        CompressorType
	schemaClient *schemaRegistryClient
}
  14. // ListProjects list all projects
  15. func (datahub *DataHub) ListProject() (*ListProjectResult, error) {
  16. path := projectsPath
  17. reqPara := &RequestParameter{
  18. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  19. }
  20. responseBody, commonResp, err := datahub.Client.Get(path, reqPara)
  21. if err != nil {
  22. return nil, err
  23. }
  24. return NewListProjectResult(responseBody, commonResp)
  25. }
  26. // ListProjects list projects with filter
  27. func (datahub *DataHub) ListProjectWithFilter(filter string) (*ListProjectResult, error) {
  28. path := projectsPath
  29. reqPara := &RequestParameter{
  30. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  31. Query: map[string]string{httpFilterQuery: filter},
  32. }
  33. responseBody, commonResp, err := datahub.Client.Get(path, reqPara)
  34. if err != nil {
  35. return nil, err
  36. }
  37. return NewListProjectResult(responseBody, commonResp)
  38. }
  39. // CreateProject create new project
  40. func (datahub *DataHub) CreateProject(projectName, comment string) (*CreateProjectResult, error) {
  41. if !util.CheckProjectName(projectName) {
  42. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  43. }
  44. if !util.CheckComment(comment) {
  45. return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
  46. }
  47. path := fmt.Sprintf(projectPath, projectName)
  48. reqPara := &RequestParameter{
  49. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  50. }
  51. requestBody := &CreateProjectRequest{
  52. Comment: comment,
  53. }
  54. _, commonResp, err := datahub.Client.Post(path, requestBody, reqPara)
  55. if err != nil {
  56. return nil, err
  57. }
  58. return NewCreateProjectResult(commonResp)
  59. }
  60. // UpdateProject update project
  61. func (datahub *DataHub) UpdateProject(projectName, comment string) (*UpdateProjectResult, error) {
  62. if !util.CheckProjectName(projectName) {
  63. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  64. }
  65. if !util.CheckComment(comment) {
  66. return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
  67. }
  68. path := fmt.Sprintf(projectPath, projectName)
  69. reqPara := &RequestParameter{
  70. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  71. }
  72. requestBody := &UpdateProjectRequest{
  73. Comment: comment,
  74. }
  75. _, commonResp, err := datahub.Client.Put(path, requestBody, reqPara)
  76. if err != nil {
  77. return nil, err
  78. }
  79. return NewUpdateProjectResult(commonResp)
  80. }
  81. // DeleteProject delete project
  82. func (datahub *DataHub) DeleteProject(projectName string) (*DeleteProjectResult, error) {
  83. if !util.CheckProjectName(projectName) {
  84. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  85. }
  86. path := fmt.Sprintf(projectPath, projectName)
  87. reqPara := &RequestParameter{
  88. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  89. }
  90. _, commonResp, err := datahub.Client.Delete(path, reqPara)
  91. if err != nil {
  92. return nil, err
  93. }
  94. return NewDeleteProjectResult(commonResp)
  95. }
  96. // GetProject get a project deatil named the given name
  97. func (datahub *DataHub) GetProject(projectName string) (*GetProjectResult, error) {
  98. if !util.CheckProjectName(projectName) {
  99. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  100. }
  101. path := fmt.Sprintf(projectPath, projectName)
  102. reqPara := &RequestParameter{
  103. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  104. }
  105. respBody, commonResp, err := datahub.Client.Get(path, reqPara)
  106. if err != nil {
  107. return nil, err
  108. }
  109. result, err := NewGetProjectResult(respBody, commonResp)
  110. if err != nil {
  111. return nil, err
  112. }
  113. result.ProjectName = projectName
  114. return result, nil
  115. }
  116. // Update project vpc white list.
  117. func (datahub *DataHub) UpdateProjectVpcWhitelist(projectName, vpcIds string) (*UpdateProjectVpcWhitelistResult, error) {
  118. if !util.CheckProjectName(projectName) {
  119. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  120. }
  121. path := fmt.Sprintf(projectPath, projectName)
  122. reqPara := &RequestParameter{
  123. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  124. }
  125. requestBody := &UpdateProjectVpcWhitelistRequest{
  126. VpcIds: vpcIds,
  127. }
  128. _, commonResp, err := datahub.Client.Put(path, requestBody, reqPara)
  129. if err != nil {
  130. return nil, err
  131. }
  132. return NewUpdateProjectVpcWhitelistResult(commonResp)
  133. }
  134. func (datahub *DataHub) WaitAllShardsReady(projectName, topicName string) bool {
  135. return datahub.WaitAllShardsReadyWithTime(projectName, topicName, minWaitingTimeInMs/1000)
  136. }
  137. func (datahub *DataHub) WaitAllShardsReadyWithTime(projectName, topicName string, timeout int64) bool {
  138. ready := make(chan bool)
  139. if timeout > 0 {
  140. go func(timeout int64) {
  141. time.Sleep(time.Duration(timeout) * time.Second)
  142. ready <- false
  143. }(timeout)
  144. }
  145. go func(datahub DataHubApi) {
  146. for {
  147. ls, err := datahub.ListShard(projectName, topicName)
  148. if err != nil {
  149. time.Sleep(1 * time.Microsecond)
  150. continue
  151. }
  152. ok := true
  153. for _, shard := range ls.Shards {
  154. switch shard.State {
  155. case ACTIVE, CLOSED:
  156. continue
  157. default:
  158. ok = false
  159. break
  160. }
  161. }
  162. if ok {
  163. break
  164. }
  165. }
  166. ready <- true
  167. }(datahub)
  168. return <-ready
  169. }
  170. func (datahub *DataHub) ListTopic(projectName string) (*ListTopicResult, error) {
  171. if !util.CheckProjectName(projectName) {
  172. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  173. }
  174. path := fmt.Sprintf(topicsPath, projectName)
  175. reqPara := &RequestParameter{
  176. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  177. }
  178. respBody, commonResp, err := datahub.Client.Get(path, reqPara)
  179. if err != nil {
  180. return nil, err
  181. }
  182. return NewListTopicResult(respBody, commonResp)
  183. }
  184. func (datahub *DataHub) ListTopicWithFilter(projectName, filter string) (*ListTopicResult, error) {
  185. if !util.CheckProjectName(projectName) {
  186. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  187. }
  188. path := fmt.Sprintf(topicsPath, projectName)
  189. reqPara := &RequestParameter{
  190. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  191. Query: map[string]string{httpFilterQuery: filter},
  192. }
  193. respBody, commonResp, err := datahub.Client.Get(path, reqPara)
  194. if err != nil {
  195. return nil, err
  196. }
  197. return NewListTopicResult(respBody, commonResp)
  198. }
  199. func (datahub *DataHub) CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) (*CreateBlobTopicResult, error) {
  200. para := &CreateTopicParameter{
  201. ShardCount: shardCount,
  202. LifeCycle: lifeCycle,
  203. Comment: comment,
  204. RecordType: BLOB,
  205. RecordSchema: nil,
  206. ExpandMode: SPLIT_EXTEND,
  207. }
  208. ret, err := datahub.CreateTopicWithPara(projectName, topicName, para)
  209. if err != nil {
  210. return nil, err
  211. }
  212. return NewCreateBlobTopicResult(&ret.CommonResponseResult)
  213. }
  214. func (datahub *DataHub) CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) (*CreateTupleTopicResult, error) {
  215. para := &CreateTopicParameter{
  216. ShardCount: shardCount,
  217. LifeCycle: lifeCycle,
  218. Comment: comment,
  219. RecordType: TUPLE,
  220. RecordSchema: recordSchema,
  221. ExpandMode: SPLIT_EXTEND,
  222. }
  223. ret, err := datahub.CreateTopicWithPara(projectName, topicName, para)
  224. if err != nil {
  225. return nil, err
  226. }
  227. return NewCreateTupleTopicResult(&ret.CommonResponseResult)
  228. }
  229. func (datahub *DataHub) CreateTopicWithPara(projectName, topicName string, para *CreateTopicParameter) (*CreateTopicWithParaResult, error) {
  230. if !util.CheckProjectName(projectName) {
  231. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  232. }
  233. if !util.CheckTopicName(topicName) {
  234. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  235. }
  236. if para == nil {
  237. return nil, NewInvalidParameterErrorWithMessage(parameterNull)
  238. }
  239. if !util.CheckComment(para.Comment) {
  240. return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
  241. }
  242. if para.RecordType != TUPLE && para.RecordType != BLOB {
  243. return nil, NewInvalidParameterErrorWithMessage(fmt.Sprintf("Invalid RecordType: %s", para.RecordType))
  244. }
  245. if para.RecordType == TUPLE && para.RecordSchema == nil {
  246. return nil, NewInvalidParameterErrorWithMessage("Tuple topic must set RecordSchema")
  247. }
  248. if para.LifeCycle <= 0 {
  249. return nil, NewInvalidParameterErrorWithMessage(lifecycleInvalid)
  250. }
  251. path := fmt.Sprintf(topicPath, projectName, topicName)
  252. reqPara := &RequestParameter{
  253. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  254. }
  255. ctr := &CreateTopicRequest{
  256. Action: "create",
  257. ShardCount: para.ShardCount,
  258. Lifecycle: para.LifeCycle,
  259. RecordType: para.RecordType,
  260. RecordSchema: para.RecordSchema,
  261. Comment: para.Comment,
  262. ExpandMode: para.ExpandMode,
  263. }
  264. _, commonResp, err := datahub.Client.Post(path, ctr, reqPara)
  265. if err != nil {
  266. return nil, err
  267. }
  268. return NewCreateTopicWithParaResult(commonResp)
  269. }
  270. func (datahub *DataHub) UpdateTopic(projectName, topicName, comment string) (*UpdateTopicResult, error) {
  271. para := &UpdateTopicParameter{
  272. Comment: comment,
  273. }
  274. return datahub.UpdateTopicWithPara(projectName, topicName, para)
  275. }
  276. // Update topic meta information. Only support comment and lifeCycle now.
  277. func (datahub *DataHub) UpdateTopicWithPara(projectName, topicName string, para *UpdateTopicParameter) (*UpdateTopicResult, error) {
  278. if !util.CheckProjectName(projectName) {
  279. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  280. }
  281. if !util.CheckTopicName(topicName) {
  282. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  283. }
  284. if para == nil {
  285. return nil, NewInvalidParameterErrorWithMessage(parameterNull)
  286. }
  287. if !util.CheckComment(para.Comment) {
  288. return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
  289. }
  290. path := fmt.Sprintf(topicPath, projectName, topicName)
  291. reqPara := &RequestParameter{
  292. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  293. }
  294. ut := &UpdateTopicRequest{
  295. Lifecycle: para.LifeCycle,
  296. Comment: para.Comment,
  297. }
  298. _, commonResp, err := datahub.Client.Put(path, ut, reqPara)
  299. if err != nil {
  300. return nil, err
  301. }
  302. return NewUpdateTopicResult(commonResp)
  303. }
  304. func (datahub *DataHub) DeleteTopic(projectName, topicName string) (*DeleteTopicResult, error) {
  305. if !util.CheckProjectName(projectName) {
  306. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  307. }
  308. if !util.CheckTopicName(topicName) {
  309. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  310. }
  311. path := fmt.Sprintf(topicPath, projectName, topicName)
  312. reqPara := &RequestParameter{
  313. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  314. }
  315. _, commonResp, err := datahub.Client.Delete(path, reqPara)
  316. if err != nil {
  317. return nil, err
  318. }
  319. return NewDeleteTopicResult(commonResp)
  320. }
  321. func (datahub *DataHub) GetTopic(projectName, topicName string) (*GetTopicResult, error) {
  322. if !util.CheckProjectName(projectName) {
  323. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  324. }
  325. if !util.CheckTopicName(topicName) {
  326. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  327. }
  328. path := fmt.Sprintf(topicPath, projectName, topicName)
  329. reqPara := &RequestParameter{
  330. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  331. }
  332. respBody, commonResp, err := datahub.Client.Get(path, reqPara)
  333. if err != nil {
  334. return nil, err
  335. }
  336. result, err := NewGetTopicResult(respBody, commonResp)
  337. if err != nil {
  338. return nil, err
  339. }
  340. result.ProjectName = projectName
  341. result.TopicName = topicName
  342. return result, nil
  343. }
  344. func (datahub *DataHub) ListShard(projectName, topicName string) (*ListShardResult, error) {
  345. if !util.CheckProjectName(projectName) {
  346. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  347. }
  348. if !util.CheckTopicName(topicName) {
  349. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  350. }
  351. path := fmt.Sprintf(shardsPath, projectName, topicName)
  352. reqPara := &RequestParameter{
  353. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  354. }
  355. respBody, commonResp, err := datahub.Client.Get(path, reqPara)
  356. if err != nil {
  357. return nil, err
  358. }
  359. return NewListShardResult(respBody, commonResp)
  360. }
  361. func (datahub *DataHub) SplitShard(projectName, topicName, shardId string) (*SplitShardResult, error) {
  362. if !util.CheckProjectName(projectName) {
  363. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  364. }
  365. if !util.CheckTopicName(topicName) {
  366. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  367. }
  368. if !util.CheckShardId(shardId) {
  369. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  370. }
  371. splitKey, err := generateSpliteKey(projectName, topicName, shardId, datahub)
  372. if err != nil {
  373. return nil, err
  374. }
  375. path := fmt.Sprintf(shardsPath, projectName, topicName)
  376. reqPara := &RequestParameter{
  377. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  378. }
  379. ssr := &SplitShardRequest{
  380. Action: "split",
  381. ShardId: shardId,
  382. SplitKey: splitKey,
  383. }
  384. respBody, commonResp, err := datahub.Client.Post(path, ssr, reqPara)
  385. if err != nil {
  386. return nil, err
  387. }
  388. return NewSplitShardResult(respBody, commonResp)
  389. }
  390. func (datahub *DataHub) SplitShardBySplitKey(projectName, topicName, shardId, splitKey string) (*SplitShardResult, error) {
  391. if !util.CheckProjectName(projectName) {
  392. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  393. }
  394. if !util.CheckTopicName(topicName) {
  395. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  396. }
  397. if !util.CheckShardId(shardId) {
  398. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  399. }
  400. path := fmt.Sprintf(shardsPath, projectName, topicName)
  401. reqPara := &RequestParameter{
  402. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  403. }
  404. ssr := &SplitShardRequest{
  405. Action: "split",
  406. ShardId: shardId,
  407. SplitKey: splitKey,
  408. }
  409. respBody, commonResp, err := datahub.Client.Post(path, ssr, reqPara)
  410. if err != nil {
  411. return nil, err
  412. }
  413. return NewSplitShardResult(respBody, commonResp)
  414. }
  415. func (datahub *DataHub) MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error) {
  416. if !util.CheckProjectName(projectName) {
  417. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  418. }
  419. if !util.CheckTopicName(topicName) {
  420. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  421. }
  422. if !util.CheckShardId(shardId) || !util.CheckShardId(adjacentShardId) {
  423. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  424. }
  425. path := fmt.Sprintf(shardsPath, projectName, topicName)
  426. reqPara := &RequestParameter{
  427. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  428. }
  429. mss := &MergeShardRequest{
  430. Action: "merge",
  431. ShardId: shardId,
  432. AdjacentShardId: adjacentShardId,
  433. }
  434. respBody, commonResp, err := datahub.Client.Post(path, mss, reqPara)
  435. if err != nil {
  436. return nil, err
  437. }
  438. return NewMergeShardResult(respBody, commonResp)
  439. }
  440. func (datahub *DataHub) ExtendShard(projectName, topicName string, shardCount int) (*ExtendShardResult, error) {
  441. if !util.CheckProjectName(projectName) {
  442. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  443. }
  444. if !util.CheckTopicName(topicName) {
  445. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  446. }
  447. if shardCount <= 0 {
  448. return nil, NewInvalidParameterErrorWithMessage("shardCount is invalid")
  449. }
  450. path := fmt.Sprintf(shardsPath, projectName, topicName)
  451. reqPara := &RequestParameter{
  452. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  453. }
  454. mss := &ExtendShardRequest{
  455. Action: "extend",
  456. ExtendMode: "TO",
  457. ShardCount: shardCount,
  458. }
  459. _, commonResp, err := datahub.Client.Post(path, mss, reqPara)
  460. if err != nil {
  461. return nil, err
  462. }
  463. return NewExtendShardResult(commonResp)
  464. }
  465. func (datahub *DataHub) GetCursor(projectName, topicName, shardId string, ctype CursorType, param ...int64) (*GetCursorResult, error) {
  466. if !util.CheckProjectName(projectName) {
  467. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  468. }
  469. if !util.CheckTopicName(topicName) {
  470. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  471. }
  472. if !util.CheckShardId(shardId) {
  473. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  474. }
  475. if len(param) > 1 {
  476. return nil, NewInvalidParameterErrorWithMessage(parameterNumInvalid)
  477. }
  478. path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
  479. reqPara := &RequestParameter{
  480. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  481. }
  482. gcr := &GetCursorRequest{
  483. Action: "cursor",
  484. CursorType: ctype,
  485. }
  486. switch ctype {
  487. case OLDEST, LATEST:
  488. if len(param) != 0 {
  489. return nil, NewInvalidParameterErrorWithMessage("Not need extra parameter when CursorType OLDEST or LATEST")
  490. }
  491. case SYSTEM_TIME:
  492. if len(param) != 1 {
  493. return nil, NewInvalidParameterErrorWithMessage("Timestamp must be set when CursorType is SYSTEM_TIME")
  494. }
  495. gcr.SystemTime = param[0]
  496. case SEQUENCE:
  497. if len(param) != 1 {
  498. return nil, NewInvalidParameterErrorWithMessage("Sequence must be set when CursorType is SEQUENCE")
  499. }
  500. gcr.Sequence = param[0]
  501. }
  502. respBody, commonResp, err := datahub.Client.Post(path, gcr, reqPara)
  503. if err != nil {
  504. return nil, err
  505. }
  506. return NewGetCursorResult(respBody, commonResp)
  507. }
  508. func (datahub *DataHub) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) {
  509. if !util.CheckProjectName(projectName) {
  510. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  511. }
  512. if !util.CheckTopicName(topicName) {
  513. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  514. }
  515. if records == nil || len(records) == 0 {
  516. return nil, NewInvalidParameterErrorWithMessage(recordsInvalid)
  517. }
  518. path := fmt.Sprintf(shardsPath, projectName, topicName)
  519. reqPara := &RequestParameter{
  520. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  521. }
  522. prr := &PutRecordsRequest{
  523. Action: "pub",
  524. Records: records,
  525. }
  526. respBody, commonResp, err := datahub.Client.Post(path, prr, reqPara)
  527. if err != nil {
  528. return nil, err
  529. }
  530. return NewPutRecordsResult(respBody, commonResp)
  531. }
  532. func (datahub *DataHub) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error) {
  533. return nil, errors.New("not support this method")
  534. }
  535. func (datahub *DataHub) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error) {
  536. if !util.CheckProjectName(projectName) {
  537. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  538. }
  539. if !util.CheckTopicName(topicName) {
  540. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  541. }
  542. if !util.CheckShardId(shardId) {
  543. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  544. }
  545. if recordSchema == nil {
  546. return nil, NewInvalidParameterErrorWithMessage(missingRecordSchema)
  547. }
  548. path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
  549. reqPara := &RequestParameter{
  550. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  551. }
  552. grr := &GetRecordRequest{
  553. Action: "sub",
  554. Cursor: cursor,
  555. Limit: limit,
  556. }
  557. respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
  558. if err != nil {
  559. return nil, err
  560. }
  561. ret, err := NewGetRecordsResult(respBody, recordSchema, commonResp)
  562. if err != nil {
  563. return nil, err
  564. }
  565. for _, record := range ret.Records {
  566. if _, ok := record.(*TupleRecord); !ok {
  567. return nil, NewInvalidParameterErrorWithMessage("Shouldn't call this method for BLOB topic")
  568. }
  569. }
  570. return ret, nil
  571. }
  572. func (datahub *DataHub) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) {
  573. if !util.CheckProjectName(projectName) {
  574. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  575. }
  576. if !util.CheckTopicName(topicName) {
  577. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  578. }
  579. if !util.CheckShardId(shardId) {
  580. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  581. }
  582. path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
  583. reqPara := &RequestParameter{
  584. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  585. }
  586. grr := &GetRecordRequest{
  587. Action: "sub",
  588. Cursor: cursor,
  589. Limit: limit,
  590. }
  591. respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
  592. if err != nil {
  593. return nil, err
  594. }
  595. return NewGetRecordsResult(respBody, nil, commonResp)
  596. }
  597. func (datahub *DataHub) AppendField(projectName, topicName string, field Field) (*AppendFieldResult, error) {
  598. if !util.CheckProjectName(projectName) {
  599. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  600. }
  601. if !util.CheckTopicName(topicName) {
  602. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  603. }
  604. path := fmt.Sprintf(topicPath, projectName, topicName)
  605. reqPara := &RequestParameter{
  606. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  607. }
  608. afr := &AppendFieldRequest{
  609. Action: "AppendField",
  610. FieldName: field.Name,
  611. FieldType: field.Type,
  612. }
  613. _, commonResp, err := datahub.Client.Post(path, afr, reqPara)
  614. if err != nil {
  615. return nil, err
  616. }
  617. return NewAppendFieldResult(commonResp)
  618. }
  619. func (datahub *DataHub) GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error) {
  620. if !util.CheckProjectName(projectName) {
  621. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  622. }
  623. if !util.CheckTopicName(topicName) {
  624. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  625. }
  626. if !util.CheckShardId(shardId) {
  627. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  628. }
  629. path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
  630. reqPara := &RequestParameter{
  631. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  632. }
  633. gmir := &GetMeterInfoRequest{
  634. Action: "meter",
  635. }
  636. respBody, commonResp, err := datahub.Client.Post(path, gmir, reqPara)
  637. if err != nil {
  638. return nil, err
  639. }
  640. return NewGetMeterInfoResult(respBody, commonResp)
  641. }
  642. func (datahub *DataHub) ListConnector(projectName, topicName string) (*ListConnectorResult, error) {
  643. if !util.CheckProjectName(projectName) {
  644. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  645. }
  646. if !util.CheckTopicName(topicName) {
  647. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  648. }
  649. path := fmt.Sprintf(connectorsPath, projectName, topicName)
  650. reqPara := &RequestParameter{
  651. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  652. Query: map[string]string{httpHeaderConnectorMode: "id"},
  653. }
  654. respBody, commonResp, err := datahub.Client.Get(path, reqPara)
  655. if err != nil {
  656. return nil, err
  657. }
  658. return NewListConnectorResult(respBody, commonResp)
  659. }
  660. func (datahub *DataHub) CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (*CreateConnectorResult, error) {
  661. return datahub.CreateConnectorWithStartTime(projectName, topicName, cType, columnFields, -1, config)
  662. }
  663. func (datahub *DataHub) CreateConnectorWithStartTime(projectName, topicName string, cType ConnectorType,
  664. columnFields []string, sinkStartTime int64, config interface{}) (*CreateConnectorResult, error) {
  665. para := &CreateConnectorParameter{
  666. SinkStartTime: sinkStartTime,
  667. ConnectorType: cType,
  668. ColumnFields: columnFields,
  669. ColumnNameMap: nil,
  670. Config: config,
  671. }
  672. return datahub.CreateConnectorWithPara(projectName, topicName, para)
  673. }
  674. func (datahub *DataHub) CreateConnectorWithPara(projectName, topicName string, para *CreateConnectorParameter) (*CreateConnectorResult, error) {
  675. if !util.CheckProjectName(projectName) {
  676. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  677. }
  678. if !util.CheckTopicName(topicName) {
  679. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  680. }
  681. if para == nil {
  682. return nil, NewInvalidParameterErrorWithMessage(parameterNull)
  683. }
  684. if !validateConnectorType(para.ConnectorType) {
  685. return nil, NewInvalidParameterErrorWithMessage(parameterTypeInvalid)
  686. }
  687. path := fmt.Sprintf(connectorPath, projectName, topicName, para.ConnectorType.String())
  688. reqPara := &RequestParameter{
  689. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  690. }
  691. ccr := &CreateConnectorRequest{
  692. Action: "create",
  693. Type: para.ConnectorType,
  694. SinkStartTime: para.SinkStartTime,
  695. ColumnFields: para.ColumnFields,
  696. ColumnNameMap: para.ColumnNameMap,
  697. Config: para.Config,
  698. }
  699. respBody, commonResp, err := datahub.Client.Post(path, ccr, reqPara)
  700. if err != nil {
  701. return nil, err
  702. }
  703. return NewCreateConnectorResult(respBody, commonResp)
  704. }
  705. func (datahub *DataHub) GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error) {
  706. if !util.CheckProjectName(projectName) {
  707. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  708. }
  709. if !util.CheckTopicName(topicName) {
  710. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  711. }
  712. path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
  713. reqPara := &RequestParameter{
  714. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  715. }
  716. respBody, commonResp, err := datahub.Client.Get(path, reqPara)
  717. if err != nil {
  718. return nil, err
  719. }
  720. return NewGetConnectorResult(respBody, commonResp)
  721. }
  722. func (datahub *DataHub) UpdateConnector(projectName, topicName, connectorId string, config interface{}) (*UpdateConnectorResult, error) {
  723. para := &UpdateConnectorParameter{
  724. ColumnFields: nil,
  725. ColumnNameMap: nil,
  726. Config: config,
  727. }
  728. return datahub.UpdateConnectorWithPara(projectName, topicName, connectorId, para)
  729. }
  730. func (datahub *DataHub) UpdateConnectorWithPara(projectName, topicName, connectorId string, para *UpdateConnectorParameter) (*UpdateConnectorResult, error) {
  731. if !util.CheckProjectName(projectName) {
  732. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  733. }
  734. if !util.CheckTopicName(topicName) {
  735. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  736. }
  737. if para == nil {
  738. return nil, NewInvalidParameterErrorWithMessage(parameterNull)
  739. }
  740. path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
  741. reqPara := &RequestParameter{
  742. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  743. }
  744. ucr := &UpdateConnectorRequest{
  745. Action: "updateconfig",
  746. ColumnFields: para.ColumnFields,
  747. ColumnNameMap: para.ColumnNameMap,
  748. Config: para.Config,
  749. }
  750. _, commonResp, err := datahub.Client.Post(path, ucr, reqPara)
  751. if err != nil {
  752. return nil, err
  753. }
  754. return NewUpdateConnectorResult(commonResp)
  755. }
  756. func (datahub *DataHub) DeleteConnector(projectName, topicName, connectorId string) (*DeleteConnectorResult, error) {
  757. if !util.CheckProjectName(projectName) {
  758. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  759. }
  760. if !util.CheckTopicName(topicName) {
  761. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  762. }
  763. path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
  764. reqPara := &RequestParameter{
  765. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  766. }
  767. _, commonResp, err := datahub.Client.Delete(path, reqPara)
  768. if err != nil {
  769. return nil, err
  770. }
  771. return NewDeleteConnectorResult(commonResp)
  772. }
  773. func (datahub *DataHub) GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error) {
  774. if !util.CheckProjectName(projectName) {
  775. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  776. }
  777. if !util.CheckTopicName(topicName) {
  778. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  779. }
  780. path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
  781. reqPara := &RequestParameter{
  782. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  783. Query: map[string]string{"donetime": ""},
  784. }
  785. respBody, commonResp, err := datahub.Client.Get(path, reqPara)
  786. if err != nil {
  787. return nil, err
  788. }
  789. return NewGetConnectorDoneTimeResult(respBody, commonResp)
  790. }
  791. func (datahub *DataHub) GetConnectorShardStatus(projectName, topicName, connectorId string) (*GetConnectorShardStatusResult, error) {
  792. if !util.CheckProjectName(projectName) {
  793. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  794. }
  795. if !util.CheckTopicName(topicName) {
  796. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  797. }
  798. path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
  799. reqPara := &RequestParameter{
  800. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  801. }
  802. gcss := &GetConnectorShardStatusRequest{
  803. Action: "Status",
  804. }
  805. respBody, commonResp, err := datahub.Client.Post(path, gcss, reqPara)
  806. if err != nil {
  807. return nil, err
  808. }
  809. return NewGetConnectorShardStatusResult(respBody, commonResp)
  810. }
  811. func (datahub *DataHub) GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (*GetConnectorShardStatusByShardResult, error) {
  812. if !util.CheckProjectName(projectName) {
  813. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  814. }
  815. if !util.CheckTopicName(topicName) {
  816. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  817. }
  818. if !util.CheckShardId(shardId) {
  819. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  820. }
  821. path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
  822. reqPara := &RequestParameter{
  823. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  824. }
  825. gcss := &GetConnectorShardStatusRequest{
  826. Action: "Status",
  827. ShardId: shardId,
  828. }
  829. respBody, commonResp, err := datahub.Client.Post(path, gcss, reqPara)
  830. if err != nil {
  831. return nil, err
  832. }
  833. return NewGetConnectorShardStatusByShardResult(respBody, commonResp)
  834. }
  835. func (datahub *DataHub) ReloadConnector(projectName, topicName, connectorId string) (*ReloadConnectorResult, error) {
  836. if !util.CheckProjectName(projectName) {
  837. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  838. }
  839. if !util.CheckTopicName(topicName) {
  840. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  841. }
  842. path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
  843. reqPara := &RequestParameter{
  844. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  845. }
  846. rcr := &ReloadConnectorRequest{
  847. Action: "Reload",
  848. }
  849. _, commonResp, err := datahub.Client.Post(path, rcr, reqPara)
  850. if err != nil {
  851. return nil, err
  852. }
  853. return NewReloadConnectorResult(commonResp)
  854. }
  855. func (datahub *DataHub) ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) (*ReloadConnectorByShardResult, error) {
  856. if !util.CheckProjectName(projectName) {
  857. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  858. }
  859. if !util.CheckTopicName(topicName) {
  860. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  861. }
  862. if !util.CheckShardId(shardId) {
  863. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  864. }
  865. path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
  866. reqPara := &RequestParameter{
  867. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  868. }
  869. rcr := &ReloadConnectorRequest{
  870. Action: "Reload",
  871. ShardId: shardId,
  872. }
  873. _, commonResp, err := datahub.Client.Post(path, rcr, reqPara)
  874. if err != nil {
  875. return nil, err
  876. }
  877. return NewReloadConnectorByShardResult(commonResp)
  878. }
  879. func (datahub *DataHub) UpdateConnectorState(projectName, topicName, connectorId string, state ConnectorState) (*UpdateConnectorStateResult, error) {
  880. if !util.CheckProjectName(projectName) {
  881. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  882. }
  883. if !util.CheckTopicName(topicName) {
  884. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  885. }
  886. if !validateConnectorState(state) {
  887. return nil, NewInvalidParameterErrorWithMessage(parameterTypeInvalid)
  888. }
  889. path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
  890. reqPara := &RequestParameter{
  891. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  892. }
  893. ucsr := &UpdateConnectorStateRequest{
  894. Action: "updatestate",
  895. State: state,
  896. }
  897. _, commonResp, err := datahub.Client.Post(path, ucsr, reqPara)
  898. if err != nil {
  899. return nil, err
  900. }
  901. return NewUpdateConnectorStateResult(commonResp)
  902. }
  903. func (datahub *DataHub) UpdateConnectorOffset(projectName, topicName, connectorId, shardId string, offset ConnectorOffset) (*UpdateConnectorOffsetResult, error) {
  904. if !util.CheckProjectName(projectName) {
  905. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  906. }
  907. if !util.CheckTopicName(topicName) {
  908. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  909. }
  910. if !util.CheckShardId(shardId) {
  911. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  912. }
  913. path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
  914. reqPara := &RequestParameter{
  915. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  916. }
  917. ucor := &UpdateConnectorOffsetRequest{
  918. Action: "updateshardcontext",
  919. ShardId: shardId,
  920. Timestamp: offset.Timestamp,
  921. Sequence: offset.Sequence,
  922. }
  923. _, commonResp, err := datahub.Client.Post(path, ucor, reqPara)
  924. if err != nil {
  925. return nil, err
  926. }
  927. return NewUpdateConnectorOffsetResult(commonResp)
  928. }
  929. func (datahub *DataHub) AppendConnectorField(projectName, topicName, connectorId, fieldName string) (*AppendConnectorFieldResult, error) {
  930. if !util.CheckProjectName(projectName) {
  931. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  932. }
  933. if !util.CheckTopicName(topicName) {
  934. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  935. }
  936. path := fmt.Sprintf(connectorPath, projectName, topicName, connectorId)
  937. reqPara := &RequestParameter{
  938. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  939. }
  940. acfr := &AppendConnectorFieldRequest{
  941. Action: "appendfield",
  942. FieldName: fieldName,
  943. }
  944. _, commonResp, err := datahub.Client.Post(path, acfr, reqPara)
  945. if err != nil {
  946. return nil, err
  947. }
  948. return NewAppendConnectorFieldResult(commonResp)
  949. }
  950. func (datahub *DataHub) ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error) {
  951. if !util.CheckProjectName(projectName) {
  952. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  953. }
  954. if !util.CheckTopicName(topicName) {
  955. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  956. }
  957. path := fmt.Sprintf(subscriptionsPath, projectName, topicName)
  958. reqPara := &RequestParameter{
  959. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  960. }
  961. lsr := &ListSubscriptionRequest{
  962. Action: "list",
  963. PageIndex: pageIndex,
  964. PageSize: pageSize,
  965. }
  966. respBody, commonResp, err := datahub.Client.Post(path, lsr, reqPara)
  967. if err != nil {
  968. return nil, err
  969. }
  970. return NewListSubscriptionResult(respBody, commonResp)
  971. }
  972. func (datahub *DataHub) CreateSubscription(projectName, topicName, comment string) (*CreateSubscriptionResult, error) {
  973. if !util.CheckProjectName(projectName) {
  974. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  975. }
  976. if !util.CheckTopicName(topicName) {
  977. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  978. }
  979. if !util.CheckComment(comment) {
  980. return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
  981. }
  982. path := fmt.Sprintf(subscriptionsPath, projectName, topicName)
  983. reqPara := &RequestParameter{
  984. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  985. }
  986. csr := &CreateSubscriptionRequest{
  987. Action: "create",
  988. Comment: comment,
  989. }
  990. respBody, commonResp, err := datahub.Client.Post(path, csr, reqPara)
  991. if err != nil {
  992. return nil, err
  993. }
  994. return NewCreateSubscriptionResult(respBody, commonResp)
  995. }
  996. func (datahub *DataHub) UpdateSubscription(projectName, topicName, subId, comment string) (*UpdateSubscriptionResult, error) {
  997. if !util.CheckProjectName(projectName) {
  998. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  999. }
  1000. if !util.CheckTopicName(topicName) {
  1001. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1002. }
  1003. if !util.CheckComment(comment) {
  1004. return nil, NewInvalidParameterErrorWithMessage(commentInvalid)
  1005. }
  1006. path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
  1007. reqPara := &RequestParameter{
  1008. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1009. }
  1010. usr := &UpdateSubscriptionRequest{
  1011. Comment: comment,
  1012. }
  1013. _, commonResp, err := datahub.Client.Put(path, usr, reqPara)
  1014. if err != nil {
  1015. return nil, err
  1016. }
  1017. return NewUpdateSubscriptionResult(commonResp)
  1018. }
  1019. func (datahub *DataHub) DeleteSubscription(projectName, topicName, subId string) (*DeleteSubscriptionResult, error) {
  1020. if !util.CheckProjectName(projectName) {
  1021. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1022. }
  1023. if !util.CheckTopicName(topicName) {
  1024. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1025. }
  1026. path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
  1027. reqPara := &RequestParameter{
  1028. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1029. }
  1030. _, commonResp, err := datahub.Client.Delete(path, reqPara)
  1031. if err != nil {
  1032. return nil, err
  1033. }
  1034. return NewDeleteSubscriptionResult(commonResp)
  1035. }
  1036. func (datahub *DataHub) GetSubscription(projectName, topicName, subId string) (*GetSubscriptionResult, error) {
  1037. if !util.CheckProjectName(projectName) {
  1038. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1039. }
  1040. if !util.CheckTopicName(topicName) {
  1041. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1042. }
  1043. path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
  1044. reqPara := &RequestParameter{
  1045. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1046. }
  1047. respBody, commonResp, err := datahub.Client.Get(path, reqPara)
  1048. if err != nil {
  1049. return nil, err
  1050. }
  1051. return NewGetSubscriptionResult(respBody, commonResp)
  1052. }
  1053. func (datahub *DataHub) UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) (*UpdateSubscriptionStateResult, error) {
  1054. if !util.CheckProjectName(projectName) {
  1055. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1056. }
  1057. if !util.CheckTopicName(topicName) {
  1058. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1059. }
  1060. path := fmt.Sprintf(subscriptionPath, projectName, topicName, subId)
  1061. reqPara := &RequestParameter{
  1062. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1063. }
  1064. usr := &UpdateSubscriptionStateRequest{
  1065. State: state,
  1066. }
  1067. _, commonResp, err := datahub.Client.Put(path, usr, reqPara)
  1068. if err != nil {
  1069. return nil, err
  1070. }
  1071. return NewUpdateSubscriptionStateResult(commonResp)
  1072. }
  1073. func (datahub *DataHub) OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error) {
  1074. if !util.CheckProjectName(projectName) {
  1075. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1076. }
  1077. if !util.CheckTopicName(topicName) {
  1078. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1079. }
  1080. for _, id := range shardIds {
  1081. if !util.CheckShardId(id) {
  1082. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  1083. }
  1084. }
  1085. path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
  1086. reqPara := &RequestParameter{
  1087. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1088. }
  1089. ossr := &OpenSubscriptionSessionRequest{
  1090. Action: "open",
  1091. ShardIds: shardIds,
  1092. }
  1093. respBody, commonResp, err := datahub.Client.Post(path, ossr, reqPara)
  1094. if err != nil {
  1095. return nil, err
  1096. }
  1097. return NewOpenSubscriptionSessionResult(respBody, commonResp)
  1098. }
  1099. func (datahub *DataHub) GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error) {
  1100. if !util.CheckProjectName(projectName) {
  1101. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1102. }
  1103. if !util.CheckTopicName(topicName) {
  1104. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1105. }
  1106. for _, id := range shardIds {
  1107. if !util.CheckShardId(id) {
  1108. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  1109. }
  1110. }
  1111. path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
  1112. reqPara := &RequestParameter{
  1113. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1114. }
  1115. gsor := &GetSubscriptionOffsetRequest{
  1116. Action: "get",
  1117. ShardIds: shardIds,
  1118. }
  1119. respBody, commonResp, err := datahub.Client.Post(path, gsor, reqPara)
  1120. if err != nil {
  1121. return nil, err
  1122. }
  1123. return NewGetSubscriptionOffsetResult(respBody, commonResp)
  1124. }
  1125. func (datahub *DataHub) CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) (*CommitSubscriptionOffsetResult, error) {
  1126. if !util.CheckProjectName(projectName) {
  1127. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1128. }
  1129. if !util.CheckTopicName(topicName) {
  1130. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1131. }
  1132. path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
  1133. reqPara := &RequestParameter{
  1134. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1135. }
  1136. req := &CommitSubscriptionOffsetRequest{
  1137. Action: "commit",
  1138. Offsets: offsets,
  1139. }
  1140. _, commonResp, err := datahub.Client.Put(path, req, reqPara)
  1141. if err != nil {
  1142. return nil, err
  1143. }
  1144. return NewCommitSubscriptionOffsetResult(commonResp)
  1145. }
  1146. func (datahub *DataHub) ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) (*ResetSubscriptionOffsetResult, error) {
  1147. if !util.CheckProjectName(projectName) {
  1148. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1149. }
  1150. if !util.CheckTopicName(topicName) {
  1151. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1152. }
  1153. path := fmt.Sprintf(offsetsPath, projectName, topicName, subId)
  1154. reqPara := &RequestParameter{
  1155. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1156. }
  1157. req := &ResetSubscriptionOffsetRequest{
  1158. Action: "reset",
  1159. Offsets: offsets,
  1160. }
  1161. _, commonResp, err := datahub.Client.Put(path, req, reqPara)
  1162. if err != nil {
  1163. return nil, err
  1164. }
  1165. return NewResetSubscriptionOffsetResult(commonResp)
  1166. }
  1167. func (datahub *DataHub) Heartbeat(projectName, topicName, consumerGroup, consumerId string, versionId int64, holdShardList, readEndShardList []string) (*HeartbeatResult, error) {
  1168. if !util.CheckProjectName(projectName) {
  1169. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1170. }
  1171. if !util.CheckTopicName(topicName) {
  1172. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1173. }
  1174. for _, id := range holdShardList {
  1175. if !util.CheckShardId(id) {
  1176. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  1177. }
  1178. }
  1179. for _, id := range readEndShardList {
  1180. if !util.CheckShardId(id) {
  1181. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  1182. }
  1183. }
  1184. path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
  1185. reqPara := &RequestParameter{
  1186. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1187. }
  1188. hr := &HeartbeatRequest{
  1189. Action: "heartbeat",
  1190. ConsumerId: consumerId,
  1191. VersionId: versionId,
  1192. HoldShardList: holdShardList,
  1193. ReadEndShardList: readEndShardList,
  1194. }
  1195. respBody, commonResp, err := datahub.Client.Post(path, hr, reqPara)
  1196. if err != nil {
  1197. return nil, err
  1198. }
  1199. return NewHeartbeatResult(respBody, commonResp)
  1200. }
  1201. func (datahub *DataHub) JoinGroup(projectName, topicName, consumerGroup string, sessionTimeout int64) (*JoinGroupResult, error) {
  1202. if !util.CheckProjectName(projectName) {
  1203. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1204. }
  1205. if !util.CheckTopicName(topicName) {
  1206. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1207. }
  1208. path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
  1209. reqPara := &RequestParameter{
  1210. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1211. }
  1212. jgr := &JoinGroupRequest{
  1213. Action: "joinGroup",
  1214. SessionTimeout: sessionTimeout,
  1215. }
  1216. respBody, commonResp, err := datahub.Client.Post(path, jgr, reqPara)
  1217. if err != nil {
  1218. return nil, err
  1219. }
  1220. return NewJoinGroupResult(respBody, commonResp)
  1221. }
  1222. func (datahub *DataHub) SyncGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64, releaseShardList, readEndShardList []string) (*SyncGroupResult, error) {
  1223. if !util.CheckProjectName(projectName) {
  1224. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1225. }
  1226. if !util.CheckTopicName(topicName) {
  1227. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1228. }
  1229. if len(releaseShardList) == 0 || len(readEndShardList) == 0 {
  1230. return nil, NewInvalidParameterErrorWithMessage(shardListInvalid)
  1231. }
  1232. for _, id := range releaseShardList {
  1233. if !util.CheckShardId(id) {
  1234. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  1235. }
  1236. }
  1237. for _, id := range readEndShardList {
  1238. if !util.CheckShardId(id) {
  1239. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  1240. }
  1241. }
  1242. path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
  1243. reqPara := &RequestParameter{
  1244. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1245. }
  1246. sgr := &SyncGroupRequest{
  1247. Action: "syncGroup",
  1248. ConsumerId: consumerId,
  1249. VersionId: versionId,
  1250. ReleaseShardList: releaseShardList,
  1251. ReadEndShardList: readEndShardList,
  1252. }
  1253. _, commonResp, err := datahub.Client.Post(path, sgr, reqPara)
  1254. if err != nil {
  1255. return nil, err
  1256. }
  1257. return NewSyncGroupResult(commonResp)
  1258. }
  1259. func (datahub *DataHub) LeaveGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64) (*LeaveGroupResult, error) {
  1260. if !util.CheckProjectName(projectName) {
  1261. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1262. }
  1263. if !util.CheckTopicName(topicName) {
  1264. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1265. }
  1266. path := fmt.Sprintf(consumerGroupPath, projectName, topicName, consumerGroup)
  1267. reqPara := &RequestParameter{
  1268. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1269. }
  1270. lgr := &LeaveGroupRequest{
  1271. Action: "leaveGroup",
  1272. ConsumerId: consumerId,
  1273. VersionId: versionId,
  1274. }
  1275. _, commonResp, err := datahub.Client.Post(path, lgr, reqPara)
  1276. if err != nil {
  1277. return nil, err
  1278. }
  1279. return NewLeaveGroupResult(commonResp)
  1280. }
  1281. func (datahub *DataHub) ListTopicSchema(projectName, topicName string) (*ListTopicSchemaResult, error) {
  1282. if !util.CheckProjectName(projectName) {
  1283. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1284. }
  1285. if !util.CheckTopicName(topicName) {
  1286. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1287. }
  1288. path := fmt.Sprintf(topicPath, projectName, topicName)
  1289. reqPara := &RequestParameter{
  1290. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1291. }
  1292. lts := &ListTopicSchemaRequest{
  1293. Action: "ListSchema",
  1294. }
  1295. respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
  1296. if err != nil {
  1297. return nil, err
  1298. }
  1299. return NewListTopicSchemaResult(respBody, commonResp)
  1300. }
  1301. func (datahub *DataHub) GetTopicSchemaByVersion(projectName, topicName string, versionId int) (*GetTopicSchemaResult, error) {
  1302. if !util.CheckProjectName(projectName) {
  1303. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1304. }
  1305. if !util.CheckTopicName(topicName) {
  1306. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1307. }
  1308. path := fmt.Sprintf(topicPath, projectName, topicName)
  1309. reqPara := &RequestParameter{
  1310. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1311. }
  1312. lts := &GetTopicSchemaRequest{
  1313. Action: "GetSchema",
  1314. VersionId: versionId,
  1315. RecordSchema: nil,
  1316. }
  1317. respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
  1318. if err != nil {
  1319. return nil, err
  1320. }
  1321. return NewGetTopicSchemaResult(respBody, commonResp)
  1322. }
  1323. func (datahub *DataHub) GetTopicSchemaBySchema(projectName, topicName string, recordSchema *RecordSchema) (*GetTopicSchemaResult, error) {
  1324. if !util.CheckProjectName(projectName) {
  1325. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1326. }
  1327. if !util.CheckTopicName(topicName) {
  1328. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1329. }
  1330. path := fmt.Sprintf(topicPath, projectName, topicName)
  1331. reqPara := &RequestParameter{
  1332. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1333. }
  1334. lts := &GetTopicSchemaRequest{
  1335. Action: "GetSchema",
  1336. VersionId: -1,
  1337. RecordSchema: recordSchema,
  1338. }
  1339. respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
  1340. if err != nil {
  1341. return nil, err
  1342. }
  1343. return NewGetTopicSchemaResult(respBody, commonResp)
  1344. }
  1345. func (datahub *DataHub) RegisterTopicSchema(projectName, topicName string, recordSchema *RecordSchema) (*RegisterTopicSchemaResult, error) {
  1346. if !util.CheckProjectName(projectName) {
  1347. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1348. }
  1349. if !util.CheckTopicName(topicName) {
  1350. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1351. }
  1352. path := fmt.Sprintf(topicPath, projectName, topicName)
  1353. reqPara := &RequestParameter{
  1354. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1355. }
  1356. lts := &RegisterTopicSchemaRequest{
  1357. Action: "RegisterSchema",
  1358. RecordSchema: recordSchema,
  1359. }
  1360. respBody, commonResp, err := datahub.Client.Post(path, lts, reqPara)
  1361. if err != nil {
  1362. return nil, err
  1363. }
  1364. return NewRegisterTopicSchemaResult(respBody, commonResp)
  1365. }
  1366. func (datahub *DataHub) DeleteTopicSchema(projectName, topicName string, versionId int) (*DeleteTopicSchemaResult, error) {
  1367. if !util.CheckProjectName(projectName) {
  1368. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1369. }
  1370. if !util.CheckTopicName(topicName) {
  1371. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1372. }
  1373. path := fmt.Sprintf(topicPath, projectName, topicName)
  1374. reqPara := &RequestParameter{
  1375. Header: map[string]string{httpHeaderContentType: httpJsonContent},
  1376. }
  1377. lts := &DeleteTopicSchemaRequest{
  1378. Action: "DeleteSchema",
  1379. VersionId: versionId,
  1380. }
  1381. _, commonResp, err := datahub.Client.Post(path, lts, reqPara)
  1382. if err != nil {
  1383. return nil, err
  1384. }
  1385. return NewDeleteTopicSchemaResult(commonResp)
  1386. }
  1387. func (datahub *DataHub) getSchemaRegistry() *schemaRegistryClient {
  1388. return datahub.schemaClient
  1389. }
  1390. type DataHubPB struct {
  1391. DataHub
  1392. }
  1393. func (datahub *DataHubPB) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) {
  1394. if !util.CheckProjectName(projectName) {
  1395. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1396. }
  1397. if !util.CheckTopicName(topicName) {
  1398. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1399. }
  1400. path := fmt.Sprintf(shardsPath, projectName, topicName)
  1401. reqPara := &RequestParameter{
  1402. Header: map[string]string{
  1403. httpHeaderContentType: httpProtoContent,
  1404. httpHeaderRequestAction: httpPublistContent},
  1405. }
  1406. prr := &PutPBRecordsRequest{
  1407. Records: records,
  1408. }
  1409. respBody, commonResp, err := datahub.Client.Post(path, prr, reqPara)
  1410. if err != nil {
  1411. return nil, err
  1412. }
  1413. return NewPutPBRecordsResult(respBody, commonResp)
  1414. }
  1415. func (datahub *DataHubPB) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error) {
  1416. if !util.CheckProjectName(projectName) {
  1417. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1418. }
  1419. if !util.CheckTopicName(topicName) {
  1420. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1421. }
  1422. if !util.CheckShardId(shardId) {
  1423. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  1424. }
  1425. path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
  1426. reqPara := &RequestParameter{
  1427. Header: map[string]string{
  1428. httpHeaderContentType: httpProtoContent,
  1429. httpHeaderRequestAction: httpPublistContent},
  1430. }
  1431. prr := &PutPBRecordsRequest{
  1432. Records: records,
  1433. }
  1434. _, commonResp, err := datahub.Client.Post(path, prr, reqPara)
  1435. if err != nil {
  1436. return nil, err
  1437. }
  1438. return NewPutRecordsByShardResult(commonResp)
  1439. }
  1440. func (datahub *DataHubPB) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error) {
  1441. if !util.CheckProjectName(projectName) {
  1442. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1443. }
  1444. if !util.CheckTopicName(topicName) {
  1445. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1446. }
  1447. if !util.CheckShardId(shardId) {
  1448. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  1449. }
  1450. path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
  1451. reqPara := &RequestParameter{
  1452. Header: map[string]string{
  1453. httpHeaderContentType: httpProtoContent,
  1454. httpHeaderRequestAction: httpSubscribeContent},
  1455. }
  1456. grr := &GetPBRecordRequest{
  1457. Cursor: cursor,
  1458. Limit: limit,
  1459. }
  1460. respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
  1461. if err != nil {
  1462. return nil, err
  1463. }
  1464. return NewGetPBRecordsResult(respBody, recordSchema, commonResp)
  1465. }
  1466. func (datahub *DataHubPB) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) {
  1467. if !util.CheckProjectName(projectName) {
  1468. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1469. }
  1470. if !util.CheckTopicName(topicName) {
  1471. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1472. }
  1473. if !util.CheckShardId(shardId) {
  1474. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  1475. }
  1476. path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
  1477. reqPara := &RequestParameter{
  1478. Header: map[string]string{
  1479. httpHeaderContentType: httpProtoContent,
  1480. httpHeaderRequestAction: httpSubscribeContent},
  1481. }
  1482. grr := &GetPBRecordRequest{
  1483. Cursor: cursor,
  1484. Limit: limit,
  1485. }
  1486. respBody, commonResp, err := datahub.Client.Post(path, grr, reqPara)
  1487. if err != nil {
  1488. return nil, err
  1489. }
  1490. return NewGetPBRecordsResult(respBody, nil, commonResp)
  1491. }
  1492. type DataHubBatch struct {
  1493. DataHub
  1494. }
  1495. func (datahub *DataHubBatch) PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error) {
  1496. return nil, errors.New("not support this method")
  1497. }
  1498. func (datahub *DataHubBatch) PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error) {
  1499. if !util.CheckProjectName(projectName) {
  1500. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1501. }
  1502. if !util.CheckTopicName(topicName) {
  1503. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1504. }
  1505. if !util.CheckShardId(shardId) {
  1506. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  1507. }
  1508. path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
  1509. reqPara := &RequestParameter{
  1510. Header: map[string]string{
  1511. httpHeaderContentType: httpProtoBatchContent,
  1512. httpHeaderRequestAction: httpPublistContent},
  1513. }
  1514. serializer := newBatchSerializer(projectName, topicName, datahub.cType, datahub.schemaClient)
  1515. prr := &PutBatchRecordsRequest{
  1516. serializer: serializer,
  1517. Records: records,
  1518. }
  1519. _, commonResp, err := datahub.Client.Post(path, prr, reqPara)
  1520. if err != nil {
  1521. return nil, err
  1522. }
  1523. return NewPutRecordsByShardResult(commonResp)
  1524. }
  1525. func (datahub *DataHubBatch) GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error) {
  1526. if !util.CheckProjectName(projectName) {
  1527. return nil, NewInvalidParameterErrorWithMessage(projectNameInvalid)
  1528. }
  1529. if !util.CheckTopicName(topicName) {
  1530. return nil, NewInvalidParameterErrorWithMessage(topicNameInvalid)
  1531. }
  1532. if !util.CheckShardId(shardId) {
  1533. return nil, NewInvalidParameterErrorWithMessage(shardIdInvalid)
  1534. }
  1535. path := fmt.Sprintf(shardPath, projectName, topicName, shardId)
  1536. reqPara := &RequestParameter{
  1537. Header: map[string]string{
  1538. httpHeaderContentType: httpProtoBatchContent,
  1539. httpHeaderRequestAction: httpSubscribeContent},
  1540. }
  1541. gbr := &GetBatchRecordRequest{
  1542. GetPBRecordRequest{
  1543. Cursor: cursor,
  1544. Limit: limit,
  1545. },
  1546. }
  1547. respBody, commonResp, err := datahub.Client.Post(path, gbr, reqPara)
  1548. if err != nil {
  1549. return nil, err
  1550. }
  1551. deserializer := newBatchDeserializer(projectName, topicName, shardId, recordSchema, datahub.schemaClient)
  1552. return NewGetBatchRecordsResult(respBody, recordSchema, commonResp, deserializer)
  1553. }
  1554. func (datahub *DataHubBatch) GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error) {
  1555. return datahub.GetTupleRecords(projectName, topicName, shardId, cursor, limit, nil)
  1556. }