123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286 |
- package kafka
- import (
- "bufio"
- "context"
- "fmt"
- "net"
- "time"
- "github.com/segmentio/kafka-go/protocol/listoffsets"
- )
// OffsetRequest describes a single offset lookup against one partition of a
// topic. Values are normally built with FirstOffsetOf, LastOffsetOf, or
// TimeOffsetOf rather than by hand.
type OffsetRequest struct {
	// Partition is the ID of the partition to query.
	Partition int

	// Timestamp selects which offset is looked up: either the FirstOffset or
	// LastOffset sentinel, or the value TimeOffsetOf derives from a time.Time.
	Timestamp int64
}
- // FirstOffsetOf constructs an OffsetRequest which asks for the first offset of
- // the parition given as argument.
- func FirstOffsetOf(partition int) OffsetRequest {
- return OffsetRequest{Partition: partition, Timestamp: FirstOffset}
- }
- // LastOffsetOf constructs an OffsetRequest which asks for the last offset of
- // the partition given as argument.
- func LastOffsetOf(partition int) OffsetRequest {
- return OffsetRequest{Partition: partition, Timestamp: LastOffset}
- }
- // TimeOffsetOf constructs an OffsetRequest which asks for a partition offset
- // at a given time.
- func TimeOffsetOf(partition int, at time.Time) OffsetRequest {
- return OffsetRequest{Partition: partition, Timestamp: timestamp(at)}
- }
// PartitionOffsets holds the offsets reported for one partition of a topic,
// as assembled by (*Client).ListOffsets.
type PartitionOffsets struct {
	// Partition is the ID of the partition the offsets belong to.
	Partition int

	// FirstOffset is the first offset of the partition. ListOffsets leaves it
	// at -1 when the first offset was not requested.
	FirstOffset int64

	// LastOffset is the last offset of the partition. ListOffsets leaves it
	// at -1 when the last offset was not requested.
	LastOffset int64

	// Offsets maps each offset returned for a time-based lookup to the
	// timestamp the broker reported alongside it.
	Offsets map[int64]time.Time

	// Error is set when the broker reported a non-zero error code for the
	// partition, and is nil otherwise.
	Error error
}
- // ListOffsetsRequest represents a request sent to a kafka broker to list of the
- // offsets of topic partitions.
- type ListOffsetsRequest struct {
- // Address of the kafka broker to send the request to.
- Addr net.Addr
- // A mapping of topic names to list of partitions that the program wishes to
- // get the offsets for.
- Topics map[string][]OffsetRequest
- // The isolation level for the request.
- //
- // Defaults to ReadUncommitted.
- //
- // This field requires the kafka broker to support the ListOffsets API in
- // version 2 or above (otherwise the value is ignored).
- IsolationLevel IsolationLevel
- }
- // ListOffsetsResponse represents a response from a kafka broker to a offset
- // listing request.
- type ListOffsetsResponse struct {
- // The amount of time that the broker throttled the request.
- Throttle time.Duration
- // Mappings of topics names to partition offsets, there will be one entry
- // for each topic in the request.
- Topics map[string][]PartitionOffsets
- }
- // ListOffsets sends an offset request to a kafka broker and returns the
- // response.
- func (c *Client) ListOffsets(ctx context.Context, req *ListOffsetsRequest) (*ListOffsetsResponse, error) {
- type topicPartition struct {
- topic string
- partition int
- }
- partitionOffsets := make(map[topicPartition]PartitionOffsets)
- for topicName, requests := range req.Topics {
- for _, r := range requests {
- key := topicPartition{
- topic: topicName,
- partition: r.Partition,
- }
- partition, ok := partitionOffsets[key]
- if !ok {
- partition = PartitionOffsets{
- Partition: r.Partition,
- FirstOffset: -1,
- LastOffset: -1,
- Offsets: make(map[int64]time.Time),
- }
- }
- switch r.Timestamp {
- case FirstOffset:
- partition.FirstOffset = 0
- case LastOffset:
- partition.LastOffset = 0
- }
- partitionOffsets[topicPartition{
- topic: topicName,
- partition: r.Partition,
- }] = partition
- }
- }
- topics := make([]listoffsets.RequestTopic, 0, len(req.Topics))
- for topicName, requests := range req.Topics {
- partitions := make([]listoffsets.RequestPartition, len(requests))
- for i, r := range requests {
- partitions[i] = listoffsets.RequestPartition{
- Partition: int32(r.Partition),
- CurrentLeaderEpoch: -1,
- Timestamp: r.Timestamp,
- }
- }
- topics = append(topics, listoffsets.RequestTopic{
- Topic: topicName,
- Partitions: partitions,
- })
- }
- m, err := c.roundTrip(ctx, req.Addr, &listoffsets.Request{
- ReplicaID: -1,
- IsolationLevel: int8(req.IsolationLevel),
- Topics: topics,
- })
- if err != nil {
- return nil, fmt.Errorf("kafka.(*Client).ListOffsets: %w", err)
- }
- res := m.(*listoffsets.Response)
- ret := &ListOffsetsResponse{
- Throttle: makeDuration(res.ThrottleTimeMs),
- Topics: make(map[string][]PartitionOffsets, len(res.Topics)),
- }
- for _, t := range res.Topics {
- for _, p := range t.Partitions {
- key := topicPartition{
- topic: t.Topic,
- partition: int(p.Partition),
- }
- partition := partitionOffsets[key]
- switch p.Timestamp {
- case FirstOffset:
- partition.FirstOffset = p.Offset
- case LastOffset:
- partition.LastOffset = p.Offset
- default:
- partition.Offsets[p.Offset] = makeTime(p.Timestamp)
- }
- if p.ErrorCode != 0 {
- partition.Error = Error(p.ErrorCode)
- }
- partitionOffsets[key] = partition
- }
- }
- for key, partition := range partitionOffsets {
- ret.Topics[key.topic] = append(ret.Topics[key.topic], partition)
- }
- return ret, nil
- }
- type listOffsetRequestV1 struct {
- ReplicaID int32
- Topics []listOffsetRequestTopicV1
- }
- func (r listOffsetRequestV1) size() int32 {
- return 4 + sizeofArray(len(r.Topics), func(i int) int32 { return r.Topics[i].size() })
- }
- func (r listOffsetRequestV1) writeTo(wb *writeBuffer) {
- wb.writeInt32(r.ReplicaID)
- wb.writeArray(len(r.Topics), func(i int) { r.Topics[i].writeTo(wb) })
- }
- type listOffsetRequestTopicV1 struct {
- TopicName string
- Partitions []listOffsetRequestPartitionV1
- }
- func (t listOffsetRequestTopicV1) size() int32 {
- return sizeofString(t.TopicName) +
- sizeofArray(len(t.Partitions), func(i int) int32 { return t.Partitions[i].size() })
- }
- func (t listOffsetRequestTopicV1) writeTo(wb *writeBuffer) {
- wb.writeString(t.TopicName)
- wb.writeArray(len(t.Partitions), func(i int) { t.Partitions[i].writeTo(wb) })
- }
- type listOffsetRequestPartitionV1 struct {
- Partition int32
- Time int64
- }
- func (p listOffsetRequestPartitionV1) size() int32 {
- return 4 + 8
- }
- func (p listOffsetRequestPartitionV1) writeTo(wb *writeBuffer) {
- wb.writeInt32(p.Partition)
- wb.writeInt64(p.Time)
- }
- type listOffsetResponseV1 []listOffsetResponseTopicV1
- func (r listOffsetResponseV1) size() int32 {
- return sizeofArray(len(r), func(i int) int32 { return r[i].size() })
- }
- func (r listOffsetResponseV1) writeTo(wb *writeBuffer) {
- wb.writeArray(len(r), func(i int) { r[i].writeTo(wb) })
- }
- type listOffsetResponseTopicV1 struct {
- TopicName string
- PartitionOffsets []partitionOffsetV1
- }
- func (t listOffsetResponseTopicV1) size() int32 {
- return sizeofString(t.TopicName) +
- sizeofArray(len(t.PartitionOffsets), func(i int) int32 { return t.PartitionOffsets[i].size() })
- }
- func (t listOffsetResponseTopicV1) writeTo(wb *writeBuffer) {
- wb.writeString(t.TopicName)
- wb.writeArray(len(t.PartitionOffsets), func(i int) { t.PartitionOffsets[i].writeTo(wb) })
- }
- type partitionOffsetV1 struct {
- Partition int32
- ErrorCode int16
- Timestamp int64
- Offset int64
- }
- func (p partitionOffsetV1) size() int32 {
- return 4 + 2 + 8 + 8
- }
- func (p partitionOffsetV1) writeTo(wb *writeBuffer) {
- wb.writeInt32(p.Partition)
- wb.writeInt16(p.ErrorCode)
- wb.writeInt64(p.Timestamp)
- wb.writeInt64(p.Offset)
- }
- func (p *partitionOffsetV1) readFrom(r *bufio.Reader, sz int) (remain int, err error) {
- if remain, err = readInt32(r, sz, &p.Partition); err != nil {
- return
- }
- if remain, err = readInt16(r, remain, &p.ErrorCode); err != nil {
- return
- }
- if remain, err = readInt64(r, remain, &p.Timestamp); err != nil {
- return
- }
- if remain, err = readInt64(r, remain, &p.Offset); err != nil {
- return
- }
- return
- }
|