protocol.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. package protocol
  2. import (
  3. "fmt"
  4. "io"
  5. "net"
  6. "reflect"
  7. "strconv"
  8. "strings"
  9. )
// Message is an interface implemented by all request and response types of the
// kafka protocol.
//
// This interface is used mostly as a safe-guard to provide a compile-time check
// for values passed to functions dealing with kafka message types.
type Message interface {
	// ApiKey returns the key of the kafka API that the message belongs to.
	ApiKey() ApiKey
}
// ApiKey is the 16-bit signed integer key identifying a kafka API. The keys
// known to this package are declared as constants below.
type ApiKey int16
  19. func (k ApiKey) String() string {
  20. if i := int(k); i >= 0 && i < len(apiNames) {
  21. return apiNames[i]
  22. }
  23. return strconv.Itoa(int(k))
  24. }
  25. func (k ApiKey) MinVersion() int16 { return k.apiType().minVersion() }
  26. func (k ApiKey) MaxVersion() int16 { return k.apiType().maxVersion() }
  27. func (k ApiKey) SelectVersion(minVersion, maxVersion int16) int16 {
  28. min := k.MinVersion()
  29. max := k.MaxVersion()
  30. switch {
  31. case min > maxVersion:
  32. return min
  33. case max < maxVersion:
  34. return max
  35. default:
  36. return maxVersion
  37. }
  38. }
  39. func (k ApiKey) apiType() apiType {
  40. if i := int(k); i >= 0 && i < len(apiTypes) {
  41. return apiTypes[i]
  42. }
  43. return apiType{}
  44. }
