syncgroup.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package kafka
  2. import (
  3. "bufio"
  4. "bytes"
  5. )
  // groupAssignment is a member assignment payload made of a version, a set
  // of per-topic partition lists, and trailing user data. Its writeTo method
  // encodes the fields in the order they are declared here.
  type groupAssignment struct {
  	// Version is encoded as a 16-bit integer.
  	Version int16
  	// Topics maps a topic name to the partitions listed for that topic.
  	Topics map[string][]int32
  	// UserData is a raw byte payload encoded after the topics.
  	UserData []byte
  }
  11. func (t groupAssignment) size() int32 {
  12. sz := sizeofInt16(t.Version) + sizeofInt16(int16(len(t.Topics)))
  13. for topic, partitions := range t.Topics {
  14. sz += sizeofString(topic) + sizeofInt32Array(partitions)
  15. }
  16. return sz + sizeofBytes(t.UserData)
  17. }
  18. func (t groupAssignment) writeTo(wb *writeBuffer) {
  19. wb.writeInt16(t.Version)
  20. wb.writeInt32(int32(len(t.Topics)))
  21. for topic, partitions := range t.Topics {
  22. wb.writeString(topic)
  23. wb.writeInt32Array(partitions)
  24. }
  25. wb.writeBytes(t.UserData)
  26. }
  27. func (t *groupAssignment) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  28. // I came across this case when testing for compatibility with bsm/sarama-cluster. It
  29. // appears in some cases, sarama-cluster can send a nil array entry. Admittedly, I
  30. // didn't look too closely at it.
  31. if size == 0 {
  32. t.Topics = map[string][]int32{}
  33. return 0, nil
  34. }
  35. if remain, err = readInt16(r, size, &t.Version); err != nil {
  36. return
  37. }
  38. if remain, err = readMapStringInt32(r, remain, &t.Topics); err != nil {
  39. return
  40. }
  41. if remain, err = readBytes(r, remain, &t.UserData); err != nil {
  42. return
  43. }
  44. return
  45. }
  46. func (t groupAssignment) bytes() []byte {
  47. buf := bytes.NewBuffer(nil)
  48. t.writeTo(&writeBuffer{w: buf})
  49. return buf.Bytes()
  50. }
  // syncGroupRequestGroupAssignmentV0 pairs a group member with its encoded
  // assignment; syncGroupRequestV0 carries a list of these.
  type syncGroupRequestGroupAssignmentV0 struct {
  	// MemberID is the ID assigned to the member by the group coordinator.
  	MemberID string

  	// MemberAssignments holds the client-encoded assignments for the member.
  	//
  	// See the consumer groups section of
  	// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
  	MemberAssignments []byte
  }
  59. func (t syncGroupRequestGroupAssignmentV0) size() int32 {
  60. return sizeofString(t.MemberID) +
  61. sizeofBytes(t.MemberAssignments)
  62. }
  63. func (t syncGroupRequestGroupAssignmentV0) writeTo(wb *writeBuffer) {
  64. wb.writeString(t.MemberID)
  65. wb.writeBytes(t.MemberAssignments)
  66. }
  67. type syncGroupRequestV0 struct {
  68. // GroupID holds the unique group identifier
  69. GroupID string
  70. // GenerationID holds the generation of the group.
  71. GenerationID int32
  72. // MemberID assigned by the group coordinator
  73. MemberID string
  74. GroupAssignments []syncGroupRequestGroupAssignmentV0
  75. }
  76. func (t syncGroupRequestV0) size() int32 {
  77. return sizeofString(t.GroupID) +
  78. sizeofInt32(t.GenerationID) +
  79. sizeofString(t.MemberID) +
  80. sizeofArray(len(t.GroupAssignments), func(i int) int32 { return t.GroupAssignments[i].size() })
  81. }
  82. func (t syncGroupRequestV0) writeTo(wb *writeBuffer) {
  83. wb.writeString(t.GroupID)
  84. wb.writeInt32(t.GenerationID)
  85. wb.writeString(t.MemberID)
  86. wb.writeArray(len(t.GroupAssignments), func(i int) { t.GroupAssignments[i].writeTo(wb) })
  87. }
  // syncGroupResponseV0 is the version 0 sync group response: an error code
  // followed by the encoded assignments for the member.
  type syncGroupResponseV0 struct {
  	// ErrorCode is the response error code.
  	ErrorCode int16

  	// MemberAssignments holds the client-encoded assignments.
  	//
  	// See the consumer groups section of
  	// https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
  	MemberAssignments []byte
  }
  96. func (t syncGroupResponseV0) size() int32 {
  97. return sizeofInt16(t.ErrorCode) +
  98. sizeofBytes(t.MemberAssignments)
  99. }
  100. func (t syncGroupResponseV0) writeTo(wb *writeBuffer) {
  101. wb.writeInt16(t.ErrorCode)
  102. wb.writeBytes(t.MemberAssignments)
  103. }
  104. func (t *syncGroupResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
  105. if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil {
  106. return
  107. }
  108. if remain, err = readBytes(r, remain, &t.MemberAssignments); err != nil {
  109. return
  110. }
  111. return
  112. }