schemaclient.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package datahub
  2. import (
  3. "sync"
  4. )
  5. type schemaRegistryClient struct {
  6. dh DataHubApi
  7. schemaMap sync.Map
  8. versionMap sync.Map
  9. }
  10. func NewSchemaClient(dh DataHubApi) *schemaRegistryClient {
  11. return &schemaRegistryClient{
  12. dh: dh,
  13. }
  14. }
  15. func (client *schemaRegistryClient) getSchemaByVersion(project, topic string, version int) (*RecordSchema, error) {
  16. key := client.genKey(project, topic)
  17. value, ok := client.versionMap.Load(key)
  18. if !ok {
  19. m := sync.Map{}
  20. client.versionMap.Store(key, m)
  21. value, _ = client.versionMap.Load(key)
  22. }
  23. topicCache := value.(sync.Map)
  24. schemaVal, ok := topicCache.Load(version)
  25. if !ok {
  26. newSchema, err := client.fetchSchemaById(project, topic, version)
  27. if err != nil {
  28. return nil, err
  29. }
  30. schemaVal = *newSchema
  31. topicCache.Store(version, schemaVal)
  32. client.versionMap.Store(key, topicCache)
  33. }
  34. schema := schemaVal.(RecordSchema)
  35. return &schema, nil
  36. }
  37. func (client *schemaRegistryClient) getVersionBySchema(project, topic string, schema *RecordSchema) (int, error) {
  38. key := client.genKey(project, topic)
  39. value, ok := client.schemaMap.Load(key)
  40. if !ok {
  41. m := sync.Map{}
  42. client.schemaMap.Store(key, m)
  43. value, _ = client.schemaMap.Load(key)
  44. }
  45. topicCache := value.(sync.Map)
  46. schemaKey := schema.String()
  47. versionVal, ok := topicCache.Load(schemaKey)
  48. if !ok {
  49. newVersion, err := client.fetchVersionBySchema(project, topic, schema)
  50. if err != nil {
  51. return -1, err
  52. }
  53. versionVal = newVersion
  54. topicCache.Store(schemaKey, versionVal)
  55. client.schemaMap.Store(key, topicCache)
  56. }
  57. version := versionVal.(int)
  58. return version, nil
  59. }
  60. func (client *schemaRegistryClient) fetchSchemaById(project, topic string, version int) (*RecordSchema, error) {
  61. ret, err := client.dh.GetTopicSchemaByVersion(project, topic, version)
  62. if err != nil {
  63. return nil, err
  64. }
  65. return &ret.RecordSchema, nil
  66. }
  67. func (client *schemaRegistryClient) fetchVersionBySchema(project, topic string, schema *RecordSchema) (int, error) {
  68. ret, err := client.dh.GetTopicSchemaBySchema(project, topic, schema)
  69. if err != nil {
  70. return -1, err
  71. }
  72. return ret.VersionId, nil
  73. }
  74. func (client *schemaRegistryClient) genKey(project, topic string) string {
  75. return project + "/" + topic
  76. }