package kafka

import (
	"bufio"
	"context"
	"fmt"
	"net"
	"time"

	"github.com/segmentio/kafka-go/protocol/offsetfetch"
)

// OffsetFetchRequest represents a request sent to a kafka broker to read the
// currently committed offsets of topic partitions.
type OffsetFetchRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// ID of the consumer group to retrieve the offsets for.
	GroupID string

	// Set of topic partitions to retrieve the offsets for.
	Topics map[string][]int
}

// OffsetFetchResponse represents a response from a kafka broker to an offset
// fetch request.
type OffsetFetchResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Set of topic partitions that the kafka broker has returned offsets for.
	Topics map[string][]OffsetFetchPartition

	// An error that may have occurred while attempting to retrieve consumer
	// group offsets.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error
}

// OffsetFetchPartition represents the state of a single partition in a consumer
// group.
type OffsetFetchPartition struct {
	// ID of the partition.
	Partition int

	// Last committed offsets on the partition when the request was served by
	// the kafka broker.
	CommittedOffset int64

	// Consumer group metadata for this partition.
	Metadata string

	// An error that may have occurred while attempting to retrieve consumer
	// group offsets for this partition.
	//
	// The error contains both the kafka error code, and an error message
	// returned by the kafka broker. Programs may use the standard errors.Is
	// function to test the error against kafka error codes.
	Error error
}

// OffsetFetch sends an offset fetch request to a kafka broker and returns the
// response.
func (c *Client) OffsetFetch(ctx context.Context, req *OffsetFetchRequest) (*OffsetFetchResponse, error) {
	topics := make([]offsetfetch.RequestTopic, 0, len(req.Topics))

	for topicName, partitions := range req.Topics {
		indexes := make([]int32, len(partitions))

		for i, p := range partitions {
			indexes[i] = int32(p)
		}

		topics = append(topics, offsetfetch.RequestTopic{
			Name:             topicName,
			PartitionIndexes: indexes,
		})
	}

	m, err := c.roundTrip(ctx, req.Addr, &offsetfetch.Request{
		GroupID: req.GroupID,
		Topics:  topics,
	})

	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).OffsetFetch: %w", err)
	}

	res := m.(*offsetfetch.Response)
	ret := &OffsetFetchResponse{
		Throttle: makeDuration(res.ThrottleTimeMs),
		Topics:   make(map[string][]OffsetFetchPartition, len(res.Topics)),
		Error:    makeError(res.ErrorCode, ""),
	}

	for _, t := range res.Topics {
		partitions := make([]OffsetFetchPartition, len(t.Partitions))

		for i, p := range t.Partitions {
			partitions[i] = OffsetFetchPartition{
				Partition:       int(p.PartitionIndex),
				CommittedOffset: p.CommittedOffset,
				Metadata:        p.Metadata,
				Error:           makeError(p.ErrorCode, ""),
			}
		}

		ret.Topics[t.Name] = partitions
	}

	return ret, nil
}

type offsetFetchRequestV1Topic struct {
	// Topic name
	Topic string

	// Partitions to fetch offsets
	Partitions []int32
}

func (t offsetFetchRequestV1Topic) size() int32 {
	return sizeofString(t.Topic) +
		sizeofInt32Array(t.Partitions)
}

func (t offsetFetchRequestV1Topic) writeTo(wb *writeBuffer) {
	wb.writeString(t.Topic)
	wb.writeInt32Array(t.Partitions)
}

type offsetFetchRequestV1 struct {
	// GroupID holds the unique group identifier
	GroupID string

	// Topics to fetch offsets.
	Topics []offsetFetchRequestV1Topic
}

func (t offsetFetchRequestV1) size() int32 {
	return sizeofString(t.GroupID) +
		sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() })
}

func (t offsetFetchRequestV1) writeTo(wb *writeBuffer) {
	wb.writeString(t.GroupID)
	wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) })
}

type offsetFetchResponseV1PartitionResponse struct {
	// Partition ID
	Partition int32

	// Offset of last committed message
	Offset int64

	// Metadata client wants to keep
	Metadata string

	// ErrorCode holds response error code
	ErrorCode int16
}

func (t offsetFetchResponseV1PartitionResponse) size() int32 {
	return sizeofInt32(t.Partition) +
		sizeofInt64(t.Offset) +
		sizeofString(t.Metadata) +
		sizeofInt16(t.ErrorCode)
}

func (t offsetFetchResponseV1PartitionResponse) writeTo(wb *writeBuffer) {
	wb.writeInt32(t.Partition)
	wb.writeInt64(t.Offset)
	wb.writeString(t.Metadata)
	wb.writeInt16(t.ErrorCode)
}

func (t *offsetFetchResponseV1PartitionResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) {
	if remain, err = readInt32(r, size, &t.Partition); err != nil {
		return
	}
	if remain, err = readInt64(r, remain, &t.Offset); err != nil {
		return
	}
	if remain, err = readString(r, remain, &t.Metadata); err != nil {
		return
	}
	if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil {
		return
	}
	return
}

type offsetFetchResponseV1Response struct {
	// Topic name
	Topic string

	// PartitionResponses holds offsets by partition
	PartitionResponses []offsetFetchResponseV1PartitionResponse
}

func (t offsetFetchResponseV1Response) size() int32 {
	return sizeofString(t.Topic) +
		sizeofArray(len(t.PartitionResponses), func(i int) int32 { return t.PartitionResponses[i].size() })
}

func (t offsetFetchResponseV1Response) writeTo(wb *writeBuffer) {
	wb.writeString(t.Topic)
	wb.writeArray(len(t.PartitionResponses), func(i int) { t.PartitionResponses[i].writeTo(wb) })
}

func (t *offsetFetchResponseV1Response) readFrom(r *bufio.Reader, size int) (remain int, err error) {
	if remain, err = readString(r, size, &t.Topic); err != nil {
		return
	}

	fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) {
		item := offsetFetchResponseV1PartitionResponse{}
		if fnRemain, fnErr = (&item).readFrom(r, size); err != nil {
			return
		}
		t.PartitionResponses = append(t.PartitionResponses, item)
		return
	}
	if remain, err = readArrayWith(r, remain, fn); err != nil {
		return
	}

	return
}

type offsetFetchResponseV1 struct {
	// Responses holds topic partition offsets
	Responses []offsetFetchResponseV1Response
}

func (t offsetFetchResponseV1) size() int32 {
	return sizeofArray(len(t.Responses), func(i int) int32 { return t.Responses[i].size() })
}

func (t offsetFetchResponseV1) writeTo(wb *writeBuffer) {
	wb.writeArray(len(t.Responses), func(i int) { t.Responses[i].writeTo(wb) })
}

func (t *offsetFetchResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) {
	fn := func(r *bufio.Reader, withSize int) (fnRemain int, fnErr error) {
		item := offsetFetchResponseV1Response{}
		if fnRemain, fnErr = (&item).readFrom(r, withSize); fnErr != nil {
			return
		}
		t.Responses = append(t.Responses, item)
		return
	}
	if remain, err = readArrayWith(r, size, fn); err != nil {
		return
	}

	return
}

func findOffset(topic string, partition int32, response offsetFetchResponseV1) (int64, bool) {
	for _, r := range response.Responses {
		if r.Topic != topic {
			continue
		}

		for _, pr := range r.PartitionResponses {
			if pr.Partition == partition {
				return pr.Offset, true
			}
		}
	}

	return 0, false
}