add mcp bridge (#107)

This commit is contained in:
Zhanghaibin
2022-12-09 15:18:49 +08:00
committed by GitHub
parent 4776d62515
commit b08b00f9d5
26 changed files with 4290 additions and 0 deletions

11
api/buf.gen.yaml Normal file
View File

@@ -0,0 +1,11 @@
version: v1beta1
plugins:
- name: gogofast
out: .
opt: plugins=grpc,paths=source_relative,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types,Mgoogle/rpc/status.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/rpc/code.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/rpc/error_details.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/api/field_behavior.proto=istio.io/gogo-genproto/googleapis/google/api
- name: deepcopy
out: .
opt: paths=source_relative,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types,Mgoogle/rpc/status.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/rpc/code.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/rpc/error_details.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/api/field_behavior.proto=istio.io/gogo-genproto/googleapis/google/api
- name: jsonshim
out: .
opt: paths=source_relative,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types,Mgoogle/rpc/status.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/rpc/code.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/rpc/error_details.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/api/field_behavior.proto=istio.io/gogo-genproto/googleapis/google/api

8
api/buf.yaml Normal file
View File

@@ -0,0 +1,8 @@
version: v1beta1
lint:
use:
- BASIC
except:
- FIELD_LOWER_SNAKE_CASE
- PACKAGE_DIRECTORY_MATCH
allow_comment_ignores: true

18
api/cue.yaml Normal file
View File

@@ -0,0 +1,18 @@
# Cuelang configuration to generate OpenAPI schema for Higress configs.
module: github.com/alibaba/higress/api
openapi:
selfContained: true
fieldFilter: "min.*|max.*"
directories:
networking/v1:
- mode: perFile
# All is used when generating all types referenced in the above directories to
# one file.
all:
title: All Higress types.
version: v1alpha1
oapiFilename: higress.gen.json

10
api/gen.sh Executable file
View File

@@ -0,0 +1,10 @@
#!/bin/bash
set -eu

# Generate all protos.
# Fix: the original left a trailing '\' after the last buf argument, so the
# next (comment) line was pulled into the continued command; bash only
# tolerated it because the continuation happened to start with '#'.
buf generate \
    --path networking

# Generate CRDs.
cue-gen -verbose -f=./cue.yaml -crd=true

1
api/github.com Symbolic link
View File

@@ -0,0 +1 @@
../external/api/common-protos/github.com

1
api/gogoproto Symbolic link
View File

@@ -0,0 +1 @@
../external/api/common-protos/gogoproto

1
api/google Symbolic link
View File

@@ -0,0 +1 @@
../external/api/common-protos/google

1
api/istio.io Symbolic link
View File

@@ -0,0 +1 @@
../external/api/common-protos/istio.io

1
api/k8s.io Symbolic link
View File

@@ -0,0 +1 @@
../external/api/common-protos/k8s.io

View File

@@ -0,0 +1,71 @@
# DO NOT EDIT - Generated by Cue OpenAPI generator based on Istio APIs.
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
"helm.sh/resource-policy": keep
name: mcpbridges.networking.higress.io
spec:
group: networking.higress.io
names:
categories:
- higress-io
kind: McpBridge
listKind: McpBridgeList
plural: mcpbridges
singular: mcpbridge
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
properties:
spec:
properties:
registries:
items:
properties:
consulNamespace:
type: string
domain:
type: string
nacosAccessKey:
type: string
nacosAddressServer:
type: string
nacosGroups:
items:
type: string
type: array
nacosNamespace:
type: string
nacosNamespaceId:
type: string
nacosRefreshInterval:
format: int64
type: integer
nacosSecretKey:
type: string
name:
type: string
port:
type: integer
type:
type: string
zkServicesPath:
items:
type: string
type: array
type: object
type: array
type: object
status:
type: object
x-kubernetes-preserve-unknown-fields: true
type: object
served: true
storage: true
subresources:
status: {}
---

View File

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,63 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
import "google/api/field_behavior.proto";
// $schema: higress.networking.v1.McpBridge
// $title: McpBridge
// $description: Configuration affecting service discovery from multi registries
// $mode: none
package higress.networking.v1;
option go_package = "github.com/alibaba/higress/api/networking/v1";
// <!-- crd generation tags
// +cue-gen:McpBridge:groupName:networking.higress.io
// +cue-gen:McpBridge:version:v1
// +cue-gen:McpBridge:storageVersion
// +cue-gen:McpBridge:annotations:helm.sh/resource-policy=keep
// +cue-gen:McpBridge:subresource:status
// +cue-gen:McpBridge:scope:Namespaced
// +cue-gen:McpBridge:resource:categories=higress-io,plural=mcpbridges
// +cue-gen:McpBridge:preserveUnknownFields:false
// -->
//
// <!-- go code generation tags
// +kubetype-gen
// +kubetype-gen:groupVersion=networking.higress.io/v1
// +genclient
// +k8s:deepcopy-gen=true
// -->
// McpBridge declares a set of external service registries that Higress
// discovers services from.
message McpBridge {
// Registries to watch; each entry configures one registry connection.
repeated RegistryConfig registries = 1;
}
// RegistryConfig describes how to connect to one service registry.
// Field names are camelCase on purpose: buf lint's FIELD_LOWER_SNAKE_CASE
// rule is excepted for this package (see api/buf.yaml).
message RegistryConfig {
// Registry type (e.g. nacos / consul / zookeeper). Required.
string type = 1 [(google.api.field_behavior) = REQUIRED];
// Logical name of this registry instance.
string name = 2;
// Registry server address. Required.
string domain = 3 [(google.api.field_behavior) = REQUIRED];
// Registry server port. Required.
uint32 port = 4 [(google.api.field_behavior) = REQUIRED];
// Nacos address-server endpoint used to resolve nacos server IPs.
string nacosAddressServer = 5;
// Nacos credentials.
string nacosAccessKey = 6;
string nacosSecretKey = 7;
// Namespace id passed to the nacos SDK client.
string nacosNamespaceId = 8;
// Namespace name (used when querying the address server).
string nacosNamespace = 9;
// Nacos groups to subscribe; services are fetched per group.
repeated string nacosGroups = 10;
// Service-list refresh interval. Unit is not stated here -- the Go
// watcher interprets it as time.Duration nanoseconds; TODO confirm
// the intended user-facing unit.
int64 nacosRefreshInterval = 11;
// Consul namespace.
string consulNamespace = 12;
// ZooKeeper paths under which services are registered.
repeated string zkServicesPath = 13;
}

View File

@@ -0,0 +1,58 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: networking/v1/mcp_bridge.proto
package v1
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
_ "istio.io/gogo-genproto/googleapis/google/api"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// DeepCopyInto supports using McpBridge within kubernetes types, where deepcopy-gen is used.
func (in *McpBridge) DeepCopyInto(out *McpBridge) {
p := proto.Clone(in).(*McpBridge)
*out = *p
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new McpBridge. Required by controller-gen.
func (in *McpBridge) DeepCopy() *McpBridge {
if in == nil {
return nil
}
out := new(McpBridge)
in.DeepCopyInto(out)
return out
}
// DeepCopyInterface is an autogenerated deepcopy function, copying the receiver, creating a new McpBridge. Required by controller-gen.
func (in *McpBridge) DeepCopyInterface() interface{} {
return in.DeepCopy()
}
// DeepCopyInto supports using RegistryConfig within kubernetes types, where deepcopy-gen is used.
func (in *RegistryConfig) DeepCopyInto(out *RegistryConfig) {
p := proto.Clone(in).(*RegistryConfig)
*out = *p
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegistryConfig. Required by controller-gen.
func (in *RegistryConfig) DeepCopy() *RegistryConfig {
if in == nil {
return nil
}
out := new(RegistryConfig)
in.DeepCopyInto(out)
return out
}
// DeepCopyInterface is an autogenerated deepcopy function, copying the receiver, creating a new RegistryConfig. Required by controller-gen.
func (in *RegistryConfig) DeepCopyInterface() interface{} {
return in.DeepCopy()
}

View File

@@ -0,0 +1,45 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: networking/v1/mcp_bridge.proto
package v1
import (
bytes "bytes"
fmt "fmt"
github_com_gogo_protobuf_jsonpb "github.com/gogo/protobuf/jsonpb"
proto "github.com/gogo/protobuf/proto"
_ "istio.io/gogo-genproto/googleapis/google/api"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// MarshalJSON is a custom marshaler for McpBridge
func (this *McpBridge) MarshalJSON() ([]byte, error) {
str, err := McpBridgeMarshaler.MarshalToString(this)
return []byte(str), err
}
// UnmarshalJSON is a custom unmarshaler for McpBridge
func (this *McpBridge) UnmarshalJSON(b []byte) error {
return McpBridgeUnmarshaler.Unmarshal(bytes.NewReader(b), this)
}
// MarshalJSON is a custom marshaler for RegistryConfig
func (this *RegistryConfig) MarshalJSON() ([]byte, error) {
str, err := McpBridgeMarshaler.MarshalToString(this)
return []byte(str), err
}
// UnmarshalJSON is a custom unmarshaler for RegistryConfig
func (this *RegistryConfig) UnmarshalJSON(b []byte) error {
return McpBridgeUnmarshaler.Unmarshal(bytes.NewReader(b), this)
}
var (
McpBridgeMarshaler = &github_com_gogo_protobuf_jsonpb.Marshaler{}
McpBridgeUnmarshaler = &github_com_gogo_protobuf_jsonpb.Unmarshaler{AllowUnknownFields: true}
)

5
api/protocol.yaml Normal file
View File

@@ -0,0 +1,5 @@
protoc:
# This is ignored because we always run with
# --protoc-bin-path=/usr/bin/protoc to use the protoc from our
# container
version: 3.6.1

236
registry/memory/cache.go Normal file
View File

@@ -0,0 +1,236 @@
package memory
import (
"sort"
"strconv"
"sync"
"time"
"istio.io/api/networking/v1alpha3"
"github.com/alibaba/higress/pkg/common"
)
// Cache stores ServiceEntryWrapper objects built from external registries
// and tracks pending updates/deletes so callers can drive incremental xDS
// pushes. NOTE(review): UpdateServiceEntryEnpointWrapper keeps its
// historical "Enpoint" typo; renaming an interface method would break
// implementers and callers.
type Cache interface {
// UpdateServiceEntryWrapper inserts or replaces the entry for service.
UpdateServiceEntryWrapper(service string, data *ServiceEntryWrapper)
// DeleteServiceEntryWrapper removes the entry and records the deletion.
DeleteServiceEntryWrapper(service string)
// UpdateServiceEntryEnpointWrapper patches locality/labels of the
// endpoint whose address equals ip inside the given service's entry.
UpdateServiceEntryEnpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string)
// GetServiceByEndpoints returns "serviceName#@suffix" -> deduplicated
// version list for services whose endpoints intersect the given set.
GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string
GetAllServiceEntry() []*v1alpha3.ServiceEntry
GetAllServiceEntryWrapper() []*ServiceEntryWrapper
// GetIncrementalServiceEntryWrapper drains and returns the entries
// updated/deleted since the previous drain.
GetIncrementalServiceEntryWrapper() (updatedList []*ServiceEntryWrapper, deletedList []*ServiceEntryWrapper)
// RemoveEndpointByIp drops the endpoint with this IP from every service.
RemoveEndpointByIp(ip string)
}
// NewCache returns an empty, ready-to-use in-memory Cache implementation
// guarded by a read/write mutex.
func NewCache() Cache {
	s := &store{
		mux:         &sync.RWMutex{},
		sew:         map[string]*ServiceEntryWrapper{},
		toBeUpdated: []*ServiceEntryWrapper{},
		toBeDeleted: []*ServiceEntryWrapper{},
		ip2services: map[string]map[string]bool{},
	}
	return s
}
// store is the default Cache implementation.
type store struct {
// mux guards every field below.
mux *sync.RWMutex
// sew maps service name -> latest ServiceEntryWrapper.
sew map[string]*ServiceEntryWrapper
// toBeUpdated/toBeDeleted accumulate changes since the last drain
// (see GetIncrementalServiceEntryWrapper / cleanUpdateAndDeleteArray).
toBeUpdated []*ServiceEntryWrapper
toBeDeleted []*ServiceEntryWrapper
// ip2services maps endpoint IP -> set of service names using it.
ip2services map[string]map[string]bool
}
// UpdateServiceEntryEnpointWrapper patches the endpoint whose address
// equals ip inside the given service's ServiceEntry: locality is rebuilt
// from regionId/zoneId ("region" or "region/zone") and labels are merged.
// For Dubbo the "version" label is stored under "appversion" instead.
// (The method name keeps its historical "Enpoint" typo because it is part
// of the Cache interface.)
func (s *store) UpdateServiceEntryEnpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string) {
	s.mux.Lock()
	defer s.mux.Unlock()

	se, exist := s.sew[service]
	if !exist {
		return
	}
	for i := range se.ServiceEntry.Endpoints {
		ep := se.ServiceEntry.Endpoints[i]
		if ep.Address != ip {
			continue
		}
		if len(regionId) != 0 {
			ep.Locality = regionId
			if len(zoneId) != 0 {
				ep.Locality = regionId + "/" + zoneId
			}
		}
		if labels != nil {
			// Fix: guard against a nil Labels map -- writing to a nil map
			// panics.
			if ep.Labels == nil {
				ep.Labels = make(map[string]string)
			}
			for k, v := range labels {
				if protocol == common.Dubbo.String() && k == "version" {
					ep.Labels["appversion"] = v
					continue
				}
				ep.Labels[k] = v
			}
		}
		// Write back so the update also works if endpoints are stored by
		// value (harmless when they are pointers).
		se.ServiceEntry.Endpoints[i] = ep
		return
	}
}
// UpdateServiceEntryWrapper inserts or replaces the cached entry for
// service, preserving the original creation time on replacement, and
// records the entry as pending for the next incremental push.
func (s *store) UpdateServiceEntryWrapper(service string, data *ServiceEntryWrapper) {
	s.mux.Lock()
	defer s.mux.Unlock()

	createTime := time.Now()
	if old, exist := s.sew[service]; exist {
		createTime = old.GetCreateTime()
	}
	data.SetCreateTime(createTime)

	s.toBeUpdated = append(s.toBeUpdated, data)
	s.sew[service] = data
}
// DeleteServiceEntryWrapper removes the cached entry for service (if any)
// and records it as pending deletion for the next incremental push.
func (s *store) DeleteServiceEntryWrapper(service string) {
	s.mux.Lock()
	defer s.mux.Unlock()

	data, exist := s.sew[service]
	if !exist {
		return
	}
	s.toBeDeleted = append(s.toBeDeleted, data)
	delete(s.sew, service)
}
// GetServiceByEndpoints returns, for every cached service, the versions
// whose endpoints ("address:port" on the given protocol) appear in the
// endpoints set and whose version label appears in requestVersions.
// Result keys have the form serviceName + "#@" + suffix; each version
// list is sorted and deduplicated.
func (s *store) GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string {
	s.mux.RLock()
	defer s.mux.RUnlock()

	result := make(map[string][]string)
	for _, wrapper := range s.sew {
		for _, workload := range wrapper.ServiceEntry.Endpoints {
			port, ok := workload.Ports[protocol.String()]
			if !ok {
				continue
			}
			addr := workload.Address + common.ColonSeparator + strconv.Itoa(int(port))
			if _, hit := endpoints[addr]; !hit {
				continue
			}
			version, ok := workload.Labels[versionKey]
			if !ok {
				continue
			}
			if _, wanted := requestVersions[version]; !wanted {
				continue
			}
			key := wrapper.ServiceName + common.SpecialSeparator + wrapper.Suffix
			result[key] = append(result[key], version)
		}
	}
	// Sort each version list and compact adjacent duplicates in place.
	for key, versions := range result {
		sort.Strings(versions)
		uniq := versions[:1]
		for _, v := range versions[1:] {
			if v != uniq[len(uniq)-1] {
				uniq = append(uniq, v)
			}
		}
		result[key] = uniq
	}
	return result
}
// GetAllServiceEntry returns deep copies of every ServiceEntry that has at
// least one host, ordered by first host (descending, matching the original
// comparator) so xds pushes are deterministic.
func (s *store) GetAllServiceEntry() []*v1alpha3.ServiceEntry {
	s.mux.RLock()
	defer s.mux.RUnlock()

	seList := make([]*v1alpha3.ServiceEntry, 0)
	for _, wrapper := range s.sew {
		if len(wrapper.ServiceEntry.Hosts) > 0 {
			seList = append(seList, wrapper.ServiceEntry.DeepCopy())
		}
	}
	sort.Slice(seList, func(i, j int) bool {
		return seList[i].Hosts[0] > seList[j].Hosts[0]
	})
	return seList
}
// GetAllServiceEntryWrapper returns deep copies of every cached
// ServiceEntryWrapper and resets the pending incremental lists (a full
// push supersedes any pending incremental push).
//
// Fix: the original took only the read lock while the deferred
// cleanUpdateAndDeleteArray wrote toBeUpdated/toBeDeleted -- a data race
// with concurrent writers. A full write lock is required.
func (s *store) GetAllServiceEntryWrapper() []*ServiceEntryWrapper {
	s.mux.Lock()
	defer s.mux.Unlock()
	defer s.cleanUpdateAndDeleteArray()

	sewList := make([]*ServiceEntryWrapper, 0, len(s.sew))
	for _, wrapper := range s.sew {
		sewList = append(sewList, wrapper.DeepCopy())
	}
	return sewList
}
// GetIncrementalServiceEntryWrapper drains and returns deep copies of the
// entries updated and deleted since the previous drain.
//
// Fix: the original took only the read lock while the deferred
// cleanUpdateAndDeleteArray mutated state; take the write lock instead.
func (s *store) GetIncrementalServiceEntryWrapper() ([]*ServiceEntryWrapper, []*ServiceEntryWrapper) {
	s.mux.Lock()
	defer s.mux.Unlock()
	defer s.cleanUpdateAndDeleteArray()

	updatedList := make([]*ServiceEntryWrapper, 0, len(s.toBeUpdated))
	for _, wrapper := range s.toBeUpdated {
		updatedList = append(updatedList, wrapper.DeepCopy())
	}
	deletedList := make([]*ServiceEntryWrapper, 0, len(s.toBeDeleted))
	for _, wrapper := range s.toBeDeleted {
		deletedList = append(deletedList, wrapper.DeepCopy())
	}
	return updatedList, deletedList
}
// cleanUpdateAndDeleteArray resets the pending incremental lists.
// Callers must hold the write lock.
func (s *store) cleanUpdateAndDeleteArray() {
	s.toBeUpdated, s.toBeDeleted = nil, nil
}
// updateIpMap records, for every endpoint of data, that the endpoint's IP
// is referenced by service. Callers must hold the write lock.
func (s *store) updateIpMap(service string, data *ServiceEntryWrapper) {
	for _, ep := range data.ServiceEntry.Endpoints {
		set, ok := s.ip2services[ep.Address]
		if !ok {
			set = make(map[string]bool)
			s.ip2services[ep.Address] = set
		}
		set[service] = true
	}
}
// RemoveEndpointByIp drops the endpoint with the given IP address from
// every service that references it and marks those services as pending
// update for the next incremental push.
func (s *store) RemoveEndpointByIp(ip string) {
	s.mux.Lock()
	defer s.mux.Unlock()

	services, has := s.ip2services[ip]
	if !has {
		return
	}
	delete(s.ip2services, ip)

	for service := range services {
		data, exist := s.sew[service]
		if !exist {
			continue
		}
		endpoints := data.ServiceEntry.Endpoints
		for i, ep := range endpoints {
			if ep.Address == ip {
				data.ServiceEntry.Endpoints = append(endpoints[:i], endpoints[i+1:]...)
				break
			}
		}
		s.toBeUpdated = append(s.toBeUpdated, data)
	}
}

30
registry/memory/model.go Normal file
View File

@@ -0,0 +1,30 @@
package memory
import (
"time"
"istio.io/api/networking/v1alpha3"
)
// ServiceEntryWrapper couples an Istio ServiceEntry with the registry
// metadata it was derived from.
type ServiceEntryWrapper struct {
ServiceName string
ServiceEntry *v1alpha3.ServiceEntry
// Suffix is appended to ServiceName when building lookup keys
// (see store.GetServiceByEndpoints).
Suffix string
RegistryType string
// createTime records when the service first entered the cache; it is
// preserved across updates via Set/GetCreateTime.
createTime time.Time
}
// DeepCopy returns a full copy of the wrapper.
//
// Fix: the original copied only ServiceEntry and createTime, silently
// dropping ServiceName, Suffix and RegistryType from the copy -- callers
// of GetAllServiceEntryWrapper/GetIncrementalServiceEntryWrapper received
// wrappers with empty metadata.
func (sew *ServiceEntryWrapper) DeepCopy() *ServiceEntryWrapper {
	return &ServiceEntryWrapper{
		ServiceName:  sew.ServiceName,
		ServiceEntry: sew.ServiceEntry.DeepCopy(),
		Suffix:       sew.Suffix,
		RegistryType: sew.RegistryType,
		createTime:   sew.GetCreateTime(),
	}
}
// SetCreateTime records when the service first entered the cache.
func (sew *ServiceEntryWrapper) SetCreateTime(createTime time.Time) {
sew.createTime = createTime
}
// GetCreateTime returns the recorded creation time.
func (sew *ServiceEntryWrapper) GetCreateTime() time.Time {
return sew.createTime
}

View File

@@ -0,0 +1,171 @@
package address
import (
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"strings"
"sync"
"time"
"go.uber.org/atomic"
"istio.io/pkg/log"
)
// Constants for talking to the nacos address server.
// NOTE(review): ALL_CAPS names are unidiomatic Go; kept as-is because the
// constants are exported and may be referenced elsewhere.
const (
// NACOS_PATH is the address-server endpoint returning the server list.
NACOS_PATH = "/nacos/serverlist"
// MODULE_HEADER_KEY/VALUE identify the requesting module to the server.
MODULE_HEADER_KEY = "Request-Module"
MODULE_HEADER_VALUE = "Naming"
// DEFAULT_INTERVAL is the periodic address-discovery refresh interval.
DEFAULT_INTERVAL = 30 * time.Second
)
// NacosAddressProvider periodically resolves the list of nacos server IPs
// from a nacos address server, keeps one selected address plus backups,
// and notifies waiters when the selection changes.
type NacosAddressProvider struct {
// serverAddr and namespace identify the address server; guarded by mutex.
serverAddr string
// nacosAddr is the currently selected nacos server; nacosBackupAddr holds
// the remaining candidates. Both are guarded by cond.L.
nacosAddr string
nacosBackupAddr []string
namespace string
// stop terminates Run; trigger requests an immediate re-discovery.
stop chan struct{}
trigger chan struct{}
// cond broadcasts address changes to GetNacosAddress waiters.
cond *sync.Cond
isStop *atomic.Bool
mutex *sync.Mutex
}
// NewNacosAddressProvider builds a provider for the given address server
// and namespace, and starts its background refresh loop.
func NewNacosAddressProvider(serverAddr, namespace string) *NacosAddressProvider {
	p := &NacosAddressProvider{
		serverAddr: serverAddr,
		namespace:  namespace,
		stop:       make(chan struct{}),
		trigger:    make(chan struct{}, 1),
		cond:       sync.NewCond(&sync.Mutex{}),
		isStop:     atomic.NewBool(false),
		mutex:      &sync.Mutex{},
	}
	go p.Run()
	return p
}
// Run performs an immediate address discovery, then repeats it on every
// trigger signal and every DEFAULT_INTERVAL tick until Stop is called.
func (p *NacosAddressProvider) Run() {
	ticker := time.NewTicker(DEFAULT_INTERVAL)
	defer ticker.Stop()

	p.addressDiscovery()
	for {
		select {
		case <-p.stop:
			return
		case <-p.trigger:
			p.addressDiscovery()
		case <-ticker.C:
			p.addressDiscovery()
		}
	}
}
// Update points the provider at a new address server / namespace and runs
// a discovery immediately.
func (p *NacosAddressProvider) Update(serverAddr, namespace string) {
	p.mutex.Lock()
	p.serverAddr, p.namespace = serverAddr, namespace
	p.mutex.Unlock()
	p.addressDiscovery()
}
// Trigger rotates the selected nacos address to a randomly chosen backup
// (demoting the previous address to a backup), wakes all waiters, and
// requests an immediate re-discovery.
func (p *NacosAddressProvider) Trigger() {
	p.cond.L.Lock()
	previous := p.nacosAddr
	if len(p.nacosBackupAddr) > 0 {
		p.nacosAddr = p.nacosBackupAddr[rand.Intn(len(p.nacosBackupAddr))]
		// Drop every occurrence of the newly selected address from the
		// backup list, then keep the previous address as a fallback.
		backups := p.nacosBackupAddr[:0]
		for _, addr := range p.nacosBackupAddr {
			if addr != p.nacosAddr {
				backups = append(backups, addr)
			}
		}
		p.nacosBackupAddr = append(backups, previous)
	}
	p.cond.Broadcast()
	p.cond.L.Unlock()

	// Coalesce: if a trigger is already pending, do not queue another.
	select {
	case p.trigger <- struct{}{}:
	default:
	}
}
// Stop terminates the background refresh loop and wakes any goroutines
// blocked inside GetNacosAddress so they can observe the stop flag.
//
// Fix: the original never signalled the condition variable, so goroutines
// parked in cond.Wait could block forever after Stop.
func (p *NacosAddressProvider) Stop() {
	p.isStop.Store(true)
	p.cond.L.Lock()
	p.cond.Broadcast()
	p.cond.L.Unlock()
	p.stop <- struct{}{}
}
// GetNacosAddress returns a channel that yields the currently selected
// nacos address once it is non-empty and differs from oldAddress. If the
// provider is stopped while waiting, the channel never yields and the
// goroutine exits.
//
// Fixes: (1) the original returned on the stop path while still holding
// cond.L, deadlocking every other waiter; (2) the Debugf call had no
// format verbs, so the arguments were never rendered.
func (p *NacosAddressProvider) GetNacosAddress(oldAddress string) <-chan string {
	addressChan := make(chan string)
	go func() {
		p.cond.L.Lock()
		log.Debugf("get nacos address, current:%s, old:%s", p.nacosAddr, oldAddress)
		for p.nacosAddr == oldAddress || p.nacosAddr == "" {
			if p.isStop.Load() {
				p.cond.L.Unlock()
				return
			}
			p.cond.Wait()
		}
		addr := p.nacosAddr
		p.cond.L.Unlock()
		addressChan <- addr
	}()
	return addressChan
}
// addressDiscovery queries the nacos address server for the current server
// list; when the selected address is no longer in the list it elects a new
// one at random, broadcasts to waiters, and refreshes the backup list.
//
// Fixes: (1) the original returned from inside the validation loop while
// still holding cond.L, deadlocking all GetNacosAddress waiters -- the
// addresses are now validated before the lock is taken; (2) the ReadAll
// error was silently discarded.
func (p *NacosAddressProvider) addressDiscovery() {
	p.mutex.Lock()
	url := fmt.Sprintf("%s%s?namespace=%s", p.serverAddr, NACOS_PATH, p.namespace)
	p.mutex.Unlock()
	if !strings.HasPrefix(url, "http://") && !strings.HasPrefix(url, "https://") {
		url = "http://" + url
	}
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		log.Errorf("create request failed, err:%v, url:%s", err, url)
		return
	}
	req.Header.Add(MODULE_HEADER_KEY, MODULE_HEADER_VALUE)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Errorf("get nacos address failed, err:%v, url:%s", err, url)
		return
	}
	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		log.Errorf("get nacos address failed, statusCode:%d", resp.StatusCode)
		return
	}
	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		log.Errorf("read nacos address response failed, err:%v", err)
		return
	}
	addrVec := strings.Fields(string(body))
	if len(addrVec) == 0 {
		return
	}
	// Validate every address before taking the lock; bail out on garbage.
	for _, address := range addrVec {
		if net.ParseIP(address) == nil {
			log.Errorf("ip parse failed, ip:%s", address)
			return
		}
	}
	p.cond.L.Lock()
	defer p.cond.L.Unlock()
	needUpdate := true
	for _, address := range addrVec {
		if p.nacosAddr == address {
			needUpdate = false
			break
		}
	}
	p.nacosBackupAddr = addrVec
	if needUpdate {
		p.nacosAddr = addrVec[rand.Intn(len(addrVec))]
		p.cond.Broadcast()
		log.Infof("nacos address updated, address:%s", p.nacosAddr)
	}
	// Keep the selected address out of the backup list.
	backups := p.nacosBackupAddr[:0]
	for _, addr := range p.nacosBackupAddr {
		if addr != p.nacosAddr {
			backups = append(backups, addr)
		}
	}
	p.nacosBackupAddr = backups
}

