Files
jiacrontab/jiacrontabd/job.go
jiazhizhong 1279635d7f fix
2022-03-10 17:09:03 +08:00

605 lines
16 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package jiacrontabd
import (
"context"
"encoding/json"
"fmt"
"jiacrontab/models"
"jiacrontab/pkg/crontab"
"jiacrontab/pkg/proto"
"jiacrontab/pkg/util"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/iwannay/log"
)
// Status strings for a job run. exitTimeout and exitDependError are written to
// detail.LastExitStatus in this file; the remaining values are presumably used
// elsewhere in the package.
const (
	exitSuccess     = "Success"
	exitError       = "Error"
	exitKilled      = "Killed"
	exitTimeout     = "Timeout"
	exitDependError = "Dependent job execution failed"
)
// process is one run of a JobEntry. It carries the run's dependent jobs, the
// cancellable context for the run, its timing, and the error of the latest
// attempt.
type process struct {
// id is the process ID handed out by JobEntry.takeID.
id uint32
// deps holds one entry per dependent job; built by newProcess from
// detail.DependJobs.
deps []*depEntry
// ctx and cancel come from context.WithCancel in newProcess. cancel is
// called on dependency timeout (waitDepExecDone), by the Kill timeout
// trigger, and by JobEntry.exit.
ctx context.Context
cancel context.CancelFunc
// err is the result of the command launch in exec. waitDepExecDone also
// reads it after ready fires; presumably dependency handling elsewhere in
// the package sets it too — confirm.
err error
// startTime is set in newProcess; endTime is set when exec finishes.
startTime time.Time
endTime time.Time
// ready is waited on by waitDepExecDone. NOTE(review): it is not signalled
// in the code shown here; presumably closed/sent once all dependencies are
// done — confirm.
ready chan struct{}
// retryNum is the zero-based attempt index, set by JobEntry.exec.
retryNum int
// jobEntry is the job this process belongs to.
jobEntry *JobEntry
}
// newProcess creates the process for one run of jobEntry, identified by id.
//
// It derives a cancellable context for the run and builds one depEntry per
// dependent job configured in jobEntry.detail.DependJobs. Each dependency
// logs to <LogPath>/depend_job/<yyyy/mm/dd>/<JobID>-<ID>.log.
func newProcess(id uint32, jobEntry *JobEntry) *process {
	p := &process{
		id:        id,
		jobEntry:  jobEntry,
		startTime: time.Now(),
		ready:     make(chan struct{}),
	}
	p.ctx, p.cancel = context.WithCancel(context.Background())

	for _, v := range p.jobEntry.detail.DependJobs {
		// Cap the slice at its length so appending Code always allocates a
		// fresh array instead of writing into the backing array shared with
		// jobEntry.detail, which other concurrent runs of the job also use.
		cmd := v.Command[:len(v.Command):len(v.Command)]
		if v.Code != "" {
			cmd = append(cmd, v.Code)
		}
		p.deps = append(p.deps, &depEntry{
			jobID:       p.jobEntry.detail.ID,
			processID:   int(id),
			jobUniqueID: p.jobEntry.uniqueID,
			id:          v.ID,
			from:        v.From,
			commands:    cmd,
			dest:        v.Dest,
			logPath:     filepath.Join(p.jobEntry.jd.getOpts().LogPath, "depend_job", time.Now().Format("2006/01/02"), fmt.Sprintf("%d-%s.log", v.JobID, v.ID)),
			done:        false,
			timeout:     v.Timeout,
		})
	}
	return p
}
// waitDepExecDone dispatches the process's dependent jobs and blocks until
// they have finished.
//
// Dependencies are dispatched in synchronous mode when detail.IsSync is set,
// otherwise in concurrent mode. It returns true when there are no
// dependencies, or when p.ready fires and p.err is nil. It returns false when:
//   - dispatching fails (a line is appended to the job log);
//   - p.ctx is cancelled;
//   - a dependency reported an error;
//   - the dependencies do not finish within depWaitTimeout, in which case
//     p.ctx is cancelled.
func (p *process) waitDepExecDone() bool {
	// Upper bound on how long a run waits for its dependent jobs.
	const depWaitTimeout = time.Hour

	if len(p.deps) == 0 {
		return true
	}

	var err error
	if p.jobEntry.detail.IsSync {
		// Synchronous mode.
		err = p.jobEntry.jd.dispatchDependSync(p.ctx, p.deps, "")
	} else {
		// Concurrent mode.
		err = p.jobEntry.jd.dispatchDependAsync(p.ctx, p.deps)
	}
	if err != nil {
		// Keep the dispatch error visible both in the daemon log and in the
		// job's own log instead of discarding it.
		log.Errorf("jobID:%d dispatch depends error(%v)", p.jobEntry.detail.ID, err)
		prefix := fmt.Sprintf("[%s %s] ", time.Now().Format("2006-01-02 15:04:05"), p.jobEntry.jd.getOpts().BoardcastAddr)
		p.jobEntry.logContent = append(p.jobEntry.logContent, []byte(fmt.Sprintf("%sfailed to exec depends: %v\n", prefix, err))...)
		return false
	}

	timer := time.NewTimer(depWaitTimeout)
	defer timer.Stop()

	select {
	case <-p.ctx.Done():
		log.Debugf("jobID:%d exec cancel", p.jobEntry.detail.ID)
		return false
	case <-timer.C:
		p.cancel()
		log.Errorf("jobID:%d exec dep timeout!", p.jobEntry.detail.ID)
		return false
	case <-p.ready:
		if p.err != nil {
			log.Errorf("jobID:%d exec dep error(%s)", p.jobEntry.detail.ID, p.err)
			return false
		}
		log.Debugf("jobID:%d exec all dep done.", p.jobEntry.detail.ID)
		return true
	}
}
// exec performs one attempt of the process. It first waits for the dependent
// jobs and, if they succeeded, launches the job's command.
//
// When detail.Timeout is positive, a timer calls timeoutTrigger if the
// command has not finished after Timeout seconds. The timer is stopped when
// exec returns.
//
// On a dependency failure handleDepError is called; on a command error
// handleNotify is called. The elapsed time is stored in detail.LastCostTime.
// It returns p.err.
func (p *process) exec() error {
	// Buffered so the completion signal never blocks. The timeout callback
	// reads it to tell whether the command has already finished.
	doneChan := make(chan struct{}, 1)

	if !p.waitDepExecDone() {
		p.jobEntry.handleDepError(p.startTime, p)
	} else {
		if p.jobEntry.detail.Timeout > 0 {
			timer := time.AfterFunc(time.Duration(p.jobEntry.detail.Timeout)*time.Second, func() {
				select {
				case <-doneChan:
					// The command already finished; nothing to do.
				default:
					log.Debug("timeout callback:", "jobID:", p.jobEntry.detail.ID)
					p.jobEntry.timeoutTrigger(p)
				}
			})
			// Release the timer as soon as this attempt is over instead of
			// keeping it (and p) alive for the whole timeout.
			defer timer.Stop()
		}

		// Full slice expression so appending Code never writes into the
		// backing array shared with detail.Command across concurrent runs.
		cmd := p.jobEntry.detail.Command
		arg := cmd[:len(cmd):len(cmd)]
		if p.jobEntry.detail.Code != "" {
			arg = append(arg, p.jobEntry.detail.Code)
		}

		myCmdUnit := cmdUint{
			args:             [][]string{arg},
			ctx:              p.ctx,
			dir:              p.jobEntry.detail.WorkDir,
			user:             p.jobEntry.detail.WorkUser,
			env:              p.jobEntry.detail.WorkEnv,
			ip:               p.jobEntry.detail.WorkIp,
			content:          p.jobEntry.logContent,
			logDir:           p.jobEntry.getLogDir(),
			id:               p.jobEntry.job.ID,
			label:            p.jobEntry.detail.Name,
			killChildProcess: p.jobEntry.detail.KillChildProcess,
			jd:               p.jobEntry.jd,
		}
		if p.jobEntry.once {
			myCmdUnit.exportLog = true
		}

		p.err = myCmdUnit.launch()
		p.jobEntry.logContent = myCmdUnit.content
		doneChan <- struct{}{}
		if p.err != nil {
			p.jobEntry.handleNotify(p)
		}
	}

	p.endTime = time.Now()
	p.jobEntry.detail.LastCostTime = p.endTime.Sub(p.startTime).Seconds()
	// Report the actual run error (the previous code logged an always-nil local).
	log.Infof("%s exec cost %.3fs err(%v)", p.jobEntry.detail.Name, p.jobEntry.detail.LastCostTime, p.err)
	return p.err
}
// JobEntry is the in-memory state of one crontab job: the scheduler entry,
// the job row loaded from the database, and the processes currently running
// for it.
type JobEntry struct {
// job is the scheduler entry this JobEntry was created for.
job *crontab.Job
// detail is the job row; it is reloaded from the database at the start of
// each run in exec.
detail models.CrontabJob
// processNum is the number of running processes; accessed via sync/atomic.
processNum int32
// processes maps process ID to its running process; guarded by mux.
processes map[uint32]*process
// pc is not referenced in the code shown here.
pc int32
// wg tracks the goroutines started for scheduled runs; waitDone waits on it.
wg util.WaitGroupWrapper
// logContent buffers the output of the current run(s).
logContent []byte
// jd supplies options, RPC calls and scheduling helpers used by the entry.
jd *Jiacrontabd
// IDChan holds released process IDs for reuse (see takeID/putID).
IDChan chan uint32
// IDGenerator is the counter used to mint new process IDs; accessed atomically.
IDGenerator uint32
// mux guards processes.
mux sync.RWMutex
once bool // run only once
stop int32 // job stop status: set to 1 by exit, reset to 0 by waitDone
// uniqueID is a UUID assigned in newJobEntry and copied into each depEntry.
uniqueID string
}
// newJobEntry wraps the scheduler job in a JobEntry bound to jd. The entry
// gets a fresh UUID, an empty process table and a buffer able to hold up to
// 10000 recycled process IDs.
func newJobEntry(job *crontab.Job, jd *Jiacrontabd) *JobEntry {
	entry := &JobEntry{job: job, jd: jd}
	entry.uniqueID = util.UUID()
	entry.processes = make(map[uint32]*process)
	entry.IDChan = make(chan uint32, 10000)
	return entry
}
// getLogPath returns today's log file for the job:
// <LogPath>/crontab_task/<yyyy/mm/dd>/<jobID>.log.
func (j *JobEntry) getLogPath() string {
	root := j.jd.getOpts().LogPath
	day := time.Now().Format("2006/01/02")
	file := fmt.Sprintf("%d.log", j.job.ID)
	return filepath.Join(root, "crontab_task", day, file)
}
// getLogDir returns the crontab_task directory under the configured LogPath.
func (j *JobEntry) getLogDir() string {
	opts := j.jd.getOpts()
	return filepath.Join(opts.LogPath, "crontab_task")
}
// setOnce marks the entry as a run-only-once job (true) or a scheduled one.
func (j *JobEntry) setOnce(once bool) {
	j.once = once
}
// takeID returns a process ID. It prefers a recycled ID from IDChan and
// otherwise draws the next value from IDGenerator. If the counter wraps to
// zero, the draw is repeated, so the generator never yields 0.
func (j *JobEntry) takeID() uint32 {
	for {
		select {
		case recycled := <-j.IDChan:
			return recycled
		default:
		}
		if next := atomic.AddUint32(&j.IDGenerator, 1); next != 0 {
			return next
		}
	}
}
// putID offers id back for reuse by takeID without ever blocking. When the
// recycle buffer is full, the ID is dropped.
func (j *JobEntry) putID(id uint32) {
	select {
	case j.IDChan <- id:
		// Recycled; a later takeID may hand it out again.
	default:
		// Buffer full: discard.
	}
}
// writeLog passes the buffered log content to writeFile, targeting today's
// log path for the job.
func (j *JobEntry) writeLog() {
	logFile := j.getLogPath()
	writeFile(logFile, &j.logContent)
}
// handleDepError records and reports a run that was aborted because its
// dependent jobs did not complete successfully.
//
// It appends an error line to the job log, sets LastExitStatus to
// exitDependError and writes the log file. Then, when configured, it sends:
//   - an e-mail (ErrorMailNotify with a non-empty MailTo);
//   - a DingTalk markdown message (ErrorDingdingNotify with a non-empty
//     DingdingTo).
//
// Both go through the admin RPC server. startTime is the start of the run
// and is used for the reported duration. RPC failures are logged, not
// returned.
func (j *JobEntry) handleDepError(startTime time.Time, p *process) {
	cfg := j.jd.getOpts()
	depErr := fmt.Errorf("%s %s exec depend job err(%v)", time.Now().Format(proto.DefaultTimeLayout), cfg.BoardcastAddr, p.err)
	endTime := time.Now()
	reply := true

	j.logContent = append(j.logContent, []byte(depErr.Error()+"\n")...)
	j.detail.LastExitStatus = exitDependError
	j.writeLog()

	// E-mail notification.
	if j.detail.ErrorMailNotify && len(j.detail.MailTo) != 0 {
		if err := j.jd.rpcCallCtx(context.TODO(), "Srv.SendMail", proto.SendMail{
			MailTo:  j.detail.MailTo,
			Subject: cfg.BoardcastAddr + "提醒脚本依赖异常退出",
			Content: fmt.Sprintf(
				"任务名:%s<br/>创建者:%s<br/>开始时间:%s<br/>耗时:%.4f<br/>异常:%s",
				j.detail.Name, j.detail.CreatedUsername, startTime.Format(proto.DefaultTimeLayout), endTime.Sub(startTime).Seconds(), depErr),
		}, &reply); err != nil {
			log.Error("Srv.SendMail error:", err, "server addr:", cfg.AdminAddr)
		}
	}

	// DingTalk webhook notification.
	if j.detail.ErrorDingdingNotify && len(j.detail.DingdingTo) != 0 {
		nodeAddr := cfg.BoardcastAddr
		title := nodeAddr + "告警:脚本依赖异常退出"
		notifyContent := fmt.Sprintf("> ###### 来自jiacrontabd: %s 的依赖异常退出报警:\n> ##### 任务id%d\n> ##### 任务名称:%s\n> ##### timeout:%d\n> ##### 尝试次数:%d\n> ##### 异常:%s\n> ##### 报警时间:%s", nodeAddr, int(j.detail.ID), j.detail.Name, int64(j.detail.Timeout), p.retryNum, depErr, time.Now().Format("2006-01-02 15:04:05"))
		// Encode with encoding/json: the text contains newlines and may contain
		// quotes, which are invalid unescaped inside a JSON string.
		notifyBody, err := json.Marshal(map[string]interface{}{
			"msgtype": "markdown",
			"markdown": map[string]string{
				"title": title,
				"text":  notifyContent,
			},
		})
		if err != nil {
			log.Error("json.Marshal error:", err)
			return
		}
		if err = j.jd.rpcCallCtx(context.TODO(), "Srv.ApiPost", proto.ApiPost{
			Urls: j.detail.DingdingTo,
			Data: string(notifyBody),
		}, &reply); err != nil {
			log.Error("Srv.ApiPost error:", err, "server addr:", cfg.AdminAddr)
		}
	}
}
// handleNotify reports a failed run (p.err != nil) through every channel
// enabled on the job: e-mail (ErrorMailNotify), an HTTP API callback
// (ErrorAPINotify) and a DingTalk markdown webhook (ErrorDingdingNotify).
//
// A channel whose recipient list is empty is skipped. A failure in one
// channel does not prevent the others from being tried. All errors are
// logged rather than returned.
func (j *JobEntry) handleNotify(p *process) {
	if p.err == nil {
		return
	}
	var (
		reply bool
		cfg   = j.jd.getOpts()
	)

	// E-mail notification.
	if j.detail.ErrorMailNotify && len(j.detail.MailTo) != 0 {
		if err := j.jd.rpcCallCtx(context.TODO(), "Srv.SendMail", proto.SendMail{
			MailTo:  j.detail.MailTo,
			Subject: cfg.BoardcastAddr + "提醒脚本异常退出",
			Content: fmt.Sprintf(
				"任务名:%s<br/>创建者:%s<br/>开始时间:%s<br/>异常:%s<br/>重试次数:%d",
				j.detail.Name, j.detail.CreatedUsername,
				p.startTime.Format(proto.DefaultTimeLayout), p.err.Error(), p.retryNum),
		}, &reply); err != nil {
			log.Error("Srv.SendMail error:", err, "server addr:", cfg.AdminAddr)
		}
	}

	// HTTP API callback. A marshal failure only skips this channel; it must
	// not suppress the DingTalk notification below.
	if j.detail.ErrorAPINotify && len(j.detail.APITo) != 0 {
		postData, err := json.Marshal(proto.CrontabApiNotifyBody{
			NodeAddr:       cfg.BoardcastAddr,
			JobName:        j.detail.Name,
			JobID:          int(j.detail.ID),
			CreateUsername: j.detail.CreatedUsername,
			CreatedAt:      j.detail.CreatedAt,
			Timeout:        int64(j.detail.Timeout),
			Type:           "error",
			RetryNum:       p.retryNum,
		})
		if err != nil {
			log.Error("json.Marshal error:", err)
		} else if err = j.jd.rpcCallCtx(context.TODO(), "Srv.ApiPost", proto.ApiPost{
			Urls: j.detail.APITo,
			Data: string(postData),
		}, &reply); err != nil {
			log.Error("Srv.ApiPost error:", err, "server addr:", cfg.AdminAddr)
		}
	}

	// DingTalk webhook notification.
	if j.detail.ErrorDingdingNotify && len(j.detail.DingdingTo) != 0 {
		nodeAddr := cfg.BoardcastAddr
		title := nodeAddr + "告警:脚本异常退出"
		notifyContent := fmt.Sprintf("> ###### 来自jiacrontabd: %s 的脚本异常退出报警:\n> ##### 任务id%d\n> ##### 任务名称:%s\n> ##### timeout:%d\n> ##### 尝试次数:%d\n> ##### 报警时间:%s", nodeAddr, int(j.detail.ID), j.detail.Name, int64(j.detail.Timeout), p.retryNum, time.Now().Format("2006-01-02 15:04:05"))
		// Encode with encoding/json: the text contains newlines and may contain
		// quotes, which are invalid unescaped inside a JSON string.
		notifyBody, err := json.Marshal(map[string]interface{}{
			"msgtype": "markdown",
			"markdown": map[string]string{
				"title": title,
				"text":  notifyContent,
			},
		})
		if err != nil {
			log.Error("json.Marshal error:", err)
		} else if err = j.jd.rpcCallCtx(context.TODO(), "Srv.ApiPost", proto.ApiPost{
			Urls: j.detail.DingdingTo,
			Data: string(notifyBody),
		}, &reply); err != nil {
			log.Error("Srv.ApiPost error:", err, "server addr:", cfg.AdminAddr)
		}
	}
}
// timeoutTrigger runs every action configured in detail.TimeoutTrigger for a
// process that exceeded its timeout. Each recognised action also sets
// LastExitStatus to exitTimeout. The actions are:
//   - call the HTTP API;
//   - send an e-mail;
//   - cancel (kill) the process;
//   - post a DingTalk markdown message.
//
// Unknown triggers are logged. RPC and encoding failures are logged, and the
// remaining triggers still run.
func (j *JobEntry) timeoutTrigger(p *process) {
	var (
		reply bool
		cfg   = j.jd.getOpts()
	)
	for _, e := range j.detail.TimeoutTrigger {
		switch e {
		case proto.TimeoutTrigger_CallApi:
			j.detail.LastExitStatus = exitTimeout
			postData, err := json.Marshal(proto.CrontabApiNotifyBody{
				NodeAddr:       cfg.BoardcastAddr,
				JobName:        j.detail.Name,
				JobID:          int(j.detail.ID),
				CreateUsername: j.detail.CreatedUsername,
				CreatedAt:      j.detail.CreatedAt,
				Timeout:        int64(j.detail.Timeout),
				Type:           "timeout",
				RetryNum:       p.retryNum,
			})
			if err != nil {
				// Nothing meaningful to post; move on to the remaining triggers.
				log.Error("json.Marshal error:", err)
				continue
			}
			// Same RPC method as the other API notifications in this file.
			if err = j.jd.rpcCallCtx(context.TODO(), "Srv.ApiPost", proto.ApiPost{
				Urls: j.detail.APITo,
				Data: string(postData),
			}, &reply); err != nil {
				log.Error("Srv.ApiPost err:", err, "server addr:", cfg.AdminAddr)
			}
		case proto.TimeoutTrigger_SendEmail:
			j.detail.LastExitStatus = exitTimeout
			if err := j.jd.rpcCallCtx(context.TODO(), "Srv.SendMail", proto.SendMail{
				MailTo:  j.detail.MailTo,
				Subject: cfg.BoardcastAddr + "提醒脚本执行超时",
				Content: fmt.Sprintf(
					"任务名:%s<br/>创建者:%v<br/>开始时间:%s<br/>超时:%ds<br/>重试次数:%d",
					j.detail.Name, j.detail.CreatedUsername, p.startTime.Format(proto.DefaultTimeLayout),
					j.detail.Timeout, p.retryNum),
			}, &reply); err != nil {
				log.Error("Srv.SendMail error:", err, "server addr:", cfg.AdminAddr)
			}
		case proto.TimeoutTrigger_Kill:
			j.detail.LastExitStatus = exitTimeout
			p.cancel()
		case proto.TimeoutTrigger_DingdingWebhook:
			j.detail.LastExitStatus = exitTimeout
			nodeAddr := cfg.BoardcastAddr
			title := nodeAddr + "告警:脚本执行超时"
			notifyContent := fmt.Sprintf("> ###### 来自jiacrontabd: %s 的脚本执行超时报警:\n> ##### 任务id%d\n> ##### 任务名称:%s\n> ##### timeout:%d\n> ##### 尝试次数:%d\n> ##### 报警时间:%s", nodeAddr, int(j.detail.ID), j.detail.Name, int64(j.detail.Timeout), p.retryNum, time.Now().Format("2006-01-02 15:04:05"))
			// Encode with encoding/json: the text contains newlines and may
			// contain quotes, which are invalid unescaped inside a JSON string.
			notifyBody, err := json.Marshal(map[string]interface{}{
				"msgtype": "markdown",
				"markdown": map[string]string{
					"title": title,
					"text":  notifyContent,
				},
			})
			if err != nil {
				log.Error("json.Marshal error:", err)
				continue
			}
			if err = j.jd.rpcCallCtx(context.TODO(), "Srv.ApiPost", proto.ApiPost{
				Urls: j.detail.DingdingTo,
				Data: string(notifyBody),
			}, &reply); err != nil {
				log.Error("Srv.ApiPost error:", err, "server addr:", cfg.AdminAddr)
			}
		default:
			log.Error("invalid timeoutTrigger", e)
		}
	}
}
// GetLog returns the log content currently buffered for the job. The internal
// slice is returned without copying, so callers should treat it as read-only.
func (j *JobEntry) GetLog() []byte {
	buffered := j.logContent
	return buffered
}
// exec runs the job once, unless the entry has been stopped.
//
// One-off entries (j.once) run synchronously; scheduled entries run in a
// goroutine tracked by j.wg. Each run goes through these steps:
//  1. Reload the job row from the database; on failure the job is deleted.
//  2. Scheduled entries only: skip a repeat within the same second, and
//     check that the stored NextExecTime matches both the timer and the
//     current time. On a mismatch the job is re-added from its TimeArgs and
//     the run is skipped.
//  3. Enforce MaxConcurrent (0 means unlimited).
//  4. Execute the process up to RetryNum+1 times until an attempt succeeds
//     (one-off entries make a single attempt).
//
// The job row is marked running before the attempts and timing afterwards.
func (j *JobEntry) exec() {
	if atomic.LoadInt32(&j.stop) == 1 {
		return
	}
	run := func() {
		var err error
		now := time.Now()
		if j.once {
			err = models.DB().Take(&j.detail, "id=?", j.job.ID).Error
			atomic.StoreInt32(&j.processNum, int32(j.detail.ProcessNum))
		} else {
			err = models.DB().Take(&j.detail, "id=? and status in(?)",
				j.job.ID, []models.JobStatus{models.StatusJobTiming, models.StatusJobRunning}).Error
		}
		if err != nil {
			// Identify the job by j.job.ID: when the lookup fails, j.detail may
			// never have been loaded, so j.detail.ID can still be zero.
			j.jd.deleteJob(j.job.ID)
			log.Warnf("jobID:%d JobEntry.exec:%v", j.job.ID, err)
			return
		}

		if !j.once {
			// Ignore a job that already ran within the current second.
			if j.detail.LastExecTime.Truncate(time.Second).Equal(now.Truncate(time.Second)) {
				log.Infof("ignore repeat job %s", j.detail.Name)
				return
			}
			// Only run when the execution time recorded in the database equals
			// both the timer's next execution time and the current time.
			execTime := j.detail.NextExecTime.Truncate(time.Second)
			if !(execTime.Equal(j.job.GetNextExecTime().Truncate(time.Second)) && execTime.Equal(now.Truncate(time.Second))) {
				log.Errorf("%s(%d) JobEntry.exec time error(%s not equal %s)",
					j.detail.Name, j.detail.ID, execTime, now)
				// Re-add the job from its stored time arguments and skip this run.
				j.jd.addJob(&crontab.Job{
					ID:      j.detail.ID,
					Second:  j.detail.TimeArgs.Second,
					Minute:  j.detail.TimeArgs.Minute,
					Hour:    j.detail.TimeArgs.Hour,
					Day:     j.detail.TimeArgs.Day,
					Month:   j.detail.TimeArgs.Month,
					Weekday: j.detail.TimeArgs.Weekday,
				}, false)
				return
			}
			j.jd.addJob(j.job, true)
		}

		if atomic.LoadInt32(&j.processNum) >= int32(j.detail.MaxConcurrent) && j.detail.MaxConcurrent != 0 {
			j.logContent = []byte("不得超过job最大并发数量\n")
			return
		}
		// The first concurrently running process starts with an empty log buffer.
		if atomic.LoadInt32(&j.processNum) == 0 {
			j.logContent = nil
		}
		atomic.AddInt32(&j.processNum, 1)

		id := j.takeID()
		startTime := time.Now()
		var endTime time.Time
		defer func() {
			endTime = time.Now()
			atomic.AddInt32(&j.processNum, -1)
			// err holds the result of the last attempt.
			j.updateJob(models.StatusJobTiming, startTime, endTime, err)
		}()
		j.updateJob(models.StatusJobRunning, startTime, endTime, err)

		p := newProcess(id, j)
		j.mux.Lock()
		j.processes[id] = p
		j.mux.Unlock()
		defer func() {
			j.mux.Lock()
			delete(j.processes, id)
			j.mux.Unlock()
			j.putID(id)
		}()

		for i := 0; i <= j.detail.RetryNum; i++ {
			if atomic.LoadInt32(&j.stop) == 1 {
				return
			}
			log.Debug("jobID:", j.detail.ID, "retryNum:", i)
			p.retryNum = i
			// Run the script.
			if err = p.exec(); err == nil || j.once {
				break
			}
		}
	}

	if j.once {
		run()
		return
	}
	j.wg.Wrap(run)
}
// updateJob persists the job's runtime state to the database.
//
// The written fields are:
//   - status;
//   - process_num, read from the processNum counter;
//   - last_cost_time, only when endTime is after startTime;
//   - failed and last_exit_status: a non-nil err marks the run failed and is
//     stored as last_exit_status.
//
// One-off runs (j.once) leave status and last_exit_status untouched. When
// status is StatusJobTiming, a history record is also pushed to the admin
// server through Srv.PushJobLog. Failures of the RPC call and of the database
// update are logged.
func (j *JobEntry) updateJob(status models.JobStatus, startTime, endTime time.Time, err error) {
	data := map[string]interface{}{
		"status":           status,
		"process_num":      atomic.LoadInt32(&j.processNum),
		"last_exit_status": "",
		"failed":           false,
	}
	if endTime.After(startTime) {
		data["last_cost_time"] = endTime.Sub(startTime).Seconds()
	}

	var errMsg string
	if err != nil {
		errMsg = err.Error()
		data["last_exit_status"] = errMsg
		data["failed"] = true
	}

	if j.once {
		delete(data, "status")
		delete(data, "last_exit_status")
	}

	if status == models.StatusJobTiming {
		if rpcErr := j.jd.rpcCallCtx(context.TODO(), "Srv.PushJobLog", models.JobHistory{
			JobType:   models.JobTypeCrontab,
			JobID:     j.detail.ID,
			Addr:      j.jd.getOpts().BoardcastAddr,
			JobName:   j.detail.Name,
			StartTime: startTime,
			EndTime:   endTime,
			ExitMsg:   errMsg,
		}, nil); rpcErr != nil {
			log.Error("rpc call Srv.PushJobLog failed:", rpcErr)
		}
	}

	// Surface database failures instead of silently dropping the state update.
	if dbErr := models.DB().Model(&j.detail).Updates(data).Error; dbErr != nil {
		log.Errorf("jobID:%d update job state error(%v)", j.detail.ID, dbErr)
	}
}
// kill stops the entry. It cancels every running process via exit, then
// blocks until all in-flight runs have returned.
func (j *JobEntry) kill() {
	j.exit()
	_ = j.waitDone() // the collected log output is not needed here
}
// waitDone blocks until every run tracked by j.wg has returned. It then clears
// the stop flag so exec will run the entry again, and returns the buffered
// log content.
func (j *JobEntry) waitDone() []byte {
	j.wg.Wait()
	atomic.StoreInt32(&j.stop, 0)
	logs := j.logContent
	return logs
}
// exit sets the stop flag, so exec and the retry loop skip further work, and
// cancels the context of every process currently registered on the entry.
func (j *JobEntry) exit() {
	atomic.StoreInt32(&j.stop, 1)

	j.mux.Lock()
	defer j.mux.Unlock()
	for _, proc := range j.processes {
		proc.cancel()
	}
}