package kafka

import (
	"bufio"
	"context"
	"fmt"
	"net"
	"time"

	"github.com/segmentio/kafka-go/protocol/createtopics"
)

// CreateTopicsRequest represents a request sent to a kafka broker to create
// new topics.
type CreateTopicsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of topics to create and their configuration.
	Topics []TopicConfig

	// When set to true, topics are not created but the configuration is
	// validated as if they were.
	//
	// This field will be ignored if the kafka broker does not support the
	// CreateTopics API in version 1 or above.
	ValidateOnly bool
}

// CreateTopicsResponse represents a response from a kafka broker to a topic
// creation request.
type CreateTopicsResponse struct {
	// The amount of time that the broker throttled the request.
	//
	// This field will be zero if the kafka broker does not support the
	// CreateTopics API in version 2 or above.
	Throttle time.Duration

	// Mapping of topic names to errors that occurred while attempting to create
	// the topics.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Errors map[string]error
}

// CreateTopics sends a topic creation request to the kafka broker at req.Addr
// and returns the broker's response.
//
// Every entry of req.Topics is translated into a createtopics.RequestTopic.
// A failure of the round trip is returned wrapped with the method name;
// per-topic results are reported through the Errors map of the response,
// keyed by topic name.
func (c *Client) CreateTopics(ctx context.Context, req *CreateTopicsRequest) (*CreateTopicsResponse, error) {
	requested := make([]createtopics.RequestTopic, 0, len(req.Topics))
	for i := range req.Topics {
		cfg := &req.Topics[i]
		requested = append(requested, createtopics.RequestTopic{
			Name:              cfg.Topic,
			NumPartitions:     int32(cfg.NumPartitions),
			ReplicationFactor: int16(cfg.ReplicationFactor),
			Assignments:       cfg.assignments(),
			Configs:           cfg.configs(),
		})
	}

	request := &createtopics.Request{
		Topics:       requested,
		TimeoutMs:    c.timeoutMs(ctx, defaultCreateTopicsTimeout),
		ValidateOnly: req.ValidateOnly,
	}

	m, err := c.roundTrip(ctx, req.Addr, request)
	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).CreateTopics: %w", err)
	}

	res := m.(*createtopics.Response)
	throttle := makeDuration(res.ThrottleTimeMs)

	topicErrors := make(map[string]error, len(res.Topics))
	for _, topic := range res.Topics {
		topicErrors[topic.Name] = makeError(topic.ErrorCode, topic.ErrorMessage)
	}

	return &CreateTopicsResponse{
		Throttle: throttle,
		Errors:   topicErrors,
	}, nil
}

// ConfigEntry is a single name/value pair of topic-level configuration, as
// listed in TopicConfig.ConfigEntries.
type ConfigEntry struct {
	ConfigName  string
	ConfigValue string
}

// toCreateTopicsRequestV0ConfigEntry returns the v0 wire representation of
// the entry, carrying over the name and value unchanged.
func (c ConfigEntry) toCreateTopicsRequestV0ConfigEntry() createTopicsRequestV0ConfigEntry {
	// Both structs declare the same fields, so a direct conversion copies
	// ConfigName and ConfigValue.
	return createTopicsRequestV0ConfigEntry(c)
}

// createTopicsRequestV0ConfigEntry is the form of a ConfigEntry that is
// written into a version 0 CreateTopics request.
type createTopicsRequestV0ConfigEntry struct {
	ConfigName  string
	ConfigValue string
}

// size returns the sum of sizeofString for the entry's name and value.
func (t createTopicsRequestV0ConfigEntry) size() int32 {
	nameSize := sizeofString(t.ConfigName)
	valueSize := sizeofString(t.ConfigValue)
	return nameSize + valueSize
}

// writeTo writes the config name and then the config value to wb, each as a
// string.
func (t createTopicsRequestV0ConfigEntry) writeTo(wb *writeBuffer) {
	for _, field := range [...]string{t.ConfigName, t.ConfigValue} {
		wb.writeString(field)
	}
}

// ReplicaAssignment lists the brokers on which the replicas of one partition
// should be allocated.
type ReplicaAssignment struct {
	// Partition is the index of the partition the assignment applies to.
	Partition int
	// The list of brokers where the partition should be allocated. There must
	// be as many entries in this list as there are replicas of the partition.
	// The first entry represents the broker that will be the preferred leader
	// for the partition.
	//
	// This field changed in 0.4 from `int` to `[]int`. It was invalid to pass
	// a single integer as this is supposed to be a list. While this introduces
	// a breaking change, it probably never worked before.
	Replicas []int
}

