add tracing in higress-config configmap (#409)

This commit is contained in:
Jun
2023-07-04 19:59:11 +08:00
committed by GitHub
parent 3fd37abab7
commit b65446fa25
5 changed files with 679 additions and 2 deletions

View File

@@ -49,6 +49,7 @@ import (
netlisterv1 "github.com/alibaba/higress/client/pkg/listers/networking/v1"
"github.com/alibaba/higress/pkg/ingress/kube/annotations"
"github.com/alibaba/higress/pkg/ingress/kube/common"
"github.com/alibaba/higress/pkg/ingress/kube/configmap"
"github.com/alibaba/higress/pkg/ingress/kube/http2rpc"
"github.com/alibaba/higress/pkg/ingress/kube/ingress"
"github.com/alibaba/higress/pkg/ingress/kube/ingressv1"
@@ -126,6 +127,8 @@ type IngressConfig struct {
http2rpcs map[string]*higressv1.Http2Rpc
configmapMgr *configmap.ConfigmapMgr
XDSUpdater model.XDSUpdater
annotationHandler annotations.AnnotationHandler
@@ -169,6 +172,10 @@ func NewIngressConfig(localKubeClient kube.Client, XDSUpdater model.XDSUpdater,
http2rpcController.AddEventHandler(config.AddOrUpdateHttp2Rpc, config.DeleteHttp2Rpc)
config.http2rpcController = http2rpcController
config.http2rpcLister = http2rpcController.Lister()
higressConfigController := configmap.NewController(localKubeClient, clusterId, namespace)
config.configmapMgr = configmap.NewConfigmapMgr(XDSUpdater, namespace, higressConfigController, higressConfigController.Lister())
return config
}
@@ -241,8 +248,24 @@ func (m *IngressConfig) List(typ config.GroupVersionKind, namespace string) ([]c
if typ == gvk.EnvoyFilter {
m.mutex.RLock()
defer m.mutex.RUnlock()
IngressLog.Infof("resource type %s, configs number %d", typ, len(m.cachedEnvoyFilters))
return m.cachedEnvoyFilters, nil
var envoyFilters []config.Config
// Build configmap envoy filters
configmapEnvoyFilters, err := m.configmapMgr.ConstructEnvoyFilters()
if err != nil {
IngressLog.Errorf("Construct configmap EnvoyFilters error %v", err)
} else {
for _, envoyFilter := range configmapEnvoyFilters {
envoyFilters = append(envoyFilters, *envoyFilter)
}
IngressLog.Infof("Append %d configmap EnvoyFilters", len(configmapEnvoyFilters))
}
if len(envoyFilters) == 0 {
IngressLog.Infof("resource type %s, configs number %d", typ, len(m.cachedEnvoyFilters))
return m.cachedEnvoyFilters, nil
}
envoyFilters = append(envoyFilters, m.cachedEnvoyFilters...)
IngressLog.Infof("resource type %s, configs number %d", typ, len(envoyFilters))
return envoyFilters, nil
}
var configs []config.Config
@@ -1368,6 +1391,7 @@ func (m *IngressConfig) Run(stop <-chan struct{}) {
go m.mcpbridgeController.Run(stop)
go m.wasmPluginController.Run(stop)
go m.http2rpcController.Run(stop)
go m.configmapMgr.HigressConfigController.Run(stop)
}
func (m *IngressConfig) HasSynced() bool {
@@ -1387,6 +1411,9 @@ func (m *IngressConfig) HasSynced() bool {
if !m.http2rpcController.HasSynced() {
return false
}
if !m.configmapMgr.HigressConfigController.HasSynced() {
return false
}
IngressLog.Info("Ingress config controller synced.")
return true
}

View File

@@ -0,0 +1,50 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package configmap
import (
"encoding/json"
)
type Result int32
const (
ResultNothing Result = iota
ResultReplace
ResultDelete
HigressConfigMapName = "higress-config"
HigressConfigMapKey = "higress"
ModelUpdatedReason = "higress configmap updated"
)
type ItemEventHandler = func(name string)
type HigressConfig struct {
Tracing *Tracing `json:"tracing,omitempty"`
}
func NewDefaultHigressConfig() *HigressConfig {
higressConfig := &HigressConfig{
Tracing: NewDefaultTracing(),
}
return higressConfig
}
func GetHigressConfigString(higressConfig *HigressConfig) string {
bytes, _ := json.Marshal(higressConfig)
return string(bytes)
}

View File

@@ -0,0 +1,202 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package configmap
import (
"reflect"
"sync/atomic"
"github.com/alibaba/higress/pkg/ingress/kube/controller"
"github.com/alibaba/higress/pkg/ingress/kube/util"
. "github.com/alibaba/higress/pkg/ingress/log"
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/gvk"
kubeclient "istio.io/istio/pkg/kube"
"istio.io/istio/pkg/kube/controllers"
"k8s.io/apimachinery/pkg/types"
listersv1 "k8s.io/client-go/listers/core/v1"
"sigs.k8s.io/yaml"
)
type HigressConfigController controller.Controller[listersv1.ConfigMapNamespaceLister]
func NewController(client kubeclient.Client, clusterId string, namespace string) HigressConfigController {
informer := client.KubeInformer().Core().V1().ConfigMaps().Informer()
return controller.NewCommonController("higressConfig", client.KubeInformer().Core().V1().ConfigMaps().Lister().ConfigMaps(namespace),
informer, GetConfigmap, clusterId)
}
func GetConfigmap(lister listersv1.ConfigMapNamespaceLister, namespacedName types.NamespacedName) (controllers.Object, error) {
return lister.Get(namespacedName.Name)
}
type ItemController interface {
GetName() string
AddOrUpdateHigressConfig(name util.ClusterNamespacedName, old *HigressConfig, new *HigressConfig) error
ValidHigressConfig(higressConfig *HigressConfig) error
ConstructEnvoyFilters() ([]*config.Config, error)
RegisterItemEventHandler(eventHandler ItemEventHandler)
}
type ConfigmapMgr struct {
Namespace string
HigressConfigController HigressConfigController
HigressConfigLister listersv1.ConfigMapNamespaceLister
higressConfig atomic.Value
ItemControllers []ItemController
XDSUpdater model.XDSUpdater
}
func NewConfigmapMgr(XDSUpdater model.XDSUpdater, namespace string, higressConfigController HigressConfigController, higressConfigLister listersv1.ConfigMapNamespaceLister) *ConfigmapMgr {
configmapMgr := &ConfigmapMgr{
XDSUpdater: XDSUpdater,
Namespace: namespace,
HigressConfigController: higressConfigController,
HigressConfigLister: higressConfigLister,
higressConfig: atomic.Value{},
}
configmapMgr.HigressConfigController.AddEventHandler(configmapMgr.AddOrUpdateHigressConfig)
configmapMgr.SetHigressConfig(NewDefaultHigressConfig())
tracingController := NewTracingController(namespace)
configmapMgr.AddItemControllers(tracingController)
configmapMgr.initEventHandlers()
return configmapMgr
}
func (c *ConfigmapMgr) SetHigressConfig(higressConfig *HigressConfig) {
c.higressConfig.Store(higressConfig)
}
func (c *ConfigmapMgr) GetHigressConfig() *HigressConfig {
value := c.higressConfig.Load()
if value != nil {
if higressConfig, ok := value.(*HigressConfig); ok {
return higressConfig
}
}
return nil
}
func (c *ConfigmapMgr) AddItemControllers(controllers ...ItemController) {
c.ItemControllers = append(c.ItemControllers, controllers...)
}
func (c *ConfigmapMgr) AddOrUpdateHigressConfig(name util.ClusterNamespacedName) {
if name.Namespace != c.Namespace || name.Name != HigressConfigMapName {
return
}
IngressLog.Infof("configmapMgr AddOrUpdateHigressConfig")
higressConfigmap, err := c.HigressConfigLister.Get(HigressConfigMapName)
if err != nil {
IngressLog.Errorf("higress-config configmap is not found, namespace:%s, name:%s",
name.Namespace, name.Name)
return
}
if _, ok := higressConfigmap.Data[HigressConfigMapKey]; !ok {
return
}
newHigressConfig := NewDefaultHigressConfig()
if err = yaml.Unmarshal([]byte(higressConfigmap.Data[HigressConfigMapKey]), newHigressConfig); err != nil {
IngressLog.Errorf("data:%s, convert to higress config error, error: %+v", higressConfigmap.Data[HigressConfigMapKey], err)
return
}
for _, itemController := range c.ItemControllers {
if itemErr := itemController.ValidHigressConfig(newHigressConfig); itemErr != nil {
IngressLog.Errorf("configmap %s controller valid higress config error, error: %+v", itemController.GetName(), itemErr)
return
}
}
oldHigressConfig := c.GetHigressConfig()
IngressLog.Infof("configmapMgr oldHigressConfig: %s", GetHigressConfigString(oldHigressConfig))
IngressLog.Infof("configmapMgr newHigressConfig: %s", GetHigressConfigString(newHigressConfig))
result, _ := c.CompareHigressConfig(oldHigressConfig, newHigressConfig)
IngressLog.Infof("configmapMgr CompareHigressConfig reuslt is %d", result)
if result == ResultNothing {
return
}
if result == ResultDelete {
newHigressConfig = NewDefaultHigressConfig()
}
if result == ResultReplace || result == ResultDelete {
// Pass AddOrUpdateHigressConfig to itemControllers
for _, itemController := range c.ItemControllers {
IngressLog.Infof("configmap %s controller AddOrUpdateHigressConfig", itemController.GetName())
if itemErr := itemController.AddOrUpdateHigressConfig(name, oldHigressConfig, newHigressConfig); itemErr != nil {
IngressLog.Errorf("configmap %s controller AddOrUpdateHigressConfig error, error: %+v", itemController.GetName(), itemErr)
}
}
c.SetHigressConfig(newHigressConfig)
IngressLog.Infof("configmapMgr higress config AddOrUpdate success, reuslt is %d", result)
// Call updateConfig
}
}
func (c *ConfigmapMgr) ConstructEnvoyFilters() ([]*config.Config, error) {
configs := make([]*config.Config, 0)
for _, itemController := range c.ItemControllers {
IngressLog.Infof("controller %s ConstructEnvoyFilters", itemController.GetName())
if itemConfigs, err := itemController.ConstructEnvoyFilters(); err != nil {
IngressLog.Errorf("controller %s ConstructEnvoyFilters error, error: %+v", itemController.GetName(), err)
} else {
configs = append(configs, itemConfigs...)
}
}
return configs, nil
}
func (c *ConfigmapMgr) CompareHigressConfig(old *HigressConfig, new *HigressConfig) (Result, error) {
if old == nil || new == nil {
return ResultNothing, nil
}
if !reflect.DeepEqual(old, new) {
return ResultReplace, nil
}
return ResultNothing, nil
}
func (c *ConfigmapMgr) initEventHandlers() error {
itemEventHandler := func(name string) {
c.XDSUpdater.ConfigUpdate(&model.PushRequest{
Full: true,
ConfigsUpdated: map[model.ConfigKey]struct{}{{
Kind: gvk.EnvoyFilter,
Name: name,
Namespace: c.Namespace,
}: {}},
Reason: []model.TriggerReason{ModelUpdatedReason},
})
}
for _, itemController := range c.ItemControllers {
itemController.RegisterItemEventHandler(itemEventHandler)
}
return nil
}

View File

@@ -0,0 +1,392 @@
// Copyright (c) 2022 Alibaba Group Holding Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package configmap
import (
"errors"
"fmt"
"reflect"
"sync/atomic"
"github.com/alibaba/higress/pkg/ingress/kube/util"
. "github.com/alibaba/higress/pkg/ingress/log"
networking "istio.io/api/networking/v1alpha3"
"istio.io/istio/pkg/config"
"istio.io/istio/pkg/config/schema/gvk"
)
const (
higressTracingEnvoyFilterName = "higress-config-tracing"
defaultTimeout = 500
defaultSampling = 100.0
)
type Tracing struct {
// Flag to control trace
Enable bool `json:"enable,omitempty"`
// The percentage of requests (0.0 - 100.0) that will be randomly selected for trace generation,
// if not requested by the client or not forced. Default is 100.0.
Sampling float64 `json:"sampling,omitempty"`
// The timeout for the gRPC request. Default is 500ms
Timeout int32 `json:"timeout,omitempty"`
// The tracer implementation to be used by Envoy.
//
// Types that are assignable to Tracer:
Zipkin *Zipkin `json:"zipkin,omitempty"`
Skywalking *Skywalking `json:"skywalking,omitempty"`
OpenTelemetry *OpenTelemetry `json:"opentelemetry,omitempty"`
}
// Zipkin defines configuration for a Zipkin tracer.
type Zipkin struct {
// Address of the Zipkin service (e.g. _zipkin:9411_).
Service string `json:"service,omitempty"`
Port string `json:"port,omitempty"`
}
// Defines configuration for a Skywalking tracer.
type Skywalking struct {
// Address of the Skywalking tracer.
Service string `json:"service,omitempty"`
Port string `json:"port,omitempty"`
// The access token
AccessToken string `json:"access_token,omitempty"`
}
type OpenTelemetry struct {
// Address of OpenTelemetry tracer.
Service string `json:"service,omitempty"`
Port string `json:"port,omitempty"`
}
func validServiceAndPort(service string, port string) bool {
if len(service) == 0 || len(port) == 0 {
return false
}
return true
}
func ValidTracing(t *Tracing) error {
if t == nil {
return nil
}
if t.Timeout <= 0 {
return errors.New("timeout can not be less than zero")
}
if t.Sampling < 0 || t.Sampling > 100 {
return errors.New("sampling must be in (0.0 - 100.0)")
}
tracerNum := 0
if t.Zipkin != nil {
if validServiceAndPort(t.Zipkin.Service, t.Zipkin.Port) {
tracerNum++
} else {
return errors.New("zipkin service and port can not be empty")
}
}
if t.Skywalking != nil {
if validServiceAndPort(t.Skywalking.Service, t.Skywalking.Port) {
tracerNum++
} else {
return errors.New("skywalking service and port can not be empty")
}
}
if t.OpenTelemetry != nil {
if validServiceAndPort(t.OpenTelemetry.Service, t.OpenTelemetry.Port) {
tracerNum++
} else {
return errors.New("opentelemetry service and port can not be empty")
}
}
if tracerNum != 1 {
return errors.New("only one of skywalkingzipkin and opentelemetry configuration can be set")
}
return nil
}
func CompareTracing(old *Tracing, new *Tracing) (Result, error) {
if old == nil && new == nil {
return ResultNothing, nil
}
if new == nil {
return ResultDelete, nil
}
if !reflect.DeepEqual(old, new) {
return ResultReplace, nil
}
return ResultNothing, nil
}
func NewDefaultTracing() *Tracing {
tracing := &Tracing{
Enable: false,
Timeout: defaultTimeout,
Sampling: defaultSampling,
}
return tracing
}
type TracingController struct {
Namespace string
tracing atomic.Value
Name string
eventHandler ItemEventHandler
}
func NewTracingController(namespace string) *TracingController {
tracingMgr := &TracingController{
Namespace: namespace,
tracing: atomic.Value{},
Name: "tracing",
}
tracingMgr.SetTracing(NewDefaultTracing())
return tracingMgr
}
func (t *TracingController) SetTracing(higressConfig *Tracing) {
t.tracing.Store(higressConfig)
}
func (t *TracingController) GetTracing() *Tracing {
value := t.tracing.Load()
if value != nil {
if tracing, ok := value.(*Tracing); ok {
return tracing
}
}
return nil
}
func (t *TracingController) GetName() string {
return t.Name
}
func (t *TracingController) AddOrUpdateHigressConfig(name util.ClusterNamespacedName, old *HigressConfig, new *HigressConfig) error {
if err := ValidTracing(new.Tracing); err != nil {
IngressLog.Errorf("data:%+v convert to tracing , error: %+v", new.Tracing, err)
return nil
}
result, _ := CompareTracing(old.Tracing, new.Tracing)
switch result {
case ResultReplace:
t.SetTracing(new.Tracing)
IngressLog.Infof("AddOrUpdate Higress config tracing")
t.eventHandler(higressTracingEnvoyFilterName)
IngressLog.Infof("send event with filter name:%s", higressTracingEnvoyFilterName)
case ResultDelete:
t.SetTracing(NewDefaultTracing())
IngressLog.Infof("Delete Higress config tracing")
t.eventHandler(higressTracingEnvoyFilterName)
IngressLog.Infof("send event with filter name:%s", higressTracingEnvoyFilterName)
}
return nil
}
func (t *TracingController) ValidHigressConfig(higressConfig *HigressConfig) error {
if higressConfig == nil {
return nil
}
if higressConfig.Tracing == nil {
return nil
}
return ValidTracing(higressConfig.Tracing)
}
func (t *TracingController) RegisterItemEventHandler(eventHandler ItemEventHandler) {
t.eventHandler = eventHandler
}
func (t *TracingController) ConstructEnvoyFilters() ([]*config.Config, error) {
configs := make([]*config.Config, 0)
tracing := t.GetTracing()
namespace := t.Namespace
if tracing == nil {
return configs, nil
}
if tracing.Enable == false {
return configs, nil
}
tracingConfig := t.constructTracingTracer(tracing, namespace)
if len(tracingConfig) == 0 {
return configs, nil
}
config := &config.Config{
Meta: config.Meta{
GroupVersionKind: gvk.EnvoyFilter,
Name: higressTracingEnvoyFilterName,
Namespace: namespace,
},
Spec: &networking.EnvoyFilter{
ConfigPatches: []*networking.EnvoyFilter_EnvoyConfigObjectPatch{
{
ApplyTo: networking.EnvoyFilter_NETWORK_FILTER,
Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{
Context: networking.EnvoyFilter_GATEWAY,
ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Listener{
Listener: &networking.EnvoyFilter_ListenerMatch{
FilterChain: &networking.EnvoyFilter_ListenerMatch_FilterChainMatch{
Filter: &networking.EnvoyFilter_ListenerMatch_FilterMatch{
Name: "envoy.filters.network.http_connection_manager",
},
},
},
},
},
Patch: &networking.EnvoyFilter_Patch{
Operation: networking.EnvoyFilter_Patch_MERGE,
Value: util.BuildPatchStruct(tracingConfig),
},
},
{
ApplyTo: networking.EnvoyFilter_HTTP_FILTER,
Match: &networking.EnvoyFilter_EnvoyConfigObjectMatch{
Context: networking.EnvoyFilter_GATEWAY,
ObjectTypes: &networking.EnvoyFilter_EnvoyConfigObjectMatch_Listener{
Listener: &networking.EnvoyFilter_ListenerMatch{
FilterChain: &networking.EnvoyFilter_ListenerMatch_FilterChainMatch{
Filter: &networking.EnvoyFilter_ListenerMatch_FilterMatch{
Name: "envoy.filters.network.http_connection_manager",
SubFilter: &networking.EnvoyFilter_ListenerMatch_SubFilterMatch{
Name: "envoy.filters.http.router",
},
},
},
},
},
},
Patch: &networking.EnvoyFilter_Patch{
Operation: networking.EnvoyFilter_Patch_MERGE,
Value: util.BuildPatchStruct(`{
"name":"envoy.filters.http.router",
"typed_config":{
"@type": "type.googleapis.com/envoy.extensions.filters.http.router.v3.Router",
"start_child_span": true
}
}`),
},
},
},
},
}
configs = append(configs, config)
return configs, nil
}
func (t *TracingController) constructTracingTracer(tracing *Tracing, namespace string) string {
tracingConfig := ""
timeout := float32(tracing.Timeout) / 1000
if tracing.Skywalking != nil {
skywalking := tracing.Skywalking
tracingConfig = fmt.Sprintf(`{
"name": "envoy.filters.network.http_connection_manager",
"typed_config": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager",
"tracing": {
"provider": {
"name": "envoy.tracers.skywalking",
"typed_config": {
"@type": "type.googleapis.com/envoy.config.trace.v3.SkyWalkingConfig",
"client_config": {
"service_name": "higress-gateway.%s",
"backend_token": "%s"
},
"grpc_service": {
"envoy_grpc": {
"cluster_name": "outbound|%s||%s"
},
"timeout": "%.3fs"
}
}
},
"random_sampling": {
"value": %.1f
}
}
}
}`, namespace, skywalking.AccessToken, skywalking.Port, skywalking.Service, timeout, tracing.Sampling)
}
if tracing.Zipkin != nil {
zipkin := tracing.Zipkin
tracingConfig = fmt.Sprintf(`{
"name": "envoy.filters.network.http_connection_manager",
"typed_config": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager",
"tracing": {
"provider": {
"name": "envoy.tracers.zipkin",
"typed_config": {
"@type": "type.googleapis.com/envoy.config.trace.v3.ZipkinConfig",
"collector_cluster": "outbound|%s||%s",
"collector_endpoint": "/api/v2/spans",
"collector_hostname": "higress-gateway",
"collector_endpoint_version": "HTTP_JSON",
"split_spans_for_request": true
}
},
"random_sampling": {
"value": %.1f
}
}
}
}`, zipkin.Port, zipkin.Service, tracing.Sampling)
}
if tracing.OpenTelemetry != nil {
opentelemetry := tracing.OpenTelemetry
tracingConfig = fmt.Sprintf(`{
"name": "envoy.filters.network.http_connection_manager",
"typed_config": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager",
"tracing": {
"provider": {
"name": "envoy.tracers.opentelemetry",
"typed_config": {
"@type": "type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig",
"service_name": "higress-gateway.%s"
"grpc_service": {
"envoy_grpc": {
"cluster_name": "outbound|%s||%s"
},
"timeout": "%.3fs"
}
}
},
"random_sampling": {
"value": %.1f
}
}
}
}`, namespace, opentelemetry.Port, opentelemetry.Service, timeout, tracing.Sampling)
}
return tracingConfig
}

View File

@@ -82,3 +82,9 @@ func MessageToGoGoStruct(msg proto.Message) (*types.Struct, error) {
func CreateServiceFQDN(namespace, name string) string {
return fmt.Sprintf("%s.%s.svc.%s", name, namespace, DefaultDomainSuffix)
}
func BuildPatchStruct(config string) *types.Struct {
val := &types.Struct{}
_ = jsonpb.Unmarshal(strings.NewReader(config), val)
return val
}