// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package network

import (
	"encoding/binary"
	"fmt"

	"github.com/ethereum/go-ethereum/logger"
	"github.com/ethereum/go-ethereum/logger/glog"
	"github.com/ethereum/go-ethereum/swarm/storage"
	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/iterator"
)

// counterKeyPrefix is the first byte of the db key under which a syncDb
// persists its counter. Request entries carry 0x00 in that position, so the
// counter key never collides with an entry key.
const counterKeyPrefix = 0x01

// syncDb is a queueing service for outgoing deliveries.
// There is one instance per priority queue for each peer.
//
// A syncDb instance maintains an in-memory buffer (of capacity bufferSize).
// Once the in-memory buffer is full it switches to persisting items in the
// db, and a dbRead iterator walks through the persisted items keeping their
// order. Once the db read catches up (there are no more items in the db) it
// switches back to the in-memory buffer.
//
// When the syncDb is stopped all items in the buffer are saved to the db.
type syncDb struct {
	start       []byte               // db key prefix of this syncdb's entries in the request db
	key         storage.Key          // address key of the remote peer
	counterKey  []byte               // db key under which the counter is persisted
	priority    uint                 // priority High|Medium|Low
	buffer      chan interface{}     // incoming request channel
	db          *storage.LDBDatabase // underlying db (TODO should be interface)
	done        chan bool            // closed by bufferRead once it has finished quitting
	quit        chan bool            // closed by stop to signal quitting to goroutines
	total       int                  // items handled in this session (delivered directly or written to db)
	dbTotal     int                  // items routed through the db in this session
	batch       chan chan int        // dbRead sends a reply channel here to request a batch write
	dbBatchSize uint                 // number of items before the batch is saved
}

// newSyncDb builds the syncDb of one peer for one priority level on top of
// a shared request db (leveldb) and immediately starts its bufferRead loop
// in a goroutine.
//
// key is the remote peer's address and priority the queue's priority; both
// are encoded in the db key prefix so that the key ranges of different
// syncDbs sharing the db do not overlap.
// bufferSize (capacity of the in-memory buffer) and dbBatchSize (number of
// items before the batch is saved) are config parameters.
// deliver is called for each outgoing item together with the quit channel.
func newSyncDb(db *storage.LDBDatabase, key storage.Key, priority uint, bufferSize, dbBatchSize uint, deliver func(interface{}, chan bool) bool) *syncDb {
	// entry key prefix: byte 0 stays 0x00, byte 1 is the inverted priority,
	// bytes 2-33 hold the peer address, bytes 34-41 are left for the counter
	prefix := make([]byte, 42)
	prefix[1] = byte(priorities - priority)
	copy(prefix[2:34], key)

	// the counter key reuses priority and peer address but is tagged with
	// counterKeyPrefix and carries no counter field
	ctrKey := make([]byte, 34)
	ctrKey[0] = counterKeyPrefix
	copy(ctrKey[1:], prefix[1:34])

	sdb := &syncDb{
		db:          db,
		key:         key,
		priority:    priority,
		start:       prefix,
		counterKey:  ctrKey,
		dbBatchSize: dbBatchSize,
		buffer:      make(chan interface{}, bufferSize),
		batch:       make(chan chan int),
		quit:        make(chan bool),
		done:        make(chan bool),
	}
	glog.V(logger.Detail).Infof("syncDb[peer: %v, priority: %v] - initialised", key.Log(), priority)

	// the main forever loop runs for the lifetime of the syncDb
	go sdb.bufferRead(deliver)
	return sdb
}