// partitionIndex returns the Partition field converted to int32.
func (a *ReplicaAssignment) partitionIndex() int32 {
	index := int32(a.Partition)
	return index
}

// brokerIDs returns the Replicas list converted element by element to int32,
// in the same order. It returns nil when Replicas is empty.
func (a *ReplicaAssignment) brokerIDs() []int32 {
	count := len(a.Replicas)
	if count == 0 {
		return nil
	}

	ids := make([]int32, 0, count)
	for _, broker := range a.Replicas {
		ids = append(ids, int32(broker))
	}
	return ids
}

// toCreateTopicsRequestV0ReplicaAssignment returns the v0 wire representation
// of the assignment: the partition as an int32 and the replicas as produced
// by brokerIDs.
func (a ReplicaAssignment) toCreateTopicsRequestV0ReplicaAssignment() createTopicsRequestV0ReplicaAssignment {
	var out createTopicsRequestV0ReplicaAssignment
	out.Partition = int32(a.Partition)
	out.Replicas = a.brokerIDs()
	return out
}

// createTopicsRequestV0ReplicaAssignment is the form of a ReplicaAssignment
// that is written into a version 0 CreateTopics request.
type createTopicsRequestV0ReplicaAssignment struct {
	Partition int32
	Replicas  []int32
}

// size returns the size of the partition field plus the size of the replica
// array, computed as one int32 per replica and one more for the array length.
func (t createTopicsRequestV0ReplicaAssignment) size() int32 {
	// N+1 int32 values: the array length prefix followed by N replica IDs.
	int32Count := int32(len(t.Replicas) + 1)
	return sizeofInt32(t.Partition) + int32Count*sizeofInt32(0)
}

// writeTo writes the partition, then the number of replicas, then each
// replica ID in order, all as int32 values.
func (t createTopicsRequestV0ReplicaAssignment) writeTo(wb *writeBuffer) {
	wb.writeInt32(t.Partition)

	count := len(t.Replicas)
	wb.writeInt32(int32(count))
	for i := 0; i < count; i++ {
		wb.writeInt32(t.Replicas[i])
	}
}

// TopicConfig describes a topic to create: its name, its partitioning and
// replication settings, and its topic-level configuration entries.
type TopicConfig struct {
	// Topic name
	Topic string

	// NumPartitions created. -1 indicates unset.
	NumPartitions int

	// ReplicationFactor for the topic. -1 indicates unset.
	ReplicationFactor int

	// ReplicaAssignments among kafka brokers for this topic's partitions. If
	// this is set, num_partitions and replication_factor must be unset.
	ReplicaAssignments []ReplicaAssignment

	// ConfigEntries holds the topic-level configuration to be set for the
	// topic.
	ConfigEntries []ConfigEntry
}

// assignments converts ReplicaAssignments into createtopics request
// assignments, preserving order. It returns nil when there are none.
func (t *TopicConfig) assignments() []createtopics.RequestAssignment {
	count := len(t.ReplicaAssignments)
	if count == 0 {
		return nil
	}

	out := make([]createtopics.RequestAssignment, 0, count)
	for i := range t.ReplicaAssignments {
		replica := &t.ReplicaAssignments[i]
		out = append(out, createtopics.RequestAssignment{
			PartitionIndex: replica.partitionIndex(),
			BrokerIDs:      replica.brokerIDs(),
		})
	}
	return out
}

// configs converts ConfigEntries into createtopics request configs,
// preserving order. It returns nil when there are none.
func (t *TopicConfig) configs() []createtopics.RequestConfig {
	count := len(t.ConfigEntries)
	if count == 0 {
		return nil
	}

	out := make([]createtopics.RequestConfig, 0, count)
	for _, entry := range t.ConfigEntries {
		out = append(out, createtopics.RequestConfig{
			Name:  entry.ConfigName,
			Value: entry.ConfigValue,
		})
	}
	return out
}

