mirror of
https://github.com/alibaba/higress.git
synced 2026-02-06 23:21:08 +08:00
455 lines
13 KiB
Go
455 lines
13 KiB
Go
// Copyright (c) 2022 Alibaba Group Holding Ltd.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package memory
|
|
|
|
import (
|
|
"encoding/json"
|
|
"sort"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/protobuf/encoding/protojson"
|
|
"google.golang.org/protobuf/types/known/structpb"
|
|
"google.golang.org/protobuf/types/known/wrapperspb"
|
|
extensions "istio.io/api/extensions/v1alpha1"
|
|
"istio.io/api/networking/v1alpha3"
|
|
"istio.io/istio/pkg/config"
|
|
"istio.io/istio/pkg/config/schema/gvk"
|
|
"istio.io/pkg/log"
|
|
|
|
"github.com/alibaba/higress/v2/pkg/common"
|
|
higressconfig "github.com/alibaba/higress/v2/pkg/config"
|
|
ingress "github.com/alibaba/higress/v2/pkg/ingress/kube/common"
|
|
"github.com/alibaba/higress/v2/registry"
|
|
)
|
|
|
|
type Cache interface {
|
|
UpdateServiceWrapper(service string, data *ingress.ServiceWrapper)
|
|
DeleteServiceWrapper(service string)
|
|
UpdateProxyWrapper(name string, data *ingress.ProxyWrapper)
|
|
DeleteProxyWrapper(name string)
|
|
UpdateConfigCache(kind config.GroupVersionKind, key string, config *config.Config, forceDelete bool)
|
|
GetAllConfigs(kind config.GroupVersionKind) map[string]*config.Config
|
|
PurgeStaleItems() bool
|
|
UpdateServiceEntryEndpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string)
|
|
GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string
|
|
GetAllServiceEntry() []*v1alpha3.ServiceEntry
|
|
GetAllServiceWrapper() []*ingress.ServiceWrapper
|
|
GetAllProxyWrapper() []*ingress.ProxyWrapper
|
|
GetAllDestinationRuleWrapper() []*ingress.WrapperDestinationRule
|
|
GetIncrementalServiceWrapper() (updatedList []*ingress.ServiceWrapper, deletedList []*ingress.ServiceWrapper)
|
|
RemoveEndpointByIp(ip string)
|
|
}
|
|
|
|
func NewCache() Cache {
|
|
return &store{
|
|
mux: &sync.RWMutex{},
|
|
configs: make(map[string]map[string]*config.Config),
|
|
sew: make(map[string]*ingress.ServiceWrapper),
|
|
toBeUpdated: make([]*ingress.ServiceWrapper, 0),
|
|
toBeDeleted: make([]*ingress.ServiceWrapper, 0),
|
|
ip2services: make(map[string]map[string]bool),
|
|
deferredDeleteServices: make(map[string]struct{}),
|
|
pw: make(map[string]*ingress.ProxyWrapper),
|
|
deferredDeleteProxies: make(map[string]struct{}),
|
|
}
|
|
}
|
|
|
|
type store struct {
|
|
mux *sync.RWMutex
|
|
|
|
configs map[string]map[string]*config.Config
|
|
|
|
sew map[string]*ingress.ServiceWrapper
|
|
toBeUpdated []*ingress.ServiceWrapper
|
|
toBeDeleted []*ingress.ServiceWrapper
|
|
ip2services map[string]map[string]bool
|
|
deferredDeleteServices map[string]struct{}
|
|
|
|
pw map[string]*ingress.ProxyWrapper
|
|
deferredDeleteProxies map[string]struct{}
|
|
}
|
|
|
|
func (s *store) GetAllConfigs(kind config.GroupVersionKind) map[string]*config.Config {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
cfgs, exist := s.configs[kind.String()]
|
|
if !exist {
|
|
return map[string]*config.Config{}
|
|
}
|
|
if kind == gvk.WasmPlugin {
|
|
pluginConfig := ®istry.WasmPluginConfig{}
|
|
var ns string
|
|
for _, cfg := range cfgs {
|
|
ns = cfg.Namespace
|
|
rule := cfg.Spec.(*registry.McpServerRule)
|
|
pluginConfig.Rules = append(pluginConfig.Rules, rule)
|
|
}
|
|
if len(pluginConfig.Rules) == 0 {
|
|
log.Infof("there is no mcp server rule exist, skip generate wasm plugin")
|
|
return map[string]*config.Config{}
|
|
}
|
|
rulesBytes, err := json.Marshal(pluginConfig)
|
|
if err != nil {
|
|
log.Errorf("marshal mcp wasm plugin config error %v", err)
|
|
return map[string]*config.Config{}
|
|
}
|
|
pbs := &structpb.Struct{}
|
|
if err = protojson.Unmarshal(rulesBytes, pbs); err != nil {
|
|
log.Errorf("unmarshal mcp wasm plugin config error %v", err)
|
|
return map[string]*config.Config{}
|
|
}
|
|
wasmPlugin := &extensions.WasmPlugin{
|
|
ImagePullPolicy: extensions.PullPolicy_Always,
|
|
Phase: extensions.PluginPhase_UNSPECIFIED_PHASE,
|
|
Priority: &wrapperspb.Int32Value{Value: 999},
|
|
PluginConfig: pbs,
|
|
Url: higressconfig.McpServerWasmImageUrl,
|
|
FailStrategy: extensions.FailStrategy_FAIL_OPEN,
|
|
}
|
|
|
|
return map[string]*config.Config{"wasm": {
|
|
Meta: config.Meta{
|
|
GroupVersionKind: gvk.WasmPlugin,
|
|
Name: "istio-autogenerated-mcp-wasmplugin",
|
|
Namespace: ns,
|
|
},
|
|
Spec: wasmPlugin,
|
|
}}
|
|
}
|
|
|
|
return cfgs
|
|
}
|
|
|
|
func (s *store) UpdateConfigCache(kind config.GroupVersionKind, key string, cfg *config.Config, forceDelete bool) {
|
|
if cfg == nil && !forceDelete {
|
|
return
|
|
}
|
|
|
|
s.mux.Lock()
|
|
if forceDelete {
|
|
for _, allConfigs := range s.configs {
|
|
delete(allConfigs, key)
|
|
}
|
|
log.Infof("Delete config %s in cache", key)
|
|
} else {
|
|
if _, exist := s.configs[kind.String()]; !exist {
|
|
s.configs[kind.String()] = make(map[string]*config.Config)
|
|
}
|
|
|
|
if _, exist := s.configs[kind.String()][key]; exist {
|
|
log.Infof("Update kind %s config %s", kind.String(), key)
|
|
} else {
|
|
log.Infof("Add kind %s config %s", kind.String(), key)
|
|
}
|
|
s.configs[kind.String()][key] = cfg
|
|
}
|
|
s.mux.Unlock()
|
|
}
|
|
|
|
func (s *store) UpdateServiceEntryEndpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string) {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
if se, exist := s.sew[service]; exist {
|
|
idx := -1
|
|
for i, ep := range se.ServiceEntry.Endpoints {
|
|
if ep.Address == ip {
|
|
idx = i
|
|
if len(regionId) != 0 {
|
|
ep.Locality = regionId
|
|
if len(zoneId) != 0 {
|
|
ep.Locality = regionId + "/" + zoneId
|
|
}
|
|
}
|
|
if labels != nil {
|
|
for k, v := range labels {
|
|
if protocol == common.Dubbo.String() && k == "version" {
|
|
ep.Labels["appversion"] = v
|
|
continue
|
|
}
|
|
ep.Labels[k] = v
|
|
}
|
|
}
|
|
|
|
if idx != -1 {
|
|
se.ServiceEntry.Endpoints[idx] = ep
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (s *store) UpdateServiceWrapper(service string, data *ingress.ServiceWrapper) {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
if old, exist := s.sew[service]; exist {
|
|
data.SetCreateTime(old.GetCreateTime())
|
|
} else {
|
|
data.SetCreateTime(time.Now())
|
|
}
|
|
|
|
log.Debugf("mcp service entry update, name: %s, data: %v", service, data)
|
|
|
|
s.toBeUpdated = append(s.toBeUpdated, data)
|
|
s.sew[service] = data
|
|
// service is updated, should not be deleted
|
|
if _, ok := s.deferredDeleteServices[service]; ok {
|
|
delete(s.deferredDeleteServices, service)
|
|
log.Debugf("service in deferredDeleteServices updated, host: %s", service)
|
|
}
|
|
log.Infof("ServiceEntry updated, host: %s", service)
|
|
}
|
|
|
|
func (s *store) DeleteServiceWrapper(service string) {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
if data, exist := s.sew[service]; exist {
|
|
s.toBeDeleted = append(s.toBeDeleted, data)
|
|
s.deferredDeleteServices[service] = struct{}{}
|
|
}
|
|
}
|
|
|
|
func (s *store) UpdateProxyWrapper(name string, data *ingress.ProxyWrapper) {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
if old, exist := s.pw[name]; exist {
|
|
data.SetCreateTime(old.GetCreateTime())
|
|
} else {
|
|
data.SetCreateTime(time.Now())
|
|
}
|
|
|
|
log.Debugf("mcp proxy entry update, name: %s, data: %v", name, data)
|
|
|
|
s.pw[name] = data
|
|
// service is updated, should not be deleted
|
|
if _, ok := s.deferredDeleteServices[name]; ok {
|
|
delete(s.deferredDeleteServices, name)
|
|
log.Debugf("proxy in deferredDeleteProxies updated, na: %s", name)
|
|
}
|
|
log.Infof("ProxyWrapper updated, name: %s", name)
|
|
}
|
|
|
|
func (s *store) DeleteProxyWrapper(name string) {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
if _, exist := s.pw[name]; exist {
|
|
s.deferredDeleteProxies[name] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// should only be called when reconcile is done
|
|
func (s *store) PurgeStaleItems() bool {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
deleted := false
|
|
|
|
for service := range s.deferredDeleteServices {
|
|
delete(s.sew, service)
|
|
delete(s.deferredDeleteServices, service)
|
|
log.Infof("ServiceEntry deleted, host: %s", service)
|
|
|
|
deleted = true
|
|
}
|
|
|
|
for proxy := range s.deferredDeleteProxies {
|
|
delete(s.pw, proxy)
|
|
delete(s.deferredDeleteProxies, proxy)
|
|
log.Infof("ProxyWrapper deleted, name: %s", proxy)
|
|
|
|
deleted = true
|
|
}
|
|
|
|
return deleted
|
|
}
|
|
|
|
// GetServiceByEndpoints get the list of services of which "address:port" contained by the endpoints
|
|
// and the version of the service contained by the requestVersions. The result format is as below:
|
|
// key: serviceName + "#@" + suffix
|
|
// values: ["v1", "v2"] which has removed duplication
|
|
func (s *store) GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string {
|
|
s.mux.RLock()
|
|
defer s.mux.RUnlock()
|
|
|
|
result := make(map[string][]string)
|
|
for _, serviceEntryWrapper := range s.sew {
|
|
for _, workload := range serviceEntryWrapper.ServiceEntry.Endpoints {
|
|
port, exist := workload.Ports[protocol.String()]
|
|
if !exist {
|
|
continue
|
|
}
|
|
|
|
endpoint := workload.Address + common.ColonSeparator + strconv.Itoa(int(port))
|
|
if _, hit := endpoints[endpoint]; hit {
|
|
if version, has := workload.Labels[versionKey]; has {
|
|
if _, in := requestVersions[version]; in {
|
|
key := serviceEntryWrapper.ServiceName + common.SpecialSeparator + serviceEntryWrapper.Suffix
|
|
result[key] = append(result[key], version)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// remove duplication
|
|
for key, versions := range result {
|
|
sort.Strings(versions)
|
|
i := 0
|
|
for j := 1; j < len(versions); j++ {
|
|
if versions[j] != versions[i] {
|
|
i++
|
|
versions[i] = versions[j]
|
|
}
|
|
}
|
|
result[key] = versions[:i+1]
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// GetAllServiceEntry get all ServiceEntry in the store for xds push
|
|
func (s *store) GetAllServiceEntry() []*v1alpha3.ServiceEntry {
|
|
s.mux.RLock()
|
|
defer s.mux.RUnlock()
|
|
|
|
seList := make([]*v1alpha3.ServiceEntry, 0)
|
|
for _, serviceEntryWrapper := range s.sew {
|
|
if len(serviceEntryWrapper.ServiceEntry.Hosts) == 0 {
|
|
continue
|
|
}
|
|
seList = append(seList, serviceEntryWrapper.ServiceEntry.DeepCopy())
|
|
}
|
|
sort.Slice(seList, func(i, j int) bool {
|
|
return seList[i].Hosts[0] > seList[j].Hosts[0]
|
|
})
|
|
return seList
|
|
}
|
|
|
|
// GetAllServiceWrapper get all ServiceWrapper in the store for xds push
|
|
func (s *store) GetAllServiceWrapper() []*ingress.ServiceWrapper {
|
|
s.mux.RLock()
|
|
defer s.mux.RUnlock()
|
|
defer s.cleanUpdateAndDeleteArray()
|
|
|
|
sewList := make([]*ingress.ServiceWrapper, 0)
|
|
for _, serviceEntryWrapper := range s.sew {
|
|
sewList = append(sewList, serviceEntryWrapper.DeepCopy())
|
|
}
|
|
return sewList
|
|
}
|
|
|
|
// GetAllProxyWrapper get all ServiceWrapper in the store for xds push
|
|
func (s *store) GetAllProxyWrapper() []*ingress.ProxyWrapper {
|
|
s.mux.RLock()
|
|
defer s.mux.RUnlock()
|
|
|
|
pwList := make([]*ingress.ProxyWrapper, 0)
|
|
for _, pw := range s.pw {
|
|
pwList = append(pwList, pw.DeepCopy())
|
|
}
|
|
return pwList
|
|
}
|
|
|
|
// GetAllDestinationRuleWrapper get all DestinationRuleWrapper in the store for xds push
|
|
func (s *store) GetAllDestinationRuleWrapper() []*ingress.WrapperDestinationRule {
|
|
s.mux.RLock()
|
|
defer s.mux.RUnlock()
|
|
defer s.cleanUpdateAndDeleteArray()
|
|
|
|
drwList := make([]*ingress.WrapperDestinationRule, 0)
|
|
for _, serviceEntryWrapper := range s.sew {
|
|
if serviceEntryWrapper.DestinationRuleWrapper != nil {
|
|
drwList = append(drwList, serviceEntryWrapper.DeepCopy().DestinationRuleWrapper)
|
|
}
|
|
}
|
|
configFromMcp := s.configs[gvk.DestinationRule.String()]
|
|
for _, cfg := range configFromMcp {
|
|
dr := cfg.Spec.(*v1alpha3.DestinationRule)
|
|
drwList = append(drwList, &ingress.WrapperDestinationRule{
|
|
DestinationRule: dr,
|
|
ServiceKey: ingress.ServiceKey{Namespace: "mcp", Name: dr.Host, ServiceFQDN: dr.Host},
|
|
})
|
|
}
|
|
|
|
return drwList
|
|
}
|
|
|
|
// GetIncrementalServiceWrapper get incremental ServiceWrapper in the store for xds push
|
|
func (s *store) GetIncrementalServiceWrapper() ([]*ingress.ServiceWrapper, []*ingress.ServiceWrapper) {
|
|
s.mux.RLock()
|
|
defer s.mux.RUnlock()
|
|
defer s.cleanUpdateAndDeleteArray()
|
|
|
|
updatedList := make([]*ingress.ServiceWrapper, 0)
|
|
for _, serviceEntryWrapper := range s.toBeUpdated {
|
|
updatedList = append(updatedList, serviceEntryWrapper.DeepCopy())
|
|
}
|
|
|
|
deletedList := make([]*ingress.ServiceWrapper, 0)
|
|
for _, serviceEntryWrapper := range s.toBeDeleted {
|
|
deletedList = append(deletedList, serviceEntryWrapper.DeepCopy())
|
|
}
|
|
|
|
return updatedList, deletedList
|
|
}
|
|
|
|
func (s *store) cleanUpdateAndDeleteArray() {
|
|
s.toBeUpdated = nil
|
|
s.toBeDeleted = nil
|
|
}
|
|
|
|
func (s *store) updateIpMap(service string, data *ingress.ServiceWrapper) {
|
|
for _, ep := range data.ServiceEntry.Endpoints {
|
|
if s.ip2services[ep.Address] == nil {
|
|
s.ip2services[ep.Address] = make(map[string]bool)
|
|
}
|
|
s.ip2services[ep.Address][service] = true
|
|
}
|
|
}
|
|
|
|
func (s *store) RemoveEndpointByIp(ip string) {
|
|
s.mux.Lock()
|
|
defer s.mux.Unlock()
|
|
|
|
services, has := s.ip2services[ip]
|
|
if !has {
|
|
return
|
|
}
|
|
delete(s.ip2services, ip)
|
|
|
|
for service := range services {
|
|
if data, exist := s.sew[service]; exist {
|
|
idx := -1
|
|
for i, ep := range data.ServiceEntry.Endpoints {
|
|
if ep.Address == ip {
|
|
idx = i
|
|
break
|
|
}
|
|
}
|
|
if idx != -1 {
|
|
data.ServiceEntry.Endpoints = append(data.ServiceEntry.Endpoints[:idx], data.ServiceEntry.Endpoints[idx+1:]...)
|
|
}
|
|
s.toBeUpdated = append(s.toBeUpdated, data)
|
|
}
|
|
}
|
|
}
|