topic.go 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package datahub
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. )
  6. type Field struct {
  7. Name string `json:"name"`
  8. Type FieldType `json:"type"`
  9. AllowNull bool `json:"notnull"`
  10. Comment string `json:"comment"`
  11. }
  12. // RecordSchema
  13. type RecordSchema struct {
  14. Fields []Field `json:"fields"`
  15. }
  16. // NewRecordSchema create a new record schema for tuple record
  17. func NewRecordSchema() *RecordSchema {
  18. return &RecordSchema{
  19. Fields: make([]Field, 0, 0),
  20. }
  21. }
  22. func NewRecordSchemaFromJson(SchemaJson string) (recordSchema *RecordSchema, err error) {
  23. recordSchema = &RecordSchema{}
  24. if err = json.Unmarshal([]byte(SchemaJson), recordSchema); err != nil {
  25. return
  26. }
  27. for _, v := range recordSchema.Fields {
  28. if !validateFieldType(v.Type) {
  29. panic(fmt.Sprintf("field type %q illegal", v.Type))
  30. }
  31. }
  32. return
  33. }
  34. func (rs *RecordSchema) String() string {
  35. type FieldHelper struct {
  36. Name string `json:"name"`
  37. Type FieldType `json:"type"`
  38. NotNull bool `json:"notnull,omitempty"`
  39. Comment string `json:"comment,omitempty"`
  40. }
  41. fields := make([]FieldHelper, 0, rs.Size())
  42. for _, field := range rs.Fields {
  43. tmpField := FieldHelper{field.Name, field.Type, !field.AllowNull, field.Comment}
  44. fields = append(fields, tmpField)
  45. }
  46. tmpSchema := struct {
  47. Fields []FieldHelper `json:"fields"`
  48. }{fields}
  49. buf, _ := json.Marshal(tmpSchema)
  50. return string(buf)
  51. }
  52. // AddField add a field
  53. func (rs *RecordSchema) AddField(f Field) *RecordSchema {
  54. if !validateFieldType(f.Type) {
  55. panic(fmt.Sprintf("field type %q illegal", f.Type))
  56. }
  57. for _, v := range rs.Fields {
  58. if v.Name == f.Name {
  59. panic(fmt.Sprintf("field %q duplicated", f.Name))
  60. }
  61. }
  62. rs.Fields = append(rs.Fields, f)
  63. return rs
  64. }
  65. // GetFieldIndex get index of given field
  66. func (rs *RecordSchema) GetFieldIndex(fname string) int {
  67. for idx, v := range rs.Fields {
  68. if fname == v.Name {
  69. return idx
  70. }
  71. }
  72. return -1
  73. }
  74. // Size get record schema fields size
  75. func (rs *RecordSchema) Size() int {
  76. return len(rs.Fields)
  77. }
  78. type RecordSchemaInfo struct {
  79. VersionId int `json:"VersionId"`
  80. RecordSchema RecordSchema `json:"RecordSchema"`
  81. }