123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215 |
- package kafka
- import (
- "encoding/binary"
- "fmt"
- "strconv"
- )
- type ApiVersion struct {
- ApiKey int16
- MinVersion int16
- MaxVersion int16
- }
- func (v ApiVersion) Format(w fmt.State, r rune) {
- switch r {
- case 's':
- fmt.Fprint(w, apiKey(v.ApiKey))
- case 'd':
- switch {
- case w.Flag('-'):
- fmt.Fprint(w, v.MinVersion)
- case w.Flag('+'):
- fmt.Fprint(w, v.MaxVersion)
- default:
- fmt.Fprint(w, v.ApiKey)
- }
- case 'v':
- switch {
- case w.Flag('-'):
- fmt.Fprintf(w, "v%d", v.MinVersion)
- case w.Flag('+'):
- fmt.Fprintf(w, "v%d", v.MaxVersion)
- case w.Flag('#'):
- fmt.Fprintf(w, "kafka.ApiVersion{ApiKey:%d MinVersion:%d MaxVersion:%d}", v.ApiKey, v.MinVersion, v.MaxVersion)
- default:
- fmt.Fprintf(w, "%s[v%d:v%d]", apiKey(v.ApiKey), v.MinVersion, v.MaxVersion)
- }
- }
- }
- type apiKey int16
- const (
- produce apiKey = 0
- fetch apiKey = 1
- listOffsets apiKey = 2
- metadata apiKey = 3
- leaderAndIsr apiKey = 4
- stopReplica apiKey = 5
- updateMetadata apiKey = 6
- controlledShutdown apiKey = 7
- offsetCommit apiKey = 8
- offsetFetch apiKey = 9
- findCoordinator apiKey = 10
- joinGroup apiKey = 11
- heartbeat apiKey = 12
- leaveGroup apiKey = 13
- syncGroup apiKey = 14
- describeGroups apiKey = 15
- listGroups apiKey = 16
- saslHandshake apiKey = 17
- apiVersions apiKey = 18
- createTopics apiKey = 19
- deleteTopics apiKey = 20
- deleteRecords apiKey = 21
- initProducerId apiKey = 22
- offsetForLeaderEpoch apiKey = 23
- addPartitionsToTxn apiKey = 24
- addOffsetsToTxn apiKey = 25
- endTxn apiKey = 26
- writeTxnMarkers apiKey = 27
- txnOffsetCommit apiKey = 28
- describeAcls apiKey = 29
- createAcls apiKey = 30
- deleteAcls apiKey = 31
- describeConfigs apiKey = 32
- alterConfigs apiKey = 33
- alterReplicaLogDirs apiKey = 34
- describeLogDirs apiKey = 35
- saslAuthenticate apiKey = 36
- createPartitions apiKey = 37
- createDelegationToken apiKey = 38
- renewDelegationToken apiKey = 39
- expireDelegationToken apiKey = 40
- describeDelegationToken apiKey = 41
- deleteGroups apiKey = 42
- electLeaders apiKey = 43
- incrementalAlterConfigs apiKey = 44
- alterPartitionReassignments apiKey = 45
- listPartitionReassignments apiKey = 46
- offsetDelete apiKey = 47
- )
- func (k apiKey) String() string {
- if i := int(k); i >= 0 && i < len(apiKeyStrings) {
- return apiKeyStrings[i]
- }
- return strconv.Itoa(int(k))
- }
- type apiVersion int16
- const (
- v0 = 0
- v1 = 1
- v2 = 2
- v3 = 3
- v4 = 4
- v5 = 5
- v6 = 6
- v7 = 7
- v8 = 8
- v9 = 9
- v10 = 10
- )
- var apiKeyStrings = [...]string{
- produce: "Produce",
- fetch: "Fetch",
- listOffsets: "ListOffsets",
- metadata: "Metadata",
- leaderAndIsr: "LeaderAndIsr",
- stopReplica: "StopReplica",
- updateMetadata: "UpdateMetadata",
- controlledShutdown: "ControlledShutdown",
- offsetCommit: "OffsetCommit",
- offsetFetch: "OffsetFetch",
- findCoordinator: "FindCoordinator",
- joinGroup: "JoinGroup",
- heartbeat: "Heartbeat",
- leaveGroup: "LeaveGroup",
- syncGroup: "SyncGroup",
- describeGroups: "DescribeGroups",
- listGroups: "ListGroups",
- saslHandshake: "SaslHandshake",
- apiVersions: "ApiVersions",
- createTopics: "CreateTopics",
- deleteTopics: "DeleteTopics",
- deleteRecords: "DeleteRecords",
- initProducerId: "InitProducerId",
- offsetForLeaderEpoch: "OffsetForLeaderEpoch",
- addPartitionsToTxn: "AddPartitionsToTxn",
- addOffsetsToTxn: "AddOffsetsToTxn",
- endTxn: "EndTxn",
- writeTxnMarkers: "WriteTxnMarkers",
- txnOffsetCommit: "TxnOffsetCommit",
- describeAcls: "DescribeAcls",
- createAcls: "CreateAcls",
- deleteAcls: "DeleteAcls",
- describeConfigs: "DescribeConfigs",
- alterConfigs: "AlterConfigs",
- alterReplicaLogDirs: "AlterReplicaLogDirs",
- describeLogDirs: "DescribeLogDirs",
- saslAuthenticate: "SaslAuthenticate",
- createPartitions: "CreatePartitions",
- createDelegationToken: "CreateDelegationToken",
- renewDelegationToken: "RenewDelegationToken",
- expireDelegationToken: "ExpireDelegationToken",
- describeDelegationToken: "DescribeDelegationToken",
- deleteGroups: "DeleteGroups",
- electLeaders: "ElectLeaders",
- incrementalAlterConfigs: "IncrementalAlfterConfigs",
- alterPartitionReassignments: "AlterPartitionReassignments",
- listPartitionReassignments: "ListPartitionReassignments",
- offsetDelete: "OffsetDelete",
- }
- type requestHeader struct {
- Size int32
- ApiKey int16
- ApiVersion int16
- CorrelationID int32
- ClientID string
- }
- func (h requestHeader) size() int32 {
- return 4 + 2 + 2 + 4 + sizeofString(h.ClientID)
- }
- func (h requestHeader) writeTo(wb *writeBuffer) {
- wb.writeInt32(h.Size)
- wb.writeInt16(h.ApiKey)
- wb.writeInt16(h.ApiVersion)
- wb.writeInt32(h.CorrelationID)
- wb.writeString(h.ClientID)
- }
- type request interface {
- size() int32
- writable
- }
- func makeInt8(b []byte) int8 {
- return int8(b[0])
- }
- func makeInt16(b []byte) int16 {
- return int16(binary.BigEndian.Uint16(b))
- }
- func makeInt32(b []byte) int32 {
- return int32(binary.BigEndian.Uint32(b))
- }
- func makeInt64(b []byte) int64 {
- return int64(binary.BigEndian.Uint64(b))
- }
- func expectZeroSize(sz int, err error) error {
- if err == nil && sz != 0 {
- err = fmt.Errorf("reading a response left %d unread bytes", sz)
- }
- return err
- }
|