mirror of
https://github.com/alibaba/higress.git
synced 2026-06-03 09:37:28 +08:00
Support HTTP/JSON to dubbo (#340)
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
package config
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -25,6 +26,7 @@ import (
|
||||
wasm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/wasm/v3"
|
||||
httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
|
||||
v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/wasm/v3"
|
||||
"github.com/gogo/protobuf/jsonpb"
|
||||
"github.com/gogo/protobuf/types"
|
||||
"github.com/golang/protobuf/ptypes/wrappers"
|
||||
"google.golang.org/protobuf/types/known/anypb"
|
||||
@@ -42,10 +44,12 @@ import (
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
higressext "github.com/alibaba/higress/api/extensions/v1alpha1"
|
||||
higressv1 "github.com/alibaba/higress/api/networking/v1"
|
||||
extlisterv1 "github.com/alibaba/higress/client/pkg/listers/extensions/v1alpha1"
|
||||
netlisterv1 "github.com/alibaba/higress/client/pkg/listers/networking/v1"
|
||||
"github.com/alibaba/higress/pkg/ingress/kube/annotations"
|
||||
"github.com/alibaba/higress/pkg/ingress/kube/common"
|
||||
"github.com/alibaba/higress/pkg/ingress/kube/http2rpc"
|
||||
"github.com/alibaba/higress/pkg/ingress/kube/ingress"
|
||||
"github.com/alibaba/higress/pkg/ingress/kube/ingressv1"
|
||||
"github.com/alibaba/higress/pkg/ingress/kube/mcpbridge"
|
||||
@@ -54,12 +58,30 @@ import (
|
||||
"github.com/alibaba/higress/pkg/ingress/kube/wasmplugin"
|
||||
. "github.com/alibaba/higress/pkg/ingress/log"
|
||||
"github.com/alibaba/higress/pkg/kube"
|
||||
"github.com/alibaba/higress/registry/memory"
|
||||
"github.com/alibaba/higress/registry/reconcile"
|
||||
)
|
||||
|
||||
var (
|
||||
_ model.ConfigStoreCache = &IngressConfig{}
|
||||
_ model.IngressStore = &IngressConfig{}
|
||||
_ model.ConfigStoreCache = &IngressConfig{}
|
||||
_ model.IngressStore = &IngressConfig{}
|
||||
Http2RpcMethodMap = func() map[string]string {
|
||||
return map[string]string{
|
||||
"GET": "ALL_GET",
|
||||
"POST": "ALL_POST",
|
||||
"PUT": "ALL_PUT",
|
||||
"DELETE": "ALL_DELETE",
|
||||
"PATCH": "ALL_PATCH",
|
||||
}
|
||||
}
|
||||
Http2RpcParamSourceMap = func() map[string]string {
|
||||
return map[string]string{
|
||||
"QUERY": "ALL_QUERY_PARAMETER",
|
||||
"HEADER": "ALL_HEADER",
|
||||
"PATH": "ALL_PATH",
|
||||
"BODY": "ALL_BODY",
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
type IngressConfig struct {
|
||||
@@ -98,6 +120,12 @@ type IngressConfig struct {
|
||||
|
||||
wasmPlugins map[string]*extensions.WasmPlugin
|
||||
|
||||
http2rpcController http2rpc.Http2RpcController
|
||||
|
||||
http2rpcLister netlisterv1.Http2RpcLister
|
||||
|
||||
http2rpcs map[string]*higressv1.Http2Rpc
|
||||
|
||||
XDSUpdater model.XDSUpdater
|
||||
|
||||
annotationHandler annotations.AnnotationHandler
|
||||
@@ -125,6 +153,7 @@ func NewIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater,
|
||||
namespace: namespace,
|
||||
mcpbridgeReconciled: true,
|
||||
wasmPlugins: make(map[string]*extensions.WasmPlugin),
|
||||
http2rpcs: make(map[string]*higressv1.Http2Rpc),
|
||||
}
|
||||
mcpbridgeController := mcpbridge.NewController(localKubeClient, clusterId)
|
||||
mcpbridgeController.AddEventHandler(config.AddOrUpdateMcpBridge, config.DeleteMcpBridge)
|
||||
@@ -135,6 +164,11 @@ func NewIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater,
|
||||
wasmPluginController.AddEventHandler(config.AddOrUpdateWasmPlugin, config.DeleteWasmPlugin)
|
||||
config.wasmPluginController = wasmPluginController
|
||||
config.wasmPluginLister = wasmPluginController.Lister()
|
||||
|
||||
http2rpcController := http2rpc.NewController(localKubeClient, clusterId)
|
||||
http2rpcController.AddEventHandler(config.AddOrUpdateHttp2Rpc, config.DeleteHttp2Rpc)
|
||||
config.http2rpcController = http2rpcController
|
||||
config.http2rpcLister = http2rpcController.Lister()
|
||||
return config
|
||||
}
|
||||
|
||||
@@ -458,6 +492,18 @@ func (m *IngressConfig) convertEnvoyFilter(convertOptions *common.ConvertOptions
|
||||
continue
|
||||
}
|
||||
|
||||
http2rpc := route.WrapperConfig.AnnotationsConfig.Http2Rpc
|
||||
if http2rpc != nil {
|
||||
IngressLog.Infof("Found http2rpc for name %s", http2rpc.Name)
|
||||
envoyFilter, err := m.constructHttp2RpcEnvoyFilter(http2rpc, route, m.namespace)
|
||||
if err != nil {
|
||||
IngressLog.Errorf("Construct http2rpc EnvoyFilter error %v", err)
|
||||
} else {
|
||||
IngressLog.Infof("Append http2rpc EnvoyFilter for name %s", http2rpc.Name)
|
||||
envoyFilters = append(envoyFilters, *envoyFilter)
|
||||
}
|
||||
}
|
||||
|
||||
auth := route.WrapperConfig.AnnotationsConfig.Auth
|
||||
if auth == nil {
|
||||
continue
|
||||
@@ -521,6 +567,7 @@ func (m *IngressConfig) convertServiceEntry([]common.WrapperConfig) []config.Con
|
||||
return nil
|
||||
}
|
||||
serviceEntries := m.RegistryReconciler.GetAllServiceEntryWrapper()
|
||||
IngressLog.Infof("Found http2rpc serviceEntries %s", serviceEntries)
|
||||
out := make([]config.Config, 0, len(serviceEntries))
|
||||
for _, se := range serviceEntries {
|
||||
out = append(out, config.Config{
|
||||
@@ -912,6 +959,38 @@ func (m *IngressConfig) DeleteMcpBridge(clusterNamespacedName util.ClusterNamesp
|
||||
}
|
||||
}
|
||||
|
||||
func (m *IngressConfig) AddOrUpdateHttp2Rpc(clusterNamespacedName util.ClusterNamespacedName) {
|
||||
if clusterNamespacedName.Namespace != m.namespace {
|
||||
return
|
||||
}
|
||||
http2rpc, err := m.http2rpcLister.Http2Rpcs(clusterNamespacedName.Namespace).Get(clusterNamespacedName.Name)
|
||||
if err != nil {
|
||||
IngressLog.Errorf("http2rpc is not found, namespace:%s, name:%s",
|
||||
clusterNamespacedName.Namespace, clusterNamespacedName.Name)
|
||||
return
|
||||
}
|
||||
m.mutex.Lock()
|
||||
m.http2rpcs[clusterNamespacedName.Name] = &http2rpc.Spec
|
||||
m.mutex.Unlock()
|
||||
IngressLog.Infof("AddOrUpdateHttp2Rpc http2rpc ingress name %s", clusterNamespacedName.Name)
|
||||
}
|
||||
|
||||
func (m *IngressConfig) DeleteHttp2Rpc(clusterNamespacedName util.ClusterNamespacedName) {
|
||||
if clusterNamespacedName.Namespace != m.namespace {
|
||||
return
|
||||
}
|
||||
var hit bool
|
||||
m.mutex.Lock()
|
||||
if _, ok := m.http2rpcs[clusterNamespacedName.Name]; ok {
|
||||
delete(m.http2rpcs, clusterNamespacedName.Name)
|
||||
hit = true
|
||||
}
|
||||
m.mutex.Unlock()
|
||||
if hit {
|
||||
IngressLog.Debugf("Http2Rpc triggerd deleted %s", clusterNamespacedName.Name)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *IngressConfig) ReflectSecretChanges(clusterNamespacedName util.ClusterNamespacedName) {
|
||||
var hit bool
|
||||
m.mutex.RLock()
|
||||
@@ -996,6 +1075,179 @@ func (m *IngressConfig) applyCanaryIngresses(convertOptions *common.ConvertOptio
|
||||
}
|
||||
}
|
||||
|
||||
func (m *IngressConfig) constructHttp2RpcEnvoyFilter(http2rpcConfig *annotations.Http2RpcConfig, route *common.WrapperHTTPRoute, namespace string) (*config.Config, error) {
|
||||
mappings := m.http2rpcs
|
||||
IngressLog.Infof("Found http2rpc mappings %v", mappings)
|
||||
if _, exist := mappings[http2rpcConfig.Name]; !exist {
|
||||
IngressLog.Errorf("Http2RpcConfig name %s, not found Http2Rpc CRD", http2rpcConfig.Name)
|
||||
return nil, errors.New("invalid http2rpcConfig has no useable http2rpc")
|
||||
}
|
||||
http2rpcCRD := mappings[http2rpcConfig.Name]
|
||||
|
||||
if http2rpcCRD.GetDubbo() == nil {
|
||||
IngressLog.Errorf("Http2RpcConfig name %s, only support Http2Rpc CRD Dubbo Service type", http2rpcConfig.Name)
|
||||
return nil, errors.New("invalid http2rpcConfig has no useable http2rpc")
|
||||
}
|
||||
|
||||
httpRoute := route.HTTPRoute
|
||||
httpRouteDestination := httpRoute.Route[0]
|
||||
typeStruct, err := m.constructHttp2RpcMethods(http2rpcCRD.GetDubbo())
|
||||
if err != nil {
|
||||
return nil, errors.New(err.Error())
|
||||
}
|
||||
|
||||
return &config.Config{
|
||||
Meta: config.Meta{
|
||||
GroupVersionKind: gvk.EnvoyFilter,
|
||||
Name: common.CreateConvertedName(constants.IstioIngressGatewayName, http2rpcConfig.Name),
|
||||
Namespace: namespace,
|
||||
},
|
||||
Spec: &networking.EnvoyFilter{
|
||||
ConfigPatches: []*networking.EnvoyFilter_EnvoyConfigObjectPatch{
|
||||
{
|
||||
ApplyTo: networking.EnvoyFilter_HTTP_FILTER,
|
||||
Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{
|
||||
Context: networking.EnvoyFilter_GATEWAY,
|
||||
ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Listener{
|
||||
Listener: &networking.EnvoyFilter_ListenerMatch{
|
||||
FilterChain: &networking.EnvoyFilter_ListenerMatch_FilterChainMatch{
|
||||
Filter: &networking.EnvoyFilter_ListenerMatch_FilterMatch{
|
||||
Name: "envoy.filters.network.http_connection_manager",
|
||||
SubFilter: &networking.EnvoyFilter_ListenerMatch_SubFilterMatch{
|
||||
Name: "envoy.filters.http.router",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Patch: &networking.EnvoyFilter_Patch{
|
||||
Operation: networking.EnvoyFilter_Patch_INSERT_BEFORE,
|
||||
Value: buildPatchStruct(`{
|
||||
"name":"envoy.filters.http.http_dubbo_transcoder",
|
||||
"typed_config":{
|
||||
"@type":"type.googleapis.com/udpa.type.v1.TypedStruct",
|
||||
"type_url":"type.googleapis.com/envoy.extensions.filters.http.http_dubbo_transcoder.v3.HttpDubboTranscoder"
|
||||
}
|
||||
}`),
|
||||
},
|
||||
},
|
||||
{
|
||||
ApplyTo: networking.EnvoyFilter_HTTP_ROUTE,
|
||||
Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{
|
||||
Context: networking.EnvoyFilter_GATEWAY,
|
||||
ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_RouteConfiguration{
|
||||
RouteConfiguration: &networking.EnvoyFilter_RouteConfigurationMatch{
|
||||
Vhost: &networking.EnvoyFilter_RouteConfigurationMatch_VirtualHostMatch{
|
||||
Route: &networking.EnvoyFilter_RouteConfigurationMatch_RouteMatch{
|
||||
Name: httpRoute.Name,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Patch: &networking.EnvoyFilter_Patch{
|
||||
Operation: networking.EnvoyFilter_Patch_MERGE,
|
||||
Value: typeStruct,
|
||||
},
|
||||
},
|
||||
{
|
||||
ApplyTo: networking.EnvoyFilter_CLUSTER,
|
||||
Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{
|
||||
Context: networking.EnvoyFilter_GATEWAY,
|
||||
ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Cluster{
|
||||
Cluster: &networking.EnvoyFilter_ClusterMatch{
|
||||
Service: httpRouteDestination.Destination.Host,
|
||||
},
|
||||
},
|
||||
},
|
||||
Patch: &networking.EnvoyFilter_Patch{
|
||||
Operation: networking.EnvoyFilter_Patch_MERGE,
|
||||
Value: buildPatchStruct(`{
|
||||
"upstream_config": {
|
||||
"name":"envoy.upstreams.http.dubbo_tcp",
|
||||
"typed_config":{
|
||||
"@type":"type.googleapis.com/udpa.type.v1.TypedStruct",
|
||||
"type_url":"type.googleapis.com/envoy.extensions.upstreams.http.dubbo_tcp.v3.DubboTcpConnectionPoolProto"
|
||||
}
|
||||
}
|
||||
}`),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *IngressConfig) constructHttp2RpcMethods(dubbo *higressv1.DubboService) (*types.Struct, error) {
|
||||
httpRouterTemplate := `{
|
||||
"route": {
|
||||
"upgrade_configs": [
|
||||
{
|
||||
"connect_config": {
|
||||
"allow_post": true
|
||||
},
|
||||
"upgrade_type": "CONNECT"
|
||||
}
|
||||
]
|
||||
},
|
||||
"typed_per_filter_config": {
|
||||
"envoy.filters.http.http_dubbo_transcoder": {
|
||||
"@type": "type.googleapis.com/udpa.type.v1.TypedStruct",
|
||||
"type_url": "type.googleapis.com/envoy.extensions.filters.http.http_dubbo_transcoder.v3.HttpDubboTranscoder",
|
||||
"value": {
|
||||
"request_validation_options": {
|
||||
"reject_unknown_method": true,
|
||||
"reject_unknown_query_parameters": true
|
||||
},
|
||||
"services_mapping": {
|
||||
"name": "%s",
|
||||
"version": "%s",
|
||||
"method_mapping": %s
|
||||
},
|
||||
"url_unescape_spec": "ALL_CHARACTERS_EXCEPT_RESERVED"
|
||||
}
|
||||
}
|
||||
}
|
||||
}`
|
||||
var methods []interface{}
|
||||
for _, serviceMethod := range dubbo.GetMethods() {
|
||||
var method = make(map[string]interface{})
|
||||
method["name"] = serviceMethod.GetServiceMethod()
|
||||
var passthrough_setting = make(map[string]interface{})
|
||||
passthrough_setting["passthrough_all_headers"] = true
|
||||
method["passthrough_setting"] = passthrough_setting
|
||||
var params []interface{}
|
||||
for _, methodParam := range serviceMethod.GetParams() {
|
||||
var param = make(map[string]interface{})
|
||||
param["extract_key"] = methodParam.GetParamKey()
|
||||
param["extract_key_spec"] = Http2RpcParamSourceMap()[methodParam.GetParamSource()]
|
||||
param["mapping_type"] = methodParam.GetParamType()
|
||||
params = append(params, param)
|
||||
}
|
||||
method["parameter_mapping"] = params
|
||||
var path_matcher = make(map[string]interface{})
|
||||
path_matcher["match_http_method_spec"] = Http2RpcMethodMap()[serviceMethod.HttpMethods[0]]
|
||||
path_matcher["match_pattern"] = serviceMethod.GetHttpPath()
|
||||
method["path_matcher"] = path_matcher
|
||||
methods = append(methods, method)
|
||||
}
|
||||
name := dubbo.GetService()
|
||||
version := dubbo.GetVersion()
|
||||
strBuffer := new(bytes.Buffer)
|
||||
methodsJsonStr, _ := json.Marshal(methods)
|
||||
fmt.Fprintf(strBuffer, httpRouterTemplate, name, version, string(methodsJsonStr))
|
||||
IngressLog.Infof("Found http2rpc buildHttp2RpcMethods %s", strBuffer.String())
|
||||
result := buildPatchStruct(strBuffer.String())
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func buildPatchStruct(config string) *types.Struct {
|
||||
val := &types.Struct{}
|
||||
_ = jsonpb.Unmarshal(strings.NewReader(config), val)
|
||||
return val
|
||||
}
|
||||
|
||||
func constructBasicAuthEnvoyFilter(rules *common.BasicAuthRules, namespace string) (*config.Config, error) {
|
||||
rulesStr, err := json.Marshal(rules)
|
||||
if err != nil {
|
||||
@@ -1079,9 +1331,34 @@ func constructBasicAuthEnvoyFilter(rules *common.BasicAuthRules, namespace strin
|
||||
}, nil
|
||||
}
|
||||
|
||||
func QueryByName(serviceEntries []*memory.ServiceEntryWrapper, serviceName string) (*memory.ServiceEntryWrapper, error) {
|
||||
IngressLog.Infof("Found http2rpc serviceEntries %s", serviceEntries)
|
||||
for _, se := range serviceEntries {
|
||||
if se.ServiceName == serviceName {
|
||||
return se, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("can't find ServiceEntry by serviceName:%v", serviceName)
|
||||
}
|
||||
|
||||
func QueryRpcServiceVersion(serviceEntry *memory.ServiceEntryWrapper, serviceName string) (string, error) {
|
||||
IngressLog.Infof("Found http2rpc serviceEntry %s", serviceEntry)
|
||||
IngressLog.Infof("Found http2rpc ServiceEntry %s", serviceEntry.ServiceEntry)
|
||||
IngressLog.Infof("Found http2rpc WorkloadSelector %s", serviceEntry.ServiceEntry.WorkloadSelector)
|
||||
IngressLog.Infof("Found http2rpc Labels %s", serviceEntry.ServiceEntry.WorkloadSelector.Labels)
|
||||
labels := (*serviceEntry).ServiceEntry.WorkloadSelector.Labels
|
||||
for key, value := range labels {
|
||||
if key == "version" {
|
||||
return value, nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("can't get RpcServiceVersion for serviceName:%v", serviceName)
|
||||
}
|
||||
|
||||
func (m *IngressConfig) Run(stop <-chan struct{}) {
|
||||
go m.mcpbridgeController.Run(stop)
|
||||
go m.wasmPluginController.Run(stop)
|
||||
go m.http2rpcController.Run(stop)
|
||||
}
|
||||
|
||||
func (m *IngressConfig) HasSynced() bool {
|
||||
@@ -1098,6 +1375,9 @@ func (m *IngressConfig) HasSynced() bool {
|
||||
if !m.wasmPluginController.HasSynced() {
|
||||
return false
|
||||
}
|
||||
if !m.http2rpcController.HasSynced() {
|
||||
return false
|
||||
}
|
||||
IngressLog.Info("Ingress config controller synced.")
|
||||
return true
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user