conn.go

package kafka

import (
    "bufio"
    "errors"
    "fmt"
    "io"
    "math"
    "net"
    "os"
    "path/filepath"
    "runtime"
    "sync"
    "sync/atomic"
    "time"
)

var (
    errInvalidWriteTopic     = errors.New("writes must NOT set Topic on kafka.Message")
    errInvalidWritePartition = errors.New("writes must NOT set Partition on kafka.Message")
)

// Conn represents a connection to a kafka broker.
//
// Instances of Conn are safe to use concurrently from multiple goroutines.
type Conn struct {
    // base network connection
    conn net.Conn

    // number of inflight requests on the connection.
    inflight int32

    // offset management (synchronized on the mutex field)
    mutex  sync.Mutex
    offset int64

    // read buffer (synchronized on rlock)
    rlock sync.Mutex
    rbuf  bufio.Reader

    // write buffer (synchronized on wlock)
    wlock sync.Mutex
    wbuf  bufio.Writer
    wb    writeBuffer

    // deadline management
    wdeadline connDeadline
    rdeadline connDeadline

    // immutable values of the connection object
    clientID      string
    topic         string
    partition     int32
    fetchMaxBytes int32
    fetchMinSize  int32

    // correlation ID generator (synchronized on wlock)
    correlationID int32

    // number of replica acks required when publishing to a partition
    requiredAcks int32

    // lazily loaded API versions used by this connection
    apiVersions atomic.Value // apiVersionMap

    transactionalID *string
}

type apiVersionMap map[apiKey]ApiVersion

// negotiate returns the highest version in sortedSupportedVersions that the
// broker also supports for the given API key, or -1 if there is no overlap.
func (v apiVersionMap) negotiate(key apiKey, sortedSupportedVersions ...apiVersion) apiVersion {
    x := v[key]

    for i := len(sortedSupportedVersions) - 1; i >= 0; i-- {
        s := sortedSupportedVersions[i]

        if apiVersion(x.MaxVersion) >= s {
            return s
        }
    }

    return -1
}
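
// exampleNegotiate is a usage sketch, not part of the original file: it shows
// how negotiate picks the highest client-supported version that the broker
// also advertises. The broker here is assumed to support fetch versions 0-5,
// so negotiating among v2, v5 and v10 yields v5.
func exampleNegotiate() apiVersion {
    versions := apiVersionMap{
        fetch: ApiVersion{ApiKey: int16(fetch), MinVersion: int16(v0), MaxVersion: int16(v5)},
    }
    return versions.negotiate(fetch, v2, v5, v10) // == v5
}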
// ConnConfig is a configuration object used to create new instances of Conn.
type ConnConfig struct {
    ClientID  string
    Topic     string
    Partition int

    // The transactional id to use for transactional delivery. Idempotent
    // delivery should be enabled if a transactional id is configured.
    // For more details look at the transactional.id description here:
    // http://kafka.apache.org/documentation.html#producerconfigs
    // An empty string means that this connection can't be transactional.
    TransactionalID string
}

// ReadBatchConfig is a configuration object used for reading batches of messages.
type ReadBatchConfig struct {
    MinBytes int
    MaxBytes int

    // IsolationLevel controls the visibility of transactional records.
    // ReadUncommitted makes all records visible. With ReadCommitted only
    // non-transactional and committed records are visible.
    IsolationLevel IsolationLevel

    // MaxWait is the amount of time the broker waits trying to hit the
    // min/max byte targets. This setting is independent of any network-level
    // timeouts or deadlines.
    //
    // For backward compatibility, when this field is left zero, kafka-go will
    // infer the max wait from the connection's read deadline.
    MaxWait time.Duration
}

type IsolationLevel int8

const (
    ReadUncommitted IsolationLevel = 0
    ReadCommitted   IsolationLevel = 1
)

var (
    // DefaultClientID is the default value used as ClientID of kafka
    // connections.
    DefaultClientID string
)

func init() {
    progname := filepath.Base(os.Args[0])
    hostname, _ := os.Hostname()
    DefaultClientID = fmt.Sprintf("%s@%s (github.com/segmentio/kafka-go)", progname, hostname)
}

// NewConn returns a new kafka connection for the given topic and partition.
func NewConn(conn net.Conn, topic string, partition int) *Conn {
    return NewConnWith(conn, ConnConfig{
        Topic:     topic,
        Partition: partition,
    })
}

func emptyToNullable(transactionalID string) (result *string) {
    if transactionalID != "" {
        result = &transactionalID
    }
    return result
}

// NewConnWith returns a new kafka connection configured with config.
// The offset is initialized to FirstOffset.
func NewConnWith(conn net.Conn, config ConnConfig) *Conn {
    if len(config.ClientID) == 0 {
        config.ClientID = DefaultClientID
    }

    if config.Partition < 0 || config.Partition > math.MaxInt32 {
        panic(fmt.Sprintf("invalid partition number: %d", config.Partition))
    }

    c := &Conn{
        conn:            conn,
        rbuf:            *bufio.NewReader(conn),
        wbuf:            *bufio.NewWriter(conn),
        clientID:        config.ClientID,
        topic:           config.Topic,
        partition:       int32(config.Partition),
        offset:          FirstOffset,
        requiredAcks:    -1,
        transactionalID: emptyToNullable(config.TransactionalID),
    }

    c.wb.w = &c.wbuf

    // The fetch request needs to ask for a MaxBytes value that is at least
    // enough to load the control data of the response. To avoid having to
    // recompute it on every read, it is cached here in the Conn value.
    c.fetchMinSize = (fetchResponseV2{
        Topics: []fetchResponseTopicV2{{
            TopicName: config.Topic,
            Partitions: []fetchResponsePartitionV2{{
                Partition:  int32(config.Partition),
                MessageSet: messageSet{{}},
            }},
        }},
    }).size()
    c.fetchMaxBytes = math.MaxInt32 - c.fetchMinSize
    return c
}
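
// exampleNewConn is a usage sketch, not part of the original file: it dials a
// broker over TCP and wraps the raw connection for a single topic and
// partition. The address "localhost:9092" and topic "my-topic" are
// illustrative assumptions.
func exampleNewConn() (*Conn, error) {
    nc, err := net.Dial("tcp", "localhost:9092")
    if err != nil {
        return nil, err
    }
    return NewConn(nc, "my-topic", 0), nil
}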
func (c *Conn) negotiateVersion(key apiKey, sortedSupportedVersions ...apiVersion) (apiVersion, error) {
    v, err := c.loadVersions()
    if err != nil {
        return -1, err
    }
    a := v.negotiate(key, sortedSupportedVersions...)
    if a < 0 {
        return -1, fmt.Errorf("no matching versions were found between the client and the broker for API key %d", key)
    }
    return a, nil
}

func (c *Conn) loadVersions() (apiVersionMap, error) {
    v, _ := c.apiVersions.Load().(apiVersionMap)
    if v != nil {
        return v, nil
    }

    brokerVersions, err := c.ApiVersions()
    if err != nil {
        return nil, err
    }

    v = make(apiVersionMap, len(brokerVersions))

    for _, a := range brokerVersions {
        v[apiKey(a.ApiKey)] = a
    }

    c.apiVersions.Store(v)
    return v, nil
}

// Controller requests kafka for the current controller and returns its URL.
func (c *Conn) Controller() (broker Broker, err error) {
    err = c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{}))
        },
        func(deadline time.Time, size int) error {
            var res metadataResponseV1

            if err := c.readResponse(size, &res); err != nil {
                return err
            }
            for _, brokerMeta := range res.Brokers {
                if brokerMeta.NodeID == res.ControllerID {
                    broker = Broker{
                        ID:   int(brokerMeta.NodeID),
                        Port: int(brokerMeta.Port),
                        Host: brokerMeta.Host,
                        Rack: brokerMeta.Rack,
                    }
                    break
                }
            }
            return nil
        },
    )
    return broker, err
}

// Brokers retrieves the broker list from the Kafka metadata.
func (c *Conn) Brokers() ([]Broker, error) {
    var brokers []Broker
    err := c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(metadata, v1, id, topicMetadataRequestV1([]string{}))
        },
        func(deadline time.Time, size int) error {
            var res metadataResponseV1

            if err := c.readResponse(size, &res); err != nil {
                return err
            }

            brokers = make([]Broker, len(res.Brokers))
            for i, brokerMeta := range res.Brokers {
                brokers[i] = Broker{
                    ID:   int(brokerMeta.NodeID),
                    Port: int(brokerMeta.Port),
                    Host: brokerMeta.Host,
                    Rack: brokerMeta.Rack,
                }
            }
            return nil
        },
    )
    return brokers, err
}
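
// exampleListBrokers is a usage sketch, not part of the original file: it
// prints every broker known to the cluster and flags the current controller.
func exampleListBrokers(c *Conn) error {
    brokers, err := c.Brokers()
    if err != nil {
        return err
    }
    controller, err := c.Controller()
    if err != nil {
        return err
    }
    for _, b := range brokers {
        fmt.Printf("broker %d at %s:%d (controller: %v)\n", b.ID, b.Host, b.Port, b.ID == controller.ID)
    }
    return nil
}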
// DeleteTopics deletes the specified topics.
func (c *Conn) DeleteTopics(topics ...string) error {
    _, err := c.deleteTopics(deleteTopicsRequestV0{
        Topics: topics,
    })
    return err
}

// describeGroups retrieves the specified groups.
//
// See http://kafka.apache.org/protocol.html#The_Messages_DescribeGroups
func (c *Conn) describeGroups(request describeGroupsRequestV0) (describeGroupsResponseV0, error) {
    var response describeGroupsResponseV0

    err := c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(describeGroups, v0, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return describeGroupsResponseV0{}, err
    }
    for _, group := range response.Groups {
        if group.ErrorCode != 0 {
            return describeGroupsResponseV0{}, Error(group.ErrorCode)
        }
    }

    return response, nil
}

// findCoordinator finds the coordinator for the specified group or transaction.
//
// See http://kafka.apache.org/protocol.html#The_Messages_FindCoordinator
func (c *Conn) findCoordinator(request findCoordinatorRequestV0) (findCoordinatorResponseV0, error) {
    var response findCoordinatorResponseV0

    err := c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(findCoordinator, v0, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return findCoordinatorResponseV0{}, err
    }
    if response.ErrorCode != 0 {
        return findCoordinatorResponseV0{}, Error(response.ErrorCode)
    }

    return response, nil
}

// heartbeat sends a heartbeat message required by consumer groups.
//
// See http://kafka.apache.org/protocol.html#The_Messages_Heartbeat
func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error) {
    var response heartbeatResponseV0

    err := c.writeOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(heartbeat, v0, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return heartbeatResponseV0{}, err
    }
    if response.ErrorCode != 0 {
        return heartbeatResponseV0{}, Error(response.ErrorCode)
    }

    return response, nil
}

// joinGroup attempts to join a consumer group.
//
// See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup
func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) {
    var response joinGroupResponseV1

    err := c.writeOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(joinGroup, v1, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return joinGroupResponseV1{}, err
    }
    if response.ErrorCode != 0 {
        return joinGroupResponseV1{}, Error(response.ErrorCode)
    }

    return response, nil
}

// leaveGroup removes the consumer from the consumer group.
//
// See http://kafka.apache.org/protocol.html#The_Messages_LeaveGroup
func (c *Conn) leaveGroup(request leaveGroupRequestV0) (leaveGroupResponseV0, error) {
    var response leaveGroupResponseV0

    err := c.writeOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(leaveGroup, v0, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return leaveGroupResponseV0{}, err
    }
    if response.ErrorCode != 0 {
        return leaveGroupResponseV0{}, Error(response.ErrorCode)
    }

    return response, nil
}

// listGroups lists all the consumer groups.
//
// See http://kafka.apache.org/protocol.html#The_Messages_ListGroups
func (c *Conn) listGroups(request listGroupsRequestV1) (listGroupsResponseV1, error) {
    var response listGroupsResponseV1

    err := c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(listGroups, v1, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return listGroupsResponseV1{}, err
    }
    if response.ErrorCode != 0 {
        return listGroupsResponseV1{}, Error(response.ErrorCode)
    }

    return response, nil
}

// offsetCommit commits the specified topic partition offsets.
//
// See http://kafka.apache.org/protocol.html#The_Messages_OffsetCommit
func (c *Conn) offsetCommit(request offsetCommitRequestV2) (offsetCommitResponseV2, error) {
    var response offsetCommitResponseV2

    err := c.writeOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(offsetCommit, v2, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return offsetCommitResponseV2{}, err
    }
    for _, r := range response.Responses {
        for _, pr := range r.PartitionResponses {
            if pr.ErrorCode != 0 {
                return offsetCommitResponseV2{}, Error(pr.ErrorCode)
            }
        }
    }

    return response, nil
}

// offsetFetch fetches the offsets for the specified topic partitions.
// -1 indicates that there is no offset saved for the partition.
//
// See http://kafka.apache.org/protocol.html#The_Messages_OffsetFetch
func (c *Conn) offsetFetch(request offsetFetchRequestV1) (offsetFetchResponseV1, error) {
    var response offsetFetchResponseV1

    err := c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(offsetFetch, v1, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return offsetFetchResponseV1{}, err
    }
    for _, r := range response.Responses {
        for _, pr := range r.PartitionResponses {
            if pr.ErrorCode != 0 {
                return offsetFetchResponseV1{}, Error(pr.ErrorCode)
            }
        }
    }

    return response, nil
}

// syncGroup completes the handshake to join a consumer group.
//
// See http://kafka.apache.org/protocol.html#The_Messages_SyncGroup
func (c *Conn) syncGroup(request syncGroupRequestV0) (syncGroupResponseV0, error) {
    var response syncGroupResponseV0

    err := c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(syncGroup, v0, id, request)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (remain int, err error) {
                return (&response).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err != nil {
        return syncGroupResponseV0{}, err
    }
    if response.ErrorCode != 0 {
        return syncGroupResponseV0{}, Error(response.ErrorCode)
    }

    return response, nil
}
// Close closes the kafka connection.
func (c *Conn) Close() error {
    return c.conn.Close()
}

// LocalAddr returns the local network address.
func (c *Conn) LocalAddr() net.Addr {
    return c.conn.LocalAddr()
}

// RemoteAddr returns the remote network address.
func (c *Conn) RemoteAddr() net.Addr {
    return c.conn.RemoteAddr()
}

// SetDeadline sets the read and write deadlines associated with the connection.
// It is equivalent to calling both SetReadDeadline and SetWriteDeadline.
//
// A deadline is an absolute time after which I/O operations fail with a timeout
// (see type Error) instead of blocking. The deadline applies to all future and
// pending I/O, not just the immediately following call to Read or Write. After
// a deadline has been exceeded, the connection may be closed if it was found to
// be in an unrecoverable state.
//
// A zero value for t means I/O operations will not time out.
func (c *Conn) SetDeadline(t time.Time) error {
    c.rdeadline.setDeadline(t)
    c.wdeadline.setDeadline(t)
    return nil
}

// SetReadDeadline sets the deadline for future Read calls and any
// currently-blocked Read call.
// A zero value for t means Read will not time out.
func (c *Conn) SetReadDeadline(t time.Time) error {
    c.rdeadline.setDeadline(t)
    return nil
}

// SetWriteDeadline sets the deadline for future Write calls and any
// currently-blocked Write call.
// Even if a write times out, it may return n > 0, indicating that some of the
// data was successfully written.
// A zero value for t means Write will not time out.
func (c *Conn) SetWriteDeadline(t time.Time) error {
    c.wdeadline.setDeadline(t)
    return nil
}

// Offset returns the current offset of the connection as a pair of integers,
// where the first one is an offset value and the second one indicates how
// to interpret it.
//
// See Seek for more details about the offset and whence values.
func (c *Conn) Offset() (offset int64, whence int) {
    c.mutex.Lock()
    offset = c.offset
    c.mutex.Unlock()

    switch offset {
    case FirstOffset:
        offset = 0
        whence = SeekStart
    case LastOffset:
        offset = 0
        whence = SeekEnd
    default:
        whence = SeekAbsolute
    }
    return
}

const (
    SeekStart    = 0 // Seek relative to the first offset available in the partition.
    SeekAbsolute = 1 // Seek to an absolute offset.
    SeekEnd      = 2 // Seek relative to the last offset available in the partition.
    SeekCurrent  = 3 // Seek relative to the current offset.

    // This flag may be combined with any of the SeekAbsolute and SeekCurrent
    // constants to skip the bound check that the connection would do otherwise.
    // Programs can use this flag to avoid making a metadata request to the kafka
    // broker to read the current first and last offsets of the partition.
    SeekDontCheck = 1 << 30
)
// Seek sets the offset for the next read or write operation according to whence, which
// should be one of SeekStart, SeekAbsolute, SeekEnd, or SeekCurrent.
// When seeking relative to the end, the offset is subtracted from the current offset.
// Note that for historical reasons, these do not align with the usual whence constants
// as in lseek(2) or os.Seek.
// The method returns the new absolute offset of the connection.
func (c *Conn) Seek(offset int64, whence int) (int64, error) {
    seekDontCheck := (whence & SeekDontCheck) != 0
    whence &= ^SeekDontCheck

    switch whence {
    case SeekStart, SeekAbsolute, SeekEnd, SeekCurrent:
    default:
        return 0, fmt.Errorf("whence must be one of 0, 1, 2, or 3. (whence = %d)", whence)
    }

    if seekDontCheck {
        if whence == SeekAbsolute {
            c.mutex.Lock()
            c.offset = offset
            c.mutex.Unlock()
            return offset, nil
        }

        if whence == SeekCurrent {
            c.mutex.Lock()
            c.offset += offset
            offset = c.offset
            c.mutex.Unlock()
            return offset, nil
        }
    }

    if whence == SeekAbsolute {
        c.mutex.Lock()
        unchanged := offset == c.offset
        c.mutex.Unlock()
        if unchanged {
            return offset, nil
        }
    }

    if whence == SeekCurrent {
        c.mutex.Lock()
        offset = c.offset + offset
        c.mutex.Unlock()
    }

    first, last, err := c.ReadOffsets()
    if err != nil {
        return 0, err
    }

    switch whence {
    case SeekStart:
        offset = first + offset
    case SeekEnd:
        offset = last - offset
    }

    if offset < first || offset > last {
        return 0, OffsetOutOfRange
    }

    c.mutex.Lock()
    c.offset = offset
    c.mutex.Unlock()
    return offset, nil
}
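
// exampleSeekToLastMessage is a usage sketch, not part of the original file:
// since SeekEnd subtracts the offset from the end of the partition, seeking
// to 1 positions the connection on the final message, which is then read
// allowing up to 10 MB for the payload.
func exampleSeekToLastMessage(c *Conn) (Message, error) {
    if _, err := c.Seek(1, SeekEnd); err != nil {
        return Message{}, err
    }
    return c.ReadMessage(10e6)
}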
// Read reads the message at the current offset from the connection, advancing
// the offset on success so the next call to a read method will produce the next
// message.
// The method returns the number of bytes read, or an error if something went
// wrong.
//
// While it is safe to call Read concurrently from multiple goroutines, it may
// be hard for the program to predict the results: because the connection offset
// is read and written by multiple goroutines, they may read duplicates, or some
// messages may be seen by only a subset of the goroutines.
//
// The method fails with io.ErrShortBuffer if the buffer passed as argument is
// too small to hold the message value.
//
// This method is provided to satisfy the net.Conn interface but is much less
// efficient than using the more general purpose ReadBatch method.
func (c *Conn) Read(b []byte) (int, error) {
    batch := c.ReadBatch(1, len(b))
    n, err := batch.Read(b)
    return n, coalesceErrors(silentEOF(err), batch.Close())
}

// ReadMessage reads the message at the current offset from the connection,
// advancing the offset on success so the next call to a read method will
// produce the next message.
//
// Because this method allocates memory buffers for the message key and value
// it is less memory-efficient than Read, but has the advantage of never
// failing with io.ErrShortBuffer.
//
// While it is safe to call ReadMessage concurrently from multiple goroutines,
// the same caveats as for Read apply: the shared connection offset makes the
// results hard to predict.
//
// This method is provided for convenience purposes but is much less efficient
// than using the more general purpose ReadBatch method.
func (c *Conn) ReadMessage(maxBytes int) (Message, error) {
    batch := c.ReadBatch(1, maxBytes)
    msg, err := batch.ReadMessage()
    return msg, coalesceErrors(silentEOF(err), batch.Close())
}
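
// exampleReadOne is a usage sketch, not part of the original file: it applies
// a read deadline so a quiet partition can't block forever, then reads the
// next message, allowing up to 1 MB for the payload.
func exampleReadOne(c *Conn) (Message, error) {
    if err := c.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
        return Message{}, err
    }
    return c.ReadMessage(1e6)
}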
// ReadBatch reads a batch of messages from the kafka server. The method always
// returns a non-nil Batch value. If an error occurred, either sending the fetch
// request or reading the response, the error will be made available by the
// returned value of the batch's Close method.
//
// While it is safe to call ReadBatch concurrently from multiple goroutines, it
// may be hard for the program to predict the results: because the connection
// offset is read and written by multiple goroutines, they may read duplicates,
// or some messages may be seen by only a subset of the goroutines.
//
// A program doesn't specify the number of messages it wants from a batch, but
// gives the minimum and maximum number of bytes that it wants to receive from
// the kafka server.
func (c *Conn) ReadBatch(minBytes, maxBytes int) *Batch {
    return c.ReadBatchWith(ReadBatchConfig{
        MinBytes: minBytes,
        MaxBytes: maxBytes,
    })
}
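
// exampleDrainBatch is a usage sketch, not part of the original file: it asks
// the broker for between 10 kB and 1 MB of messages and iterates until the
// batch is exhausted. The first error from ReadMessage terminates the loop,
// and the batch's Close error reports what went wrong, if anything.
func exampleDrainBatch(c *Conn) ([]Message, error) {
    batch := c.ReadBatch(10e3, 1e6)
    var msgs []Message
    for {
        msg, err := batch.ReadMessage()
        if err != nil {
            break
        }
        msgs = append(msgs, msg)
    }
    return msgs, batch.Close()
}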
// ReadBatchWith is similar to ReadBatch in every way, but it gives the caller
// control over the full ReadBatchConfig; ReadBatch uses the zero values of
// ReadBatchConfig for everything except MinBytes and MaxBytes.
func (c *Conn) ReadBatchWith(cfg ReadBatchConfig) *Batch {
    var adjustedDeadline time.Time
    var maxFetch = int(c.fetchMaxBytes)

    if cfg.MinBytes < 0 || cfg.MinBytes > maxFetch {
        return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes of %d out of [1,%d] bounds", cfg.MinBytes, maxFetch)}
    }
    if cfg.MaxBytes < 0 || cfg.MaxBytes > maxFetch {
        return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: maxBytes of %d out of [1,%d] bounds", cfg.MaxBytes, maxFetch)}
    }
    if cfg.MinBytes > cfg.MaxBytes {
        return &Batch{err: fmt.Errorf("kafka.(*Conn).ReadBatch: minBytes (%d) > maxBytes (%d)", cfg.MinBytes, cfg.MaxBytes)}
    }

    offset, whence := c.Offset()

    offset, err := c.Seek(offset, whence|SeekDontCheck)
    if err != nil {
        return &Batch{err: dontExpectEOF(err)}
    }

    fetchVersion, err := c.negotiateVersion(fetch, v2, v5, v10)
    if err != nil {
        return &Batch{err: dontExpectEOF(err)}
    }

    id, err := c.doRequest(&c.rdeadline, func(deadline time.Time, id int32) error {
        now := time.Now()
        var timeout time.Duration
        if cfg.MaxWait > 0 {
            // explicitly-configured case: no changes are made to the deadline,
            // and the timeout is sent exactly as specified.
            timeout = cfg.MaxWait
        } else {
            // default case: use the original logic to adjust the conn's
            // deadline.
            deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
            timeout = deadlineToTimeout(deadline, now)
        }

        // save this variable outside of the closure for later use in detecting
        // truncated messages.
        adjustedDeadline = deadline
        switch fetchVersion {
        case v10:
            return c.wb.writeFetchRequestV10(
                id,
                c.clientID,
                c.topic,
                c.partition,
                offset,
                cfg.MinBytes,
                cfg.MaxBytes+int(c.fetchMinSize),
                timeout,
                int8(cfg.IsolationLevel),
            )
        case v5:
            return c.wb.writeFetchRequestV5(
                id,
                c.clientID,
                c.topic,
                c.partition,
                offset,
                cfg.MinBytes,
                cfg.MaxBytes+int(c.fetchMinSize),
                timeout,
                int8(cfg.IsolationLevel),
            )
        default:
            return c.wb.writeFetchRequestV2(
                id,
                c.clientID,
                c.topic,
                c.partition,
                offset,
                cfg.MinBytes,
                cfg.MaxBytes+int(c.fetchMinSize),
                timeout,
            )
        }
    })
    if err != nil {
        return &Batch{err: dontExpectEOF(err)}
    }

    _, size, lock, err := c.waitResponse(&c.rdeadline, id)
    if err != nil {
        return &Batch{err: dontExpectEOF(err)}
    }

    var throttle int32
    var highWaterMark int64
    var remain int

    switch fetchVersion {
    case v10:
        throttle, highWaterMark, remain, err = readFetchResponseHeaderV10(&c.rbuf, size)
    case v5:
        throttle, highWaterMark, remain, err = readFetchResponseHeaderV5(&c.rbuf, size)
    default:
        throttle, highWaterMark, remain, err = readFetchResponseHeaderV2(&c.rbuf, size)
    }
    if err == errShortRead {
        err = checkTimeoutErr(adjustedDeadline)
    }

    var msgs *messageSetReader
    if err == nil {
        if highWaterMark == offset {
            msgs = &messageSetReader{empty: true}
        } else {
            msgs, err = newMessageSetReader(&c.rbuf, remain)
        }
    }
    if err == errShortRead {
        err = checkTimeoutErr(adjustedDeadline)
    }

    return &Batch{
        conn:          c,
        msgs:          msgs,
        deadline:      adjustedDeadline,
        throttle:      makeDuration(throttle),
        lock:          lock,
        topic:         c.topic,          // topic is copied to Batch to prevent race with Batch.close
        partition:     int(c.partition), // partition is copied to Batch to prevent race with Batch.close
        offset:        offset,
        highWaterMark: highWaterMark,
        // there shouldn't be a short read on initially setting up the batch.
        // as such, any io.EOF is re-mapped to an io.ErrUnexpectedEOF so that we
        // don't accidentally signal that we successfully reached the end of the
        // batch.
        err: dontExpectEOF(err),
    }
}
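
// exampleReadCommitted is a usage sketch, not part of the original file: it
// reads a batch of committed (non-aborted) transactional records, letting the
// broker wait up to 500ms to accumulate at least 10 kB.
func exampleReadCommitted(c *Conn) *Batch {
    return c.ReadBatchWith(ReadBatchConfig{
        MinBytes:       10e3,
        MaxBytes:       1e6,
        IsolationLevel: ReadCommitted,
        MaxWait:        500 * time.Millisecond,
    })
}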
// ReadOffset returns the offset of the first message with a timestamp equal to
// or greater than t.
func (c *Conn) ReadOffset(t time.Time) (int64, error) {
    return c.readOffset(timestamp(t))
}

// ReadFirstOffset returns the first offset available on the connection.
func (c *Conn) ReadFirstOffset() (int64, error) {
    return c.readOffset(FirstOffset)
}

// ReadLastOffset returns the last offset available on the connection.
func (c *Conn) ReadLastOffset() (int64, error) {
    return c.readOffset(LastOffset)
}

// ReadOffsets returns the absolute first and last offsets of the topic used by
// the connection.
func (c *Conn) ReadOffsets() (first, last int64, err error) {
    // We have to submit two different requests to fetch the first and last
    // offsets because kafka refuses requests that ask for multiple offsets
    // on the same topic and partition.
    if first, err = c.ReadFirstOffset(); err != nil {
        return
    }
    if last, err = c.ReadLastOffset(); err != nil {
        first = 0 // don't leak the value on error
        return
    }
    return
}
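
// examplePartitionLag is a usage sketch, not part of the original file: it
// computes how many messages remain between the connection's position and the
// end of the partition. It assumes the connection has already been positioned
// at an absolute offset (for example after a read), since Offset reports 0
// for the FirstOffset/LastOffset sentinel values.
func examplePartitionLag(c *Conn) (int64, error) {
    last, err := c.ReadLastOffset()
    if err != nil {
        return 0, err
    }
    current, _ := c.Offset()
    return last - current, nil
}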
func (c *Conn) readOffset(t int64) (offset int64, err error) {
    err = c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.wb.writeListOffsetRequestV1(id, c.clientID, c.topic, c.partition, t)
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
                // We skip the topic name because we've made a request for
                // a single topic.
                size, err := discardString(r, size)
                if err != nil {
                    return size, err
                }

                // Reading the array of partitions, there will be only one
                // partition which gives the offset we're looking for.
                return readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
                    var p partitionOffsetV1
                    size, err := p.readFrom(r, size)
                    if err != nil {
                        return size, err
                    }
                    if p.ErrorCode != 0 {
                        return size, Error(p.ErrorCode)
                    }
                    offset = p.Offset
                    return size, nil
                })
            }))
        },
    )
    return
}

// ReadPartitions returns the list of available partitions for the given list of
// topics.
//
// If the method is called with no topic, it uses the topic configured on the
// connection. If there is none, the method fetches all partitions of the kafka
// cluster.
func (c *Conn) ReadPartitions(topics ...string) (partitions []Partition, err error) {
    if len(topics) == 0 {
        if len(c.topic) != 0 {
            defaultTopics := [...]string{c.topic}
            topics = defaultTopics[:]
        } else {
            // topics needs to be explicitly nil-ed out or the broker will
            // interpret it as a request for 0 partitions instead of all.
            topics = nil
        }
    }

    err = c.readOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(metadata, v1, id, topicMetadataRequestV1(topics))
        },
        func(deadline time.Time, size int) error {
            var res metadataResponseV1

            if err := c.readResponse(size, &res); err != nil {
                return err
            }

            brokers := make(map[int32]Broker, len(res.Brokers))
            for _, b := range res.Brokers {
                brokers[b.NodeID] = Broker{
                    Host: b.Host,
                    Port: int(b.Port),
                    ID:   int(b.NodeID),
                    Rack: b.Rack,
                }
            }

            makeBrokers := func(ids ...int32) []Broker {
                b := make([]Broker, len(ids))
                for i, id := range ids {
                    b[i] = brokers[id]
                }
                return b
            }

            for _, t := range res.Topics {
                if t.TopicErrorCode != 0 && (c.topic == "" || t.TopicName == c.topic) {
                    // We only report errors if they happened for the topic of
                    // the connection, otherwise the topic will simply have no
                    // partitions in the result set.
                    return Error(t.TopicErrorCode)
                }
                for _, p := range t.Partitions {
                    partitions = append(partitions, Partition{
                        Topic:    t.TopicName,
                        Leader:   brokers[p.Leader],
                        Replicas: makeBrokers(p.Replicas...),
                        Isr:      makeBrokers(p.Isr...),
                        ID:       int(p.PartitionID),
                    })
                }
            }
            return nil
        },
    )
    return
}
// Write writes a message to the kafka broker that this connection was
// established to. The method returns the number of bytes written, or an error
// if something went wrong.
//
// The operation either succeeds or fails, it never partially writes the message.
//
// This method is exposed to satisfy the net.Conn interface but is less efficient
// than the more general purpose WriteMessages method.
func (c *Conn) Write(b []byte) (int, error) {
    return c.WriteCompressedMessages(nil, Message{Value: b})
}

// WriteMessages writes a batch of messages to the connection's topic and
// partition, returning the number of bytes written. The write is an atomic
// operation, it either fully succeeds or fails.
func (c *Conn) WriteMessages(msgs ...Message) (int, error) {
    return c.WriteCompressedMessages(nil, msgs...)
}
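
// exampleProduce is a usage sketch, not part of the original file: it applies
// a write deadline and publishes two messages atomically to the connection's
// topic and partition, returning the number of bytes written.
func exampleProduce(c *Conn) (int, error) {
    if err := c.SetWriteDeadline(time.Now().Add(10 * time.Second)); err != nil {
        return 0, err
    }
    return c.WriteMessages(
        Message{Key: []byte("k1"), Value: []byte("one!")},
        Message{Key: []byte("k2"), Value: []byte("two!")},
    )
}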
// WriteCompressedMessages writes a batch of messages to the connection's topic
// and partition, returning the number of bytes written. The write is an atomic
// operation, it either fully succeeds or fails.
//
// If the compression codec is not nil, the messages will be compressed.
func (c *Conn) WriteCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, err error) {
    nbytes, _, _, _, err = c.writeCompressedMessages(codec, msgs...)
    return
}

// WriteCompressedMessagesAt writes a batch of messages to the connection's topic
// and partition, returning the number of bytes written, the partition and offset
// numbers, and the timestamp assigned by the kafka broker to the message set.
// The write is an atomic operation, it either fully succeeds or fails.
//
// If the compression codec is not nil, the messages will be compressed.
func (c *Conn) WriteCompressedMessagesAt(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) {
    return c.writeCompressedMessages(codec, msgs...)
}

func (c *Conn) writeCompressedMessages(codec CompressionCodec, msgs ...Message) (nbytes int, partition int32, offset int64, appendTime time.Time, err error) {
    if len(msgs) == 0 {
        return
    }

    writeTime := time.Now()
    for i, msg := range msgs {
        // users may believe they can set the Topic and/or Partition
        // on the kafka message.
        if msg.Topic != "" && msg.Topic != c.topic {
            err = errInvalidWriteTopic
            return
        }
        if msg.Partition != 0 {
            err = errInvalidWritePartition
            return
        }

        if msg.Time.IsZero() {
            msgs[i].Time = writeTime
        }

        nbytes += len(msg.Key) + len(msg.Value)
    }

    var produceVersion apiVersion
    if produceVersion, err = c.negotiateVersion(produce, v2, v3, v7); err != nil {
        return
    }

    err = c.writeOperation(
        func(deadline time.Time, id int32) error {
            now := time.Now()
            deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
            switch produceVersion {
            case v7:
                recordBatch, err := newRecordBatch(
                    codec,
                    msgs...,
                )
                if err != nil {
                    return err
                }
                return c.wb.writeProduceRequestV7(
                    id,
                    c.clientID,
                    c.topic,
                    c.partition,
                    deadlineToTimeout(deadline, now),
                    int16(atomic.LoadInt32(&c.requiredAcks)),
                    c.transactionalID,
                    recordBatch,
                )
            case v3:
                recordBatch, err := newRecordBatch(
                    codec,
                    msgs...,
                )
                if err != nil {
                    return err
                }
                return c.wb.writeProduceRequestV3(
                    id,
                    c.clientID,
                    c.topic,
                    c.partition,
                    deadlineToTimeout(deadline, now),
                    int16(atomic.LoadInt32(&c.requiredAcks)),
                    c.transactionalID,
                    recordBatch,
                )
            default:
                return c.wb.writeProduceRequestV2(
                    codec,
                    id,
                    c.clientID,
                    c.topic,
                    c.partition,
                    deadlineToTimeout(deadline, now),
                    int16(atomic.LoadInt32(&c.requiredAcks)),
                    msgs...,
                )
            }
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(readArrayWith(&c.rbuf, size, func(r *bufio.Reader, size int) (int, error) {
                // Skip the topic, we've produced the message to only one topic,
                // no need to waste resources loading it in memory.
                size, err := discardString(r, size)
                if err != nil {
                    return size, err
                }

                // Read the list of partitions, there should be only one since
                // we've produced a message to a single partition.
                size, err = readArrayWith(r, size, func(r *bufio.Reader, size int) (int, error) {
                    switch produceVersion {
                    case v7:
                        var p produceResponsePartitionV7
                        size, err := p.readFrom(r, size)
                        if err == nil && p.ErrorCode != 0 {
                            err = Error(p.ErrorCode)
                        }
                        if err == nil {
                            partition = p.Partition
                            offset = p.Offset
                            appendTime = time.Unix(0, p.Timestamp*int64(time.Millisecond))
                        }
                        return size, err
                    default:
                        var p produceResponsePartitionV2
                        size, err := p.readFrom(r, size)
                        if err == nil && p.ErrorCode != 0 {
                            err = Error(p.ErrorCode)
                        }
                        if err == nil {
                            partition = p.Partition
                            offset = p.Offset
                            appendTime = time.Unix(0, p.Timestamp*int64(time.Millisecond))
                        }
                        return size, err
                    }
                })
                if err != nil {
                    return size, err
                }

                // The response is trailed by the throttle time, also skipping
                // since it's not interesting here.
                return discardInt32(r, size)
            }))
        },
    )

    if err != nil {
        nbytes = 0
    }

    return
}

// SetRequiredAcks sets the number of acknowledgements from replicas that the
// connection requests when producing messages.
func (c *Conn) SetRequiredAcks(n int) error {
    switch n {
    case -1, 1:
        atomic.StoreInt32(&c.requiredAcks, int32(n))
        return nil
    default:
        return InvalidRequiredAcks
    }
}
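
// exampleLeaderAcks is a usage sketch, not part of the original file: it asks
// for acknowledgement from the partition leader only (acks = 1) instead of the
// default of all in-sync replicas (acks = -1), trading durability for latency,
// before writing a batch of messages.
func exampleLeaderAcks(c *Conn, msgs ...Message) (int, error) {
    if err := c.SetRequiredAcks(1); err != nil {
        return 0, err
    }
    return c.WriteMessages(msgs...)
}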
func (c *Conn) writeRequestHeader(apiKey apiKey, apiVersion apiVersion, correlationID int32, size int32) {
    hdr := c.requestHeader(apiKey, apiVersion, correlationID)
    hdr.Size = (hdr.size() + size) - 4
    hdr.writeTo(&c.wb)
}

func (c *Conn) writeRequest(apiKey apiKey, apiVersion apiVersion, correlationID int32, req request) error {
    hdr := c.requestHeader(apiKey, apiVersion, correlationID)
    hdr.Size = (hdr.size() + req.size()) - 4
    hdr.writeTo(&c.wb)
    req.writeTo(&c.wb)
    return c.wbuf.Flush()
}

func (c *Conn) readResponse(size int, res interface{}) error {
    size, err := read(&c.rbuf, size, res)
    switch err.(type) {
    case Error:
        var e error
        if size, e = discardN(&c.rbuf, size, size); e != nil {
            err = e
        }
    }
    return expectZeroSize(size, err)
}

func (c *Conn) peekResponseSizeAndID() (int32, int32, error) {
    b, err := c.rbuf.Peek(8)
    if err != nil {
        return 0, 0, err
    }
    size, id := makeInt32(b[:4]), makeInt32(b[4:])
    return size, id, nil
}

func (c *Conn) skipResponseSizeAndID() {
    c.rbuf.Discard(8)
}

func (c *Conn) readDeadline() time.Time {
    return c.rdeadline.deadline()
}

func (c *Conn) writeDeadline() time.Time {
    return c.wdeadline.deadline()
}

func (c *Conn) readOperation(write func(time.Time, int32) error, read func(time.Time, int) error) error {
    return c.do(&c.rdeadline, write, read)
}

func (c *Conn) writeOperation(write func(time.Time, int32) error, read func(time.Time, int) error) error {
    return c.do(&c.wdeadline, write, read)
}

func (c *Conn) enter() {
    atomic.AddInt32(&c.inflight, +1)
}

func (c *Conn) leave() {
    atomic.AddInt32(&c.inflight, -1)
}

func (c *Conn) concurrency() int {
    return int(atomic.LoadInt32(&c.inflight))
}

func (c *Conn) do(d *connDeadline, write func(time.Time, int32) error, read func(time.Time, int) error) error {
    id, err := c.doRequest(d, write)
    if err != nil {
        return err
    }

    deadline, size, lock, err := c.waitResponse(d, id)
    if err != nil {
        return err
    }

    if err = read(deadline, size); err != nil {
        switch err.(type) {
        case Error:
        default:
            c.conn.Close()
        }
    }

    d.unsetConnReadDeadline()
    lock.Unlock()
    return err
}

func (c *Conn) doRequest(d *connDeadline, write func(time.Time, int32) error) (id int32, err error) {
    c.enter()
    c.wlock.Lock()
    c.correlationID++
    id = c.correlationID
    err = write(d.setConnWriteDeadline(c.conn), id)
    d.unsetConnWriteDeadline()

    if err != nil {
        // When an error occurs there's no way to know if the connection is in a
        // recoverable state so we're better off just giving up at this point to
        // avoid any risk of corrupting the following operations.
        c.conn.Close()
        c.leave()
    }

    c.wlock.Unlock()
    return
}

func (c *Conn) waitResponse(d *connDeadline, id int32) (deadline time.Time, size int, lock *sync.Mutex, err error) {
    for {
        var rsz int32
        var rid int32

        c.rlock.Lock()
        deadline = d.setConnReadDeadline(c.conn)
        rsz, rid, err = c.peekResponseSizeAndID()

        if err != nil {
            d.unsetConnReadDeadline()
            c.conn.Close()
            c.rlock.Unlock()
            break
        }

        if id == rid {
            c.skipResponseSizeAndID()
            size, lock = int(rsz-4), &c.rlock
            // Don't unlock the read mutex to yield ownership to the caller.
            break
        }

        if c.concurrency() == 1 {
            // If the goroutine is the only one waiting on this connection it
            // should be impossible to read a correlation id different from the
            // one it expects. This is a sign that the data we are reading on
            // the wire is corrupted and the connection needs to be closed.
            err = io.ErrNoProgress
            c.rlock.Unlock()
            break
        }

        // Optimistically release the read lock if a response has already
        // been received but the current operation is not the target for it.
        c.rlock.Unlock()
        runtime.Gosched()
    }

    c.leave()
    return
}
func (c *Conn) requestHeader(apiKey apiKey, apiVersion apiVersion, correlationID int32) requestHeader {
    return requestHeader{
        ApiKey:        int16(apiKey),
        ApiVersion:    int16(apiVersion),
        CorrelationID: correlationID,
        ClientID:      c.clientID,
    }
}

func (c *Conn) ApiVersions() ([]ApiVersion, error) {
    deadline := &c.rdeadline

    if deadline.deadline().IsZero() {
        // ApiVersions is called automatically when API version negotiation
        // needs to happen, so we are not guaranteed that a read deadline has
        // been set yet. Fall back to the write deadline in case it was set,
        // for example when version negotiation is initiated during a produce
        // request.
        deadline = &c.wdeadline
    }

    id, err := c.doRequest(deadline, func(_ time.Time, id int32) error {
        h := requestHeader{
            ApiKey:        int16(apiVersions),
            ApiVersion:    int16(v0),
            CorrelationID: id,
            ClientID:      c.clientID,
        }
        h.Size = (h.size() - 4)
        h.writeTo(&c.wb)
        return c.wbuf.Flush()
    })
    if err != nil {
        return nil, err
    }

    _, size, lock, err := c.waitResponse(deadline, id)
    if err != nil {
        return nil, err
    }
    defer lock.Unlock()

    var errorCode int16
    if size, err = readInt16(&c.rbuf, size, &errorCode); err != nil {
        return nil, err
    }
    var arrSize int32
    if size, err = readInt32(&c.rbuf, size, &arrSize); err != nil {
        return nil, err
    }
    r := make([]ApiVersion, arrSize)
    for i := 0; i < int(arrSize); i++ {
        if size, err = readInt16(&c.rbuf, size, &r[i].ApiKey); err != nil {
            return nil, err
        }
        if size, err = readInt16(&c.rbuf, size, &r[i].MinVersion); err != nil {
            return nil, err
        }
        if size, err = readInt16(&c.rbuf, size, &r[i].MaxVersion); err != nil {
            return nil, err
        }
    }

    if errorCode != 0 {
        return r, Error(errorCode)
    }

    return r, nil
}

// connDeadline is a helper type to implement read/write deadline management on
// the kafka connection.
type connDeadline struct {
    mutex sync.Mutex
    value time.Time
    rconn net.Conn
    wconn net.Conn
}

func (d *connDeadline) deadline() time.Time {
    d.mutex.Lock()
    t := d.value
    d.mutex.Unlock()
    return t
}

func (d *connDeadline) setDeadline(t time.Time) {
    d.mutex.Lock()
    d.value = t

    if d.rconn != nil {
        d.rconn.SetReadDeadline(t)
    }

    if d.wconn != nil {
        d.wconn.SetWriteDeadline(t)
    }

    d.mutex.Unlock()
}

func (d *connDeadline) setConnReadDeadline(conn net.Conn) time.Time {
    d.mutex.Lock()
    deadline := d.value
    d.rconn = conn
    d.rconn.SetReadDeadline(deadline)
    d.mutex.Unlock()
    return deadline
}

func (d *connDeadline) setConnWriteDeadline(conn net.Conn) time.Time {
    d.mutex.Lock()
    deadline := d.value
    d.wconn = conn
    d.wconn.SetWriteDeadline(deadline)
    d.mutex.Unlock()
    return deadline
}

func (d *connDeadline) unsetConnReadDeadline() {
    d.mutex.Lock()
    d.rconn = nil
    d.mutex.Unlock()
}

func (d *connDeadline) unsetConnWriteDeadline() {
    d.mutex.Lock()
    d.wconn = nil
    d.mutex.Unlock()
}

// saslHandshake sends the SASL handshake message. This will determine whether
// the Mechanism is supported by the cluster. If it's not, this function will
// error out with UnsupportedSASLMechanism.
//
// If the mechanism is unsupported, the handshake request will reply with the
// list of the cluster's configured mechanisms, which could potentially be used
// to facilitate negotiation. At the moment, we are not negotiating the
// mechanism as we believe that brokers are usually known to the client, and
// therefore the client should already know which mechanisms are supported.
//
// See http://kafka.apache.org/protocol.html#The_Messages_SaslHandshake
func (c *Conn) saslHandshake(mechanism string) error {
    // The wire format for V0 and V1 is identical, but the version
    // number will affect how the SASL authentication
    // challenge/responses are sent
    var resp saslHandshakeResponseV0

    version, err := c.negotiateVersion(saslHandshake, v0, v1)
    if err != nil {
        return err
    }

    err = c.writeOperation(
        func(deadline time.Time, id int32) error {
            return c.writeRequest(saslHandshake, version, id, &saslHandshakeRequestV0{Mechanism: mechanism})
        },
        func(deadline time.Time, size int) error {
            return expectZeroSize(func() (int, error) {
                return (&resp).readFrom(&c.rbuf, size)
            }())
        },
    )
    if err == nil && resp.ErrorCode != 0 {
        err = Error(resp.ErrorCode)
    }
    return err
}

// saslAuthenticate sends the SASL authenticate message. This function must
// be immediately preceded by a successful saslHandshake.
//
// See http://kafka.apache.org/protocol.html#The_Messages_SaslAuthenticate
func (c *Conn) saslAuthenticate(data []byte) ([]byte, error) {
    // if we sent a v1 handshake, then we must encapsulate the authentication
    // request in a saslAuthenticateRequest. otherwise, we read and write raw
    // bytes.
    version, err := c.negotiateVersion(saslHandshake, v0, v1)
    if err != nil {
        return nil, err
    }
    if version == v1 {
        var request = saslAuthenticateRequestV0{Data: data}
        var response saslAuthenticateResponseV0

        err := c.writeOperation(
            func(deadline time.Time, id int32) error {
                return c.writeRequest(saslAuthenticate, v0, id, request)
            },
            func(deadline time.Time, size int) error {
                return expectZeroSize(func() (remain int, err error) {
                    return (&response).readFrom(&c.rbuf, size)
                }())
            },
        )
        if err == nil && response.ErrorCode != 0 {
            err = Error(response.ErrorCode)
        }
        return response.Data, err
    }

    // fall back to opaque bytes on the wire. the broker is expecting these if
    // it just processed a v0 sasl handshake.
    c.wb.writeInt32(int32(len(data)))
    if _, err := c.wb.Write(data); err != nil {
        return nil, err
    }
    if err := c.wb.Flush(); err != nil {
        return nil, err
    }

    var respLen int32
    if _, err := readInt32(&c.rbuf, 4, &respLen); err != nil {
        return nil, err
    }

    resp, _, err := readNewBytes(&c.rbuf, int(respLen), int(respLen))
    return resp, err
}