123 lines
2.5 KiB
Go
123 lines
2.5 KiB
Go
package jiacrontabd
|
||
|
||
import (
|
||
"bytes"
|
||
"context"
|
||
"jiacrontab/pkg/proto"
|
||
"time"
|
||
|
||
"github.com/iwannay/log"
|
||
)
|
||
|
||
type depEntry struct {
|
||
jobID uint // 定时任务id
|
||
jobUniqueID string // job的唯一标志
|
||
processID int // 当前依赖的父级任务(可能存在多个并发的task)
|
||
id string // depID uuid
|
||
once bool
|
||
workDir string
|
||
user string
|
||
env []string
|
||
from string
|
||
commands []string
|
||
dest string
|
||
done bool
|
||
timeout int64
|
||
err error
|
||
ctx context.Context
|
||
name string
|
||
logPath string
|
||
logContent []byte
|
||
}
|
||
|
||
func newDependencies(jd *Jiacrontabd) *dependencies {
|
||
return &dependencies{
|
||
jd: jd,
|
||
dep: make(chan *depEntry, 100),
|
||
}
|
||
}
|
||
|
||
type dependencies struct {
|
||
jd *Jiacrontabd
|
||
dep chan *depEntry
|
||
}
|
||
|
||
func (d *dependencies) add(t *depEntry) {
|
||
select {
|
||
case d.dep <- t:
|
||
default:
|
||
log.Warnf("discard %v", t)
|
||
}
|
||
|
||
}
|
||
|
||
func (d *dependencies) run() {
|
||
go func() {
|
||
for {
|
||
select {
|
||
case t := <-d.dep:
|
||
go d.exec(t)
|
||
}
|
||
}
|
||
}()
|
||
}
|
||
|
||
// TODO: 主任务退出杀死依赖
|
||
func (d *dependencies) exec(task *depEntry) {
|
||
|
||
var (
|
||
reply bool
|
||
err error
|
||
)
|
||
|
||
if task.timeout == 0 {
|
||
// 默认超时10分钟
|
||
task.timeout = 600
|
||
}
|
||
|
||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(task.timeout)*time.Second)
|
||
defer cancel()
|
||
myCmdUnit := cmdUint{
|
||
args: [][]string{task.commands},
|
||
ctx: ctx,
|
||
dir: task.workDir,
|
||
user: task.user,
|
||
logPath: task.logPath,
|
||
ignoreFileLog: true,
|
||
jd: d.jd,
|
||
exportLog: true,
|
||
}
|
||
|
||
log.Infof("dep start exec %s->%v", task.name, task.commands)
|
||
task.err = myCmdUnit.launch()
|
||
task.logContent = bytes.TrimRight(myCmdUnit.content, "\x00")
|
||
task.done = true
|
||
log.Infof("exec %s %s cost %.4fs %v", task.name, task.commands, float64(myCmdUnit.costTime)/1000000000, err)
|
||
|
||
task.dest, task.from = task.from, task.dest
|
||
|
||
if !d.jd.SetDependDone(task) {
|
||
err = d.jd.rpcCallCtx(ctx, "Srv.SetDependDone", proto.DepJob{
|
||
Name: task.name,
|
||
Dest: task.dest,
|
||
From: task.from,
|
||
ID: task.id,
|
||
JobUniqueID: task.jobUniqueID,
|
||
ProcessID: task.processID,
|
||
JobID: task.jobID,
|
||
Commands: task.commands,
|
||
LogContent: task.logContent,
|
||
Err: err,
|
||
Timeout: task.timeout,
|
||
}, &reply)
|
||
|
||
if err != nil {
|
||
log.Error("Srv.SetDependDone error:", err, "server addr:", d.jd.getOpts().AdminAddr)
|
||
}
|
||
|
||
if !reply {
|
||
log.Errorf("task %s %v call Srv.SetDependDone failed! err:%v", task.name, task.commands, err)
|
||
}
|
||
}
|
||
}
|