332 lines
6.5 KiB
Go
332 lines
6.5 KiB
Go
package jiacrontabd
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"jiacrontab/pkg/kproc"
|
|
"jiacrontab/pkg/proto"
|
|
"jiacrontab/pkg/util"
|
|
"os"
|
|
"path/filepath"
|
|
"runtime/debug"
|
|
"time"
|
|
|
|
"github.com/iwannay/log"
|
|
)
|
|
|
|
type cmdUint struct {
|
|
ctx context.Context
|
|
id uint
|
|
args [][]string
|
|
logDir string
|
|
content []byte
|
|
logFile *os.File
|
|
label string
|
|
user string
|
|
logPath string
|
|
verboseLog bool
|
|
exportLog bool
|
|
ignoreFileLog bool
|
|
env []string
|
|
ip []string
|
|
killChildProcess bool
|
|
dir string
|
|
startTime time.Time
|
|
costTime time.Duration
|
|
jd *Jiacrontabd
|
|
}
|
|
|
|
func (cu *cmdUint) release() {
|
|
if cu.logFile != nil {
|
|
cu.logFile.Close()
|
|
}
|
|
cu.costTime = time.Now().Sub(cu.startTime)
|
|
}
|
|
|
|
func (cu *cmdUint) launch() error {
|
|
//todo: 需要添加 ip 校验
|
|
defer func() {
|
|
if err := recover(); err != nil {
|
|
log.Errorf("wrapExecScript error:%v\n%s", err, debug.Stack())
|
|
}
|
|
cu.release()
|
|
}()
|
|
cfg := cu.jd.getOpts()
|
|
cu.startTime = time.Now()
|
|
|
|
var err error
|
|
|
|
if err = cu.setLogFile(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(cu.args) > 1 {
|
|
err = cu.pipeExec()
|
|
} else {
|
|
err = cu.exec()
|
|
}
|
|
|
|
if err != nil {
|
|
var errMsg string
|
|
var prefix string
|
|
if cu.verboseLog {
|
|
prefix = fmt.Sprintf("[%s %s %s] ", time.Now().Format(proto.DefaultTimeLayout), cfg.BoardcastAddr, cu.label)
|
|
errMsg = prefix + err.Error() + "\n"
|
|
} else {
|
|
prefix = fmt.Sprintf("[%s %s] ", cfg.BoardcastAddr, cu.label)
|
|
errMsg = prefix + err.Error() + "\n"
|
|
}
|
|
|
|
cu.writeLog([]byte(errMsg))
|
|
if cu.exportLog {
|
|
cu.content = append(cu.content, []byte(errMsg)...)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (cu *cmdUint) setLogFile() error {
|
|
var err error
|
|
|
|
if cu.ignoreFileLog {
|
|
return nil
|
|
}
|
|
if cu.logPath == "" {
|
|
cu.logPath = filepath.Join(cu.logDir, time.Now().Format("2006/01/02"), fmt.Sprintf("%d.log", cu.id))
|
|
}
|
|
|
|
cu.logFile, err = util.TryOpen(cu.logPath, os.O_APPEND|os.O_CREATE|os.O_RDWR)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (cu *cmdUint) writeLog(b []byte) {
|
|
if cu.ignoreFileLog {
|
|
return
|
|
}
|
|
var err error
|
|
logPath := filepath.Join(cu.logDir, time.Now().Format("2006/01/02"), fmt.Sprintf("%d.log", cu.id))
|
|
if logPath != cu.logPath {
|
|
cu.logFile.Close()
|
|
cu.logFile, err = util.TryOpen(logPath, os.O_APPEND|os.O_CREATE|os.O_RDWR)
|
|
if err != nil {
|
|
log.Errorf("writeLog failed - %v", err)
|
|
return
|
|
}
|
|
cu.logPath = logPath
|
|
}
|
|
|
|
cu.logFile.Write(b)
|
|
}
|
|
|
|
func (cu *cmdUint) exec() error {
|
|
log.Debug("cmd exec args:", cu.args)
|
|
if len(cu.args) == 0 {
|
|
return errors.New("invalid args")
|
|
}
|
|
cu.args[0] = util.FilterEmptyEle(cu.args[0])
|
|
cmdName := cu.args[0][0]
|
|
args := cu.args[0][1:]
|
|
cmd := kproc.CommandContext(cu.ctx, cmdName, args...)
|
|
cfg := cu.jd.getOpts()
|
|
|
|
cmd.SetDir(cu.dir)
|
|
cmd.SetEnv(cu.env)
|
|
cmd.SetUser(cu.user)
|
|
cmd.SetExitKillChildProcess(cu.killChildProcess)
|
|
|
|
stdout, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer stdout.Close()
|
|
|
|
stderr, err := cmd.StderrPipe()
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
defer stderr.Close()
|
|
|
|
if err := cmd.Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
reader := bufio.NewReader(stdout)
|
|
readerErr := bufio.NewReader(stderr)
|
|
// 如果已经存在日志则直接写入
|
|
cu.writeLog(cu.content)
|
|
go func() {
|
|
var (
|
|
line []byte
|
|
)
|
|
|
|
for {
|
|
|
|
line, _ = reader.ReadBytes('\n')
|
|
if len(line) == 0 {
|
|
break
|
|
}
|
|
|
|
if cfg.VerboseJobLog {
|
|
prefix := fmt.Sprintf("[%s %s %s] ", time.Now().Format(proto.DefaultTimeLayout), cfg.BoardcastAddr, cu.label)
|
|
line = append([]byte(prefix), line...)
|
|
}
|
|
|
|
if cu.exportLog {
|
|
cu.content = append(cu.content, line...)
|
|
}
|
|
cu.writeLog(line)
|
|
}
|
|
|
|
for {
|
|
line, _ = readerErr.ReadBytes('\n')
|
|
if len(line) == 0 {
|
|
break
|
|
}
|
|
// 默认给err信息加上日期标志
|
|
if cfg.VerboseJobLog {
|
|
prefix := fmt.Sprintf("[%s %s %s] ", time.Now().Format(proto.DefaultTimeLayout), cfg.BoardcastAddr, cu.label)
|
|
line = append([]byte(prefix), line...)
|
|
}
|
|
if cu.exportLog {
|
|
cu.content = append(cu.content, line...)
|
|
}
|
|
cu.writeLog(line)
|
|
}
|
|
}()
|
|
|
|
if err = cmd.Wait(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (cu *cmdUint) pipeExec() error {
|
|
var (
|
|
outBufer bytes.Buffer
|
|
errBufer bytes.Buffer
|
|
cmdEntryList []*pipeCmd
|
|
err, exitError error
|
|
line []byte
|
|
cfg = cu.jd.getOpts()
|
|
)
|
|
|
|
for _, v := range cu.args {
|
|
v = util.FilterEmptyEle(v)
|
|
cmdName := v[0]
|
|
args := v[1:]
|
|
|
|
cmd := kproc.CommandContext(cu.ctx, cmdName, args...)
|
|
|
|
cmd.SetDir(cu.dir)
|
|
cmd.SetEnv(cu.env)
|
|
cmd.SetUser(cu.user)
|
|
cmd.SetExitKillChildProcess(cu.killChildProcess)
|
|
|
|
cmdEntryList = append(cmdEntryList, &pipeCmd{cmd})
|
|
}
|
|
|
|
exitError = execute(&outBufer, &errBufer,
|
|
cmdEntryList...,
|
|
)
|
|
|
|
// 如果已经存在日志则直接写入
|
|
cu.writeLog(cu.content)
|
|
|
|
for {
|
|
|
|
line, err = outBufer.ReadBytes('\n')
|
|
if err != nil || err == io.EOF {
|
|
break
|
|
}
|
|
if cfg.VerboseJobLog {
|
|
prefix := fmt.Sprintf("[%s %s %s] ", time.Now().Format(proto.DefaultTimeLayout), cfg.BoardcastAddr, cu.label)
|
|
line = append([]byte(prefix), line...)
|
|
}
|
|
|
|
cu.content = append(cu.content, line...)
|
|
cu.writeLog(line)
|
|
}
|
|
|
|
for {
|
|
line, err = errBufer.ReadBytes('\n')
|
|
if err != nil || err == io.EOF {
|
|
break
|
|
}
|
|
|
|
if cfg.VerboseJobLog {
|
|
prefix := fmt.Sprintf("[%s %s %s] ", time.Now().Format(proto.DefaultTimeLayout), cfg.BoardcastAddr, cu.label)
|
|
line = append([]byte(prefix), line...)
|
|
}
|
|
|
|
if cu.exportLog {
|
|
cu.content = append(cu.content, line...)
|
|
}
|
|
cu.writeLog(line)
|
|
}
|
|
return exitError
|
|
}
|
|
|
|
func call(stack []*pipeCmd, pipes []*io.PipeWriter) (err error) {
|
|
if stack[0].Process == nil {
|
|
if err = stack[0].Start(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if len(stack) > 1 {
|
|
if err = stack[1].Start(); err != nil {
|
|
return err
|
|
}
|
|
|
|
defer func() {
|
|
pipes[0].Close()
|
|
if err == nil {
|
|
err = call(stack[1:], pipes[1:])
|
|
}
|
|
if err != nil {
|
|
// fixed zombie process
|
|
stack[1].Wait()
|
|
}
|
|
}()
|
|
}
|
|
return stack[0].Wait()
|
|
}
|
|
|
|
type pipeCmd struct {
|
|
*kproc.KCmd
|
|
}
|
|
|
|
func execute(outputBuffer *bytes.Buffer, errorBuffer *bytes.Buffer, stack ...*pipeCmd) (err error) {
|
|
pipeStack := make([]*io.PipeWriter, len(stack)-1)
|
|
i := 0
|
|
for ; i < len(stack)-1; i++ {
|
|
stdinPipe, stdoutPipe := io.Pipe()
|
|
stack[i].Stdout = stdoutPipe
|
|
stack[i].Stderr = errorBuffer
|
|
stack[i+1].Stdin = stdinPipe
|
|
pipeStack[i] = stdoutPipe
|
|
}
|
|
|
|
stack[i].Stdout = outputBuffer
|
|
stack[i].Stderr = errorBuffer
|
|
|
|
if err = call(stack, pipeStack); err != nil {
|
|
errorBuffer.WriteString(err.Error())
|
|
}
|
|
return err
|
|
}
|