/*
bufferRead is a forever iterator loop that takes care of delivering
outgoing store requests read from the incoming buffer.

Its argument is the deliver function taking the item as first argument
and a quit channel as second.
Closing of this channel is supposed to abort all waiting for delivery
(typically network write); deliver returning false is treated as quit.

The iteration switches between 2 modes,
* buffer mode reads the in-memory buffer and delivers the items directly
* db mode reads from the buffer and writes to the db, in parallel another
routine (dbRead) is started that reads from the db and delivers items

The loop starts in neither mode (both mode channels are nil) and only
serves batch requests and the quit signal. It enters buffer mode when a
batch request arrives while no item has been written since the last batch.

If there is buffer contention in buffer mode (slow network, high upload volume)
syncdb switches to db mode and starts dbRead.
Once the db backlog is delivered, it reverts back to the in-memory buffer.

It is automatically started when syncdb is initialised.

It saves the buffer to db upon receiving the quit signal (syncDb#stop())
and closes the done channel when finished.
*/
func (self *syncDb) bufferRead(deliver func(interface{}, chan bool) bool) {
	var buffer, db chan interface{} // channels representing the two read modes
	var more bool
	var req interface{}
	var entry *syncDbEntry
	var inBatch, inDb int
	batch := new(leveldb.Batch)
	var dbSize chan int
	quit := self.quit
	counterValue := make([]byte, 8)

	// counter is used for keeping the items in order, persisted to db
	// start counter where db was at, 0 if not found
	data, err := self.db.Get(self.counterKey)
	var counter uint64
	if err == nil {
		counter = binary.BigEndian.Uint64(data)
		glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter read from db at %v", self.key.Log(), self.priority, counter)
	} else {
		glog.V(logger.Detail).Infof("syncDb[%v/%v] - counter starts at %v", self.key.Log(), self.priority, counter)
	}

LOOP:
	for {
		// waiting for item next in the buffer, or quit signal or batch request
		select {
		// buffer only closes when writing to db
		case req = <-buffer:
			// deliver request : this is blocking on network write so
			// it is passed the quit channel as argument, so that it returns
			// if syncdb is stopped. In this case we need to save the item to the db
			more = deliver(req, self.quit)
			if !more {
				glog.V(logger.Debug).Infof("syncDb[%v/%v] quit: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
				// received quit signal, save request currently waiting delivery
				// by switching to db mode and closing the buffer
				buffer = nil
				db = self.buffer
				close(db)
				quit = nil // needs to block the quit case in select
				break      // break from select, this item will be written to the db
			}
			self.total++
			glog.V(logger.Detail).Infof("syncDb[%v/%v] deliver (db/total): %v/%v", self.key.Log(), self.priority, self.dbTotal, self.total)
			// by the time deliver returns, there were new writes to the buffer
			// if buffer contention is detected, switch to db mode which drains
			// the buffer so no process will block on pushing store requests
			if len(buffer) == cap(buffer) {
				glog.V(logger.Debug).Infof("syncDb[%v/%v] buffer full %v: switching to db. session tally (db/total): %v/%v", self.key.Log(), self.priority, cap(buffer), self.dbTotal, self.total)
				buffer = nil
				db = self.buffer
			}
			continue LOOP

			// incoming entry to put into db
		case req, more = <-db:
			if !more {
				// only if quit is called, saved all the buffer
				binary.BigEndian.PutUint64(counterValue, counter)
				batch.Put(self.counterKey, counterValue) // persist counter in batch
				self.writeSyncBatch(batch)               // save batch
				glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save current batch to db", self.key.Log(), self.priority)
				break LOOP
			}
			self.dbTotal++
			self.total++
			// otherwise break after select
		case dbSize = <-self.batch:
			// explicit request for batch
			if inBatch == 0 && quit != nil {
				// there was no writes since the last batch so db depleted
				// switch to buffer mode
				glog.V(logger.Debug).Infof("syncDb[%v/%v] empty db: switching to buffer", self.key.Log(), self.priority)
				db = nil
				buffer = self.buffer
				dbSize <- 0 // indicates to 'caller' that batch has been written
				inDb = 0
				continue LOOP
			}
			binary.BigEndian.PutUint64(counterValue, counter)
			batch.Put(self.counterKey, counterValue)
			glog.V(logger.Debug).Infof("syncDb[%v/%v] write batch %v/%v - %x - %x", self.key.Log(), self.priority, inBatch, counter, self.counterKey, counterValue)
			batch = self.writeSyncBatch(batch)
			dbSize <- inBatch // indicates to 'caller' that batch has been written
			inBatch = 0
			continue LOOP

			// closing syncDb#quit channel is used to signal to all goroutines to quit
		case <-quit:
			// need to save backlog, so switch to db mode
			db = self.buffer
			buffer = nil
			quit = nil
			glog.V(logger.Detail).Infof("syncDb[%v/%v] quitting: save buffer to db", self.key.Log(), self.priority)
			close(db)
			continue LOOP
		}

		// only get here if we put req into db
		entry, err = self.newSyncDbEntry(req, counter)
		if err != nil {
			glog.V(logger.Warn).Infof("syncDb[%v/%v] saving request %v (#%v/%v) failed: %v", self.key.Log(), self.priority, req, inBatch, inDb, err)
			continue LOOP
		}
		batch.Put(entry.key, entry.val)
		glog.V(logger.Detail).Infof("syncDb[%v/%v] to batch %v '%v' (#%v/%v/%v)", self.key.Log(), self.priority, req, entry, inBatch, inDb, counter)
		// if just switched to db mode and not quitting, then launch dbRead
		// in a parallel go routine to send deliveries from db
		if inDb == 0 && quit != nil {
			glog.V(logger.Detail).Infof("syncDb[%v/%v] start dbRead", self.key.Log(), self.priority)
			go self.dbRead(true, counter, deliver)
		}
		inDb++
		inBatch++
		counter++
		// need to save the batch if it gets too large (== dbBatchSize);
		// a zero dbBatchSize disables size-triggered writes instead of
		// panicking on the modulo, the batch is then only written on request
		if self.dbBatchSize > 0 && inBatch%int(self.dbBatchSize) == 0 {
			batch = self.writeSyncBatch(batch)
		}
	}
	glog.V(logger.Info).Infof("syncDb[%v:%v]: saved %v keys (saved counter at %v)", self.key.Log(), self.priority, inBatch, counter)
	close(self.done)
}

// writeSyncBatch flushes batch to the db.
// On success it hands back a fresh, empty batch object. If the write fails
// a warning is logged and the very same batch is returned, so its contents
// remain pending for the next write.
func (self *syncDb) writeSyncBatch(batch *leveldb.Batch) *leveldb.Batch {
	if err := self.db.Write(batch); err != nil {
		glog.V(logger.Warn).Infof("syncDb[%v/%v] saving batch to db failed: %v", self.key.Log(), self.priority, err)
		return batch
	}
	return &leveldb.Batch{}
}

// syncDbEntry is the abstract type for db entries: a raw key/value pair
// (TODO could be a feature of Receipts)
type syncDbEntry struct {
	key []byte // db key of the entry
	val []byte // db value of the entry
}

// String renders the entry with both key and value hex encoded.
func (self syncDbEntry) String() string {
	k, v := self.key, self.val
	return fmt.Sprintf("key: %x, value: %x", k, v)
}

/*
dbRead iterates over store requests persisted in the db and passes them
to fun, to be sent over to the peer.
This is mainly to prevent crashes due to network output buffer contention (???)
as well as to make synchronisation resilient to disconnects.
The messages are supposed to be sent in the p2p priority queue.

The request DB is shared between peers, but domains for each syncdb
are disjoint. dbkeys (42 bytes) are structured:
  - 0: 0x00 (0x01 reserved for counter key)
  - 1: priorities - priority (so that high priority can be replayed first)
  - 2-33: peers address
  - 34-41: syncdb counter to preserve order (this field is missing for the counter key)

values (40 bytes) are:
  - 0-31: key
  - 32-39: request id

Arguments:
  - useBatches: if false, the first round iterates from the start key without
    an item limit (replaying the historical record); afterwards, and when it
    is true from the outset, each round first asks bufferRead to write its
    batch and delivers at most as many items as that batch contained
  - counter: the db counter at which iteration starts
  - fun: the function to apply to each entry; it is passed the quit channel
    and returning false is treated as quit

dbRead returns when bufferRead reports an empty batch, when the quit
channel is closed, or when fun reports quit. Entries successfully passed
to fun are deleted from the db; an entry whose delivery was interrupted by
quit is kept.
*/
func (self *syncDb) dbRead(useBatches bool, counter uint64, fun func(interface{}, chan bool) bool) {
	key := make([]byte, 42)
	copy(key, self.start)
	binary.BigEndian.PutUint64(key[34:], counter)
	var batches, n, cnt, total int
	var more bool
	var entry *syncDbEntry
	var it iterator.Iterator
	var del *leveldb.Batch
	batchSizes := make(chan int)

	for {
		// if useBatches is false, cnt is not set
		if useBatches {
			// this could be called before all cnt items sent out
			// so that loop is not blocking while delivering
			// only relevant if cnt is large
			select {
			case self.batch <- batchSizes:
			case <-self.quit:
				return
			}
			// wait for the write to finish and get the item count in the next batch
			cnt = <-batchSizes
			batches++
			if cnt == 0 {
				// empty
				return
			}
		}
		it = self.db.NewIterator()
		it.Seek(key)
		if !it.Valid() {
			// nothing at or after key: release the iterator before retrying,
			// otherwise every such round would leak one
			it.Release()
			copy(key, self.start)
			useBatches = true
			continue
		}
		del = new(leveldb.Batch)
		glog.V(logger.Detail).Infof("syncDb[%v/%v]: new iterator: %x (batch %v, count %v)", self.key.Log(), self.priority, key, batches, cnt)

		quitting := false
		for n = 0; !useBatches || n < cnt; it.Next() {
			// inspect the iterator's own key: an exhausted iterator yields an
			// empty key, which cannot be detected once copied into the
			// fixed-size key buffer
			dbkey := it.Key()
			if len(dbkey) == 0 || dbkey[0] != 0 {
				copy(key, self.start)
				useBatches = true
				break
			}
			copy(key, dbkey)
			val := make([]byte, 40)
			copy(val, it.Value())
			entry = &syncDbEntry{key, val}
			// glog.V(logger.Detail).Infof("syncDb[%v/%v] - %v, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, self.key.Log(), batches, total, self.dbTotal, self.total)
			more = fun(entry, self.quit)
			if !more {
				// quit received when waiting to deliver entry, the entry will not be deleted
				glog.V(logger.Detail).Infof("syncDb[%v/%v] batch %v quit after %v/%v items", self.key.Log(), self.priority, batches, n, cnt)
				quitting = true
				break
			}
			// since subsequent batches of the same db session are indexed incrementally
			// deleting earlier batches can be delayed and parallelised
			// this could be batch delete when db is idle (but added complexity esp when quitting)
			del.Delete(key)
			n++
			total++
		}
		glog.V(logger.Debug).Infof("syncDb[%v/%v] - db session closed, batches: %v, total: %v, session total from db: %v/%v", self.key.Log(), self.priority, batches, total, self.dbTotal, self.total)
		// this could be async called only when db is idle
		if err := self.db.Write(del); err != nil {
			glog.V(logger.Warn).Infof("syncDb[%v/%v] deleting delivered entries from db failed: %v", self.key.Log(), self.priority, err)
		}
		it.Release()
		if quitting {
			// stop here: looping again would re-seek the undelivered entry,
			// which spins forever when batches are not in use
			return
		}
	}
}

// stop shuts the syncDb down: it closes the quit channel to signal quitting
// to the goroutines and then blocks until bufferRead closes the done channel.
// The quit channel is closed unconditionally, so stop must be called at most once.
func (self *syncDb) stop() {
	close(self.quit)
	<-self.done
}

// newSyncDbEntry turns a request into the key/value pair under which it is
// stored in the request db (see dbRead for the db key structure).
//
// It is polymorphic, for the accepted types see syncer#addRequest:
//   - storage.Key and *storage.Chunk get a freshly generated request id
//   - *storeRequestMsgData keeps its own key and id
//   - *syncDbEntry is handed back unchanged
//
// Any other type results in an error.
// counter is the sequence number encoded in the last 8 bytes of the db key.
func (self *syncDb) newSyncDbEntry(req interface{}, counter uint64) (entry *syncDbEntry, err error) {
	var (
		chunkKey storage.Key
		reqId    uint64
	)

	switch r := req.(type) {
	case storage.Key:
		chunkKey, reqId = r, generateId()
	case *storage.Chunk:
		chunkKey, reqId = r.Key, generateId()
	case *storeRequestMsgData:
		chunkKey, reqId = r.Key, r.Id
	case *syncDbEntry:
		entry = r
	default:
		return nil, fmt.Errorf("type not allowed: %v (%T)", req, req)
	}

	// a ready-made entry needs no encoding
	if entry != nil {
		return entry, nil
	}

	// order by peer > priority > seqid
	dbkey := make([]byte, 42)
	copy(dbkey, self.start[:34]) // db  peer
	binary.BigEndian.PutUint64(dbkey[34:], counter)

	// value is the chunk key followed by the request id if it exists
	dbval := make([]byte, 40)
	copy(dbval, chunkKey)
	binary.BigEndian.PutUint64(dbval[32:], reqId)

	return &syncDbEntry{dbkey, dbval}, nil
}