diff --git a/audio/compositors.go b/audio/compositors.go index 78151fb..e7eb41e 100644 --- a/audio/compositors.go +++ b/audio/compositors.go @@ -7,22 +7,36 @@ import ( // Take returns a Streamer which streams s for at most d duration. // -// TODO: should Take propagate an error? +// The returned Streamer propagates s's errors throught Err. func Take(d time.Duration, s Streamer) Streamer { - currSample := 0 - numSamples := int(math.Ceil(d.Seconds() * SampleRate)) - return StreamerFunc(func(samples [][2]float64) (n int, ok bool) { - if currSample >= numSamples { - return 0, false - } - toStream := numSamples - currSample - if len(samples) < toStream { - toStream = len(samples) - } - sn, sok := s.Stream(samples[:toStream]) - currSample += sn - return sn, sok - }) + return &take{ + s: s, + currSample: 0, + numSamples: int(math.Ceil(d.Seconds() * SampleRate)), + } +} + +type take struct { + s Streamer + currSample int + numSamples int +} + +func (t *take) Stream(samples [][2]float64) (n int, ok bool) { + if t.currSample >= t.numSamples { + return 0, false + } + toStream := t.numSamples - t.currSample + if len(samples) < toStream { + toStream = len(samples) + } + sn, sok := t.s.Stream(samples[:toStream]) + t.currSample += sn + return sn, sok +} + +func (t *take) Err() error { + return t.s.Err() } // Seq takes zero or more Streamers and returns a Streamer which streams them one by one without pauses.