refactor: workflow condition node

refactor: workflow condition node
This commit is contained in:
Fu Diwei
2025-05-28 23:30:38 +08:00
parent 3a829ad53b
commit 6731c465e7
59 changed files with 1140 additions and 988 deletions

View File

@@ -3,6 +3,7 @@ package nodeprocessor
import (
"context"
"fmt"
"strconv"
"time"
"golang.org/x/exp/maps"
@@ -108,15 +109,15 @@ func (n *applyNode) Process(ctx context.Context) error {
}
}
// 添加中间结果
n.outputs[outputCertificateValidatedKey] = "true"
n.outputs[outputCertificateDaysLeftKey] = fmt.Sprintf("%d", int(time.Until(certificate.ExpireAt).Hours()/24))
// 记录中间结果
n.outputs[outputKeyForCertificateValidity] = strconv.FormatBool(true)
n.outputs[outputKeyForCertificateDaysLeft] = strconv.FormatInt(int64(time.Until(certificate.ExpireAt).Hours()/24), 10)
n.logger.Info("application completed")
return nil
}
func (n *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (skip bool, reason string) {
func (n *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (_skip bool, _reason string) {
if lastOutput != nil && lastOutput.Succeeded {
// 比较和上次申请时的关键配置(即影响证书签发的)参数是否一致
currentNodeConfig := n.node.GetConfigForApply()
@@ -154,9 +155,12 @@ func (n *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.Workflo
renewalInterval := time.Duration(currentNodeConfig.SkipBeforeExpiryDays) * time.Hour * 24
expirationTime := time.Until(lastCertificate.ExpireAt)
if expirationTime > renewalInterval {
n.outputs[outputCertificateValidatedKey] = "true"
n.outputs[outputCertificateDaysLeftKey] = fmt.Sprintf("%d", int(expirationTime.Hours()/24))
return true, fmt.Sprintf("the certificate has already been issued (expires in %d day(s), next renewal in %d day(s))", int(expirationTime.Hours()/24), currentNodeConfig.SkipBeforeExpiryDays)
daysLeft := int(expirationTime.Hours() / 24)
// TODO: 优化此处逻辑,[checkCanSkip] 方法不应该修改中间结果,违背单一职责
n.outputs[outputKeyForCertificateValidity] = strconv.FormatBool(true)
n.outputs[outputKeyForCertificateDaysLeft] = strconv.FormatInt(int64(daysLeft), 10)
return true, fmt.Sprintf("the certificate has already been issued (expires in %d day(s), next renewal in %d day(s))", daysLeft, currentNodeConfig.SkipBeforeExpiryDays)
}
}
}

View File

@@ -3,8 +3,10 @@ package nodeprocessor
import (
"context"
"errors"
"fmt"
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/domain/expr"
)
type conditionNode struct {
@@ -22,30 +24,29 @@ func NewConditionNode(node *domain.WorkflowNode) *conditionNode {
}
func (n *conditionNode) Process(ctx context.Context) error {
n.logger.Info("enter condition node: " + n.node.Name)
nodeConfig := n.node.GetConfigForCondition()
if nodeConfig.Expression == nil {
n.logger.Info("no condition found, continue to next node")
nodeCfg := n.node.GetConfigForCondition()
if nodeCfg.Expression == nil {
n.logger.Info("without any conditions, enter this branch")
return nil
}
rs, err := n.eval(ctx, nodeConfig.Expression)
rs, err := n.evalExpr(ctx, nodeCfg.Expression)
if err != nil {
n.logger.Warn("failed to eval expression: " + err.Error())
n.logger.Warn(fmt.Sprintf("failed to eval condition expression: %w", err))
return err
}
if rs.Value == false {
n.logger.Info("condition not met, skip this branch")
return errors.New("condition not met")
return errors.New("condition not met") // TODO: 错误处理
} else {
n.logger.Info("condition met, enter this branch")
}
n.logger.Info("condition met, continue to next node")
return nil
}
func (n *conditionNode) eval(ctx context.Context, expression domain.Expr) (*domain.EvalResult, error) {
func (n *conditionNode) evalExpr(ctx context.Context, expression expr.Expr) (*expr.EvalResult, error) {
variables := GetNodeOutputs(ctx)
return expression.Eval(variables)
}

View File

@@ -1,6 +1,6 @@
package nodeprocessor
const (
outputCertificateValidatedKey = "certificate.validated"
outputCertificateDaysLeftKey = "certificate.daysLeft"
outputKeyForCertificateValidity = "certificate.validity"
outputKeyForCertificateDaysLeft = "certificate.daysLeft"
)

View File

@@ -35,7 +35,8 @@ func AddNodeOutput(ctx context.Context, nodeId string, output map[string]any) co
container.Lock()
defer container.Unlock()
// 创建输出的深拷贝以避免后续修改
// 创建输出的深拷贝
// TODO: 暂时使用浅拷贝,等后续值类型扩充后修改
outputCopy := make(map[string]any, len(output))
for k, v := range output {
outputCopy[k] = v
@@ -90,6 +91,7 @@ func GetNodeOutputs(ctx context.Context) map[string]map[string]any {
defer container.RUnlock()
// 创建所有输出的深拷贝
// TODO: 暂时使用浅拷贝,等后续值类型扩充后修改
allOutputs := make(map[string]map[string]any, len(container.outputs))
for nodeId, output := range container.outputs {
nodeCopy := make(map[string]any, len(output))

View File

@@ -42,8 +42,9 @@ func (n *deployNode) Process(ctx context.Context) error {
}
// 获取前序节点输出证书
const DELIMITER = "#"
previousNodeOutputCertificateSource := n.node.GetConfigForDeploy().Certificate
previousNodeOutputCertificateSourceSlice := strings.Split(previousNodeOutputCertificateSource, "#")
previousNodeOutputCertificateSourceSlice := strings.Split(previousNodeOutputCertificateSource, DELIMITER)
if len(previousNodeOutputCertificateSourceSlice) != 2 {
n.logger.Warn("invalid certificate source", slog.String("certificate.source", previousNodeOutputCertificateSource))
return fmt.Errorf("invalid certificate source: %s", previousNodeOutputCertificateSource)
@@ -99,7 +100,7 @@ func (n *deployNode) Process(ctx context.Context) error {
return nil
}
func (n *deployNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (skip bool, reason string) {
func (n *deployNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (_skip bool, _reason string) {
if lastOutput != nil && lastOutput.Succeeded {
// 比较和上次部署时的关键配置(即影响证书部署的)参数是否一致
currentNodeConfig := n.node.GetConfigForDeploy()

View File

@@ -6,13 +6,13 @@ import (
"crypto/x509"
"fmt"
"math"
"net"
"net/http"
"strconv"
"strings"
"time"
"github.com/usual2970/certimate/internal/domain"
httputil "github.com/usual2970/certimate/internal/pkg/utils/http"
)
type monitorNode struct {
@@ -32,23 +32,23 @@ func NewMonitorNode(node *domain.WorkflowNode) *monitorNode {
func (n *monitorNode) Process(ctx context.Context) error {
n.logger.Info("ready to monitor certificate ...")
nodeConfig := n.node.GetConfigForMonitor()
nodeCfg := n.node.GetConfigForMonitor()
targetAddr := fmt.Sprintf("%s:%d", nodeConfig.Host, nodeConfig.Port)
if nodeConfig.Port == 0 {
targetAddr = fmt.Sprintf("%s:443", nodeConfig.Host)
targetAddr := fmt.Sprintf("%s:%d", nodeCfg.Host, nodeCfg.Port)
if nodeCfg.Port == 0 {
targetAddr = fmt.Sprintf("%s:443", nodeCfg.Host)
}
targetDomain := nodeConfig.Domain
targetDomain := nodeCfg.Domain
if targetDomain == "" {
targetDomain = nodeConfig.Host
targetDomain = nodeCfg.Host
}
n.logger.Info(fmt.Sprintf("retrieving certificate at %s (domain: %s)", targetAddr, targetDomain))
const MAX_ATTEMPTS = 3
const RETRY_INTERVAL = 2 * time.Second
var cert *x509.Certificate
var certs []*x509.Certificate
var err error
for attempt := 0; attempt < MAX_ATTEMPTS; attempt++ {
if attempt > 0 {
@@ -61,7 +61,7 @@ func (n *monitorNode) Process(ctx context.Context) error {
}
}
cert, err = n.tryRetrieveCert(ctx, targetAddr, targetDomain, nodeConfig.RequestPath)
certs, err = n.tryRetrievePeerCertificates(ctx, targetAddr, targetDomain, nodeCfg.RequestPath)
if err == nil {
break
}
@@ -71,15 +71,13 @@ func (n *monitorNode) Process(ctx context.Context) error {
n.logger.Warn("failed to monitor certificate")
return err
} else {
if cert == nil {
if len(certs) == 0 {
n.logger.Warn("no ssl certificates retrieved in http response")
outputs := map[string]any{
outputCertificateValidatedKey: strconv.FormatBool(false),
outputCertificateDaysLeftKey: strconv.FormatInt(0, 10),
}
n.setOutputs(outputs)
n.outputs[outputKeyForCertificateValidity] = strconv.FormatBool(false)
n.outputs[outputKeyForCertificateDaysLeft] = strconv.FormatInt(0, 10)
} else {
cert := certs[0] // 只取证书链中的第一个证书,即服务器证书
n.logger.Info(fmt.Sprintf("ssl certificate retrieved (serial='%s', subject='%s', issuer='%s', not_before='%s', not_after='%s', sans='%s')",
cert.SerialNumber, cert.Subject.String(), cert.Issuer.String(),
cert.NotBefore.Format(time.RFC3339), cert.NotAfter.Format(time.RFC3339),
@@ -95,11 +93,8 @@ func (n *monitorNode) Process(ctx context.Context) error {
validated := isCertPeriodValid && isCertHostMatched
daysLeft := int(math.Floor(cert.NotAfter.Sub(now).Hours() / 24))
outputs := map[string]any{
outputCertificateValidatedKey: strconv.FormatBool(validated),
outputCertificateDaysLeftKey: strconv.FormatInt(int64(daysLeft), 10),
}
n.setOutputs(outputs)
n.outputs[outputKeyForCertificateValidity] = strconv.FormatBool(validated)
n.outputs[outputKeyForCertificateDaysLeft] = strconv.FormatInt(int64(daysLeft), 10)
if validated {
n.logger.Info(fmt.Sprintf("the certificate is valid, and will expire in %d day(s)", daysLeft))
@@ -113,52 +108,40 @@ func (n *monitorNode) Process(ctx context.Context) error {
return nil
}
func (n *monitorNode) tryRetrieveCert(ctx context.Context, addr, domain, requestPath string) (_cert *x509.Certificate, _err error) {
transport := &http.Transport{
DialContext: (&net.Dialer{
Timeout: 10 * time.Second,
}).DialContext,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
ForceAttemptHTTP2: false,
DisableKeepAlives: true,
Proxy: http.ProxyFromEnvironment,
func (n *monitorNode) tryRetrievePeerCertificates(ctx context.Context, addr, domain, requestPath string) ([]*x509.Certificate, error) {
transport := httputil.NewDefaultTransport()
if transport.TLSClientConfig == nil {
transport.TLSClientConfig = &tls.Config{}
}
transport.TLSClientConfig.InsecureSkipVerify = true
client := &http.Client{
Transport: transport,
Timeout: 15 * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
Timeout: 30 * time.Second,
Transport: transport,
}
url := fmt.Sprintf("https://%s/%s", addr, strings.TrimLeft(requestPath, "/"))
req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
if err != nil {
_err = fmt.Errorf("failed to create http request: %w", err)
n.logger.Warn(fmt.Sprintf("failed to create http request: %w", err))
return nil, _err
err = fmt.Errorf("failed to create http request: %w", err)
n.logger.Warn(err.Error())
return nil, err
}
req.Header.Set("User-Agent", "certimate")
resp, err := client.Do(req)
if err != nil {
_err = fmt.Errorf("failed to send http request: %w", err)
n.logger.Warn(fmt.Sprintf("failed to send http request: %w", err))
return nil, _err
err = fmt.Errorf("failed to send http request: %w", err)
n.logger.Warn(err.Error())
return nil, err
}
defer resp.Body.Close()
if resp.TLS == nil || len(resp.TLS.PeerCertificates) == 0 {
return nil, _err
return make([]*x509.Certificate, 0), nil
}
_cert = resp.TLS.PeerCertificates[0]
return _cert, nil
}
func (n *monitorNode) setOutputs(outputs map[string]any) {
n.outputs = outputs
return resp.TLS.PeerCertificates, nil
}

View File

@@ -30,9 +30,9 @@ func NewNotifyNode(node *domain.WorkflowNode) *notifyNode {
func (n *notifyNode) Process(ctx context.Context) error {
n.logger.Info("ready to send notification ...")
nodeConfig := n.node.GetConfigForNotify()
nodeCfg := n.node.GetConfigForNotify()
if nodeConfig.Provider == "" {
if nodeCfg.Provider == "" {
// Deprecated: v0.4.x 将废弃
// 兼容旧版本的通知渠道
n.logger.Warn("WARNING! you are using the notification channel from global settings, which will be deprecated in the future")
@@ -44,14 +44,14 @@ func (n *notifyNode) Process(ctx context.Context) error {
}
// 获取通知渠道
channelConfig, err := settings.GetNotifyChannelConfig(nodeConfig.Channel)
channelConfig, err := settings.GetNotifyChannelConfig(nodeCfg.Channel)
if err != nil {
return err
}
// 发送通知
if err := notify.SendToChannel(nodeConfig.Subject, nodeConfig.Message, nodeConfig.Channel, channelConfig); err != nil {
n.logger.Warn("failed to send notification", slog.String("channel", nodeConfig.Channel))
if err := notify.SendToChannel(nodeCfg.Subject, nodeCfg.Message, nodeCfg.Channel, channelConfig); err != nil {
n.logger.Warn("failed to send notification", slog.String("channel", nodeCfg.Channel))
return err
}
@@ -63,8 +63,8 @@ func (n *notifyNode) Process(ctx context.Context) error {
deployer, err := notify.NewWithWorkflowNode(notify.NotifierWithWorkflowNodeConfig{
Node: n.node,
Logger: n.logger,
Subject: nodeConfig.Subject,
Message: nodeConfig.Message,
Subject: nodeCfg.Subject,
Message: nodeCfg.Message,
})
if err != nil {
n.logger.Warn("failed to create notifier provider")

View File

@@ -3,6 +3,7 @@ package nodeprocessor
import (
"context"
"fmt"
"strconv"
"strings"
"time"
@@ -33,7 +34,7 @@ func NewUploadNode(node *domain.WorkflowNode) *uploadNode {
func (n *uploadNode) Process(ctx context.Context) error {
n.logger.Info("ready to upload certiticate ...")
nodeConfig := n.node.GetConfigForUpload()
nodeCfg := n.node.GetConfigForUpload()
// 查询上次执行结果
lastOutput, err := n.outputRepo.GetByNodeId(ctx, n.node.Id)
@@ -53,7 +54,7 @@ func (n *uploadNode) Process(ctx context.Context) error {
certificate := &domain.Certificate{
Source: domain.CertificateSourceTypeUpload,
}
certificate.PopulateFromPEM(nodeConfig.Certificate, nodeConfig.PrivateKey)
certificate.PopulateFromPEM(nodeCfg.Certificate, nodeCfg.PrivateKey)
// 保存执行结果
output := &domain.WorkflowOutput{
@@ -69,15 +70,15 @@ func (n *uploadNode) Process(ctx context.Context) error {
return err
}
n.outputs[outputCertificateValidatedKey] = "true"
n.outputs[outputCertificateDaysLeftKey] = fmt.Sprintf("%d", int(time.Until(certificate.ExpireAt).Hours()/24))
// 记录中间结果
n.outputs[outputKeyForCertificateValidity] = strconv.FormatBool(true)
n.outputs[outputKeyForCertificateDaysLeft] = strconv.FormatInt(int64(time.Until(certificate.ExpireAt).Hours()/24), 10)
n.logger.Info("uploading completed")
return nil
}
func (n *uploadNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (skip bool, reason string) {
func (n *uploadNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (_skip bool, _reason string) {
if lastOutput != nil && lastOutput.Succeeded {
// 比较和上次上传时的关键配置(即影响证书上传的)参数是否一致
currentNodeConfig := n.node.GetConfigForUpload()
@@ -91,8 +92,10 @@ func (n *uploadNode) checkCanSkip(ctx context.Context, lastOutput *domain.Workfl
lastCertificate, _ := n.certRepo.GetByWorkflowRunId(ctx, lastOutput.RunId)
if lastCertificate != nil {
n.outputs[outputCertificateValidatedKey] = "true"
n.outputs[outputCertificateDaysLeftKey] = fmt.Sprintf("%d", int(time.Until(lastCertificate.ExpireAt).Hours()/24))
daysLeft := int(time.Until(lastCertificate.ExpireAt).Hours() / 24)
n.outputs[outputKeyForCertificateValidity] = strconv.FormatBool(daysLeft > 0)
n.outputs[outputKeyForCertificateDaysLeft] = strconv.FormatInt(int64(daysLeft), 10)
return true, "the certificate has already been uploaded"
}
}