// Keys of the kafka APIs known to this package. The numeric value of each
// constant is also its index in the apiNames and apiTypes tables.
const (
	Produce                     ApiKey = 0
	Fetch                       ApiKey = 1
	ListOffsets                 ApiKey = 2
	Metadata                    ApiKey = 3
	LeaderAndIsr                ApiKey = 4
	StopReplica                 ApiKey = 5
	UpdateMetadata              ApiKey = 6
	ControlledShutdown          ApiKey = 7
	OffsetCommit                ApiKey = 8
	OffsetFetch                 ApiKey = 9
	FindCoordinator             ApiKey = 10
	JoinGroup                   ApiKey = 11
	Heartbeat                   ApiKey = 12
	LeaveGroup                  ApiKey = 13
	SyncGroup                   ApiKey = 14
	DescribeGroups              ApiKey = 15
	ListGroups                  ApiKey = 16
	SaslHandshake               ApiKey = 17
	ApiVersions                 ApiKey = 18
	CreateTopics                ApiKey = 19
	DeleteTopics                ApiKey = 20
	DeleteRecords               ApiKey = 21
	InitProducerId              ApiKey = 22
	OffsetForLeaderEpoch        ApiKey = 23
	AddPartitionsToTxn          ApiKey = 24
	AddOffsetsToTxn             ApiKey = 25
	EndTxn                      ApiKey = 26
	WriteTxnMarkers             ApiKey = 27
	TxnOffsetCommit             ApiKey = 28
	DescribeAcls                ApiKey = 29
	CreateAcls                  ApiKey = 30
	DeleteAcls                  ApiKey = 31
	DescribeConfigs             ApiKey = 32
	AlterConfigs                ApiKey = 33
	AlterReplicaLogDirs         ApiKey = 34
	DescribeLogDirs             ApiKey = 35
	SaslAuthenticate            ApiKey = 36
	CreatePartitions            ApiKey = 37
	CreateDelegationToken       ApiKey = 38
	RenewDelegationToken        ApiKey = 39
	ExpireDelegationToken       ApiKey = 40
	DescribeDelegationToken     ApiKey = 41
	DeleteGroups                ApiKey = 42
	ElectLeaders                ApiKey = 43
	IncrementalAlterConfigs     ApiKey = 44
	AlterPartitionReassignments ApiKey = 45
	ListPartitionReassignments  ApiKey = 46
	OffsetDelete                ApiKey = 47
	DescribeClientQuotas        ApiKey = 48
	AlterClientQuotas           ApiKey = 49

	// numApis is the length of the apiNames and apiTypes tables; it must stay
	// one past the highest API key declared above.
	numApis = 50
)
// apiNames holds the name of each known API, indexed by the integer value of
// its ApiKey. It backs ApiKey.String.
var apiNames = [numApis]string{
	Produce:                     "Produce",
	Fetch:                       "Fetch",
	ListOffsets:                 "ListOffsets",
	Metadata:                    "Metadata",
	LeaderAndIsr:                "LeaderAndIsr",
	StopReplica:                 "StopReplica",
	UpdateMetadata:              "UpdateMetadata",
	ControlledShutdown:          "ControlledShutdown",
	OffsetCommit:                "OffsetCommit",
	OffsetFetch:                 "OffsetFetch",
	FindCoordinator:             "FindCoordinator",
	JoinGroup:                   "JoinGroup",
	Heartbeat:                   "Heartbeat",
	LeaveGroup:                  "LeaveGroup",
	SyncGroup:                   "SyncGroup",
	DescribeGroups:              "DescribeGroups",
	ListGroups:                  "ListGroups",
	SaslHandshake:               "SaslHandshake",
	ApiVersions:                 "ApiVersions",
	CreateTopics:                "CreateTopics",
	DeleteTopics:                "DeleteTopics",
	DeleteRecords:               "DeleteRecords",
	InitProducerId:              "InitProducerId",
	OffsetForLeaderEpoch:        "OffsetForLeaderEpoch",
	AddPartitionsToTxn:          "AddPartitionsToTxn",
	AddOffsetsToTxn:             "AddOffsetsToTxn",
	EndTxn:                      "EndTxn",
	WriteTxnMarkers:             "WriteTxnMarkers",
	TxnOffsetCommit:             "TxnOffsetCommit",
	DescribeAcls:                "DescribeAcls",
	CreateAcls:                  "CreateAcls",
	DeleteAcls:                  "DeleteAcls",
	DescribeConfigs:             "DescribeConfigs",
	AlterConfigs:                "AlterConfigs",
	AlterReplicaLogDirs:         "AlterReplicaLogDirs",
	DescribeLogDirs:             "DescribeLogDirs",
	SaslAuthenticate:            "SaslAuthenticate",
	CreatePartitions:            "CreatePartitions",
	CreateDelegationToken:       "CreateDelegationToken",
	RenewDelegationToken:        "RenewDelegationToken",
	ExpireDelegationToken:       "ExpireDelegationToken",
	DescribeDelegationToken:     "DescribeDelegationToken",
	DeleteGroups:                "DeleteGroups",
	ElectLeaders:                "ElectLeaders",
	IncrementalAlterConfigs:     "IncrementalAlterConfigs",
	AlterPartitionReassignments: "AlterPartitionReassignments",
	ListPartitionReassignments:  "ListPartitionReassignments",
	OffsetDelete:                "OffsetDelete",
	DescribeClientQuotas:        "DescribeClientQuotas",
	AlterClientQuotas:           "AlterClientQuotas",
}
// messageType describes a single version of a request or response message,
// as built by makeTypes.
type messageType struct {
	version  int16        // API version this entry applies to
	flexible bool         // true when version is at or above the message's first flexible version
	gotype   reflect.Type // Go struct type of the message
	decode   decodeFunc   // decoder obtained from decodeFuncOf for gotype at this version
	encode   encodeFunc   // encoder obtained from encodeFuncOf for gotype at this version
}
  157. func (t *messageType) new() Message {
  158. return reflect.New(t.gotype).Interface().(Message)
  159. }
// apiType groups the request and response message types registered for one
// kafka API. Each slice holds one entry per version, as returned by typesOf.
type apiType struct {
	requests  []messageType
	responses []messageType
}
  164. func (t apiType) minVersion() int16 {
  165. if len(t.requests) == 0 {
  166. return 0
  167. }
  168. return t.requests[0].version
  169. }
  170. func (t apiType) maxVersion() int16 {
  171. if len(t.requests) == 0 {
  172. return 0
  173. }
  174. return t.requests[len(t.requests)-1].version
  175. }