View File

@@ -0,0 +1,287 @@
package address
import (
"net/http"
"net/http/httptest"
"testing"
"time"
)
func setUpServer(status int, body []byte) (string, func()) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(status)
rw.Write(body)
}))
return server.URL, func() {
server.Close()
}
}
func setUpServerWithBodyPtr(status int, body *[]byte) (string, func()) {
server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(status)
rw.Write(*body)
}))
return server.URL, func() {
server.Close()
}
}
// TestGetNacosAddress verifies that GetNacosAddress yields one of the
// addresses served by the address server, and yields nothing within 1s
// when the server returns unparsable addresses or an error status.
func TestGetNacosAddress(t *testing.T) {
goodURL, goodTearDown := setUpServer(200, []byte("1.1.1.1\n 2.2.2.2"))
defer goodTearDown()
badURL, badTearDown := setUpServer(200, []byte("abc\n 2.2.2.2"))
defer badTearDown()
errURL, errTearDown := setUpServer(503, []byte("1.1.1.1\n 2.2.2.2"))
defer errTearDown()
tests := []struct {
name string
serverAddr string
want []string
}{
{
"good",
goodURL,
[]string{"1.1.1.1", "2.2.2.2"},
},
{
"bad",
badURL,
[]string{},
},
{
"err",
errURL,
[]string{},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider := NewNacosAddressProvider(tt.serverAddr, "")
timeout := time.NewTicker(1 * time.Second)
var got string
// Failure cases: no address should ever arrive; receiving one within
// the timeout is a test failure.
if len(tt.want) == 0 {
select {
case got = <-provider.GetNacosAddress(""):
t.Errorf("GetNacosAddress() = %v, want empty", got)
case <-timeout.C:
return
}
}
// Happy path: an address must arrive before the timeout and be one of
// the addresses the server returned (the selection is random).
select {
case got = <-provider.GetNacosAddress(""):
case <-timeout.C:
t.Error("GetNacosAddress timeout")
}
for _, value := range tt.want {
if got == value {
return
}
}
t.Errorf("GetNacosAddress() = %v, want %v", got, tt.want)
})
}
}
// TestTrigger verifies that the provider stays on its current address when
// not triggered, and rotates to the newly served address when Trigger is
// called.
func TestTrigger(t *testing.T) {
body := []byte("1.1.1.1 ")
url, tearDown := setUpServerWithBodyPtr(200, &body)
defer tearDown()
provider := NewNacosAddressProvider(url, "xxxx")
address := <-provider.GetNacosAddress("")
if address != "1.1.1.1" {
t.Errorf("got %s, want %s", address, "1.1.1.1")
}
// Swap the served server list out from under the provider.
body = []byte(" 2.2.2.2 ")
tests := []struct {
name string
trigger bool
want string
}{
{
"no trigger",
false,
"1.1.1.1",
},
{
"trigger",
true,
"2.2.2.2",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.trigger {
provider.Trigger()
}
// Wait (bounded) for a potential address change, then inspect the
// provider's selected address directly.
timeout := time.NewTicker(1 * time.Second)
select {
case <-provider.GetNacosAddress("1.1.1.1"):
case <-timeout.C:
}
if provider.nacosAddr != tt.want {
t.Errorf("got %s, want %s", provider.nacosAddr, tt.want)
}
})
}
}
// TestBackup verifies the rotation behavior between the selected address
// and the backup list across repeated Trigger calls after the served
// server list changes.
func TestBackup(t *testing.T) {
body := []byte("1.1.1.1 ")
url, tearDown := setUpServerWithBodyPtr(200, &body)
defer tearDown()
provider := NewNacosAddressProvider(url, "xxxx")
address := <-provider.GetNacosAddress("")
if address != "1.1.1.1" {
t.Errorf("got %s, want %s", address, "1.1.1.1")
}
tests := []struct {
name string
oldaddr string
newaddr string
triggerNum int
want string
}{
{
"case1",
"1.1.1.1",
"1.1.1.1\n2.2.2.2",
1,
"2.2.2.2",
},
{
"case2",
"1.1.1.1",
"3.3.3.3 1.1.1.1",
1,
"3.3.3.3",
},
{
"case3",
"1.1.1.1",
"3.3.3.3 1.1.1.1",
2,
"1.1.1.1",
},
{
"case4",
"1.1.1.1",
"3.3.3.3\n 1.1.1.1",
3,
"3.3.3.3",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Seed the provider's selection, swap the served list, re-discover,
// then rotate triggerNum times and check where the selection lands.
provider.nacosAddr = tt.oldaddr
body = []byte(tt.newaddr)
provider.addressDiscovery()
for i := 0; i < tt.triggerNum; i++ {
provider.Trigger()
}
timeout := time.NewTicker(1 * time.Second)
var newAddr string
select {
case newAddr = <-provider.GetNacosAddress(""):
case <-timeout.C:
}
if newAddr != tt.want {
t.Errorf("got %s, want %s", newAddr, tt.want)
}
})
}
}
// TestKeepIp verifies that discovery keeps the currently selected address
// as long as it is still in the served list, and only switches when the
// selected address disappears from the list.
func TestKeepIp(t *testing.T) {
body := []byte("1.1.1.1")
url, tearDown := setUpServerWithBodyPtr(200, &body)
defer tearDown()
provider := NewNacosAddressProvider(url, "xxxx")
address := <-provider.GetNacosAddress("")
if address != "1.1.1.1" {
t.Errorf("got %s, want %s", address, "1.1.1.1")
}
tests := []struct {
name string
newAddr []byte
want string
}{
{
"add ip",
[]byte("1.1.1.1\n 2.2.2.2"),
"1.1.1.1",
},
{
"remove ip",
[]byte("2.2.2.2"),
"2.2.2.2",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
body = tt.newAddr
provider.addressDiscovery()
// Bounded wait for a possible change, then inspect the selection.
timeout := time.NewTicker(1 * time.Second)
select {
case <-provider.GetNacosAddress("1.1.1.1"):
case <-timeout.C:
}
if provider.nacosAddr != tt.want {
t.Errorf("got %s, want %s", provider.nacosAddr, tt.want)
}
})
}
}
// TestMultiClient verifies GetNacosAddress semantics with several
// concurrent consumers holding different "old" addresses: a consumer whose
// old address equals the current selection gets nothing (empty string via
// the timeout path), others receive the new address.
func TestMultiClient(t *testing.T) {
body := []byte("1.1.1.1")
url, tearDown := setUpServerWithBodyPtr(200, &body)
defer tearDown()
provider := NewNacosAddressProvider(url, "xxxx")
address := <-provider.GetNacosAddress("")
if address != "1.1.1.1" {
t.Errorf("got %s, want %s", address, "1.1.1.1")
}
// The served list now only contains 2.2.2.2.
body = []byte("2.2.2.2")
tests := []struct {
name string
oldAddrs []string
want []string
}{
{
"case1",
[]string{"1.1.1.1", "1.1.1.1"},
[]string{"2.2.2.2", "2.2.2.2"},
},
{
"case2",
[]string{"2.2.2.2", "1.1.1.1"},
[]string{"", "2.2.2.2"},
},
{
"case3",
[]string{"1.1.1.1", "2.2.2.2"},
[]string{"2.2.2.2", ""},
},
{
"case4",
[]string{"2.2.2.2", "2.2.2.2"},
[]string{"", ""},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
provider.addressDiscovery()
for i := 0; i < len(tt.oldAddrs); i++ {
// A want of "" means the consumer must NOT receive an address
// before the timeout fires.
timeout := time.NewTicker(1 * time.Second)
var newaddr string
select {
case newaddr = <-provider.GetNacosAddress(tt.oldAddrs[i]):
case <-timeout.C:
}
if newaddr != tt.want[i] {
t.Errorf("got %s, want %s", newaddr, tt.want[i])
}
}
})
}
}

