Feat registry consul (#445)

Co-authored-by: johnlanni <zty98751@alibaba-inc.com>
This commit is contained in:
Jun
2023-08-01 16:16:28 +08:00
committed by GitHub
parent e5105a4d71
commit 9b88c6bb40
16 changed files with 1049 additions and 84 deletions

31
registry/auth_option.go Normal file
View File

@@ -0,0 +1,31 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package registry
const (
AuthNacosUsernameKey = "nacosUsername"
AuthNacosPasswordKey = "nacosPassword"
AuthEtcdUsernameKey = "etcdUsername"
AuthEtcdPasswordKey = "etcdPassword"
AuthConsulTokenKey = "consulToken"
)
type AuthOption struct {
NacosUsername string
NacosPassword string
ConsulToken string
EtcdUsername string
EtcdPassword string
}

363
registry/consul/watcher.go Normal file
View File

@@ -0,0 +1,363 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package consul
import (
"strconv"
"strings"
"sync"
"time"
apiv1 "github.com/alibaba/higress/api/networking/v1"
"github.com/alibaba/higress/pkg/common"
provider "github.com/alibaba/higress/registry"
"github.com/alibaba/higress/registry/memory"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
"istio.io/api/networking/v1alpha3"
"istio.io/pkg/log"
)
const (
ConuslHealthPassing = "passing"
DefaultRefreshInterval = time.Second * 30
DefaultRefreshIntervalLimit = time.Second * 10
)
type watcher struct {
provider.BaseWatcher
apiv1.RegistryConfig
serverAddress string
consulClient *consulapi.Client
consulCatalog *consulapi.Catalog
WatchingServices map[string]bool
watchers map[string]*watch.Plan
RegistryType provider.ServiceRegistryType
Status provider.WatcherStatus
cache memory.Cache
mutex *sync.Mutex
stop chan struct{}
isStop bool
updateCacheWhenEmpty bool
authOption provider.AuthOption
}
type WatcherOption func(w *watcher)
func WithType(t string) WatcherOption {
return func(w *watcher) {
w.Type = t
}
}
func WithName(name string) WatcherOption {
return func(w *watcher) {
w.Name = name
}
}
func WithDomain(domain string) WatcherOption {
return func(w *watcher) {
w.Domain = domain
}
}
func WithPort(port uint32) WatcherOption {
return func(w *watcher) {
w.Port = port
}
}
func WithDatacenter(dataCenter string) WatcherOption {
return func(w *watcher) {
w.ConsulDatacenter = dataCenter
}
}
func WithAuthOption(authOption provider.AuthOption) WatcherOption {
return func(w *watcher) {
w.authOption = authOption
}
}
func WithServiceTag(serviceTag string) WatcherOption {
return func(w *watcher) {
w.ConsulServiceTag = strings.ToLower(strings.TrimSpace(serviceTag))
}
}
func WithRefreshInterval(refreshInterval int64) WatcherOption {
return func(w *watcher) {
if refreshInterval < int64(DefaultRefreshIntervalLimit) {
refreshInterval = int64(DefaultRefreshIntervalLimit)
}
w.ConsulRefreshInterval = refreshInterval
}
}
func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) {
w := &watcher{
WatchingServices: make(map[string]bool),
watchers: make(map[string]*watch.Plan),
RegistryType: provider.Consul,
Status: provider.UnHealthy,
cache: cache,
mutex: &sync.Mutex{},
stop: make(chan struct{}),
}
// Set default
w.ConsulRefreshInterval = int64(DefaultRefreshInterval)
// Set option
for _, opt := range opts {
opt(w)
}
// Init consul client
w.serverAddress = w.Domain + ":" + strconv.Itoa(int(w.Port))
config := consulapi.DefaultConfig()
config.Address = w.serverAddress
config.Token = w.authOption.ConsulToken
client, err := consulapi.NewClient(config)
if err != nil {
log.Errorf("[NewWatcher] NewWatcher consul, err:%v, consul address:%s", err, w.serverAddress)
return nil, err
}
w.consulClient = client
w.consulCatalog = client.Catalog()
return w, nil
}
func (w *watcher) fetchAllServices() error {
log.Infof("consul fetchAllServices")
w.mutex.Lock()
defer w.mutex.Unlock()
if w.isStop {
return nil
}
fetchedServices := make(map[string]bool)
q := &consulapi.QueryOptions{}
q.Datacenter = w.ConsulDatacenter
q.Token = w.authOption.ConsulToken
services, _, err := w.consulCatalog.Services(q)
if err != nil {
log.Errorf("consul fetch all services error:%v", err)
return err
}
for serviceName, tags := range services {
if w.filterTags(w.ConsulServiceTag, tags) {
fetchedServices[serviceName] = true
}
}
log.Infof("consul fetch services num:%d", len(fetchedServices))
for serviceName := range w.WatchingServices {
if _, exist := fetchedServices[serviceName]; !exist {
err := w.unsubscribe(serviceName)
if err == nil {
delete(w.WatchingServices, serviceName)
}
}
}
for serviceName := range fetchedServices {
if _, exist := w.WatchingServices[serviceName]; !exist {
if !w.shouldSubscribe(serviceName) {
continue
}
err := w.subscribe(serviceName)
if err == nil {
w.WatchingServices[serviceName] = true
}
}
}
return nil
}
func (w *watcher) filterTags(consulTag string, tags []string) bool {
if len(consulTag) == 0 {
return true
}
if len(tags) == 0 {
return false
}
for _, tag := range tags {
if strings.ToLower(tag) == consulTag {
return true
}
}
return false
}
func (w *watcher) Run() {
ticker := time.NewTicker(time.Duration(w.ConsulRefreshInterval))
defer ticker.Stop()
w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10))
w.fetchAllServices()
w.Ready(true)
for {
select {
case <-ticker.C:
w.fetchAllServices()
case <-w.stop:
return
}
}
}
func (w *watcher) Stop() {
w.mutex.Lock()
defer w.mutex.Unlock()
for serviceName := range w.WatchingServices {
err := w.unsubscribe(serviceName)
if err == nil {
delete(w.WatchingServices, serviceName)
}
// clean the cache
suffix := strings.Join([]string{serviceName, w.ConsulDatacenter, w.Type}, common.DotSeparator)
host := strings.ReplaceAll(suffix, common.Underscore, common.Hyphen)
w.cache.DeleteServiceEntryWrapper(host)
}
w.isStop = true
close(w.stop)
w.Ready(false)
}
func (w *watcher) IsHealthy() bool {
return w.Status == provider.Healthy
}
func (w *watcher) GetRegistryType() string {
return w.RegistryType.String()
}
func (w *watcher) unsubscribe(serviceName string) error {
log.Infof("consul unsubscribe service, serviceName:%s", serviceName)
if plan, ok := w.watchers[serviceName]; ok {
plan.Stop()
delete(w.watchers, serviceName)
}
return nil
}
func (w *watcher) shouldSubscribe(serviceName string) bool {
return true
}
func (w *watcher) subscribe(serviceName string) error {
log.Infof("consul subscribe service, serviceName:%s", serviceName)
plan, err := watch.Parse(map[string]interface{}{
"type": "service",
"service": serviceName,
})
if err != nil {
return err
}
plan.Handler = w.getSubscribeCallback(serviceName)
plan.Token = w.authOption.ConsulToken
plan.Datacenter = w.ConsulDatacenter
go plan.Run(w.serverAddress)
w.watchers[serviceName] = plan
return nil
}
func (w *watcher) getSubscribeCallback(serviceName string) func(idx uint64, data interface{}) {
suffix := strings.Join([]string{serviceName, w.ConsulDatacenter, w.Type}, common.DotSeparator)
host := strings.ReplaceAll(suffix, common.Underscore, common.Hyphen)
return func(idx uint64, data interface{}) {
log.Infof("consul subscribe callback service, host:%s, serviceName:%s", host, serviceName)
switch services := data.(type) {
case []*consulapi.ServiceEntry:
defer w.UpdateService()
serviceEntry := w.generateServiceEntry(host, services)
if serviceEntry != nil {
log.Infof("consul update serviceEntry %s cache", host)
w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
ServiceEntry: serviceEntry,
ServiceName: serviceName,
Suffix: suffix,
RegistryType: w.Type,
})
} else {
log.Infof("consul serviceEntry %s is nil", host)
//w.cache.DeleteServiceEntryWrapper(host)
}
}
}
}
func (w *watcher) generateServiceEntry(host string, services []*consulapi.ServiceEntry) *v1alpha3.ServiceEntry {
portList := make([]*v1alpha3.Port, 0)
endpoints := make([]*v1alpha3.WorkloadEntry, 0)
for _, service := range services {
// service status: maintenance > critical > warning > passing
if service.Checks.AggregatedStatus() != ConuslHealthPassing {
continue
}
metaData := make(map[string]string, 0)
if service.Service.Meta != nil {
metaData = service.Service.Meta
}
protocol := common.HTTP
if metaData["protocol"] != "" {
protocol = common.ParseProtocol(metaData["protocol"])
}
port := &v1alpha3.Port{
Name: protocol.String(),
Number: uint32(service.Service.Port),
Protocol: protocol.String(),
}
if len(portList) == 0 {
portList = append(portList, port)
}
endpoint := v1alpha3.WorkloadEntry{
Address: service.Service.Address,
Ports: map[string]uint32{port.Protocol: port.Number},
Labels: metaData,
}
endpoints = append(endpoints, &endpoint)
}
if len(endpoints) == 0 {
return nil
}
se := &v1alpha3.ServiceEntry{
Hosts: []string{host},
Ports: portList,
Location: v1alpha3.ServiceEntry_MESH_INTERNAL,
Resolution: v1alpha3.ServiceEntry_STATIC,
Endpoints: endpoints,
}
return se
}

