123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175 |
- package kafka
- import (
- "bufio"
- "context"
- "fmt"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/deletetopics"
- )
- // DeleteTopicsRequest represents a request sent to a kafka broker to delete
- // topics.
- type DeleteTopicsRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // Names of topics to delete.
- Topics []string
- }
- // DeleteTopicsResponse represents a response from a kafka broker to a topic
- // deletion request.
- type DeleteTopicsResponse struct {
- // The amount of time that the broker throttled the request.
- //
- // This field will be zero if the kafka broker did no support the
- // DeleteTopics API in version 1 or above.
- Throttle time.Duration
- // Mapping of topic names to errors that occurred while attempting to delete
- // the topics.
- //
- // The errors contain the kafka error code. Programs may use the standard
- // errors.Is function to test the error against kafka error codes.
- Errors map[string]error
- }
- // DeleteTopics sends a topic deletion request to a kafka broker and returns the
- // response.
- func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*DeleteTopicsResponse, error) {
- m, err := c.roundTrip(ctx, req.Addr, &deletetopics.Request{
- TopicNames: req.Topics,
- TimeoutMs: c.timeoutMs(ctx, defaultDeleteTopicsTimeout),
- })
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).DeleteTopics: %w", err)
- }
- res := m.(*deletetopics.Response)
- ret := &DeleteTopicsResponse{
- Throttle: makeDuration(res.ThrottleTimeMs),
- Errors: make(map[string]error, len(res.Responses)),
- }
- for _, t := range res.Responses {
- if t.ErrorCode == 0 {
- ret.Errors[t.Name] = nil
- } else {
- ret.Errors[t.Name] = Error(t.ErrorCode)
- }
- }
- return ret, nil
- }
- // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
- type deleteTopicsRequestV0 struct {
- // Topics holds the topic names
- Topics []string
- // Timeout holds the time in ms to wait for a topic to be completely deleted
- // on the controller node. Values <= 0 will trigger topic deletion and return
- // immediately.
- Timeout int32
- }
- func (t deleteTopicsRequestV0) size() int32 {
- return sizeofStringArray(t.Topics) +
- sizeofInt32(t.Timeout)
- }
- func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) {
- wb.writeStringArray(t.Topics)
- wb.writeInt32(t.Timeout)
- }
- type deleteTopicsResponseV0 struct {
- // TopicErrorCodes holds per topic error codes
- TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode
- }
- func (t deleteTopicsResponseV0) size() int32 {
- return sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() })
- }
- func (t *deleteTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
- fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
- var item deleteTopicsResponseV0TopicErrorCode
- if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil {
- return
- }
- t.TopicErrorCodes = append(t.TopicErrorCodes, item)
- return
- }
- if remain, err = readArrayWith(r, size, fn); err != nil {
- return
- }
- return
- }
- func (t deleteTopicsResponseV0) writeTo(wb *writeBuffer) {
- wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) })
- }
- type deleteTopicsResponseV0TopicErrorCode struct {
- // Topic holds the topic name
- Topic string
- // ErrorCode holds the error code
- ErrorCode int16
- }
- func (t deleteTopicsResponseV0TopicErrorCode) size() int32 {
- return sizeofString(t.Topic) +
- sizeofInt16(t.ErrorCode)
- }
- func (t *deleteTopicsResponseV0TopicErrorCode) readFrom(r *bufio.Reader, size int) (remain int, err error) {
- if remain, err = readString(r, size, &t.Topic); err != nil {
- return
- }
- if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
- return
- }
- return
- }
- func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) {
- wb.writeString(t.Topic)
- wb.writeInt16(t.ErrorCode)
- }
- // deleteTopics deletes the specified topics.
- //
- // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics
- func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponseV0, error) {
- var response deleteTopicsResponseV0
- err := c.writeOperation(
- func(deadline time.Time, id int32) error {
- if request.Timeout == 0 {
- now := time.Now()
- deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
- request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
- }
- return c.writeRequest(deleteTopics, v0, id, request)
- },
- func(deadline time.Time, size int) error {
- return expectZeroSize(func() (remain int, err error) {
- return (&response).readFrom(&c.rbuf, size)
- }())
- },
- )
- if err != nil {
- return deleteTopicsResponseV0{}, err
- }
- for _, c := range response.TopicErrorCodes {
- if c.ErrorCode != 0 {
- return response, Error(c.ErrorCode)
- }
- }
- return response, nil
- }
|