cluster.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. package protocol
  2. import (
  3. "fmt"
  4. "sort"
  5. "strings"
  6. "text/tabwriter"
  7. )
  8. type Cluster struct {
  9. ClusterID string
  10. Controller int32
  11. Brokers map[int32]Broker
  12. Topics map[string]Topic
  13. }
  14. func (c Cluster) BrokerIDs() []int32 {
  15. brokerIDs := make([]int32, 0, len(c.Brokers))
  16. for id := range c.Brokers {
  17. brokerIDs = append(brokerIDs, id)
  18. }
  19. sort.Slice(brokerIDs, func(i, j int) bool {
  20. return brokerIDs[i] < brokerIDs[j]
  21. })
  22. return brokerIDs
  23. }
  24. func (c Cluster) TopicNames() []string {
  25. topicNames := make([]string, 0, len(c.Topics))
  26. for name := range c.Topics {
  27. topicNames = append(topicNames, name)
  28. }
  29. sort.Strings(topicNames)
  30. return topicNames
  31. }
  32. func (c Cluster) IsZero() bool {
  33. return c.ClusterID == "" && c.Controller == 0 && len(c.Brokers) == 0 && len(c.Topics) == 0
  34. }
  35. func (c Cluster) Format(w fmt.State, _ rune) {
  36. tw := new(tabwriter.Writer)
  37. fmt.Fprintf(w, "CLUSTER: %q\n\n", c.ClusterID)
  38. tw.Init(w, 0, 8, 2, ' ', 0)
  39. fmt.Fprint(tw, " BROKER\tHOST\tPORT\tRACK\tCONTROLLER\n")
  40. for _, id := range c.BrokerIDs() {
  41. broker := c.Brokers[id]
  42. fmt.Fprintf(tw, " %d\t%s\t%d\t%s\t%t\n", broker.ID, broker.Host, broker.Port, broker.Rack, broker.ID == c.Controller)
  43. }
  44. tw.Flush()
  45. fmt.Fprintln(w)
  46. tw.Init(w, 0, 8, 2, ' ', 0)
  47. fmt.Fprint(tw, " TOPIC\tPARTITIONS\tBROKERS\n")
  48. topicNames := c.TopicNames()
  49. brokers := make(map[int32]struct{}, len(c.Brokers))
  50. brokerIDs := make([]int32, 0, len(c.Brokers))
  51. for _, name := range topicNames {
  52. topic := c.Topics[name]
  53. for _, p := range topic.Partitions {
  54. for _, id := range p.Replicas {
  55. brokers[id] = struct{}{}
  56. }
  57. }
  58. for id := range brokers {
  59. brokerIDs = append(brokerIDs, id)
  60. }
  61. fmt.Fprintf(tw, " %s\t%d\t%s\n", topic.Name, len(topic.Partitions), formatBrokerIDs(brokerIDs, -1))
  62. for id := range brokers {
  63. delete(brokers, id)
  64. }
  65. brokerIDs = brokerIDs[:0]
  66. }
  67. tw.Flush()
  68. fmt.Fprintln(w)
  69. if w.Flag('+') {
  70. for _, name := range topicNames {
  71. fmt.Fprintf(w, " TOPIC: %q\n\n", name)
  72. tw.Init(w, 0, 8, 2, ' ', 0)
  73. fmt.Fprint(tw, " PARTITION\tREPLICAS\tISR\tOFFLINE\n")
  74. for _, p := range c.Topics[name].Partitions {
  75. fmt.Fprintf(tw, " %d\t%s\t%s\t%s\n", p.ID,
  76. formatBrokerIDs(p.Replicas, -1),
  77. formatBrokerIDs(p.ISR, p.Leader),
  78. formatBrokerIDs(p.Offline, -1),
  79. )
  80. }
  81. tw.Flush()
  82. fmt.Fprintln(w)
  83. }
  84. }
  85. }
  86. func formatBrokerIDs(brokerIDs []int32, leader int32) string {
  87. if len(brokerIDs) == 0 {
  88. return ""
  89. }
  90. if len(brokerIDs) == 1 {
  91. return itoa(brokerIDs[0])
  92. }
  93. sort.Slice(brokerIDs, func(i, j int) bool {
  94. id1 := brokerIDs[i]
  95. id2 := brokerIDs[j]
  96. if id1 == leader {
  97. return true
  98. }
  99. if id2 == leader {
  100. return false
  101. }
  102. return id1 < id2
  103. })
  104. brokerNames := make([]string, len(brokerIDs))
  105. for i, id := range brokerIDs {
  106. brokerNames[i] = itoa(id)
  107. }
  108. return strings.Join(brokerNames, ",")
  109. }
  110. var (
  111. _ fmt.Formatter = Cluster{}
  112. )