123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433 |
- package kafka
- import (
- "bufio"
- "bytes"
- "context"
- "fmt"
- "net"
- "github.com/segmentio/kafka-go/protocol/describegroups"
- )
- type DescribeGroupsRequest struct {
-
- Addr net.Addr
-
- GroupIDs []string
- }
- type DescribeGroupsResponse struct {
-
- Groups []DescribeGroupsResponseGroup
- }
- type DescribeGroupsResponseGroup struct {
-
-
- Error error
-
- GroupID string
-
- GroupState string
-
- Members []DescribeGroupsResponseMember
- }
- type DescribeGroupsResponseMember struct {
-
- MemberID string
-
- ClientID string
-
- ClientHost string
-
- MemberMetadata DescribeGroupsResponseMemberMetadata
-
- MemberAssignments DescribeGroupsResponseAssignments
- }
- type DescribeGroupsResponseMemberMetadata struct {
-
- Version int
-
- Topics []string
-
- UserData []byte
- }
- type DescribeGroupsResponseAssignments struct {
-
- Version int
-
- Topics []GroupMemberTopic
-
- UserData []byte
- }
- type GroupMemberTopic struct {
-
- Topic string
-
- Partitions []int
- }
- func (c *Client) DescribeGroups(
- ctx context.Context,
- req *DescribeGroupsRequest,
- ) (*DescribeGroupsResponse, error) {
- protoResp, err := c.roundTrip(
- ctx,
- req.Addr,
- &describegroups.Request{
- Groups: req.GroupIDs,
- },
- )
- if err != nil {
- return nil, err
- }
- apiResp := protoResp.(*describegroups.Response)
- resp := &DescribeGroupsResponse{}
- for _, apiGroup := range apiResp.Groups {
- group := DescribeGroupsResponseGroup{
- Error: makeError(apiGroup.ErrorCode, ""),
- GroupID: apiGroup.GroupID,
- GroupState: apiGroup.GroupState,
- }
- for _, member := range apiGroup.Members {
- decodedMetadata, err := decodeMemberMetadata(member.MemberMetadata)
- if err != nil {
- return nil, err
- }
- decodedAssignments, err := decodeMemberAssignments(member.MemberAssignment)
- if err != nil {
- return nil, err
- }
- group.Members = append(group.Members, DescribeGroupsResponseMember{
- MemberID: member.MemberID,
- ClientID: member.ClientID,
- ClientHost: member.ClientHost,
- MemberAssignments: decodedAssignments,
- MemberMetadata: decodedMetadata,
- })
- }
- resp.Groups = append(resp.Groups, group)
- }
- return resp, nil
- }
- type describeGroupsRequestV0 struct {
-
-
- GroupIDs []string
- }
- func (t describeGroupsRequestV0) size() int32 {
- return sizeofStringArray(t.GroupIDs)
- }
- func (t describeGroupsRequestV0) writeTo(wb *writeBuffer) {
- wb.writeStringArray(t.GroupIDs)
- }
- type describeGroupsResponseMemberV0 struct {
-
- MemberID string
-
- ClientID string
-
-
- ClientHost string
-
-
- MemberMetadata []byte
-
-
-
-
- MemberAssignments []byte
- }
- func (t describeGroupsResponseMemberV0) size() int32 {
- return sizeofString(t.MemberID) +
- sizeofString(t.ClientID) +
- sizeofString(t.ClientHost) +
- sizeofBytes(t.MemberMetadata) +
- sizeofBytes(t.MemberAssignments)
- }
- func (t describeGroupsResponseMemberV0) writeTo(wb *writeBuffer) {
- wb.writeString(t.MemberID)
- wb.writeString(t.ClientID)
- wb.writeString(t.ClientHost)
- wb.writeBytes(t.MemberMetadata)
- wb.writeBytes(t.MemberAssignments)
- }
- func (t *describeGroupsResponseMemberV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
- if remain, err = readString(r, size, &t.MemberID); err != nil {
- return
- }
- if remain, err = readString(r, remain, &t.ClientID); err != nil {
- return
- }
- if remain, err = readString(r, remain, &t.ClientHost); err != nil {
- return
- }
- if remain, err = readBytes(r, remain, &t.MemberMetadata); err != nil {
- return
- }
- if remain, err = readBytes(r, remain, &t.MemberAssignments); err != nil {
- return
- }
- return
- }
- type describeGroupsResponseGroupV0 struct {
-
- ErrorCode int16
-
- GroupID string
-
-
- State string
-
-
- ProtocolType string
-
- Protocol string
-
- Members []describeGroupsResponseMemberV0
- }
- func (t describeGroupsResponseGroupV0) size() int32 {
- return sizeofInt16(t.ErrorCode) +
- sizeofString(t.GroupID) +
- sizeofString(t.State) +
- sizeofString(t.ProtocolType) +
- sizeofString(t.Protocol) +
- sizeofArray(len(t.Members), func(i int) int32 { return t.Members[i].size() })
- }
- func (t describeGroupsResponseGroupV0) writeTo(wb *writeBuffer) {
- wb.writeInt16(t.ErrorCode)
- wb.writeString(t.GroupID)
- wb.writeString(t.State)
- wb.writeString(t.ProtocolType)
- wb.writeString(t.Protocol)
- wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) })
- }
- func (t *describeGroupsResponseGroupV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
- if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
- return
- }
- if remain, err = readString(r, remain, &t.GroupID); err != nil {
- return
- }
- if remain, err = readString(r, remain, &t.State); err != nil {
- return
- }
- if remain, err = readString(r, remain, &t.ProtocolType); err != nil {
- return
- }
- if remain, err = readString(r, remain, &t.Protocol); err != nil {
- return
- }
- fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
- item := describeGroupsResponseMemberV0{}
- if fnRemain, fnErr = (&item).readFrom(r, size); err != nil {
- return
- }
- t.Members = append(t.Members, item)
- return
- }
- if remain, err = readArrayWith(r, remain, fn); err != nil {
- return
- }
- return
- }
- type describeGroupsResponseV0 struct {
-
- Groups []describeGroupsResponseGroupV0
- }
- func (t describeGroupsResponseV0) size() int32 {
- return sizeofArray(len(t.Groups), func(i int) int32 { return t.Groups[i].size() })
- }
- func (t describeGroupsResponseV0) writeTo(wb *writeBuffer) {
- wb.writeArray(len(t.Groups), func(i int) { t.Groups[i].writeTo(wb) })
- }
- func (t *describeGroupsResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
- fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
- item := describeGroupsResponseGroupV0{}
- if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
- return
- }
- t.Groups = append(t.Groups, item)
- return
- }
- if remain, err = readArrayWith(r, sz, fn); err != nil {
- return
- }
- return
- }
- func decodeMemberMetadata(rawMetadata []byte) (DescribeGroupsResponseMemberMetadata, error) {
- mm := DescribeGroupsResponseMemberMetadata{}
- if len(rawMetadata) == 0 {
- return mm, nil
- }
- buf := bytes.NewBuffer(rawMetadata)
- bufReader := bufio.NewReader(buf)
- remain := len(rawMetadata)
- var err error
- var version16 int16
- if remain, err = readInt16(bufReader, remain, &version16); err != nil {
- return mm, err
- }
- mm.Version = int(version16)
- if remain, err = readStringArray(bufReader, remain, &mm.Topics); err != nil {
- return mm, err
- }
- if remain, err = readBytes(bufReader, remain, &mm.UserData); err != nil {
- return mm, err
- }
- if remain != 0 {
- return mm, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
- }
- return mm, nil
- }
- func decodeMemberAssignments(rawAssignments []byte) (DescribeGroupsResponseAssignments, error) {
- ma := DescribeGroupsResponseAssignments{}
- if len(rawAssignments) == 0 {
- return ma, nil
- }
- buf := bytes.NewBuffer(rawAssignments)
- bufReader := bufio.NewReader(buf)
- remain := len(rawAssignments)
- var err error
- var version16 int16
- if remain, err = readInt16(bufReader, remain, &version16); err != nil {
- return ma, err
- }
- ma.Version = int(version16)
- fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
- item := GroupMemberTopic{}
- if fnRemain, fnErr = readString(r, size, &item.Topic); fnErr != nil {
- return
- }
- partitions := []int32{}
- if fnRemain, fnErr = readInt32Array(r, fnRemain, &partitions); fnErr != nil {
- return
- }
- for _, partition := range partitions {
- item.Partitions = append(item.Partitions, int(partition))
- }
- ma.Topics = append(ma.Topics, item)
- return
- }
- if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
- return ma, err
- }
- if remain, err = readBytes(bufReader, remain, &ma.UserData); err != nil {
- return ma, err
- }
- if remain != 0 {
- return ma, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
- }
- return ma, nil
- }
- func readInt32Array(r *bufio.Reader, sz int, v *[]int32) (remain int, err error) {
- var content []int32
- fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
- var value int32
- if fnRemain, fnErr = readInt32(r, size, &value); fnErr != nil {
- return
- }
- content = append(content, value)
- return
- }
- if remain, err = readArrayWith(r, sz, fn); err != nil {
- return
- }
- *v = content
- return
- }
|