request.go 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package protocol
  2. import (
  3. "fmt"
  4. "io"
  5. )
  6. func ReadRequest(r io.Reader) (apiVersion int16, correlationID int32, clientID string, msg Message, err error) {
  7. d := &decoder{reader: r, remain: 4}
  8. size := d.readInt32()
  9. if err = d.err; err != nil {
  10. err = dontExpectEOF(err)
  11. return
  12. }
  13. d.remain = int(size)
  14. apiKey := ApiKey(d.readInt16())
  15. apiVersion = d.readInt16()
  16. correlationID = d.readInt32()
  17. clientID = d.readString()
  18. if i := int(apiKey); i < 0 || i >= len(apiTypes) {
  19. err = fmt.Errorf("unsupported api key: %d", i)
  20. return
  21. }
  22. if err = d.err; err != nil {
  23. err = dontExpectEOF(err)
  24. return
  25. }
  26. t := &apiTypes[apiKey]
  27. if t == nil {
  28. err = fmt.Errorf("unsupported api: %s", apiNames[apiKey])
  29. return
  30. }
  31. minVersion := t.minVersion()
  32. maxVersion := t.maxVersion()
  33. if apiVersion < minVersion || apiVersion > maxVersion {
  34. err = fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion)
  35. return
  36. }
  37. req := &t.requests[apiVersion-minVersion]
  38. if req.flexible {
  39. // In the flexible case, there's a tag buffer at the end of the request header
  40. taggedCount := int(d.readUnsignedVarInt())
  41. for i := 0; i < taggedCount; i++ {
  42. d.readUnsignedVarInt() // tagID
  43. size := d.readUnsignedVarInt()
  44. // Just throw away the values for now
  45. d.read(int(size))
  46. }
  47. }
  48. msg = req.new()
  49. req.decode(d, valueOf(msg))
  50. d.discardAll()
  51. if err = d.err; err != nil {
  52. err = dontExpectEOF(err)
  53. }
  54. return
  55. }
  56. func WriteRequest(w io.Writer, apiVersion int16, correlationID int32, clientID string, msg Message) error {
  57. apiKey := msg.ApiKey()
  58. if i := int(apiKey); i < 0 || i >= len(apiTypes) {
  59. return fmt.Errorf("unsupported api key: %d", i)
  60. }
  61. t := &apiTypes[apiKey]
  62. if t == nil {
  63. return fmt.Errorf("unsupported api: %s", apiNames[apiKey])
  64. }
  65. minVersion := t.minVersion()
  66. maxVersion := t.maxVersion()
  67. if apiVersion < minVersion || apiVersion > maxVersion {
  68. return fmt.Errorf("unsupported %s version: v%d not in range v%d-v%d", apiKey, apiVersion, minVersion, maxVersion)
  69. }
  70. r := &t.requests[apiVersion-minVersion]
  71. v := valueOf(msg)
  72. b := newPageBuffer()
  73. defer b.unref()
  74. e := &encoder{writer: b}
  75. e.writeInt32(0) // placeholder for the request size
  76. e.writeInt16(int16(apiKey))
  77. e.writeInt16(apiVersion)
  78. e.writeInt32(correlationID)
  79. if r.flexible {
  80. // Flexible messages use a nullable string for the client ID, then extra space for a
  81. // tag buffer, which begins with a size value. Since we're not writing any fields into the
  82. // latter, we can just write zero for now.
  83. //
  84. // See
  85. // https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields
  86. // for details.
  87. e.writeNullString(clientID)
  88. e.writeUnsignedVarInt(0)
  89. } else {
  90. // Technically, recent versions of kafka interpret this field as a nullable
  91. // string, however kafka 0.10 expected a non-nullable string and fails with
  92. // a NullPointerException when it receives a null client id.
  93. e.writeString(clientID)
  94. }
  95. r.encode(e, v)
  96. err := e.err
  97. if err == nil {
  98. size := packUint32(uint32(b.Size()) - 4)
  99. b.WriteAt(size[:], 0)
  100. _, err = b.WriteTo(w)
  101. }
  102. return err
  103. }