go-ethereum/swarm/storage/localstore/subscription_push.go

175 lines
5.0 KiB
Go

// Copyright 2019 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package localstore
import (
"context"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/swarm/chunk"
"github.com/ethereum/go-ethereum/swarm/shed"
"github.com/ethereum/go-ethereum/swarm/spancontext"
"github.com/opentracing/opentracing-go"
olog "github.com/opentracing/opentracing-go/log"
)
// SubscribePush returns a channel that provides storage chunks with ordering from push syncing index.
// Returned stop function will terminate current and further iterations, and also it will close
// the returned channel without any errors. Make sure that you check the second returned parameter
// from the channel to stop iteration when its value is false.
func (db *DB) SubscribePush(ctx context.Context) (c <-chan chunk.Chunk, stop func()) {
metricName := "localstore.SubscribePush"
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
chunks := make(chan chunk.Chunk)
trigger := make(chan struct{}, 1)
db.pushTriggersMu.Lock()
db.pushTriggers = append(db.pushTriggers, trigger)
db.pushTriggersMu.Unlock()
// send signal for the initial iteration
trigger <- struct{}{}
stopChan := make(chan struct{})
var stopChanOnce sync.Once
go func() {
defer metrics.GetOrRegisterCounter(metricName+".done", nil).Inc(1)
// close the returned chunkInfo channel at the end to
// signal that the subscription is done
defer close(chunks)
// sinceItem is the Item from which the next iteration
// should start. The first iteration starts from the first Item.
var sinceItem *shed.Item
for {
select {
case <-trigger:
// iterate until:
// - last index Item is reached
// - subscription stop is called
// - context is done
metrics.GetOrRegisterCounter(metricName+".iter", nil).Inc(1)
ctx, sp := spancontext.StartSpan(ctx, metricName+".iter")
iterStart := time.Now()
var count int
err := db.pushIndex.Iterate(func(item shed.Item) (stop bool, err error) {
// get chunk data
dataItem, err := db.retrievalDataIndex.Get(item)
if err != nil {
return true, err
}
select {
case chunks <- chunk.NewChunk(dataItem.Address, dataItem.Data):
count++
// set next iteration start item
// when its chunk is successfully sent to channel
sinceItem = &item
return false, nil
case <-stopChan:
// gracefully stop the iteration
// on stop
return true, nil
case <-db.close:
// gracefully stop the iteration
// on database close
return true, nil
case <-ctx.Done():
return true, ctx.Err()
}
}, &shed.IterateOptions{
StartFrom: sinceItem,
// sinceItem was sent as the last Address in the previous
// iterator call, skip it in this one
SkipStartFromItem: true,
})
totalTimeMetric(metricName+".iter", iterStart)
sp.FinishWithOptions(opentracing.FinishOptions{
LogRecords: []opentracing.LogRecord{
{
Timestamp: time.Now(),
Fields: []olog.Field{olog.Int("count", count)},
},
},
})
if err != nil {
metrics.GetOrRegisterCounter(metricName+".iter.error", nil).Inc(1)
log.Error("localstore push subscription iteration", "err", err)
return
}
case <-stopChan:
// terminate the subscription
// on stop
return
case <-db.close:
// terminate the subscription
// on database close
return
case <-ctx.Done():
err := ctx.Err()
if err != nil {
log.Error("localstore push subscription", "err", err)
}
return
}
}
}()
stop = func() {
stopChanOnce.Do(func() {
close(stopChan)
})
db.pushTriggersMu.Lock()
defer db.pushTriggersMu.Unlock()
for i, t := range db.pushTriggers {
if t == trigger {
db.pushTriggers = append(db.pushTriggers[:i], db.pushTriggers[i+1:]...)
break
}
}
}
return chunks, stop
}
// triggerPushSubscriptions is used internally for starting iterations
// on Push subscriptions. Whenever new item is added to the push index,
// this function should be called.
func (db *DB) triggerPushSubscriptions() {
db.pushTriggersMu.RLock()
triggers := db.pushTriggers
db.pushTriggersMu.RUnlock()
for _, t := range triggers {
select {
case t <- struct{}{}:
default:
}
}
}