offsetcommit.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package kafka
  2. import "bufio"
  // offsetCommitRequestV2Partition is the per-partition entry of an offset
  // commit request: which partition, which offset, and optional client metadata.
  type offsetCommitRequestV2Partition struct {
  	// Partition is the ID of the partition the offset belongs to.
  	Partition int32

  	// Offset is the offset to be committed.
  	Offset int64

  	// Metadata is any associated metadata the client wants to keep alongside
  	// the committed offset.
  	Metadata string
  }
  11. func (t offsetCommitRequestV2Partition) size() int32 {
  12. return sizeofInt32(t.Partition) +
  13. sizeofInt64(t.Offset) +
  14. sizeofString(t.Metadata)
  15. }
  16. func (t offsetCommitRequestV2Partition) writeTo(wb *writeBuffer) {
  17. wb.writeInt32(t.Partition)
  18. wb.writeInt64(t.Offset)
  19. wb.writeString(t.Metadata)
  20. }
  21. type offsetCommitRequestV2Topic struct {
  22. // Topic name
  23. Topic string
  24. // Partitions to commit offsets
  25. Partitions []offsetCommitRequestV2Partition
  26. }
  27. func (t offsetCommitRequestV2Topic) size() int32 {
  28. return sizeofString(t.Topic) +
  29. sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
  30. }
  31. func (t offsetCommitRequestV2Topic) writeTo(wb *writeBuffer) {
  32. wb.writeString(t.Topic)
  33. wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
  34. }
  35. type offsetCommitRequestV2 struct {
  36. // GroupID holds the unique group identifier
  37. GroupID string
  38. // GenerationID holds the generation of the group.
  39. GenerationID int32
  40. // MemberID assigned by the group coordinator
  41. MemberID string
  42. // RetentionTime holds the time period in ms to retain the offset.
  43. RetentionTime int64
  44. // Topics to commit offsets
  45. Topics []offsetCommitRequestV2Topic
  46. }
  47. func (t offsetCommitRequestV2) size() int32 {
  48. return sizeofString(t.GroupID) +
  49. sizeofInt32(t.GenerationID) +
  50. sizeofString(t.MemberID) +
  51. sizeofInt64(t.RetentionTime) +
  52. sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() })
  53. }
  54. func (t offsetCommitRequestV2) writeTo(wb *writeBuffer) {
  55. wb.writeString(t.GroupID)
  56. wb.writeInt32(t.GenerationID)
  57. wb.writeString(t.MemberID)
  58. wb.writeInt64(t.RetentionTime)
  59. wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) })
  60. }
  // offsetCommitResponseV2PartitionResponse is the per-partition result carried
  // in an offset commit response.
  type offsetCommitResponseV2PartitionResponse struct {
  	// Partition is the partition ID.
  	Partition int32

  	// ErrorCode is the response error code.
  	ErrorCode int16
  }
  66. func (t offsetCommitResponseV2PartitionResponse) size() int32 {
  67. return sizeofInt32(t.Partition) +
  68. sizeofInt16(t.ErrorCode)
  69. }
  70. func (t offsetCommitResponseV2PartitionResponse) writeTo(wb *writeBuffer) {
  71. wb.writeInt32(t.Partition)
  72. wb.writeInt16(t.ErrorCode)
  73. }
  74. func (t *offsetCommitResponseV2PartitionResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  75. if remain, err = readInt32(r, size, &t.Partition); err != nil {
  76. return
  77. }
  78. if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
  79. return
  80. }
  81. return
  82. }
  83. type offsetCommitResponseV2Response struct {
  84. Topic string
  85. PartitionResponses []offsetCommitResponseV2PartitionResponse
  86. }
  87. func (t offsetCommitResponseV2Response) size() int32 {
  88. return sizeofString(t.Topic) +
  89. sizeofArray(len(t.PartitionResponses), func(i int) int32 { return t.PartitionResponses[i].size() })
  90. }
  91. func (t offsetCommitResponseV2Response) writeTo(wb *writeBuffer) {
  92. wb.writeString(t.Topic)
  93. wb.writeArray(len(t.PartitionResponses), func(i int) { t.PartitionResponses[i].writeTo(wb) })
  94. }
  95. func (t *offsetCommitResponseV2Response) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  96. if remain, err = readString(r, size, &t.Topic); err != nil {
  97. return
  98. }
  99. fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
  100. item := offsetCommitResponseV2PartitionResponse{}
  101. if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil {
  102. return
  103. }
  104. t.PartitionResponses = append(t.PartitionResponses, item)
  105. return
  106. }
  107. if remain, err = readArrayWith(r, remain, fn); err != nil {
  108. return
  109. }
  110. return
  111. }
  112. type offsetCommitResponseV2 struct {
  113. Responses []offsetCommitResponseV2Response
  114. }
  115. func (t offsetCommitResponseV2) size() int32 {
  116. return sizeofArray(len(t.Responses), func(i int) int32 { return t.Responses[i].size() })
  117. }
  118. func (t offsetCommitResponseV2) writeTo(wb *writeBuffer) {
  119. wb.writeArray(len(t.Responses), func(i int) { t.Responses[i].writeTo(wb) })
  120. }
  121. func (t *offsetCommitResponseV2) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  122. fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
  123. item := offsetCommitResponseV2Response{}
  124. if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil {
  125. return
  126. }
  127. t.Responses = append(t.Responses, item)
  128. return
  129. }
  130. if remain, err = readArrayWith(r, size, fn); err != nil {
  131. return
  132. }
  133. return
  134. }