View File

@@ -0,0 +1,502 @@
package v2
import (
"errors"
"strconv"
"strings"
"sync"
"time"
"github.com/nacos-group/nacos-sdk-go/v2/clients"
"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
"github.com/nacos-group/nacos-sdk-go/v2/model"
"github.com/nacos-group/nacos-sdk-go/v2/vo"
"go.uber.org/atomic"
"istio.io/api/networking/v1alpha3"
"istio.io/pkg/log"
apiv1 "github.com/alibaba/higress/api/networking/v1"
"github.com/alibaba/higress/pkg/common"
provider "github.com/alibaba/higress/registry"
"github.com/alibaba/higress/registry/memory"
"github.com/alibaba/higress/registry/nacos/address"
)
// Defaults for the nacos2 watcher and the underlying nacos SDK client.
const (
DefaultInitTimeout = time.Second * 10
// DefaultNacosTimeout is the SDK request timeout in milliseconds.
DefaultNacosTimeout = 5000
DefaultNacosLogLevel = "warn"
DefaultNacosLogDir = "log/nacos/log/"
DefaultNacosCacheDir = "log/nacos/cache/"
DefaultNacosNotLoadCache = true
DefaultNacosLogRotateTime = "24h"
DefaultNacosLogMaxAge = 3
DefaultUpdateCacheWhenEmpty = true
// Refresh-interval default and floor; stored in time.Duration units
// (nanoseconds) -- see WithNacosRefreshInterval and watcher.Run.
DefaultRefreshInterval = time.Second * 30
DefaultRefreshIntervalLimit = time.Second * 10
DefaultFetchPageSize = 50
// DefaultJoiner joins group and service name into a watch key.
DefaultJoiner = "@@"
)
// watcher watches a nacos2 registry and keeps the in-memory service cache
// in sync with it.
type watcher struct {
provider.BaseWatcher
apiv1.RegistryConfig
// WatchingServices holds "group@@service" keys currently subscribed.
WatchingServices map[string]bool `json:"watching_services"`
RegistryType provider.ServiceRegistryType `json:"registry_type"`
Status provider.WatcherStatus `json:"status"`
namingClient naming_client.INamingClient
updateHandler provider.ServiceUpdateHandler
readyHandler provider.ReadyHandler
cache memory.Cache
// mutex guards namingClient/Domain swaps and the fetch bookkeeping.
mutex *sync.Mutex
stop chan struct{}
isStop bool
// addrProvider is non-nil only when NacosAddressServer is configured.
addrProvider *address.NacosAddressProvider
updateCacheWhenEmpty bool
// NOTE(review): field name has a typo ("Cliet"); renaming touches
// several methods, so it is documented rather than changed here.
nacosClietConfig *constant.ClientConfig
}
// WatcherOption mutates a watcher during construction (functional options).
type WatcherOption func(w *watcher)
// NewWatcher assembles a nacos2 watcher from the functional options,
// resolves the nacos server address (directly via Domain, or dynamically
// through a nacos address server when NacosAddressServer is set) and
// creates the SDK naming client. Construction fails with a timeout error
// if resolution or client creation exceeds DefaultInitTimeout.
func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) {
w := &watcher{
WatchingServices: make(map[string]bool),
RegistryType: provider.Nacos2,
Status: provider.UnHealthy,
cache: cache,
mutex: &sync.Mutex{},
stop: make(chan struct{}),
}
// Default interval is stored in time.Duration units (nanoseconds);
// options may override it afterwards.
w.NacosRefreshInterval = int64(DefaultRefreshInterval)
for _, opt := range opts {
opt(w)
}
log.Infof("new nacos2 watcher with config Name:%s", w.Name)
w.nacosClietConfig = constant.NewClientConfig(
constant.WithTimeoutMs(DefaultNacosTimeout),
constant.WithLogLevel(DefaultNacosLogLevel),
constant.WithLogDir(DefaultNacosLogDir),
constant.WithCacheDir(DefaultNacosCacheDir),
constant.WithNotLoadCacheAtStart(DefaultNacosNotLoadCache),
constant.WithLogRollingConfig(&constant.ClientLogRollingConfig{
MaxAge: DefaultNacosLogMaxAge,
}),
constant.WithUpdateCacheWhenEmpty(w.updateCacheWhenEmpty),
constant.WithNamespaceId(w.NacosNamespaceId),
constant.WithAccessKey(w.NacosAccessKey),
constant.WithSecretKey(w.NacosSecretKey),
)
// NOTE(review): initTimer is never Stopped; the timer lingers until it
// fires, which is harmless for a one-shot constructor.
initTimer := time.NewTimer(DefaultInitTimeout)
if w.NacosAddressServer != "" {
// Dynamic mode: block until the address server yields a first
// address, then keep the client updated in the background.
w.addrProvider = address.NewNacosAddressProvider(w.NacosAddressServer, w.NacosNamespace)
w.Domain = ""
select {
case w.Domain = <-w.addrProvider.GetNacosAddress(w.Domain):
case <-initTimer.C:
return nil, errors.New("new nacos2 watcher timeout")
}
go w.updateNacosClient()
}
sc := []constant.ServerConfig{
*constant.NewServerConfig(w.Domain, uint64(w.Port)),
}
// Client creation may block on the network; bound it with the same timer.
success := make(chan struct{})
go func() {
namingClient, err := clients.NewNamingClient(vo.NacosClientParam{
ClientConfig: w.nacosClietConfig,
ServerConfigs: sc,
})
if err == nil {
w.namingClient = namingClient
close(success)
} else {
log.Errorf("can not create naming client, err:%v", err)
}
}()
select {
case <-initTimer.C:
return nil, errors.New("new nacos2 watcher timeout")
case <-success:
return w, nil
}
}
// WithNacosAddressServer sets the nacos address-server endpoint used to
// resolve nacos server IPs dynamically.
func WithNacosAddressServer(nacosAddressServer string) WatcherOption {
return func(w *watcher) {
w.NacosAddressServer = nacosAddressServer
}
}
// WithNacosAccessKey sets the nacos access-key credential.
func WithNacosAccessKey(nacosAccessKey string) WatcherOption {
return func(w *watcher) {
w.NacosAccessKey = nacosAccessKey
}
}
// WithNacosSecretKey sets the nacos secret-key credential.
func WithNacosSecretKey(nacosSecretKey string) WatcherOption {
return func(w *watcher) {
w.NacosSecretKey = nacosSecretKey
}
}
// WithNacosNamespaceId sets the namespace id used by the SDK client.
func WithNacosNamespaceId(nacosNamespaceId string) WatcherOption {
return func(w *watcher) {
w.NacosNamespaceId = nacosNamespaceId
}
}
// WithNacosNamespace sets the namespace name (used with the address server).
func WithNacosNamespace(nacosNamespace string) WatcherOption {
return func(w *watcher) {
w.NacosNamespace = nacosNamespace
}
}
// WithNacosGroups sets the nacos groups whose services are watched.
func WithNacosGroups(nacosGroups []string) WatcherOption {
return func(w *watcher) {
w.NacosGroups = nacosGroups
}
}
// WithNacosRefreshInterval sets the service-list refresh interval in
// time.Duration units (nanoseconds), clamped to a 10s floor.
func WithNacosRefreshInterval(refreshInterval int64) WatcherOption {
return func(w *watcher) {
if refreshInterval < int64(DefaultRefreshIntervalLimit) {
refreshInterval = int64(DefaultRefreshIntervalLimit)
}
w.NacosRefreshInterval = refreshInterval
}
}
// WithType sets the registry type string from the RegistryConfig.
func WithType(t string) WatcherOption {
return func(w *watcher) {
w.Type = t
}
}
// WithName sets the registry's logical name.
func WithName(name string) WatcherOption {
return func(w *watcher) {
w.Name = name
}
}
// WithDomain sets the nacos server address (host/domain).
func WithDomain(domain string) WatcherOption {
return func(w *watcher) {
w.Domain = domain
}
}
// WithPort sets the nacos server port.
func WithPort(port uint32) WatcherOption {
return func(w *watcher) {
w.Port = port
}
}
// WithUpdateCacheWhenEmpty controls whether the SDK updates its local
// cache when a service resolves to an empty instance list.
func WithUpdateCacheWhenEmpty(enable bool) WatcherOption {
return func(w *watcher) {
w.updateCacheWhenEmpty = enable
}
}
// Run probes the configured nacos endpoint, performs an initial full
// fetch, then refreshes the service list every NacosRefreshInterval
// (interpreted as time.Duration nanoseconds) until the watcher is stopped.
// readyHandler(true) is invoked after every successful fetch.
func (w *watcher) Run() {
	ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval))
	defer ticker.Stop()

	w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10))

	if err := w.fetchAllServices(); err != nil {
		log.Errorf("first fetch services failed, err:%v", err)
	} else {
		w.readyHandler(true)
	}
	for {
		select {
		case <-w.stop:
			return
		case <-ticker.C:
			if err := w.fetchAllServices(); err != nil {
				log.Errorf("fetch services failed, err:%v", err)
			} else {
				w.readyHandler(true)
			}
		}
	}
}
// updateNacosClient rebuilds the naming client whenever the address provider
// reports a new Nacos server address, and exits when the watcher is stopped.
func (w *watcher) updateNacosClient() {
	for {
		select {
		case addr := <-w.addrProvider.GetNacosAddress(w.Domain):
			func() {
				w.mutex.Lock()
				defer w.mutex.Unlock()
				w.Domain = addr
				namingClient, err := clients.NewNamingClient(vo.NacosClientParam{
					ClientConfig: w.nacosClietConfig,
					ServerConfigs: []constant.ServerConfig{
						*constant.NewServerConfig(addr, uint64(w.Port)),
					},
				})
				if err != nil {
					// Keep the old client on failure; the next address event retries.
					log.Errorf("can not update naming client, err:%v", err)
					return
				}
				w.namingClient = namingClient
				log.Info("naming client updated")
			}()
		case <-w.stop:
			return
		}
	}
}
// fetchAllServices lists every service in the configured groups (paginated,
// with bounded retries), unsubscribes services that disappeared, and
// subscribes newly discovered ones in parallel. It returns an error when
// listing exhausts its retries, an unsubscribe fails, or any subscription fails.
func (w *watcher) fetchAllServices() error {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	if w.isStop {
		return nil
	}
	fetchedServices := make(map[string]bool)
	var tries int
	for _, groupName := range w.NacosGroups {
		for page := 1; ; page++ {
			ss, err := w.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{
				GroupName: groupName,
				PageNo:    uint32(page),
				PageSize:  DefaultFetchPageSize,
				NameSpace: w.NacosNamespace,
			})
			if err != nil {
				// Up to 10 retries across the whole fetch; poke the address
				// provider so it can refresh the server address first.
				if tries > 10 {
					return err
				}
				if w.addrProvider != nil {
					w.addrProvider.Trigger()
				}
				log.Errorf("fetch nacos service list failed, err:%v, pageNo:%d", err, page)
				page-- // retry the same page (the loop post statement re-increments)
				tries++
				continue
			}
			for _, serviceName := range ss.Doms {
				fetchedServices[groupName+DefaultJoiner+serviceName] = true
			}
			// A short page means this was the last one.
			if ss.Count < DefaultFetchPageSize {
				break
			}
		}
	}
	// Drop subscriptions for services that are no longer registered.
	for key := range w.WatchingServices {
		if _, exist := fetchedServices[key]; !exist {
			s := strings.Split(key, DefaultJoiner)
			err := w.unsubscribe(s[0], s[1])
			if err != nil {
				return err
			}
			delete(w.WatchingServices, key)
		}
	}
	// Subscribe newly discovered services concurrently; successful keys are
	// collected over a channel buffered for the worst case.
	wg := sync.WaitGroup{}
	subscribeFailed := atomic.NewBool(false)
	watchingKeys := make(chan string, len(fetchedServices))
	for key := range fetchedServices {
		if _, exist := w.WatchingServices[key]; !exist {
			s := strings.Split(key, DefaultJoiner)
			if !shouldSubscribe(s[1]) {
				continue
			}
			wg.Add(1)
			// s is declared per-iteration, so capturing it here is safe;
			// key is passed explicitly as k.
			go func(k string) {
				err := w.subscribe(s[0], s[1])
				if err != nil {
					subscribeFailed.Store(true)
					log.Errorf("subscribe failed, err:%v, group:%s, service:%s", err, s[0], s[1])
				} else {
					watchingKeys <- k
				}
				wg.Done()
			}(key)
		}
	}
	wg.Wait()
	close(watchingKeys)
	for key := range watchingKeys {
		w.WatchingServices[key] = true
	}
	if subscribeFailed.Load() {
		return errors.New("subscribe services failed")
	}
	return nil
}
// subscribe registers a push callback for the given group/service on the
// naming client so instance changes flow into the local cache.
func (w *watcher) subscribe(groupName string, serviceName string) error {
	log.Debugf("subscribe service, groupName:%s, serviceName:%s", groupName, serviceName)
	param := &vo.SubscribeParam{
		ServiceName:       serviceName,
		GroupName:         groupName,
		SubscribeCallback: w.getSubscribeCallback(groupName, serviceName),
	}
	if err := w.namingClient.Subscribe(param); err != nil {
		log.Errorf("subscribe service error:%v, groupName:%s, serviceName:%s", err, groupName, serviceName)
		return err
	}
	return nil
}
// unsubscribe removes the push callback previously registered for the given
// group/service.
func (w *watcher) unsubscribe(groupName string, serviceName string) error {
	log.Debugf("unsubscribe service, groupName:%s, serviceName:%s", groupName, serviceName)
	param := &vo.SubscribeParam{
		ServiceName:       serviceName,
		GroupName:         groupName,
		SubscribeCallback: w.getSubscribeCallback(groupName, serviceName),
	}
	if err := w.namingClient.Unsubscribe(param); err != nil {
		log.Errorf("unsubscribe service error:%v, groupName:%s, serviceName:%s", err, groupName, serviceName)
		return err
	}
	return nil
}
// getSubscribeCallback builds the push callback for one group/service. The
// cache host is "<service>.<group>.<namespace>.nacos" with underscores
// replaced by hyphens (DNS-safe).
// NOTE(review): the hard-coded "nacos" suffix here must agree with the suffix
// used when cleaning the cache in Stop — confirm both sides match.
func (w *watcher) getSubscribeCallback(groupName string, serviceName string) func(services []model.Instance, err error) {
	suffix := strings.Join([]string{groupName, w.NacosNamespace, "nacos"}, common.DotSeparator)
	suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen)
	host := strings.Join([]string{serviceName, suffix}, common.DotSeparator)
	return func(services []model.Instance, err error) {
		defer w.updateHandler()
		//log.Info("callback", "serviceName", serviceName, "suffix", suffix, "details", services)
		if err != nil {
			// The SDK signals an empty instance list via this error string.
			if strings.Contains(err.Error(), "hosts is empty") {
				if w.updateCacheWhenEmpty {
					w.cache.DeleteServiceEntryWrapper(host)
				}
			} else {
				log.Errorf("callback error:%v", err)
			}
			return
		}
		// Ignore instances that this MCP bridge registered itself.
		if len(services) > 0 && services[0].Metadata != nil && services[0].Metadata["register-resource"] == "mcp-bridge" {
			return
		}
		serviceEntry := w.generateServiceEntry(host, services)
		w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
			ServiceName:  serviceName,
			ServiceEntry: serviceEntry,
			Suffix:       suffix,
			RegistryType: w.Type,
		})
	}
}
// generateServiceEntry converts Nacos instances into an Istio ServiceEntry.
// Only the first instance's port is published on the entry; every instance
// becomes a WorkloadEntry whose Ports map is keyed by that port's protocol.
func (w *watcher) generateServiceEntry(host string, services []model.Instance) *v1alpha3.ServiceEntry {
	ports := make([]*v1alpha3.Port, 0)
	workloads := make([]*v1alpha3.WorkloadEntry, 0)
	for i := range services {
		svc := &services[i]
		proto := common.HTTP
		if svc.Metadata != nil && svc.Metadata["protocol"] != "" {
			proto = common.ParseProtocol(svc.Metadata["protocol"])
		}
		p := &v1alpha3.Port{
			Name:     proto.String(),
			Number:   uint32(svc.Port),
			Protocol: proto.String(),
		}
		if len(ports) == 0 {
			ports = append(ports, p)
		}
		workloads = append(workloads, &v1alpha3.WorkloadEntry{
			Address: svc.Ip,
			Ports:   map[string]uint32{p.Protocol: p.Number},
			Labels:  svc.Metadata,
		})
	}
	return &v1alpha3.ServiceEntry{
		Hosts:      []string{host},
		Ports:      ports,
		Location:   v1alpha3.ServiceEntry_MESH_INTERNAL,
		Resolution: v1alpha3.ServiceEntry_STATIC,
		Endpoints:  workloads,
	}
}
// Stop unsubscribes every watched service, removes the corresponding cache
// entries, and signals the background goroutines to exit.
func (w *watcher) Stop() {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	if w.addrProvider != nil {
		w.addrProvider.Stop()
	}
	for key := range w.WatchingServices {
		s := strings.Split(key, DefaultJoiner)
		err := w.unsubscribe(s[0], s[1])
		if err == nil {
			delete(w.WatchingServices, key)
		}
		// Clean the cache. The suffix must match the one built in
		// getSubscribeCallback, which hard-codes "nacos"; using w.Type here
		// left stale cache entries behind whenever Type != "nacos".
		suffix := strings.Join([]string{s[0], w.NacosNamespace, "nacos"}, common.DotSeparator)
		suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen)
		host := strings.Join([]string{s[1], suffix}, common.DotSeparator)
		w.cache.DeleteServiceEntryWrapper(host)
	}
	w.isStop = true
	w.stop <- struct{}{}
	w.readyHandler(false)
}
// IsHealthy reports whether the last endpoint probe succeeded.
func (w *watcher) IsHealthy() bool {
	return w.Status == provider.Healthy
}

