describegroups.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. package describegroups
  2. import (
  3. "github.com/segmentio/kafka-go/protocol"
  4. )
  5. func init() {
  6. protocol.Register(&Request{}, &Response{})
  7. }
  8. // Detailed API definition: https://kafka.apache.org/protocol#The_Messages_DescribeGroups
  9. type Request struct {
  10. Groups []string `kafka:"min=v0,max=v4"`
  11. IncludeAuthorizedOperations bool `kafka:"min=v3,max=v4"`
  12. }
  13. func (r *Request) ApiKey() protocol.ApiKey { return protocol.DescribeGroups }
  14. func (r *Request) Group() string {
  15. return r.Groups[0]
  16. }
  17. func (r *Request) Split(cluster protocol.Cluster) (
  18. []protocol.Message,
  19. protocol.Merger,
  20. error,
  21. ) {
  22. messages := []protocol.Message{}
  23. // Split requests by group since they'll need to go to different coordinators.
  24. for _, group := range r.Groups {
  25. messages = append(
  26. messages,
  27. &Request{
  28. Groups: []string{group},
  29. IncludeAuthorizedOperations: r.IncludeAuthorizedOperations,
  30. },
  31. )
  32. }
  33. return messages, new(Response), nil
  34. }
  35. type Response struct {
  36. ThrottleTimeMs int32 `kafka:"min=v1,max=v4"`
  37. Groups []ResponseGroup `kafka:"min=v0,max=v4"`
  38. }
  39. type ResponseGroup struct {
  40. ErrorCode int16 `kafka:"min=v0,max=v4"`
  41. GroupID string `kafka:"min=v0,max=v4"`
  42. GroupState string `kafka:"min=v0,max=v4"`
  43. ProtocolType string `kafka:"min=v0,max=v4"`
  44. ProtocolData string `kafka:"min=v0,max=v4"`
  45. Members []ResponseGroupMember `kafka:"min=v0,max=v4"`
  46. AuthorizedOperations int32 `kafka:"min=v3,max=v4"`
  47. }
  48. type ResponseGroupMember struct {
  49. MemberID string `kafka:"min=v0,max=v4"`
  50. GroupInstanceID string `kafka:"min=v4,max=v4,nullable"`
  51. ClientID string `kafka:"min=v0,max=v4"`
  52. ClientHost string `kafka:"min=v0,max=v4"`
  53. MemberMetadata []byte `kafka:"min=v0,max=v4"`
  54. MemberAssignment []byte `kafka:"min=v0,max=v4"`
  55. }
  56. func (r *Response) ApiKey() protocol.ApiKey { return protocol.DescribeGroups }
  57. func (r *Response) Merge(requests []protocol.Message, results []interface{}) (
  58. protocol.Message,
  59. error,
  60. ) {
  61. response := &Response{}
  62. for _, result := range results {
  63. subResp := result.(*Response)
  64. for _, group := range subResp.Groups {
  65. response.Groups = append(response.Groups, group)
  66. }
  67. }
  68. return response, nil
  69. }