datahub.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. package datahub
  2. func NewClientWithConfig(endpoint string, config *Config, account Account) DataHubApi {
  3. if config.UserAgent == "" {
  4. config.UserAgent = DefaultUserAgent()
  5. }
  6. if config.HttpClient == nil {
  7. config.HttpClient = DefaultHttpClient()
  8. }
  9. if !validateCompressorType(config.CompressorType) {
  10. config.CompressorType = NOCOMPRESS
  11. }
  12. dh := &DataHub{
  13. Client: NewRestClient(endpoint, config.UserAgent, config.HttpClient,
  14. account, config.CompressorType),
  15. cType: config.CompressorType,
  16. }
  17. if config.EnableSchemaRegistry {
  18. dh.schemaClient = NewSchemaClient(dh)
  19. // compress data in batch record, no need to compress http body
  20. if config.CompressorType != NOCOMPRESS {
  21. dh.Client.CompressorType = NOCOMPRESS
  22. }
  23. return &DataHubBatch{
  24. DataHub: *dh,
  25. }
  26. } else {
  27. if config.EnableBinary {
  28. return &DataHubPB{
  29. DataHub: *dh,
  30. }
  31. } else {
  32. return dh
  33. }
  34. }
  35. }
  36. func New(accessId, accessKey, endpoint string) DataHubApi {
  37. config := NewDefaultConfig()
  38. return NewClientWithConfig(endpoint, config, NewAliyunAccount(accessId, accessKey))
  39. }
  40. func NewBatchClient(accessId, accessKey, endpoint string) DataHubApi {
  41. config := NewDefaultConfig()
  42. config.EnableSchemaRegistry = true
  43. config.CompressorType = LZ4
  44. return NewClientWithConfig(endpoint, config, NewAliyunAccount(accessId, accessKey))
  45. }