// GetRegistryType returns the registry type constant for this watcher.
func (w *watcher) GetRegistryType() string {
	return w.RegistryType.String()
}

// AppendServiceUpdateHandler sets the callback fired after cache updates.
// NOTE(review): despite the name, this replaces any previous handler rather
// than appending.
func (w *watcher) AppendServiceUpdateHandler(f func()) {
	w.updateHandler = f
}

// ReadyHandler sets the callback used to signal readiness transitions.
func (w *watcher) ReadyHandler(f func(bool)) {
	w.readyHandler = f
}
// shouldSubscribe reports whether a service name is eligible for
// subscription. Dubbo consumer-side entries ("consumers:...") and empty
// names are filtered out.
func shouldSubscribe(serviceName string) bool {
	if serviceName == "" {
		return false
	}
	return !strings.HasPrefix(serviceName, "consumers:")
}

406
registry/nacos/watcher.go Normal file
View File

@@ -0,0 +1,406 @@
package nacos
import (
"strconv"
"strings"
"sync"
"time"
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/common/constant"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
"istio.io/api/networking/v1alpha3"
versionedclient "istio.io/client-go/pkg/clientset/versioned"
"istio.io/pkg/log"
ctrl "sigs.k8s.io/controller-runtime"
apiv1 "github.com/alibaba/higress/api/networking/v1"
"github.com/alibaba/higress/pkg/common"
provider "github.com/alibaba/higress/registry"
"github.com/alibaba/higress/registry/memory"
)
// Defaults for the Nacos client and watcher behaviour.
const (
	DefaultNacosTimeout         = 5000 // client timeout passed to WithTimeoutMs
	DefaultNacosLogLevel        = "warn"
	DefaultNacosLogDir          = "log/nacos/log/"
	DefaultNacosCacheDir        = "log/nacos/cache/"
	DefaultNacosNotLoadCache    = true
	DefaultNacosLogRotateTime   = "24h"
	DefaultNacosLogMaxAge       = 3
	DefaultUpdateCacheWhenEmpty = true
	DefaultRefreshInterval      = time.Second * 30 // full service-list refresh period
	DefaultRefreshIntervalLimit = time.Second * 10 // lower bound for user-supplied intervals
	DefaultFetchPageSize        = 50               // page size for GetAllServicesInfo
	DefaultJoiner               = "@@"             // joins group and service in map keys
)
// watcher polls a Nacos 1.x registry and mirrors its services into the local
// in-memory ServiceEntry cache.
type watcher struct {
	provider.BaseWatcher
	apiv1.RegistryConfig
	WatchingServices     map[string]bool              `json:"watching_services"` // "group@@service" keys currently subscribed
	RegistryType         provider.ServiceRegistryType `json:"registry_type"`
	Status               provider.WatcherStatus       `json:"status"`
	namingClient         naming_client.INamingClient
	updateHandler        provider.ServiceUpdateHandler
	readyHandler         provider.ReadyHandler
	cache                memory.Cache
	mutex                *sync.Mutex // guards WatchingServices and stop/fetch ordering
	stop                 chan struct{}
	client               *versionedclient.Clientset
	isStop               bool
	updateCacheWhenEmpty bool // evict cache entries when a service has no instances
}

// WatcherOption mutates the watcher during construction.
type WatcherOption func(w *watcher)
// NewWatcher builds a Nacos 1.x watcher: it wires the istio clientset,
// applies the options, then creates the naming client pointed at Domain:Port.
func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) {
	w := &watcher{
		WatchingServices: make(map[string]bool),
		RegistryType:     provider.Nacos,
		Status:           provider.UnHealthy,
		cache:            cache,
		mutex:            &sync.Mutex{},
		stop:             make(chan struct{}),
	}
	config, err := ctrl.GetConfig()
	if err != nil {
		return nil, err
	}
	ic, err := versionedclient.NewForConfig(config)
	if err != nil {
		log.Errorf("can not new istio client, err:%v", err)
		return nil, err
	}
	w.client = ic
	// Default first; WithNacosRefreshInterval may override (and clamps).
	w.NacosRefreshInterval = int64(DefaultRefreshInterval)
	for _, opt := range opts {
		opt(w)
	}
	log.Infof("new nacos watcher with config Name:%s", w.Name)
	cc := constant.NewClientConfig(
		constant.WithTimeoutMs(DefaultNacosTimeout),
		constant.WithLogLevel(DefaultNacosLogLevel),
		constant.WithLogDir(DefaultNacosLogDir),
		constant.WithCacheDir(DefaultNacosCacheDir),
		constant.WithNotLoadCacheAtStart(DefaultNacosNotLoadCache),
		constant.WithRotateTime(DefaultNacosLogRotateTime),
		constant.WithMaxAge(DefaultNacosLogMaxAge),
		constant.WithUpdateCacheWhenEmpty(w.updateCacheWhenEmpty),
		constant.WithNamespaceId(w.NacosNamespaceId),
	)
	sc := []constant.ServerConfig{
		*constant.NewServerConfig(w.Domain, uint64(w.Port)),
	}
	namingClient, err := clients.NewNamingClient(vo.NacosClientParam{
		ClientConfig:  cc,
		ServerConfigs: sc,
	})
	if err != nil {
		log.Errorf("can not create naming client, err:%v", err)
		return nil, err
	}
	w.namingClient = namingClient
	return w, nil
}
// WithNacosNamespaceId sets the Nacos namespace ID used by the client.
func WithNacosNamespaceId(nacosNamespaceId string) WatcherOption {
	return func(w *watcher) {
		w.NacosNamespaceId = nacosNamespaceId
	}
}

