package datahub

import (
    "sync"
)

// schemaRegistryClient caches the mapping between topic schemas and their
// version ids so that repeated lookups for the same topic do not have to go
// back to the DataHub API. Both caches are keyed at the top level by the
// "project/topic" string produced by genKey.
type schemaRegistryClient struct {
    // dh is the DataHub API used to fetch schemas and versions on a cache miss.
    dh         DataHubApi
    // schemaMap holds, per "project/topic" key, a nested concurrent map from
    // schema.String() to the version id (int) registered for that schema.
    schemaMap  sync.Map
    // versionMap holds, per "project/topic" key, a nested concurrent map from
    // version id (int) to the RecordSchema registered under that version.
    versionMap sync.Map
}

// NewSchemaClient builds a schema registry client that resolves schemas and
// versions through dh. Both internal caches start out empty.
func NewSchemaClient(dh DataHubApi) *schemaRegistryClient {
    client := new(schemaRegistryClient)
    client.dh = dh
    return client
}

// getSchemaByVersion returns the record schema registered for project/topic
// under the given version. The schema is served from the per-topic cache when
// present; otherwise it is fetched through fetchSchemaById and cached.
//
// The per-topic cache is kept in versionMap as a *sync.Map. A sync.Map must
// not be copied after first use, so it is shared by pointer rather than stored
// and type-asserted by value.
//
// A fetch error is returned unchanged and nothing is cached for that version.
// The returned pointer refers to a (shallow) copy of the cached schema, so
// reassigning its fields does not alter the cached entry.
func (client *schemaRegistryClient) getSchemaByVersion(project, topic string, version int) (*RecordSchema, error) {
    key := client.genKey(project, topic)

    value, ok := client.versionMap.Load(key)
    if !ok {
        // LoadOrStore makes creation atomic: concurrent callers for a new
        // topic all end up sharing the same per-topic cache.
        value, _ = client.versionMap.LoadOrStore(key, new(sync.Map))
    }
    topicCache := value.(*sync.Map)

    if cached, hit := topicCache.Load(version); hit {
        schema := cached.(RecordSchema)
        return &schema, nil
    }

    fetched, err := client.fetchSchemaById(project, topic, version)
    if err != nil {
        return nil, err
    }

    // Cache by value so later callers cannot be affected by the pointer
    // handed back to this caller.
    schema := *fetched
    topicCache.Store(version, schema)
    return &schema, nil
}

// getVersionBySchema returns the version id registered for schema under
// project/topic. The per-topic cache is keyed by schema.String(); on a miss
// the version is fetched through fetchVersionBySchema and cached.
//
// The per-topic cache is kept in schemaMap as a *sync.Map. A sync.Map must not
// be copied after first use, so it is shared by pointer rather than stored and
// type-asserted by value.
//
// On a fetch error it returns -1 together with the unchanged error, and
// nothing is cached for that schema.
func (client *schemaRegistryClient) getVersionBySchema(project, topic string, schema *RecordSchema) (int, error) {
    key := client.genKey(project, topic)

    value, ok := client.schemaMap.Load(key)
    if !ok {
        // LoadOrStore makes creation atomic: concurrent callers for a new
        // topic all end up sharing the same per-topic cache.
        value, _ = client.schemaMap.LoadOrStore(key, new(sync.Map))
    }
    topicCache := value.(*sync.Map)

    schemaKey := schema.String()
    if cached, hit := topicCache.Load(schemaKey); hit {
        return cached.(int), nil
    }

    version, err := client.fetchVersionBySchema(project, topic, schema)
    if err != nil {
        return -1, err
    }

    topicCache.Store(schemaKey, version)
    return version, nil
}

// fetchSchemaById asks the DataHub API for the schema stored under version for
// project/topic, without consulting any cache. It returns a pointer to the
// RecordSchema field of the API result, or nil and the API error unchanged.
func (client *schemaRegistryClient) fetchSchemaById(project, topic string, version int) (*RecordSchema, error) {
    result, fetchErr := client.dh.GetTopicSchemaByVersion(project, topic, version)
    if fetchErr != nil {
        return nil, fetchErr
    }

    schema := &result.RecordSchema
    return schema, nil
}

// fetchVersionBySchema asks the DataHub API which version id is registered for
// schema under project/topic, without consulting any cache. It returns the
// result's VersionId, or -1 and the API error unchanged.
func (client *schemaRegistryClient) fetchVersionBySchema(project, topic string, schema *RecordSchema) (int, error) {
    result, fetchErr := client.dh.GetTopicSchemaBySchema(project, topic, schema)
    if fetchErr != nil {
        return -1, fetchErr
    }

    version := result.VersionId
    return version, nil
}

// genKey builds the top-level cache key for a topic by joining project and
// topic with a "/" separator.
func (client *schemaRegistryClient) genKey(project, topic string) string {
    const separator = "/"
    return project + separator + topic
}