// toCreateTopicsRequestV0Topic returns the v0 wire representation of the
// topic configuration. Replica assignments and config entries are converted
// element by element in their original order; an empty list stays nil.
func (t TopicConfig) toCreateTopicsRequestV0Topic() createTopicsRequestV0Topic {
	topic := createTopicsRequestV0Topic{
		Topic:             t.Topic,
		NumPartitions:     int32(t.NumPartitions),
		ReplicationFactor: int16(t.ReplicationFactor),
	}

	if n := len(t.ReplicaAssignments); n > 0 {
		topic.ReplicaAssignments = make([]createTopicsRequestV0ReplicaAssignment, n)
		for i := range t.ReplicaAssignments {
			topic.ReplicaAssignments[i] = t.ReplicaAssignments[i].toCreateTopicsRequestV0ReplicaAssignment()
		}
	}

	if n := len(t.ConfigEntries); n > 0 {
		topic.ConfigEntries = make([]createTopicsRequestV0ConfigEntry, n)
		for i := range t.ConfigEntries {
			topic.ConfigEntries[i] = t.ConfigEntries[i].toCreateTopicsRequestV0ConfigEntry()
		}
	}

	return topic
}

// createTopicsRequestV0Topic is the form of a TopicConfig that is written
// into a version 0 CreateTopics request.
type createTopicsRequestV0Topic struct {
	// Topic name
	Topic string

	// NumPartitions created. -1 indicates unset.
	NumPartitions int32

	// ReplicationFactor for the topic. -1 indicates unset.
	ReplicationFactor int16

	// ReplicaAssignments among kafka brokers for this topic's partitions. If
	// this is set, num_partitions and replication_factor must be unset.
	ReplicaAssignments []createTopicsRequestV0ReplicaAssignment

	// ConfigEntries holds the topic-level configuration to be set for the
	// topic.
	ConfigEntries []createTopicsRequestV0ConfigEntry
}

// size returns the sum of the sizes of the topic name, the partition count,
// the replication factor, the replica assignment array and the config entry
// array.
func (t createTopicsRequestV0Topic) size() int32 {
	assignmentSize := func(i int) int32 { return t.ReplicaAssignments[i].size() }
	configSize := func(i int) int32 { return t.ConfigEntries[i].size() }

	total := sizeofString(t.Topic)
	total += sizeofInt32(t.NumPartitions)
	total += sizeofInt16(t.ReplicationFactor)
	total += sizeofArray(len(t.ReplicaAssignments), assignmentSize)
	total += sizeofArray(len(t.ConfigEntries), configSize)
	return total
}

// writeTo writes the topic to wb in field order: name, partition count,
// replication factor, replica assignments, then config entries.
func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) {
	writeAssignment := func(i int) { t.ReplicaAssignments[i].writeTo(wb) }
	writeConfig := func(i int) { t.ConfigEntries[i].writeTo(wb) }

	wb.writeString(t.Topic)
	wb.writeInt32(t.NumPartitions)
	wb.writeInt16(t.ReplicationFactor)
	wb.writeArray(len(t.ReplicaAssignments), writeAssignment)
	wb.writeArray(len(t.ConfigEntries), writeConfig)
}

// createTopicsRequestV0 is the body of a version 0 CreateTopics request.
//
// See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
type createTopicsRequestV0 struct {
	// Topics contains an array of single topic creation requests. It cannot
	// have multiple entries for the same topic.
	Topics []createTopicsRequestV0Topic

	// Timeout ms to wait for a topic to be completely created on the
	// controller node. Values <= 0 will trigger topic creation and return immediately
	Timeout int32
}

// size returns the size of the topics array plus the size of the timeout
// field.
func (t createTopicsRequestV0) size() int32 {
	topicSize := func(i int) int32 { return t.Topics[i].size() }

	total := sizeofArray(len(t.Topics), topicSize)
	total += sizeofInt32(t.Timeout)
	return total
}

// writeTo writes the topics array followed by the timeout to wb.
func (t createTopicsRequestV0) writeTo(wb *writeBuffer) {
	writeTopic := func(i int) { t.Topics[i].writeTo(wb) }

	wb.writeArray(len(t.Topics), writeTopic)
	wb.writeInt32(t.Timeout)
}

// createTopicsResponseV0TopicError is one per-topic result carried by a
// version 0 CreateTopics response.
type createTopicsResponseV0TopicError struct {
	// Topic name
	Topic string

	// ErrorCode holds response error code
	ErrorCode int16
}

// size returns the size of the topic name plus the size of the error code.
func (t createTopicsResponseV0TopicError) size() int32 {
	total := sizeofString(t.Topic)
	total += sizeofInt16(t.ErrorCode)
	return total
}

// writeTo writes the topic name followed by the error code to wb.
func (t createTopicsResponseV0TopicError) writeTo(wb *writeBuffer) {
	name, code := t.Topic, t.ErrorCode
	wb.writeString(name)
	wb.writeInt16(code)
}

