package jiacrontabd import ( "context" "encoding/json" "fmt" "jiacrontab/models" "jiacrontab/pkg/proto" "path/filepath" "strings" "sync" "time" "github.com/iwannay/log" ) type ApiNotifyArgs struct { JobName string JobID uint NodeAddr string CreateUsername string CreatedAt time.Time NotifyType string } type daemonJob struct { job *models.DaemonJob daemon *Daemon ctx context.Context cancel context.CancelFunc processNum int } func (d *daemonJob) do(ctx context.Context) { d.processNum = 1 t := time.NewTicker(1 * time.Second) defer t.Stop() d.daemon.wait.Add(1) cfg := d.daemon.jd.getOpts() retryNum := d.job.RetryNum defer func() { if err := recover(); err != nil { log.Errorf("%s exec panic %s \n", d.job.Name, err) } d.processNum = 0 if err := models.DB().Model(d.job).Update("status", models.StatusJobStop).Error; err != nil { log.Error(err) } d.daemon.wait.Done() }() if err := models.DB().Model(d.job).Updates(map[string]interface{}{ "start_at": time.Now(), "status": models.StatusJobRunning}).Error; err != nil { log.Error(err) } for { var ( stop bool err error ) arg := d.job.Command if d.job.Code != "" { arg = append(arg, d.job.Code) } myCmdUint := cmdUint{ ctx: ctx, args: [][]string{arg}, env: d.job.WorkEnv, ip: d.job.WorkIp, dir: d.job.WorkDir, user: d.job.WorkUser, label: d.job.Name, jd: d.daemon.jd, id: d.job.ID, logDir: filepath.Join(cfg.LogPath, "daemon_job"), } log.Info("exec daemon job, jobName:", d.job.Name, " jobID", d.job.ID) err = myCmdUint.launch() retryNum-- d.handleNotify(err) select { case <-ctx.Done(): stop = true case <-t.C: } if stop || d.job.FailRestart == false || (d.job.RetryNum > 0 && retryNum == 0) { break } if err = d.syncJob(); err != nil { break } } t.Stop() d.daemon.PopJob(d.job.ID) log.Info("daemon task end", d.job.Name) } func (d *daemonJob) syncJob() error { return models.DB().Take(d.job, "id=? and status=?", d.job.ID, models.StatusJobRunning).Error } func (d *daemonJob) handleNotify(err error) { if err == nil { return } var reply bool cfg := d.daemon.jd.getOpts() if d.job.ErrorMailNotify && len(d.job.MailTo) > 0 { var reply bool err := d.daemon.jd.rpcCallCtx(d.ctx, "Srv.SendMail", proto.SendMail{ MailTo: d.job.MailTo, Subject: cfg.BoardcastAddr + "提醒常驻脚本异常退出", Content: fmt.Sprintf( "任务名:%s
创建者:%s
开始时间:%s
异常:%s", d.job.Name, d.job.CreatedUsername, time.Now().Format(proto.DefaultTimeLayout), err), }, &reply) if err != nil { log.Error("Srv.SendMail error:", err, "server addr:", cfg.AdminAddr) } } if d.job.ErrorAPINotify && len(d.job.APITo) > 0 { postData, err := json.Marshal(ApiNotifyArgs{ JobName: d.job.Name, JobID: d.job.ID, CreateUsername: d.job.CreatedUsername, CreatedAt: d.job.CreatedAt, NodeAddr: cfg.BoardcastAddr, NotifyType: "error", }) if err != nil { log.Error("json.Marshal error:", err) } err = d.daemon.jd.rpcCallCtx(d.ctx, "Srv.ApiPost", proto.ApiPost{ Urls: d.job.APITo, Data: string(postData), }, &reply) if err != nil { log.Error("Logic.ApiPost error:", err, "server addr:", cfg.AdminAddr) } } // 钉钉webhook通知 if d.job.ErrorDingdingNotify && len(d.job.DingdingTo) > 0 { nodeAddr := cfg.BoardcastAddr title := nodeAddr + "告警:常驻脚本异常退出" notifyContent := fmt.Sprintf("> ###### 来自jiacrontabd: %s 的常驻脚本异常退出报警:\n> ##### 任务id:%d\n> ##### 任务名称:%s\n> ##### 异常:%s\n> ##### 报警时间:%s", nodeAddr, int(d.job.ID), d.job.Name, err, time.Now().Format("2006-01-02 15:04:05")) notifyBody := fmt.Sprintf( `{ "msgtype": "markdown", "markdown": { "title": "%s", "text": "%s" } }`, title, notifyContent) err = d.daemon.jd.rpcCallCtx(d.ctx, "Srv.ApiPost", proto.ApiPost{ Urls: d.job.DingdingTo, Data: notifyBody, }, &reply) if err != nil { log.Error("Logic.ApiPost error:", err, "server addr:", cfg.AdminAddr) } } } type Daemon struct { taskChannel chan *daemonJob taskMap map[uint]*daemonJob jd *Jiacrontabd lock sync.Mutex wait sync.WaitGroup } func newDaemon(taskChannelLength int, jd *Jiacrontabd) *Daemon { return &Daemon{ taskMap: make(map[uint]*daemonJob), taskChannel: make(chan *daemonJob, taskChannelLength), jd: jd, } } func (d *Daemon) add(t *daemonJob) { if t != nil { if len(t.job.WorkIp) > 0 && !checkIpInWhiteList(strings.Join(t.job.WorkIp, ",")) { if err := models.DB().Model(t.job).Updates(map[string]interface{}{ "status": models.StatusJobStop, //"next_exec_time": time.Time{}, //"last_exit_status": "IP受限制", }).Error; err != nil { log.Error(err) } return } log.Debugf("daemon.add(%s)\n", t.job.Name) t.daemon = d d.taskChannel <- t } } // PopJob 删除调度列表中的任务 func (d *Daemon) PopJob(jobID uint) { d.lock.Lock() t := d.taskMap[jobID] if t != nil { delete(d.taskMap, jobID) d.lock.Unlock() t.cancel() } else { d.lock.Unlock() } } func (d *Daemon) run() { var jobList []models.DaemonJob err := models.DB().Where("status=?", models.StatusJobRunning).Find(&jobList).Error if err != nil { log.Error("init daemon task error:", err) } for _, v := range jobList { job := v d.add(&daemonJob{ job: &job, }) } d.process() } func (d *Daemon) process() { go func() { for v := range d.taskChannel { d.lock.Lock() if t := d.taskMap[v.job.ID]; t == nil { d.taskMap[v.job.ID] = v d.lock.Unlock() v.ctx, v.cancel = context.WithCancel(context.Background()) go v.do(v.ctx) } else { d.lock.Unlock() } } }() } func (d *Daemon) count() int { var count int d.lock.Lock() count = len(d.taskMap) d.lock.Unlock() return count } func (d *Daemon) waitDone() { d.wait.Wait() }