createtopics.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
  1. package kafka
  2. import (
  3. "bufio"
  4. "context"
  5. "fmt"
  6. "net"
  7. "time"
  8. "github.com/segmentio/kafka-go/protocol/createtopics"
  9. )
// CreateTopicsRequest represents a request sent to a kafka broker to create
// new topics.
type CreateTopicsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of topics to create and their configuration.
	Topics []TopicConfig

	// When set to true, topics are not created but the configuration is
	// validated as if they were.
	//
	// This field will be ignored if the kafka broker does not support the
	// CreateTopics API in version 1 or above.
	ValidateOnly bool
}
// CreateTopicsResponse represents a response from a kafka broker to a topic
// creation request.
type CreateTopicsResponse struct {
	// The amount of time that the broker throttled the request.
	//
	// This field will be zero if the kafka broker does not support the
	// CreateTopics API in version 2 or above.
	Throttle time.Duration

	// Mapping of topic names to errors that occurred while attempting to create
	// the topics.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Errors map[string]error
}
  39. // CreateTopics sends a topic creation request to a kafka broker and returns the
  40. // response.
  41. func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*CreateTopicsResponse, error) {
  42. topics := make([]createtopics.RequestTopic, len(req.Topics))
  43. for i, t := range req.Topics {
  44. topics[i] = createtopics.RequestTopic{
  45. Name: t.Topic,
  46. NumPartitions: int32(t.NumPartitions),
  47. ReplicationFactor: int16(t.ReplicationFactor),
  48. Assignments: t.assignments(),
  49. Configs: t.configs(),
  50. }
  51. }
  52. m, err := c.roundTrip(ctx, req.Addr, &createtopics.Request{
  53. Topics: topics,
  54. TimeoutMs: c.timeoutMs(ctx, defaultCreateTopicsTimeout),
  55. ValidateOnly: req.ValidateOnly,
  56. })
  57. if err != nil {
  58. return nil, fmt.Errorf("kafka.(*Client).CreateTopics: %w", err)
  59. }
  60. res := m.(*createtopics.Response)
  61. ret := &CreateTopicsResponse{
  62. Throttle: makeDuration(res.ThrottleTimeMs),
  63. Errors: make(map[string]error, len(res.Topics)),
  64. }
  65. for _, t := range res.Topics {
  66. ret.Errors[t.Name] = makeError(t.ErrorCode, t.ErrorMessage)
  67. }
  68. return ret, nil
  69. }
// ConfigEntry is a single name/value pair of topic level configuration.
type ConfigEntry struct {
	// ConfigName is the name of the configuration entry.
	ConfigName string
	// ConfigValue is the value assigned to the configuration entry.
	ConfigValue string
}
  74. func (c ConfigEntry) toCreateTopicsRequestV0ConfigEntry() createTopicsRequestV0ConfigEntry {
  75. return createTopicsRequestV0ConfigEntry{
  76. ConfigName: c.ConfigName,
  77. ConfigValue: c.ConfigValue,
  78. }
  79. }
// createTopicsRequestV0ConfigEntry is the form of a ConfigEntry that is
// serialized into a v0 CreateTopics request.
type createTopicsRequestV0ConfigEntry struct {
	// ConfigName is the name of the configuration entry.
	ConfigName string
	// ConfigValue is the value assigned to the configuration entry.
	ConfigValue string
}
  84. func (t createTopicsRequestV0ConfigEntry) size() int32 {
  85. return sizeofString(t.ConfigName) +
  86. sizeofString(t.ConfigValue)
  87. }
// writeTo writes the entry to wb: the name first, then the value.
func (t createTopicsRequestV0ConfigEntry) writeTo(wb *writeBuffer) {
	wb.writeString(t.ConfigName)
	wb.writeString(t.ConfigValue)
}
// ReplicaAssignment describes the brokers on which the replicas of a single
// partition should be placed.
type ReplicaAssignment struct {
	// Partition is the index of the partition the assignment applies to.
	Partition int

	// The list of brokers where the partition should be allocated. There must
	// be as many entries in this list as there are replicas of the partition.
	// The first entry represents the broker that will be the preferred leader
	// for the partition.
	//
	// This field changed in 0.4 from `int` to `[]int`. It was invalid to pass
	// a single integer as this is supposed to be a list. While this introduces
	// a breaking change, it probably never worked before.
	Replicas []int
}
  104. func (a *ReplicaAssignment) partitionIndex() int32 {
  105. return int32(a.Partition)
  106. }
  107. func (a *ReplicaAssignment) brokerIDs() []int32 {
  108. if len(a.Replicas) == 0 {
  109. return nil
  110. }
  111. replicas := make([]int32, len(a.Replicas))
  112. for i, r := range a.Replicas {
  113. replicas[i] = int32(r)
  114. }
  115. return replicas
  116. }
  117. func (a ReplicaAssignment) toCreateTopicsRequestV0ReplicaAssignment() createTopicsRequestV0ReplicaAssignment {
  118. return createTopicsRequestV0ReplicaAssignment{
  119. Partition: int32(a.Partition),
  120. Replicas: a.brokerIDs(),
  121. }
  122. }
