forgepb/rill.go

263 lines
6.3 KiB
Go

package forgepb
import (
"sync"
"time"
"github.com/destel/rill"
"go.wit.com/lib/protobuf/gitpb"
"go.wit.com/log"
)
// rill is awesome. long live rill

// rillUpdate runs updateRepo on every repo known to the forge, processing
// the repos concurrently with rill.
//
// pool1 is the concurrency passed to the pass-through rill.Map stage and
// pool2 is the concurrency passed to the rill.ForEach stage that calls
// updateRepo.
//
// It returns the number of repos the ForEach stage started working on,
// together with the error returned by rill.ForEach.
func (f *Forge) rillUpdate(pool1 int, pool2 int) (int, error) {
	// Collect the repos into a slice up front; updateRepo may delete
	// entries from f.Repos while the workers are running.
	var repos []*gitpb.Repo
	all := f.Repos.SortByFullPath()
	for all.Scan() {
		repos = append(repos, all.Next())
	}

	// Turn the slice into a rill stream.
	stream := rill.FromSlice(repos, nil)

	// Pass-through stage: every repo is forwarded unchanged.
	passed := rill.Map(stream, pool1, func(repo *gitpb.Repo) (*gitpb.Repo, error) {
		return repo, nil
	})

	// The ForEach callback runs on up to pool2 goroutines at once, so the
	// shared counter must not be incremented without synchronization.
	var (
		mu      sync.Mutex
		counter int
	)
	err := rill.ForEach(passed, pool2, func(repo *gitpb.Repo) error {
		mu.Lock()
		counter++
		mu.Unlock()
		// log.Info("rill.ForEach() gopath=", repo.GetGoPath())
		return f.updateRepo(repo)
	})

	// Read the counter under the lock as well, in case a worker is still
	// finishing when ForEach returns early because of an error.
	mu.Lock()
	total := counter
	mu.Unlock()
	return total, err
}
// updateRepo refreshes the forge's view of a single repo.
//
//   - If repo.IsValidDir() reports false, the repo is removed from f.Repos.
//   - If repo.DidRepoChange() reports true, the repo is reloaded; a Reload
//     error is returned to the caller.
//   - If the forge config says the repo's go path is read-only but the repo
//     is not flagged ReadOnly, the flag is corrected.
//
// f.configSave is set to true whenever any of the above changes something.
//
// NOTE(review): rillUpdate calls this from concurrent rill.ForEach workers;
// the writes to f.configSave below are not synchronized here — confirm
// whether that needs a lock.
func (f *Forge) updateRepo(repo *gitpb.Repo) error {
	if valid := repo.IsValidDir(); !valid {
		log.Printf("%10s %-50s gopath=%s\n", "git dir is missing:", repo.FullPath, repo.GetGoPath())
		f.Repos.DeleteByFullPath(repo.FullPath)
		f.configSave = true
		return nil
	}

	if changed := repo.DidRepoChange(); changed {
		f.configSave = true
		// log.Info("repo changed ", repo.FullPath, repo.StateChange)
		err := repo.Reload()
		if err != nil {
			return err
		}
	}

	// Nothing left to do unless the config marks the repo read-only while
	// the repo's own flag says otherwise.
	if !f.Config.IsReadOnly(repo.GetGoPath()) || repo.ReadOnly {
		return nil
	}

	log.Info("readonly flag on repo is wrong", repo.GetGoPath())
	repo.ReadOnly = true
	f.configSave = true
	return nil
}
// RillX and RillY are the package-level rill concurrency settings used by
// RillReload: RillX is passed to its rill.Map stage and RillY to its
// rill.ForEach stage.
var (
	RillX = 10
	RillY = 20
)
// RillReload reloads every valid repo whose state changed, processing the
// repos concurrently with rill, and returns how many repos reported a change.
//
// Repos for which IsValidDir() reports false are logged and skipped.
// RillX is the concurrency of the pass-through rill.Map stage (it should not
// matter much here) and RillY is how many reload functions run
// simultaneously in the rill.ForEach stage.
//
// If at least one repo changed, f.configSave is set to true.
//
// todo: tune and compute x,y by # of CPUs and disk io
// todo: store x,y in forge config ? (or compute them. not sure)
func (f *Forge) RillReload() int {
	var all []*gitpb.Repo
	for repo := range f.Repos.IterAll() {
		if !repo.IsValidDir() {
			log.Printf("%s %-50s", "got an invalid repo in forgepb.RillReload()", repo.GetGoPath())
			continue
		}
		all = append(all, repo)
	}

	// Turn the slice into a rill stream.
	stream := rill.FromSlice(all, nil)

	// Pass-through stage: every repo is forwarded unchanged.
	dirs := rill.Map(stream, RillX, func(repo *gitpb.Repo) (*gitpb.Repo, error) {
		return repo, nil
	})

	// The callback runs on up to RillY goroutines at once, so the shared
	// counter is guarded by a mutex.
	var (
		mu      sync.Mutex
		counter int
	)
	err := rill.ForEach(dirs, RillY, func(repo *gitpb.Repo) error {
		if !repo.DidRepoChange() {
			return nil
		}
		// A failed reload is reported but does not stop the other repos
		// from being processed.
		if err := repo.Reload(); err != nil {
			log.Info("forgepb.RillReload() repo.Reload() failed:", repo.GetGoPath(), err)
		}
		mu.Lock()
		counter++
		mu.Unlock()
		return nil
	})
	if err != nil {
		log.Info("rill.ForEach() error:", err)
	}

	// Flag the config as dirty once, from this goroutine, instead of
	// writing f.configSave from every worker.
	mu.Lock()
	changed := counter
	mu.Unlock()
	if changed > 0 {
		f.configSave = true
	}
	return changed
}
// RillFuncError runs rillf against the forge's repos by delegating to
// RillRepos and returns the per-repo stats map that RillRepos produces.
//
// It takes no concurrency settings of its own; RillRepos uses the values
// stored on the forge (f.rillX and f.rillY).
func (f *Forge) RillFuncError(rillf func(*gitpb.Repo) error) map[string]*RillStats {
	stats := f.RillRepos(rillf)
	return stats
}
// ConfigRill stores the rill concurrency settings on the forge and logs the
// new values. RillRepos passes f.rillX to its rill.Map stage and f.rillY to
// its rill.ForEach stage.
func (f *Forge) ConfigRill(rillX int, rillY int) {
	f.rillX, f.rillY = rillX, rillY
	log.Infof("Setting rill values to %d,%d\n", rillX, rillY)
}
// RillStats holds the outcome of running a function against one repo in
// RillRepos. Entries are stored in a map keyed by the repo's full path.
type RillStats struct {
	// Err is the error returned by the function for this repo, if any.
	Err error
	// Start and End are taken from time.Now() right before and right after
	// the function runs.
	Start, End time.Time
}
// rillMu serializes access to state shared between concurrent rill workers:
// the progress counters inside RillRepos and the stats maps modified by
// rillSetError, rillSetStartTime and rillSetEndTime.
var rillMu sync.Mutex
// RillRepos runs rillf against every valid repo in the forge, concurrently,
// and returns timing and error information for each repo keyed by its full
// path.
//
// Repos for which IsValidDir() reports false are logged and skipped.
// f.rillX is the concurrency of the pass-through rill.Map stage and f.rillY
// is how many rillf calls run simultaneously in the rill.ForEach stage.
//
// An error returned by rillf is recorded in that repo's RillStats; it does
// not stop the remaining repos from being processed.
//
// todo: tune and compute x,y by # of CPUs and disk io
func (f *Forge) RillRepos(rillf func(*gitpb.Repo) error) map[string]*RillStats {
	stats := make(map[string]*RillStats)

	var valid []*gitpb.Repo
	for repo := range f.Repos.IterAll() {
		if repo.IsValidDir() {
			valid = append(valid, repo)
			continue
		}
		log.Printf("got an invalid repo in forgepb.RillRepos() %-50s\n", repo.GetGoPath())
	}

	// Turn the slice into a rill stream.
	stream := rill.FromSlice(valid, nil)

	// Pass-through stage: every repo is forwarded unchanged.
	passed := rill.Map(stream, f.rillX, func(repo *gitpb.Repo) (*gitpb.Repo, error) {
		return repo, nil
	})

	// Progress bookkeeping, shared by all workers and guarded by rillMu.
	processed, nextMark := 0, 10

	rill.ForEach(passed, f.rillY, func(repo *gitpb.Repo) error {
		// todo: make this a goroutine to show stats to the user
		rillMu.Lock()
		processed++
		if processed > nextMark {
			// log.Info("Processed", watch, "repos") // this doesn't work
			nextMark += 50
		}
		rillMu.Unlock()

		rillSetStartTime(stats, repo.GetFullPath())
		err := rillf(repo)
		if err != nil {
			rillSetError(stats, repo.GetFullPath(), err)
		}
		rillSetEndTime(stats, repo.GetFullPath())
		return nil
	})

	return stats
}
// rillSetError stores err on the stats entry for fullpath while holding
// rillMu. If no entry exists for fullpath, nothing is stored and the
// situation is logged.
func rillSetError(stats map[string]*RillStats, fullpath string, err error) {
	rillMu.Lock()
	defer rillMu.Unlock()

	entry, found := stats[fullpath]
	if !found {
		log.Info("WHAT THE FUCK STATS ERROR", fullpath)
		return
	}
	entry.Err = err
}
// rillSetStartTime stamps the stats entry for fullpath with the current
// time as its Start, creating the entry first if it does not exist yet.
// The map is accessed while holding rillMu.
func rillSetStartTime(stats map[string]*RillStats, fullpath string) {
	rillMu.Lock()
	defer rillMu.Unlock()

	entry, found := stats[fullpath]
	if !found {
		entry = &RillStats{}
		stats[fullpath] = entry
	}
	entry.Start = time.Now()
}
// rillSetEndTime stamps the stats entry for fullpath with the current time
// as its End while holding rillMu. If no entry exists for fullpath, nothing
// is stored and the situation is logged.
func rillSetEndTime(stats map[string]*RillStats, fullpath string) {
	rillMu.Lock()
	defer rillMu.Unlock()

	entry, found := stats[fullpath]
	if !found {
		log.Info("WHAT THE FUCK STATS END TIME", fullpath)
		return
	}
	entry.End = time.Now()
}