From 7b02eeac5154dcc803ebe9a8b6effc95435535a8 Mon Sep 17 00:00:00 2001 From: Simon Ding Date: Tue, 5 Nov 2024 18:54:40 +0800 Subject: [PATCH] WIP: upload with progress --- pkg/uploader/uploader.go | 86 ++++++++++++++++++++++++++++++++++++++++ pkg/utils/utils.go | 20 ++++++++-- 2 files changed, 103 insertions(+), 3 deletions(-) create mode 100644 pkg/uploader/uploader.go diff --git a/pkg/uploader/uploader.go b/pkg/uploader/uploader.go new file mode 100644 index 0000000..4c5c34a --- /dev/null +++ b/pkg/uploader/uploader.go @@ -0,0 +1,86 @@ +package uploader + +import ( + "fmt" + "io" + "os" + "polaris/pkg/utils" + "sync/atomic" + "time" +) + +type StreamWriter interface { + WriteStream(path string, stream io.Reader, _ os.FileMode) error +} + +type Uploader struct { + sw StreamWriter + progress atomic.Int64 + dir string + size int64 +} + +func NewUploader(dir string, sw StreamWriter) (*Uploader, error) { + size, err := utils.DirSize(dir) + if err != nil { + return nil, err + } + return &Uploader{sw: sw, dir: dir, size: size, progress: atomic.Int64{}}, nil +} + +func (u *Uploader) Upload() error { + + return nil +} + +type ProgressReader struct { + Reader io.Reader + Progress atomic.Int64 + Size int64 + Name string + Once bool + Done atomic.Bool +} + +func (progressReader *ProgressReader) NewLoop() { + ticker := time.NewTicker(time.Second) + var op int64 + for range ticker.C { + p := progressReader.Progress.Load() + KB := (p - op) / 1024 + var percent int64 + if progressReader.Size != 0 { + percent = p * 100 / progressReader.Size + } else { + percent = 100 + } + if KB < 1024 { + fmt.Printf("%s: %dKB/s %d%%\n", progressReader.Name, KB, percent) + } else { + fmt.Printf("%s: %.2fMB/s %d%%\n", progressReader.Name, float64(KB)/1024, percent) + } + + if progressReader.Done.Load() { + ticker.Stop() + return + } + } +} + +func (progressReader *ProgressReader) Read(p []byte) (int, error) { + n, err := progressReader.Reader.Read(p) + progressReader.Progress.Add(int64(n)) + if !progressReader.Once { + progressReader.Once = true + go progressReader.NewLoop() + } + if err != nil { + progressReader.Done.Store(true) + } + return n, err +} + +func (progressReader *ProgressReader) Close() error { + progressReader.Done.Store(true) + return nil +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index a06e1da..242b792 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -5,6 +5,7 @@ import ( "net/http" "os" "os/exec" + "path/filepath" "regexp" "runtime" "strconv" @@ -230,7 +231,7 @@ func Link2Magnet(link string) (string, error) { return http.ErrUseLastResponse //do not follow redirects }, } - + resp, err := client.Get(link) if err != nil { return "", errors.Wrap(err, "get link") @@ -252,7 +253,6 @@ func Link2Magnet(link string) (string, error) { return mg.String(), nil } - func MagnetHash(link string) (string, error) { if mi, err := metainfo.ParseMagnetV2Uri(link); err != nil { return "", errors.Errorf("magnet link is not valid: %v", err) @@ -271,4 +271,18 @@ func MagnetHash(link string) (string, error) { } return hash, nil } -} \ No newline at end of file +} + +func DirSize(path string) (int64, error) { + var size int64 + err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + size += info.Size() + } + return err + }) + return size, err +}