controlbuf.go 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942
  1. /*
  2. *
  3. * Copyright 2014 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. package transport
  19. import (
  20. "bytes"
  21. "fmt"
  22. "runtime"
  23. "sync"
  24. "sync/atomic"
  25. "golang.org/x/net/http2"
  26. "golang.org/x/net/http2/hpack"
  27. )
  28. var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
  29. e.SetMaxDynamicTableSizeLimit(v)
  30. }
  31. type itemNode struct {
  32. it interface{}
  33. next *itemNode
  34. }
  35. type itemList struct {
  36. head *itemNode
  37. tail *itemNode
  38. }
  39. func (il *itemList) enqueue(i interface{}) {
  40. n := &itemNode{it: i}
  41. if il.tail == nil {
  42. il.head, il.tail = n, n
  43. return
  44. }
  45. il.tail.next = n
  46. il.tail = n
  47. }
  48. // peek returns the first item in the list without removing it from the
  49. // list.
  50. func (il *itemList) peek() interface{} {
  51. return il.head.it
  52. }
  53. func (il *itemList) dequeue() interface{} {
  54. if il.head == nil {
  55. return nil
  56. }
  57. i := il.head.it
  58. il.head = il.head.next
  59. if il.head == nil {
  60. il.tail = nil
  61. }
  62. return i
  63. }
  64. func (il *itemList) dequeueAll() *itemNode {
  65. h := il.head
  66. il.head, il.tail = nil, nil
  67. return h
  68. }
  69. func (il *itemList) isEmpty() bool {
  70. return il.head == nil
  71. }
  72. // The following defines various control items which could flow through
  73. // the control buffer of transport. They represent different aspects of
  74. // control tasks, e.g., flow control, settings, streaming resetting, etc.
  75. // maxQueuedTransportResponseFrames is the most queued "transport response"
  76. // frames we will buffer before preventing new reads from occurring on the
  77. // transport. These are control frames sent in response to client requests,
  78. // such as RST_STREAM due to bad headers or settings acks.
  79. const maxQueuedTransportResponseFrames = 50
  80. type cbItem interface {
  81. isTransportResponseFrame() bool
  82. }
  83. // registerStream is used to register an incoming stream with loopy writer.
  84. type registerStream struct {
  85. streamID uint32
  86. wq *writeQuota
  87. }
  88. func (*registerStream) isTransportResponseFrame() bool { return false }
  89. // headerFrame is also used to register stream on the client-side.
  90. type headerFrame struct {
  91. streamID uint32
  92. hf []hpack.HeaderField
  93. endStream bool // Valid on server side.
  94. initStream func(uint32) error // Used only on the client side.
  95. onWrite func()
  96. wq *writeQuota // write quota for the stream created.
  97. cleanup *cleanupStream // Valid on the server side.
  98. onOrphaned func(error) // Valid on client-side
  99. }
  100. func (h *headerFrame) isTransportResponseFrame() bool {
  101. return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
  102. }
  103. type cleanupStream struct {
  104. streamID uint32
  105. rst bool
  106. rstCode http2.ErrCode
  107. onWrite func()
  108. }
  109. func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
  110. type dataFrame struct {
  111. streamID uint32
  112. endStream bool
  113. h []byte
  114. d []byte
  115. // onEachWrite is called every time
  116. // a part of d is written out.
  117. onEachWrite func()
  118. }
  119. func (*dataFrame) isTransportResponseFrame() bool { return false }
  120. type incomingWindowUpdate struct {
  121. streamID uint32
  122. increment uint32
  123. }
  124. func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
  125. type outgoingWindowUpdate struct {
  126. streamID uint32
  127. increment uint32
  128. }
  129. func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
  130. return false // window updates are throttled by thresholds
  131. }
  132. type incomingSettings struct {
  133. ss []http2.Setting
  134. }
  135. func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
  136. type outgoingSettings struct {
  137. ss []http2.Setting
  138. }
  139. func (*outgoingSettings) isTransportResponseFrame() bool { return false }
  140. type incomingGoAway struct {
  141. }
  142. func (*incomingGoAway) isTransportResponseFrame() bool { return false }
  143. type goAway struct {
  144. code http2.ErrCode
  145. debugData []byte
  146. headsUp bool
  147. closeConn bool
  148. }
  149. func (*goAway) isTransportResponseFrame() bool { return false }
  150. type ping struct {
  151. ack bool
  152. data [8]byte
  153. }
  154. func (*ping) isTransportResponseFrame() bool { return true }
  155. type outFlowControlSizeRequest struct {
  156. resp chan uint32
  157. }
  158. func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
  159. type outStreamState int
  160. const (
  161. active outStreamState = iota
  162. empty
  163. waitingOnStreamQuota
  164. )
  165. type outStream struct {
  166. id uint32
  167. state outStreamState
  168. itl *itemList
  169. bytesOutStanding int
  170. wq *writeQuota
  171. next *outStream
  172. prev *outStream
  173. }
  174. func (s *outStream) deleteSelf() {
  175. if s.prev != nil {
  176. s.prev.next = s.next
  177. }
  178. if s.next != nil {
  179. s.next.prev = s.prev
  180. }
  181. s.next, s.prev = nil, nil
  182. }
  183. type outStreamList struct {
  184. // Following are sentinel objects that mark the
  185. // beginning and end of the list. They do not
  186. // contain any item lists. All valid objects are
  187. // inserted in between them.
  188. // This is needed so that an outStream object can
  189. // deleteSelf() in O(1) time without knowing which
  190. // list it belongs to.
  191. head *outStream
  192. tail *outStream
  193. }
  194. func newOutStreamList() *outStreamList {
  195. head, tail := new(outStream), new(outStream)
  196. head.next = tail
  197. tail.prev = head
  198. return &outStreamList{
  199. head: head,
  200. tail: tail,
  201. }
  202. }
  203. func (l *outStreamList) enqueue(s *outStream) {
  204. e := l.tail.prev
  205. e.next = s
  206. s.prev = e
  207. s.next = l.tail
  208. l.tail.prev = s
  209. }
  210. // remove from the beginning of the list.
  211. func (l *outStreamList) dequeue() *outStream {
  212. b := l.head.next
  213. if b == l.tail {
  214. return nil
  215. }
  216. b.deleteSelf()
  217. return b
  218. }
  219. // controlBuffer is a way to pass information to loopy.
  220. // Information is passed as specific struct types called control frames.
  221. // A control frame not only represents data, messages or headers to be sent out
  222. // but can also be used to instruct loopy to update its internal state.
  223. // It shouldn't be confused with an HTTP2 frame, although some of the control frames
  224. // like dataFrame and headerFrame do go out on wire as HTTP2 frames.
  225. type controlBuffer struct {
  226. ch chan struct{}
  227. done <-chan struct{}
  228. mu sync.Mutex
  229. consumerWaiting bool
  230. list *itemList
  231. err error
  232. // transportResponseFrames counts the number of queued items that represent
  233. // the response of an action initiated by the peer. trfChan is created
  234. // when transportResponseFrames >= maxQueuedTransportResponseFrames and is
  235. // closed and nilled when transportResponseFrames drops below the
  236. // threshold. Both fields are protected by mu.
  237. transportResponseFrames int
  238. trfChan atomic.Value // *chan struct{}
  239. }
  240. func newControlBuffer(done <-chan struct{}) *controlBuffer {
  241. return &controlBuffer{
  242. ch: make(chan struct{}, 1),
  243. list: &itemList{},
  244. done: done,
  245. }
  246. }
  247. // throttle blocks if there are too many incomingSettings/cleanupStreams in the
  248. // controlbuf.
  249. func (c *controlBuffer) throttle() {
  250. ch, _ := c.trfChan.Load().(*chan struct{})
  251. if ch != nil {
  252. select {
  253. case <-*ch:
  254. case <-c.done:
  255. }
  256. }
  257. }
  258. func (c *controlBuffer) put(it cbItem) error {
  259. _, err := c.executeAndPut(nil, it)
  260. return err
  261. }
  262. func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
  263. var wakeUp bool
  264. c.mu.Lock()
  265. if c.err != nil {
  266. c.mu.Unlock()
  267. return false, c.err
  268. }
  269. if f != nil {
  270. if !f(it) { // f wasn't successful
  271. c.mu.Unlock()
  272. return false, nil
  273. }
  274. }
  275. if c.consumerWaiting {
  276. wakeUp = true
  277. c.consumerWaiting = false
  278. }
  279. c.list.enqueue(it)
  280. if it.isTransportResponseFrame() {
  281. c.transportResponseFrames++
  282. if c.transportResponseFrames == maxQueuedTransportResponseFrames {
  283. // We are adding the frame that puts us over the threshold; create
  284. // a throttling channel.
  285. ch := make(chan struct{})
  286. c.trfChan.Store(&ch)
  287. }
  288. }
  289. c.mu.Unlock()
  290. if wakeUp {
  291. select {
  292. case c.ch <- struct{}{}:
  293. default:
  294. }
  295. }
  296. return true, nil
  297. }
  298. // Note argument f should never be nil.
  299. func (c *controlBuffer) execute(f func(it interface{}) bool, it interface{}) (bool, error) {
  300. c.mu.Lock()
  301. if c.err != nil {
  302. c.mu.Unlock()
  303. return false, c.err
  304. }
  305. if !f(it) { // f wasn't successful
  306. c.mu.Unlock()
  307. return false, nil
  308. }
  309. c.mu.Unlock()
  310. return true, nil
  311. }
  312. func (c *controlBuffer) get(block bool) (interface{}, error) {
  313. for {
  314. c.mu.Lock()
  315. if c.err != nil {
  316. c.mu.Unlock()
  317. return nil, c.err
  318. }
  319. if !c.list.isEmpty() {
  320. h := c.list.dequeue().(cbItem)
  321. if h.isTransportResponseFrame() {
  322. if c.transportResponseFrames == maxQueuedTransportResponseFrames {
  323. // We are removing the frame that put us over the
  324. // threshold; close and clear the throttling channel.
  325. ch := c.trfChan.Load().(*chan struct{})
  326. close(*ch)
  327. c.trfChan.Store((*chan struct{})(nil))
  328. }
  329. c.transportResponseFrames--
  330. }
  331. c.mu.Unlock()
  332. return h, nil
  333. }
  334. if !block {
  335. c.mu.Unlock()
  336. return nil, nil
  337. }
  338. c.consumerWaiting = true
  339. c.mu.Unlock()
  340. select {
  341. case <-c.ch:
  342. case <-c.done:
  343. c.finish()
  344. return nil, ErrConnClosing
  345. }
  346. }
  347. }
  348. func (c *controlBuffer) finish() {
  349. c.mu.Lock()
  350. if c.err != nil {
  351. c.mu.Unlock()
  352. return
  353. }
  354. c.err = ErrConnClosing
  355. // There may be headers for streams in the control buffer.
  356. // These streams need to be cleaned out since the transport
  357. // is still not aware of these yet.
  358. for head := c.list.dequeueAll(); head != nil; head = head.next {
  359. hdr, ok := head.it.(*headerFrame)
  360. if !ok {
  361. continue
  362. }
  363. if hdr.onOrphaned != nil { // It will be nil on the server-side.
  364. hdr.onOrphaned(ErrConnClosing)
  365. }
  366. }
  367. c.mu.Unlock()
  368. }
  369. type side int
  370. const (
  371. clientSide side = iota
  372. serverSide
  373. )
  374. // Loopy receives frames from the control buffer.
  375. // Each frame is handled individually; most of the work done by loopy goes
  376. // into handling data frames. Loopy maintains a queue of active streams, and each
  377. // stream maintains a queue of data frames; as loopy receives data frames
  378. // it gets added to the queue of the relevant stream.
  379. // Loopy goes over this list of active streams by processing one node every iteration,
  380. // thereby closely resemebling to a round-robin scheduling over all streams. While
  381. // processing a stream, loopy writes out data bytes from this stream capped by the min
  382. // of http2MaxFrameLen, connection-level flow control and stream-level flow control.
  383. type loopyWriter struct {
  384. side side
  385. cbuf *controlBuffer
  386. sendQuota uint32
  387. oiws uint32 // outbound initial window size.
  388. // estdStreams is map of all established streams that are not cleaned-up yet.
  389. // On client-side, this is all streams whose headers were sent out.
  390. // On server-side, this is all streams whose headers were received.
  391. estdStreams map[uint32]*outStream // Established streams.
  392. // activeStreams is a linked-list of all streams that have data to send and some
  393. // stream-level flow control quota.
  394. // Each of these streams internally have a list of data items(and perhaps trailers
  395. // on the server-side) to be sent out.
  396. activeStreams *outStreamList
  397. framer *framer
  398. hBuf *bytes.Buffer // The buffer for HPACK encoding.
  399. hEnc *hpack.Encoder // HPACK encoder.
  400. bdpEst *bdpEstimator
  401. draining bool
  402. // Side-specific handlers
  403. ssGoAwayHandler func(*goAway) (bool, error)
  404. }
  405. func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
  406. var buf bytes.Buffer
  407. l := &loopyWriter{
  408. side: s,
  409. cbuf: cbuf,
  410. sendQuota: defaultWindowSize,
  411. oiws: defaultWindowSize,
  412. estdStreams: make(map[uint32]*outStream),
  413. activeStreams: newOutStreamList(),
  414. framer: fr,
  415. hBuf: &buf,
  416. hEnc: hpack.NewEncoder(&buf),
  417. bdpEst: bdpEst,
  418. }
  419. return l
  420. }
  421. const minBatchSize = 1000
  422. // run should be run in a separate goroutine.
  423. // It reads control frames from controlBuf and processes them by:
  424. // 1. Updating loopy's internal state, or/and
  425. // 2. Writing out HTTP2 frames on the wire.
  426. //
  427. // Loopy keeps all active streams with data to send in a linked-list.
  428. // All streams in the activeStreams linked-list must have both:
  429. // 1. Data to send, and
  430. // 2. Stream level flow control quota available.
  431. //
  432. // In each iteration of run loop, other than processing the incoming control
  433. // frame, loopy calls processData, which processes one node from the activeStreams linked-list.
  434. // This results in writing of HTTP2 frames into an underlying write buffer.
  435. // When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
  436. // As an optimization, to increase the batch size for each flush, loopy yields the processor, once
  437. // if the batch size is too low to give stream goroutines a chance to fill it up.
  438. func (l *loopyWriter) run() (err error) {
  439. defer func() {
  440. if err == ErrConnClosing {
  441. // Don't log ErrConnClosing as error since it happens
  442. // 1. When the connection is closed by some other known issue.
  443. // 2. User closed the connection.
  444. // 3. A graceful close of connection.
  445. if logger.V(logLevel) {
  446. logger.Infof("transport: loopyWriter.run returning. %v", err)
  447. }
  448. err = nil
  449. }
  450. }()
  451. for {
  452. it, err := l.cbuf.get(true)
  453. if err != nil {
  454. return err
  455. }
  456. if err = l.handle(it); err != nil {
  457. return err
  458. }
  459. if _, err = l.processData(); err != nil {
  460. return err
  461. }
  462. gosched := true
  463. hasdata:
  464. for {
  465. it, err := l.cbuf.get(false)
  466. if err != nil {
  467. return err
  468. }
  469. if it != nil {
  470. if err = l.handle(it); err != nil {
  471. return err
  472. }
  473. if _, err = l.processData(); err != nil {
  474. return err
  475. }
  476. continue hasdata
  477. }
  478. isEmpty, err := l.processData()
  479. if err != nil {
  480. return err
  481. }
  482. if !isEmpty {
  483. continue hasdata
  484. }
  485. if gosched {
  486. gosched = false
  487. if l.framer.writer.offset < minBatchSize {
  488. runtime.Gosched()
  489. continue hasdata
  490. }
  491. }
  492. l.framer.writer.Flush()
  493. break hasdata
  494. }
  495. }
  496. }
  497. func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
  498. return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
  499. }
  500. func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
  501. // Otherwise update the quota.
  502. if w.streamID == 0 {
  503. l.sendQuota += w.increment
  504. return nil
  505. }
  506. // Find the stream and update it.
  507. if str, ok := l.estdStreams[w.streamID]; ok {
  508. str.bytesOutStanding -= int(w.increment)
  509. if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
  510. str.state = active
  511. l.activeStreams.enqueue(str)
  512. return nil
  513. }
  514. }
  515. return nil
  516. }
  517. func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
  518. return l.framer.fr.WriteSettings(s.ss...)
  519. }
  520. func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
  521. if err := l.applySettings(s.ss); err != nil {
  522. return err
  523. }
  524. return l.framer.fr.WriteSettingsAck()
  525. }
  526. func (l *loopyWriter) registerStreamHandler(h *registerStream) error {
  527. str := &outStream{
  528. id: h.streamID,
  529. state: empty,
  530. itl: &itemList{},
  531. wq: h.wq,
  532. }
  533. l.estdStreams[h.streamID] = str
  534. return nil
  535. }
  536. func (l *loopyWriter) headerHandler(h *headerFrame) error {
  537. if l.side == serverSide {
  538. str, ok := l.estdStreams[h.streamID]
  539. if !ok {
  540. if logger.V(logLevel) {
  541. logger.Warningf("transport: loopy doesn't recognize the stream: %d", h.streamID)
  542. }
  543. return nil
  544. }
  545. // Case 1.A: Server is responding back with headers.
  546. if !h.endStream {
  547. return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
  548. }
  549. // else: Case 1.B: Server wants to close stream.
  550. if str.state != empty { // either active or waiting on stream quota.
  551. // add it str's list of items.
  552. str.itl.enqueue(h)
  553. return nil
  554. }
  555. if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
  556. return err
  557. }
  558. return l.cleanupStreamHandler(h.cleanup)
  559. }
  560. // Case 2: Client wants to originate stream.
  561. str := &outStream{
  562. id: h.streamID,
  563. state: empty,
  564. itl: &itemList{},
  565. wq: h.wq,
  566. }
  567. str.itl.enqueue(h)
  568. return l.originateStream(str)
  569. }
  570. func (l *loopyWriter) originateStream(str *outStream) error {
  571. hdr := str.itl.dequeue().(*headerFrame)
  572. if err := hdr.initStream(str.id); err != nil {
  573. if err == ErrConnClosing {
  574. return err
  575. }
  576. // Other errors(errStreamDrain) need not close transport.
  577. return nil
  578. }
  579. if err := l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
  580. return err
  581. }
  582. l.estdStreams[str.id] = str
  583. return nil
  584. }
  585. func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
  586. if onWrite != nil {
  587. onWrite()
  588. }
  589. l.hBuf.Reset()
  590. for _, f := range hf {
  591. if err := l.hEnc.WriteField(f); err != nil {
  592. if logger.V(logLevel) {
  593. logger.Warningf("transport: loopyWriter.writeHeader encountered error while encoding headers: %v", err)
  594. }
  595. }
  596. }
  597. var (
  598. err error
  599. endHeaders, first bool
  600. )
  601. first = true
  602. for !endHeaders {
  603. size := l.hBuf.Len()
  604. if size > http2MaxFrameLen {
  605. size = http2MaxFrameLen
  606. } else {
  607. endHeaders = true
  608. }
  609. if first {
  610. first = false
  611. err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
  612. StreamID: streamID,
  613. BlockFragment: l.hBuf.Next(size),
  614. EndStream: endStream,
  615. EndHeaders: endHeaders,
  616. })
  617. } else {
  618. err = l.framer.fr.WriteContinuation(
  619. streamID,
  620. endHeaders,
  621. l.hBuf.Next(size),
  622. )
  623. }
  624. if err != nil {
  625. return err
  626. }
  627. }
  628. return nil
  629. }
  630. func (l *loopyWriter) preprocessData(df *dataFrame) error {
  631. str, ok := l.estdStreams[df.streamID]
  632. if !ok {
  633. return nil
  634. }
  635. // If we got data for a stream it means that
  636. // stream was originated and the headers were sent out.
  637. str.itl.enqueue(df)
  638. if str.state == empty {
  639. str.state = active
  640. l.activeStreams.enqueue(str)
  641. }
  642. return nil
  643. }
  644. func (l *loopyWriter) pingHandler(p *ping) error {
  645. if !p.ack {
  646. l.bdpEst.timesnap(p.data)
  647. }
  648. return l.framer.fr.WritePing(p.ack, p.data)
  649. }
  650. func (l *loopyWriter) outFlowControlSizeRequestHandler(o *outFlowControlSizeRequest) error {
  651. o.resp <- l.sendQuota
  652. return nil
  653. }
  654. func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
  655. c.onWrite()
  656. if str, ok := l.estdStreams[c.streamID]; ok {
  657. // On the server side it could be a trailers-only response or
  658. // a RST_STREAM before stream initialization thus the stream might
  659. // not be established yet.
  660. delete(l.estdStreams, c.streamID)
  661. str.deleteSelf()
  662. }
  663. if c.rst { // If RST_STREAM needs to be sent.
  664. if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
  665. return err
  666. }
  667. }
  668. if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
  669. return ErrConnClosing
  670. }
  671. return nil
  672. }
  673. func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
  674. if l.side == clientSide {
  675. l.draining = true
  676. if len(l.estdStreams) == 0 {
  677. return ErrConnClosing
  678. }
  679. }
  680. return nil
  681. }
  682. func (l *loopyWriter) goAwayHandler(g *goAway) error {
  683. // Handling of outgoing GoAway is very specific to side.
  684. if l.ssGoAwayHandler != nil {
  685. draining, err := l.ssGoAwayHandler(g)
  686. if err != nil {
  687. return err
  688. }
  689. l.draining = draining
  690. }
  691. return nil
  692. }
  693. func (l *loopyWriter) handle(i interface{}) error {
  694. switch i := i.(type) {
  695. case *incomingWindowUpdate:
  696. return l.incomingWindowUpdateHandler(i)
  697. case *outgoingWindowUpdate:
  698. return l.outgoingWindowUpdateHandler(i)
  699. case *incomingSettings:
  700. return l.incomingSettingsHandler(i)
  701. case *outgoingSettings:
  702. return l.outgoingSettingsHandler(i)
  703. case *headerFrame:
  704. return l.headerHandler(i)
  705. case *registerStream:
  706. return l.registerStreamHandler(i)
  707. case *cleanupStream:
  708. return l.cleanupStreamHandler(i)
  709. case *incomingGoAway:
  710. return l.incomingGoAwayHandler(i)
  711. case *dataFrame:
  712. return l.preprocessData(i)
  713. case *ping:
  714. return l.pingHandler(i)
  715. case *goAway:
  716. return l.goAwayHandler(i)
  717. case *outFlowControlSizeRequest:
  718. return l.outFlowControlSizeRequestHandler(i)
  719. default:
  720. return fmt.Errorf("transport: unknown control message type %T", i)
  721. }
  722. }
  723. func (l *loopyWriter) applySettings(ss []http2.Setting) error {
  724. for _, s := range ss {
  725. switch s.ID {
  726. case http2.SettingInitialWindowSize:
  727. o := l.oiws
  728. l.oiws = s.Val
  729. if o < l.oiws {
  730. // If the new limit is greater make all depleted streams active.
  731. for _, stream := range l.estdStreams {
  732. if stream.state == waitingOnStreamQuota {
  733. stream.state = active
  734. l.activeStreams.enqueue(stream)
  735. }
  736. }
  737. }
  738. case http2.SettingHeaderTableSize:
  739. updateHeaderTblSize(l.hEnc, s.Val)
  740. }
  741. }
  742. return nil
  743. }
  744. // processData removes the first stream from active streams, writes out at most 16KB
  745. // of its data and then puts it at the end of activeStreams if there's still more data
  746. // to be sent and stream has some stream-level flow control.
  747. func (l *loopyWriter) processData() (bool, error) {
  748. if l.sendQuota == 0 {
  749. return true, nil
  750. }
  751. str := l.activeStreams.dequeue() // Remove the first stream.
  752. if str == nil {
  753. return true, nil
  754. }
  755. dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
  756. // A data item is represented by a dataFrame, since it later translates into
  757. // multiple HTTP2 data frames.
  758. // Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
  759. // As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
  760. // maximum possilbe HTTP2 frame size.
  761. if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
  762. // Client sends out empty data frame with endStream = true
  763. if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
  764. return false, err
  765. }
  766. str.itl.dequeue() // remove the empty data item from stream
  767. if str.itl.isEmpty() {
  768. str.state = empty
  769. } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
  770. if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
  771. return false, err
  772. }
  773. if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
  774. return false, nil
  775. }
  776. } else {
  777. l.activeStreams.enqueue(str)
  778. }
  779. return false, nil
  780. }
  781. var (
  782. buf []byte
  783. )
  784. // Figure out the maximum size we can send
  785. maxSize := http2MaxFrameLen
  786. if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
  787. str.state = waitingOnStreamQuota
  788. return false, nil
  789. } else if maxSize > strQuota {
  790. maxSize = strQuota
  791. }
  792. if maxSize > int(l.sendQuota) { // connection-level flow control.
  793. maxSize = int(l.sendQuota)
  794. }
  795. // Compute how much of the header and data we can send within quota and max frame length
  796. hSize := min(maxSize, len(dataItem.h))
  797. dSize := min(maxSize-hSize, len(dataItem.d))
  798. if hSize != 0 {
  799. if dSize == 0 {
  800. buf = dataItem.h
  801. } else {
  802. // We can add some data to grpc message header to distribute bytes more equally across frames.
  803. // Copy on the stack to avoid generating garbage
  804. var localBuf [http2MaxFrameLen]byte
  805. copy(localBuf[:hSize], dataItem.h)
  806. copy(localBuf[hSize:], dataItem.d[:dSize])
  807. buf = localBuf[:hSize+dSize]
  808. }
  809. } else {
  810. buf = dataItem.d
  811. }
  812. size := hSize + dSize
  813. // Now that outgoing flow controls are checked we can replenish str's write quota
  814. str.wq.replenish(size)
  815. var endStream bool
  816. // If this is the last data message on this stream and all of it can be written in this iteration.
  817. if dataItem.endStream && len(dataItem.h)+len(dataItem.d) <= size {
  818. endStream = true
  819. }
  820. if dataItem.onEachWrite != nil {
  821. dataItem.onEachWrite()
  822. }
  823. if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
  824. return false, err
  825. }
  826. str.bytesOutStanding += size
  827. l.sendQuota -= uint32(size)
  828. dataItem.h = dataItem.h[hSize:]
  829. dataItem.d = dataItem.d[dSize:]
  830. if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
  831. str.itl.dequeue()
  832. }
  833. if str.itl.isEmpty() {
  834. str.state = empty
  835. } else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
  836. if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
  837. return false, err
  838. }
  839. if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
  840. return false, err
  841. }
  842. } else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
  843. str.state = waitingOnStreamQuota
  844. } else { // Otherwise add it back to the list of active streams.
  845. l.activeStreams.enqueue(str)
  846. }
  847. return false, nil
  848. }
  849. func min(a, b int) int {
  850. if a < b {
  851. return a
  852. }
  853. return b
  854. }