View File

@@ -67,6 +67,7 @@ type watcher struct {
addrProvider *address.NacosAddressProvider
updateCacheWhenEmpty bool
nacosClietConfig *constant.ClientConfig
authOption provider.AuthOption
}
type WatcherOption func(w *watcher)
@@ -106,6 +107,8 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er
constant.WithNamespaceId(w.NacosNamespaceId),
constant.WithAccessKey(w.NacosAccessKey),
constant.WithSecretKey(w.NacosSecretKey),
constant.WithUsername(w.authOption.NacosUsername),
constant.WithPassword(w.authOption.NacosPassword),
)
initTimer := time.NewTimer(DefaultInitTimeout)
@@ -224,6 +227,12 @@ func WithUpdateCacheWhenEmpty(enable bool) WatcherOption {
}
}
func WithAuthOption(authOption provider.AuthOption) WatcherOption {
return func(w *watcher) {
w.authOption = authOption
}
}
func (w *watcher) Run() {
ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval))
defer ticker.Stop()

View File

@@ -64,6 +64,7 @@ type watcher struct {
client *versionedclient.Clientset
isStop bool
updateCacheWhenEmpty bool
authOption provider.AuthOption
}
type WatcherOption func(w *watcher)
@@ -193,6 +194,12 @@ func WithUpdateCacheWhenEmpty(enable bool) WatcherOption {
}
}
func WithAuthOption(authOption provider.AuthOption) WatcherOption {
return func(w *watcher) {
w.authOption = authOption
}
}
func (w *watcher) Run() {
ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval))
defer ticker.Stop()