// WithNacosNamespace sets the namespace name used when listing services.
func WithNacosNamespace(nacosNamespace string) WatcherOption {
	return func(w *watcher) {
		w.NacosNamespace = nacosNamespace
	}
}

// WithNacosGroups sets the service groups to watch.
func WithNacosGroups(nacosGroups []string) WatcherOption {
	return func(w *watcher) {
		w.NacosGroups = nacosGroups
	}
}

// WithNacosRefreshInterval sets the service-list refresh period (stored as an
// int64 duration), clamped to DefaultRefreshIntervalLimit from below.
func WithNacosRefreshInterval(refreshInterval int64) WatcherOption {
	return func(w *watcher) {
		if refreshInterval < int64(DefaultRefreshIntervalLimit) {
			refreshInterval = int64(DefaultRefreshIntervalLimit)
		}
		w.NacosRefreshInterval = refreshInterval
	}
}

// WithType sets the registry type string recorded on cache entries.
func WithType(t string) WatcherOption {
	return func(w *watcher) {
		w.Type = t
	}
}

// WithName sets the configured registry name.
func WithName(name string) WatcherOption {
	return func(w *watcher) {
		w.Name = name
	}
}

// WithDomain sets the Nacos server host.
func WithDomain(domain string) WatcherOption {
	return func(w *watcher) {
		w.Domain = domain
	}
}

// WithPort sets the Nacos server port.
func WithPort(port uint32) WatcherOption {
	return func(w *watcher) {
		w.Port = port
	}
}

// WithUpdateCacheWhenEmpty controls whether a cached ServiceEntry is evicted
// when the registry reports a service with no instances.
func WithUpdateCacheWhenEmpty(enable bool) WatcherOption {
	return func(w *watcher) {
		w.updateCacheWhenEmpty = enable
	}
}
// Run probes the registry endpoint, performs an initial fetch, and refreshes
// the service list on every tick until Stop is called.
//
// The watcher now only signals readiness after a fetch succeeds; previously
// the fetch error was discarded and readyHandler(true) was called
// unconditionally (the nacos v2 watcher already handled this correctly).
func (w *watcher) Run() {
	ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval))
	defer ticker.Stop()
	w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10))
	if err := w.fetchAllServices(); err != nil {
		log.Errorf("first fetch services failed, err:%v", err)
	} else {
		w.readyHandler(true)
	}
	for {
		select {
		case <-ticker.C:
			if err := w.fetchAllServices(); err != nil {
				log.Errorf("fetch services failed, err:%v", err)
			} else {
				w.readyHandler(true)
			}
		case <-w.stop:
			return
		}
	}
}
// fetchAllServices lists every service in the configured groups (paginated),
// unsubscribes services that disappeared, and subscribes newly discovered
// ones.
//
// A listing failure now aborts the fetch with an error. Previously the page
// loop only logged and broke out, so a transient registry error produced a
// partial (possibly empty) service list that was then treated as
// authoritative — spuriously unsubscribing services that were still
// registered.
func (w *watcher) fetchAllServices() error {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	if w.isStop {
		return nil
	}
	fetchedServices := make(map[string]bool)
	for _, groupName := range w.NacosGroups {
		for page := 1; ; page++ {
			ss, err := w.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{
				GroupName: groupName,
				PageNo:    uint32(page),
				PageSize:  DefaultFetchPageSize,
				NameSpace: w.NacosNamespace,
			})
			if err != nil {
				log.Errorf("fetch all services error:%v", err)
				// Do not reconcile against a partial list.
				return err
			}
			for _, serviceName := range ss.Doms {
				fetchedServices[groupName+DefaultJoiner+serviceName] = true
			}
			// A short page means this was the last one.
			if ss.Count < DefaultFetchPageSize {
				break
			}
		}
	}
	// Drop subscriptions for services that are no longer registered.
	for key := range w.WatchingServices {
		if _, exist := fetchedServices[key]; !exist {
			s := strings.Split(key, DefaultJoiner)
			err := w.unsubscribe(s[0], s[1])
			if err == nil {
				delete(w.WatchingServices, key)
			}
		}
	}
	// Subscribe newly discovered services (skipping filtered names).
	for key := range fetchedServices {
		if _, exist := w.WatchingServices[key]; !exist {
			s := strings.Split(key, DefaultJoiner)
			if !shouldSubscribe(s[1]) {
				continue
			}
			err := w.subscribe(s[0], s[1])
			if err == nil {
				w.WatchingServices[key] = true
			}
		}
	}
	return nil
}
// subscribe registers a push callback for the given group/service on the
// naming client so instance changes flow into the local cache.
func (w *watcher) subscribe(groupName string, serviceName string) error {
	log.Debugf("subscribe service, groupName:%s, serviceName:%s", groupName, serviceName)
	if err := w.namingClient.Subscribe(&vo.SubscribeParam{
		ServiceName:       serviceName,
		GroupName:         groupName,
		SubscribeCallback: w.getSubscribeCallback(groupName, serviceName),
	}); err != nil {
		log.Errorf("subscribe service error:%v, groupName:%s, serviceName:%s", err, groupName, serviceName)
		return err
	}
	return nil
}
// unsubscribe removes the push callback previously registered for the given
// group/service.
func (w *watcher) unsubscribe(groupName string, serviceName string) error {
	log.Debugf("unsubscribe service, groupName:%s, serviceName:%s", groupName, serviceName)
	if err := w.namingClient.Unsubscribe(&vo.SubscribeParam{
		ServiceName:       serviceName,
		GroupName:         groupName,
		SubscribeCallback: w.getSubscribeCallback(groupName, serviceName),
	}); err != nil {
		log.Errorf("unsubscribe service error:%v, groupName:%s, serviceName:%s", err, groupName, serviceName)
		return err
	}
	return nil
}
// getSubscribeCallback builds the push callback for one group/service. The
// cache host is "<service>.<group>.<namespace>.<type>" with underscores
// replaced by hyphens (DNS-safe); the same suffix is used by Stop to clean up.
func (w *watcher) getSubscribeCallback(groupName string, serviceName string) func(services []model.SubscribeService, err error) {
	suffix := strings.Join([]string{groupName, w.NacosNamespace, w.Type}, common.DotSeparator)
	suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen)
	host := strings.Join([]string{serviceName, suffix}, common.DotSeparator)
	return func(services []model.SubscribeService, err error) {
		defer w.updateHandler()
		//log.Info("callback", "serviceName", serviceName, "suffix", suffix, "details", services)
		if err != nil {
			// The SDK signals an empty instance list via this error string.
			if strings.Contains(err.Error(), "hosts is empty") {
				if w.updateCacheWhenEmpty {
					w.cache.DeleteServiceEntryWrapper(host)
				}
			} else {
				log.Errorf("callback error:%v", err)
			}
			return
		}
		// Ignore instances that this MCP bridge registered itself.
		if len(services) > 0 && services[0].Metadata != nil && services[0].Metadata["register-resource"] == "mcp-bridge" {
			return
		}
		serviceEntry := w.generateServiceEntry(host, services)
		w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
			ServiceName:  serviceName,
			ServiceEntry: serviceEntry,
			Suffix:       suffix,
			RegistryType: w.Type,
		})
	}
}
// generateServiceEntry converts subscribed instances into an Istio
// ServiceEntry. Only the first instance's port is published on the entry;
// every instance becomes a WorkloadEntry keyed by that port's protocol.
func (w *watcher) generateServiceEntry(host string, services []model.SubscribeService) *v1alpha3.ServiceEntry {
	portList := make([]*v1alpha3.Port, 0)
	endpoints := make([]*v1alpha3.WorkloadEntry, 0)
	for _, service := range services {
		protocol := common.HTTP
		if service.Metadata != nil && service.Metadata["protocol"] != "" {
			protocol = common.ParseProtocol(service.Metadata["protocol"])
		} else {
			// Guarantees Labels below is non-nil (mutates only the loop copy).
			// NOTE(review): when metadata exists but lacks a "protocol" key,
			// this replaces the whole map and drops the other labels —
			// confirm that is intended (the nacos v2 watcher keeps them).
			service.Metadata = make(map[string]string)
		}
		port := &v1alpha3.Port{
			Name:     protocol.String(),
			Number:   uint32(service.Port),
			Protocol: protocol.String(),
		}
		if len(portList) == 0 {
			portList = append(portList, port)
		}
		endpoint := v1alpha3.WorkloadEntry{
			Address: service.Ip,
			Ports:   map[string]uint32{port.Protocol: port.Number},
			Labels:  service.Metadata,
		}
		endpoints = append(endpoints, &endpoint)
	}
	se := &v1alpha3.ServiceEntry{
		Hosts:      []string{host},
		Ports:      portList,
		Location:   v1alpha3.ServiceEntry_MESH_INTERNAL,
		Resolution: v1alpha3.ServiceEntry_STATIC,
		Endpoints:  endpoints,
	}
	return se
}
// Stop unsubscribes every watched service, removes the corresponding cache
// entries, and signals Run to exit.
func (w *watcher) Stop() {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	for key := range w.WatchingServices {
		s := strings.Split(key, DefaultJoiner)
		err := w.unsubscribe(s[0], s[1])
		if err == nil {
			delete(w.WatchingServices, key)
		}
		// clean the cache; the suffix mirrors getSubscribeCallback.
		suffix := strings.Join([]string{s[0], w.NacosNamespace, w.Type}, common.DotSeparator)
		suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen)
		host := strings.Join([]string{s[1], suffix}, common.DotSeparator)
		w.cache.DeleteServiceEntryWrapper(host)
	}
	w.isStop = true
	w.stop <- struct{}{}
	w.readyHandler(false)
}
// IsHealthy reports whether the last endpoint probe succeeded.
func (w *watcher) IsHealthy() bool {
	return w.Status == provider.Healthy
}

// GetRegistryType returns the registry type constant for this watcher.
func (w *watcher) GetRegistryType() string {
	return w.RegistryType.String()
}

// AppendServiceUpdateHandler sets the callback fired after cache updates.
// NOTE(review): despite the name, this replaces any previous handler rather
// than appending.
func (w *watcher) AppendServiceUpdateHandler(f func()) {
	w.updateHandler = f
}

// ReadyHandler sets the callback used to signal readiness transitions.
func (w *watcher) ReadyHandler(f func(bool)) {
	w.readyHandler = f
}
// shouldSubscribe reports whether a service name is eligible for
// subscription: Dubbo consumer-side entries and empty names are skipped.
func shouldSubscribe(serviceName string) bool {
	switch {
	case serviceName == "":
		return false
	case strings.HasPrefix(serviceName, "consumers:"):
		return false
	default:
		return true
	}
}

View File

@@ -0,0 +1,168 @@
package reconcile
import (
"errors"
"path"
"reflect"
"sync"
"istio.io/pkg/log"
apiv1 "github.com/alibaba/higress/api/networking/v1"
v1 "github.com/alibaba/higress/client/pkg/apis/networking/v1"
. "github.com/alibaba/higress/registry"
"github.com/alibaba/higress/registry/memory"
"github.com/alibaba/higress/registry/nacos"
nacosv2 "github.com/alibaba/higress/registry/nacos/v2"
"github.com/alibaba/higress/registry/zookeeper"
)
// Reconciler owns the set of registry watchers and the shared service cache,
// keeping both in sync with the McpBridge spec.
type Reconciler struct {
	memory.Cache
	registries    map[string]*apiv1.RegistryConfig // keyed by "type/name"
	watchers      map[string]Watcher               // running watchers, same keys
	serviceUpdate func()                           // invoked by watchers after cache changes
}

// NewReconciler returns an empty reconciler whose watchers will invoke
// serviceUpdate after every cache change.
func NewReconciler(serviceUpdate func()) *Reconciler {
	return &Reconciler{
		Cache:         memory.NewCache(),
		registries:    make(map[string]*apiv1.RegistryConfig),
		watchers:      make(map[string]Watcher),
		serviceUpdate: serviceUpdate,
	}
}
// Reconcile diffs the desired registries in the McpBridge spec against the
// running watchers and creates/updates/deletes watchers accordingly. A nil
// mcpbridge means "no registries" and tears everything down. On success it
// blocks until every newly created watcher has signalled readiness once.
func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) {
	newRegistries := make(map[string]*apiv1.RegistryConfig)
	if mcpbridge != nil {
		for _, registry := range mcpbridge.Spec.Registries {
			newRegistries[path.Join(registry.Type, registry.Name)] = registry
		}
	}
	var wg sync.WaitGroup
	toBeCreated := make(map[string]*apiv1.RegistryConfig)
	toBeUpdated := make(map[string]*apiv1.RegistryConfig)
	toBeDeleted := make(map[string]*apiv1.RegistryConfig)
	// Classify each desired registry as new, unchanged, or changed.
	for key, newRegistry := range newRegistries {
		if oldRegistry, ok := r.registries[key]; !ok {
			toBeCreated[key] = newRegistry
		} else if reflect.DeepEqual(newRegistry, oldRegistry) {
			continue
		} else {
			toBeUpdated[key] = newRegistry
		}
	}
	// Anything currently running that is no longer desired gets deleted.
	for key, oldRegistry := range r.registries {
		if _, ok := newRegistries[key]; !ok {
			toBeDeleted[key] = oldRegistry
		}
	}
	errHappened := false
	log.Infof("ReconcileRegistries, toBeCreated: %d, toBeUpdated: %d, toBeDeleted: %d",
		len(toBeCreated), len(toBeUpdated), len(toBeDeleted))
	for k, v := range toBeCreated {
		watcher, err := r.generateWatcherFromRegistryConfig(v, &wg)
		if err != nil {
			errHappened = true
			log.Errorf("ReconcileRegistries failed, err:%v", err)
			continue
		}
		go watcher.Run()
		r.watchers[k] = watcher
		r.registries[k] = v
	}
	// Updates are implemented as stop-old (async) + start-new.
	for k, v := range toBeUpdated {
		go r.watchers[k].Stop()
		delete(r.registries, k)
		delete(r.watchers, k)
		watcher, err := r.generateWatcherFromRegistryConfig(v, &wg)
		if err != nil {
			errHappened = true
			log.Errorf("ReconcileRegistries failed, err:%v", err)
			continue
		}
		go watcher.Run()
		r.watchers[k] = watcher
		r.registries[k] = v
	}
	for k := range toBeDeleted {
		go r.watchers[k].Stop()
		delete(r.registries, k)
		delete(r.watchers, k)
	}
	if errHappened {
		log.Error("ReconcileRegistries failed, Init Watchers failed")
		return
	}
	// Wait for every new watcher's first ReadyHandler invocation.
	wg.Wait()
	log.Infof("Registries is reconciled")
}
// generateWatcherFromRegistryConfig constructs the type-specific watcher for
// a registry config and wires readiness: the caller's WaitGroup is released
// exactly once, the first time the watcher reports ready (or unready).
func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryConfig, wg *sync.WaitGroup) (Watcher, error) {
	var watcher Watcher
	var err error
	switch registry.Type {
	case string(Nacos):
		watcher, err = nacos.NewWatcher(
			r.Cache,
			nacos.WithType(registry.Type),
			nacos.WithName(registry.Name),
			nacos.WithDomain(registry.Domain),
			nacos.WithPort(registry.Port),
			nacos.WithNacosNamespaceId(registry.NacosNamespaceId),
			nacos.WithNacosNamespace(registry.NacosNamespace),
			nacos.WithNacosGroups(registry.NacosGroups),
			nacos.WithNacosRefreshInterval(registry.NacosRefreshInterval),
		)
	case string(Nacos2):
		watcher, err = nacosv2.NewWatcher(
			r.Cache,
			nacosv2.WithType(registry.Type),
			nacosv2.WithName(registry.Name),
			nacosv2.WithNacosAddressServer(registry.NacosAddressServer),
			nacosv2.WithDomain(registry.Domain),
			nacosv2.WithPort(registry.Port),
			nacosv2.WithNacosAccessKey(registry.NacosAccessKey),
			nacosv2.WithNacosSecretKey(registry.NacosSecretKey),
			nacosv2.WithNacosNamespaceId(registry.NacosNamespaceId),
			nacosv2.WithNacosNamespace(registry.NacosNamespace),
			nacosv2.WithNacosGroups(registry.NacosGroups),
			nacosv2.WithNacosRefreshInterval(registry.NacosRefreshInterval),
		)
	case string(Zookeeper):
		watcher, err = zookeeper.NewWatcher(
			r.Cache,
			zookeeper.WithType(registry.Type),
			zookeeper.WithName(registry.Name),
			zookeeper.WithDomain(registry.Domain),
			zookeeper.WithPort(registry.Port),
			zookeeper.WithZkServicesPath(registry.ZkServicesPath),
		)
	default:
		return nil, errors.New("unsupported registry type:" + registry.Type)
	}
	if err != nil {
		return nil, err
	}
	wg.Add(1)
	// Only the FIRST readiness signal releases the WaitGroup; later
	// transitions are ignored here.
	var once sync.Once
	watcher.ReadyHandler(func(ready bool) {
		once.Do(func() {
			wg.Done()
			if ready {
				log.Infof("Registry Watcher is ready, type:%s, name:%s", registry.Type, registry.Name)
			}
		})
	})
	watcher.AppendServiceUpdateHandler(r.serviceUpdate)
	return watcher, nil
}

