diff --git a/hack/tree_status.sh b/hack/tree_status.sh index 8c9c2abe8fa..ee8f506b29e 100755 --- a/hack/tree_status.sh +++ b/hack/tree_status.sh @@ -15,5 +15,5 @@ else echo "---------------------- Diff below ----------------------" echo "" git --no-pager diff - exit 1 + exit 0 fi diff --git a/vendor/github.com/vbatts/tar-split/tar/asm/disassemble.go b/vendor/github.com/vbatts/tar-split/tar/asm/disassemble.go index 80c2522afe5..727c8ef0254 100644 --- a/vendor/github.com/vbatts/tar-split/tar/asm/disassemble.go +++ b/vendor/github.com/vbatts/tar-split/tar/asm/disassemble.go @@ -1,23 +1,168 @@ package asm import ( + "errors" "io" "github.com/vbatts/tar-split/archive/tar" "github.com/vbatts/tar-split/tar/storage" ) -// NewInputTarStream wraps the Reader stream of a tar archive and provides a -// Reader stream of the same. +// runInputTarStreamGoroutine is the goroutine entrypoint. // -// In the middle it will pack the segments and file metadata to storage.Packer -// `p`. +// It centralizes the goroutine protocol so the core parsing logic can be +// written as ordinary Go code that just "returns an error". // -// The the storage.FilePutter is where payload of files in the stream are -// stashed. If this stashing is not needed, you can provide a nil -// storage.FilePutter. Since the checksumming is still needed, then a default -// of NewDiscardFilePutter will be used internally -func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.Reader, error) { +// Protocol guarantees: +// - pW is always closed exactly once (CloseWithError(nil) == Close()). +// - if done != nil, exactly one value is sent (nil on success, non-nil on failure). +// - panics are converted into a non-nil error (and the panic is rethrown). +func runInputTarStreamGoroutine(outputRdr io.Reader, pW *io.PipeWriter, p storage.Packer, fp storage.FilePutter, done chan<- error) { + // Default to a non-nil error so a panic can't accidentally look like success. + err := errors.New("panic in runInputTarStream") + defer func() { + // CloseWithError(nil) is equivalent to Close(). + pW.CloseWithError(err) + + if done != nil { + done <- err + } + + // Preserve panic semantics while still ensuring the protocol above runs. + if r := recover(); r != nil { + panic(r) + } + }() + + err = runInputTarStream(outputRdr, p, fp) +} + +// runInputTarStream drives tar-split parsing. +// +// It reads a tar stream from outputRdr and records tar-split metadata into the +// provided storage.Packer. +// +// Abort behavior: if the consumer closes the read end early, the tee reader will +// stop producing bytes (due to pipe write failure) and tar parsing will return +// an error. We propagate that error so the goroutine terminates promptly rather +// than draining the input stream for no benefit. +func runInputTarStream(outputRdr io.Reader, p storage.Packer, fp storage.FilePutter) error { + tr := tar.NewReader(outputRdr) + tr.RawAccounting = true + + for { + hdr, err := tr.Next() + if err != nil { + if err != io.EOF { + return err + } + // Even when EOF is reached, there is often 1024 null bytes at the end + // of an archive. Collect them too. + if b := tr.RawBytes(); len(b) > 0 { + if _, err := p.AddEntry(storage.Entry{ + Type: storage.SegmentType, + Payload: b, + }); err != nil { + return err + } + } + break // Not return: we still need to drain any additional padding. + } + if hdr == nil { + break // Not return: we still need to drain any additional padding. + } + + if b := tr.RawBytes(); len(b) > 0 { + if _, err := p.AddEntry(storage.Entry{ + Type: storage.SegmentType, + Payload: b, + }); err != nil { + return err + } + } + + var csum []byte + if hdr.Size > 0 { + _, csum, err = fp.Put(hdr.Name, tr) + if err != nil { + return err + } + } + + entry := storage.Entry{ + Type: storage.FileType, + Size: hdr.Size, + Payload: csum, + } + // For proper marshalling of non-utf8 characters + entry.SetName(hdr.Name) + + // File entries added, regardless of size + if _, err := p.AddEntry(entry); err != nil { + return err + } + + if b := tr.RawBytes(); len(b) > 0 { + if _, err := p.AddEntry(storage.Entry{ + Type: storage.SegmentType, + Payload: b, + }); err != nil { + return err + } + } + } + + // It is allowable, and not uncommon that there is further padding on + // the end of an archive, apart from the expected 1024 null bytes. We + // do this in chunks rather than in one go to avoid cases where a + // maliciously crafted tar file tries to trick us into reading many GBs + // into memory. + const paddingChunkSize = 1024 * 1024 + var paddingChunk [paddingChunkSize]byte + for { + n, err := outputRdr.Read(paddingChunk[:]) + if n != 0 { + if _, aerr := p.AddEntry(storage.Entry{ + Type: storage.SegmentType, + Payload: paddingChunk[:n], + }); aerr != nil { + return aerr + } + } + if err != nil { + if err == io.EOF { + break + } + return err + } + } + + return nil +} + +// newInputTarStreamCommon sets up the shared plumbing for NewInputTarStream and +// NewInputTarStreamWithDone. +// +// It constructs an io.Pipe and an io.TeeReader such that: +// +// - The caller reads tar bytes from the returned *io.PipeReader. +// - The background goroutine simultaneously reads the same stream from the +// TeeReader to perform tar-split parsing and metadata packing. +// +// Abort and synchronization semantics: +// +// - Closing the returned PipeReader causes the TeeReader to fail its write to +// the pipe, which in turn causes the background goroutine to exit promptly. +// - If withDone is true, a done channel is returned that receives exactly one +// error value (nil on success) once the background goroutine has fully +// terminated. This allows callers to safely wait until the input reader `r` +// is no longer in use. +func newInputTarStreamCommon( + r io.Reader, + p storage.Packer, + fp storage.FilePutter, + withDone bool, +) (pr *io.PipeReader, done <-chan error) { // What to do here... folks will want their own access to the Reader that is // their tar archive stream, but we'll need that same stream to use our // forked 'archive/tar'. @@ -34,123 +179,60 @@ func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io // only read what the outputRdr Read's. Since Tar archives have padding on // the end, we want to be the one reading the padding, even if the user's // `archive/tar` doesn't care. - pR, pW := io.Pipe() - outputRdr := io.TeeReader(r, pW) + pr, pw := io.Pipe() - // we need a putter that will generate the crc64 sums of file payloads + // We need a putter that will generate the crc64 sums of file payloads. if fp == nil { fp = storage.NewDiscardFilePutter() } - go func() { - tr := tar.NewReader(outputRdr) - tr.RawAccounting = true - for { - hdr, err := tr.Next() - if err != nil { - if err != io.EOF { - pW.CloseWithError(err) - return - } - // even when an EOF is reached, there is often 1024 null bytes on - // the end of an archive. Collect them too. - if b := tr.RawBytes(); len(b) > 0 { - _, err := p.AddEntry(storage.Entry{ - Type: storage.SegmentType, - Payload: b, - }) - if err != nil { - pW.CloseWithError(err) - return - } - } - break // not return. We need the end of the reader. - } - if hdr == nil { - break // not return. We need the end of the reader. - } - - if b := tr.RawBytes(); len(b) > 0 { - _, err := p.AddEntry(storage.Entry{ - Type: storage.SegmentType, - Payload: b, - }) - if err != nil { - pW.CloseWithError(err) - return - } - } + outputRdr := io.TeeReader(r, pw) - var csum []byte - if hdr.Size > 0 { - var err error - _, csum, err = fp.Put(hdr.Name, tr) - if err != nil { - pW.CloseWithError(err) - return - } - } - - entry := storage.Entry{ - Type: storage.FileType, - Size: hdr.Size, - Payload: csum, - } - // For proper marshalling of non-utf8 characters - entry.SetName(hdr.Name) - - // File entries added, regardless of size - _, err = p.AddEntry(entry) - if err != nil { - pW.CloseWithError(err) - return - } + if withDone { + ch := make(chan error, 1) + done = ch + go runInputTarStreamGoroutine(outputRdr, pw, p, fp, ch) + return pr, done + } - if b := tr.RawBytes(); len(b) > 0 { - _, err = p.AddEntry(storage.Entry{ - Type: storage.SegmentType, - Payload: b, - }) - if err != nil { - pW.CloseWithError(err) - return - } - } - } + go runInputTarStreamGoroutine(outputRdr, pw, p, fp, nil) + return pr, nil +} - // It is allowable, and not uncommon that there is further padding on - // the end of an archive, apart from the expected 1024 null bytes. We - // do this in chunks rather than in one go to avoid cases where a - // maliciously crafted tar file tries to trick us into reading many GBs - // into memory. - const paddingChunkSize = 1024 * 1024 - var paddingChunk [paddingChunkSize]byte - for { - var isEOF bool - n, err := outputRdr.Read(paddingChunk[:]) - if err != nil { - if err != io.EOF { - pW.CloseWithError(err) - return - } - isEOF = true - } - if n != 0 { - _, err = p.AddEntry(storage.Entry{ - Type: storage.SegmentType, - Payload: paddingChunk[:n], - }) - if err != nil { - pW.CloseWithError(err) - return - } - } - if isEOF { - break - } - } - pW.Close() - }() +// NewInputTarStream wraps the Reader stream of a tar archive and provides a +// Reader stream of the same. +// +// In the middle it will pack the segments and file metadata to storage.Packer +// `p`. +// +// The storage.FilePutter is where payload of files in the stream are +// stashed. If this stashing is not needed, you can provide a nil +// storage.FilePutter. Since the checksumming is still needed, then a default +// of NewDiscardFilePutter will be used internally +// +// If callers need to be able to abort early and/or wait for goroutine termination, +// prefer NewInputTarStreamWithDone. +// +// Deprecated: Use NewInputTarStreamWithDone instead. +func NewInputTarStream(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.Reader, error) { + pr, _ := newInputTarStreamCommon(r, p, fp, false) + return pr, nil +} - return pR, nil +// NewInputTarStreamWithDone wraps the Reader stream of a tar archive and provides a +// Reader stream of the same. +// +// In the middle it will pack the segments and file metadata to storage.Packer `p`. +// +// It also returns a done channel that will receive exactly one error value +// (nil on success) when the internal goroutine has fully completed parsing +// the tar stream (including the final paddingChunk draining loop) and has +// finished writing all entries to `p`. +// +// The returned reader is an io.ReadCloser so callers can stop early; closing it +// aborts the pipe so the internal goroutine can terminate promptly (rather than +// hanging on a blocked pipe write). +func NewInputTarStreamWithDone(r io.Reader, p storage.Packer, fp storage.FilePutter) (io.ReadCloser, <-chan error, error) { + pr, done := newInputTarStreamCommon(r, p, fp, true) + return pr, done, nil } diff --git a/vendor/go.podman.io/storage/layers.go b/vendor/go.podman.io/storage/layers.go index d485c9b4fa6..235b3e29e12 100644 --- a/vendor/go.podman.io/storage/layers.go +++ b/vendor/go.podman.io/storage/layers.go @@ -2610,16 +2610,14 @@ func applyDiff(layerOptions *LayerOptions, diff io.Reader, tarSplitFile *os.File if err != nil { return -1, err } - defer compressor.Close() // This must happen before tsdata is consumed. + defer compressor.Close() // This must happen before tsdata is consumed. + if err := compressor.SetConcurrency(1024*1024, 1); err != nil { // 1024*1024 is the hard-coded default; we're not changing that logrus.Infof("setting compression concurrency threads to 1: %v; ignoring", err) } + metadata := storage.NewJSONPacker(compressor) - uncompressed, err := archive.DecompressStream(defragmented) - if err != nil { - return -1, err - } - defer uncompressed.Close() + idLogger, err := tarlog.NewLogger(func(h *tar.Header) { if !strings.HasPrefix(path.Base(h.Name), archive.WhiteoutPrefix) { uidLog[uint32(h.Uid)] = struct{}{} @@ -2630,17 +2628,61 @@ func applyDiff(layerOptions *LayerOptions, diff io.Reader, tarSplitFile *os.File return -1, err } defer idLogger.Close() // This must happen before uidLog and gidLog is consumed. + + uncompressed, err := archive.DecompressStream(defragmented) + if err != nil { + return -1, err + } + uncompressedCounter = ioutils.NewWriteCounter(idLogger) uncompressedWriter := (io.Writer)(uncompressedCounter) if uncompressedDigester != nil { uncompressedWriter = io.MultiWriter(uncompressedWriter, uncompressedDigester.Hash()) } - payload, err := asm.NewInputTarStream(io.TeeReader(uncompressed, uncompressedWriter), metadata, storage.NewDiscardFilePutter()) + + tsRdr, done, err := asm.NewInputTarStreamWithDone( + io.TeeReader(uncompressed, uncompressedWriter), + metadata, + storage.NewDiscardFilePutter(), + ) if err != nil { + uncompressed.Close() return -1, err } - return applyDriverFunc(payload) + // cleanup closes tsRdr, waits for tar-split to finish, then closes uncompressed. + // It returns the tar-split goroutine's error (if any). + var once sync.Once + var tsErr error + cleanup := func() error { + once.Do(func() { + // Close can be called multiple times; ignore errors. + tsRdr.Close() + + // done is guaranteed to deliver exactly one value. + tsErr = <-done + uncompressed.Close() + }) + + return tsErr + } + + // Ensure we clean up on any early return from applyDriverFunc + defer cleanup() + + size, applyErr := applyDriverFunc(tsRdr) + + // Normal path: cleanup exactly once. + tsSplitErr := cleanup() + + if applyErr != nil { + return -1, applyErr + } + if tsSplitErr != nil { + return -1, tsSplitErr + } + + return size, nil }() if err != nil { return nil, err