// apiTypes holds the request and response types installed by Register,
// indexed by API key. Entries for keys that were never registered keep their
// zero value.
var apiTypes [numApis]apiType
  177. // Register is automatically called by sub-packages are imported to install a
  178. // new pair of request/response message types.
  179. func Register(req, res Message) {
  180. k1 := req.ApiKey()
  181. k2 := res.ApiKey()
  182. if k1 != k2 {
  183. panic(fmt.Sprintf("[%T/%T]: request and response API keys mismatch: %d != %d", req, res, k1, k2))
  184. }
  185. apiTypes[k1] = apiType{
  186. requests: typesOf(req),
  187. responses: typesOf(res),
  188. }
  189. }
  190. func typesOf(v interface{}) []messageType {
  191. return makeTypes(reflect.TypeOf(v).Elem())
  192. }
  193. func makeTypes(t reflect.Type) []messageType {
  194. minVersion := int16(-1)
  195. maxVersion := int16(-1)
  196. // All future versions will be flexible (according to spec), so don't need to
  197. // worry about maxes here.
  198. minFlexibleVersion := int16(-1)
  199. forEachStructField(t, func(_ reflect.Type, _ index, tag string) {
  200. forEachStructTag(tag, func(tag structTag) bool {
  201. if minVersion < 0 || tag.MinVersion < minVersion {
  202. minVersion = tag.MinVersion
  203. }
  204. if maxVersion < 0 || tag.MaxVersion > maxVersion {
  205. maxVersion = tag.MaxVersion
  206. }
  207. if tag.TagID > -2 && (minFlexibleVersion < 0 || tag.MinVersion < minFlexibleVersion) {
  208. minFlexibleVersion = tag.MinVersion
  209. }
  210. return true
  211. })
  212. })
  213. types := make([]messageType, 0, (maxVersion-minVersion)+1)
  214. for v := minVersion; v <= maxVersion; v++ {
  215. flexible := minFlexibleVersion >= 0 && v >= minFlexibleVersion
  216. types = append(types, messageType{
  217. version: v,
  218. gotype: t,
  219. flexible: flexible,
  220. decode: decodeFuncOf(t, v, flexible, structTag{}),
  221. encode: encodeFuncOf(t, v, flexible, structTag{}),
  222. })
  223. }
  224. return types
  225. }
// structTag is the parsed form of one '|'-separated section of a `kafka`
// struct tag. The defaults noted below are the ones set by forEachStructTag
// before parsing the section's options.
type structTag struct {
	MinVersion int16 // from "min=vN"; -1 when the option is absent
	MaxVersion int16 // from "max=vN"; -1 when the option is absent
	Compact    bool  // set by the "compact" option
	Nullable   bool  // set by the "nullable" option
	TagID      int   // from "tag=N"; -1 for a bare "tag", -2 when the option is absent
}
  233. func forEachStructTag(tag string, do func(structTag) bool) {
  234. if tag == "-" {
  235. return // special case to ignore the field
  236. }
  237. forEach(tag, '|', func(s string) bool {
  238. tag := structTag{
  239. MinVersion: -1,
  240. MaxVersion: -1,
  241. // Legitimate tag IDs can start at 0. We use -1 as a placeholder to indicate
  242. // that the message type is flexible, so that leaves -2 as the default for
  243. // indicating that there is no tag ID and the message is not flexible.
  244. TagID: -2,
  245. }
  246. var err error
  247. forEach(s, ',', func(s string) bool {
  248. switch {
  249. case strings.HasPrefix(s, "min="):
  250. tag.MinVersion, err = parseVersion(s[4:])
  251. case strings.HasPrefix(s, "max="):
  252. tag.MaxVersion, err = parseVersion(s[4:])
  253. case s == "tag":
  254. tag.TagID = -1
  255. case strings.HasPrefix(s, "tag="):
  256. tag.TagID, err = strconv.Atoi(s[4:])
  257. case s == "compact":
  258. tag.Compact = true
  259. case s == "nullable":
  260. tag.Nullable = true
  261. default:
  262. err = fmt.Errorf("unrecognized option: %q", s)
  263. }
  264. return err == nil
  265. })
  266. if err != nil {
  267. panic(fmt.Errorf("malformed struct tag: %w", err))
  268. }
  269. if tag.MinVersion < 0 && tag.MaxVersion >= 0 {
  270. panic(fmt.Errorf("missing minimum version in struct tag: %q", s))
  271. }
  272. if tag.MaxVersion < 0 && tag.MinVersion >= 0 {
  273. panic(fmt.Errorf("missing maximum version in struct tag: %q", s))
  274. }
  275. if tag.MinVersion > tag.MaxVersion {
  276. panic(fmt.Errorf("invalid version range in struct tag: %q", s))
  277. }
  278. return do(tag)
  279. })
  280. }
  281. func forEach(s string, sep byte, do func(string) bool) bool {
  282. for len(s) != 0 {
  283. p := ""
  284. i := strings.IndexByte(s, sep)
  285. if i < 0 {
  286. p, s = s, ""
  287. } else {
  288. p, s = s[:i], s[i+1:]
  289. }
  290. if !do(p) {
  291. return false
  292. }
  293. }
  294. return true
  295. }
  296. func forEachStructField(t reflect.Type, do func(reflect.Type, index, string)) {
  297. for i, n := 0, t.NumField(); i < n; i++ {
  298. f := t.Field(i)
  299. if f.PkgPath != "" && f.Name != "_" {
  300. continue
  301. }
  302. kafkaTag, ok := f.Tag.Lookup("kafka")
  303. if !ok {
  304. kafkaTag = "|"
  305. }
  306. do(f.Type, indexOf(f), kafkaTag)
  307. }
  308. }
  309. func parseVersion(s string) (int16, error) {
  310. if !strings.HasPrefix(s, "v") {
  311. return 0, fmt.Errorf("invalid version number: %q", s)
  312. }
  313. i, err := strconv.ParseInt(s[1:], 10, 16)
  314. if err != nil {
  315. return 0, fmt.Errorf("invalid version number: %q: %w", s, err)
  316. }
  317. if i < 0 {
  318. return 0, fmt.Errorf("invalid negative version number: %q", s)
  319. }
  320. return int16(i), nil
  321. }
  322. func dontExpectEOF(err error) error {
  323. switch err {
  324. case nil:
  325. return nil
  326. case io.EOF:
  327. return io.ErrUnexpectedEOF
  328. default:
  329. return err
  330. }
  331. }
  332. type Broker struct {
  333. Rack string
  334. Host string
  335. Port int32
  336. ID int32
  337. }
  338. func (b Broker) String() string {
  339. return net.JoinHostPort(b.Host, itoa(b.Port))
  340. }
  341. func (b Broker) Format(w fmt.State, v rune) {
  342. switch v {
  343. case 'd':
  344. io.WriteString(w, itoa(b.ID))
  345. case 's':
  346. io.WriteString(w, b.String())
  347. case 'v':
  348. io.WriteString(w, itoa(b.ID))
  349. io.WriteString(w, " ")
  350. io.WriteString(w, b.String())
  351. if b.Rack != "" {
  352. io.WriteString(w, " ")
  353. io.WriteString(w, b.Rack)
  354. }
  355. }
  356. }
  357. func itoa(i int32) string {
  358. return strconv.Itoa(int(i))
  359. }
// Topic carries the name, error, and partitions of a kafka topic.
type Topic struct {
	Name       string
	Error      int16               // NOTE(review): presumably a kafka error code — confirm
	Partitions map[int32]Partition // NOTE(review): presumably keyed by partition ID — confirm
}
// Partition carries the state of one partition of a kafka topic.
//
// NOTE(review): Leader, Replicas, ISR, and Offline presumably hold broker
// IDs, and Error a kafka error code — confirm against the code populating
// this type.
type Partition struct {
	ID       int32
	Error    int16
	Leader   int32
	Replicas []int32
	ISR      []int32
	Offline  []int32
}
// BrokerMessage is an extension of the Message interface implemented by some
// request types to customize the broker assignment logic.
type BrokerMessage interface {
	// Broker returns the broker that the message should be routed to, given a
	// representation of the kafka cluster state as argument.
	Broker(Cluster) (Broker, error)
}
// GroupMessage is an extension of the Message interface implemented by some
// request types to inform the program that they should be routed to a group
// coordinator.
type GroupMessage interface {
	// Group returns the group configured on the message.
	Group() string
}
// PreparedMessage is an extension of the Message interface implemented by some
// request types which may need to run some pre-processing on their state before
// being sent.
type PreparedMessage interface {
	// Prepare prepares the message before it is sent to a kafka broker using
	// the API version passed as argument.
	Prepare(apiVersion int16)
}
// Splitter is an interface implemented by messages that can be split into
// multiple requests and have their results merged back by a Merger.
type Splitter interface {
	// Split returns, for a given cluster layout, the list of messages
	// constructed from the receiver for each request that should be sent to
	// the cluster. The second return value is a Merger which can be used to
	// merge back the results of each request into a single message (or an
	// error).
	Split(Cluster) ([]Message, Merger, error)
}
// Merger is an interface implemented by messages which can merge multiple
// results into one response.
type Merger interface {
	// Merge takes a list of messages and their associated results, and merges
	// them back into a response (or an error). The results must be either
	// Message or error values, other types should trigger a panic.
	Merge(messages []Message, results []interface{}) (Message, error)
}
  412. // Result converts r to a Message or and error, or panics if r could be be
  413. // converted to these types.
  414. func Result(r interface{}) (Message, error) {
  415. switch v := r.(type) {
  416. case Message:
  417. return v, nil
  418. case error:
  419. return nil, v
  420. default:
  421. panic(fmt.Errorf("BUG: result must be a message or an error but not %T", v))
  422. }
  423. }