View File

@@ -15,7 +15,9 @@
package reconcile
import (
"context"
"errors"
"fmt"
"path"
"reflect"
"sync"
@@ -24,12 +26,15 @@ import (
apiv1 "github.com/alibaba/higress/api/networking/v1"
v1 "github.com/alibaba/higress/client/pkg/apis/networking/v1"
"github.com/alibaba/higress/pkg/kube"
. "github.com/alibaba/higress/registry"
"github.com/alibaba/higress/registry/consul"
"github.com/alibaba/higress/registry/direct"
"github.com/alibaba/higress/registry/memory"
"github.com/alibaba/higress/registry/nacos"
nacosv2 "github.com/alibaba/higress/registry/nacos/v2"
"github.com/alibaba/higress/registry/zookeeper"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type Reconciler struct {
@@ -37,14 +42,18 @@ type Reconciler struct {
registries map[string]*apiv1.RegistryConfig
watchers map[string]Watcher
serviceUpdate func()
client kube.Client
namespace string
}
func NewReconciler(serviceUpdate func()) *Reconciler {
func NewReconciler(serviceUpdate func(), client kube.Client, namespace string) *Reconciler {
return &Reconciler{
Cache: memory.NewCache(),
registries: make(map[string]*apiv1.RegistryConfig),
watchers: make(map[string]Watcher),
serviceUpdate: serviceUpdate,
client: client,
namespace: namespace,
}
}
@@ -123,6 +132,11 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
var watcher Watcher
var err error
authOption, err := r.getAuthOption(registry)
if err != nil {
return nil, err
}
switch registry.Type {
case string(Nacos):
watcher, err = nacos.NewWatcher(
@@ -135,6 +149,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
nacos.WithNacosNamespace(registry.NacosNamespace),
nacos.WithNacosGroups(registry.NacosGroups),
nacos.WithNacosRefreshInterval(registry.NacosRefreshInterval),
nacos.WithAuthOption(authOption),
)
case string(Nacos2):
watcher, err = nacosv2.NewWatcher(
@@ -150,6 +165,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
nacosv2.WithNacosNamespace(registry.NacosNamespace),
nacosv2.WithNacosGroups(registry.NacosGroups),
nacosv2.WithNacosRefreshInterval(registry.NacosRefreshInterval),
nacosv2.WithAuthOption(authOption),
)
case string(Zookeeper):
watcher, err = zookeeper.NewWatcher(
@@ -160,6 +176,18 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
zookeeper.WithPort(registry.Port),
zookeeper.WithZkServicesPath(registry.ZkServicesPath),
)
case string(Consul):
watcher, err = consul.NewWatcher(
r.Cache,
consul.WithType(registry.Type),
consul.WithName(registry.Name),
consul.WithDomain(registry.Domain),
consul.WithPort(registry.Port),
consul.WithDatacenter(registry.ConsulDatacenter),
consul.WithServiceTag(registry.ConsulServiceTag),
consul.WithRefreshInterval(registry.ConsulRefreshInterval),
consul.WithAuthOption(authOption),
)
case string(Static), string(DNS):
watcher, err = direct.NewWatcher(
r.Cache,
@@ -190,3 +218,39 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
return watcher, nil
}
func (r *Reconciler) getAuthOption(registry *apiv1.RegistryConfig) (AuthOption, error) {
authOption := AuthOption{}
authSecretName := registry.AuthSecretName
if len(authSecretName) == 0 {
return authOption, nil
}
authSecret, err := r.client.CoreV1().Secrets(r.namespace).Get(context.Background(), authSecretName, metav1.GetOptions{})
if err != nil {
return authOption, errors.New(fmt.Sprintf("get auth secret %s in namespace %s error:%v", authSecretName, r.namespace, err))
}
if nacosUsername, ok := authSecret.Data[AuthNacosUsernameKey]; ok {
authOption.NacosUsername = string(nacosUsername)
}
if nacosPassword, ok := authSecret.Data[AuthNacosPasswordKey]; ok {
authOption.NacosPassword = string(nacosPassword)
}
if consulToken, ok := authSecret.Data[AuthConsulTokenKey]; ok {
authOption.ConsulToken = string(consulToken)
}
if etcdUsername, ok := authSecret.Data[AuthEtcdUsernameKey]; ok {
authOption.EtcdUsername = string(etcdUsername)
}
if etcdPassword, ok := authSecret.Data[AuthEtcdPasswordKey]; ok {
authOption.EtcdPassword = string(etcdPassword)
}
return authOption, nil
}