diff --git a/engine/client.go b/engine/client.go index 0516917..c64a9fa 100644 --- a/engine/client.go +++ b/engine/client.go @@ -20,7 +20,8 @@ func NewEngine(db *db.Client, language string) *Engine { return &Engine{ db: db, cron: cron.New(), - tasks: make(map[int]*Task, 0), + tasks: utils.Map[int, *Task]{}, + schedulers: utils.Map[string, scheduler]{}, language: language, } } @@ -32,7 +33,7 @@ type scheduler struct { type Engine struct { db *db.Client cron *cron.Cron - tasks map[int]*Task + tasks utils.Map[int, *Task] language string schedulers utils.Map[string, scheduler] } @@ -59,7 +60,7 @@ func (c *Engine) reloadUsingBuildinDownloader(h *ent.History) error{ if err != nil { return errors.Wrap(err, "download torrent") } - c.tasks[h.ID] = &Task{Torrent: t} + c.tasks.Store(h.ID, &Task{Torrent: t}) return nil } @@ -93,7 +94,7 @@ func (c *Engine) reloadTasks() { log.Warnf("get task error: %v", err) continue } - c.tasks[t.ID] = &Task{Torrent: to} + c.tasks.Store(t.ID, &Task{Torrent: to}) } else if t.Link != "" { to, err := transmission.NewTorrent(transmission.Config{ URL: dl.URL, @@ -104,7 +105,7 @@ func (c *Engine) reloadTasks() { log.Warnf("get task error: %v", err) continue } - c.tasks[t.ID] = &Task{Torrent: to} + c.tasks.Store(t.ID, &Task{Torrent: to}) } } else if dl.Implementation == downloadclients.ImplementationQbittorrent { if t.Hash != "" { @@ -117,7 +118,7 @@ func (c *Engine) reloadTasks() { log.Warnf("get task error: %v", err) continue } - c.tasks[t.ID] = &Task{Torrent: to} + c.tasks.Store(t.ID, &Task{Torrent: to}) } else if t.Link != "" { to, err := qbittorrent.NewTorrent(qbittorrent.Info{ @@ -129,8 +130,7 @@ func (c *Engine) reloadTasks() { log.Warnf("get task error: %v", err) continue } - c.tasks[t.ID] = &Task{Torrent: to} - + c.tasks.Store(t.ID, &Task{Torrent: to}) } } @@ -200,16 +200,16 @@ func (c *Engine) MustTMDB() *tmdb.Client { } func (c *Engine) RemoveTaskAndTorrent(id int) error { - torrent := c.tasks[id] - if torrent != nil { + torrent, ok := c.tasks.Load(id) + if ok { if err := torrent.Remove(); err != nil { return errors.Wrap(err, "remove torrent") } - delete(c.tasks, id) + c.tasks.Delete(id) } return nil } -func (c *Engine) GetTasks() map[int]*Task { +func (c *Engine) GetTasks() utils.Map[int, *Task] { return c.tasks } diff --git a/engine/integration.go b/engine/integration.go index 0990d21..b499504 100644 --- a/engine/integration.go +++ b/engine/integration.go @@ -253,7 +253,7 @@ func (c *Engine) findEpisodeFilesPreMoving(historyId int) error { episodeIds := c.GetEpisodeIds(his) - task := c.tasks[historyId] + task, _ := c.tasks.Load(historyId) ff, err := c.db.GetAcceptedVideoFormats() if err != nil { diff --git a/engine/resources.go b/engine/resources.go index 679f06f..7adbec9 100644 --- a/engine/resources.go +++ b/engine/resources.go @@ -199,7 +199,7 @@ func (c *Engine) downloadTorrent(m *ent.Media, r1 torznab.Result, seasonNum int, } torrent.Start() - c.tasks[history.ID] = &Task{Torrent: torrent} + c.tasks.Store(history.ID, &Task{Torrent: torrent}) c.sendMsg(fmt.Sprintf(message.BeginDownload, name)) diff --git a/engine/scheduler.go b/engine/scheduler.go index ae8de2d..755e1d8 100644 --- a/engine/scheduler.go +++ b/engine/scheduler.go @@ -65,46 +65,50 @@ func (c *Engine) TriggerCronJob(name string) error { func (c *Engine) checkTasks() error { log.Debug("begin check tasks...") - for id, t := range c.tasks { + c.tasks.Range(func(id int, t *Task) bool { r := c.db.GetHistory(id) if !t.Exists() { log.Infof("task no longer exists: %v", id) - delete(c.tasks, id) - continue + c.tasks.Delete(id) + return true } name, err := t.Name() if err != nil { - return errors.Wrap(err, "get name") + log.Warnf("get task name error: %v", err) + return true } progress, err := t.Progress() if err != nil { - return errors.Wrap(err, "get progress") + log.Warnf("get task progress error: %v", err) + return true } log.Infof("task (%s) percentage done: %d%%", name, progress) if progress == 100 { if r.Status == history.StatusSeeding { //task already success, check seed ratio - torrent := c.tasks[id] + torrent, _ := c.tasks.Load(id) ratio, ok := c.isSeedRatioLimitReached(r.IndexerID, torrent) if ok { log.Infof("torrent file seed ratio reached, remove: %v, current seed ratio: %v", name, ratio) torrent.Remove() - delete(c.tasks, id) + c.tasks.Delete(id) c.setHistoryStatus(id, history.StatusSuccess) } else { log.Infof("torrent file still sedding: %v, current seed ratio: %v", name, ratio) } - continue + return true } else if r.Status == history.StatusRunning { log.Infof("task is done: %v", name) c.sendMsg(fmt.Sprintf(message.DownloadComplete, name)) go c.postTaskProcessing(id) } } - } + + return true + }) return nil } @@ -232,7 +236,7 @@ func (c *Engine) GetEpisodeIds(r *ent.History) []int { } func (c *Engine) moveCompletedTask(id int) (err1 error) { - torrent := c.tasks[id] + torrent, _ := c.tasks.Load(id) r := c.db.GetHistory(id) // if r.Status == history.StatusUploading { // log.Infof("task %d is already uploading, skip", id) @@ -258,7 +262,7 @@ func (c *Engine) moveCompletedTask(id int) (err1 error) { c.sendMsg(fmt.Sprintf(message.ProcessingFailed, err1)) if downloadclient.RemoveFailedDownloads { log.Debugf("task failed, remove failed torrent and files related") - delete(c.tasks, r.ID) + c.tasks.Delete(r.ID) torrent.Remove() } } @@ -289,7 +293,7 @@ func (c *Engine) moveCompletedTask(id int) (err1 error) { if downloadclient.RemoveCompletedDownloads && ok { log.Debugf("download complete,remove torrent and files related, torrent: %v, seed ratio: %v", torrentName, r1) c.setHistoryStatus(r.ID, history.StatusSuccess) - delete(c.tasks, r.ID) + c.tasks.Delete(r.ID) torrent.Remove() } else { log.Infof("task complete but still needs seeding: %v", torrentName) diff --git a/server/activity.go b/server/activity.go index 0601e85..b5c310b 100644 --- a/server/activity.go +++ b/server/activity.go @@ -2,6 +2,7 @@ package server import ( "fmt" + "polaris/engine" "polaris/ent" "polaris/ent/blacklist" "polaris/ent/episode" @@ -31,7 +32,8 @@ func (s *Server) GetAllActivities(c *gin.Context) (interface{}, error) { a := Activity{ History: h, } - for id, task := range s.core.GetTasks() { + tasks := s.core.GetTasks() + tasks.Range(func(id int, task *engine.Task) bool { if h.ID == id && task.Exists() { p, err := task.Progress() if err != nil { @@ -49,7 +51,9 @@ func (s *Server) GetAllActivities(c *gin.Context) (interface{}, error) { a.UploadProgress = task.UploadProgresser() } } - } + return true + }) + activities = append(activities, a) } } else {