feat: allow skip notify nodes when all previous nodes were skipped

This commit is contained in:
Fu Diwei
2025-06-09 20:39:23 +08:00
parent 84a3f3346a
commit 24fe824757
13 changed files with 126 additions and 74 deletions

View File

@@ -47,6 +47,7 @@ func (n *applyNode) Process(ctx context.Context) error {
// 检测是否可以跳过本次执行
if skippable, reason := n.checkCanSkip(ctx, lastOutput); skippable {
n.outputs[outputKeyForNodeSkipped] = strconv.FormatBool(true)
n.logger.Info(fmt.Sprintf("skip this application, because %s", reason))
return nil
} else if reason != "" {
@@ -112,6 +113,7 @@ func (n *applyNode) Process(ctx context.Context) error {
}
// 记录中间结果
n.outputs[outputKeyForNodeSkipped] = strconv.FormatBool(false)
n.outputs[outputKeyForCertificateValidity] = strconv.FormatBool(true)
n.outputs[outputKeyForCertificateDaysLeft] = strconv.FormatInt(int64(time.Until(certificate.ExpireAt).Hours()/24), 10)
@@ -122,39 +124,40 @@ func (n *applyNode) Process(ctx context.Context) error {
func (n *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (_skip bool, _reason string) {
if lastOutput != nil && lastOutput.Succeeded {
// 比较和上次申请时的关键配置(即影响证书签发的)参数是否一致
currentNodeConfig := n.node.GetConfigForApply()
lastNodeConfig := lastOutput.Node.GetConfigForApply()
if currentNodeConfig.Domains != lastNodeConfig.Domains {
thisNodeCfg := n.node.GetConfigForApply()
lastNodeCfg := lastOutput.Node.GetConfigForApply()
if thisNodeCfg.Domains != lastNodeCfg.Domains {
return false, "the configuration item 'Domains' changed"
}
if currentNodeConfig.ContactEmail != lastNodeConfig.ContactEmail {
if thisNodeCfg.ContactEmail != lastNodeCfg.ContactEmail {
return false, "the configuration item 'ContactEmail' changed"
}
if currentNodeConfig.Provider != lastNodeConfig.Provider {
if thisNodeCfg.Provider != lastNodeCfg.Provider {
return false, "the configuration item 'Provider' changed"
}
if currentNodeConfig.ProviderAccessId != lastNodeConfig.ProviderAccessId {
if thisNodeCfg.ProviderAccessId != lastNodeCfg.ProviderAccessId {
return false, "the configuration item 'ProviderAccessId' changed"
}
if !maps.Equal(currentNodeConfig.ProviderConfig, lastNodeConfig.ProviderConfig) {
if !maps.Equal(thisNodeCfg.ProviderConfig, lastNodeCfg.ProviderConfig) {
return false, "the configuration item 'ProviderConfig' changed"
}
if currentNodeConfig.CAProvider != lastNodeConfig.CAProvider {
if thisNodeCfg.CAProvider != lastNodeCfg.CAProvider {
return false, "the configuration item 'CAProvider' changed"
}
if currentNodeConfig.CAProviderAccessId != lastNodeConfig.CAProviderAccessId {
if thisNodeCfg.CAProviderAccessId != lastNodeCfg.CAProviderAccessId {
return false, "the configuration item 'CAProviderAccessId' changed"
}
if !maps.Equal(currentNodeConfig.CAProviderConfig, lastNodeConfig.CAProviderConfig) {
if !maps.Equal(thisNodeCfg.CAProviderConfig, lastNodeCfg.CAProviderConfig) {
return false, "the configuration item 'CAProviderConfig' changed"
}
if currentNodeConfig.KeyAlgorithm != lastNodeConfig.KeyAlgorithm {
if thisNodeCfg.KeyAlgorithm != lastNodeCfg.KeyAlgorithm {
return false, "the configuration item 'KeyAlgorithm' changed"
}
lastCertificate, _ := n.certRepo.GetByWorkflowRunId(ctx, lastOutput.RunId)
if lastCertificate != nil {
renewalInterval := time.Duration(currentNodeConfig.SkipBeforeExpiryDays) * time.Hour * 24
renewalInterval := time.Duration(thisNodeCfg.SkipBeforeExpiryDays) * time.Hour * 24
expirationTime := time.Until(lastCertificate.ExpireAt)
if expirationTime > renewalInterval {
daysLeft := int(expirationTime.Hours() / 24)
@@ -162,7 +165,7 @@ func (n *applyNode) checkCanSkip(ctx context.Context, lastOutput *domain.Workflo
n.outputs[outputKeyForCertificateValidity] = strconv.FormatBool(true)
n.outputs[outputKeyForCertificateDaysLeft] = strconv.FormatInt(int64(daysLeft), 10)
return true, fmt.Sprintf("the certificate has already been issued (expires in %d day(s), next renewal in %d day(s))", daysLeft, currentNodeConfig.SkipBeforeExpiryDays)
return true, fmt.Sprintf("the certificate has already been issued (expires in %d day(s), next renewal in %d day(s))", daysLeft, thisNodeCfg.SkipBeforeExpiryDays)
}
}
}

View File

@@ -47,6 +47,6 @@ func (n *conditionNode) Process(ctx context.Context) error {
}
func (n *conditionNode) evalExpr(ctx context.Context, expression expr.Expr) (*expr.EvalResult, error) {
variables := GetNodeOutputs(ctx)
variables := GetAllNodeOutputs(ctx)
return expression.Eval(variables)
}

View File

@@ -3,4 +3,5 @@ package nodeprocessor
const (
outputKeyForCertificateValidity = "certificate.validity"
outputKeyForCertificateDaysLeft = "certificate.daysLeft"
outputKeyForNodeSkipped = "node.skipped"
)

View File

@@ -25,6 +25,15 @@ func newNodeOutputsContainer() *nodeOutputsContainer {
}
}
// 获取节点输出容器
func getNodeOutputsContainer(ctx context.Context) *nodeOutputsContainer {
value := ctx.Value(nodeOutputsKey)
if value == nil {
return nil
}
return value.(*nodeOutputsContainer)
}
// 添加节点输出到上下文
func AddNodeOutput(ctx context.Context, nodeId string, output map[string]any) context.Context {
container := getNodeOutputsContainer(ctx)
@@ -50,7 +59,7 @@ func AddNodeOutput(ctx context.Context, nodeId string, output map[string]any) co
func GetNodeOutput(ctx context.Context, nodeId string) map[string]any {
container := getNodeOutputsContainer(ctx)
if container == nil {
return nil
container = newNodeOutputsContainer()
}
container.RLock()
@@ -69,22 +78,11 @@ func GetNodeOutput(ctx context.Context, nodeId string) map[string]any {
return outputCopy
}
// 获取特定节点的特定输出项
func GetNodeOutputValue(ctx context.Context, nodeId string, key string) (any, bool) {
output := GetNodeOutput(ctx, nodeId)
if output == nil {
return nil, false
}
value, exists := output[key]
return value, exists
}
// 获取所有节点输出
func GetNodeOutputs(ctx context.Context) map[string]map[string]any {
func GetAllNodeOutputs(ctx context.Context) map[string]map[string]any {
container := getNodeOutputsContainer(ctx)
if container == nil {
return nil
container = newNodeOutputsContainer()
}
container.RLock()
@@ -103,26 +101,3 @@ func GetNodeOutputs(ctx context.Context) map[string]map[string]any {
return allOutputs
}
// 获取节点输出容器
func getNodeOutputsContainer(ctx context.Context) *nodeOutputsContainer {
value := ctx.Value(nodeOutputsKey)
if value == nil {
return nil
}
return value.(*nodeOutputsContainer)
}
// 检查节点是否有输出
func HasNodeOutput(ctx context.Context, nodeId string) bool {
container := getNodeOutputsContainer(ctx)
if container == nil {
return false
}
container.RLock()
defer container.RUnlock()
_, exists := container.outputs[nodeId]
return exists
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"log/slog"
"strconv"
"strings"
"github.com/usual2970/certimate/internal/deployer"
@@ -59,6 +60,7 @@ func (n *deployNode) Process(ctx context.Context) error {
// 检测是否可以跳过本次执行
if lastOutput != nil && certificate.CreatedAt.Before(lastOutput.UpdatedAt) {
if skippable, reason := n.checkCanSkip(ctx, lastOutput); skippable {
n.outputs[outputKeyForNodeSkipped] = strconv.FormatBool(true)
n.logger.Info(fmt.Sprintf("skip this deployment, because %s", reason))
return nil
} else if reason != "" {
@@ -97,6 +99,9 @@ func (n *deployNode) Process(ctx context.Context) error {
return err
}
// 记录中间结果
n.outputs[outputKeyForNodeSkipped] = strconv.FormatBool(false)
n.logger.Info("deployment completed")
return nil
}
@@ -104,16 +109,17 @@ func (n *deployNode) Process(ctx context.Context) error {
func (n *deployNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (_skip bool, _reason string) {
if lastOutput != nil && lastOutput.Succeeded {
// 比较和上次部署时的关键配置(即影响证书部署的)参数是否一致
currentNodeConfig := n.node.GetConfigForDeploy()
lastNodeConfig := lastOutput.Node.GetConfigForDeploy()
if currentNodeConfig.ProviderAccessId != lastNodeConfig.ProviderAccessId {
thisNodeCfg := n.node.GetConfigForDeploy()
lastNodeCfg := lastOutput.Node.GetConfigForDeploy()
if thisNodeCfg.ProviderAccessId != lastNodeCfg.ProviderAccessId {
return false, "the configuration item 'ProviderAccessId' changed"
}
if !maps.Equal(currentNodeConfig.ProviderConfig, lastNodeConfig.ProviderConfig) {
if !maps.Equal(thisNodeCfg.ProviderConfig, lastNodeCfg.ProviderConfig) {
return false, "the configuration item 'ProviderConfig' changed"
}
if currentNodeConfig.SkipOnLastSucceeded {
if thisNodeCfg.SkipOnLastSucceeded {
return true, "the certificate has already been deployed"
}
}

View File

@@ -2,7 +2,9 @@ package nodeprocessor
import (
"context"
"fmt"
"log/slog"
"strconv"
"github.com/usual2970/certimate/internal/domain"
"github.com/usual2970/certimate/internal/notify"
@@ -58,6 +60,12 @@ func (n *notifyNode) Process(ctx context.Context) error {
return nil
}
// 检测是否可以跳过本次执行
if skippable := n.checkCanSkip(ctx); skippable {
n.logger.Info(fmt.Sprintf("skip this notification, because all the previous nodes have been skipped"))
return nil
}
// 初始化通知器
deployer, err := notify.NewWithWorkflowNode(notify.NotifierWithWorkflowNodeConfig{
Node: n.node,
@@ -79,3 +87,21 @@ func (n *notifyNode) Process(ctx context.Context) error {
n.logger.Info("notification completed")
return nil
}
func (n *notifyNode) checkCanSkip(ctx context.Context) (_skip bool) {
thisNodeCfg := n.node.GetConfigForNotify()
if !thisNodeCfg.SkipOnAllPrevSkipped {
return false
}
prevNodeOutputs := GetAllNodeOutputs(ctx)
for _, nodeOutput := range prevNodeOutputs {
if nodeOutput[outputKeyForNodeSkipped] != nil {
if nodeOutput[outputKeyForNodeSkipped].(string) != strconv.FormatBool(true) {
return false
}
}
}
return true
}

View File

@@ -44,6 +44,7 @@ func (n *uploadNode) Process(ctx context.Context) error {
// 检测是否可以跳过本次执行
if skippable, reason := n.checkCanSkip(ctx, lastOutput); skippable {
n.outputs[outputKeyForNodeSkipped] = strconv.FormatBool(true)
n.logger.Info(fmt.Sprintf("skip this uploading, because %s", reason))
return nil
} else if reason != "" {
@@ -71,6 +72,7 @@ func (n *uploadNode) Process(ctx context.Context) error {
}
// 记录中间结果
n.outputs[outputKeyForNodeSkipped] = strconv.FormatBool(false)
n.outputs[outputKeyForCertificateValidity] = strconv.FormatBool(true)
n.outputs[outputKeyForCertificateDaysLeft] = strconv.FormatInt(int64(time.Until(certificate.ExpireAt).Hours()/24), 10)
@@ -81,12 +83,13 @@ func (n *uploadNode) Process(ctx context.Context) error {
func (n *uploadNode) checkCanSkip(ctx context.Context, lastOutput *domain.WorkflowOutput) (_skip bool, _reason string) {
if lastOutput != nil && lastOutput.Succeeded {
// 比较和上次上传时的关键配置(即影响证书上传的)参数是否一致
currentNodeConfig := n.node.GetConfigForUpload()
lastNodeConfig := lastOutput.Node.GetConfigForUpload()
if strings.TrimSpace(currentNodeConfig.Certificate) != strings.TrimSpace(lastNodeConfig.Certificate) {
thisNodeCfg := n.node.GetConfigForUpload()
lastNodeCfg := lastOutput.Node.GetConfigForUpload()
if strings.TrimSpace(thisNodeCfg.Certificate) != strings.TrimSpace(lastNodeCfg.Certificate) {
return false, "the configuration item 'Certificate' changed"
}
if strings.TrimSpace(currentNodeConfig.PrivateKey) != strings.TrimSpace(lastNodeConfig.PrivateKey) {
if strings.TrimSpace(thisNodeCfg.PrivateKey) != strings.TrimSpace(lastNodeCfg.PrivateKey) {
return false, "the configuration item 'PrivateKey' changed"
}