// createTopicsRequestV0ReplicaAssignment is the form of a ReplicaAssignment
// that is serialized into a v0 CreateTopics request.
type createTopicsRequestV0ReplicaAssignment struct {
	// Partition is the index of the partition the assignment applies to.
	Partition int32
	// Replicas lists the ids of the brokers assigned to the partition.
	Replicas []int32
}
  127. func (t createTopicsRequestV0ReplicaAssignment) size() int32 {
  128. return sizeofInt32(t.Partition) +
  129. (int32(len(t.Replicas)+1) * sizeofInt32(0)) // N+1 because the array length is a int32
  130. }
  131. func (t createTopicsRequestV0ReplicaAssignment) writeTo(wb *writeBuffer) {
  132. wb.writeInt32(t.Partition)
  133. wb.writeInt32(int32(len(t.Replicas)))
  134. for _, r := range t.Replicas {
  135. wb.writeInt32(int32(r))
  136. }
  137. }
// TopicConfig describes a topic to create.
type TopicConfig struct {
	// Topic name
	Topic string

	// NumPartitions created. -1 indicates unset.
	NumPartitions int

	// ReplicationFactor for the topic. -1 indicates unset.
	ReplicationFactor int

	// ReplicaAssignments among kafka brokers for this topic partitions. If this
	// is set num_partitions and replication_factor must be unset.
	ReplicaAssignments []ReplicaAssignment

	// ConfigEntries holds topic level configuration for topic to be set.
	ConfigEntries []ConfigEntry
}
  151. func (t *TopicConfig) assignments() []createtopics.RequestAssignment {
  152. if len(t.ReplicaAssignments) == 0 {
  153. return nil
  154. }
  155. assignments := make([]createtopics.RequestAssignment, len(t.ReplicaAssignments))
  156. for i, a := range t.ReplicaAssignments {
  157. assignments[i] = createtopics.RequestAssignment{
  158. PartitionIndex: a.partitionIndex(),
  159. BrokerIDs: a.brokerIDs(),
  160. }
  161. }
  162. return assignments
  163. }
  164. func (t *TopicConfig) configs() []createtopics.RequestConfig {
  165. if len(t.ConfigEntries) == 0 {
  166. return nil
  167. }
  168. configs := make([]createtopics.RequestConfig, len(t.ConfigEntries))
  169. for i, c := range t.ConfigEntries {
  170. configs[i] = createtopics.RequestConfig{
  171. Name: c.ConfigName,
  172. Value: c.ConfigValue,
  173. }
  174. }
  175. return configs
  176. }
  177. func (t TopicConfig) toCreateTopicsRequestV0Topic() createTopicsRequestV0Topic {
  178. var requestV0ReplicaAssignments []createTopicsRequestV0ReplicaAssignment
  179. for _, a := range t.ReplicaAssignments {
  180. requestV0ReplicaAssignments = append(
  181. requestV0ReplicaAssignments,
  182. a.toCreateTopicsRequestV0ReplicaAssignment())
  183. }
  184. var requestV0ConfigEntries []createTopicsRequestV0ConfigEntry
  185. for _, c := range t.ConfigEntries {
  186. requestV0ConfigEntries = append(
  187. requestV0ConfigEntries,
  188. c.toCreateTopicsRequestV0ConfigEntry())
  189. }
  190. return createTopicsRequestV0Topic{
  191. Topic: t.Topic,
  192. NumPartitions: int32(t.NumPartitions),
  193. ReplicationFactor: int16(t.ReplicationFactor),
  194. ReplicaAssignments: requestV0ReplicaAssignments,
  195. ConfigEntries: requestV0ConfigEntries,
  196. }
  197. }
// createTopicsRequestV0Topic is the form of a TopicConfig that is serialized
// into a v0 CreateTopics request.
type createTopicsRequestV0Topic struct {
	// Topic name
	Topic string

	// NumPartitions created. -1 indicates unset.
	NumPartitions int32

	// ReplicationFactor for the topic. -1 indicates unset.
	ReplicationFactor int16

	// ReplicaAssignments among kafka brokers for this topic partitions. If this
	// is set num_partitions and replication_factor must be unset.
	ReplicaAssignments []createTopicsRequestV0ReplicaAssignment

	// ConfigEntries holds topic level configuration for topic to be set.
	ConfigEntries []createTopicsRequestV0ConfigEntry
}
  211. func (t createTopicsRequestV0Topic) size() int32 {
  212. return sizeofString(t.Topic) +
  213. sizeofInt32(t.NumPartitions) +
  214. sizeofInt16(t.ReplicationFactor) +
  215. sizeofArray(len(t.ReplicaAssignments), func(i int) int32 { return t.ReplicaAssignments[i].size() }) +
  216. sizeofArray(len(t.ConfigEntries), func(i int) int32 { return t.ConfigEntries[i].size() })
  217. }
  218. func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) {
  219. wb.writeString(t.Topic)
  220. wb.writeInt32(t.NumPartitions)
  221. wb.writeInt16(t.ReplicationFactor)
  222. wb.writeArray(len(t.ReplicaAssignments), func(i int) { t.ReplicaAssignments[i].writeTo(wb) })
  223. wb.writeArray(len(t.ConfigEntries), func(i int) { t.ConfigEntries[i].writeTo(wb) })
  224. }
