diff --git a/api/buf.gen.yaml b/api/buf.gen.yaml new file mode 100644 index 000000000..9c292f310 --- /dev/null +++ b/api/buf.gen.yaml @@ -0,0 +1,11 @@ +version: v1beta1 +plugins: +- name: gogofast + out: . + opt: plugins=grpc,paths=source_relative,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types,Mgoogle/rpc/status.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/rpc/code.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/rpc/error_details.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/api/field_behavior.proto=istio.io/gogo-genproto/googleapis/google/api +- name: deepcopy + out: . + opt: paths=source_relative,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types,Mgoogle/rpc/status.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/rpc/code.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/rpc/error_details.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/api/field_behavior.proto=istio.io/gogo-genproto/googleapis/google/api +- name: jsonshim + out: . 
+ opt: paths=source_relative,Mgogoproto/gogo.proto=github.com/gogo/protobuf/gogoproto,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/descriptor.proto=github.com/gogo/protobuf/protoc-gen-gogo/descriptor,Mgoogle/protobuf/duration.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/struct.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,Mgoogle/protobuf/wrappers.proto=github.com/gogo/protobuf/types,Mgoogle/rpc/status.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/rpc/code.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/rpc/error_details.proto=istio.io/gogo-genproto/googleapis/google/rpc,Mgoogle/api/field_behavior.proto=istio.io/gogo-genproto/googleapis/google/api diff --git a/api/buf.yaml b/api/buf.yaml new file mode 100644 index 000000000..e725d5497 --- /dev/null +++ b/api/buf.yaml @@ -0,0 +1,8 @@ +version: v1beta1 +lint: + use: + - BASIC + except: + - FIELD_LOWER_SNAKE_CASE + - PACKAGE_DIRECTORY_MATCH + allow_comment_ignores: true diff --git a/api/cue.yaml b/api/cue.yaml new file mode 100644 index 000000000..b07d2ba0f --- /dev/null +++ b/api/cue.yaml @@ -0,0 +1,18 @@ +# Cuelang configuration to generate OpenAPI schema for Higress configs. + +module: github.com/alibaba/higress/api + +openapi: + selfContained: true + fieldFilter: "min.*|max.*" + +directories: + networking/v1: + - mode: perFile + +# All is used when generating all types referenced in the above directories to +# one file. +all: + title: All Higress types. 
+ version: v1alpha1 + oapiFilename: higress.gen.json diff --git a/api/gen.sh b/api/gen.sh new file mode 100755 index 000000000..7883fde69 --- /dev/null +++ b/api/gen.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +set -eu + +# Generate all protos +buf generate \ + --path networking \ + +# Generate CRDs +cue-gen -verbose -f=./cue.yaml -crd=true diff --git a/api/github.com b/api/github.com new file mode 120000 index 000000000..8aadff4cc --- /dev/null +++ b/api/github.com @@ -0,0 +1 @@ +../external/api/common-protos/github.com \ No newline at end of file diff --git a/api/gogoproto b/api/gogoproto new file mode 120000 index 000000000..b24fa01b5 --- /dev/null +++ b/api/gogoproto @@ -0,0 +1 @@ +../external/api/common-protos/gogoproto \ No newline at end of file diff --git a/api/google b/api/google new file mode 120000 index 000000000..4086affa5 --- /dev/null +++ b/api/google @@ -0,0 +1 @@ +../external/api/common-protos/google \ No newline at end of file diff --git a/api/istio.io b/api/istio.io new file mode 120000 index 000000000..688c73a46 --- /dev/null +++ b/api/istio.io @@ -0,0 +1 @@ +../external/api/common-protos/istio.io \ No newline at end of file diff --git a/api/k8s.io b/api/k8s.io new file mode 120000 index 000000000..7daba745e --- /dev/null +++ b/api/k8s.io @@ -0,0 +1 @@ +../external/api/common-protos/k8s.io \ No newline at end of file diff --git a/api/kubernetes/customresourcedefinitions.gen.yaml b/api/kubernetes/customresourcedefinitions.gen.yaml new file mode 100644 index 000000000..77e5d4e86 --- /dev/null +++ b/api/kubernetes/customresourcedefinitions.gen.yaml @@ -0,0 +1,71 @@ +# DO NOT EDIT - Generated by Cue OpenAPI generator based on Istio APIs. 
+apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + "helm.sh/resource-policy": keep + name: mcpbridges.networking.higress.io +spec: + group: networking.higress.io + names: + categories: + - higress-io + kind: McpBridge + listKind: McpBridgeList + plural: mcpbridges + singular: mcpbridge + scope: Namespaced + versions: + - name: v1 + schema: + openAPIV3Schema: + properties: + spec: + properties: + registries: + items: + properties: + consulNamespace: + type: string + domain: + type: string + nacosAccessKey: + type: string + nacosAddressServer: + type: string + nacosGroups: + items: + type: string + type: array + nacosNamespace: + type: string + nacosNamespaceId: + type: string + nacosRefreshInterval: + format: int64 + type: integer + nacosSecretKey: + type: string + name: + type: string + port: + type: integer + type: + type: string + zkServicesPath: + items: + type: string + type: array + type: object + type: array + type: object + status: + type: object + x-kubernetes-preserve-unknown-fields: true + type: object + served: true + storage: true + subresources: + status: {} + +--- diff --git a/api/networking/v1/mcp_bridge.pb.go b/api/networking/v1/mcp_bridge.pb.go new file mode 100644 index 000000000..5f8c5f376 --- /dev/null +++ b/api/networking/v1/mcp_bridge.pb.go @@ -0,0 +1,1140 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: networking/v1/mcp_bridge.proto + +package v1 + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + _ "istio.io/gogo-genproto/googleapis/google/api" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. 
+// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +// +// +// +type McpBridge struct { + Registries []*RegistryConfig `protobuf:"bytes,1,rep,name=registries,proto3" json:"registries,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *McpBridge) Reset() { *m = McpBridge{} } +func (m *McpBridge) String() string { return proto.CompactTextString(m) } +func (*McpBridge) ProtoMessage() {} +func (*McpBridge) Descriptor() ([]byte, []int) { + return fileDescriptor_3fcc59a15c34642d, []int{0} +} +func (m *McpBridge) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *McpBridge) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_McpBridge.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *McpBridge) XXX_Merge(src proto.Message) { + xxx_messageInfo_McpBridge.Merge(m, src) +} +func (m *McpBridge) XXX_Size() int { + return m.Size() +} +func (m *McpBridge) XXX_DiscardUnknown() { + xxx_messageInfo_McpBridge.DiscardUnknown(m) +} + +var xxx_messageInfo_McpBridge proto.InternalMessageInfo + +func (m *McpBridge) GetRegistries() []*RegistryConfig { + if m != nil { + return m.Registries + } + return nil +} + +type RegistryConfig struct { + Type string `protobuf:"bytes,1,opt,name=type,proto3" json:"type,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Domain string `protobuf:"bytes,3,opt,name=domain,proto3" json:"domain,omitempty"` + Port uint32 `protobuf:"varint,4,opt,name=port,proto3" json:"port,omitempty"` + NacosAddressServer string `protobuf:"bytes,5,opt,name=nacosAddressServer,proto3" json:"nacosAddressServer,omitempty"` + 
NacosAccessKey string `protobuf:"bytes,6,opt,name=nacosAccessKey,proto3" json:"nacosAccessKey,omitempty"` + NacosSecretKey string `protobuf:"bytes,7,opt,name=nacosSecretKey,proto3" json:"nacosSecretKey,omitempty"` + NacosNamespaceId string `protobuf:"bytes,8,opt,name=nacosNamespaceId,proto3" json:"nacosNamespaceId,omitempty"` + NacosNamespace string `protobuf:"bytes,9,opt,name=nacosNamespace,proto3" json:"nacosNamespace,omitempty"` + NacosGroups []string `protobuf:"bytes,10,rep,name=nacosGroups,proto3" json:"nacosGroups,omitempty"` + NacosRefreshInterval int64 `protobuf:"varint,11,opt,name=nacosRefreshInterval,proto3" json:"nacosRefreshInterval,omitempty"` + ConsulNamespace string `protobuf:"bytes,12,opt,name=consulNamespace,proto3" json:"consulNamespace,omitempty"` + ZkServicesPath []string `protobuf:"bytes,13,rep,name=zkServicesPath,proto3" json:"zkServicesPath,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *RegistryConfig) Reset() { *m = RegistryConfig{} } +func (m *RegistryConfig) String() string { return proto.CompactTextString(m) } +func (*RegistryConfig) ProtoMessage() {} +func (*RegistryConfig) Descriptor() ([]byte, []int) { + return fileDescriptor_3fcc59a15c34642d, []int{1} +} +func (m *RegistryConfig) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *RegistryConfig) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_RegistryConfig.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *RegistryConfig) XXX_Merge(src proto.Message) { + xxx_messageInfo_RegistryConfig.Merge(m, src) +} +func (m *RegistryConfig) XXX_Size() int { + return m.Size() +} +func (m *RegistryConfig) XXX_DiscardUnknown() { + xxx_messageInfo_RegistryConfig.DiscardUnknown(m) +} + +var 
xxx_messageInfo_RegistryConfig proto.InternalMessageInfo + +func (m *RegistryConfig) GetType() string { + if m != nil { + return m.Type + } + return "" +} + +func (m *RegistryConfig) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *RegistryConfig) GetDomain() string { + if m != nil { + return m.Domain + } + return "" +} + +func (m *RegistryConfig) GetPort() uint32 { + if m != nil { + return m.Port + } + return 0 +} + +func (m *RegistryConfig) GetNacosAddressServer() string { + if m != nil { + return m.NacosAddressServer + } + return "" +} + +func (m *RegistryConfig) GetNacosAccessKey() string { + if m != nil { + return m.NacosAccessKey + } + return "" +} + +func (m *RegistryConfig) GetNacosSecretKey() string { + if m != nil { + return m.NacosSecretKey + } + return "" +} + +func (m *RegistryConfig) GetNacosNamespaceId() string { + if m != nil { + return m.NacosNamespaceId + } + return "" +} + +func (m *RegistryConfig) GetNacosNamespace() string { + if m != nil { + return m.NacosNamespace + } + return "" +} + +func (m *RegistryConfig) GetNacosGroups() []string { + if m != nil { + return m.NacosGroups + } + return nil +} + +func (m *RegistryConfig) GetNacosRefreshInterval() int64 { + if m != nil { + return m.NacosRefreshInterval + } + return 0 +} + +func (m *RegistryConfig) GetConsulNamespace() string { + if m != nil { + return m.ConsulNamespace + } + return "" +} + +func (m *RegistryConfig) GetZkServicesPath() []string { + if m != nil { + return m.ZkServicesPath + } + return nil +} + +func init() { + proto.RegisterType((*McpBridge)(nil), "higress.networking.v1.McpBridge") + proto.RegisterType((*RegistryConfig)(nil), "higress.networking.v1.RegistryConfig") +} + +func init() { proto.RegisterFile("networking/v1/mcp_bridge.proto", fileDescriptor_3fcc59a15c34642d) } + +var fileDescriptor_3fcc59a15c34642d = []byte{ + // 421 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x6c, 0x92, 0xd1, 
0x8a, 0xd4, 0x30, + 0x14, 0x86, 0xc9, 0x76, 0x1c, 0x9d, 0x8c, 0xbb, 0x4a, 0x50, 0x08, 0x22, 0x63, 0x59, 0x50, 0x8a, + 0x48, 0xcb, 0xae, 0x77, 0xde, 0xed, 0x88, 0xc8, 0x22, 0x8a, 0x74, 0xef, 0xbc, 0x59, 0xd2, 0xf4, + 0x4c, 0x1a, 0xb6, 0x4d, 0x42, 0x92, 0xa9, 0x8c, 0x4f, 0xe8, 0xa5, 0x8f, 0x20, 0x7d, 0x04, 0x9f, + 0x40, 0x9a, 0x59, 0xbb, 0x9d, 0x71, 0xee, 0xda, 0xef, 0x7c, 0xf9, 0x73, 0x08, 0x3f, 0x5e, 0x28, + 0xf0, 0xdf, 0xb5, 0xbd, 0x91, 0x4a, 0x64, 0xed, 0x59, 0xd6, 0x70, 0x73, 0x5d, 0x58, 0x59, 0x0a, + 0x48, 0x8d, 0xd5, 0x5e, 0x93, 0xa7, 0x95, 0x14, 0x16, 0x9c, 0x4b, 0xef, 0xbc, 0xb4, 0x3d, 0x7b, + 0xf6, 0x42, 0x68, 0x2d, 0x6a, 0xc8, 0x98, 0x91, 0xd9, 0x4a, 0x42, 0x5d, 0x5e, 0x17, 0x50, 0xb1, + 0x56, 0x6a, 0xbb, 0x3d, 0x77, 0x9a, 0xe3, 0xd9, 0x67, 0x6e, 0x96, 0x21, 0x8a, 0x7c, 0xc0, 0xd8, + 0x82, 0x90, 0xce, 0x5b, 0x09, 0x8e, 0xa2, 0x38, 0x4a, 0xe6, 0xe7, 0x2f, 0xd3, 0x83, 0xc9, 0x69, + 0xbe, 0x15, 0x37, 0xef, 0xb5, 0x5a, 0x49, 0x91, 0x8f, 0x0e, 0x9e, 0xfe, 0x89, 0xf0, 0xc9, 0xee, + 0x98, 0x50, 0x3c, 0xf1, 0x1b, 0x03, 0x14, 0xc5, 0x28, 0x99, 0x2d, 0x27, 0xdd, 0x05, 0x3a, 0xca, + 0x03, 0x21, 0x04, 0x4f, 0x14, 0x6b, 0x80, 0x1e, 0xf5, 0x93, 0x3c, 0x7c, 0x93, 0xe7, 0x78, 0x5a, + 0xea, 0x86, 0x49, 0x45, 0xa3, 0x91, 0x7f, 0xcb, 0xfa, 0x2c, 0xa3, 0xad, 0xa7, 0x93, 0x18, 0x25, + 0xc7, 0xff, 0xb2, 0x7a, 0x42, 0x52, 0x4c, 0x14, 0xe3, 0xda, 0x5d, 0x94, 0x65, 0xbf, 0xf1, 0x15, + 0xd8, 0x16, 0x2c, 0xbd, 0x17, 0x92, 0x0f, 0x4c, 0xc8, 0x2b, 0x7c, 0xb2, 0xa5, 0x9c, 0x83, 0x73, + 0x9f, 0x60, 0x43, 0xa7, 0xc1, 0xdd, 0xa3, 0x83, 0x77, 0x05, 0xdc, 0x82, 0xef, 0xbd, 0xfb, 0x23, + 0x6f, 0xa0, 0xe4, 0x35, 0x7e, 0x1c, 0xc8, 0x17, 0xd6, 0x80, 0x33, 0x8c, 0xc3, 0x65, 0x49, 0x1f, + 0x04, 0xf3, 0x3f, 0x3e, 0x64, 0x0e, 0x8c, 0xce, 0x46, 0x99, 0x03, 0x25, 0x31, 0x9e, 0x07, 0xf2, + 0xd1, 0xea, 0xb5, 0x71, 0x14, 0xc7, 0x51, 0x32, 0xcb, 0xc7, 0x88, 0x9c, 0xe3, 0x27, 0xe1, 0x37, + 0x87, 0x95, 0x05, 0x57, 0x5d, 0x2a, 0x0f, 0xb6, 0x65, 0x35, 0x9d, 0xc7, 0x28, 0x89, 0xf2, 0x83, + 0x33, 0x92, 0xe0, 
0x47, 0x5c, 0x2b, 0xb7, 0xae, 0xef, 0xae, 0x7f, 0x18, 0xae, 0xdf, 0xc7, 0xfd, + 0x9e, 0x3f, 0x6e, 0xfa, 0xf7, 0x92, 0x1c, 0xdc, 0x57, 0xe6, 0x2b, 0x7a, 0x1c, 0x56, 0xd8, 0xa3, + 0xcb, 0x77, 0x3f, 0xbb, 0x05, 0xfa, 0xd5, 0x2d, 0xd0, 0xef, 0x6e, 0x81, 0xbe, 0xbd, 0x11, 0xd2, + 0x57, 0xeb, 0x22, 0xe5, 0xba, 0xc9, 0x58, 0x2d, 0x0b, 0x56, 0xb0, 0xec, 0xb6, 0x47, 0xa1, 0x8b, + 0x3b, 0x6d, 0x2e, 0xa6, 0xa1, 0x8b, 0x6f, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0xb9, 0x6f, 0xf7, + 0xf0, 0xe5, 0x02, 0x00, 0x00, +} + +func (m *McpBridge) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *McpBridge) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *McpBridge) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.Registries) > 0 { + for iNdEx := len(m.Registries) - 1; iNdEx >= 0; iNdEx-- { + { + size, err := m.Registries[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintMcpBridge(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + +func (m *RegistryConfig) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *RegistryConfig) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *RegistryConfig) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if 
len(m.ZkServicesPath) > 0 { + for iNdEx := len(m.ZkServicesPath) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.ZkServicesPath[iNdEx]) + copy(dAtA[i:], m.ZkServicesPath[iNdEx]) + i = encodeVarintMcpBridge(dAtA, i, uint64(len(m.ZkServicesPath[iNdEx]))) + i-- + dAtA[i] = 0x6a + } + } + if len(m.ConsulNamespace) > 0 { + i -= len(m.ConsulNamespace) + copy(dAtA[i:], m.ConsulNamespace) + i = encodeVarintMcpBridge(dAtA, i, uint64(len(m.ConsulNamespace))) + i-- + dAtA[i] = 0x62 + } + if m.NacosRefreshInterval != 0 { + i = encodeVarintMcpBridge(dAtA, i, uint64(m.NacosRefreshInterval)) + i-- + dAtA[i] = 0x58 + } + if len(m.NacosGroups) > 0 { + for iNdEx := len(m.NacosGroups) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.NacosGroups[iNdEx]) + copy(dAtA[i:], m.NacosGroups[iNdEx]) + i = encodeVarintMcpBridge(dAtA, i, uint64(len(m.NacosGroups[iNdEx]))) + i-- + dAtA[i] = 0x52 + } + } + if len(m.NacosNamespace) > 0 { + i -= len(m.NacosNamespace) + copy(dAtA[i:], m.NacosNamespace) + i = encodeVarintMcpBridge(dAtA, i, uint64(len(m.NacosNamespace))) + i-- + dAtA[i] = 0x4a + } + if len(m.NacosNamespaceId) > 0 { + i -= len(m.NacosNamespaceId) + copy(dAtA[i:], m.NacosNamespaceId) + i = encodeVarintMcpBridge(dAtA, i, uint64(len(m.NacosNamespaceId))) + i-- + dAtA[i] = 0x42 + } + if len(m.NacosSecretKey) > 0 { + i -= len(m.NacosSecretKey) + copy(dAtA[i:], m.NacosSecretKey) + i = encodeVarintMcpBridge(dAtA, i, uint64(len(m.NacosSecretKey))) + i-- + dAtA[i] = 0x3a + } + if len(m.NacosAccessKey) > 0 { + i -= len(m.NacosAccessKey) + copy(dAtA[i:], m.NacosAccessKey) + i = encodeVarintMcpBridge(dAtA, i, uint64(len(m.NacosAccessKey))) + i-- + dAtA[i] = 0x32 + } + if len(m.NacosAddressServer) > 0 { + i -= len(m.NacosAddressServer) + copy(dAtA[i:], m.NacosAddressServer) + i = encodeVarintMcpBridge(dAtA, i, uint64(len(m.NacosAddressServer))) + i-- + dAtA[i] = 0x2a + } + if m.Port != 0 { + i = encodeVarintMcpBridge(dAtA, i, uint64(m.Port)) + i-- + dAtA[i] = 0x20 + } + if len(m.Domain) > 0 { + i -= len(m.Domain) + 
copy(dAtA[i:], m.Domain) + i = encodeVarintMcpBridge(dAtA, i, uint64(len(m.Domain))) + i-- + dAtA[i] = 0x1a + } + if len(m.Name) > 0 { + i -= len(m.Name) + copy(dAtA[i:], m.Name) + i = encodeVarintMcpBridge(dAtA, i, uint64(len(m.Name))) + i-- + dAtA[i] = 0x12 + } + if len(m.Type) > 0 { + i -= len(m.Type) + copy(dAtA[i:], m.Type) + i = encodeVarintMcpBridge(dAtA, i, uint64(len(m.Type))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintMcpBridge(dAtA []byte, offset int, v uint64) int { + offset -= sovMcpBridge(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *McpBridge) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Registries) > 0 { + for _, e := range m.Registries { + l = e.Size() + n += 1 + l + sovMcpBridge(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *RegistryConfig) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Type) + if l > 0 { + n += 1 + l + sovMcpBridge(uint64(l)) + } + l = len(m.Name) + if l > 0 { + n += 1 + l + sovMcpBridge(uint64(l)) + } + l = len(m.Domain) + if l > 0 { + n += 1 + l + sovMcpBridge(uint64(l)) + } + if m.Port != 0 { + n += 1 + sovMcpBridge(uint64(m.Port)) + } + l = len(m.NacosAddressServer) + if l > 0 { + n += 1 + l + sovMcpBridge(uint64(l)) + } + l = len(m.NacosAccessKey) + if l > 0 { + n += 1 + l + sovMcpBridge(uint64(l)) + } + l = len(m.NacosSecretKey) + if l > 0 { + n += 1 + l + sovMcpBridge(uint64(l)) + } + l = len(m.NacosNamespaceId) + if l > 0 { + n += 1 + l + sovMcpBridge(uint64(l)) + } + l = len(m.NacosNamespace) + if l > 0 { + n += 1 + l + sovMcpBridge(uint64(l)) + } + if len(m.NacosGroups) > 0 { + for _, s := range m.NacosGroups { + l = len(s) + n += 1 + l + sovMcpBridge(uint64(l)) + } + } + if m.NacosRefreshInterval != 0 { + n += 1 + 
sovMcpBridge(uint64(m.NacosRefreshInterval)) + } + l = len(m.ConsulNamespace) + if l > 0 { + n += 1 + l + sovMcpBridge(uint64(l)) + } + if len(m.ZkServicesPath) > 0 { + for _, s := range m.ZkServicesPath { + l = len(s) + n += 1 + l + sovMcpBridge(uint64(l)) + } + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovMcpBridge(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozMcpBridge(x uint64) (n int) { + return sovMcpBridge(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *McpBridge) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: McpBridge: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: McpBridge: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Registries", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthMcpBridge + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthMcpBridge + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Registries = append(m.Registries, &RegistryConfig{}) + if err := m.Registries[len(m.Registries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = 
preIndex + skippy, err := skipMcpBridge(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthMcpBridge + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *RegistryConfig) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: RegistryConfig: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: RegistryConfig: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMcpBridge + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMcpBridge + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Type = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge 
+ } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMcpBridge + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMcpBridge + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Domain", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMcpBridge + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMcpBridge + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Domain = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Port", wireType) + } + m.Port = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Port |= uint32(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NacosAddressServer", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return 
ErrInvalidLengthMcpBridge + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMcpBridge + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NacosAddressServer = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 6: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NacosAccessKey", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMcpBridge + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMcpBridge + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NacosAccessKey = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 7: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NacosSecretKey", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMcpBridge + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMcpBridge + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NacosSecretKey = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NacosNamespaceId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen 
|= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMcpBridge + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMcpBridge + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NacosNamespaceId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 9: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NacosNamespace", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMcpBridge + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMcpBridge + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NacosNamespace = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 10: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field NacosGroups", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMcpBridge + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMcpBridge + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.NacosGroups = append(m.NacosGroups, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field NacosRefreshInterval", wireType) + } + m.NacosRefreshInterval = 0 + for shift := uint(0); ; shift += 
7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.NacosRefreshInterval |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + case 12: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ConsulNamespace", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMcpBridge + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMcpBridge + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ConsulNamespace = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 13: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ZkServicesPath", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthMcpBridge + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthMcpBridge + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ZkServicesPath = append(m.ZkServicesPath, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipMcpBridge(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthMcpBridge + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) 
+ iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipMcpBridge(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowMcpBridge + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthMcpBridge + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupMcpBridge + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthMcpBridge + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthMcpBridge = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowMcpBridge = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupMcpBridge = fmt.Errorf("proto: unexpected end of group") +) diff --git a/api/networking/v1/mcp_bridge.proto b/api/networking/v1/mcp_bridge.proto new file mode 100644 index 000000000..19ceb3d48 --- /dev/null +++ b/api/networking/v1/mcp_bridge.proto @@ -0,0 +1,63 @@ +// Copyright (c) 2022 Alibaba Group Holding Ltd. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; + +import "google/api/field_behavior.proto"; + +// $schema: higress.networking.v1.McpBridge +// $title: McpBridge +// $description: Configuration affecting service discovery from multi registries +// $mode: none + +package higress.networking.v1; + +option go_package = "github.com/alibaba/higress/api/networking/v1"; + +// +// +// +message McpBridge { + repeated RegistryConfig registries = 1; +} + +message RegistryConfig { + string type = 1 [(google.api.field_behavior) = REQUIRED]; + string name = 2; + string domain = 3 [(google.api.field_behavior) = REQUIRED]; + uint32 port = 4 [(google.api.field_behavior) = REQUIRED]; + string nacosAddressServer = 5; + string nacosAccessKey = 6; + string nacosSecretKey = 7; + string nacosNamespaceId = 8; + string nacosNamespace = 9; + repeated string nacosGroups = 10; + int64 nacosRefreshInterval = 11; + string consulNamespace = 12; + repeated string zkServicesPath = 13; +} diff --git a/api/networking/v1/mcp_bridge_deepcopy.gen.go b/api/networking/v1/mcp_bridge_deepcopy.gen.go new file mode 100644 index 000000000..842dc83ce --- /dev/null +++ b/api/networking/v1/mcp_bridge_deepcopy.gen.go @@ -0,0 +1,58 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. 
+// source: networking/v1/mcp_bridge.proto + +package v1 + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + _ "istio.io/gogo-genproto/googleapis/google/api" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// DeepCopyInto supports using McpBridge within kubernetes types, where deepcopy-gen is used. +func (in *McpBridge) DeepCopyInto(out *McpBridge) { + p := proto.Clone(in).(*McpBridge) + *out = *p +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new McpBridge. Required by controller-gen. +func (in *McpBridge) DeepCopy() *McpBridge { + if in == nil { + return nil + } + out := new(McpBridge) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInterface is an autogenerated deepcopy function, copying the receiver, creating a new McpBridge. Required by controller-gen. +func (in *McpBridge) DeepCopyInterface() interface{} { + return in.DeepCopy() +} + +// DeepCopyInto supports using RegistryConfig within kubernetes types, where deepcopy-gen is used. +func (in *RegistryConfig) DeepCopyInto(out *RegistryConfig) { + p := proto.Clone(in).(*RegistryConfig) + *out = *p +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RegistryConfig. Required by controller-gen. +func (in *RegistryConfig) DeepCopy() *RegistryConfig { + if in == nil { + return nil + } + out := new(RegistryConfig) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInterface is an autogenerated deepcopy function, copying the receiver, creating a new RegistryConfig. Required by controller-gen. 
+func (in *RegistryConfig) DeepCopyInterface() interface{} { + return in.DeepCopy() +} diff --git a/api/networking/v1/mcp_bridge_json.gen.go b/api/networking/v1/mcp_bridge_json.gen.go new file mode 100644 index 000000000..7926cee31 --- /dev/null +++ b/api/networking/v1/mcp_bridge_json.gen.go @@ -0,0 +1,45 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: networking/v1/mcp_bridge.proto + +package v1 + +import ( + bytes "bytes" + fmt "fmt" + github_com_gogo_protobuf_jsonpb "github.com/gogo/protobuf/jsonpb" + proto "github.com/gogo/protobuf/proto" + _ "istio.io/gogo-genproto/googleapis/google/api" + math "math" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// MarshalJSON is a custom marshaler for McpBridge +func (this *McpBridge) MarshalJSON() ([]byte, error) { + str, err := McpBridgeMarshaler.MarshalToString(this) + return []byte(str), err +} + +// UnmarshalJSON is a custom unmarshaler for McpBridge +func (this *McpBridge) UnmarshalJSON(b []byte) error { + return McpBridgeUnmarshaler.Unmarshal(bytes.NewReader(b), this) +} + +// MarshalJSON is a custom marshaler for RegistryConfig +func (this *RegistryConfig) MarshalJSON() ([]byte, error) { + str, err := McpBridgeMarshaler.MarshalToString(this) + return []byte(str), err +} + +// UnmarshalJSON is a custom unmarshaler for RegistryConfig +func (this *RegistryConfig) UnmarshalJSON(b []byte) error { + return McpBridgeUnmarshaler.Unmarshal(bytes.NewReader(b), this) +} + +var ( + McpBridgeMarshaler = &github_com_gogo_protobuf_jsonpb.Marshaler{} + McpBridgeUnmarshaler = &github_com_gogo_protobuf_jsonpb.Unmarshaler{AllowUnknownFields: true} +) diff --git a/api/protocol.yaml b/api/protocol.yaml new file mode 100644 index 000000000..a3fc3fe63 --- /dev/null +++ b/api/protocol.yaml @@ -0,0 +1,5 @@ +protoc: + # This is ignored because we always run with + # --protoc-bin-path=/usr/bin/protoc to use the protoc from our + # 
container + version: 3.6.1 diff --git a/registry/memory/cache.go b/registry/memory/cache.go new file mode 100644 index 000000000..f941954f1 --- /dev/null +++ b/registry/memory/cache.go @@ -0,0 +1,236 @@ +package memory + +import ( + "sort" + "strconv" + "sync" + "time" + + "istio.io/api/networking/v1alpha3" + + "github.com/alibaba/higress/pkg/common" +) + +type Cache interface { + UpdateServiceEntryWrapper(service string, data *ServiceEntryWrapper) + DeleteServiceEntryWrapper(service string) + UpdateServiceEntryEnpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string) + GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string + GetAllServiceEntry() []*v1alpha3.ServiceEntry + GetAllServiceEntryWrapper() []*ServiceEntryWrapper + GetIncrementalServiceEntryWrapper() (updatedList []*ServiceEntryWrapper, deletedList []*ServiceEntryWrapper) + RemoveEndpointByIp(ip string) +} + +func NewCache() Cache { + return &store{ + mux: &sync.RWMutex{}, + sew: make(map[string]*ServiceEntryWrapper), + toBeUpdated: make([]*ServiceEntryWrapper, 0), + toBeDeleted: make([]*ServiceEntryWrapper, 0), + ip2services: make(map[string]map[string]bool), + } +} + +type store struct { + mux *sync.RWMutex + sew map[string]*ServiceEntryWrapper + toBeUpdated []*ServiceEntryWrapper + toBeDeleted []*ServiceEntryWrapper + ip2services map[string]map[string]bool +} + +func (s *store) UpdateServiceEntryEnpointWrapper(service, ip, regionId, zoneId, protocol string, labels map[string]string) { + s.mux.Lock() + defer s.mux.Unlock() + if se, exist := s.sew[service]; exist { + idx := -1 + for i, ep := range se.ServiceEntry.Endpoints { + if ep.Address == ip { + idx = i + if len(regionId) != 0 { + ep.Locality = regionId + if len(zoneId) != 0 { + ep.Locality = regionId + "/" + zoneId + } + } + if labels != nil { + for k, v := range labels { + if protocol == common.Dubbo.String() && k == "version" { + 
ep.Labels["appversion"] = v + continue + } + ep.Labels[k] = v + } + } + + if idx != -1 { + se.ServiceEntry.Endpoints[idx] = ep + } + return + } + + } + + } + return +} + +func (s *store) UpdateServiceEntryWrapper(service string, data *ServiceEntryWrapper) { + s.mux.Lock() + defer s.mux.Unlock() + + if old, exist := s.sew[service]; exist { + data.SetCreateTime(old.GetCreateTime()) + } else { + data.SetCreateTime(time.Now()) + } + + s.toBeUpdated = append(s.toBeUpdated, data) + s.sew[service] = data +} + +func (s *store) DeleteServiceEntryWrapper(service string) { + s.mux.Lock() + defer s.mux.Unlock() + + if data, exist := s.sew[service]; exist { + s.toBeDeleted = append(s.toBeDeleted, data) + } + delete(s.sew, service) +} + +// GetServiceByEndpoints get the list of services of which "address:port" contained by the endpoints +// and the version of the service contained by the requestVersions. The result format is as below: +// key: serviceName + "#@" + suffix +// values: ["v1", "v2"] which has removed duplication +func (s *store) GetServiceByEndpoints(requestVersions, endpoints map[string]bool, versionKey string, protocol common.Protocol) map[string][]string { + s.mux.RLock() + defer s.mux.RUnlock() + + result := make(map[string][]string) + for _, serviceEntryWrapper := range s.sew { + for _, workload := range serviceEntryWrapper.ServiceEntry.Endpoints { + port, exist := workload.Ports[protocol.String()] + if !exist { + continue + } + + endpoint := workload.Address + common.ColonSeparator + strconv.Itoa(int(port)) + if _, hit := endpoints[endpoint]; hit { + if version, has := workload.Labels[versionKey]; has { + if _, in := requestVersions[version]; in { + key := serviceEntryWrapper.ServiceName + common.SpecialSeparator + serviceEntryWrapper.Suffix + result[key] = append(result[key], version) + } + } + } + } + } + + // remove duplication + for key, versions := range result { + sort.Strings(versions) + i := 0 + for j := 1; j < len(versions); j++ { + if versions[j] != 
versions[i] { + i++ + versions[i] = versions[j] + } + } + result[key] = versions[:i+1] + } + + return result +} + +// GetAllServiceEntry get all ServiceEntry in the store for xds push +func (s *store) GetAllServiceEntry() []*v1alpha3.ServiceEntry { + s.mux.RLock() + defer s.mux.RUnlock() + + seList := make([]*v1alpha3.ServiceEntry, 0) + for _, serviceEntryWrapper := range s.sew { + if len(serviceEntryWrapper.ServiceEntry.Hosts) == 0 { + continue + } + seList = append(seList, serviceEntryWrapper.ServiceEntry.DeepCopy()) + } + sort.Slice(seList, func(i, j int) bool { + return seList[i].Hosts[0] > seList[j].Hosts[0] + }) + return seList +} + +// GetAllServiceEntryWrapper get all ServiceEntryWrapper in the store for xds push +func (s *store) GetAllServiceEntryWrapper() []*ServiceEntryWrapper { + s.mux.RLock() + defer s.mux.RUnlock() + defer s.cleanUpdateAndDeleteArray() + + sewList := make([]*ServiceEntryWrapper, 0) + for _, serviceEntryWrapper := range s.sew { + sewList = append(sewList, serviceEntryWrapper.DeepCopy()) + } + return sewList +} + +// GetIncrementalServiceEntryWrapper get incremental ServiceEntryWrapper in the store for xds push +func (s *store) GetIncrementalServiceEntryWrapper() ([]*ServiceEntryWrapper, []*ServiceEntryWrapper) { + s.mux.RLock() + defer s.mux.RUnlock() + defer s.cleanUpdateAndDeleteArray() + + updatedList := make([]*ServiceEntryWrapper, 0) + for _, serviceEntryWrapper := range s.toBeUpdated { + updatedList = append(updatedList, serviceEntryWrapper.DeepCopy()) + } + + deletedList := make([]*ServiceEntryWrapper, 0) + for _, serviceEntryWrapper := range s.toBeDeleted { + deletedList = append(deletedList, serviceEntryWrapper.DeepCopy()) + } + + return updatedList, deletedList +} + +func (s *store) cleanUpdateAndDeleteArray() { + s.toBeUpdated = nil + s.toBeDeleted = nil +} + +func (s *store) updateIpMap(service string, data *ServiceEntryWrapper) { + for _, ep := range data.ServiceEntry.Endpoints { + if s.ip2services[ep.Address] == nil { + 
s.ip2services[ep.Address] = make(map[string]bool) + } + s.ip2services[ep.Address][service] = true + } +} + +func (s *store) RemoveEndpointByIp(ip string) { + s.mux.Lock() + defer s.mux.Unlock() + + services, has := s.ip2services[ip] + if !has { + return + } + delete(s.ip2services, ip) + + for service := range services { + if data, exist := s.sew[service]; exist { + idx := -1 + for i, ep := range data.ServiceEntry.Endpoints { + if ep.Address == ip { + idx = i + break + } + } + if idx != -1 { + data.ServiceEntry.Endpoints = append(data.ServiceEntry.Endpoints[:idx], data.ServiceEntry.Endpoints[idx+1:]...) + } + s.toBeUpdated = append(s.toBeUpdated, data) + } + } +} diff --git a/registry/memory/model.go b/registry/memory/model.go new file mode 100644 index 000000000..4694d97ee --- /dev/null +++ b/registry/memory/model.go @@ -0,0 +1,30 @@ +package memory + +import ( + "time" + + "istio.io/api/networking/v1alpha3" +) + +type ServiceEntryWrapper struct { + ServiceName string + ServiceEntry *v1alpha3.ServiceEntry + Suffix string + RegistryType string + createTime time.Time +} + +func (sew *ServiceEntryWrapper) DeepCopy() *ServiceEntryWrapper { + return &ServiceEntryWrapper{ + ServiceEntry: sew.ServiceEntry.DeepCopy(), + createTime: sew.GetCreateTime(), + } +} + +func (sew *ServiceEntryWrapper) SetCreateTime(createTime time.Time) { + sew.createTime = createTime +} + +func (sew *ServiceEntryWrapper) GetCreateTime() time.Time { + return sew.createTime +} diff --git a/registry/nacos/address/address_discovery.go b/registry/nacos/address/address_discovery.go new file mode 100644 index 000000000..3d0daec8e --- /dev/null +++ b/registry/nacos/address/address_discovery.go @@ -0,0 +1,171 @@ +package address + +import ( + "fmt" + "io/ioutil" + "math/rand" + "net" + "net/http" + "strings" + "sync" + "time" + + "go.uber.org/atomic" + "istio.io/pkg/log" +) + +const ( + NACOS_PATH = "/nacos/serverlist" + MODULE_HEADER_KEY = "Request-Module" + MODULE_HEADER_VALUE = "Naming" + 
DEFAULT_INTERVAL = 30 * time.Second +) + +type NacosAddressProvider struct { + serverAddr string + nacosAddr string + nacosBackupAddr []string + namespace string + stop chan struct{} + trigger chan struct{} + cond *sync.Cond + isStop *atomic.Bool + mutex *sync.Mutex +} + +func NewNacosAddressProvider(serverAddr, namespace string) *NacosAddressProvider { + provider := &NacosAddressProvider{ + serverAddr: serverAddr, + namespace: namespace, + stop: make(chan struct{}), + trigger: make(chan struct{}, 1), + cond: sync.NewCond(new(sync.Mutex)), + isStop: atomic.NewBool(false), + mutex: &sync.Mutex{}, + } + go provider.Run() + return provider +} + +func (p *NacosAddressProvider) Run() { + ticker := time.NewTicker(DEFAULT_INTERVAL) + defer ticker.Stop() + p.addressDiscovery() + for { + select { + case <-p.trigger: + p.addressDiscovery() + case <-ticker.C: + p.addressDiscovery() + case <-p.stop: + return + } + } +} + +func (p *NacosAddressProvider) Update(serverAddr, namespace string) { + p.mutex.Lock() + p.serverAddr = serverAddr + p.namespace = namespace + p.mutex.Unlock() + p.addressDiscovery() +} + +func (p *NacosAddressProvider) Trigger() { + p.cond.L.Lock() + oldAddr := p.nacosAddr + if len(p.nacosBackupAddr) > 0 { + p.nacosAddr = p.nacosBackupAddr[rand.Intn(len(p.nacosBackupAddr))] + for i := len(p.nacosBackupAddr) - 1; i >= 0; i-- { + if p.nacosBackupAddr[i] == p.nacosAddr { + p.nacosBackupAddr = append(p.nacosBackupAddr[:i], p.nacosBackupAddr[i+1:]...) 
+ } + } + p.nacosBackupAddr = append(p.nacosBackupAddr, oldAddr) + } + p.cond.Broadcast() + p.cond.L.Unlock() + select { + case p.trigger <- struct{}{}: + default: + } +} + +func (p *NacosAddressProvider) Stop() { + p.isStop.Store(true) + p.stop <- struct{}{} +} + +func (p *NacosAddressProvider) GetNacosAddress(oldAddress string) <-chan string { + addressChan := make(chan string) + go func() { + var addr string + p.cond.L.Lock() + log.Debugf("get nacos address, p.nacosAddr, oldAddress", p.nacosAddr, oldAddress) + for p.nacosAddr == oldAddress || p.nacosAddr == "" { + if p.isStop.Load() { + return + } + p.cond.Wait() + } + addr = p.nacosAddr + p.cond.L.Unlock() + addressChan <- addr + }() + return addressChan +} + +func (p *NacosAddressProvider) addressDiscovery() { + p.mutex.Lock() + url := fmt.Sprintf("%s%s?namespace=%s", p.serverAddr, NACOS_PATH, p.namespace) + p.mutex.Unlock() + if !strings.HasPrefix(url, "http://") && !strings.HasPrefix(url, "https://") { + url = "http://" + url + } + req, err := http.NewRequest("GET", url, nil) + if err != nil { + log.Errorf("create request failed, err:%v, url:%s", err, url) + return + } + req.Header.Add(MODULE_HEADER_KEY, MODULE_HEADER_VALUE) + resp, err := http.DefaultClient.Do(req) + if err != nil { + log.Errorf("get nacos address failed, err:%v, url:%s", err, url) + return + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + log.Errorf("get nacos address failed, statusCode:%d", resp.StatusCode) + return + } + body, _ := ioutil.ReadAll(resp.Body) + addresses := string(body) + addrVec := strings.Fields(addresses) + if len(addrVec) == 0 { + return + } + needUpdate := true + p.cond.L.Lock() + for _, address := range addrVec { + ip := net.ParseIP(address) + if ip == nil { + log.Errorf("ip parse failed, ip:%s", address) + return + } + if p.nacosAddr == address { + needUpdate = false + } + } + p.nacosBackupAddr = addrVec + if needUpdate { + p.nacosAddr = addrVec[rand.Intn(len(addrVec))] + p.cond.Broadcast() + 
log.Infof("nacos address updated, address:%s", p.nacosAddr) + } + for i := len(p.nacosBackupAddr) - 1; i >= 0; i-- { + if p.nacosBackupAddr[i] == p.nacosAddr { + p.nacosBackupAddr = append(p.nacosBackupAddr[:i], p.nacosBackupAddr[i+1:]...) + } + } + p.cond.L.Unlock() +} diff --git a/registry/nacos/address/address_discovery_test.go b/registry/nacos/address/address_discovery_test.go new file mode 100644 index 000000000..4bd350dd6 --- /dev/null +++ b/registry/nacos/address/address_discovery_test.go @@ -0,0 +1,287 @@ +package address + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" +) + +func setUpServer(status int, body []byte) (string, func()) { + server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.WriteHeader(status) + rw.Write(body) + })) + return server.URL, func() { + server.Close() + } +} + +func setUpServerWithBodyPtr(status int, body *[]byte) (string, func()) { + server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.WriteHeader(status) + rw.Write(*body) + })) + return server.URL, func() { + server.Close() + } +} +func TestGetNacosAddress(t *testing.T) { + goodURL, goodTearDown := setUpServer(200, []byte("1.1.1.1\n 2.2.2.2")) + defer goodTearDown() + badURL, badTearDown := setUpServer(200, []byte("abc\n 2.2.2.2")) + defer badTearDown() + errURL, errTearDown := setUpServer(503, []byte("1.1.1.1\n 2.2.2.2")) + defer errTearDown() + tests := []struct { + name string + serverAddr string + want []string + }{ + { + "good", + goodURL, + []string{"1.1.1.1", "2.2.2.2"}, + }, + { + "bad", + badURL, + []string{}, + }, + { + "err", + errURL, + []string{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider := NewNacosAddressProvider(tt.serverAddr, "") + timeout := time.NewTicker(1 * time.Second) + var got string + if len(tt.want) == 0 { + select { + case got = <-provider.GetNacosAddress(""): + t.Errorf("GetNacosAddress() = %v, 
want empty", got) + case <-timeout.C: + return + } + } + select { + case got = <-provider.GetNacosAddress(""): + case <-timeout.C: + t.Error("GetNacosAddress timeout") + } + for _, value := range tt.want { + if got == value { + return + } + } + t.Errorf("GetNacosAddress() = %v, want %v", got, tt.want) + }) + } +} + +func TestTrigger(t *testing.T) { + body := []byte("1.1.1.1 ") + url, tearDown := setUpServerWithBodyPtr(200, &body) + defer tearDown() + provider := NewNacosAddressProvider(url, "xxxx") + address := <-provider.GetNacosAddress("") + if address != "1.1.1.1" { + t.Errorf("got %s, want %s", address, "1.1.1.1") + } + body = []byte(" 2.2.2.2 ") + tests := []struct { + name string + trigger bool + want string + }{ + { + "no trigger", + false, + "1.1.1.1", + }, + { + "trigger", + true, + "2.2.2.2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.trigger { + provider.Trigger() + } + timeout := time.NewTicker(1 * time.Second) + select { + case <-provider.GetNacosAddress("1.1.1.1"): + case <-timeout.C: + } + if provider.nacosAddr != tt.want { + t.Errorf("got %s, want %s", provider.nacosAddr, tt.want) + } + }) + } +} + +func TestBackup(t *testing.T) { + body := []byte("1.1.1.1 ") + url, tearDown := setUpServerWithBodyPtr(200, &body) + defer tearDown() + provider := NewNacosAddressProvider(url, "xxxx") + address := <-provider.GetNacosAddress("") + if address != "1.1.1.1" { + t.Errorf("got %s, want %s", address, "1.1.1.1") + } + tests := []struct { + name string + oldaddr string + newaddr string + triggerNum int + want string + }{ + { + "case1", + "1.1.1.1", + "1.1.1.1\n2.2.2.2", + 1, + "2.2.2.2", + }, + { + "case2", + "1.1.1.1", + "3.3.3.3 1.1.1.1", + 1, + "3.3.3.3", + }, + { + "case3", + "1.1.1.1", + "3.3.3.3 1.1.1.1", + 2, + "1.1.1.1", + }, + { + "case4", + "1.1.1.1", + "3.3.3.3\n 1.1.1.1", + 3, + "3.3.3.3", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + provider.nacosAddr = tt.oldaddr + body = 
[]byte(tt.newaddr) + provider.addressDiscovery() + for i := 0; i < tt.triggerNum; i++ { + provider.Trigger() + } + timeout := time.NewTicker(1 * time.Second) + var newAddr string + select { + case newAddr = <-provider.GetNacosAddress(""): + case <-timeout.C: + } + if newAddr != tt.want { + t.Errorf("got %s, want %s", newAddr, tt.want) + } + }) + } +} + +func TestKeepIp(t *testing.T) { + body := []byte("1.1.1.1") + url, tearDown := setUpServerWithBodyPtr(200, &body) + defer tearDown() + provider := NewNacosAddressProvider(url, "xxxx") + address := <-provider.GetNacosAddress("") + if address != "1.1.1.1" { + t.Errorf("got %s, want %s", address, "1.1.1.1") + } + tests := []struct { + name string + newAddr []byte + want string + }{ + { + "add ip", + []byte("1.1.1.1\n 2.2.2.2"), + "1.1.1.1", + }, + { + "remove ip", + []byte("2.2.2.2"), + "2.2.2.2", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + body = tt.newAddr + provider.addressDiscovery() + timeout := time.NewTicker(1 * time.Second) + select { + case <-provider.GetNacosAddress("1.1.1.1"): + case <-timeout.C: + } + if provider.nacosAddr != tt.want { + t.Errorf("got %s, want %s", provider.nacosAddr, tt.want) + } + }) + } +} + +func TestMultiClient(t *testing.T) { + body := []byte("1.1.1.1") + url, tearDown := setUpServerWithBodyPtr(200, &body) + defer tearDown() + provider := NewNacosAddressProvider(url, "xxxx") + address := <-provider.GetNacosAddress("") + if address != "1.1.1.1" { + t.Errorf("got %s, want %s", address, "1.1.1.1") + } + body = []byte("2.2.2.2") + tests := []struct { + name string + oldAddrs []string + want []string + }{ + { + "case1", + []string{"1.1.1.1", "1.1.1.1"}, + []string{"2.2.2.2", "2.2.2.2"}, + }, + { + "case2", + []string{"2.2.2.2", "1.1.1.1"}, + []string{"", "2.2.2.2"}, + }, + { + "case3", + []string{"1.1.1.1", "2.2.2.2"}, + []string{"2.2.2.2", ""}, + }, + { + "case4", + []string{"2.2.2.2", "2.2.2.2"}, + []string{"", ""}, + }, + } + for _, tt := range tests { 
+ t.Run(tt.name, func(t *testing.T) { + provider.addressDiscovery() + for i := 0; i < len(tt.oldAddrs); i++ { + timeout := time.NewTicker(1 * time.Second) + var newaddr string + select { + case newaddr = <-provider.GetNacosAddress(tt.oldAddrs[i]): + case <-timeout.C: + } + if newaddr != tt.want[i] { + t.Errorf("got %s, want %s", newaddr, tt.want[i]) + } + } + }) + } +} diff --git a/registry/nacos/v2/watcher.go b/registry/nacos/v2/watcher.go new file mode 100644 index 000000000..8afc3307d --- /dev/null +++ b/registry/nacos/v2/watcher.go @@ -0,0 +1,502 @@ +package v2 + +import ( + "errors" + "strconv" + "strings" + "sync" + "time" + + "github.com/nacos-group/nacos-sdk-go/v2/clients" + "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client" + "github.com/nacos-group/nacos-sdk-go/v2/common/constant" + "github.com/nacos-group/nacos-sdk-go/v2/model" + "github.com/nacos-group/nacos-sdk-go/v2/vo" + "go.uber.org/atomic" + "istio.io/api/networking/v1alpha3" + "istio.io/pkg/log" + + apiv1 "github.com/alibaba/higress/api/networking/v1" + "github.com/alibaba/higress/pkg/common" + provider "github.com/alibaba/higress/registry" + "github.com/alibaba/higress/registry/memory" + "github.com/alibaba/higress/registry/nacos/address" +) + +const ( + DefaultInitTimeout = time.Second * 10 + DefaultNacosTimeout = 5000 + DefaultNacosLogLevel = "warn" + DefaultNacosLogDir = "log/nacos/log/" + DefaultNacosCacheDir = "log/nacos/cache/" + DefaultNacosNotLoadCache = true + DefaultNacosLogRotateTime = "24h" + DefaultNacosLogMaxAge = 3 + DefaultUpdateCacheWhenEmpty = true + DefaultRefreshInterval = time.Second * 30 + DefaultRefreshIntervalLimit = time.Second * 10 + DefaultFetchPageSize = 50 + DefaultJoiner = "@@" +) + +type watcher struct { + provider.BaseWatcher + apiv1.RegistryConfig + WatchingServices map[string]bool `json:"watching_services"` + RegistryType provider.ServiceRegistryType `json:"registry_type"` + Status provider.WatcherStatus `json:"status"` + namingClient 
naming_client.INamingClient + updateHandler provider.ServiceUpdateHandler + readyHandler provider.ReadyHandler + cache memory.Cache + mutex *sync.Mutex + stop chan struct{} + isStop bool + addrProvider *address.NacosAddressProvider + updateCacheWhenEmpty bool + nacosClietConfig *constant.ClientConfig +} + +type WatcherOption func(w *watcher) + +func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) { + w := &watcher{ + WatchingServices: make(map[string]bool), + RegistryType: provider.Nacos2, + Status: provider.UnHealthy, + cache: cache, + mutex: &sync.Mutex{}, + stop: make(chan struct{}), + } + + w.NacosRefreshInterval = int64(DefaultRefreshInterval) + + for _, opt := range opts { + opt(w) + } + + log.Infof("new nacos2 watcher with config Name:%s", w.Name) + + w.nacosClietConfig = constant.NewClientConfig( + constant.WithTimeoutMs(DefaultNacosTimeout), + constant.WithLogLevel(DefaultNacosLogLevel), + constant.WithLogDir(DefaultNacosLogDir), + constant.WithCacheDir(DefaultNacosCacheDir), + constant.WithNotLoadCacheAtStart(DefaultNacosNotLoadCache), + constant.WithLogRollingConfig(&constant.ClientLogRollingConfig{ + MaxAge: DefaultNacosLogMaxAge, + }), + constant.WithUpdateCacheWhenEmpty(w.updateCacheWhenEmpty), + constant.WithNamespaceId(w.NacosNamespaceId), + constant.WithAccessKey(w.NacosAccessKey), + constant.WithSecretKey(w.NacosSecretKey), + ) + + initTimer := time.NewTimer(DefaultInitTimeout) + if w.NacosAddressServer != "" { + w.addrProvider = address.NewNacosAddressProvider(w.NacosAddressServer, w.NacosNamespace) + w.Domain = "" + select { + case w.Domain = <-w.addrProvider.GetNacosAddress(w.Domain): + case <-initTimer.C: + return nil, errors.New("new nacos2 watcher timeout") + } + go w.updateNacosClient() + } + sc := []constant.ServerConfig{ + *constant.NewServerConfig(w.Domain, uint64(w.Port)), + } + + success := make(chan struct{}) + go func() { + namingClient, err := clients.NewNamingClient(vo.NacosClientParam{ + 
ClientConfig: w.nacosClietConfig, + ServerConfigs: sc, + }) + if err == nil { + w.namingClient = namingClient + close(success) + } else { + log.Errorf("can not create naming client, err:%v", err) + } + }() + + select { + case <-initTimer.C: + return nil, errors.New("new nacos2 watcher timeout") + case <-success: + return w, nil + } +} + +func WithNacosAddressServer(nacosAddressServer string) WatcherOption { + return func(w *watcher) { + w.NacosAddressServer = nacosAddressServer + } +} + +func WithNacosAccessKey(nacosAccessKey string) WatcherOption { + return func(w *watcher) { + w.NacosAccessKey = nacosAccessKey + } +} + +func WithNacosSecretKey(nacosSecretKey string) WatcherOption { + return func(w *watcher) { + w.NacosSecretKey = nacosSecretKey + } +} + +func WithNacosNamespaceId(nacosNamespaceId string) WatcherOption { + return func(w *watcher) { + w.NacosNamespaceId = nacosNamespaceId + } +} + +func WithNacosNamespace(nacosNamespace string) WatcherOption { + return func(w *watcher) { + w.NacosNamespace = nacosNamespace + } +} + +func WithNacosGroups(nacosGroups []string) WatcherOption { + return func(w *watcher) { + w.NacosGroups = nacosGroups + } +} + +func WithNacosRefreshInterval(refreshInterval int64) WatcherOption { + return func(w *watcher) { + if refreshInterval < int64(DefaultRefreshIntervalLimit) { + refreshInterval = int64(DefaultRefreshIntervalLimit) + } + w.NacosRefreshInterval = refreshInterval + } +} + +func WithType(t string) WatcherOption { + return func(w *watcher) { + w.Type = t + } +} + +func WithName(name string) WatcherOption { + return func(w *watcher) { + w.Name = name + } +} + +func WithDomain(domain string) WatcherOption { + return func(w *watcher) { + w.Domain = domain + } +} + +func WithPort(port uint32) WatcherOption { + return func(w *watcher) { + w.Port = port + } +} + +func WithUpdateCacheWhenEmpty(enable bool) WatcherOption { + return func(w *watcher) { + w.updateCacheWhenEmpty = enable + } +} + +func (w *watcher) Run() { + 
ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval)) + defer ticker.Stop() + w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10)) + err := w.fetchAllServices() + if err != nil { + log.Errorf("first fetch services failed, err:%v", err) + } else { + w.readyHandler(true) + } + for { + select { + case <-ticker.C: + err := w.fetchAllServices() + if err != nil { + log.Errorf("fetch services failed, err:%v", err) + } else { + w.readyHandler(true) + } + case <-w.stop: + return + } + } +} + +func (w *watcher) updateNacosClient() { + for { + select { + case addr := <-w.addrProvider.GetNacosAddress(w.Domain): + func() { + w.mutex.Lock() + defer w.mutex.Unlock() + w.Domain = addr + namingClient, err := clients.NewNamingClient(vo.NacosClientParam{ + ClientConfig: w.nacosClietConfig, + ServerConfigs: []constant.ServerConfig{ + *constant.NewServerConfig(addr, uint64(w.Port)), + }, + }) + if err != nil { + log.Errorf("can not update naming client, err:%v", err) + return + } + w.namingClient = namingClient + log.Info("naming client updated") + }() + case <-w.stop: + return + } + } +} + +func (w *watcher) fetchAllServices() error { + w.mutex.Lock() + defer w.mutex.Unlock() + + if w.isStop { + return nil + } + fetchedServices := make(map[string]bool) + var tries int + for _, groupName := range w.NacosGroups { + for page := 1; ; page++ { + ss, err := w.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{ + GroupName: groupName, + PageNo: uint32(page), + PageSize: DefaultFetchPageSize, + NameSpace: w.NacosNamespace, + }) + if err != nil { + if tries > 10 { + return err + } + if w.addrProvider != nil { + w.addrProvider.Trigger() + } + log.Errorf("fetch nacos service list failed, err:%v, pageNo:%d", err, page) + page-- + tries++ + continue + } + for _, serviceName := range ss.Doms { + fetchedServices[groupName+DefaultJoiner+serviceName] = true + } + if ss.Count < DefaultFetchPageSize { + break + } + } + } + + for key := range 
w.WatchingServices { + if _, exist := fetchedServices[key]; !exist { + s := strings.Split(key, DefaultJoiner) + err := w.unsubscribe(s[0], s[1]) + if err != nil { + return err + } + delete(w.WatchingServices, key) + } + } + wg := sync.WaitGroup{} + subscribeFailed := atomic.NewBool(false) + watchingKeys := make(chan string, len(fetchedServices)) + for key := range fetchedServices { + if _, exist := w.WatchingServices[key]; !exist { + s := strings.Split(key, DefaultJoiner) + if !shouldSubscribe(s[1]) { + continue + } + wg.Add(1) + go func(k string) { + err := w.subscribe(s[0], s[1]) + if err != nil { + subscribeFailed.Store(true) + log.Errorf("subscribe failed, err:%v, group:%s, service:%s", err, s[0], s[1]) + } else { + watchingKeys <- k + } + wg.Done() + }(key) + } + } + wg.Wait() + close(watchingKeys) + for key := range watchingKeys { + w.WatchingServices[key] = true + } + if subscribeFailed.Load() { + return errors.New("subscribe services failed") + } + return nil +} + +func (w *watcher) subscribe(groupName string, serviceName string) error { + log.Debugf("subscribe service, groupName:%s, serviceName:%s", groupName, serviceName) + + err := w.namingClient.Subscribe(&vo.SubscribeParam{ + ServiceName: serviceName, + GroupName: groupName, + SubscribeCallback: w.getSubscribeCallback(groupName, serviceName), + }) + + if err != nil { + log.Errorf("subscribe service error:%v, groupName:%s, serviceName:%s", err, groupName, serviceName) + return err + } + + return nil +} + +func (w *watcher) unsubscribe(groupName string, serviceName string) error { + log.Debugf("unsubscribe service, groupName:%s, serviceName:%s", groupName, serviceName) + + err := w.namingClient.Unsubscribe(&vo.SubscribeParam{ + ServiceName: serviceName, + GroupName: groupName, + SubscribeCallback: w.getSubscribeCallback(groupName, serviceName), + }) + + if err != nil { + log.Errorf("unsubscribe service error:%v, groupName:%s, serviceName:%s", err, groupName, serviceName) + return err + } + + return nil 
+} + +func (w *watcher) getSubscribeCallback(groupName string, serviceName string) func(services []model.Instance, err error) { + suffix := strings.Join([]string{groupName, w.NacosNamespace, "nacos"}, common.DotSeparator) + suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen) + host := strings.Join([]string{serviceName, suffix}, common.DotSeparator) + + return func(services []model.Instance, err error) { + defer w.updateHandler() + + //log.Info("callback", "serviceName", serviceName, "suffix", suffix, "details", services) + + if err != nil { + if strings.Contains(err.Error(), "hosts is empty") { + if w.updateCacheWhenEmpty { + w.cache.DeleteServiceEntryWrapper(host) + } + } else { + log.Errorf("callback error:%v", err) + } + return + } + + if len(services) > 0 && services[0].Metadata != nil && services[0].Metadata["register-resource"] == "mcp-bridge" { + return + } + serviceEntry := w.generateServiceEntry(host, services) + w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{ + ServiceName: serviceName, + ServiceEntry: serviceEntry, + Suffix: suffix, + RegistryType: w.Type, + }) + } +} + +func (w *watcher) generateServiceEntry(host string, services []model.Instance) *v1alpha3.ServiceEntry { + portList := make([]*v1alpha3.Port, 0) + endpoints := make([]*v1alpha3.WorkloadEntry, 0) + + for _, service := range services { + protocol := common.HTTP + if service.Metadata != nil && service.Metadata["protocol"] != "" { + protocol = common.ParseProtocol(service.Metadata["protocol"]) + } + port := &v1alpha3.Port{ + Name: protocol.String(), + Number: uint32(service.Port), + Protocol: protocol.String(), + } + if len(portList) == 0 { + portList = append(portList, port) + } + endpoint := &v1alpha3.WorkloadEntry{ + Address: service.Ip, + Ports: map[string]uint32{port.Protocol: port.Number}, + Labels: service.Metadata, + } + endpoints = append(endpoints, endpoint) + } + + se := &v1alpha3.ServiceEntry{ + Hosts: []string{host}, + Ports: portList, + 
Location: v1alpha3.ServiceEntry_MESH_INTERNAL, + Resolution: v1alpha3.ServiceEntry_STATIC, + Endpoints: endpoints, + } + + return se +} + +func (w *watcher) Stop() { + w.mutex.Lock() + defer w.mutex.Unlock() + if w.addrProvider != nil { + w.addrProvider.Stop() + } + for key := range w.WatchingServices { + s := strings.Split(key, DefaultJoiner) + err := w.unsubscribe(s[0], s[1]) + if err == nil { + delete(w.WatchingServices, key) + } + + // clean the cache + suffix := strings.Join([]string{s[0], w.NacosNamespace, w.Type}, common.DotSeparator) + suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen) + host := strings.Join([]string{s[1], suffix}, common.DotSeparator) + w.cache.DeleteServiceEntryWrapper(host) + } + + w.isStop = true + w.stop <- struct{}{} + w.readyHandler(false) +} + +func (w *watcher) IsHealthy() bool { + return w.Status == provider.Healthy +} + +func (w *watcher) GetRegistryType() string { + return w.RegistryType.String() +} + +func (w *watcher) AppendServiceUpdateHandler(f func()) { + w.updateHandler = f +} + +func (w *watcher) ReadyHandler(f func(bool)) { + w.readyHandler = f +} + +func shouldSubscribe(serviceName string) bool { + prefixFilters := []string{"consumers:"} + fullFilters := []string{""} + + for _, f := range prefixFilters { + if strings.HasPrefix(serviceName, f) { + return false + } + } + + for _, f := range fullFilters { + if serviceName == f { + return false + } + } + + return true +} diff --git a/registry/nacos/watcher.go b/registry/nacos/watcher.go new file mode 100644 index 000000000..4f8605808 --- /dev/null +++ b/registry/nacos/watcher.go @@ -0,0 +1,406 @@ +package nacos + +import ( + "strconv" + "strings" + "sync" + "time" + + "github.com/nacos-group/nacos-sdk-go/clients" + "github.com/nacos-group/nacos-sdk-go/clients/naming_client" + "github.com/nacos-group/nacos-sdk-go/common/constant" + "github.com/nacos-group/nacos-sdk-go/model" + "github.com/nacos-group/nacos-sdk-go/vo" + "istio.io/api/networking/v1alpha3" + 
versionedclient "istio.io/client-go/pkg/clientset/versioned" + "istio.io/pkg/log" + ctrl "sigs.k8s.io/controller-runtime" + + apiv1 "github.com/alibaba/higress/api/networking/v1" + "github.com/alibaba/higress/pkg/common" + provider "github.com/alibaba/higress/registry" + "github.com/alibaba/higress/registry/memory" +) + +const ( + DefaultNacosTimeout = 5000 + DefaultNacosLogLevel = "warn" + DefaultNacosLogDir = "log/nacos/log/" + DefaultNacosCacheDir = "log/nacos/cache/" + DefaultNacosNotLoadCache = true + DefaultNacosLogRotateTime = "24h" + DefaultNacosLogMaxAge = 3 + DefaultUpdateCacheWhenEmpty = true + DefaultRefreshInterval = time.Second * 30 + DefaultRefreshIntervalLimit = time.Second * 10 + DefaultFetchPageSize = 50 + DefaultJoiner = "@@" +) + +type watcher struct { + provider.BaseWatcher + apiv1.RegistryConfig + WatchingServices map[string]bool `json:"watching_services"` + RegistryType provider.ServiceRegistryType `json:"registry_type"` + Status provider.WatcherStatus `json:"status"` + namingClient naming_client.INamingClient + updateHandler provider.ServiceUpdateHandler + readyHandler provider.ReadyHandler + cache memory.Cache + mutex *sync.Mutex + stop chan struct{} + client *versionedclient.Clientset + isStop bool + updateCacheWhenEmpty bool +} + +type WatcherOption func(w *watcher) + +func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) { + w := &watcher{ + WatchingServices: make(map[string]bool), + RegistryType: provider.Nacos, + Status: provider.UnHealthy, + cache: cache, + mutex: &sync.Mutex{}, + stop: make(chan struct{}), + } + + config, err := ctrl.GetConfig() + if err != nil { + return nil, err + } + + ic, err := versionedclient.NewForConfig(config) + if err != nil { + log.Errorf("can not new istio client, err:%v", err) + return nil, err + } + w.client = ic + + w.NacosRefreshInterval = int64(DefaultRefreshInterval) + + for _, opt := range opts { + opt(w) + } + + log.Infof("new nacos watcher with config Name:%s", 
w.Name) + + cc := constant.NewClientConfig( + constant.WithTimeoutMs(DefaultNacosTimeout), + constant.WithLogLevel(DefaultNacosLogLevel), + constant.WithLogDir(DefaultNacosLogDir), + constant.WithCacheDir(DefaultNacosCacheDir), + constant.WithNotLoadCacheAtStart(DefaultNacosNotLoadCache), + constant.WithRotateTime(DefaultNacosLogRotateTime), + constant.WithMaxAge(DefaultNacosLogMaxAge), + constant.WithUpdateCacheWhenEmpty(w.updateCacheWhenEmpty), + constant.WithNamespaceId(w.NacosNamespaceId), + ) + + sc := []constant.ServerConfig{ + *constant.NewServerConfig(w.Domain, uint64(w.Port)), + } + + namingClient, err := clients.NewNamingClient(vo.NacosClientParam{ + ClientConfig: cc, + ServerConfigs: sc, + }) + if err != nil { + log.Errorf("can not create naming client, err:%v", err) + return nil, err + } + + w.namingClient = namingClient + + return w, nil +} + +func WithNacosNamespaceId(nacosNamespaceId string) WatcherOption { + return func(w *watcher) { + w.NacosNamespaceId = nacosNamespaceId + } +} + +func WithNacosNamespace(nacosNamespace string) WatcherOption { + return func(w *watcher) { + w.NacosNamespace = nacosNamespace + } +} + +func WithNacosGroups(nacosGroups []string) WatcherOption { + return func(w *watcher) { + w.NacosGroups = nacosGroups + } +} + +func WithNacosRefreshInterval(refreshInterval int64) WatcherOption { + return func(w *watcher) { + if refreshInterval < int64(DefaultRefreshIntervalLimit) { + refreshInterval = int64(DefaultRefreshIntervalLimit) + } + w.NacosRefreshInterval = refreshInterval + } +} + +func WithType(t string) WatcherOption { + return func(w *watcher) { + w.Type = t + } +} + +func WithName(name string) WatcherOption { + return func(w *watcher) { + w.Name = name + } +} + +func WithDomain(domain string) WatcherOption { + return func(w *watcher) { + w.Domain = domain + } +} + +func WithPort(port uint32) WatcherOption { + return func(w *watcher) { + w.Port = port + } +} + +func WithUpdateCacheWhenEmpty(enable bool) WatcherOption { + 
return func(w *watcher) { + w.updateCacheWhenEmpty = enable + } +} + +func (w *watcher) Run() { + ticker := time.NewTicker(time.Duration(w.NacosRefreshInterval)) + defer ticker.Stop() + w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10)) + w.fetchAllServices() + w.readyHandler(true) + for { + select { + case <-ticker.C: + w.fetchAllServices() + case <-w.stop: + return + } + } +} + +func (w *watcher) fetchAllServices() error { + w.mutex.Lock() + defer w.mutex.Unlock() + if w.isStop { + return nil + } + fetchedServices := make(map[string]bool) + + for _, groupName := range w.NacosGroups { + for page := 1; ; page++ { + ss, err := w.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{ + GroupName: groupName, + PageNo: uint32(page), + PageSize: DefaultFetchPageSize, + NameSpace: w.NacosNamespace, + }) + if err != nil { + log.Errorf("fetch all services error:%v", err) + break + } + for _, serviceName := range ss.Doms { + fetchedServices[groupName+DefaultJoiner+serviceName] = true + } + if ss.Count < DefaultFetchPageSize { + break + } + } + } + + for key := range w.WatchingServices { + if _, exist := fetchedServices[key]; !exist { + s := strings.Split(key, DefaultJoiner) + err := w.unsubscribe(s[0], s[1]) + if err == nil { + delete(w.WatchingServices, key) + } + } + } + + for key := range fetchedServices { + if _, exist := w.WatchingServices[key]; !exist { + s := strings.Split(key, DefaultJoiner) + if !shouldSubscribe(s[1]) { + continue + } + err := w.subscribe(s[0], s[1]) + if err == nil { + w.WatchingServices[key] = true + } + } + } + return nil +} + +func (w *watcher) subscribe(groupName string, serviceName string) error { + log.Debugf("subscribe service, groupName:%s, serviceName:%s", groupName, serviceName) + + err := w.namingClient.Subscribe(&vo.SubscribeParam{ + ServiceName: serviceName, + GroupName: groupName, + SubscribeCallback: w.getSubscribeCallback(groupName, serviceName), + }) + + if err != nil { + 
log.Errorf("subscribe service error:%v, groupName:%s, serviceName:%s", err, groupName, serviceName) + return err + } + + return nil +} + +func (w *watcher) unsubscribe(groupName string, serviceName string) error { + log.Debugf("unsubscribe service, groupName:%s, serviceName:%s", groupName, serviceName) + + err := w.namingClient.Unsubscribe(&vo.SubscribeParam{ + ServiceName: serviceName, + GroupName: groupName, + SubscribeCallback: w.getSubscribeCallback(groupName, serviceName), + }) + + if err != nil { + log.Errorf("unsubscribe service error:%v, groupName:%s, serviceName:%s", err, groupName, serviceName) + return err + } + + return nil +} + +func (w *watcher) getSubscribeCallback(groupName string, serviceName string) func(services []model.SubscribeService, err error) { + suffix := strings.Join([]string{groupName, w.NacosNamespace, w.Type}, common.DotSeparator) + suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen) + host := strings.Join([]string{serviceName, suffix}, common.DotSeparator) + + return func(services []model.SubscribeService, err error) { + defer w.updateHandler() + + //log.Info("callback", "serviceName", serviceName, "suffix", suffix, "details", services) + + if err != nil { + if strings.Contains(err.Error(), "hosts is empty") { + if w.updateCacheWhenEmpty { + w.cache.DeleteServiceEntryWrapper(host) + } + } else { + log.Errorf("callback error:%v", err) + } + return + } + if len(services) > 0 && services[0].Metadata != nil && services[0].Metadata["register-resource"] == "mcp-bridge" { + return + } + serviceEntry := w.generateServiceEntry(host, services) + w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{ + ServiceName: serviceName, + ServiceEntry: serviceEntry, + Suffix: suffix, + RegistryType: w.Type, + }) + } +} + +func (w *watcher) generateServiceEntry(host string, services []model.SubscribeService) *v1alpha3.ServiceEntry { + portList := make([]*v1alpha3.Port, 0) + endpoints := make([]*v1alpha3.WorkloadEntry, 0) + 
+ for _, service := range services { + protocol := common.HTTP + if service.Metadata != nil && service.Metadata["protocol"] != "" { + protocol = common.ParseProtocol(service.Metadata["protocol"]) + } else { + service.Metadata = make(map[string]string) + } + port := &v1alpha3.Port{ + Name: protocol.String(), + Number: uint32(service.Port), + Protocol: protocol.String(), + } + if len(portList) == 0 { + portList = append(portList, port) + } + endpoint := v1alpha3.WorkloadEntry{ + Address: service.Ip, + Ports: map[string]uint32{port.Protocol: port.Number}, + Labels: service.Metadata, + } + endpoints = append(endpoints, &endpoint) + } + + se := &v1alpha3.ServiceEntry{ + Hosts: []string{host}, + Ports: portList, + Location: v1alpha3.ServiceEntry_MESH_INTERNAL, + Resolution: v1alpha3.ServiceEntry_STATIC, + Endpoints: endpoints, + } + + return se +} + +func (w *watcher) Stop() { + w.mutex.Lock() + defer w.mutex.Unlock() + + for key := range w.WatchingServices { + s := strings.Split(key, DefaultJoiner) + err := w.unsubscribe(s[0], s[1]) + if err == nil { + delete(w.WatchingServices, key) + } + + // clean the cache + suffix := strings.Join([]string{s[0], w.NacosNamespace, w.Type}, common.DotSeparator) + suffix = strings.ReplaceAll(suffix, common.Underscore, common.Hyphen) + host := strings.Join([]string{s[1], suffix}, common.DotSeparator) + w.cache.DeleteServiceEntryWrapper(host) + } + w.isStop = true + w.stop <- struct{}{} + w.readyHandler(false) +} + +func (w *watcher) IsHealthy() bool { + return w.Status == provider.Healthy +} + +func (w *watcher) GetRegistryType() string { + return w.RegistryType.String() +} + +func (w *watcher) AppendServiceUpdateHandler(f func()) { + w.updateHandler = f +} + +func (w *watcher) ReadyHandler(f func(bool)) { + w.readyHandler = f +} + +func shouldSubscribe(serviceName string) bool { + prefixFilters := []string{"consumers:"} + fullFilters := []string{""} + + for _, f := range prefixFilters { + if strings.HasPrefix(serviceName, f) { + return 
false + } + } + + for _, f := range fullFilters { + if serviceName == f { + return false + } + } + + return true +} diff --git a/registry/reconcile/reconcile.go b/registry/reconcile/reconcile.go new file mode 100644 index 000000000..b6e3450c0 --- /dev/null +++ b/registry/reconcile/reconcile.go @@ -0,0 +1,168 @@ +package reconcile + +import ( + "errors" + "path" + "reflect" + "sync" + + "istio.io/pkg/log" + + apiv1 "github.com/alibaba/higress/api/networking/v1" + v1 "github.com/alibaba/higress/client/pkg/apis/networking/v1" + . "github.com/alibaba/higress/registry" + "github.com/alibaba/higress/registry/memory" + "github.com/alibaba/higress/registry/nacos" + nacosv2 "github.com/alibaba/higress/registry/nacos/v2" + "github.com/alibaba/higress/registry/zookeeper" +) + +type Reconciler struct { + memory.Cache + registries map[string]*apiv1.RegistryConfig + watchers map[string]Watcher + serviceUpdate func() +} + +func NewReconciler(serviceUpdate func()) *Reconciler { + return &Reconciler{ + Cache: memory.NewCache(), + registries: make(map[string]*apiv1.RegistryConfig), + watchers: make(map[string]Watcher), + serviceUpdate: serviceUpdate, + } +} + +func (r *Reconciler) Reconcile(mcpbridge *v1.McpBridge) { + newRegistries := make(map[string]*apiv1.RegistryConfig) + if mcpbridge != nil { + for _, registry := range mcpbridge.Spec.Registries { + newRegistries[path.Join(registry.Type, registry.Name)] = registry + } + } + var wg sync.WaitGroup + toBeCreated := make(map[string]*apiv1.RegistryConfig) + toBeUpdated := make(map[string]*apiv1.RegistryConfig) + toBeDeleted := make(map[string]*apiv1.RegistryConfig) + + for key, newRegistry := range newRegistries { + if oldRegistry, ok := r.registries[key]; !ok { + toBeCreated[key] = newRegistry + } else if reflect.DeepEqual(newRegistry, oldRegistry) { + continue + } else { + toBeUpdated[key] = newRegistry + } + } + + for key, oldRegistry := range r.registries { + if _, ok := newRegistries[key]; !ok { + toBeDeleted[key] = oldRegistry 
+ } + } + errHappened := false + log.Infof("ReconcileRegistries, toBeCreated: %d, toBeUpdated: %d, toBeDeleted: %d", + len(toBeCreated), len(toBeUpdated), len(toBeDeleted)) + for k, v := range toBeCreated { + watcher, err := r.generateWatcherFromRegistryConfig(v, &wg) + if err != nil { + errHappened = true + log.Errorf("ReconcileRegistries failed, err:%v", err) + continue + } + + go watcher.Run() + r.watchers[k] = watcher + r.registries[k] = v + } + for k, v := range toBeUpdated { + go r.watchers[k].Stop() + delete(r.registries, k) + delete(r.watchers, k) + watcher, err := r.generateWatcherFromRegistryConfig(v, &wg) + if err != nil { + errHappened = true + log.Errorf("ReconcileRegistries failed, err:%v", err) + continue + } + + go watcher.Run() + r.watchers[k] = watcher + r.registries[k] = v + } + for k := range toBeDeleted { + go r.watchers[k].Stop() + delete(r.registries, k) + delete(r.watchers, k) + } + if errHappened { + log.Error("ReconcileRegistries failed, Init Watchers failed") + return + } + wg.Wait() + log.Infof("Registries is reconciled") +} + +func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryConfig, wg *sync.WaitGroup) (Watcher, error) { + var watcher Watcher + var err error + + switch registry.Type { + case string(Nacos): + watcher, err = nacos.NewWatcher( + r.Cache, + nacos.WithType(registry.Type), + nacos.WithName(registry.Name), + nacos.WithDomain(registry.Domain), + nacos.WithPort(registry.Port), + nacos.WithNacosNamespaceId(registry.NacosNamespaceId), + nacos.WithNacosNamespace(registry.NacosNamespace), + nacos.WithNacosGroups(registry.NacosGroups), + nacos.WithNacosRefreshInterval(registry.NacosRefreshInterval), + ) + case string(Nacos2): + watcher, err = nacosv2.NewWatcher( + r.Cache, + nacosv2.WithType(registry.Type), + nacosv2.WithName(registry.Name), + nacosv2.WithNacosAddressServer(registry.NacosAddressServer), + nacosv2.WithDomain(registry.Domain), + nacosv2.WithPort(registry.Port), + 
nacosv2.WithNacosAccessKey(registry.NacosAccessKey), + nacosv2.WithNacosSecretKey(registry.NacosSecretKey), + nacosv2.WithNacosNamespaceId(registry.NacosNamespaceId), + nacosv2.WithNacosNamespace(registry.NacosNamespace), + nacosv2.WithNacosGroups(registry.NacosGroups), + nacosv2.WithNacosRefreshInterval(registry.NacosRefreshInterval), + ) + case string(Zookeeper): + watcher, err = zookeeper.NewWatcher( + r.Cache, + zookeeper.WithType(registry.Type), + zookeeper.WithName(registry.Name), + zookeeper.WithDomain(registry.Domain), + zookeeper.WithPort(registry.Port), + zookeeper.WithZkServicesPath(registry.ZkServicesPath), + ) + default: + return nil, errors.New("unsupported registry type:" + registry.Type) + } + + if err != nil { + return nil, err + } + + wg.Add(1) + var once sync.Once + watcher.ReadyHandler(func(ready bool) { + once.Do(func() { + wg.Done() + if ready { + log.Infof("Registry Watcher is ready, type:%s, name:%s", registry.Type, registry.Name) + } + }) + }) + watcher.AppendServiceUpdateHandler(r.serviceUpdate) + + return watcher, nil +} diff --git a/registry/watcher.go b/registry/watcher.go new file mode 100644 index 000000000..f37f1abf5 --- /dev/null +++ b/registry/watcher.go @@ -0,0 +1,61 @@ +package registry + +import ( + "net" + "time" +) + +const ( + Zookeeper ServiceRegistryType = "zookeeper" + Eureka ServiceRegistryType = "eureka" + Consul ServiceRegistryType = "consul" + Nacos ServiceRegistryType = "nacos" + Nacos2 ServiceRegistryType = "nacos2" + Healthy WatcherStatus = "healthy" + UnHealthy WatcherStatus = "unhealthy" + + DefaultDialTimeout = time.Second * 3 +) + +type ServiceRegistryType string + +func (srt *ServiceRegistryType) String() string { + return string(*srt) +} + +type WatcherStatus string + +func (ws *WatcherStatus) String() string { + return string(*ws) +} + +type Watcher interface { + Run() + Stop() + IsHealthy() bool + GetRegistryType() string + AppendServiceUpdateHandler(f func()) + ReadyHandler(f func(bool)) +} + +type 
BaseWatcher struct{} + +func (w *BaseWatcher) Run() {} +func (w *BaseWatcher) Stop() {} +func (w *BaseWatcher) IsHealthy() bool { return true } +func (w *BaseWatcher) GetRegistryType() string { return "" } +func (w *BaseWatcher) AppendServiceUpdateHandler(f func()) {} +func (w *BaseWatcher) ReadyHandler(f func(bool)) {} + +type ServiceUpdateHandler func() +type ReadyHandler func(bool) + +func ProbeWatcherStatus(host string, port string) WatcherStatus { + address := net.JoinHostPort(host, port) + conn, err := net.DialTimeout("tcp", address, DefaultDialTimeout) + if err != nil || conn == nil { + return UnHealthy + } + _ = conn.Close() + return Healthy +} diff --git a/registry/zookeeper/types.go b/registry/zookeeper/types.go new file mode 100644 index 000000000..d10e12591 --- /dev/null +++ b/registry/zookeeper/types.go @@ -0,0 +1,117 @@ +package zookeeper + +import ( + "errors" + "time" +) + +const ( + DEFAULT_REG_TIMEOUT = "10s" + DUBBO = "/dubbo/" + SPRING_CLOUD_SERVICES = "/services" + DUBBO_SERVICES = "/dubbo" + PROVIDERS = "/providers" + CONFIG = "config" + MAPPING = "mapping" + METADATA = "metadata" + DUBBO_PROTOCOL = "dubbo" + HTTP_PROTOCOL = "http" + VERSION = "version" + PROTOCOL = "protocol" +) + +type ServiceType int + +const ( + DubboService ServiceType = iota + SpringCloudService +) + +type EventType int + +type Event struct { + Path string + Action EventType + Content []byte + InterfaceName string + ServiceType ServiceType +} + +const ( + // ConnDelay connection delay interval + ConnDelay = 3 + // MaxFailTimes max fail times + MaxFailTimes = 3 +) + +var DefaultTTL = 10 * time.Minute + +type InterfaceConfig struct { + Host string + Endpoints []Endpoint + Protocol string + ServiceType ServiceType +} + +type Endpoint struct { + Ip string + Port string + Metadata map[string]string +} + +var ErrNilChildren = errors.New("has none children") + +func WithType(t string) WatcherOption { + return func(w *watcher) { + w.Type = t + } +} + +func WithName(name string) 
WatcherOption { + return func(w *watcher) { + w.Name = name + } +} + +func WithDomain(domain string) WatcherOption { + return func(w *watcher) { + w.Domain = domain + } +} + +func WithPort(port uint32) WatcherOption { + return func(w *watcher) { + w.Port = port + } +} + +type DataListener interface { + DataChange(eventType Event) bool // bool is return for interface implement is interesting +} + +const ( + // EventTypeAdd means add event + EventTypeAdd = iota + // EventTypeDel means del event + EventTypeDel + // EventTypeUpdate means update event + EventTypeUpdate +) + +type ListServiceConfig struct { + UrlIndex string + InterfaceName string + Exit chan struct{} + ServiceType ServiceType +} + +type SpringCloudInstancePayload struct { + Metadata map[string]string `json:"metadata"` +} + +type SpringCloudInstance struct { + Name string `json:"name"` + Address string `json:"address"` + Port int `json:"port"` + Payload SpringCloudInstancePayload `json:"payload"` +} diff --git a/registry/zookeeper/watcher.go b/registry/zookeeper/watcher.go new file mode 100644 index 000000000..0f8b6a311 --- /dev/null +++ b/registry/zookeeper/watcher.go @@ -0,0 +1,762 @@ +package zookeeper + +import ( + "encoding/json" + "errors" + "net/url" + "path" + "reflect" + "strconv" + "strings" + "sync" + "time" + + "github.com/dubbogo/go-zookeeper/zk" + gxzookeeper "github.com/dubbogo/gost/database/kv/zk" + "github.com/hashicorp/go-multierror" + "go.uber.org/atomic" + "istio.io/api/networking/v1alpha3" + "istio.io/pkg/log" + + apiv1 "github.com/alibaba/higress/api/networking/v1" + "github.com/alibaba/higress/pkg/common" + provider "github.com/alibaba/higress/registry" + "github.com/alibaba/higress/registry/memory" +) + +type watchConfig struct { + exit chan struct{} + listen bool +} + +type watcher struct { + provider.BaseWatcher + apiv1.RegistryConfig + WatchingServices map[string]watchConfig `json:"watching_services"` + RegistryType provider.ServiceRegistryType `json:"registry_type"` + Status 
provider.WatcherStatus `json:"status"` + serviceRemaind *atomic.Int32 + updateHandler provider.ServiceUpdateHandler + readyHandler provider.ReadyHandler + cache memory.Cache + mutex *sync.Mutex + stop chan struct{} + zkClient *gxzookeeper.ZookeeperClient + reconnectCh <-chan struct{} + Done chan struct{} + seMux *sync.Mutex + serviceEntry map[string]InterfaceConfig + listIndex chan ListServiceConfig + listServiceChan chan struct{} + isStop bool + keepStaleWhenEmpty bool + zkServicesPath []string +} + +type WatcherOption func(w *watcher) + +func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, error) { + w := &watcher{ + WatchingServices: make(map[string]watchConfig), + RegistryType: provider.Zookeeper, + Status: provider.UnHealthy, + cache: cache, + mutex: &sync.Mutex{}, + stop: make(chan struct{}), + Done: make(chan struct{}), + seMux: &sync.Mutex{}, + serviceEntry: make(map[string]InterfaceConfig), + listIndex: make(chan ListServiceConfig, 1), + listServiceChan: make(chan struct{}), + zkServicesPath: []string{SPRING_CLOUD_SERVICES}, + } + + timeout, _ := time.ParseDuration(DEFAULT_REG_TIMEOUT) + + for _, opt := range opts { + opt(w) + } + + var address []string + address = append(address, w.Domain+":"+strconv.Itoa(int(w.Port))) + newClient, cltErr := gxzookeeper.NewZookeeperClient("zk", address, false, gxzookeeper.WithZkTimeOut(timeout)) + if cltErr != nil { + log.Errorf("[NewWatcher] NewWatcher zk, err:%v, zk address:%s", cltErr, address) + return nil, cltErr + } + valid := newClient.ZkConnValid() + if !valid { + log.Info("connect zk error") + return nil, errors.New("connect zk error") + } + connectEvent := make(chan zk.Event, 2) + newClient.RegisterEvent("", connectEvent) + connectTimer := time.NewTimer(timeout) + connectTimout := false +FOR: + for { + select { + case ev := <-connectEvent: + if ev.State == zk.StateConnected { + break FOR + } + case <-connectTimer.C: + connectTimout = true + break FOR + } + } + if connectTimout { + return 
nil, errors.New("connect zk timeout") + } + log.Info("zk connected") + newClient.UnregisterEvent("", connectEvent) + w.reconnectCh = newClient.Reconnect() + w.zkClient = newClient + go func() { + w.HandleClientRestart() + }() + return w, nil +} + +func WithKeepStaleWhenEmpty(enable bool) WatcherOption { + return func(w *watcher) { + w.keepStaleWhenEmpty = enable + } +} + +func WithZkServicesPath(paths []string) WatcherOption { + return func(w *watcher) { + for _, path := range paths { + path = strings.TrimSuffix(path, common.Slash) + if path == DUBBO_SERVICES || path == SPRING_CLOUD_SERVICES { + continue + } + w.zkServicesPath = append(w.zkServicesPath, path) + } + } +} + +func (w *watcher) HandleClientRestart() { + for { + select { + case <-w.reconnectCh: + w.reconnectCh = w.zkClient.Reconnect() + log.Info("zkclient reconnected") + w.RestartCallBack() + time.Sleep(10 * time.Microsecond) + case <-w.Done: + log.Info("[HandleClientRestart] receive registry destroy event, quit client restart handler") + return + } + } +} + +func (w *watcher) RestartCallBack() bool { + err := w.fetchAllServices() + if err != nil { + log.Errorf("[RestartCallBack] fetch all service for zk err:%v", err) + return false + } + return true +} + +type serviceInfo struct { + serviceType ServiceType + rootPath string + service string +} + +func (w *watcher) fetchedServices(fetchedServices map[string]serviceInfo, path string, serviceType ServiceType) error { + children, err := w.zkClient.GetChildren(path) + if err != nil { + if err == gxzookeeper.ErrNilChildren || err == gxzookeeper.ErrNilNode || + strings.Contains(err.Error(), "has none children") { + return nil + } else { + log.Errorf("[fetchAllServices] can not get children, err:%v, path:%s", err, path) + return err + } + } + info := serviceInfo{ + serviceType: serviceType, + rootPath: path, + } + for _, child := range children { + if child == CONFIG || child == MAPPING || child == METADATA { + continue + } + var interfaceName string + switch 
serviceType { + case DubboService: + interfaceName = child + case SpringCloudService: + info.service = child + if path == "" || path == common.Slash { + interfaceName = child + break + } + interfaceName = child + "." + strings.ReplaceAll( + strings.TrimPrefix(path, common.Slash), common.Slash, common.Hyphen) + } + fetchedServices[interfaceName] = info + log.Debugf("fetchedServices, interface:%s, path:%s", interfaceName, info.rootPath) + } + return nil +} + +func (w *watcher) fetchAllServices(firstFetch ...bool) error { + w.mutex.Lock() + defer w.mutex.Unlock() + + if w.isStop { + return nil + } + + fetchedServices := make(map[string]serviceInfo) + var result error + err := w.fetchedServices(fetchedServices, DUBBO_SERVICES, DubboService) + if err != nil { + result = multierror.Append(result, err) + } + for _, path := range w.zkServicesPath { + err = w.fetchedServices(fetchedServices, path, SpringCloudService) + if err != nil { + result = multierror.Append(result, err) + } + } + for interfaceName, value := range w.WatchingServices { + if _, exist := fetchedServices[interfaceName]; !exist { + if value.exit != nil { + close(value.exit) + } + delete(w.WatchingServices, interfaceName) + } + } + var serviceConfigs []ListServiceConfig + for interfaceName, serviceInfo := range fetchedServices { + if _, exist := w.WatchingServices[interfaceName]; !exist { + w.WatchingServices[interfaceName] = watchConfig{ + exit: make(chan struct{}), + listen: true, + } + serviceConfig := ListServiceConfig{ + ServiceType: serviceInfo.serviceType, + InterfaceName: interfaceName, + Exit: w.WatchingServices[interfaceName].exit, + } + switch serviceInfo.serviceType { + case DubboService: + serviceConfig.UrlIndex = DUBBO + interfaceName + PROVIDERS + case SpringCloudService: + serviceConfig.UrlIndex = path.Join(serviceInfo.rootPath, serviceInfo.service) + default: + return errors.New("unkown type") + } + serviceConfigs = append(serviceConfigs, serviceConfig) + } + } + if len(firstFetch) > 0 && 
firstFetch[0] {
		// On the very first fetch, remember how many services still need their
		// initial listing; IsReady reports true once this count reaches zero.
		w.serviceRemaind = atomic.NewInt32(int32(len(serviceConfigs)))
	}
	for _, service := range serviceConfigs {
		w.listIndex <- service
	}
	return result
}

// ListenService consumes service configs from w.listIndex and, for each one,
// spawns a goroutine that keeps a child watch on the corresponding zookeeper
// path until the service's Exit channel fires or the watcher is stopped.
// On return it signals w.listServiceChan so Run can restart the listener.
func (w *watcher) ListenService() {
	defer func() {
		w.listServiceChan <- struct{}{}
	}()
	ttl := DefaultTTL
	for {
		select {
		case listIndex := <-w.listIndex:
			go func() {
				// failTimes must be goroutine-local: the previous shared
				// counter was mutated by every watch goroutine concurrently,
				// which is a data race.
				var failTimes int
				for {
					log.Info(listIndex.UrlIndex)
					children, childEventCh, err := w.zkClient.GetChildrenW(listIndex.UrlIndex)
					if err != nil {
						failTimes++
						if MaxFailTimes <= failTimes {
							failTimes = MaxFailTimes
						}
						log.Errorf("[Zookeeper][ListenService] Get children of path zkRootPath with watcher failed, err:%v, index:%s", err, listIndex.UrlIndex)

						// Maybe the provider is not ready yet; back off
						// failTimes*ConnDelay seconds before retrying.
						after := time.After(timeSecondDuration(failTimes * ConnDelay))
						select {
						case <-after:
							continue
						case <-listIndex.Exit:
							return
						}
					}
					failTimes = 0
					if len(children) > 0 {
						w.ChildToServiceEntry(children, listIndex.InterfaceName, listIndex.UrlIndex, listIndex.ServiceType)
					}
					if w.serviceRemaind != nil {
						// Counts down towards readiness; see IsReady.
						w.serviceRemaind.Sub(1)
					}
					// true means the Exit channel fired: stop watching this
					// service. false means the one-shot zk watch fired and
					// must be re-established, so loop again.
					if w.startScheduleWatchTask(listIndex, children, ttl, childEventCh, listIndex.Exit) {
						return
					}
				}
			}()
		case <-w.stop:
			log.Info("[ListenService] is shutdown")
			return
		}
	}
}

// DataChange applies a single zookeeper node event to the in-memory service
// entry table and pushes the result into the cache. It returns false only
// when the event cannot be translated into an interface config.
func (w *watcher) DataChange(eventType Event) bool {
	host, interfaceConfig, err := w.GetInterfaceConfig(eventType)
	if err != nil {
		log.Errorf("GetInterfaceConfig failed, err:%v, event:%v", err, eventType)
		return false
	}
	if eventType.Action == EventTypeAdd || eventType.Action == EventTypeUpdate {
		w.seMux.Lock()
		isHave := false
		value, ok := w.serviceEntry[host]
		if ok {
			// Deduplicate: only append the endpoint when its ip:port is new.
			for _, endpoint := range value.Endpoints {
				if endpoint.Ip == interfaceConfig.Endpoints[0].Ip && endpoint.Port == interfaceConfig.Endpoints[0].Port {
					isHave = true
				}
			}
			if !isHave {
				value.Endpoints = append(value.Endpoints, interfaceConfig.Endpoints[0])
			}
			w.serviceEntry[host] = value
		} else {
			w.serviceEntry[host] = *interfaceConfig
		}
		se := w.generateServiceEntry(w.serviceEntry[host])
		w.seMux.Unlock()
		w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
			ServiceName:  host,
			ServiceEntry: se,
			Suffix:       "zookeeper",
			RegistryType: w.Type,
		})
		w.updateHandler()
	} else if eventType.Action == EventTypeDel {
		w.seMux.Lock()
		value, ok := w.serviceEntry[host]
		if ok {
			// Drop every endpoint matching the deleted ip:port.
			var endpoints []Endpoint
			for _, endpoint := range value.Endpoints {
				if endpoint.Ip == interfaceConfig.Endpoints[0].Ip && endpoint.Port == interfaceConfig.Endpoints[0].Port {
					continue
				}
				endpoints = append(endpoints, endpoint)
			}
			value.Endpoints = endpoints
			w.serviceEntry[host] = value
		}
		se := w.generateServiceEntry(w.serviceEntry[host])
		w.seMux.Unlock()
		if len(se.Endpoints) == 0 {
			// Last endpoint gone: remove the entry unless stale entries are
			// deliberately kept.
			if !w.keepStaleWhenEmpty {
				w.cache.DeleteServiceEntryWrapper(host)
			}
		} else {
			w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
				ServiceName:  host,
				ServiceEntry: se,
				Suffix:       "zookeeper",
				RegistryType: w.Type,
			})
		}
		w.updateHandler()
	}
	return true
}

// GetInterfaceConfig dispatches event parsing by service type and returns the
// host key plus the parsed InterfaceConfig.
func (w *watcher) GetInterfaceConfig(event Event) (string, *InterfaceConfig, error) {
	switch event.ServiceType {
	case DubboService:
		return w.GetDubboConfig(event.Path)
	case SpringCloudService:
		return w.GetSpringCloudConfig(event.InterfaceName, event.Content)
	default:
		return "", nil, errors.New("unknown service type")
	}
}

// GetSpringCloudConfig parses a spring-cloud zookeeper instance payload
// (curator ServiceInstance JSON) into an InterfaceConfig keyed by the
// interface (service) name.
func (w *watcher) GetSpringCloudConfig(interfaceName string, content []byte) (string, *InterfaceConfig, error) {
	var instance SpringCloudInstance
	err := json.Unmarshal(content, &instance)
	if err != nil {
		log.Errorf("unmarshal failed, err:%v, content:%s", err, content)
		return "", nil, err
	}
	var config InterfaceConfig
	host := interfaceName
	config.Host = host
	// Default to HTTP; a "protocol" metadata entry overrides it.
	config.Protocol = common.HTTP.String()
	if len(instance.Payload.Metadata) > 0 && instance.Payload.Metadata["protocol"] != "" {
		config.Protocol = common.ParseProtocol(instance.Payload.Metadata["protocol"]).String()
	}
	port := strconv.Itoa(instance.Port)
	// NOTE(review): strconv.Itoa never returns "", so this guard is dead code;
	// a real validation would likely inspect instance.Port itself — confirm
	// intent before tightening it.
	if port == "" {
		return "", nil, errors.New("empty port")
	}
	endpoint := Endpoint{
		Ip:       instance.Address,
		Port:     port,
		Metadata: instance.Payload.Metadata,
	}
	config.Endpoints = []Endpoint{endpoint}
	config.ServiceType = SpringCloudService
	return host, &config, nil
}

// GetDubboConfig parses a dubbo provider zookeeper path (URL-escaped node
// name) into an InterfaceConfig. The returned host has the form
// "providers:<interface>:<version>:<group>" (version defaults to "0.0.0",
// group may be empty).
func (w *watcher) GetDubboConfig(dubboUrl string) (string, *InterfaceConfig, error) {
	// The zk node name carries escaped URL punctuation; undo %3F/%3D/%26 first.
	dubboUrl = strings.Replace(dubboUrl, "%3F", "?", 1)
	dubboUrl = strings.ReplaceAll(dubboUrl, "%3D", "=")
	dubboUrl = strings.ReplaceAll(dubboUrl, "%26", "&")
	tempPath := strings.Replace(dubboUrl, DUBBO, "", -1)
	urls := strings.Split(tempPath, PROVIDERS+"/dubbo")
	key := urls[0]
	serviceUrl, urlParseErr := url.Parse(dubboUrl)
	if urlParseErr != nil {
		return "", nil, urlParseErr
	}
	var (
		dubboInterfaceConfig InterfaceConfig
		host                 string
	)

	// Strip the registry prefix so only "<ip>:<port>/<service>" remains.
	serviceUrl.Path = strings.Replace(serviceUrl.Path, DUBBO+key+PROVIDERS+"/dubbo://", "", -1)

	values, err := url.ParseQuery(serviceUrl.RawQuery)
	if err != nil {
		return "", nil, err
	}

	paths := strings.Split(serviceUrl.Path, "/")

	if len(paths) > 0 {
		var group string
		if _, ok := values["group"]; ok {
			group = values["group"][0]
		}
		version := "0.0.0"
		if v, ok := values[VERSION]; ok && len(v) > 0 {
			version = v[0]
		}
		dubboInterfaceConfig.Host = "providers:" + key + ":" + version + ":" + group
		host = dubboInterfaceConfig.Host
		dubboInterfaceConfig.Protocol = DUBBO_PROTOCOL
		address := strings.Split(paths[0], ":")
		if len(address) != 2 {
			log.Infof("[GetDubboConfig] can not get dubbo ip and port, path:%s ", serviceUrl.Path)
			return "", nil, errors.New("can not get dubbo ip and port")
		}
		// Flatten single-valued query parameters into endpoint metadata.
		metadata := make(map[string]string, len(values)+1)
		for key, value := range values {
			if len(value) == 1 {
				metadata[key] = value[0]
			}
		}
		metadata[PROTOCOL] = DUBBO_PROTOCOL
		dubboEndpoint := Endpoint{
			Ip:       address[0],
			Port:     address[1],
			Metadata: metadata,
		}
		dubboInterfaceConfig.Endpoints = append(dubboInterfaceConfig.Endpoints, dubboEndpoint)
	}
	dubboInterfaceConfig.ServiceType = DubboService
	return host, &dubboInterfaceConfig, nil
}

// startScheduleWatchTask polls the service path on a ticker that backs off
// exponentially towards ttl (starting at no more than 20s) while also reacting
// to zookeeper child events. It returns true when the exit channel fired (the
// caller should stop watching this service) and false when the one-shot zk
// watch fired (the caller must re-establish the watch).
func (w *watcher) startScheduleWatchTask(serviceConfig ListServiceConfig, oldChildren []string, ttl time.Duration, childEventCh <-chan zk.Event, exit chan struct{}) bool {
	zkRootPath := serviceConfig.UrlIndex
	interfaceName := serviceConfig.InterfaceName
	serviceType := serviceConfig.ServiceType
	tickerTTL := ttl
	if tickerTTL > 20e9 { // 20e9 ns == 20s cap on the initial interval
		tickerTTL = 20e9
	}
	ticker := time.NewTicker(tickerTTL)
	for {
		select {
		case <-ticker.C:
			w.handleZkNodeEvent(zkRootPath, oldChildren, interfaceName, serviceType)
			if tickerTTL < ttl {
				tickerTTL *= 2
				if tickerTTL > ttl {
					tickerTTL = ttl
				}
				ticker.Stop()
				ticker = time.NewTicker(tickerTTL)
			}
		case zkEvent := <-childEventCh:
			if zkEvent.Type == zk.EventNodeChildrenChanged {
				w.handleZkNodeEvent(zkEvent.Path, oldChildren, interfaceName, serviceType)
			}
			// Stop the ticker before handing control back: the caller loops
			// and creates a new one, so the old ticker would otherwise leak.
			ticker.Stop()
			return false
		case <-exit:
			ticker.Stop()
			return true
		}
	}
}

// handleZkNodeEvent re-reads the children of zkPath and refreshes the service
// entries. When the node reports no children anymore (all providers gone),
// every previously known child is replayed as a delete event instead.
func (w *watcher) handleZkNodeEvent(zkPath string, oldChildren []string, interfaceName string, serviceType ServiceType) {
	newChildren, err := w.zkClient.GetChildren(zkPath)
	if err != nil {
		if err == gxzookeeper.ErrNilChildren || err == gxzookeeper.ErrNilNode ||
			strings.Contains(err.Error(), "has none children") {
			content, _, connErr := w.zkClient.Conn.Get(zkPath)
			if connErr != nil {
				log.Errorf("[handleZkNodeEvent] Get new node path's content error:%v, path:%s", connErr, zkPath)
			} else {
				for _, c := range oldChildren {
					path := path.Join(zkPath, c)
					content, _, connErr = w.zkClient.Conn.Get(path)
					if connErr != nil {
						log.Errorf("[handleZkNodeEvent] Get node path's content error:%v, path:%s", connErr, path)
						continue
					}
					w.DataChange(Event{
						Path:          path,
						Action:        EventTypeDel,
						Content:       content,
						InterfaceName: interfaceName,
						ServiceType:   serviceType,
					})
				}
			}
		} else {
			log.Errorf("zkClient get children failed, err:%v", err)
		}
		return
	}
	w.ChildToServiceEntry(newChildren, interfaceName, zkPath, serviceType)
}

// ChildToServiceEntry converts the children of a service path into
// InterfaceConfigs and pushes any new or changed entries into the cache,
// firing updateHandler once at the end.
func (w *watcher) ChildToServiceEntry(children []string, interfaceName, zkPath string, serviceType ServiceType) {
	serviceEntry := make(map[string]InterfaceConfig)
	switch serviceType {
	case DubboService:
		w.DubboChildToServiceEntry(serviceEntry, children, interfaceName, zkPath)
	case SpringCloudService:
		w.SpringCloudChildToServiceEntry(serviceEntry, children, interfaceName, zkPath)
	default:
		log.Error("unknown type")
	}
	if len(serviceEntry) == 0 {
		return
	}
	w.seMux.Lock()
	for host, config := range serviceEntry {
		se := w.generateServiceEntry(config)
		if value, ok := w.serviceEntry[host]; ok && reflect.DeepEqual(value, config) {
			// Unchanged entry; skip the cache update.
			continue
		}
		w.serviceEntry[host] = config
		w.cache.UpdateServiceEntryWrapper(host, &memory.ServiceEntryWrapper{
			ServiceName:  host,
			ServiceEntry: se,
			Suffix:       "zookeeper",
			RegistryType: w.Type,
		})
	}
	w.seMux.Unlock()
	w.updateHandler()
}

// SpringCloudChildToServiceEntry reads every child node's content and merges
// the parsed spring-cloud endpoints into serviceEntry, grouped by host.
func (w *watcher) SpringCloudChildToServiceEntry(serviceEntry map[string]InterfaceConfig, children []string, interfaceName, zkPath string) {
	for _, c := range children {
		path := path.Join(zkPath, c)
		content, _, err := w.zkClient.Conn.Get(path)
		if err != nil {
			log.Errorf("[handleZkNodeEvent] Get node path's content error:%v, path:%s", err, path)
			continue
		}
		host, config, err := w.GetSpringCloudConfig(interfaceName, content)
		if err != nil {
			log.Errorf("GetSpringCloudConfig failed:%v", err)
			continue
		}
		if existConfig, exist := serviceEntry[host]; !exist {
			serviceEntry[host] = *config
		} else {
			existConfig.Endpoints = append(existConfig.Endpoints, config.Endpoints...)
			serviceEntry[host] = existConfig
		}
	}
}

// DubboChildToServiceEntry parses every child node name as a dubbo provider
// URL and merges the resulting endpoints into serviceEntry, grouped by host.
func (w *watcher) DubboChildToServiceEntry(serviceEntry map[string]InterfaceConfig, children []string, interfaceName, zkPath string) {
	for _, c := range children {
		path := path.Join(zkPath, c)
		host, config, err := w.GetDubboConfig(path)
		if err != nil {
			log.Errorf("GetDubboConfig failed:%v", err)
			continue
		}
		if existConfig, exist := serviceEntry[host]; !exist {
			serviceEntry[host] = *config
		} else {
			existConfig.Endpoints = append(existConfig.Endpoints, config.Endpoints...)
			serviceEntry[host] = existConfig
		}
	}
}

// generateServiceEntry builds an istio ServiceEntry (host "<host>.zookeeper")
// from an InterfaceConfig. Only the first endpoint's port is exposed as the
// service-level port; every workload entry keeps its own port in its port map.
func (w *watcher) generateServiceEntry(config InterfaceConfig) *v1alpha3.ServiceEntry {
	portList := make([]*v1alpha3.Port, 0)
	endpoints := make([]*v1alpha3.WorkloadEntry, 0)

	for _, service := range config.Endpoints {
		// Protocol defaults to HTTP unless the endpoint metadata overrides it.
		protocol := common.HTTP
		if service.Metadata != nil && service.Metadata[PROTOCOL] != "" {
			protocol = common.ParseProtocol(service.Metadata[PROTOCOL])
		}
		portNumber, _ := strconv.Atoi(service.Port)
		port := &v1alpha3.Port{
			Name:     protocol.String(),
			Number:   uint32(portNumber),
			Protocol: protocol.String(),
		}
		if len(portList) == 0 {
			portList = append(portList, port)
		}
		endpoints = append(endpoints, &v1alpha3.WorkloadEntry{
			Address: service.Ip,
			Ports:   map[string]uint32{port.Protocol: port.Number},
			Labels:  service.Metadata,
			Weight:  1,
		})
	}

	return &v1alpha3.ServiceEntry{
		Hosts:      []string{config.Host + ".zookeeper"},
		Ports:      portList,
		Location:   v1alpha3.ServiceEntry_MESH_INTERNAL,
		Resolution: v1alpha3.ServiceEntry_STATIC,
		Endpoints:  endpoints,
	}
}

// Run is the watcher main loop: it starts the service listener, performs the
// initial fetch, then refreshes every 30s until Stop fires. It also restarts
// ListenService whenever it exits (signalled via listServiceChan).
func (w *watcher) Run() {
	defer func() {
		log.Info("[zookeeper] Run is down")
		if r := recover(); r != nil {
			log.Info("Recovered in f", "r is", r)
		}
	}()
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()
	w.Status = provider.ProbeWatcherStatus(w.Domain, strconv.FormatUint(uint64(w.Port), 10))
	go w.ListenService()
	firstFetchErr := w.fetchAllServices(true)
	if firstFetchErr != nil {
		log.Errorf("first fetch services failed:%v", firstFetchErr)
	}
	for {
		select {
		case <-ticker.C:
			var needNewFetch bool
			if w.IsReady() {
				w.readyHandler(true)
				needNewFetch = true
			}
			// Retry a failed first fetch, or re-list periodically once ready.
			if firstFetchErr != nil || needNewFetch {
				firstFetchErr = w.fetchAllServices()
			}
		case <-w.stop:
			return
		case <-w.listServiceChan:
			go w.ListenService()
		}
	}
}

// Stop tears the watcher down: closes every per-service exit channel, removes
// all cached service entries, signals Run/ListenService to exit, and closes
// the zookeeper connection.
func (w *watcher) Stop() {
	w.mutex.Lock()
	for key, value := range w.WatchingServices {
		if value.exit != nil {
			close(value.exit)
		}
		delete(w.WatchingServices, key)
	}
	w.isStop = true
	w.mutex.Unlock()

	w.seMux.Lock()
	for key := range w.serviceEntry {
		w.cache.DeleteServiceEntryWrapper(key)
	}
	w.updateHandler()
	w.seMux.Unlock()

	// One send wakes a single select; the close that follows wakes the rest.
	w.stop <- struct{}{}
	w.Done <- struct{}{}
	close(w.stop)
	close(w.Done)
	w.zkClient.Close()
	w.readyHandler(false)
}

// IsHealthy reports whether the last probe marked the registry healthy.
func (w *watcher) IsHealthy() bool {
	return w.Status == provider.Healthy
}

// GetRegistryType returns the registry type as a string.
func (w *watcher) GetRegistryType() string {
	return w.RegistryType.String()
}

// AppendServiceUpdateHandler registers the callback fired after every cache
// mutation.
func (w *watcher) AppendServiceUpdateHandler(f func()) {
	w.updateHandler = f
}

// ReadyHandler registers the callback fired when readiness changes.
func (w *watcher) ReadyHandler(f func(bool)) {
	w.readyHandler = f
}

// IsReady reports whether the initial listing of all services has completed
// (serviceRemaind counts down in ListenService; nil means no first fetch ran).
func (w *watcher) IsReady() bool {
	if w.serviceRemaind == nil {
		return true
	}
	return w.serviceRemaind.Load() <= 0
}

// timeSecondDuration converts a number of seconds to a time.Duration.
func timeSecondDuration(sec int) time.Duration {
	return time.Duration(sec) * time.Second
}
diff --git a/registry/zookeeper/watcher_test.go b/registry/zookeeper/watcher_test.go
new file mode 100644
index 000000000..d8183facd
--- /dev/null
+++ b/registry/zookeeper/watcher_test.go
@@ -0,0 +1,116 @@
+package zookeeper + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetSpringCloudConfig(t *testing.T) { + var w watcher + w.seMux = &sync.Mutex{} + cases := []struct { + name string + interfaceName string + content []byte + expectedHost string + expectedConfig InterfaceConfig + }{ + { + name: "normal", + interfaceName: "service-provider.services", + content: []byte(`{"name":"service-provider","id":"e479f40a-8f91-42a1-98e6-9377d224b360","address":"10.0.0.0","port":8071,"sslPort":null,"payload":{"@class":"org.springframework.cloud.zookeeper.discovery.ZookeeperInstance","id":"application-1","name":"service-provider","metadata":{"version":"1"}},"registrationTimeUTC":1663145171645,"serviceType":"DYNAMIC","uriSpec":{"parts":[{"value":"scheme","variable":true},{"value":"://","variable":false},{"value":"address","variable":true},{"value":":","variable":false},{"value":"port","variable":true}]}}`), + expectedHost: "service-provider.services", + expectedConfig: InterfaceConfig{ + Host: "service-provider.services", + Protocol: "HTTP", + ServiceType: SpringCloudService, + Endpoints: []Endpoint{ + { + Ip: "10.0.0.0", + Port: "8071", + Metadata: map[string]string{ + "version": "1", + }, + }, + }, + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + actualHost, actualConfig, err := w.GetSpringCloudConfig(c.interfaceName, c.content) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, c.expectedHost, actualHost) + assert.Equal(t, c.expectedConfig, *actualConfig) + }) + } +} + +func TestGetDubboConfig(t *testing.T) { + var w watcher + w.seMux = &sync.Mutex{} + cases := []struct { + name string + url string + expectedHost string + expectedConfig InterfaceConfig + }{ + { + name: "no version", + url: `/dubbo/org.apache.dubbo.samples.api.GreetingService/providers/dubbo%3A%2F%2F10.0.0.0%3A20880%2Fcom.alibaba.adrive.business.contract.service.UserVipService%3Fzone%3Dcn-shanghai-g%26dubbo%3D2.0.2`, + expectedHost: 
"providers:org.apache.dubbo.samples.api.GreetingService:0.0.0:", + expectedConfig: InterfaceConfig{ + Host: "providers:org.apache.dubbo.samples.api.GreetingService:0.0.0:", + Protocol: "dubbo", + ServiceType: DubboService, + Endpoints: []Endpoint{ + { + Ip: "10.0.0.0", + Port: "20880", + Metadata: map[string]string{ + "zone": "cn-shanghai-g", + "dubbo": "2.0.2", + "protocol": "dubbo", + }, + }, + }, + }, + }, + { + name: "has version", + url: `/dubbo/org.apache.dubbo.samples.api.GreetingService/providers/dubbo%3A%2F%2F10.0.0.0%3A20880%2Fcom.alibaba.adrive.business.contract.service.UserVipService%3Fzone%3Dcn-shanghai-g%26dubbo%3D2.0.2%26version%3D1.0.0`, + expectedHost: "providers:org.apache.dubbo.samples.api.GreetingService:1.0.0:", + expectedConfig: InterfaceConfig{ + Host: "providers:org.apache.dubbo.samples.api.GreetingService:1.0.0:", + Protocol: "dubbo", + ServiceType: DubboService, + Endpoints: []Endpoint{ + { + Ip: "10.0.0.0", + Port: "20880", + Metadata: map[string]string{ + "zone": "cn-shanghai-g", + "dubbo": "2.0.2", + "protocol": "dubbo", + "version": "1.0.0", + }, + }, + }, + }, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + actualHost, actualConfig, err := w.GetDubboConfig(c.url) + if err != nil { + t.Fatal(err) + } + assert.Equal(t, c.expectedHost, actualHost) + assert.Equal(t, c.expectedConfig, *actualConfig) + }) + } +}