Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions pkg/api/souin.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func EvictMapping(current types.Storer) {
}
}

if updated {
if updated && len(mapping.GetMapping()) > 0 {
v, e := proto.Marshal(mapping)
if e != nil {
fmt.Println("Impossible to re-encode the mapping", core.MappingKeyPrefix+k)
Expand All @@ -199,7 +199,6 @@ func EvictMapping(current types.Storer) {
current.Delete(core.MappingKeyPrefix + k)
}
}
time.Sleep(time.Minute)
}

func (s *SouinAPI) purgeMapping() {
Expand Down
82 changes: 31 additions & 51 deletions pkg/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func registerMappingKeysEviction(logger core.Logger, storers []types.Storer) {
logger.Debugf("run mapping eviction for storer %s", current.Name())

api.EvictMapping(current)

time.Sleep(time.Minute)
}
}(storer)
}
Expand Down Expand Up @@ -342,8 +344,7 @@ func (s *SouinBaseHandler) Store(
if res.Header.Get("Content-Length") == "" {
res.Header.Set("Content-Length", fmt.Sprint(bLen))
}
respBodyMaxSize := int(s.Configuration.GetDefaultCache().GetMaxBodyBytes())
if respBodyMaxSize > 0 && bLen > respBodyMaxSize {
if customWriter.maxSizeReached {
customWriter.Header().Set("Cache-Status", status+"; detail=UPSTREAM-RESPONSE-TOO-LARGE; key="+rfc.GetCacheKeyFromCtx(rq.Context()))

return nil
Expand Down Expand Up @@ -516,9 +517,7 @@ func (s *SouinBaseHandler) Upstream(
}

err := s.Store(customWriter, rq, requestCc, cachedKey, uri)
defer customWriter.handleBuffer(func(b *bytes.Buffer) {
b.Reset()
})
defer customWriter.resetBuffer()

return singleflightValue{
body: customWriter.Buf.Bytes(),
Expand Down Expand Up @@ -581,19 +580,15 @@ func (s *SouinBaseHandler) Revalidate(validator *core.Revalidator, next handlerF
statusCode := customWriter.GetStatusCode()
if err == nil {
if validator.IfUnmodifiedSincePresent && statusCode != http.StatusNotModified {
customWriter.handleBuffer(func(b *bytes.Buffer) {
b.Reset()
})
customWriter.resetBuffer()
customWriter.Rw.WriteHeader(http.StatusPreconditionFailed)

return nil, errors.New("")
}

if validator.IfModifiedSincePresent {
if lastModified, err := time.Parse(time.RFC1123, customWriter.Header().Get("Last-Modified")); err == nil && validator.IfModifiedSince.Sub(lastModified) > 0 {
customWriter.handleBuffer(func(b *bytes.Buffer) {
b.Reset()
})
customWriter.resetBuffer()
customWriter.Rw.WriteHeader(http.StatusNotModified)

return nil, errors.New("")
Expand All @@ -615,9 +610,8 @@ func (s *SouinBaseHandler) Revalidate(validator *core.Revalidator, next handlerF
),
)

defer customWriter.handleBuffer(func(b *bytes.Buffer) {
b.Reset()
})
defer customWriter.resetBuffer()

return singleflightValue{
body: customWriter.Buf.Bytes(),
headers: customWriter.Header().Clone(),
Expand Down Expand Up @@ -789,7 +783,7 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
bufPool.Reset()
defer s.bufPool.Put(bufPool)

customWriter := NewCustomWriter(req, rw, bufPool)
customWriter := NewCustomWriter(req, rw, bufPool, int(s.Configuration.GetDefaultCache().GetMaxBodyBytes()))
customWriter.Headers.Add("Range", req.Header.Get("Range"))
// req.Header.Del("Range")

Expand Down Expand Up @@ -823,6 +817,15 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
backfillIds++
}

defer func() {
if fresh != nil {
_ = fresh.Body.Close()
}
if stale != nil {
_ = stale.Body.Close()
}
}()

headerName, _ := s.SurrogateKeyStorer.GetSurrogateControl(customWriter.Header())
if fresh != nil && (!modeContext.Strict || rfc.ValidateCacheControl(fresh, requestCc)) {
go func() {
Expand All @@ -838,18 +841,14 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
}
if validator.NotModified {
customWriter.WriteHeader(http.StatusNotModified)
customWriter.handleBuffer(func(b *bytes.Buffer) {
b.Reset()
})
customWriter.resetBuffer()
_, _ = customWriter.Send()

return nil
}

customWriter.WriteHeader(response.StatusCode)
customWriter.handleBuffer(func(b *bytes.Buffer) {
_, _ = io.Copy(b, response.Body)
})
_, _ = customWriter.copyToBuffer(response.Body)
_, _ = customWriter.Send()

return nil
Expand All @@ -875,9 +874,7 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
}
customWriter.WriteHeader(response.StatusCode)
s.Configuration.GetLogger().Debugf("Serve from cache %+v", req)
customWriter.handleBuffer(func(b *bytes.Buffer) {
_, _ = io.Copy(b, response.Body)
})
_, _ = customWriter.copyToBuffer(response.Body)
_, err := customWriter.Send()
prometheus.Increment(prometheus.CachedResponseCounter)

Expand All @@ -897,11 +894,9 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
}
customWriter.WriteHeader(response.StatusCode)
rfc.HitStaleCache(&response.Header)
customWriter.handleBuffer(func(b *bytes.Buffer) {
_, _ = io.Copy(b, response.Body)
})
_, _ = customWriter.copyToBuffer(response.Body)
_, err := customWriter.Send()
customWriter = NewCustomWriter(req, rw, bufPool)
customWriter = NewCustomWriter(req, rw, bufPool, int(s.Configuration.GetDefaultCache().GetMaxBodyBytes()))
go func(v *core.Revalidator, goCw *CustomWriter, goRq *http.Request, goNext func(http.ResponseWriter, *http.Request) error, goCc *cacheobject.RequestCacheDirectives, goCk string, goUri string) {
_ = s.Revalidate(v, goNext, goCw, goRq, goCc, goCk, goUri)
}(validator, customWriter, req, next, requestCc, cachedKey, uri)
Expand All @@ -923,18 +918,13 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
response.Header.Set("Cache-Status", response.Header.Get("Cache-Status")+code)
maps.Copy(customWriter.Header(), response.Header)
customWriter.WriteHeader(response.StatusCode)
customWriter.handleBuffer(func(b *bytes.Buffer) {
b.Reset()
_, _ = io.Copy(b, response.Body)
})
_, _ = customWriter.resetAndCopyToBuffer(response.Body)
_, err := customWriter.Send()

return err
}
rw.WriteHeader(http.StatusGatewayTimeout)
customWriter.handleBuffer(func(b *bytes.Buffer) {
b.Reset()
})
customWriter.resetBuffer()
_, err := customWriter.Send()

return err
Expand All @@ -945,9 +935,7 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
rfc.SetCacheStatusHeader(response, storerName)
customWriter.WriteHeader(response.StatusCode)
maps.Copy(customWriter.Header(), response.Header)
customWriter.handleBuffer(func(b *bytes.Buffer) {
_, _ = io.Copy(b, response.Body)
})
_, _ = customWriter.copyToBuffer(response.Body)
_, _ = customWriter.Send()

return err
Expand All @@ -956,9 +944,7 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n

if statusCode != http.StatusNotModified && validator.Matched {
customWriter.WriteHeader(http.StatusNotModified)
customWriter.handleBuffer(func(b *bytes.Buffer) {
b.Reset()
})
customWriter.resetBuffer()
_, _ = customWriter.Send()

return err
Expand All @@ -973,9 +959,7 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
customWriter.WriteHeader(response.StatusCode)
rfc.HitStaleCache(&response.Header)
maps.Copy(customWriter.Header(), response.Header)
customWriter.handleBuffer(func(b *bytes.Buffer) {
_, _ = io.Copy(b, response.Body)
})
_, _ = customWriter.copyToBuffer(response.Body)
_, err := customWriter.Send()

return err
Expand All @@ -989,9 +973,7 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
customWriter.WriteHeader(response.StatusCode)
rfc.HitStaleCache(&response.Header)
maps.Copy(customWriter.Header(), response.Header)
customWriter.handleBuffer(func(b *bytes.Buffer) {
_, _ = io.Copy(b, response.Body)
})
_, _ = customWriter.copyToBuffer(response.Body)
_, err := customWriter.Send()

return err
Expand All @@ -1015,10 +997,8 @@ func (s *SouinBaseHandler) ServeHTTP(rw http.ResponseWriter, rq *http.Request, n
response.Header.Set("Cache-Status", response.Header.Get("Cache-Status")+code)
maps.Copy(customWriter.Header(), response.Header)
customWriter.WriteHeader(response.StatusCode)
customWriter.handleBuffer(func(b *bytes.Buffer) {
b.Reset()
_, _ = io.Copy(b, response.Body)
})
_, _ = customWriter.resetAndCopyToBuffer(response.Body)

_, err := customWriter.Send()

return err
Expand Down
103 changes: 75 additions & 28 deletions pkg/middleware/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package middleware
import (
"bytes"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"sync"

"github.com/darkweak/go-esi/esi"
"github.com/darkweak/souin/context"
"github.com/darkweak/souin/pkg/rfc"
)

Expand All @@ -19,32 +21,53 @@ type SouinWriterInterface interface {

var _ SouinWriterInterface = (*CustomWriter)(nil)

func NewCustomWriter(rq *http.Request, rw http.ResponseWriter, b *bytes.Buffer) *CustomWriter {
func NewCustomWriter(rq *http.Request, rw http.ResponseWriter, b *bytes.Buffer, maxSize int) *CustomWriter {
return &CustomWriter{
statusCode: 200,
Buf: b,
Req: rq,
Rw: rw,
Headers: http.Header{},
mutex: sync.Mutex{},
statusCode: 200,
Buf: b,
Req: rq,
Rw: rw,
Headers: http.Header{},
mutex: sync.Mutex{},
maxSize: maxSize,
maxSizeReached: false,
}
}

// CustomWriter handles the response and provide the way to cache the value
type CustomWriter struct {
Buf *bytes.Buffer
Rw http.ResponseWriter
Req *http.Request
Headers http.Header
headersSent bool
mutex sync.Mutex
statusCode int
Buf *bytes.Buffer
Rw http.ResponseWriter
Req *http.Request
Headers http.Header
headersSent bool
mutex sync.Mutex
statusCode int
maxSize int
maxSizeReached bool
}

func (r *CustomWriter) handleBuffer(callback func(*bytes.Buffer)) {
func (r *CustomWriter) resetBuffer() {
r.mutex.Lock()
callback(r.Buf)
r.mutex.Unlock()
defer r.mutex.Unlock()

r.Buf.Reset()
}

func (r *CustomWriter) copyToBuffer(src io.Reader) (int64, error) {
r.mutex.Lock()
defer r.mutex.Unlock()

return io.Copy(r.Buf, src)
}

func (r *CustomWriter) resetAndCopyToBuffer(src io.Reader) (int64, error) {
r.mutex.Lock()
defer r.mutex.Unlock()

r.Buf.Reset()

return io.Copy(r.Buf, src)
}

// Header will write the response headers
Expand Down Expand Up @@ -79,10 +102,34 @@ func (r *CustomWriter) WriteHeader(code int) {

// Write will write the response body
func (r *CustomWriter) Write(b []byte) (int, error) {
r.handleBuffer(func(actual *bytes.Buffer) {
actual.Grow(len(b))
_, _ = actual.Write(b)
})
r.mutex.Lock()
defer r.mutex.Unlock()

if r.maxSizeReached {
return r.Rw.Write(b)
}

if r.maxSize > 0 && (r.Buf.Len()+len(b)) > r.maxSize {
r.maxSizeReached = true

if !r.headersSent && r.Req.Context().Err() == nil {
r.Rw.Header().Set(
"Cache-Status",
fmt.Sprintf(
"%s; fwd=uri-miss; detail=UPSTREAM-RESPONSE-TOO-LARGE; key=%s",
r.Req.Context().Value(context.CacheName),
rfc.GetCacheKeyFromCtx(r.Req.Context()),
),
)
}

_, _ = r.Rw.Write(r.Buf.Bytes())

r.Buf.Reset()
}

r.Buf.Grow(len(b))
_, _ = r.Buf.Write(b)

return len(b), nil
}
Expand Down Expand Up @@ -142,9 +189,8 @@ func parseRange(rangeHeaders []string, contentRange string) ([]rangeValue, range

// Send delays the response to handle Cache-Status
func (r *CustomWriter) Send() (int, error) {
defer r.handleBuffer(func(b *bytes.Buffer) {
b.Reset()
})
defer r.resetBuffer()

storedLength := r.Header().Get(rfc.StoredLengthHeader)
if storedLength != "" {
r.Header().Set("Content-Length", storedLength)
Expand All @@ -156,7 +202,7 @@ func (r *CustomWriter) Send() (int, error) {

if r.Headers.Get("Range") != "" {

var bufStr string
bufStr := new(strings.Builder)
mimeType := r.Header().Get("Content-Type")

r.WriteHeader(http.StatusPartialContent)
Expand Down Expand Up @@ -198,16 +244,17 @@ func (r *CustomWriter) Send() (int, error) {
content = content[:header.to-header.from]
}

bufStr += fmt.Sprintf(`
bufStr.WriteString(fmt.Sprintf(`
%s
Content-Type: %s
Content-Range: bytes %d-%d/%d

%s
`, separator, mimeType, header.from, header.to, r.Buf.Len(), content)
`, separator, mimeType, header.from, header.to, r.Buf.Len(), content))
}

result = []byte(bufStr + separator + "--")
bufStr.WriteString(separator + "--")
result = []byte(bufStr.String())
}
}

Expand Down
Loading
Loading