findcoordinator.go 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. package kafka
  2. import (
  3. "bufio"
  4. )
  5. // FindCoordinatorRequestV0 requests the coordinator for the specified group or transaction
  6. //
  7. // See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
  8. type findCoordinatorRequestV0 struct {
  9. // CoordinatorKey holds id to use for finding the coordinator (for groups, this is
  10. // the groupId, for transactional producers, this is the transactional id)
  11. CoordinatorKey string
  12. }
  13. func (t findCoordinatorRequestV0) size() int32 {
  14. return sizeofString(t.CoordinatorKey)
  15. }
  16. func (t findCoordinatorRequestV0) writeTo(wb *writeBuffer) {
  17. wb.writeString(t.CoordinatorKey)
  18. }
  19. type findCoordinatorResponseCoordinatorV0 struct {
  20. // NodeID holds the broker id.
  21. NodeID int32
  22. // Host of the broker
  23. Host string
  24. // Port on which broker accepts requests
  25. Port int32
  26. }
  27. func (t findCoordinatorResponseCoordinatorV0) size() int32 {
  28. return sizeofInt32(t.NodeID) +
  29. sizeofString(t.Host) +
  30. sizeofInt32(t.Port)
  31. }
  32. func (t findCoordinatorResponseCoordinatorV0) writeTo(wb *writeBuffer) {
  33. wb.writeInt32(t.NodeID)
  34. wb.writeString(t.Host)
  35. wb.writeInt32(t.Port)
  36. }
  37. func (t *findCoordinatorResponseCoordinatorV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  38. if remain, err = readInt32(r, size, &t.NodeID); err != nil {
  39. return
  40. }
  41. if remain, err = readString(r, remain, &t.Host); err != nil {
  42. return
  43. }
  44. if remain, err = readInt32(r, remain, &t.Port); err != nil {
  45. return
  46. }
  47. return
  48. }
  49. type findCoordinatorResponseV0 struct {
  50. // ErrorCode holds response error code
  51. ErrorCode int16
  52. // Coordinator holds host and port information for the coordinator
  53. Coordinator findCoordinatorResponseCoordinatorV0
  54. }
  55. func (t findCoordinatorResponseV0) size() int32 {
  56. return sizeofInt16(t.ErrorCode) +
  57. t.Coordinator.size()
  58. }
  59. func (t findCoordinatorResponseV0) writeTo(wb *writeBuffer) {
  60. wb.writeInt16(t.ErrorCode)
  61. t.Coordinator.writeTo(wb)
  62. }
  63. func (t *findCoordinatorResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  64. if remain, err = readInt16(r, size, &t.ErrorCode); err != nil {
  65. return
  66. }
  67. if remain, err = (&t.Coordinator).readFrom(r, remain); err != nil {
  68. return
  69. }
  70. return
  71. }