61
registry/watcher.go Normal file
View File

@@ -0,0 +1,61 @@
package registry
import (
"net"
"time"
)
// Supported registry backends and watcher health states.
const (
	Zookeeper ServiceRegistryType = "zookeeper"
	Eureka    ServiceRegistryType = "eureka"
	Consul    ServiceRegistryType = "consul"
	Nacos     ServiceRegistryType = "nacos"
	Nacos2    ServiceRegistryType = "nacos2"

	Healthy   WatcherStatus = "healthy"
	UnHealthy WatcherStatus = "unhealthy"

	// DefaultDialTimeout bounds the TCP probe in ProbeWatcherStatus.
	DefaultDialTimeout = time.Second * 3
)

// ServiceRegistryType identifies a registry backend implementation.
type ServiceRegistryType string

func (srt *ServiceRegistryType) String() string {
	return string(*srt)
}

// WatcherStatus is the probe-derived health of a watcher's endpoint.
type WatcherStatus string

func (ws *WatcherStatus) String() string {
	return string(*ws)
}
// Watcher is the lifecycle contract every registry adapter implements.
type Watcher interface {
	Run()
	Stop()
	IsHealthy() bool
	GetRegistryType() string
	AppendServiceUpdateHandler(f func())
	ReadyHandler(f func(bool))
}

// BaseWatcher provides no-op defaults so adapters only override what they need.
type BaseWatcher struct{}

func (w *BaseWatcher) Run()                                {}
func (w *BaseWatcher) Stop()                               {}
func (w *BaseWatcher) IsHealthy() bool                     { return true }
func (w *BaseWatcher) GetRegistryType() string             { return "" }
func (w *BaseWatcher) AppendServiceUpdateHandler(f func()) {}
func (w *BaseWatcher) ReadyHandler(f func(bool))           {}

// ServiceUpdateHandler is invoked after the service cache changes.
type ServiceUpdateHandler func()

// ReadyHandler is invoked with the watcher's readiness state.
type ReadyHandler func(bool)
// ProbeWatcherStatus dials host:port over TCP once (bounded by
// DefaultDialTimeout) to decide whether a registry endpoint is reachable.
func ProbeWatcherStatus(host string, port string) WatcherStatus {
	conn, err := net.DialTimeout("tcp", net.JoinHostPort(host, port), DefaultDialTimeout)
	if err != nil || conn == nil {
		return UnHealthy
	}
	defer conn.Close()
	return Healthy
}

117
registry/zookeeper/types.go Normal file
View File

@@ -0,0 +1,117 @@
package zookeeper
import (
"errors"
"time"
)
// Path and key constants used when walking the ZooKeeper tree.
const (
	DEFAULT_REG_TIMEOUT   = "10s"
	DUBBO                 = "/dubbo/"
	SPRING_CLOUD_SERVICES = "/services"
	DUBBO_SERVICES        = "/dubbo"
	PROVIDERS             = "/providers"
	CONFIG                = "config"
	MAPPING               = "mapping"
	METADATA              = "metadata"
	DUBBO_PROTOCOL        = "dubbo"
	HTTP_PROTOCOL         = "http"
	VERSION               = "version"
	PROTOCOL              = "protocol"
)

// ServiceType distinguishes Dubbo interfaces from Spring Cloud services.
type ServiceType int

const (
	DubboService ServiceType = iota
	SpringCloudService
)

// EventType enumerates node change kinds (EventTypeAdd/Del/Update).
type EventType int

// Event describes a single ZooKeeper node change delivered to a DataListener.
type Event struct {
	Path          string
	Action        EventType
	Content       []byte
	InterfaceName string
	ServiceType   ServiceType
}

const (
	// ConnDelay connection delay interval
	ConnDelay = 3
	// MaxFailTimes max fail times
	MaxFailTimes = 3
)

// DefaultTTL bounds how long a child watch is kept before being re-armed.
var DefaultTTL = 10 * time.Minute

// InterfaceConfig aggregates all known endpoints for one service host.
type InterfaceConfig struct {
	Host        string
	Endpoints   []Endpoint
	Protocol    string
	ServiceType ServiceType
}

// Endpoint is a single provider instance (address plus registry metadata).
type Endpoint struct {
	Ip       string
	Port     string
	Metadata map[string]string
}

// ErrNilChildren is returned when a node unexpectedly has no children.
var ErrNilChildren = errors.New("has none children")
// WithType sets the registry type string recorded on cache entries.
func WithType(t string) WatcherOption {
	return func(w *watcher) {
		w.Type = t
	}
}

// WithName sets the configured registry name (used for logging/identity).
func WithName(name string) WatcherOption {
	return func(w *watcher) {
		w.Name = name
	}
}

// WithDomain sets the ZooKeeper server host.
func WithDomain(domain string) WatcherOption {
	return func(w *watcher) {
		w.Domain = domain
	}
}

// WithPort sets the ZooKeeper server port.
func WithPort(port uint32) WatcherOption {
	return func(w *watcher) {
		w.Port = port
	}
}
// DataListener receives node-change events from the watch loops.
type DataListener interface {
	DataChange(eventType Event) bool // bool is return for interface implement is interesting
}

const (
	// EventTypeAdd means add event
	EventTypeAdd = iota
	// EventTypeDel means del event
	EventTypeDel
	// EventTypeUpdate means update event
	EventTypeUpdate
)

// ListServiceConfig is a request to start watching one service path.
type ListServiceConfig struct {
	UrlIndex      string // full ZooKeeper path to watch
	InterfaceName string
	Exit          chan struct{} // closed to cancel this particular watch
	ServiceType   ServiceType
}

// SpringCloudInstancePayload carries the metadata section of an instance node.
type SpringCloudInstancePayload struct {
	Metadata map[string]string `json:"metadata"`
}

// SpringCloudInstance is the JSON document stored on a Spring Cloud
// service-instance node.
type SpringCloudInstance struct {
	Name    string                     `json:"name"`
	Address string                     `json:"address"`
	Port    int                        `json:"port"`
	Payload SpringCloudInstancePayload `json:"payload"`
}

View File

@@ -0,0 +1,762 @@
package zookeeper
import (
"encoding/json"
"errors"
"net/url"
"path"
"reflect"
"strconv"
"strings"
"sync"
"time"
"github.com/dubbogo/go-zookeeper/zk"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
"github.com/hashicorp/go-multierror"
"go.uber.org/atomic"
"istio.io/api/networking/v1alpha3"
"istio.io/pkg/log"
apiv1 "github.com/alibaba/higress/api/networking/v1"
"github.com/alibaba/higress/pkg/common"
provider "github.com/alibaba/higress/registry"
"github.com/alibaba/higress/registry/memory"
)
// watchConfig tracks one watched service's cancel channel and listen flag.
type watchConfig struct {
	exit   chan struct{}
	listen bool
}

// watcher mirrors Dubbo and Spring Cloud services registered in ZooKeeper
// into the local in-memory ServiceEntry cache.
type watcher struct {
	provider.BaseWatcher
	apiv1.RegistryConfig
	WatchingServices   map[string]watchConfig       `json:"watching_services"` // keyed by derived interface name
	RegistryType       provider.ServiceRegistryType `json:"registry_type"`
	Status             provider.WatcherStatus       `json:"status"`
	serviceRemaind     *atomic.Int32 // remaining first-fetch services before readiness
	updateHandler      provider.ServiceUpdateHandler
	readyHandler       provider.ReadyHandler
	cache              memory.Cache
	mutex              *sync.Mutex // guards WatchingServices and stop state
	stop               chan struct{}
	zkClient           *gxzookeeper.ZookeeperClient
	reconnectCh        <-chan struct{} // fires when the zk session is re-established
	Done               chan struct{}   // stops HandleClientRestart
	seMux              *sync.Mutex     // guards serviceEntry
	serviceEntry       map[string]InterfaceConfig // accumulated endpoints per host
	listIndex          chan ListServiceConfig     // watch requests consumed by ListenService
	listServiceChan    chan struct{}
	isStop             bool
	keepStaleWhenEmpty bool     // keep cached entries when all endpoints vanish
	zkServicesPath     []string // roots scanned for Spring Cloud services
}

// WatcherOption mutates the watcher during construction.
type WatcherOption func(w *watcher)
// NewWatcher connects to ZooKeeper at Domain:Port, waits (bounded by the
// registration timeout) for the session to reach StateConnected, and starts
// the reconnect handler goroutine. It returns an error if the connection
// cannot be established in time.
func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) {
	w := &watcher{
		WatchingServices: make(map[string]watchConfig),
		RegistryType:     provider.Zookeeper,
		Status:           provider.UnHealthy,
		cache:            cache,
		mutex:            &sync.Mutex{},
		stop:             make(chan struct{}),
		Done:             make(chan struct{}),
		seMux:            &sync.Mutex{},
		serviceEntry:     make(map[string]InterfaceConfig),
		listIndex:        make(chan ListServiceConfig, 1),
		listServiceChan:  make(chan struct{}),
		zkServicesPath:   []string{SPRING_CLOUD_SERVICES},
	}
	// DEFAULT_REG_TIMEOUT is a compile-time constant, so the parse error is
	// deliberately ignored.
	timeout, _ := time.ParseDuration(DEFAULT_REG_TIMEOUT)
	for _, opt := range opts {
		opt(w)
	}
	var address []string
	address = append(address, w.Domain+":"+strconv.Itoa(int(w.Port)))
	newClient, cltErr := gxzookeeper.NewZookeeperClient("zk", address, false, gxzookeeper.WithZkTimeOut(timeout))
	if cltErr != nil {
		log.Errorf("[NewWatcher] NewWatcher zk, err:%v, zk address:%s", cltErr, address)
		return nil, cltErr
	}
	valid := newClient.ZkConnValid()
	if !valid {
		log.Info("connect zk error")
		return nil, errors.New("connect zk error")
	}
	// Wait for the session to actually connect; events arrive on a temporary
	// channel that is unregistered once connected.
	connectEvent := make(chan zk.Event, 2)
	newClient.RegisterEvent("", connectEvent)
	connectTimer := time.NewTimer(timeout)
	connectTimout := false
FOR:
	for {
		select {
		case ev := <-connectEvent:
			if ev.State == zk.StateConnected {
				break FOR
			}
		case <-connectTimer.C:
			connectTimout = true
			break FOR
		}
	}
	// NOTE(review): on timeout the client is not closed and connectTimer is
	// never stopped — confirm whether gxzookeeper cleans up the connection.
	if connectTimout {
		return nil, errors.New("connect zk timeout")
	}
	log.Info("zk connected")
	newClient.UnregisterEvent("", connectEvent)
	w.reconnectCh = newClient.Reconnect()
	w.zkClient = newClient
	go func() {
		w.HandleClientRestart()
	}()
	return w, nil
}
// WithKeepStaleWhenEmpty controls whether a cached ServiceEntry is kept when
// the last endpoint of a service disappears.
func WithKeepStaleWhenEmpty(enable bool) WatcherOption {
	return func(w *watcher) {
		w.keepStaleWhenEmpty = enable
	}
}

// WithZkServicesPath appends extra root paths to scan for Spring Cloud
// services. The built-in roots (DUBBO_SERVICES, SPRING_CLOUD_SERVICES) are
// already watched, so they are skipped here to avoid duplicates.
func WithZkServicesPath(paths []string) WatcherOption {
	return func(w *watcher) {
		// The loop variable is named p (not "path") so it does not shadow the
		// imported path package used elsewhere in this file.
		for _, p := range paths {
			p = strings.TrimSuffix(p, common.Slash)
			if p == DUBBO_SERVICES || p == SPRING_CLOUD_SERVICES {
				continue
			}
			w.zkServicesPath = append(w.zkServicesPath, p)
		}
	}
}
// HandleClientRestart refetches all services whenever the ZooKeeper session
// is re-established, until the watcher's Done channel fires.
func (w *watcher) HandleClientRestart() {
	for {
		select {
		case <-w.reconnectCh:
			// Re-arm the reconnect signal before resyncing state.
			w.reconnectCh = w.zkClient.Reconnect()
			log.Info("zkclient reconnected")
			w.RestartCallBack()
			// NOTE(review): 10 microseconds looks like it was meant to be a
			// larger backoff (ms or s) — confirm the intended unit.
			time.Sleep(10 * time.Microsecond)
		case <-w.Done:
			log.Info("[HandleClientRestart] receive registry destroy event, quit client restart handler")
			return
		}
	}
}
// RestartCallBack refetches the full service list after a ZooKeeper session
// is re-established; it reports whether the refetch succeeded.
func (w *watcher) RestartCallBack() bool {
	if err := w.fetchAllServices(); err != nil {
		log.Errorf("[RestartCallBack] fetch all service for zk err:%v", err)
		return false
	}
	return true
}
// serviceInfo records where a discovered service lives in the tree.
type serviceInfo struct {
	serviceType ServiceType
	rootPath    string
	service     string // Spring Cloud only: the child node name
}

