123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- package kafka
- import (
- "bufio"
- "bytes"
- )
- type groupAssignment struct {
- Version int16
- Topics map[string][]int32
- UserData []byte
- }
- func (t groupAssignment) size() int32 {
- sz := sizeofInt16(t.Version) + sizeofInt16(int16(len(t.Topics)))
- for topic, partitions := range t.Topics {
- sz += sizeofString(topic) + sizeofInt32Array(partitions)
- }
- return sz + sizeofBytes(t.UserData)
- }
- func (t groupAssignment) writeTo(wb *writeBuffer) {
- wb.writeInt16(t.Version)
- wb.writeInt32(int32(len(t.Topics)))
- for topic, partitions := range t.Topics {
- wb.writeString(topic)
- wb.writeInt32Array(partitions)
- }
- wb.writeBytes(t.UserData)
- }
- func (t *groupAssignment) readFrom(r *bufio.Reader, size int) (remain int, err error) {
-
-
-
- if size == 0 {
- t.Topics = map[string][]int32{}
- return 0, nil
- }
- if remain, err = readInt16(r, size, &t.Version); err != nil {
- return
- }
- if remain, err = readMapStringInt32(r, remain, &t.Topics); err != nil {
- return
- }
- if remain, err = readBytes(r, remain, &t.UserData); err != nil {
- return
- }
- return
- }
- func (t groupAssignment) bytes() []byte {
- buf := bytes.NewBuffer(nil)
- t.writeTo(&writeBuffer{w: buf})
- return buf.Bytes()
- }
- type syncGroupRequestGroupAssignmentV0 struct {
-
- MemberID string
-
-
-
- MemberAssignments []byte
- }
- func (t syncGroupRequestGroupAssignmentV0) size() int32 {
- return sizeofString(t.MemberID) +
- sizeofBytes(t.MemberAssignments)
- }
- func (t syncGroupRequestGroupAssignmentV0) writeTo(wb *writeBuffer) {
- wb.writeString(t.MemberID)
- wb.writeBytes(t.MemberAssignments)
- }
- type syncGroupRequestV0 struct {
-
- GroupID string
-
- GenerationID int32
-
- MemberID string
- GroupAssignments []syncGroupRequestGroupAssignmentV0
- }
- func (t syncGroupRequestV0) size() int32 {
- return sizeofString(t.GroupID) +
- sizeofInt32(t.GenerationID) +
- sizeofString(t.MemberID) +
- sizeofArray(len(t.GroupAssignments), func(i int) int32 { return t.GroupAssignments[i].size() })
- }
- func (t syncGroupRequestV0) writeTo(wb *writeBuffer) {
- wb.writeString(t.GroupID)
- wb.writeInt32(t.GenerationID)
- wb.writeString(t.MemberID)
- wb.writeArray(len(t.GroupAssignments), func(i int) { t.GroupAssignments[i].writeTo(wb) })
- }
- type syncGroupResponseV0 struct {
-
- ErrorCode int16
-
-
-
- MemberAssignments []byte
- }
- func (t syncGroupResponseV0) size() int32 {
- return sizeofInt16(t.ErrorCode) +
- sizeofBytes(t.MemberAssignments)
- }
- func (t syncGroupResponseV0) writeTo(wb *writeBuffer) {
- wb.writeInt16(t.ErrorCode)
- wb.writeBytes(t.MemberAssignments)
- }
- func (t *syncGroupResponseV0) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
- if remain, err = readInt16(r, sz, &t.ErrorCode); err != nil {
- return
- }
- if remain, err = readBytes(r, remain, &t.MemberAssignments); err != nil {
- return
- }
- return
- }
|