diff --git a/rill.go b/rill.go index 7993586..cc77d71 100644 --- a/rill.go +++ b/rill.go @@ -149,3 +149,47 @@ func (f *Forge) RillFuncError(rillf func(*gitpb.Repo) error) int { return counter } + +// x is the size of the queued up pool (shouldn't matter here for this I think) +// y is how many simultanous functions will run +// todo: tune and compute x,y by # of CPUs and disk io +// todo: store x,y in forge config ? (or compute them. notsure) +func (f *Forge) RillRepo(rillX int, rillY int, rillf func(*gitpb.Repo) error) (int, error) { + var anyerr error + var all []*gitpb.Repo + for repo := range f.Repos.IterAll() { + if !repo.IsValidDir() { + log.Printf("%s %-50s", "got an invalid repo in forgepb.RillFuncError()", repo.GetGoPath()) + continue + } + all = append(all, repo) + } + // Convert a slice of user IDs into a channel + ids := rill.FromSlice(all, nil) + + var counter int + var watch int = 10 + var meMu sync.Mutex + + // Read users from the API. + // Concurrency = 20 + dirs := rill.Map(ids, rillX, func(id *gitpb.Repo) (*gitpb.Repo, error) { + return id, nil + }) + + rill.ForEach(dirs, rillY, func(repo *gitpb.Repo) error { + meMu.Lock() + counter += 1 + if counter > watch { + // log.Info("Processed", watch, "repos") // this doesn't work + watch += 50 + } + meMu.Unlock() + if err := rillf(repo); err != nil { + anyerr = err + } + return nil + }) + + return counter, anyerr +}