Files
jiacrontab/jiacrontabd/cmd.go
jiazhizhong 1279635d7f fix
2022-03-10 17:09:03 +08:00

332 lines
6.5 KiB
Go

package jiacrontabd
import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"jiacrontab/pkg/kproc"
"jiacrontab/pkg/proto"
"jiacrontab/pkg/util"
"os"
"path/filepath"
"runtime/debug"
"time"
"github.com/iwannay/log"
)
type cmdUint struct {
ctx context.Context
id uint
args [][]string
logDir string
content []byte
logFile *os.File
label string
user string
logPath string
verboseLog bool
exportLog bool
ignoreFileLog bool
env []string
ip []string
killChildProcess bool
dir string
startTime time.Time
costTime time.Duration
jd *Jiacrontabd
}
func (cu *cmdUint) release() {
if cu.logFile != nil {
cu.logFile.Close()
}
cu.costTime = time.Now().Sub(cu.startTime)
}
func (cu *cmdUint) launch() error {
//todo: 需要添加 ip 校验
defer func() {
if err := recover(); err != nil {
log.Errorf("wrapExecScript error:%v\n%s", err, debug.Stack())
}
cu.release()
}()
cfg := cu.jd.getOpts()
cu.startTime = time.Now()
var err error
if err = cu.setLogFile(); err != nil {
return err
}
if len(cu.args) > 1 {
err = cu.pipeExec()
} else {
err = cu.exec()
}
if err != nil {
var errMsg string
var prefix string
if cu.verboseLog {
prefix = fmt.Sprintf("[%s %s %s] ", time.Now().Format(proto.DefaultTimeLayout), cfg.BoardcastAddr, cu.label)
errMsg = prefix + err.Error() + "\n"
} else {
prefix = fmt.Sprintf("[%s %s] ", cfg.BoardcastAddr, cu.label)
errMsg = prefix + err.Error() + "\n"
}
cu.writeLog([]byte(errMsg))
if cu.exportLog {
cu.content = append(cu.content, []byte(errMsg)...)
}
return err
}
return nil
}
func (cu *cmdUint) setLogFile() error {
var err error
if cu.ignoreFileLog {
return nil
}
if cu.logPath == "" {
cu.logPath = filepath.Join(cu.logDir, time.Now().Format("2006/01/02"), fmt.Sprintf("%d.log", cu.id))
}
cu.logFile, err = util.TryOpen(cu.logPath, os.O_APPEND|os.O_CREATE|os.O_RDWR)
if err != nil {
return err
}
return nil
}
func (cu *cmdUint) writeLog(b []byte) {
if cu.ignoreFileLog {
return
}
var err error
logPath := filepath.Join(cu.logDir, time.Now().Format("2006/01/02"), fmt.Sprintf("%d.log", cu.id))
if logPath != cu.logPath {
cu.logFile.Close()
cu.logFile, err = util.TryOpen(logPath, os.O_APPEND|os.O_CREATE|os.O_RDWR)
if err != nil {
log.Errorf("writeLog failed - %v", err)
return
}
cu.logPath = logPath
}
cu.logFile.Write(b)
}
func (cu *cmdUint) exec() error {
log.Debug("cmd exec args:", cu.args)
if len(cu.args) == 0 {
return errors.New("invalid args")
}
cu.args[0] = util.FilterEmptyEle(cu.args[0])
cmdName := cu.args[0][0]
args := cu.args[0][1:]
cmd := kproc.CommandContext(cu.ctx, cmdName, args...)
cfg := cu.jd.getOpts()
cmd.SetDir(cu.dir)
cmd.SetEnv(cu.env)
cmd.SetUser(cu.user)
cmd.SetExitKillChildProcess(cu.killChildProcess)
stdout, err := cmd.StdoutPipe()
if err != nil {
return err
}
defer stdout.Close()
stderr, err := cmd.StderrPipe()
if err != nil {
return err
}
defer stderr.Close()
if err := cmd.Start(); err != nil {
return err
}
reader := bufio.NewReader(stdout)
readerErr := bufio.NewReader(stderr)
// 如果已经存在日志则直接写入
cu.writeLog(cu.content)
go func() {
var (
line []byte
)
for {
line, _ = reader.ReadBytes('\n')
if len(line) == 0 {
break
}
if cfg.VerboseJobLog {
prefix := fmt.Sprintf("[%s %s %s] ", time.Now().Format(proto.DefaultTimeLayout), cfg.BoardcastAddr, cu.label)
line = append([]byte(prefix), line...)
}
if cu.exportLog {
cu.content = append(cu.content, line...)
}
cu.writeLog(line)
}
for {
line, _ = readerErr.ReadBytes('\n')
if len(line) == 0 {
break
}
// 默认给err信息加上日期标志
if cfg.VerboseJobLog {
prefix := fmt.Sprintf("[%s %s %s] ", time.Now().Format(proto.DefaultTimeLayout), cfg.BoardcastAddr, cu.label)
line = append([]byte(prefix), line...)
}
if cu.exportLog {
cu.content = append(cu.content, line...)
}
cu.writeLog(line)
}
}()
if err = cmd.Wait(); err != nil {
return err
}
return nil
}
func (cu *cmdUint) pipeExec() error {
var (
outBufer bytes.Buffer
errBufer bytes.Buffer
cmdEntryList []*pipeCmd
err, exitError error
line []byte
cfg = cu.jd.getOpts()
)
for _, v := range cu.args {
v = util.FilterEmptyEle(v)
cmdName := v[0]
args := v[1:]
cmd := kproc.CommandContext(cu.ctx, cmdName, args...)
cmd.SetDir(cu.dir)
cmd.SetEnv(cu.env)
cmd.SetUser(cu.user)
cmd.SetExitKillChildProcess(cu.killChildProcess)
cmdEntryList = append(cmdEntryList, &pipeCmd{cmd})
}
exitError = execute(&outBufer, &errBufer,
cmdEntryList...,
)
// 如果已经存在日志则直接写入
cu.writeLog(cu.content)
for {
line, err = outBufer.ReadBytes('\n')
if err != nil || err == io.EOF {
break
}
if cfg.VerboseJobLog {
prefix := fmt.Sprintf("[%s %s %s] ", time.Now().Format(proto.DefaultTimeLayout), cfg.BoardcastAddr, cu.label)
line = append([]byte(prefix), line...)
}
cu.content = append(cu.content, line...)
cu.writeLog(line)
}
for {
line, err = errBufer.ReadBytes('\n')
if err != nil || err == io.EOF {
break
}
if cfg.VerboseJobLog {
prefix := fmt.Sprintf("[%s %s %s] ", time.Now().Format(proto.DefaultTimeLayout), cfg.BoardcastAddr, cu.label)
line = append([]byte(prefix), line...)
}
if cu.exportLog {
cu.content = append(cu.content, line...)
}
cu.writeLog(line)
}
return exitError
}
func call(stack []*pipeCmd, pipes []*io.PipeWriter) (err error) {
if stack[0].Process == nil {
if err = stack[0].Start(); err != nil {
return err
}
}
if len(stack) > 1 {
if err = stack[1].Start(); err != nil {
return err
}
defer func() {
pipes[0].Close()
if err == nil {
err = call(stack[1:], pipes[1:])
}
if err != nil {
// fixed zombie process
stack[1].Wait()
}
}()
}
return stack[0].Wait()
}
type pipeCmd struct {
*kproc.KCmd
}
func execute(outputBuffer *bytes.Buffer, errorBuffer *bytes.Buffer, stack ...*pipeCmd) (err error) {
pipeStack := make([]*io.PipeWriter, len(stack)-1)
i := 0
for ; i < len(stack)-1; i++ {
stdinPipe, stdoutPipe := io.Pipe()
stack[i].Stdout = stdoutPipe
stack[i].Stderr = errorBuffer
stack[i+1].Stdin = stdinPipe
pipeStack[i] = stdoutPipe
}
stack[i].Stdout = outputBuffer
stack[i].Stderr = errorBuffer
if err = call(stack, pipeStack); err != nil {
errorBuffer.WriteString(err.Error())
}
return err
}