// readFrom decodes the topic name and then the error code from r into t.
// size is passed to the first read, and the remainder each read reports is
// passed to the next. It stops at the first failing read and returns the
// remainder and error reported by the last read performed.
func (t *createTopicsResponseV0TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) {
	remain, err = readString(r, size, &t.Topic)
	if err != nil {
		return remain, err
	}
	return readInt16(r, remain, &t.ErrorCode)
}

// createTopicsResponseV0 is the body of a version 0 CreateTopics response.
//
// See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics
type createTopicsResponseV0 struct {
	// TopicErrors holds the per-topic results of the request.
	TopicErrors []createTopicsResponseV0TopicError
}

// size returns the size of the TopicErrors array.
func (t createTopicsResponseV0) size() int32 {
	entrySize := func(i int) int32 { return t.TopicErrors[i].size() }
	return sizeofArray(len(t.TopicErrors), entrySize)
}

// writeTo writes the TopicErrors array to wb.
func (t createTopicsResponseV0) writeTo(wb *writeBuffer) {
	writeEntry := func(i int) { t.TopicErrors[i].writeTo(wb) }
	wb.writeArray(len(t.TopicErrors), writeEntry)
}

// readFrom decodes the response body from r, appending each successfully
// decoded entry to t.TopicErrors.
//
// The per-element decoder is handed to readArrayWith (presumably invoked once
// per array element — confirm against its definition). An element that fails
// to decode is not appended, so TopicErrors never contains a partially
// populated entry. The remaining size and error reported by readArrayWith are
// returned unchanged.
func (t *createTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) {
	readTopic := func(r *bufio.Reader, size int) (int, error) {
		var topic createTopicsResponseV0TopicError
		// Check the error of this element's read, not the enclosing
		// function's named result, which is still nil at this point.
		topicRemain, topicErr := topic.readFrom(r, size)
		if topicErr != nil {
			return topicRemain, topicErr
		}
		t.TopicErrors = append(t.TopicErrors, topic)
		return topicRemain, nil
	}
	return readArrayWith(r, size, readTopic)
}

// createTopics sends a version 0 CreateTopics request over the connection
// and decodes the response.
//
// When request.Timeout is zero, it is replaced by a timeout derived from the
// operation deadline adjusted for defaultRTT. The decoded response is
// returned in every case; the error is the one reported by the write
// operation, or else the first non-zero per-topic error code converted to an
// Error, or nil when every topic reports code zero.
func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponseV0, error) {
	var response createTopicsResponseV0

	send := func(deadline time.Time, id int32) error {
		if request.Timeout == 0 {
			now := time.Now()
			deadline = adjustDeadlineForRTT(deadline, now, defaultRTT)
			request.Timeout = milliseconds(deadlineToTimeout(deadline, now))
		}
		return c.writeRequest(createTopics, v0, id, request)
	}

	receive := func(deadline time.Time, size int) error {
		remain, err := response.readFrom(&c.rbuf, size)
		return expectZeroSize(remain, err)
	}

	if err := c.writeOperation(send, receive); err != nil {
		return response, err
	}

	for i := range response.TopicErrors {
		if code := response.TopicErrors[i].ErrorCode; code != 0 {
			return response, Error(code)
		}
	}
	return response, nil
}

// CreateTopics creates one topic per provided configuration with idempotent
// operational semantics. In other words, if CreateTopics is invoked with a
// configuration for an existing topic, it will have no effect.
//
// A TopicAlreadyExists result for one topic does not hide failures of the
// other topics in the same request: the first per-topic error that is not
// TopicAlreadyExists is returned. Errors from sending the request or reading
// the response are returned as they are.
func (c *Conn) CreateTopics(topics ...TopicConfig) error {
	requestV0Topics := make([]createTopicsRequestV0Topic, 0, len(topics))
	for _, t := range topics {
		requestV0Topics = append(requestV0Topics, t.toCreateTopicsRequestV0Topic())
	}

	response, err := c.createTopics(createTopicsRequestV0{
		Topics: requestV0Topics,
	})
	if err != TopicAlreadyExists {
		// Either success (nil), a transport failure, or a topic error that
		// must be reported as is.
		return err
	}

	// createTopics stops at the first non-zero error code. That code was
	// TopicAlreadyExists, which is tolerated, so look at the remaining
	// entries for an error that is not.
	for _, tr := range response.TopicErrors {
		if tr.ErrorCode == 0 {
			continue
		}
		if topicErr := Error(tr.ErrorCode); topicErr != TopicAlreadyExists {
			return topicErr
		}
	}
	return nil
}