12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- package describegroups
- import (
- "github.com/segmentio/kafka-go/protocol"
- )
- func init() {
- protocol.Register(&Request{}, &Response{})
- }
- // Detailed API definition: https://kafka.apache.org/protocol#The_Messages_DescribeGroups
- type Request struct {
- Groups []string `kafka:"min=v0,max=v4"`
- IncludeAuthorizedOperations bool `kafka:"min=v3,max=v4"`
- }
- func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeGroups }
- func (r *Request) Group() string {
- return r.Groups[0]
- }
- func (r *Request) Split(cluster protocol.Cluster) (
- []protocol.Message,
- protocol.Merger,
- error,
- ) {
- messages := []protocol.Message{}
- // Split requests by group since they'll need to go to different coordinators.
- for _, group := range r.Groups {
- messages = append(
- messages,
- &Request{
- Groups: []string{group},
- IncludeAuthorizedOperations: r.IncludeAuthorizedOperations,
- },
- )
- }
- return messages, new(Response), nil
- }
- type Response struct {
- ThrottleTimeMs int32 `kafka:"min=v1,max=v4"`
- Groups []ResponseGroup `kafka:"min=v0,max=v4"`
- }
- type ResponseGroup struct {
- ErrorCode int16 `kafka:"min=v0,max=v4"`
- GroupID string `kafka:"min=v0,max=v4"`
- GroupState string `kafka:"min=v0,max=v4"`
- ProtocolType string `kafka:"min=v0,max=v4"`
- ProtocolData string `kafka:"min=v0,max=v4"`
- Members []ResponseGroupMember `kafka:"min=v0,max=v4"`
- AuthorizedOperations int32 `kafka:"min=v3,max=v4"`
- }
- type ResponseGroupMember struct {
- MemberID string `kafka:"min=v0,max=v4"`
- GroupInstanceID string `kafka:"min=v4,max=v4,nullable"`
- ClientID string `kafka:"min=v0,max=v4"`
- ClientHost string `kafka:"min=v0,max=v4"`
- MemberMetadata []byte `kafka:"min=v0,max=v4"`
- MemberAssignment []byte `kafka:"min=v0,max=v4"`
- }
- func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeGroups }
- func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
- protocol.Message,
- error,
- ) {
- response := &Response{}
- for _, result := range results {
- subResp := result.(*Response)
- for _, group := range subResp.Groups {
- response.Groups = append(response.Groups, group)
- }
- }
- return response, nil
- }
|