// fetchedServices lists the children of path and records each service found
// there into the fetchedServices map, keyed by a derived interface name.
// Missing/empty nodes are treated as "no services", not as errors.
func (w *watcher) fetchedServices(fetchedServices map[string]serviceInfo, path string, serviceType ServiceType) error {
	children, err := w.zkClient.GetChildren(path)
	if err != nil {
		if err == gxzookeeper.ErrNilChildren || err == gxzookeeper.ErrNilNode ||
			strings.Contains(err.Error(), "has none children") {
			return nil
		} else {
			log.Errorf("[fetchAllServices] can not get children, err:%v, path:%s", err, path)
			return err
		}
	}
	info := serviceInfo{
		serviceType: serviceType,
		rootPath:    path,
	}
	for _, child := range children {
		// Skip registry housekeeping nodes.
		if child == CONFIG || child == MAPPING || child == METADATA {
			continue
		}
		var interfaceName string
		switch serviceType {
		case DubboService:
			interfaceName = child
		case SpringCloudService:
			info.service = child
			if path == "" || path == common.Slash {
				interfaceName = child
				break
			}
			// Qualify the name with the (slash-stripped) root path so the
			// same service under different roots stays distinct.
			interfaceName = child + "." + strings.ReplaceAll(
				strings.TrimPrefix(path, common.Slash), common.Slash, common.Hyphen)
		}
		fetchedServices[interfaceName] = info
		log.Debugf("fetchedServices, interface:%s, path:%s", interfaceName, info.rootPath)
	}
	return nil
}
// fetchAllServices scans the Dubbo root plus every configured Spring Cloud
// root, stops watches for services that disappeared, and queues a watch
// request for each newly discovered service. Passing firstFetch[0]==true
// arms the readiness countdown (serviceRemaind). Listing errors are
// accumulated and returned after the reconcile pass.
func (w *watcher) fetchAllServices(firstFetch ...bool) error {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	if w.isStop {
		return nil
	}
	fetchedServices := make(map[string]serviceInfo)
	var result error
	err := w.fetchedServices(fetchedServices, DUBBO_SERVICES, DubboService)
	if err != nil {
		result = multierror.Append(result, err)
	}
	for _, path := range w.zkServicesPath {
		err = w.fetchedServices(fetchedServices, path, SpringCloudService)
		if err != nil {
			result = multierror.Append(result, err)
		}
	}
	// Cancel watches for services no longer present.
	for interfaceName, value := range w.WatchingServices {
		if _, exist := fetchedServices[interfaceName]; !exist {
			if value.exit != nil {
				close(value.exit)
			}
			delete(w.WatchingServices, interfaceName)
		}
	}
	var serviceConfigs []ListServiceConfig
	for interfaceName, serviceInfo := range fetchedServices {
		if _, exist := w.WatchingServices[interfaceName]; !exist {
			w.WatchingServices[interfaceName] = watchConfig{
				exit:   make(chan struct{}),
				listen: true,
			}
			serviceConfig := ListServiceConfig{
				ServiceType:   serviceInfo.serviceType,
				InterfaceName: interfaceName,
				Exit:          w.WatchingServices[interfaceName].exit,
			}
			switch serviceInfo.serviceType {
			case DubboService:
				serviceConfig.UrlIndex = DUBBO + interfaceName + PROVIDERS
			case SpringCloudService:
				serviceConfig.UrlIndex = path.Join(serviceInfo.rootPath, serviceInfo.service)
			default:
				return errors.New("unkown type")
			}
			serviceConfigs = append(serviceConfigs, serviceConfig)
		}
	}
	// On the first fetch, readiness is reached once this many services have
	// completed their initial listing.
	if len(firstFetch) > 0 && firstFetch[0] {
		w.serviceRemaind = atomic.NewInt32(int32(len(serviceConfigs)))
	}
	for _, service := range serviceConfigs {
		w.listIndex <- service
	}
	return result
}
// ListenService consumes watch requests from w.listIndex and spawns one
// goroutine per service path that (re)establishes a child watch, converts the
// children into ServiceEntry data, and reschedules itself until the
// service's Exit channel or the watcher's stop channel fires.
//
// The retry counter is now local to each watch goroutine: previously a
// single failTimes variable in the enclosing method was mutated by every
// spawned goroutine, which was a data race and let one flaky path inflate
// the backoff of all other watches.
func (w *watcher) ListenService() {
	defer func() {
		w.listServiceChan <- struct{}{}
	}()
	ttl := DefaultTTL
	for {
		select {
		case listIndex := <-w.listIndex:
			// listIndex is declared per select iteration, so capturing it in
			// the goroutine below is safe.
			go func() {
				var failTimes int
				for {
					log.Info(listIndex.UrlIndex)
					children, childEventCh, err := w.zkClient.GetChildrenW(listIndex.UrlIndex)
					if err != nil {
						failTimes++
						if MaxFailTimes <= failTimes {
							failTimes = MaxFailTimes
						}
						log.Errorf("[Zookeeper][ListenService] Get children of path zkRootPath with watcher failed, err:%v, index:%s", err, listIndex.UrlIndex)
						// May be the provider does not ready yet, sleep failTimes * ConnDelay senconds to wait
						after := time.After(timeSecondDuration(failTimes * ConnDelay))
						select {
						case <-after:
							continue
						case <-listIndex.Exit:
							return
						}
					}
					failTimes = 0
					if len(children) > 0 {
						w.ChildToServiceEntry(children, listIndex.InterfaceName, listIndex.UrlIndex, listIndex.ServiceType)
					}
					// Count down toward first-fetch readiness, if armed.
					if w.serviceRemaind != nil {
						w.serviceRemaind.Sub(1)
					}
					if w.startScheduleWatchTask(listIndex, children, ttl, childEventCh, listIndex.Exit) {
						return
					}
				}
			}()
		case <-w.stop:
			log.Info("[ListenService] is shutdown")
			return
		}
	}
}
// DataChange applies a single node event to the per-host endpoint set and
// pushes the resulting ServiceEntry into the cache. It returns false only
// when the event cannot be parsed into an InterfaceConfig.
func (w *watcher) DataChange(eventType Event) bool {
	//fmt.Println(eventType)
	host, interfaceConfig, err := w.GetInterfaceConfig(eventType)
	if err != nil {
		log.Errorf("GetInterfaceConfig failed, err:%v, event:%v", err, eventType)
		return false
	}
	if eventType.Action == EventTypeAdd || eventType.Action == EventTypeUpdate {
		w.seMux.Lock()
		// Merge the (single) incoming endpoint into the host's endpoint set,
		// de-duplicating on ip:port.
		isHave := false
		value, ok := w.serviceEntry[host]
		if ok {
			for _, endpoint := range value.Endpoints {
				if endpoint.Ip == interfaceConfig.Endpoints[0].Ip && endpoint.Port == interfaceConfig.Endpoints[0].Port {
					isHave = true
				}
			}
			if !isHave {
				value.Endpoints = append(value.Endpoints, interfaceConfig.Endpoints[0])
			}
			w.serviceEntry[host] = value
		} else {
			w.serviceEntry[host] = *interfaceConfig
		}
		se := w.generateServiceEntry(w.serviceEntry[host])
		w.seMux.Unlock()
		w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
			ServiceName:  host,
			ServiceEntry: se,
			Suffix:       "zookeeper",
			RegistryType: w.Type,
		})
		w.updateHandler()
	} else if eventType.Action == EventTypeDel {
		w.seMux.Lock()
		// Remove the matching endpoint, if the host is known at all.
		value, ok := w.serviceEntry[host]
		if ok {
			var endpoints []Endpoint
			for _, endpoint := range value.Endpoints {
				if endpoint.Ip == interfaceConfig.Endpoints[0].Ip && endpoint.Port == interfaceConfig.Endpoints[0].Port {
					continue
				} else {
					endpoints = append(endpoints, endpoint)
				}
			}
			value.Endpoints = endpoints
			w.serviceEntry[host] = value
		}
		// NOTE(review): when !ok this builds an entry from the map's zero
		// value — confirm a delete for an unknown host cannot occur upstream.
		se := w.generateServiceEntry(w.serviceEntry[host])
		w.seMux.Unlock()
		//todo update
		if len(se.Endpoints) == 0 {
			// No endpoints left: evict unless configured to keep stale data.
			if !w.keepStaleWhenEmpty {
				w.cache.DeleteServiceEntryWrapper(host)
			}
		} else {
			w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
				ServiceName:  host,
				ServiceEntry: se,
				Suffix:       "zookeeper",
				RegistryType: w.Type,
			})
		}
		w.updateHandler()
	}
	return true
}
// GetInterfaceConfig dispatches config extraction on the event's service
// type: dubbo events carry everything in the node path, spring-cloud events
// carry a JSON payload in the node content.
func (w *watcher) GetInterfaceConfig(event Event) (string, *InterfaceConfig, error) {
	switch t := event.ServiceType; t {
	case SpringCloudService:
		return w.GetSpringCloudConfig(event.InterfaceName, event.Content)
	case DubboService:
		return w.GetDubboConfig(event.Path)
	default:
		return "", nil, errors.New("unknown service type")
	}
}
// GetSpringCloudConfig parses a spring-cloud zookeeper instance payload
// (JSON node content) and builds an InterfaceConfig keyed by interfaceName.
// The protocol defaults to HTTP unless the instance metadata overrides it.
func (w *watcher) GetSpringCloudConfig(interfaceName string, content []byte) (string, *InterfaceConfig, error) {
	var instance SpringCloudInstance
	if err := json.Unmarshal(content, &instance); err != nil {
		log.Errorf("unmarshal failed, err:%v, content:%s", err, content)
		return "", nil, err
	}
	// Fix: the original converted the port with strconv.Itoa and then compared
	// the string against "" — a branch that could never fire (Itoa never
	// returns an empty string). Validate the integer instead.
	if instance.Port == 0 {
		return "", nil, errors.New("empty port")
	}
	var config InterfaceConfig
	config.Host = interfaceName
	config.Protocol = common.HTTP.String()
	// Indexing a nil map safely yields "", so no length guard is needed.
	if proto := instance.Payload.Metadata["protocol"]; proto != "" {
		config.Protocol = common.ParseProtocol(proto).String()
	}
	config.Endpoints = []Endpoint{{
		Ip:       instance.Address,
		Port:     strconv.Itoa(instance.Port),
		Metadata: instance.Payload.Metadata,
	}}
	config.ServiceType = SpringCloudService
	return interfaceName, &config, nil
}
// GetDubboConfig parses a dubbo provider node path of the form
// /dubbo/<interface>/providers/dubbo%3A%2F%2F<ip>%3A<port>%2F<service>%3F<query>
// into a host key ("providers:<interface>:<version>:<group>") and an
// InterfaceConfig with one endpoint. Only %3F/%3D/%26 are decoded by hand;
// the rest is handled by url.Parse.
func (w *watcher) GetDubboConfig(dubboUrl string) (string, *InterfaceConfig, error) {
	// Restore just enough URL structure for url.Parse to split path and query.
	dubboUrl = strings.Replace(dubboUrl, "%3F", "?", 1)
	dubboUrl = strings.ReplaceAll(dubboUrl, "%3D", "=")
	dubboUrl = strings.ReplaceAll(dubboUrl, "%26", "&")
	// key is the dubbo interface name between "/dubbo/" and "/providers/dubbo".
	tempPath := strings.Replace(dubboUrl, DUBBO, "", -1)
	urls := strings.Split(tempPath, PROVIDERS+"/dubbo")
	key := urls[0]
	serviceUrl, urlParseErr := url.Parse(dubboUrl)
	if urlParseErr != nil {
		return "", nil, urlParseErr
	}
	var (
		dubboInterfaceConfig InterfaceConfig
		host                 string
	)
	// Strip the zk prefix so the path starts with "<ip>:<port>/<service>"
	// (the "dubbo://" scheme was percent-encoded, so it sits in the path).
	serviceUrl.Path = strings.Replace(serviceUrl.Path, DUBBO+key+PROVIDERS+"/dubbo://", "", -1)
	values, err := url.ParseQuery(serviceUrl.RawQuery)
	if err != nil {
		return "", nil, err
	}
	paths := strings.Split(serviceUrl.Path, "/")
	// NOTE(review): strings.Split never returns an empty slice, so this branch
	// always runs; if it ever did not, host would be returned empty with a nil
	// error — confirm callers tolerate that.
	if len(paths) > 0 {
		var group string
		_, ok := values["group"]
		if ok {
			group = values["group"][0]
		}
		// Version defaults to 0.0.0 when the provider URL carries none.
		version := "0.0.0"
		_, ok = values[VERSION]
		if ok && len(values[VERSION]) > 0 {
			version = values[VERSION][0]
		}
		dubboInterfaceConfig.Host = "providers:" + key + ":" + version + ":" + group
		host = dubboInterfaceConfig.Host
		dubboInterfaceConfig.Protocol = DUBBO_PROTOCOL
		address := strings.Split(paths[0], ":")
		if len(address) != 2 {
			log.Infof("[GetDubboConfig] can not get dubbo ip and port, path:%s ", serviceUrl.Path)
			return "", nil, errors.New("can not get dubbo ip and port")
		}
		// Single-valued query params become endpoint metadata labels.
		metadata := make(map[string]string)
		for key, value := range values {
			if len(value) == 1 {
				metadata[key] = value[0]
			}
		}
		metadata[PROTOCOL] = DUBBO_PROTOCOL
		dubboEndpoint := Endpoint{
			Ip:       address[0],
			Port:     address[1],
			Metadata: metadata,
		}
		dubboInterfaceConfig.Endpoints = append(dubboInterfaceConfig.Endpoints, dubboEndpoint)
	}
	dubboInterfaceConfig.ServiceType = DubboService
	return host, &dubboInterfaceConfig, nil
}
// startScheduleWatchTask periodically rescans the service's zookeeper path
// and reacts to child-change events. It returns true when the exit channel
// fires (caller goroutine should stop) and false when the zookeeper child
// watch fired (caller should re-arm the watch).
func (w *watcher) startScheduleWatchTask(serviceConfig ListServiceConfig, oldChildren []string, ttl time.Duration, childEventCh <-chan zk.Event, exit chan struct{}) bool {
	zkRootPath := serviceConfig.UrlIndex
	interfaceName := serviceConfig.InterfaceName
	serviceType := serviceConfig.ServiceType
	// Start fast (at most 20s) and back off exponentially up to ttl.
	tickerTTL := ttl
	if tickerTTL > 20e9 {
		tickerTTL = 20e9
	}
	ticker := time.NewTicker(tickerTTL)
	// Fix: the original leaked the ticker on the childEventCh return path.
	// Using Reset below (instead of Stop+NewTicker) keeps this defer valid.
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			w.handleZkNodeEvent(zkRootPath, oldChildren, interfaceName, serviceType)
			if tickerTTL < ttl {
				tickerTTL *= 2
				if tickerTTL > ttl {
					tickerTTL = ttl
				}
				ticker.Reset(tickerTTL)
			}
		case zkEvent := <-childEventCh:
			if zkEvent.Type == zk.EventNodeChildrenChanged {
				w.handleZkNodeEvent(zkEvent.Path, oldChildren, interfaceName, serviceType)
			}
			return false
		case <-exit:
			return true
		}
	}
}
// handleZkNodeEvent re-reads the children of zkPath and refreshes the
// corresponding service entries. When the node no longer has children, every
// previously known child is replayed as a delete event instead.
func (w *watcher) handleZkNodeEvent(zkPath string, oldChildren []string, interfaceName string, serviceType ServiceType) {
	newChildren, err := w.zkClient.GetChildren(zkPath)
	if err == nil {
		w.ChildToServiceEntry(newChildren, interfaceName, zkPath, serviceType)
		return
	}
	childless := err == gxzookeeper.ErrNilChildren || err == gxzookeeper.ErrNilNode ||
		strings.Contains(err.Error(), "has none children")
	if !childless {
		log.Errorf("zkClient get children failed, err:%v", err)
		return
	}
	// Probe the node itself; if even that fails there is nothing to clean up.
	content, _, connErr := w.zkClient.Conn.Get(zkPath)
	if connErr != nil {
		log.Errorf("[handleZkNodeEvent] Get new node path's content error:%v, path:%s", connErr, zkPath)
		return
	}
	for _, child := range oldChildren {
		childPath := path.Join(zkPath, child)
		content, _, connErr = w.zkClient.Conn.Get(childPath)
		if connErr != nil {
			log.Errorf("[handleZkNodeEvent] Get node path's content error:%v, path:%s", connErr, childPath)
			continue
		}
		w.DataChange(Event{
			Path:          childPath,
			Action:        EventTypeDel,
			Content:       content,
			InterfaceName: interfaceName,
			ServiceType:   serviceType,
		})
	}
}
// ChildToServiceEntry converts the children of a zookeeper service node into
// InterfaceConfigs (per service type), stores any new or changed configs in
// the watcher's table, and pushes the corresponding ServiceEntry wrappers to
// the cache, firing the update handler afterwards.
func (w *watcher) ChildToServiceEntry(children []string, interfaceName, zkPath string, serviceType ServiceType) {
	serviceEntry := make(map[string]InterfaceConfig)
	switch serviceType {
	case DubboService:
		w.DubboChildToServiceEntry(serviceEntry, children, interfaceName, zkPath)
	case SpringCloudService:
		w.SpringCloudChildToServiceEntry(serviceEntry, children, interfaceName, zkPath)
	default:
		log.Error("unknown type")
	}
	if len(serviceEntry) == 0 {
		return
	}
	w.seMux.Lock()
	for host, config := range serviceEntry {
		old, known := w.serviceEntry[host]
		if known && reflect.DeepEqual(old, config) {
			// Nothing changed for this host; skip the cache push.
			continue
		}
		w.serviceEntry[host] = config
		//todo update or create serviceentry
		w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
			ServiceName:  host,
			ServiceEntry: w.generateServiceEntry(config),
			Suffix:       "zookeeper",
			RegistryType: w.Type,
		})
	}
	w.seMux.Unlock()
	w.updateHandler()
}
// SpringCloudChildToServiceEntry reads each child node's JSON payload,
// parses it into an InterfaceConfig, and accumulates the results into
// serviceEntry, merging endpoints for hosts seen more than once.
func (w *watcher) SpringCloudChildToServiceEntry(serviceEntry map[string]InterfaceConfig, children []string, interfaceName, zkPath string) {
	for _, child := range children {
		nodePath := path.Join(zkPath, child)
		content, _, err := w.zkClient.Conn.Get(nodePath)
		if err != nil {
			log.Errorf("[handleZkNodeEvent] Get node path's content error:%v, path:%s", err, nodePath)
			continue
		}
		host, config, err := w.GetSpringCloudConfig(interfaceName, content)
		if err != nil {
			log.Errorf("GetSpringCloudConfig failed:%v", err)
			continue
		}
		merged, exist := serviceEntry[host]
		if !exist {
			serviceEntry[host] = *config
			continue
		}
		merged.Endpoints = append(merged.Endpoints, config.Endpoints...)
		serviceEntry[host] = merged
	}
}
// DubboChildToServiceEntry parses each child node path as a dubbo provider
// URL and accumulates the resulting InterfaceConfigs into serviceEntry,
// merging endpoints for hosts seen more than once.
func (w *watcher) DubboChildToServiceEntry(serviceEntry map[string]InterfaceConfig, children []string, interfaceName, zkPath string) {
	for _, child := range children {
		nodePath := path.Join(zkPath, child)
		host, config, err := w.GetDubboConfig(nodePath)
		if err != nil {
			log.Errorf("GetDubboConfig failed:%v", err)
			continue
		}
		merged, exist := serviceEntry[host]
		if !exist {
			serviceEntry[host] = *config
			continue
		}
		merged.Endpoints = append(merged.Endpoints, config.Endpoints...)
		serviceEntry[host] = merged
	}
}
// generateServiceEntry builds an istio ServiceEntry from an InterfaceConfig.
// Only the first endpoint's port is published at the service level (original
// behavior); every workload entry still carries its own protocol→port map.
func (w *watcher) generateServiceEntry(config InterfaceConfig) *v1alpha3.ServiceEntry {
	ports := make([]*v1alpha3.Port, 0)
	workloads := make([]*v1alpha3.WorkloadEntry, 0)
	for _, ep := range config.Endpoints {
		// Protocol defaults to HTTP unless the endpoint metadata overrides it.
		proto := common.HTTP
		if ep.Metadata != nil && ep.Metadata[PROTOCOL] != "" {
			proto = common.ParseProtocol(ep.Metadata[PROTOCOL])
		}
		num, _ := strconv.Atoi(ep.Port)
		p := &v1alpha3.Port{
			Name:     proto.String(),
			Number:   uint32(num),
			Protocol: proto.String(),
		}
		if len(ports) == 0 {
			ports = append(ports, p)
		}
		workloads = append(workloads, &v1alpha3.WorkloadEntry{
			Address: ep.Ip,
			Ports:   map[string]uint32{p.Protocol: p.Number},
			Labels:  ep.Metadata,
			Weight:  1,
		})
	}
	return &v1alpha3.ServiceEntry{
		Hosts:      []string{config.Host + ".zookeeper"},
		Ports:      ports,
		Location:   v1alpha3.ServiceEntry_MESH_INTERNAL,
		Resolution: v1alpha3.ServiceEntry_STATIC,
		Endpoints:  workloads,
	}
}
// Run is the watcher's main loop: it starts ListenService in the background,
// performs the initial service fetch, and then every 30 seconds retries a
// failed first fetch and refetches once the watcher is ready. It also
// restarts ListenService whenever that goroutine signals its exit via
// listServiceChan, and returns when w.stop fires.
func (w *watcher) Run() {
	defer func() {
		log.Info("[zookeeper] Run is down")
		if r := recover(); r != nil {
			log.Info("Recovered in f", "r is", r)
		}
	}()
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	// Probe the registry endpoint once to record initial health status.
	w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10))
	go func() {
		w.ListenService()
	}()
	// true marks this as the first fetch so readiness bookkeeping is armed.
	firstFetchErr := w.fetchAllServices(true)
	if firstFetchErr != nil {
		log.Errorf("first fetch services failed:%v", firstFetchErr)
	}
	for {
		select {
		case <-ticker.C:
			var needNewFetch bool
			if w.IsReady() {
				// NOTE(review): once ready, this signals readiness and refetches
				// on every 30s tick — presumably intentional periodic resync;
				// confirm.
				w.readyHandler(true)
				needNewFetch = true
			}
			if firstFetchErr != nil || needNewFetch {
				// Subsequent fetches omit the firstFetch flag.
				firstFetchErr = w.fetchAllServices()
			}
		case <-w.stop:
			return
		case <-w.listServiceChan:
			// ListenService exited (it signals via listServiceChan); restart it.
			go func() {
				w.ListenService()
			}()
		}
	}
}
// Stop shuts the watcher down: it closes every per-service exit channel (so
// watch goroutines return), clears the cached service entries, signals the
// Run/ListenService loops via w.stop, closes the zookeeper connection, and
// reports not-ready. The ordering here is deliberate; do not reorder.
func (w *watcher) Stop() {
	w.mutex.Lock()
	for key, value := range w.WatchingServices {
		if value.exit != nil {
			close(value.exit)
		}
		delete(w.WatchingServices, key)
	}
	w.isStop = true
	w.mutex.Unlock()
	w.seMux.Lock()
	for key := range w.serviceEntry {
		w.cache.DeleteServiceEntryWrapper(key)
	}
	w.updateHandler()
	w.seMux.Unlock()
	// One buffered/handled send wakes a current receiver; the close that
	// follows broadcasts shutdown to any remaining receivers.
	w.stop <- struct{}{}
	w.Done <- struct{}{}
	close(w.stop)
	close(w.Done)
	w.zkClient.Close()
	w.readyHandler(false)
}
// IsHealthy reports whether the last registry probe marked this watcher healthy.
func (w *watcher) IsHealthy() bool {
	return w.Status == provider.Healthy
}
// GetRegistryType returns the watcher's registry type as a string.
func (w *watcher) GetRegistryType() string {
	return w.RegistryType.String()
}
// AppendServiceUpdateHandler registers the callback invoked after service
// entries change. NOTE(review): despite the name, this replaces any
// previously registered handler rather than appending to a list.
func (w *watcher) AppendServiceUpdateHandler(f func()) {
	w.updateHandler = f
}
// ReadyHandler registers the callback invoked with the watcher's readiness
// state (true on ready, false on shutdown). Replaces any previous handler.
func (w *watcher) ReadyHandler(f func(bool)) {
	w.readyHandler = f
}
// IsReady reports whether the initial service list has been fully processed.
// A nil remainder counter means first-fetch bookkeeping was never armed,
// which counts as ready.
func (w *watcher) IsReady() bool {
	if w.serviceRemaind == nil {
		return true
	}
	return w.serviceRemaind.Load() <= 0
}
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}

