package jiacrontabd import ( "errors" "fmt" "jiacrontab/models" "jiacrontab/pkg/crontab" "jiacrontab/pkg/file" "jiacrontab/pkg/finder" "jiacrontab/pkg/proto" "jiacrontab/pkg/util" "os" "path/filepath" "strings" "time" "github.com/iwannay/log" ) type Srv struct { jd *Jiacrontabd } func newSrv(jd *Jiacrontabd) *Srv { return &Srv{ jd: jd, } } func (s *Srv) Ping(args proto.EmptyArgs, reply *proto.EmptyReply) error { return nil } func (s *Srv) SystemInfo(args proto.EmptyArgs, reply *map[string]interface{}) error { *reply = util.SystemInfo(s.jd.startTime) (*reply)["job日志文件大小"] = file.FileSize(file.DirSize(s.jd.getOpts().LogPath)) return nil } func (s *Srv) CleanLogFiles(args proto.CleanNodeLog, reply *proto.CleanNodeLogRet) error { dir := s.jd.getOpts().LogPath var t time.Time if args.Unit == "month" { t = time.Now().AddDate(0, -args.Offset, 0) } else if args.Unit == "day" { t = time.Now().AddDate(0, 0, -args.Offset) } total, size, err := file.Remove(dir, t) reply.Total = total reply.Size = file.FileSize(size) return err } type CrontabJob struct { jd *Jiacrontabd } func newCrontabJobSrv(jd *Jiacrontabd) *CrontabJob { return &CrontabJob{ jd: jd, } } func (j *CrontabJob) List(args proto.QueryJobArgs, reply *proto.QueryCrontabJobRet) error { model := models.DB().Model(&models.CrontabJob{}) if args.SearchTxt != "" { txt := "%" + args.SearchTxt + "%" model = model.Where("(name like ? or command like ? or code like ?)", txt, txt, txt) } if args.GroupID == models.SuperGroup.ID { } else if args.Root { model = model.Where("group_id=?", args.GroupID) } else { model = model.Where("created_user_id=? and group_id=?", args.UserID, args.GroupID) } err := model.Count(&reply.Total).Error if err != nil { return err } reply.Page = args.Page reply.Pagesize = args.Pagesize return model.Order(fmt.Sprintf("created_user_id=%d desc, id desc", args.UserID)).Offset((args.Page - 1) * args.Pagesize).Limit(args.Pagesize).Find(&reply.List).Error } func (j *CrontabJob) Audit(args proto.AuditJobArgs, reply *[]models.CrontabJob) error { model := models.DB().Model(&models.CrontabJob{}) if args.GroupID == models.SuperGroup.ID { model = model.Where("id in (?)", args.JobIDs) } else { model = model.Where("id in (?) and group_id=?", args.JobIDs, args.GroupID) } defer model.Find(reply) return model.Where("status=?", models.StatusJobUnaudited).Update("status", models.StatusJobOk).Error } func (j *CrontabJob) Edit(args proto.EditCrontabJobArgs, reply *models.CrontabJob) error { var ( model = models.DB() ) if args.Job.MaxConcurrent == 0 { args.Job.MaxConcurrent = 1 } if args.Job.ID == 0 { model = models.DB().Save(&args.Job) } else { // we should kill the job j.jd.killTask(args.Job.ID) j.jd.mux.Lock() delete(j.jd.jobs, args.Job.ID) j.jd.mux.Unlock() if args.GroupID == models.SuperGroup.ID { model = model.Where("id=?", args.Job.ID) } else if args.Root { model = model.Where("id=? and group_id=?", args.Job.ID, args.Job.GroupID) } else { model = model.Where("id=? and created_user_id=? and group_id=?", args.Job.ID, args.Job.CreatedUserID, args.Job.GroupID) } args.Job.NextExecTime = time.Time{} model = model.Omit( "updated_at", "created_at", "deleted_at", "created_user_id", "created_username", "last_cost_time", "last_exec_time", "group_id", "last_exit_status", "process_num", ).Save(&args.Job) } *reply = args.Job return model.Error } func (j *CrontabJob) Get(args proto.GetJobArgs, reply *models.CrontabJob) error { model := models.DB() if args.GroupID == models.SuperGroup.ID { model = model.Where("id=?", args.JobID) } else if args.Root { model = model.Where("id=? and group_id=?", args.JobID, args.GroupID) } else { model = model.Where("id=? and created_user_id=? and group_id=?", args.JobID, args.UserID, args.GroupID) } return model.Find(reply).Error } func (j *CrontabJob) Start(args proto.ActionJobsArgs, jobs *[]models.CrontabJob) error { model := models.DB() if len(args.JobIDs) == 0 { return errors.New("empty ids") } if args.GroupID == models.SuperGroup.ID { model = model.Where("id in (?) and status in (?)", args.JobIDs, []models.JobStatus{models.StatusJobOk, models.StatusJobStop}) } else if args.Root { model = model.Where("id in (?) and status in (?) and group_id=?", args.JobIDs, []models.JobStatus{models.StatusJobOk, models.StatusJobStop}, args.GroupID) } else { model = model.Where("created_user_id = ? and id in (?) and status in (?) and group_id=?", args.UserID, args.JobIDs, []models.JobStatus{models.StatusJobOk, models.StatusJobStop}, args.GroupID) } ret := model.Find(jobs) if ret.Error != nil { return ret.Error } for _, v := range *jobs { err := j.jd.addJob(&crontab.Job{ ID: v.ID, Second: v.TimeArgs.Second, Minute: v.TimeArgs.Minute, Hour: v.TimeArgs.Hour, Day: v.TimeArgs.Day, Month: v.TimeArgs.Month, Weekday: v.TimeArgs.Weekday, }, false) if err != nil { return err } } return nil } func (j *CrontabJob) Stop(args proto.ActionJobsArgs, jobs *[]models.CrontabJob) error { model := models.DB().Model(&models.CrontabJob{}) if args.GroupID == models.SuperGroup.ID { model = model.Where("id in (?) and status in (?)", args.JobIDs, []models.JobStatus{models.StatusJobTiming, models.StatusJobRunning}) } else if args.Root { model = model.Where("id in (?) and status in (?) and group_id=?", args.JobIDs, []models.JobStatus{models.StatusJobTiming, models.StatusJobRunning}, args.GroupID) } else { model = model.Where("created_user_id = ? and id in (?) and status in (?) and group_id=?", args.UserID, args.JobIDs, []models.JobStatus{models.StatusJobTiming, models.StatusJobRunning}, args.GroupID) } for _, jobID := range args.JobIDs { j.jd.killTask(jobID) } return model.Updates(map[string]interface{}{ "status": models.StatusJobStop, "next_exec_time": time.Time{}, }).Find(jobs).Error } func (j *CrontabJob) Delete(args proto.ActionJobsArgs, job *[]models.CrontabJob) error { model := models.DB() if args.GroupID == models.SuperGroup.ID { model = model.Where("id in (?)", args.JobIDs) } else if args.Root { model = model.Where("id in (?) and group_id=?", args.JobIDs, args.GroupID) } else { model = model.Where("created_user_id = ? and id in (?) and group_id=?", args.UserID, args.JobIDs, args.GroupID) } return model.Find(job).Delete(&models.CrontabJob{}).Error } func (j *CrontabJob) Kill(args proto.ActionJobsArgs, job *[]models.CrontabJob) error { model := models.DB() if args.GroupID == models.SuperGroup.ID { model = model.Where("id in (?)", args.JobIDs) } else if args.Root { model = model.Where("id in (?) and group_id=?", args.JobIDs, args.GroupID) } else { model = model.Where("created_user_id = ? and id in (?) and group_id=?", args.UserID, args.JobIDs, args.GroupID) } err := model.Take(job).Error if err != nil { return err } for _, jobID := range args.JobIDs { j.jd.killTask(jobID) } return nil } func (j *CrontabJob) Execs(args proto.ActionJobsArgs, reply *[]models.CrontabJob) error { model := models.DB() if args.GroupID == models.SuperGroup.ID { model = model.Where("id in (?)", args.JobIDs) } else if args.Root { model = model.Where("id in (?) and group_id=?", args.JobIDs, args.GroupID) } else { model = model.Where("created_user_id=? and id in (?) and group_id=?", args.UserID, args.JobIDs, args.GroupID) } var jobs []models.CrontabJob if err := model.Find(&jobs).Error; err != nil { return err } for _, v := range jobs { *reply = append(*reply, v) go func(v models.CrontabJob) { ins := newJobEntry(&crontab.Job{ ID: v.ID, Value: v, }, j.jd) ins.setOnce(true) j.jd.addTmpJob(ins) defer j.jd.removeTmpJob(ins) ins.once = true ins.exec() }(v) } return nil } func (j *CrontabJob) Exec(args proto.GetJobArgs, reply *proto.ExecCrontabJobReply) error { model := models.DB() if args.GroupID == models.SuperGroup.ID { model = model.Where("id=?", args.JobID) } else if args.Root { model = model.Where("id=? and group_id=?", args.JobID, args.GroupID) } else { model = model.Where("created_user_id = ? and id=? and group_id=?", args.UserID, args.JobID, args.GroupID) } err := model.Take(&reply.Job).Error if err == nil { ins := newJobEntry(&crontab.Job{ ID: reply.Job.ID, Value: reply.Job, }, j.jd) ins.setOnce(true) j.jd.addTmpJob(ins) defer j.jd.removeTmpJob(ins) ins.once = true ins.exec() reply.Content = ins.GetLog() } else { reply.Content = []byte(err.Error()) } return err } func (j *CrontabJob) Log(args proto.SearchLog, reply *proto.SearchLogResult) error { fd := finder.NewFinder(func(info os.FileInfo) bool { basename := filepath.Base(info.Name()) arr := strings.Split(basename, ".") if len(arr) != 2 { return false } if arr[1] == "log" && arr[0] == fmt.Sprint(args.JobID) { return true } return false }) if args.Date == "" { args.Date = time.Now().Format("2006/01/02") } if args.IsTail { fd.SetTail(true) } rootpath := filepath.Join(j.jd.getOpts().LogPath, "crontab_task", args.Date) err := fd.Search(rootpath, args.Pattern, &reply.Content, args.Offset, args.Pagesize) reply.Offset = fd.Offset() reply.FileSize = fd.FileSize() return err } // SetDependDone 依赖执行完毕时设置相关状态 func (j *CrontabJob) SetDependDone(args proto.DepJob, reply *bool) error { *reply = j.jd.SetDependDone(&depEntry{ jobID: args.JobID, processID: args.ProcessID, jobUniqueID: args.JobUniqueID, id: args.ID, dest: args.Dest, from: args.From, done: true, logContent: args.LogContent, err: args.Err, }) return nil } // ExecDepend 执行依赖 func (j *CrontabJob) ExecDepend(args proto.DepJob, reply *bool) error { j.jd.dep.add(&depEntry{ jobUniqueID: args.JobUniqueID, processID: args.ProcessID, jobID: args.JobID, id: args.ID, dest: args.Dest, from: args.From, name: args.Name, commands: args.Commands, }) *reply = true log.Infof("job %s %v add to execution queue ", args.Name, args.Commands) return nil } func (j *CrontabJob) Ping(args *proto.EmptyArgs, reply *proto.EmptyReply) error { return nil } type DaemonJob struct { jd *Jiacrontabd } func newDaemonJobSrv(jd *Jiacrontabd) *DaemonJob { return &DaemonJob{ jd: jd, } } func (j *DaemonJob) List(args proto.QueryJobArgs, reply *proto.QueryDaemonJobRet) error { model := models.DB().Model(&models.DaemonJob{}) if args.SearchTxt != "" { txt := "%" + args.SearchTxt + "%" model = model.Where("(name like ? or command like ? or code like ?)", txt, txt, txt) } if args.GroupID == models.SuperGroup.ID { } else if args.Root { model = model.Where("group_id=?", args.GroupID) } else { model = model.Where("created_user_id=? and group_id=?", args.UserID, args.GroupID) } err := model.Count(&reply.Total).Error if err != nil { return err } reply.Page = args.Page reply.Pagesize = args.Pagesize return model.Order(fmt.Sprintf("created_user_id=%d desc, id desc", args.UserID)).Offset((args.Page - 1) * args.Pagesize).Limit(args.Pagesize).Find(&reply.List).Error } func (j *DaemonJob) Edit(args proto.EditDaemonJobArgs, job *models.DaemonJob) error { model := models.DB() if args.Job.ID == 0 { model = models.DB().Create(&args.Job) } else { j.jd.daemon.lock.Lock() delete(j.jd.daemon.taskMap, args.Job.ID) j.jd.daemon.lock.Unlock() if args.GroupID == models.SuperGroup.ID { model = model.Where("id=?", args.Job.ID) } else if args.Root { model = model.Where("id=? and group_id=?", args.Job.ID, args.GroupID) } else { model = model.Where("id=? and created_user_id=? and group_id=?", args.Job.ID, args.Job.CreatedUserID, args.GroupID) } model = model.Omit( "updated_at", "created_at", "deleted_at", "group_id", "created_user_id", "created_username", "start_at").Save(&args.Job) } *job = args.Job return model.Error } func (j *DaemonJob) Start(args proto.ActionJobsArgs, jobs *[]models.DaemonJob) error { model := models.DB() if args.GroupID == models.SuperGroup.ID { model = model.Where("id in (?) and status in (?)", args.JobIDs, []models.JobStatus{models.StatusJobOk, models.StatusJobStop}) } else if args.Root { model = model.Where("id in (?) and status in (?) and group_id=?", args.JobIDs, []models.JobStatus{models.StatusJobOk, models.StatusJobStop}, args.GroupID) } else { model = model.Where("created_user_id = ? and id in (?) and status in (?) and group_id=?", args.UserID, args.JobIDs, []models.JobStatus{models.StatusJobOk, models.StatusJobStop}, args.GroupID) } ret := model.Find(&jobs) if ret.Error != nil { return ret.Error } for _, v := range *jobs { job := v j.jd.daemon.add(&daemonJob{ job: &job, }) } return nil } func (j *DaemonJob) Stop(args proto.ActionJobsArgs, jobs *[]models.DaemonJob) error { model := models.DB() if args.GroupID == models.SuperGroup.ID { model = model.Where("id in (?) and status in (?)", args.JobIDs, []models.JobStatus{models.StatusJobRunning, models.StatusJobTiming}) } else if args.Root { model = model.Where("id in (?) and status in (?) and group_id=?", args.JobIDs, []models.JobStatus{models.StatusJobRunning, models.StatusJobTiming}, args.GroupID) } else { model = model.Where("created_user_id = ? and id in (?) and status in (?) and group_id=?", args.UserID, args.JobIDs, []models.JobStatus{models.StatusJobRunning, models.StatusJobTiming}, args.GroupID) } if err := model.Find(jobs).Error; err != nil { return err } args.JobIDs = nil for _, job := range *jobs { args.JobIDs = append(args.JobIDs, job.ID) j.jd.daemon.PopJob(job.ID) } return model.Model(&models.DaemonJob{}).Update("status", models.StatusJobStop).Error } func (j *DaemonJob) Delete(args proto.ActionJobsArgs, jobs *[]models.DaemonJob) error { model := models.DB() if args.GroupID == models.SuperGroup.ID { model = model.Where("id in (?)", args.JobIDs) } else if args.Root { model = model.Where("id in (?) and group_id=?", args.JobIDs, args.GroupID) } else { model = model.Where("created_user_id = ? and id in(?) and group_id=?", args.UserID, args.JobIDs, args.GroupID) } if err := model.Find(jobs).Error; err != nil { return err } for _, job := range *jobs { j.jd.daemon.PopJob(job.ID) } return model.Delete(&models.DaemonJob{}).Error } func (j *DaemonJob) Get(args proto.GetJobArgs, job *models.DaemonJob) error { model := models.DB() if args.GroupID == models.SuperGroup.ID { model = model.Where("id=?", args.JobID) } else if args.Root { model = model.Where("id=? and group_id=?", args.JobID, args.GroupID) } else { model = model.Where("id=? and group_id=? and created_user_id=?", args.JobID, args.GroupID, args.UserID) } return model.Take(job).Error } func (j *DaemonJob) Log(args proto.SearchLog, reply *proto.SearchLogResult) error { fd := finder.NewFinder(func(info os.FileInfo) bool { basename := filepath.Base(info.Name()) arr := strings.Split(basename, ".") if len(arr) != 2 { return false } if arr[1] == "log" && arr[0] == fmt.Sprint(args.JobID) { return true } return false }) if args.Date == "" { args.Date = time.Now().Format("2006/01/02") } if args.IsTail { fd.SetTail(true) } rootpath := filepath.Join(j.jd.getOpts().LogPath, "daemon_job", args.Date) err := fd.Search(rootpath, args.Pattern, &reply.Content, args.Offset, args.Pagesize) reply.Offset = fd.Offset() reply.FileSize = fd.FileSize() return err } func (j *DaemonJob) Audit(args proto.AuditJobArgs, jobs *[]models.DaemonJob) error { model := models.DB().Model(&models.DaemonJob{}) if args.GroupID == models.SuperGroup.ID { model = model.Where("id in (?)", args.JobIDs) } else if args.Root { model = model.Where("id in (?) and group_id=?", args.JobIDs, args.GroupID) } else { model = model.Where("id in (?) and group_id=? and created_user_id=?", args.JobIDs, args.GroupID, args.UserID) } return model.Where("status=?", models.StatusJobUnaudited).Find(jobs).Update("status", models.StatusJobOk).Error }