123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- package datahub
- func NewClientWithConfig(endpoint string, config *Config, account Account) DataHubApi {
- if config.UserAgent == "" {
- config.UserAgent = DefaultUserAgent()
- }
- if config.HttpClient == nil {
- config.HttpClient = DefaultHttpClient()
- }
- if !validateCompressorType(config.CompressorType) {
- config.CompressorType = NOCOMPRESS
- }
- dh := &DataHub{
- Client: NewRestClient(endpoint, config.UserAgent, config.HttpClient,
- account, config.CompressorType),
- cType: config.CompressorType,
- }
- if config.EnableSchemaRegistry {
- dh.schemaClient = NewSchemaClient(dh)
- // compress data in batch record, no need to compress http body
- if config.CompressorType != NOCOMPRESS {
- dh.Client.CompressorType = NOCOMPRESS
- }
- return &DataHubBatch{
- DataHub: *dh,
- }
- } else {
- if config.EnableBinary {
- return &DataHubPB{
- DataHub: *dh,
- }
- } else {
- return dh
- }
- }
- }
- func New(accessId, accessKey, endpoint string) DataHubApi {
- config := NewDefaultConfig()
- return NewClientWithConfig(endpoint, config, NewAliyunAccount(accessId, accessKey))
- }
- func NewBatchClient(accessId, accessKey, endpoint string) DataHubApi {
- config := NewDefaultConfig()
- config.EnableSchemaRegistry = true
- config.CompressorType = LZ4
- return NewClientWithConfig(endpoint, config, NewAliyunAccount(accessId, accessKey))
- }
- // Datahub provides restful apis for visiting examples service.
- type DataHubApi interface {
- // List all projects the user owns.
- ListProject() (*ListProjectResult, error)
- // List all projects the user owns with filter.
- ListProjectWithFilter(filter string) (*ListProjectResult, error)
- // Create a examples project.
- CreateProject(projectName, comment string) (*CreateProjectResult, error)
- // Update project information. Only support comment
- UpdateProject(projectName, comment string) (*UpdateProjectResult, error)
- // Delete the specified project. If any topics exist in the project, the delete operation will fail.
- DeleteProject(projectName string) (*DeleteProjectResult, error)
- // Get the information of the specified project.
- GetProject(projectName string) (*GetProjectResult, error)
- // Update project vpc white list.
- UpdateProjectVpcWhitelist(projectName, vpcIds string) (*UpdateProjectVpcWhitelistResult, error)
- // Wait for all shards' status of this topic is ACTIVE. Default timeout is 60s.
- WaitAllShardsReady(projectName, topicName string) bool
- // Wait for all shards' status of this topic is ACTIVE.
- // The unit is seconds.
- // If timeout < 0, it will block util all shards ready
- WaitAllShardsReadyWithTime(projectName, topicName string, timeout int64) bool
- // List all topics in the project.
- ListTopic(projectName string) (*ListTopicResult, error)
- // List all topics in the project with filter.
- ListTopicWithFilter(projectName, filter string) (*ListTopicResult, error)
- // Create a examples topic with type: BLOB
- CreateBlobTopic(projectName, topicName, comment string, shardCount, lifeCycle int) (*CreateBlobTopicResult, error)
- // Create a examples topic with type: TUPLE
- CreateTupleTopic(projectName, topicName, comment string, shardCount, lifeCycle int, recordSchema *RecordSchema) (*CreateTupleTopicResult, error)
- // Create topic with specific parameter
- CreateTopicWithPara(projectName, topicName string, para *CreateTopicParameter) (*CreateTopicWithParaResult, error)
- // Update topic meta information.
- UpdateTopic(projectName, topicName, comment string) (*UpdateTopicResult, error)
- // Update topic meta information. Only support comment and lifeCycle now.
- UpdateTopicWithPara(projectName, topicName string, para *UpdateTopicParameter) (*UpdateTopicResult, error)
- // Delete a specified topic.
- DeleteTopic(projectName, topicName string) (*DeleteTopicResult, error)
- // Get the information of the specified topic.
- GetTopic(projectName, topicName string) (*GetTopicResult, error)
- // List shard information {ShardEntry} of a topic.
- ListShard(projectName, topicName string) (*ListShardResult, error)
- // Split a shard. In function, sdk will automatically compute the split key which is used to split shard.
- SplitShard(projectName, topicName, shardId string) (*SplitShardResult, error)
- // Split a shard by the specified splitKey.
- SplitShardBySplitKey(projectName, topicName, shardId, splitKey string) (*SplitShardResult, error)
- // Merge the specified shard and its adjacent shard. Only adjacent shards can be merged.
- MergeShard(projectName, topicName, shardId, adjacentShardId string) (*MergeShardResult, error)
- // Extend shard num.
- ExtendShard(projectName, topicName string, shardCount int) (*ExtendShardResult, error)
- // Get the data cursor of a shard. This function support OLDEST, LATEST, SYSTEM_TIME and SEQUENCE.
- // If choose OLDEST or LATEST, the last parameter will not be needed.
- // if choose SYSTEM_TIME or SEQUENCE. it needs to a parameter as sequence num or timestamp.
- GetCursor(projectName, topicName, shardId string, ctype CursorType, param ...int64) (*GetCursorResult, error)
- // Write data records into a DataHub topic.
- // The PutRecordsResult includes unsuccessfully processed records.
- // Datahub attempts to process all records in each record.
- // A single record failure does not stop the processing of subsequent records.
- PutRecords(projectName, topicName string, records []IRecord) (*PutRecordsResult, error)
- PutRecordsByShard(projectName, topicName, shardId string, records []IRecord) (*PutRecordsByShardResult, error)
- // Get the TUPLE records of a shard.
- GetTupleRecords(projectName, topicName, shardId, cursor string, limit int, recordSchema *RecordSchema) (*GetRecordsResult, error)
- // Get the BLOB records of a shard.
- GetBlobRecords(projectName, topicName, shardId, cursor string, limit int) (*GetRecordsResult, error)
- // Append a field to a TUPLE topic.
- // Field AllowNull should be true.
- AppendField(projectName, topicName string, field Field) (*AppendFieldResult, error)
- // Get metering info of the specified shard
- GetMeterInfo(projectName, topicName, shardId string) (*GetMeterInfoResult, error)
- // List name of connectors.
- ListConnector(projectName, topicName string) (*ListConnectorResult, error)
- // Create data connectors.
- CreateConnector(projectName, topicName string, cType ConnectorType, columnFields []string, config interface{}) (*CreateConnectorResult, error)
- // Create connector with start time(unit:ms)
- CreateConnectorWithStartTime(projectName, topicName string, cType ConnectorType,
- columnFields []string, sinkStartTime int64, config interface{}) (*CreateConnectorResult, error)
- // Create connector with parameter
- CreateConnectorWithPara(projectName, topicName string, para *CreateConnectorParameter) (*CreateConnectorResult, error)
- // Update connector config of the specified data connector.
- // Config should be SinkOdpsConfig, SinkOssConfig ...
- UpdateConnector(projectName, topicName, connectorId string, config interface{}) (*UpdateConnectorResult, error)
- // Update connector with parameter
- UpdateConnectorWithPara(projectName, topicName, connectorId string, para *UpdateConnectorParameter) (*UpdateConnectorResult, error)
- // Delete a data connector.
- DeleteConnector(projectName, topicName, connectorId string) (*DeleteConnectorResult, error)
- // Get information of the specified data connector.
- GetConnector(projectName, topicName, connectorId string) (*GetConnectorResult, error)
- // Get the done time of a data connector. This method mainly used to get MaxCompute synchronize point.
- GetConnectorDoneTime(projectName, topicName, connectorId string) (*GetConnectorDoneTimeResult, error)
- // Get the detail information of the shard task which belongs to the specified data connector.
- GetConnectorShardStatus(projectName, topicName, connectorId string) (*GetConnectorShardStatusResult, error)
- // Get the detail information of the shard task which belongs to the specified data connector.
- GetConnectorShardStatusByShard(projectName, topicName, connectorId, shardId string) (*GetConnectorShardStatusByShardResult, error)
- // Reload a data connector.
- ReloadConnector(projectName, topicName, connectorId string) (*ReloadConnectorResult, error)
- // Reload the specified shard of the data connector.
- ReloadConnectorByShard(projectName, topicName, connectorId, shardId string) (*ReloadConnectorByShardResult, error)
- // Update the state of the data connector
- UpdateConnectorState(projectName, topicName, connectorId string, state ConnectorState) (*UpdateConnectorStateResult, error)
- // Update connector sink offset. The operation must be operated after connector stopped.
- UpdateConnectorOffset(projectName, topicName, connectorId, shardId string, offset ConnectorOffset) (*UpdateConnectorOffsetResult, error)
- // Append data connector field.
- // Before run this method, you should ensure that this field is in both the topic and the connector.
- AppendConnectorField(projectName, topicName, connectorId, fieldName string) (*AppendConnectorFieldResult, error)
- // List subscriptions in the topic.
- ListSubscription(projectName, topicName string, pageIndex, pageSize int) (*ListSubscriptionResult, error)
- // Create a subscription, and then you should commit offsets with this subscription.
- CreateSubscription(projectName, topicName, comment string) (*CreateSubscriptionResult, error)
- // Update a subscription. Now only support update comment information.
- UpdateSubscription(projectName, topicName, subId, comment string) (*UpdateSubscriptionResult, error)
- // Delete a subscription.
- DeleteSubscription(projectName, topicName, subId string) (*DeleteSubscriptionResult, error)
- // Get the detail information of a subscription.
- GetSubscription(projectName, topicName, subId string) (*GetSubscriptionResult, error)
- // Update a subscription' state. You can change the state of a subscription to SUB_ONLINE or SUB_OFFLINE.
- // When offline, you can not commit offsets of the subscription.
- UpdateSubscriptionState(projectName, topicName, subId string, state SubscriptionState) (*UpdateSubscriptionStateResult, error)
- // Init and get a subscription session, and returns offset if any offset stored before.
- // Subscription should be initialized before use. This operation makes sure that only one client use this subscription.
- // If this function be called in elsewhere, the seesion will be invalid and can not commit offsets of the subscription.
- OpenSubscriptionSession(projectName, topicName, subId string, shardIds []string) (*OpenSubscriptionSessionResult, error)
- // Get offsets of a subscription.This method dost not return sessionId in SubscriptionOffset.
- // Only the SubscriptionOffset containing sessionId can commit offset.
- GetSubscriptionOffset(projectName, topicName, subId string, shardIds []string) (*GetSubscriptionOffsetResult, error)
- // Update offsets of shards to server. This operation allows you store offsets on the server side.
- CommitSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) (*CommitSubscriptionOffsetResult, error)
- // Reset offsets of shards to server. This operation allows you reset offsets on the server side.
- ResetSubscriptionOffset(projectName, topicName, subId string, offsets map[string]SubscriptionOffset) (*ResetSubscriptionOffsetResult, error)
- // Heartbeat request to let server know consumer status.
- Heartbeat(projectName, topicName, consumerGroup, consumerId string, versionId int64, holdShardList, readEndShardList []string) (*HeartbeatResult, error)
- // Join a consumer group.
- JoinGroup(projectName, topicName, consumerGroup string, sessionTimeout int64) (*JoinGroupResult, error)
- // Sync consumer group info.
- SyncGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64, releaseShardList, readEndShardList []string) (*SyncGroupResult, error)
- // Leave consumer group info.
- LeaveGroup(projectName, topicName, consumerGroup, consumerId string, versionId int64) (*LeaveGroupResult, error)
- // List topic schema.
- ListTopicSchema(projectName, topicName string) (*ListTopicSchemaResult, error)
- // Get topic schema by versionId.
- GetTopicSchemaByVersion(projectName, topicName string, versionId int) (*GetTopicSchemaResult, error)
- // Get topic schema by schema string.
- GetTopicSchemaBySchema(projectName, topicName string, recordSchema *RecordSchema) (*GetTopicSchemaResult, error)
- // Register schema to a topic.
- RegisterTopicSchema(projectName, topicName string, recordSchema *RecordSchema) (*RegisterTopicSchemaResult, error)
- // Delete topic schema by versionId
- DeleteTopicSchema(projectName, topicName string, versionId int) (*DeleteTopicSchemaResult, error)
- }
|