joingroup.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. package kafka
  2. import (
  3. "bufio"
  4. "bytes"
  5. )
  6. type memberGroupMetadata struct {
  7. // MemberID assigned by the group coordinator or null if joining for the
  8. // first time.
  9. MemberID string
  10. Metadata groupMetadata
  11. }
  12. type groupMetadata struct {
  13. Version int16
  14. Topics []string
  15. UserData []byte
  16. }
  17. func (t groupMetadata) size() int32 {
  18. return sizeofInt16(t.Version) +
  19. sizeofStringArray(t.Topics) +
  20. sizeofBytes(t.UserData)
  21. }
  22. func (t groupMetadata) writeTo(wb *writeBuffer) {
  23. wb.writeInt16(t.Version)
  24. wb.writeStringArray(t.Topics)
  25. wb.writeBytes(t.UserData)
  26. }
  27. func (t groupMetadata) bytes() []byte {
  28. buf := bytes.NewBuffer(nil)
  29. t.writeTo(&writeBuffer{w: buf})
  30. return buf.Bytes()
  31. }
  32. func (t *groupMetadata) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  33. if remain, err = readInt16(r, size, &t.Version); err != nil {
  34. return
  35. }
  36. if remain, err = readStringArray(r, remain, &t.Topics); err != nil {
  37. return
  38. }
  39. if remain, err = readBytes(r, remain, &t.UserData); err != nil {
  40. return
  41. }
  42. return
  43. }
  44. type joinGroupRequestGroupProtocolV1 struct {
  45. ProtocolName string
  46. ProtocolMetadata []byte
  47. }
  48. func (t joinGroupRequestGroupProtocolV1) size() int32 {
  49. return sizeofString(t.ProtocolName) +
  50. sizeofBytes(t.ProtocolMetadata)
  51. }
  52. func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) {
  53. wb.writeString(t.ProtocolName)
  54. wb.writeBytes(t.ProtocolMetadata)
  55. }
  56. type joinGroupRequestV1 struct {
  57. // GroupID holds the unique group identifier
  58. GroupID string
  59. // SessionTimeout holds the coordinator considers the consumer dead if it
  60. // receives no heartbeat after this timeout in ms.
  61. SessionTimeout int32
  62. // RebalanceTimeout holds the maximum time that the coordinator will wait
  63. // for each member to rejoin when rebalancing the group in ms
  64. RebalanceTimeout int32
  65. // MemberID assigned by the group coordinator or the zero string if joining
  66. // for the first time.
  67. MemberID string
  68. // ProtocolType holds the unique name for class of protocols implemented by group
  69. ProtocolType string
  70. // GroupProtocols holds the list of protocols that the member supports
  71. GroupProtocols []joinGroupRequestGroupProtocolV1
  72. }
  73. func (t joinGroupRequestV1) size() int32 {
  74. return sizeofString(t.GroupID) +
  75. sizeofInt32(t.SessionTimeout) +
  76. sizeofInt32(t.RebalanceTimeout) +
  77. sizeofString(t.MemberID) +
  78. sizeofString(t.ProtocolType) +
  79. sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() })
  80. }
  81. func (t joinGroupRequestV1) writeTo(wb *writeBuffer) {
  82. wb.writeString(t.GroupID)
  83. wb.writeInt32(t.SessionTimeout)
  84. wb.writeInt32(t.RebalanceTimeout)
  85. wb.writeString(t.MemberID)
  86. wb.writeString(t.ProtocolType)
  87. wb.writeArray(len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(wb) })
  88. }
  89. type joinGroupResponseMemberV1 struct {
  90. // MemberID assigned by the group coordinator
  91. MemberID string
  92. MemberMetadata []byte
  93. }
  94. func (t joinGroupResponseMemberV1) size() int32 {
  95. return sizeofString(t.MemberID) +
  96. sizeofBytes(t.MemberMetadata)
  97. }
  98. func (t joinGroupResponseMemberV1) writeTo(wb *writeBuffer) {
  99. wb.writeString(t.MemberID)
  100. wb.writeBytes(t.MemberMetadata)
  101. }
  102. func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  103. if remain, err = readString(r, size, &t.MemberID); err != nil {
  104. return
  105. }
  106. if remain, err = readBytes(r, remain, &t.MemberMetadata); err != nil {
  107. return
  108. }
  109. return
  110. }
  111. type joinGroupResponseV1 struct {
  112. // ErrorCode holds response error code
  113. ErrorCode int16
  114. // GenerationID holds the generation of the group.
  115. GenerationID int32
  116. // GroupProtocol holds the group protocol selected by the coordinator
  117. GroupProtocol string
  118. // LeaderID holds the leader of the group
  119. LeaderID string
  120. // MemberID assigned by the group coordinator
  121. MemberID string
  122. Members []joinGroupResponseMemberV1
  123. }
  124. func (t joinGroupResponseV1) size() int32 {
  125. return sizeofInt16(t.ErrorCode) +
  126. sizeofInt32(t.GenerationID) +
  127. sizeofString(t.GroupProtocol) +
  128. sizeofString(t.LeaderID) +
  129. sizeofString(t.MemberID) +
  130. sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() })
  131. }
  132. func (t joinGroupResponseV1) writeTo(wb *writeBuffer) {
  133. wb.writeInt16(t.ErrorCode)
  134. wb.writeInt32(t.GenerationID)
  135. wb.writeString(t.GroupProtocol)
  136. wb.writeString(t.LeaderID)
  137. wb.writeString(t.MemberID)
  138. wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) })
  139. }
  140. func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  141. if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
  142. return
  143. }
  144. if remain, err = readInt32(r, remain, &t.GenerationID); err != nil {
  145. return
  146. }
  147. if remain, err = readString(r, remain, &t.GroupProtocol); err != nil {
  148. return
  149. }
  150. if remain, err = readString(r, remain, &t.LeaderID); err != nil {
  151. return
  152. }
  153. if remain, err = readString(r, remain, &t.MemberID); err != nil {
  154. return
  155. }
  156. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  157. var item joinGroupResponseMemberV1
  158. if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil {
  159. return
  160. }
  161. t.Members = append(t.Members, item)
  162. return
  163. }
  164. if remain, err = readArrayWith(r, remain, fn); err != nil {
  165. return
  166. }
  167. return
  168. }