http: stream POST contents

This lets us stream the data straight from the creation of the packfile, instead
of having to buffer it all in memory.
This commit is contained in:
Carlos Martín Nieto 2017-05-22 10:14:30 +02:00 committed by Carlos Martín Nieto
parent 2d54afed61
commit 6fe4f461f6
1 changed files with 50 additions and 14 deletions

64
http.go
View File

@ -22,15 +22,14 @@ void _go_git_setup_smart_subtransport_stream(managed_smart_subtransport_stream *
import "C" import "C"
import ( import (
"bytes"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"net/url" "net/url"
"reflect" "reflect"
"runtime" "runtime"
"sync"
"unsafe" "unsafe"
) )
@ -127,7 +126,14 @@ func (self *ManagedTransport) Action(url string, action SmartService) (SmartSubt
} }
req.Header["User-Agent"] = []string{"git/2.0 (git2go)"} req.Header["User-Agent"] = []string{"git/2.0 (git2go)"}
return newManagedHttpStream(self, req), nil
stream := newManagedHttpStream(self, req)
if req.Method == "POST" {
stream.recvReply.Add(1)
stream.sendRequestBackground()
}
return stream, nil
} }
func (self *ManagedTransport) Close() error { func (self *ManagedTransport) Close() error {
@ -179,31 +185,49 @@ type ManagedHttpStream struct {
owner *ManagedTransport owner *ManagedTransport
req *http.Request req *http.Request
resp *http.Response resp *http.Response
postBuffer bytes.Buffer reader *io.PipeReader
writer *io.PipeWriter
sentRequest bool sentRequest bool
recvReply sync.WaitGroup
httpError error
} }
func newManagedHttpStream(owner *ManagedTransport, req *http.Request) *ManagedHttpStream { func newManagedHttpStream(owner *ManagedTransport, req *http.Request) *ManagedHttpStream {
r, w := io.Pipe()
return &ManagedHttpStream{ return &ManagedHttpStream{
owner: owner, owner: owner,
req: req, req: req,
reader: r,
writer: w,
} }
} }
func (self *ManagedHttpStream) Read(buf []byte) (int, error) { func (self *ManagedHttpStream) Read(buf []byte) (int, error) {
if !self.sentRequest { if !self.sentRequest {
self.recvReply.Add(1)
if err := self.sendRequest(); err != nil { if err := self.sendRequest(); err != nil {
return 0, err return 0, err
} }
} }
if err := self.writer.Close(); err != nil {
return 0, err
}
self.recvReply.Wait()
if self.httpError != nil {
return 0, self.httpError
}
return self.resp.Body.Read(buf) return self.resp.Body.Read(buf)
} }
func (self *ManagedHttpStream) Write(buf []byte) error { func (self *ManagedHttpStream) Write(buf []byte) error {
// We write it all into a buffer and send it off when the transport asks if self.httpError != nil {
// us to read. return self.httpError
self.postBuffer.Write(buf) }
self.writer.Write(buf)
return nil return nil
} }
@ -211,18 +235,30 @@ func (self *ManagedHttpStream) Free() {
self.resp.Body.Close() self.resp.Body.Close()
} }
func (self *ManagedHttpStream) sendRequestBackground() {
go func() {
self.httpError = self.sendRequest()
}()
self.sentRequest = true
}
func (self *ManagedHttpStream) sendRequest() error { func (self *ManagedHttpStream) sendRequest() error {
defer self.recvReply.Done()
self.resp = nil
var resp *http.Response var resp *http.Response
var err error var err error
var userName string var userName string
var password string var password string
for { for {
req := &http.Request{ req := &http.Request{
Method: self.req.Method, Method: self.req.Method,
URL: self.req.URL, URL: self.req.URL,
Header: self.req.Header, Header: self.req.Header,
Body: ioutil.NopCloser(&self.postBuffer), }
ContentLength: int64(self.postBuffer.Len()), if req.Method == "POST" {
req.Body = self.reader
req.ContentLength = -1
} }
req.SetBasicAuth(userName, password) req.SetBasicAuth(userName, password)