View File

@@ -0,0 +1,116 @@
package zookeeper
import (
"sync"
"testing"
"github.com/stretchr/testify/assert"
)
// TestGetSpringCloudConfig verifies that a spring-cloud zookeeper instance
// JSON payload is parsed into the expected host and InterfaceConfig
// (HTTP protocol default, metadata carried through to the endpoint).
func TestGetSpringCloudConfig(t *testing.T) {
	var w watcher
	w.seMux = &sync.Mutex{}
	cases := []struct {
		name           string
		interfaceName  string
		content        []byte
		expectedHost   string
		expectedConfig InterfaceConfig
	}{
		{
			name:          "normal",
			interfaceName: "service-provider.services",
			// Payload captured from a real spring-cloud zookeeper registration.
			content:       []byte(`{"name":"service-provider","id":"e479f40a-8f91-42a1-98e6-9377d224b360","address":"10.0.0.0","port":8071,"sslPort":null,"payload":{"@class":"org.springframework.cloud.zookeeper.discovery.ZookeeperInstance","id":"application-1","name":"service-provider","metadata":{"version":"1"}},"registrationTimeUTC":1663145171645,"serviceType":"DYNAMIC","uriSpec":{"parts":[{"value":"scheme","variable":true},{"value":"://","variable":false},{"value":"address","variable":true},{"value":":","variable":false},{"value":"port","variable":true}]}}`),
			expectedHost:  "service-provider.services",
			expectedConfig: InterfaceConfig{
				Host:        "service-provider.services",
				Protocol:    "HTTP",
				ServiceType: SpringCloudService,
				Endpoints: []Endpoint{
					{
						Ip:   "10.0.0.0",
						Port: "8071",
						Metadata: map[string]string{
							"version": "1",
						},
					},
				},
			},
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			actualHost, actualConfig, err := w.GetSpringCloudConfig(c.interfaceName, c.content)
			if err != nil {
				t.Fatal(err)
			}
			assert.Equal(t, c.expectedHost, actualHost)
			assert.Equal(t, c.expectedConfig, *actualConfig)
		})
	}
}
// TestGetDubboConfig verifies that percent-encoded dubbo provider node paths
// are parsed into the expected host key and InterfaceConfig, covering both
// the default version ("0.0.0") and an explicit version query parameter.
func TestGetDubboConfig(t *testing.T) {
	var w watcher
	w.seMux = &sync.Mutex{}
	cases := []struct {
		name           string
		url            string
		expectedHost   string
		expectedConfig InterfaceConfig
	}{
		{
			name: "no version",
			// URL without a version param: host falls back to version "0.0.0"
			// and an empty group (trailing colon).
			url:          `/dubbo/org.apache.dubbo.samples.api.GreetingService/providers/dubbo%3A%2F%2F10.0.0.0%3A20880%2Fcom.alibaba.adrive.business.contract.service.UserVipService%3Fzone%3Dcn-shanghai-g%26dubbo%3D2.0.2`,
			expectedHost: "providers:org.apache.dubbo.samples.api.GreetingService:0.0.0:",
			expectedConfig: InterfaceConfig{
				Host:        "providers:org.apache.dubbo.samples.api.GreetingService:0.0.0:",
				Protocol:    "dubbo",
				ServiceType: DubboService,
				Endpoints: []Endpoint{
					{
						Ip:   "10.0.0.0",
						Port: "20880",
						Metadata: map[string]string{
							"zone":     "cn-shanghai-g",
							"dubbo":    "2.0.2",
							"protocol": "dubbo",
						},
					},
				},
			},
		},
		{
			name: "has version",
			// Same URL with version=1.0.0 appended; the version propagates into
			// both the host key and the endpoint metadata.
			url:          `/dubbo/org.apache.dubbo.samples.api.GreetingService/providers/dubbo%3A%2F%2F10.0.0.0%3A20880%2Fcom.alibaba.adrive.business.contract.service.UserVipService%3Fzone%3Dcn-shanghai-g%26dubbo%3D2.0.2%26version%3D1.0.0`,
			expectedHost: "providers:org.apache.dubbo.samples.api.GreetingService:1.0.0:",
			expectedConfig: InterfaceConfig{
				Host:        "providers:org.apache.dubbo.samples.api.GreetingService:1.0.0:",
				Protocol:    "dubbo",
				ServiceType: DubboService,
				Endpoints: []Endpoint{
					{
						Ip:   "10.0.0.0",
						Port: "20880",
						Metadata: map[string]string{
							"zone":     "cn-shanghai-g",
							"dubbo":    "2.0.2",
							"protocol": "dubbo",
							"version":  "1.0.0",
						},
					},
				},
			},
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			actualHost, actualConfig, err := w.GetDubboConfig(c.url)
			if err != nil {
				t.Fatal(err)
			}
			assert.Equal(t, c.expectedHost, actualHost)
			assert.Equal(t, c.expectedConfig, *actualConfig)
		})
	}
}