describegroups.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. package kafka
  2. import (
  3. "bufio"
  4. "bytes"
  5. "context"
  6. "fmt"
  7. "net"
  8. "github.com/segmentio/kafka-go/protocol/describegroups"
  9. )
  10. // DescribeGroupsRequest is a request to the DescribeGroups API.
  11. type DescribeGroupsRequest struct {
  12. // Addr is the address of the kafka broker to send the request to.
  13. Addr net.Addr
  14. // GroupIDs is a slice of groups to get details for.
  15. GroupIDs []string
  16. }
  17. // DescribeGroupsResponse is a response from the DescribeGroups API.
  18. type DescribeGroupsResponse struct {
  19. // Groups is a slice of details for the requested groups.
  20. Groups []DescribeGroupsResponseGroup
  21. }
  22. // DescribeGroupsResponseGroup contains the response details for a single group.
  23. type DescribeGroupsResponseGroup struct {
  24. // Error is set to a non-nil value if there was an error fetching the details
  25. // for this group.
  26. Error error
  27. // GroupID is the ID of the group.
  28. GroupID string
  29. // GroupState is a description of the group state.
  30. GroupState string
  31. // Members contains details about each member of the group.
  32. Members []DescribeGroupsResponseMember
  33. }
  34. // MemberInfo represents the membership information for a single group member.
  35. type DescribeGroupsResponseMember struct {
  36. // MemberID is the ID of the group member.
  37. MemberID string
  38. // ClientID is the ID of the client that the group member is using.
  39. ClientID string
  40. // ClientHost is the host of the client that the group member is connecting from.
  41. ClientHost string
  42. // MemberMetadata contains metadata about this group member.
  43. MemberMetadata DescribeGroupsResponseMemberMetadata
  44. // MemberAssignments contains the topic partitions that this member is assigned to.
  45. MemberAssignments DescribeGroupsResponseAssignments
  46. }
  47. // GroupMemberMetadata stores metadata associated with a group member.
  48. type DescribeGroupsResponseMemberMetadata struct {
  49. // Version is the version of the metadata.
  50. Version int
  51. // Topics is the list of topics that the member is assigned to.
  52. Topics []string
  53. // UserData is the user data for the member.
  54. UserData []byte
  55. }
  56. // GroupMemberAssignmentsInfo stores the topic partition assignment data for a group member.
  57. type DescribeGroupsResponseAssignments struct {
  58. // Version is the version of the assignments data.
  59. Version int
  60. // Topics contains the details of the partition assignments for each topic.
  61. Topics []GroupMemberTopic
  62. // UserData is the user data for the member.
  63. UserData []byte
  64. }
  65. // GroupMemberTopic is a mapping from a topic to a list of partitions in the topic. It is used
  66. // to represent the topic partitions that have been assigned to a group member.
  67. type GroupMemberTopic struct {
  68. // Topic is the name of the topic.
  69. Topic string
  70. // Partitions is a slice of partition IDs that this member is assigned to in the topic.
  71. Partitions []int
  72. }
  73. func (c *Client) DescribeGroups(
  74. ctx context.Context,
  75. req *DescribeGroupsRequest,
  76. ) (*DescribeGroupsResponse, error) {
  77. protoResp, err := c.roundTrip(
  78. ctx,
  79. req.Addr,
  80. &describegroups.Request{
  81. Groups: req.GroupIDs,
  82. },
  83. )
  84. if err != nil {
  85. return nil, err
  86. }
  87. apiResp := protoResp.(*describegroups.Response)
  88. resp := &DescribeGroupsResponse{}
  89. for _, apiGroup := range apiResp.Groups {
  90. group := DescribeGroupsResponseGroup{
  91. Error: makeError(apiGroup.ErrorCode, ""),
  92. GroupID: apiGroup.GroupID,
  93. GroupState: apiGroup.GroupState,
  94. }
  95. for _, member := range apiGroup.Members {
  96. decodedMetadata, err := decodeMemberMetadata(member.MemberMetadata)
  97. if err != nil {
  98. return nil, err
  99. }
  100. decodedAssignments, err := decodeMemberAssignments(member.MemberAssignment)
  101. if err != nil {
  102. return nil, err
  103. }
  104. group.Members = append(group.Members, DescribeGroupsResponseMember{
  105. MemberID: member.MemberID,
  106. ClientID: member.ClientID,
  107. ClientHost: member.ClientHost,
  108. MemberAssignments: decodedAssignments,
  109. MemberMetadata: decodedMetadata,
  110. })
  111. }
  112. resp.Groups = append(resp.Groups, group)
  113. }
  114. return resp, nil
  115. }
  116. // See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
  117. //
  118. // TODO: Remove everything below and use protocol-based version above everywhere.
  119. type describeGroupsRequestV0 struct {
  120. // List of groupIds to request metadata for (an empty groupId array
  121. // will return empty group metadata).
  122. GroupIDs []string
  123. }
  124. func (t describeGroupsRequestV0) size() int32 {
  125. return sizeofStringArray(t.GroupIDs)
  126. }
  127. func (t describeGroupsRequestV0) writeTo(wb *writeBuffer) {
  128. wb.writeStringArray(t.GroupIDs)
  129. }
  130. type describeGroupsResponseMemberV0 struct {
  131. // MemberID assigned by the group coordinator
  132. MemberID string
  133. // ClientID used in the member's latest join group request
  134. ClientID string
  135. // ClientHost used in the request session corresponding to the member's
  136. // join group.
  137. ClientHost string
  138. // MemberMetadata the metadata corresponding to the current group protocol
  139. // in use (will only be present if the group is stable).
  140. MemberMetadata []byte
  141. // MemberAssignments provided by the group leader (will only be present if
  142. // the group is stable).
  143. //
  144. // See consumer groups section of https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
  145. MemberAssignments []byte
  146. }
  147. func (t describeGroupsResponseMemberV0) size() int32 {
  148. return sizeofString(t.MemberID) +
  149. sizeofString(t.ClientID) +
  150. sizeofString(t.ClientHost) +
  151. sizeofBytes(t.MemberMetadata) +
  152. sizeofBytes(t.MemberAssignments)
  153. }
  154. func (t describeGroupsResponseMemberV0) writeTo(wb *writeBuffer) {
  155. wb.writeString(t.MemberID)
  156. wb.writeString(t.ClientID)
  157. wb.writeString(t.ClientHost)
  158. wb.writeBytes(t.MemberMetadata)
  159. wb.writeBytes(t.MemberAssignments)
  160. }
  161. func (t *describeGroupsResponseMemberV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  162. if remain, err = readString(r, size, &t.MemberID); err != nil {
  163. return
  164. }
  165. if remain, err = readString(r, remain, &t.ClientID); err != nil {
  166. return
  167. }
  168. if remain, err = readString(r, remain, &t.ClientHost); err != nil {
  169. return
  170. }
  171. if remain, err = readBytes(r, remain, &t.MemberMetadata); err != nil {
  172. return
  173. }
  174. if remain, err = readBytes(r, remain, &t.MemberAssignments); err != nil {
  175. return
  176. }
  177. return
  178. }
  179. type describeGroupsResponseGroupV0 struct {
  180. // ErrorCode holds response error code
  181. ErrorCode int16
  182. // GroupID holds the unique group identifier
  183. GroupID string
  184. // State holds current state of the group (one of: Dead, Stable, AwaitingSync,
  185. // PreparingRebalance, or empty if there is no active group)
  186. State string
  187. // ProtocolType holds the current group protocol type (will be empty if there is
  188. // no active group)
  189. ProtocolType string
  190. // Protocol holds the current group protocol (only provided if the group is Stable)
  191. Protocol string
  192. // Members contains the current group members (only provided if the group is not Dead)
  193. Members []describeGroupsResponseMemberV0
  194. }
  195. func (t describeGroupsResponseGroupV0) size() int32 {
  196. return sizeofInt16(t.ErrorCode) +
  197. sizeofString(t.GroupID) +
  198. sizeofString(t.State) +
  199. sizeofString(t.ProtocolType) +
  200. sizeofString(t.Protocol) +
  201. sizeofArray(len(t.Members), func(i int) int32 { return t.Members[i].size() })
  202. }
  203. func (t describeGroupsResponseGroupV0) writeTo(wb *writeBuffer) {
  204. wb.writeInt16(t.ErrorCode)
  205. wb.writeString(t.GroupID)
  206. wb.writeString(t.State)
  207. wb.writeString(t.ProtocolType)
  208. wb.writeString(t.Protocol)
  209. wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) })
  210. }
  211. func (t *describeGroupsResponseGroupV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  212. if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
  213. return
  214. }
  215. if remain, err = readString(r, remain, &t.GroupID); err != nil {
  216. return
  217. }
  218. if remain, err = readString(r, remain, &t.State); err != nil {
  219. return
  220. }
  221. if remain, err = readString(r, remain, &t.ProtocolType); err != nil {
  222. return
  223. }
  224. if remain, err = readString(r, remain, &t.Protocol); err != nil {
  225. return
  226. }
  227. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  228. item := describeGroupsResponseMemberV0{}
  229. if fnRemain, fnErr = (&item).readFrom(r, size); err != nil {
  230. return
  231. }
  232. t.Members = append(t.Members, item)
  233. return
  234. }
  235. if remain, err = readArrayWith(r, remain, fn); err != nil {
  236. return
  237. }
  238. return
  239. }
  240. type describeGroupsResponseV0 struct {
  241. // Groups holds selected group information
  242. Groups []describeGroupsResponseGroupV0
  243. }
  244. func (t describeGroupsResponseV0) size() int32 {
  245. return sizeofArray(len(t.Groups), func(i int) int32 { return t.Groups[i].size() })
  246. }
  247. func (t describeGroupsResponseV0) writeTo(wb *writeBuffer) {
  248. wb.writeArray(len(t.Groups), func(i int) { t.Groups[i].writeTo(wb) })
  249. }
  250. func (t *describeGroupsResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
  251. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  252. item := describeGroupsResponseGroupV0{}
  253. if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
  254. return
  255. }
  256. t.Groups = append(t.Groups, item)
  257. return
  258. }
  259. if remain, err = readArrayWith(r, sz, fn); err != nil {
  260. return
  261. }
  262. return
  263. }
  264. // decodeMemberMetadata converts raw metadata bytes to a
  265. // DescribeGroupsResponseMemberMetadata struct.
  266. //
  267. // See https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java#L49
  268. // for protocol details.
  269. func decodeMemberMetadata(rawMetadata []byte) (DescribeGroupsResponseMemberMetadata, error) {
  270. mm := DescribeGroupsResponseMemberMetadata{}
  271. if len(rawMetadata) == 0 {
  272. return mm, nil
  273. }
  274. buf := bytes.NewBuffer(rawMetadata)
  275. bufReader := bufio.NewReader(buf)
  276. remain := len(rawMetadata)
  277. var err error
  278. var version16 int16
  279. if remain, err = readInt16(bufReader, remain, &version16); err != nil {
  280. return mm, err
  281. }
  282. mm.Version = int(version16)
  283. if remain, err = readStringArray(bufReader, remain, &mm.Topics); err != nil {
  284. return mm, err
  285. }
  286. if remain, err = readBytes(bufReader, remain, &mm.UserData); err != nil {
  287. return mm, err
  288. }
  289. if remain != 0 {
  290. return mm, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
  291. }
  292. return mm, nil
  293. }
  294. // decodeMemberAssignments converts raw assignment bytes to a DescribeGroupsResponseAssignments
  295. // struct.
  296. //
  297. // See https://github.com/apache/kafka/blob/2.4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerProtocol.java#L49
  298. // for protocol details.
  299. func decodeMemberAssignments(rawAssignments []byte) (DescribeGroupsResponseAssignments, error) {
  300. ma := DescribeGroupsResponseAssignments{}
  301. if len(rawAssignments) == 0 {
  302. return ma, nil
  303. }
  304. buf := bytes.NewBuffer(rawAssignments)
  305. bufReader := bufio.NewReader(buf)
  306. remain := len(rawAssignments)
  307. var err error
  308. var version16 int16
  309. if remain, err = readInt16(bufReader, remain, &version16); err != nil {
  310. return ma, err
  311. }
  312. ma.Version = int(version16)
  313. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  314. item := GroupMemberTopic{}
  315. if fnRemain, fnErr = readString(r, size, &item.Topic); fnErr != nil {
  316. return
  317. }
  318. partitions := []int32{}
  319. if fnRemain, fnErr = readInt32Array(r, fnRemain, &partitions); fnErr != nil {
  320. return
  321. }
  322. for _, partition := range partitions {
  323. item.Partitions = append(item.Partitions, int(partition))
  324. }
  325. ma.Topics = append(ma.Topics, item)
  326. return
  327. }
  328. if remain, err = readArrayWith(bufReader, remain, fn); err != nil {
  329. return ma, err
  330. }
  331. if remain, err = readBytes(bufReader, remain, &ma.UserData); err != nil {
  332. return ma, err
  333. }
  334. if remain != 0 {
  335. return ma, fmt.Errorf("Got non-zero number of bytes remaining: %d", remain)
  336. }
  337. return ma, nil
  338. }
  339. // readInt32Array reads an array of int32s. It's adapted from the implementation of
  340. // readStringArray.
  341. func readInt32Array(r *bufio.Reader, sz int, v *[]int32) (remain int, err error) {
  342. var content []int32
  343. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  344. var value int32
  345. if fnRemain, fnErr = readInt32(r, size, &value); fnErr != nil {
  346. return
  347. }
  348. content = append(content, value)
  349. return
  350. }
  351. if remain, err = readArrayWith(r, sz, fn); err != nil {
  352. return
  353. }
  354. *v = content
  355. return
  356. }