// createTopicsRequestV0 is the body of a version 0 CreateTopics request.
//
// See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
type createTopicsRequestV0 struct {
	// Topics contains an array of single topic creation requests. Can not
	// have multiple entries for the same topic.
	Topics []createTopicsRequestV0Topic

	// Timeout ms to wait for a topic to be completely created on the
	// controller node. Values <= 0 will trigger topic creation and return immediately
	Timeout int32
}
  234. func (t createTopicsRequestV0) size() int32 {
  235. return sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) +
  236. sizeofInt32(t.Timeout)
  237. }
  238. func (t createTopicsRequestV0) writeTo(wb *writeBuffer) {
  239. wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) })
  240. wb.writeInt32(t.Timeout)
  241. }
// createTopicsResponseV0TopicError is the per-topic result carried by a
// version 0 CreateTopics response.
type createTopicsResponseV0TopicError struct {
	// Topic name
	Topic string

	// ErrorCode holds response error code
	ErrorCode int16
}
  248. func (t createTopicsResponseV0TopicError) size() int32 {
  249. return sizeofString(t.Topic) +
  250. sizeofInt16(t.ErrorCode)
  251. }
// writeTo writes the entry to wb: the topic name first, then the error code.
func (t createTopicsResponseV0TopicError) writeTo(wb *writeBuffer) {
	wb.writeString(t.Topic)
	wb.writeInt16(t.ErrorCode)
}
  256. func (t *createTopicsResponseV0TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  257. if remain, err = readString(r, size, &t.Topic); err != nil {
  258. return
  259. }
  260. if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
  261. return
  262. }
  263. return
  264. }
// createTopicsResponseV0 is the body of a version 0 CreateTopics response.
//
// See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
type createTopicsResponseV0 struct {
	// TopicErrors holds one entry per topic reported in the response.
	TopicErrors []createTopicsResponseV0TopicError
}
  269. func (t createTopicsResponseV0) size() int32 {
  270. return sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() })
  271. }
  272. func (t createTopicsResponseV0) writeTo(wb *writeBuffer) {
  273. wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) })
  274. }
  275. func (t *createTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
  276. fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
  277. var topic createTopicsResponseV0TopicError
  278. if fnRemain, fnErr = (&topic).readFrom(r, size); err != nil {
  279. return
  280. }
  281. t.TopicErrors = append(t.TopicErrors, topic)
  282. return
  283. }
  284. if remain, err = readArrayWith(r, size, fn); err != nil {
  285. return
  286. }
  287. return
  288. }
  289. func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponseV0, error) {
  290. var response createTopicsResponseV0
  291. err := c.writeOperation(
  292. func(deadline time.Time, id int32) error {
  293. if request.Timeout == 0 {
  294. now := time.Now()
  295. deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
  296. request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
  297. }
  298. return c.writeRequest(createTopics, v0, id, request)
  299. },
  300. func(deadline time.Time, size int) error {
  301. return expectZeroSize(func() (remain int, err error) {
  302. return (&response).readFrom(&c.rbuf, size)
  303. }())
  304. },
  305. )
  306. if err != nil {
  307. return response, err
  308. }
  309. for _, tr := range response.TopicErrors {
  310. if tr.ErrorCode != 0 {
  311. return response, Error(tr.ErrorCode)
  312. }
  313. }
  314. return response, nil
  315. }
  316. // CreateTopics creates one topic per provided configuration with idempotent
  317. // operational semantics. In other words, if CreateTopics is invoked with a
  318. // configuration for an existing topic, it will have no effect.
  319. func (c *Conn) CreateTopics(topics ...TopicConfig) error {
  320. var requestV0Topics []createTopicsRequestV0Topic
  321. for _, t := range topics {
  322. requestV0Topics = append(
  323. requestV0Topics,
  324. t.toCreateTopicsRequestV0Topic())
  325. }
  326. _, err := c.createTopics(createTopicsRequestV0{
  327. Topics: requestV0Topics,
  328. })
  329. switch err {
  330. case TopicAlreadyExists:
  331. // ok
  332. return nil
  333. default:
  334. return err
  335. }
  336. }