Files
jiacrontab/jiacrontabd/dependencies.go
jiazhizhong 1279635d7f fix
2022-03-10 17:09:03 +08:00

123 lines
2.5 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package jiacrontabd
import (
"bytes"
"context"
"jiacrontab/pkg/proto"
"time"
"github.com/iwannay/log"
)
type depEntry struct {
jobID uint // 定时任务id
jobUniqueID string // job的唯一标志
processID int // 当前依赖的父级任务可能存在多个并发的task)
id string // depID uuid
once bool
workDir string
user string
env []string
from string
commands []string
dest string
done bool
timeout int64
err error
ctx context.Context
name string
logPath string
logContent []byte
}
func newDependencies(jd *Jiacrontabd) *dependencies {
return &dependencies{
jd: jd,
dep: make(chan *depEntry, 100),
}
}
type dependencies struct {
jd *Jiacrontabd
dep chan *depEntry
}
func (d *dependencies) add(t *depEntry) {
select {
case d.dep <- t:
default:
log.Warnf("discard %v", t)
}
}
func (d *dependencies) run() {
go func() {
for {
select {
case t := <-d.dep:
go d.exec(t)
}
}
}()
}
// TODO: 主任务退出杀死依赖
func (d *dependencies) exec(task *depEntry) {
var (
reply bool
err error
)
if task.timeout == 0 {
// 默认超时10分钟
task.timeout = 600
}
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(task.timeout)*time.Second)
defer cancel()
myCmdUnit := cmdUint{
args: [][]string{task.commands},
ctx: ctx,
dir: task.workDir,
user: task.user,
logPath: task.logPath,
ignoreFileLog: true,
jd: d.jd,
exportLog: true,
}
log.Infof("dep start exec %s->%v", task.name, task.commands)
task.err = myCmdUnit.launch()
task.logContent = bytes.TrimRight(myCmdUnit.content, "\x00")
task.done = true
log.Infof("exec %s %s cost %.4fs %v", task.name, task.commands, float64(myCmdUnit.costTime)/1000000000, err)
task.dest, task.from = task.from, task.dest
if !d.jd.SetDependDone(task) {
err = d.jd.rpcCallCtx(ctx, "Srv.SetDependDone", proto.DepJob{
Name: task.name,
Dest: task.dest,
From: task.from,
ID: task.id,
JobUniqueID: task.jobUniqueID,
ProcessID: task.processID,
JobID: task.jobID,
Commands: task.commands,
LogContent: task.logContent,
Err: err,
Timeout: task.timeout,
}, &reply)
if err != nil {
log.Error("Srv.SetDependDone error:", err, "server addr:", d.jd.getOpts().AdminAddr)
}
if !reply {
log.Errorf("task %s %v call Srv.SetDependDone failed! err:%v", task.name, task.commands, err)
}
}
}