deletetopics.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package kafka
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "net"
  7. "time"
  8. "github.com/segmentio/kafka-go/protocol/deletetopics"
  9. )
  // DeleteTopicsRequest describes a topic deletion request to be sent to a
  // kafka broker.
  type DeleteTopicsRequest struct {
  	// Addr is the address of the kafka broker the request is sent to.
  	Addr net.Addr

  	// Topics lists the names of the topics to delete.
  	Topics []string
  }
  // DeleteTopicsResponse describes the reply of a kafka broker to a topic
  // deletion request.
  type DeleteTopicsResponse struct {
  	// Throttle is the amount of time that the broker throttled the request.
  	//
  	// It is zero when the kafka broker did not support the DeleteTopics API
  	// in version 1 or above.
  	Throttle time.Duration

  	// Errors maps each topic name to the error that occurred while attempting
  	// to delete that topic.
  	//
  	// The errors carry the kafka error code, so programs may use the standard
  	// errors.Is function to test them against kafka error codes.
  	Errors map[string]error
  }
  33. // DeleteTopics sends a topic deletion request to a kafka broker and returns the
  34. // response.
  35. func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
  36. m, err := c.roundTrip(ctx, req.Addr, &deletetopics.Request{
  37. TopicNames: req.Topics,
  38. TimeoutMs: c.timeoutMs(ctx, defaultDeleteTopicsTimeout),
  39. })
  40. if err != nil {
  41. return nil, fmt.Errorf("kafka.(*Client).DeleteTopics: %w", err)
  42. }
  43. res := m.(*deletetopics.Response)
  44. ret := &DeleteTopicsResponse{
  45. Throttle: makeDuration(res.ThrottleTimeMs),
  46. Errors: make(map[string]error, len(res.Responses)),
  47. }
  48. for _, t := range res.Responses {
  49. if t.ErrorCode == 0 {
  50. ret.Errors[t.Name] = nil
  51. } else {
  52. ret.Errors[t.Name] = Error(t.ErrorCode)
  53. }
  54. }
  55. return ret, nil
  56. }
  // deleteTopicsRequestV0 is version 0 of the DeleteTopics request.
  //
  // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
  type deleteTopicsRequestV0 struct {
  	// Topics holds the names of the topics to delete.
  	Topics []string

  	// Timeout holds the time in ms to wait for a topic to be completely
  	// deleted on the controller node. Values <= 0 will trigger topic deletion
  	// and return immediately.
  	Timeout int32
  }
  66. func (t deleteTopicsRequestV0) size() int32 {
  67. return sizeofStringArray(t.Topics) +
  68. sizeofInt32(t.Timeout)
  69. }
  70. func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) {
  71. wb.writeStringArray(t.Topics)
  72. wb.writeInt32(t.Timeout)
  73. }
  74. type deleteTopicsResponseV0 struct {
  75. // TopicErrorCodes holds per topic error codes
  76. TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode
  77. }
  78. func (t deleteTopicsResponseV0) size() int32 {
  79. return sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
  80. }
  81. func (t *deleteTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  82. fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
  83. var item deleteTopicsResponseV0TopicErrorCode
  84. if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil {
  85. return
  86. }
  87. t.TopicErrorCodes = append(t.TopicErrorCodes, item)
  88. return
  89. }
  90. if remain, err = readArrayWith(r, size, fn); err != nil {
  91. return
  92. }
  93. return
  94. }
  95. func (t deleteTopicsResponseV0) writeTo(wb *writeBuffer) {
  96. wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) })
  97. }
  // deleteTopicsResponseV0TopicErrorCode pairs a topic name with the error code
  // reported for it in a version 0 DeleteTopics response.
  type deleteTopicsResponseV0TopicErrorCode struct {
  	// Topic holds the topic name.
  	Topic string

  	// ErrorCode holds the error code.
  	ErrorCode int16
  }
  104. func (t deleteTopicsResponseV0TopicErrorCode) size() int32 {
  105. return sizeofString(t.Topic) +
  106. sizeofInt16(t.ErrorCode)
  107. }
  108. func (t *deleteTopicsResponseV0TopicErrorCode) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  109. if remain, err = readString(r, size, &t.Topic); err != nil {
  110. return
  111. }
  112. if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
  113. return
  114. }
  115. return
  116. }
  117. func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) {
  118. wb.writeString(t.Topic)
  119. wb.writeInt16(t.ErrorCode)
  120. }
  121. // deleteTopics deletes the specified topics.
  122. //
  123. // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
  124. func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponseV0, error) {
  125. var response deleteTopicsResponseV0
  126. err := c.writeOperation(
  127. func(deadline time.Time, id int32) error {
  128. if request.Timeout == 0 {
  129. now := time.Now()
  130. deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
  131. request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
  132. }
  133. return c.writeRequest(deleteTopics, v0, id, request)
  134. },
  135. func(deadline time.Time, size int) error {
  136. return expectZeroSize(func() (remain int, err error) {
  137. return (&response).readFrom(&c.rbuf, size)
  138. }())
  139. },
  140. )
  141. if err != nil {
  142. return deleteTopicsResponseV0{}, err
  143. }
  144. for _, c := range response.TopicErrorCodes {
  145. if c.ErrorCode != 0 {
  146. return response, Error(c.ErrorCode)
  147. }
  148. }
  149. return response, nil
  150. }