// DataHubApi is the client interface for the DataHub RESTful service. It
// covers project, topic, shard, record, connector, subscription,
// consumer-group and topic-schema operations.
type DataHubApi interface {
	// ListProject lists all projects the user owns.
	ListProject() (*ListProjectResult, error)

	// ListProjectWithFilter lists all projects the user owns that match the filter.
	ListProjectWithFilter(filter string) (*ListProjectResult, error)

	// CreateProject creates a new project.
	CreateProject(projectName, comment string) (*CreateProjectResult, error)

	// UpdateProject updates project information. Only the comment can be changed.
	UpdateProject(projectName, comment string) (*UpdateProjectResult, error)

	// DeleteProject deletes the specified project. If any topics still exist
	// in the project, the delete operation will fail.
	DeleteProject(projectName string) (*DeleteProjectResult, error)

	// GetProject returns the information of the specified project.
	GetProject(projectName string) (*GetProjectResult, error)

	// UpdateProjectVpcWhitelist updates the project's VPC white list.
	UpdateProjectVpcWhitelist(projectName, vpcIds string) (*UpdateProjectVpcWhitelistResult, error)

	// WaitAllShardsReady waits until every shard of the topic is ACTIVE,
	// with a default timeout of 60 seconds.
	WaitAllShardsReady(projectName, topicName string) bool

	// WaitAllShardsReadyWithTime waits until every shard of the topic is
	// ACTIVE. The timeout unit is seconds; if timeout < 0 it blocks until
	// all shards are ready.
	WaitAllShardsReadyWithTime(projectName, topicName string, timeout int64) bool

	// ListTopic lists all topics in the project.
	ListTopic(projectName string) (*ListTopicResult, error)

	// ListTopicWithFilter lists all topics in the project that match the filter.
	ListTopicWithFilter(projectName, filter string) (*ListTopicResult, error)

	// CreateBlobTopic creates a topic of type BLOB.
	CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) (*CreateBlobTopicResult, error)

	// CreateTupleTopic creates a topic of type TUPLE with the given record schema.
	CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) (*CreateTupleTopicResult, error)

	// CreateTopicWithPara creates a topic with the specified parameters.
	CreateTopicWithPara(projectName, topicName string, para *CreateTopicParameter) (*CreateTopicWithParaResult, error)

	// UpdateTopic updates topic meta information.
	UpdateTopic(projectName, topicName, comment string) (*UpdateTopicResult, error)

	// UpdateTopicWithPara updates topic meta information. Only comment and
	// lifeCycle are supported now.
	UpdateTopicWithPara(projectName, topicName string, para *UpdateTopicParameter) (*UpdateTopicResult, error)

	// DeleteTopic deletes the specified topic.
	DeleteTopic(projectName, topicName string) (*DeleteTopicResult, error)

	// GetTopic returns the information of the specified topic.
	GetTopic(projectName, topicName string) (*GetTopicResult, error)

	// ListShard lists the shard information {ShardEntry} of a topic.
	ListShard(projectName, topicName string) (*ListShardResult, error)

	// SplitShard splits a shard. The SDK automatically computes the split
	// key used to split the shard.
	SplitShard(projectName, topicName, shardId string) (*SplitShardResult, error)

	// SplitShardBySplitKey splits a shard by the specified split key.
	SplitShardBySplitKey(projectName, topicName, shardId, splitKey string) (*SplitShardResult, error)

	// MergeShard merges the specified shard with its adjacent shard. Only
	// adjacent shards can be merged.
	MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error)

	// ExtendShard extends the shard count of a topic.
	ExtendShard(projectName, topicName string, shardCount int) (*ExtendShardResult, error)

	// GetCursor returns the data cursor of a shard. It supports OLDEST,
	// LATEST, SYSTEM_TIME and SEQUENCE cursor types. With OLDEST or LATEST
	// the trailing parameter is not needed; with SYSTEM_TIME or SEQUENCE it
	// takes one parameter as a timestamp or sequence number.
	GetCursor(projectName, topicName, shardId string, ctype CursorType, param ...int64) (*GetCursorResult, error)

	// PutRecords writes data records into a DataHub topic.
	// The PutRecordsResult includes the unsuccessfully processed records.
	// DataHub attempts to process all records in each request; a single
	// record failure does not stop the processing of subsequent records.
	PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error)

	// PutRecordsByShard writes data records into the specified shard of a topic.
	PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error)

	// GetTupleRecords reads TUPLE records from a shard.
	GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error)

	// GetBlobRecords reads BLOB records from a shard.
	GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error)

	// AppendField appends a field to a TUPLE topic.
	// The field's AllowNull should be true.
	AppendField(projectName, topicName string, field Field) (*AppendFieldResult, error)

	// GetMeterInfo returns the metering info of the specified shard.
	GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error)

	// ListConnector lists the names of the topic's connectors.
	ListConnector(projectName, topicName string) (*ListConnectorResult, error)

	// CreateConnector creates a data connector.
	CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (*CreateConnectorResult, error)

	// CreateConnectorWithStartTime creates a connector with a sink start
	// time (unit: ms).
	CreateConnectorWithStartTime(projectName, topicName string, cType ConnectorType,
		columnFields []string, sinkStartTime int64, config interface{}) (*CreateConnectorResult, error)

	// CreateConnectorWithPara creates a connector with the specified parameters.
	CreateConnectorWithPara(projectName, topicName string, para *CreateConnectorParameter) (*CreateConnectorResult, error)

	// UpdateConnector updates the config of the specified data connector.
	// Config should be SinkOdpsConfig, SinkOssConfig, etc.
	UpdateConnector(projectName, topicName, connectorId string, config interface{}) (*UpdateConnectorResult, error)

	// UpdateConnectorWithPara updates a connector with the specified parameters.
	UpdateConnectorWithPara(projectName, topicName, connectorId string, para *UpdateConnectorParameter) (*UpdateConnectorResult, error)

	// DeleteConnector deletes a data connector.
	DeleteConnector(projectName, topicName, connectorId string) (*DeleteConnectorResult, error)

	// GetConnector returns the information of the specified data connector.
	GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error)

	// GetConnectorDoneTime returns the done time of a data connector. This
	// is mainly used to get the MaxCompute synchronize point.
	GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error)

	// GetConnectorShardStatus returns detailed information about the shard
	// tasks belonging to the specified data connector.
	GetConnectorShardStatus(projectName, topicName, connectorId string) (*GetConnectorShardStatusResult, error)

	// GetConnectorShardStatusByShard returns detailed information about one
	// shard task belonging to the specified data connector.
	GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (*GetConnectorShardStatusByShardResult, error)

	// ReloadConnector reloads a data connector.
	ReloadConnector(projectName, topicName, connectorId string) (*ReloadConnectorResult, error)

	// ReloadConnectorByShard reloads the specified shard of the data connector.
	ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) (*ReloadConnectorByShardResult, error)

	// UpdateConnectorState updates the state of the data connector.
	UpdateConnectorState(projectName, topicName, connectorId string, state ConnectorState) (*UpdateConnectorStateResult, error)

	// UpdateConnectorOffset updates the connector sink offset. This must be
	// done after the connector has been stopped.
	UpdateConnectorOffset(projectName, topicName, connectorId, shardId string, offset ConnectorOffset) (*UpdateConnectorOffsetResult, error)

	// AppendConnectorField appends a field to the data connector.
	// Before calling this, ensure the field exists in both the topic and
	// the connector.
	AppendConnectorField(projectName, topicName, connectorId, fieldName string) (*AppendConnectorFieldResult, error)

	// ListSubscription lists the subscriptions of the topic, paginated.
	ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error)

	// CreateSubscription creates a subscription; offsets are then committed
	// under this subscription.
	CreateSubscription(projectName, topicName, comment string) (*CreateSubscriptionResult, error)

	// UpdateSubscription updates a subscription. Only the comment can be changed.
	UpdateSubscription(projectName, topicName, subId, comment string) (*UpdateSubscriptionResult, error)

	// DeleteSubscription deletes a subscription.
	DeleteSubscription(projectName, topicName, subId string) (*DeleteSubscriptionResult, error)

	// GetSubscription returns the detailed information of a subscription.
	GetSubscription(projectName, topicName, subId string) (*GetSubscriptionResult, error)

	// UpdateSubscriptionState updates a subscription's state to SUB_ONLINE
	// or SUB_OFFLINE. When offline, offsets of the subscription cannot be
	// committed.
	UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) (*UpdateSubscriptionStateResult, error)

	// OpenSubscriptionSession initializes and opens a subscription session,
	// returning any offsets stored before. A subscription must be
	// initialized before use; this operation ensures that only one client
	// uses the subscription. If called again elsewhere, the previous
	// session becomes invalid and can no longer commit offsets.
	OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error)

	// GetSubscriptionOffset returns the offsets of a subscription. The
	// returned SubscriptionOffset does not contain a sessionId; only a
	// SubscriptionOffset containing a sessionId can commit offsets.
	GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error)

	// CommitSubscriptionOffset uploads shard offsets to the server, storing
	// them on the server side.
	CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) (*CommitSubscriptionOffsetResult, error)

	// ResetSubscriptionOffset resets shard offsets on the server side.
	ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) (*ResetSubscriptionOffsetResult, error)

	// Heartbeat sends a heartbeat request so the server knows the consumer status.
	Heartbeat(projectName, topicName, consumerGroup, consumerId string, versionId int64, holdShardList, readEndShardList []string) (*HeartbeatResult, error)

	// JoinGroup joins a consumer group.
	JoinGroup(projectName, topicName, consumerGroup string, sessionTimeout int64) (*JoinGroupResult, error)

	// SyncGroup synchronizes consumer group info.
	SyncGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64, releaseShardList, readEndShardList []string) (*SyncGroupResult, error)

	// LeaveGroup leaves a consumer group.
	LeaveGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64) (*LeaveGroupResult, error)

	// ListTopicSchema lists the topic's schemas.
	ListTopicSchema(projectName, topicName string) (*ListTopicSchemaResult, error)

	// GetTopicSchemaByVersion returns a topic schema by version id.
	GetTopicSchemaByVersion(projectName, topicName string, versionId int) (*GetTopicSchemaResult, error)

	// GetTopicSchemaBySchema returns a topic schema matching the given record schema.
	GetTopicSchemaBySchema(projectName, topicName string, recordSchema *RecordSchema) (*GetTopicSchemaResult, error)

	// RegisterTopicSchema registers a schema to a topic.
	RegisterTopicSchema(projectName, topicName string, recordSchema *RecordSchema) (*RegisterTopicSchemaResult, error)

	// DeleteTopicSchema deletes a topic schema by version id.
	DeleteTopicSchema(projectName, topicName string, versionId int) (*DeleteTopicSchemaResult, error)
}