Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/zed.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ func registerBackupCmd(rootCmd *cobra.Command) {
}

backupCreateCmd := &cobra.Command{
Use: "create <filename>",
Short: "Backup a permission system to a file",
Use: "create <filename-or-dash>",
Short: "Back up a permission system to a file, or to stdout by passing \"-\"",
Args: commands.ValidationWrapper(cobra.MaximumNArgs(1)),
RunE: withErrorHandling(backupCreateCmdFunc),
}
Expand Down
52 changes: 41 additions & 11 deletions pkg/backupformat/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,22 @@ func (e *OcfEncoder) Close() error {
// format, while also persisting it to a file and maintaining a lockfile that
// tracks the progress so that it can be resumed if stopped.
type OcfFileEncoder struct {
file *os.File
// file is the destination the encoder writes to. For regular backups this
// is a file on disk; when streaming it is os.Stdout.
file *os.File
// fileIsStream is true when the underlying file is a stream (e.g. os.Stdout)
// for which lockfile-based progress tracking and Sync/Close are not
// applicable.
fileIsStream bool
// lastSyncedCursor is the most recent cursor value written to the lockfile.
// It is used to avoid redundant lockfile writes when the cursor has not
// advanced since the previous Append call.
lastSyncedCursor string
completed bool
// completed indicates that the backup finished successfully. When true,
// Close removes the lockfile because no resume is needed.
completed bool
// OcfEncoder is the embedded AVRO OCF encoder that performs the actual
// serialization of relationships into the file.
*OcfEncoder
}

Expand All @@ -204,6 +217,9 @@ func (fe *OcfFileEncoder) lockFileName() string {
}

func (fe *OcfFileEncoder) Cursor() (string, error) {
if fe.fileIsStream {
return "", errors.New("resume is not supported when streaming to stdout")
}
cursorBytes, err := os.ReadFile(fe.lockFileName())
if os.IsNotExist(err) {
return "", errors.New("completed backup file already exists")
Expand All @@ -218,7 +234,8 @@ func NewFileEncoder(filename string) (e *OcfFileEncoder, existed bool, err error
backupExisted := filename != "-" && err == nil

var f *os.File
if filename == "-" {
isStream := filename == "-"
if isStream {
f = os.Stdout
} else {
var err error
Expand All @@ -228,14 +245,19 @@ func NewFileEncoder(filename string) (e *OcfFileEncoder, existed bool, err error
}
}

return &OcfFileEncoder{file: f, OcfEncoder: &OcfEncoder{w: f}}, backupExisted, nil
return &OcfFileEncoder{file: f, fileIsStream: isStream, OcfEncoder: &OcfEncoder{w: f}}, backupExisted, nil
}

func (fe *OcfFileEncoder) Append(r *v1.Relationship, cursor string) error {
if err := fe.OcfEncoder.Append(r, cursor); err != nil {
return fmt.Errorf("error storing relationship: %w", err)
}

// Streaming destinations (e.g. stdout) can't be resumed, so skip writing the cursor lockfile.
if fe.fileIsStream {
return nil
}

if cursor != fe.lastSyncedCursor { // Only write to disk when necessary
if err := atomic.WriteFile(fe.lockFileName(), bytes.NewBufferString(cursor)); err != nil {
return fmt.Errorf("failed to store cursor in lockfile: %w", err)
Expand All @@ -251,23 +273,31 @@ func (fe *OcfFileEncoder) MarkComplete() { fe.completed = true }
func (fe *OcfFileEncoder) Close() error {
// Don't throw any errors if the file is nil when flushing/closing.
safeClose := func() error {
if fe.file != nil && fe.enc != nil {
fe.OcfEncoder.Close()
return errors.Join(fe.file.Sync(), fe.file.Close())
if fe.file == nil || fe.enc == nil {
return nil
}
return nil
fe.OcfEncoder.Close()
// Stdout is owned by the process; Sync would fail with
// "inappropriate ioctl for device" and we must not close it.
if fe.fileIsStream {
return nil
}
return errors.Join(fe.file.Sync(), fe.file.Close())
}

removeCompleted := func(filename string) error {
removeCompleted := func() error {
if fe.fileIsStream {
return nil
}
if fe.completed {
return os.Remove(filename)
return os.Remove(fe.lockFileName())
}
return nil
}

return errors.Join(
safeClose(),
removeCompleted(fe.lockFileName()),
removeCompleted(),
)
}

Expand Down
45 changes: 45 additions & 0 deletions pkg/backupformat/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,51 @@ func TestNewFileEncoder(t *testing.T) {
}
}

func TestFileEncoderStreamingToStdout(t *testing.T) {
// Redirect stdout to a temp file so the encoder writes there instead of
// the test runner's stdout. Don't run in parallel: it mutates os.Stdout.
tempDir := t.TempDir()
fakeStdout, err := os.Create(filepath.Join(tempDir, "fake-stdout"))
require.NoError(t, err)

origStdout := os.Stdout
os.Stdout = fakeStdout
t.Cleanup(func() {
os.Stdout = origStdout
_ = fakeStdout.Close()
})

enc, existed, err := NewFileEncoder("-")
require.NoError(t, err)
require.False(t, existed)

require.NoError(t, enc.WriteSchema("definition user {}", "tok"))

rel := &v1.Relationship{
Resource: &v1.ObjectReference{ObjectType: "user", ObjectId: "1"},
Relation: "...",
Subject: &v1.SubjectReference{Object: &v1.ObjectReference{ObjectType: "user", ObjectId: "2"}},
}
// Append must not try to create a lockfile next to "-".
require.NoError(t, enc.Append(rel, "cursor-1"))

_, statErr := os.Stat("-.lock")
require.True(t, os.IsNotExist(statErr), "no lockfile should be created when streaming to stdout")

// Resume isn't possible on a stream — Cursor must surface a clear error.
_, err = enc.Cursor()
require.ErrorContains(t, err, "stdout")

enc.MarkComplete()
// Close must not Sync/Close stdout, which would error with
// "inappropriate ioctl for device" on most platforms.
require.NoError(t, enc.Close())

info, err := os.Stat(fakeStdout.Name())
require.NoError(t, err)
require.Positive(t, info.Size(), "encoded data should have been written to stdout")
}

func TestWithProgress(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading