From ea10d9cbe85eac4fdd4660e74e92880b665d474d Mon Sep 17 00:00:00 2001 From: "Sergey B." Date: Tue, 12 May 2026 10:36:27 +0300 Subject: [PATCH 1/2] feat: diagnostics mode for proof failures --- diagnostics/log.go | 21 + diagnostics/service.go | 374 ++++++++++++++++++ diagnostics/service_test.go | 168 ++++++++ diagnostics/types.go | 43 ++ itest/backup_test.go | 66 +++- itest/custom_channels/breach_test.go | 8 +- itest/custom_channels/force_close_test.go | 4 +- .../group_tranches_force_close_test.go | 2 +- itest/custom_channels/helpers.go | 70 +++- itest/custom_channels/v1_upgrade_test.go | 8 +- itest/custom_channels/vars.go | 5 + itest/tapd_harness.go | 29 +- log.go | 4 + sample-tapd.conf | 4 + server.go | 18 + tapcfg/config.go | 5 + tapcfg/server.go | 14 + tapconfig/config.go | 3 + tapfreighter/chain_porter.go | 176 +++++++++ tapfreighter/chain_porter_test.go | 60 +++ tapgarden/planter_test.go | 8 +- 21 files changed, 1047 insertions(+), 43 deletions(-) create mode 100644 diagnostics/log.go create mode 100644 diagnostics/service.go create mode 100644 diagnostics/service_test.go create mode 100644 diagnostics/types.go diff --git a/diagnostics/log.go b/diagnostics/log.go new file mode 100644 index 0000000000..92a64ec8b3 --- /dev/null +++ b/diagnostics/log.go @@ -0,0 +1,21 @@ +package diagnostics + +import ( + "github.com/btcsuite/btclog/v2" +) + +// Subsystem defines the logging code for this subsystem. +const Subsystem = "DIAG" + +// log is a logger that is initialized with no output filters. +var log = btclog.Disabled + +// DisableLog disables all library log output. +func DisableLog() { + UseLogger(btclog.Disabled) +} + +// UseLogger uses a specified Logger to output package logging info. 
+func UseLogger(logger btclog.Logger) { + log = logger +} diff --git a/diagnostics/service.go b/diagnostics/service.go new file mode 100644 index 0000000000..0cba69840e --- /dev/null +++ b/diagnostics/service.go @@ -0,0 +1,374 @@ +package diagnostics + +import ( + "encoding/json" + "fmt" + "os" + "path/filepath" + "regexp" + "strings" + "sync" + "sync/atomic" + "time" +) + +const ( + defaultQueueSize = 64 + + // diagnosticsDirPerm restricts diagnostics directories to the owner + // so failure dumps are not world-readable. + diagnosticsDirPerm = 0o700 + + // diagnosticsFilePerm is owner read/write for metadata and proof + // blobs under the diagnostics tree. + diagnosticsFilePerm = 0o600 + + // runDirectoryNameFormat names each daemon run directory using Unix + // time and PID so concurrent or restarted tapd processes never + // collide under the shared root. + runDirectoryNameFormat = "ts%d-pid%d" + + // eventDirectoryNameFormat lays out each failure as + // -- (sequence is zero-padded). + eventDirectoryNameFormat = "%d-%s-%06d" +) + +// fileNameSanitizer removes characters that are unsafe or confusing in +// path components; runs of those characters become a single hyphen. +var fileNameSanitizer = regexp.MustCompile(`[^a-zA-Z0-9._-]+`) + +// storedFailureMetadata defines the metadata.json schema for each persisted +// diagnostics proof validation failure report. +type storedFailureMetadata struct { + // TapdVersion identifies the tapd build that wrote this report. + // Omitted when empty: a blank string carries no triage value, and + // omitempty keeps older readers from mis-parsing an empty field. + TapdVersion string `json:"tapd_version,omitempty"` + + // Timestamp records when the failure report was persisted. + Timestamp time.Time `json:"timestamp"` + // Stage identifies the proof validation stage that failed. + Stage string `json:"stage"` + // Error stores the original failure message. 
+ Error string `json:"error"` + // AnchorTxID points to the anchor transaction when available. + // Omitted when empty: pre_broadcast runs before a confirmed anchor, + // so those reports legitimately have no txid to record. + AnchorTxID string `json:"anchor_txid,omitempty"` + // VPacketIndex points to the virtual packet that failed, if known. + // Omitted when nil: only callers that scope a failure to a virtual + // packet set this (e.g. pre_broadcast output proof checks). + VPacketIndex *int `json:"vpkt_idx,omitempty"` + // VPacketOutputIndex points to the packet output that failed, if + // known. Omitted when nil for the same scoping reasons as + // VPacketIndex. + VPacketOutputIndex *int `json:"vpkt_output_idx,omitempty"` + // TransferOutputIndex points to the transfer output that failed. + // Omitted when nil: post_broadcast output verification sets this; + // pre_broadcast paths use virtual packet indices instead. + TransferOutputIndex *int `json:"transfer_output_idx,omitempty"` + // OutputProofFiles lists output proof artifact filenames. + // Omitted when empty: no output proof bytes were attached, so there + // are no files to list. + OutputProofFiles []string `json:"output_proof_files,omitempty"` + // InputProofFiles lists input proof artifact filenames. + // Omitted when empty: no input proof artifacts were captured for + // this failure. + InputProofFiles []string `json:"input_proof_files,omitempty"` +} + +type queuedFailure struct { + id uint64 + failure ProofValidationFailure +} + +// Service stores proof-validation diagnostics on disk without blocking the +// caller. +type Service struct { + rootDir string + runDir string + // tapdVersion identifies the daemon build for metadata.json records. + tapdVersion string + + queue chan queuedFailure + + wg sync.WaitGroup + + started atomic.Bool + stopOnce sync.Once + + sequence uint64 + dropped uint64 + + nowFn func() time.Time +} + +// NewService creates a diagnostics service rooted at the given directory. 
+func NewService(rootDir, tapdVersion string) (*Service, error) { + if strings.TrimSpace(rootDir) == "" { + return nil, fmt.Errorf( + "diagnostics root directory cannot be empty", + ) + } + + return newService( + rootDir, tapdVersion, defaultQueueSize, time.Now, + ), nil +} + +// newService constructs a Service for tests and internal wiring with a +// configurable queue depth and clock. +func newService(rootDir, tapdVersion string, queueSize int, + nowFn func() time.Time) *Service { + + return &Service{ + rootDir: rootDir, + tapdVersion: strings.TrimSpace(tapdVersion), + queue: make(chan queuedFailure, queueSize), + nowFn: nowFn, + } +} + +// Start initializes the run directory and starts the async writer goroutine. +func (s *Service) Start() error { + if s.started.Load() { + return nil + } + + if err := os.MkdirAll(s.rootDir, diagnosticsDirPerm); err != nil { + return fmt.Errorf("create diagnostics root dir: %w", err) + } + + runDirName := fmt.Sprintf( + runDirectoryNameFormat, s.nowFn().Unix(), os.Getpid(), + ) + s.runDir = filepath.Join(s.rootDir, runDirName) + + if err := os.MkdirAll(s.runDir, diagnosticsDirPerm); err != nil { + return fmt.Errorf("create diagnostics run dir: %w", err) + } + + log.Infof("Diagnostics run directory initialized at %s", s.runDir) + + s.started.Store(true) + s.wg.Add(1) + go s.writer() + + return nil +} + +// Stop flushes pending writes and stops the diagnostics service. +func (s *Service) Stop() error { + s.stopOnce.Do(func() { + close(s.queue) + s.wg.Wait() + }) + + return nil +} + +// RunDir returns the active diagnostics run directory. +func (s *Service) RunDir() string { + return s.runDir +} + +// DroppedReports returns the number of dropped reports due to queue pressure. +func (s *Service) DroppedReports() uint64 { + return atomic.LoadUint64(&s.dropped) +} + +// CaptureProofValidationFailure stores a failure report asynchronously. +// +// The operation is non-blocking. If the queue is full, the report is +// dropped. 
+func (s *Service) CaptureProofValidationFailure( + failure ProofValidationFailure, +) { + + if s == nil || !s.started.Load() { + return + } + + queued := queuedFailure{ + id: atomic.AddUint64(&s.sequence, 1), + failure: cloneFailure(failure), + } + + select { + case s.queue <- queued: + default: + atomic.AddUint64(&s.dropped, 1) + log.Warnf("Diagnostics queue full, dropping proof failure "+ + "report (stage=%s)", failure.Stage) + } +} + +// writer drains s.queue until Stop closes it, then persists each +// snapshot under the active run directory. Stop pairs with Start by +// closing the queue and waiting on s.wg; this goroutine must run +// exactly once per successful Start. Disk errors are logged so one bad +// write cannot stall the rest of the queue. +func (s *Service) writer() { + defer s.wg.Done() + + for report := range s.queue { + if err := s.writeFailureReport(report); err != nil { + log.Warnf("Unable to write diagnostics report: %v", err) + } + } +} + +// writeFailureReport stores one proof validation failure and all referenced +// artifacts under the active run directory. +// +// The report is expected to be an immutable snapshot from the queue consumer. 
+func (s *Service) writeFailureReport(report queuedFailure) error { + failure := report.failure + if failure.Timestamp.IsZero() { + failure.Timestamp = s.nowFn().UTC() + } + + stage := sanitizeFileName(failure.Stage) + if stage == "" { + stage = "unknown" + } + + eventDirName := fmt.Sprintf( + eventDirectoryNameFormat, + failure.Timestamp.Unix(), stage, report.id, + ) + eventDir := filepath.Join(s.runDir, "proof-failures", eventDirName) + if err := os.MkdirAll(eventDir, diagnosticsDirPerm); err != nil { + return fmt.Errorf("create diagnostics event dir: %w", err) + } + + outputNames, err := writeArtifacts( + eventDir, "output-proof", failure.OutputProofs, + ) + if err != nil { + return err + } + + inputNames, err := writeArtifacts( + eventDir, "input-proof", failure.InputProofs, + ) + if err != nil { + return err + } + + metadata := storedFailureMetadata{ + TapdVersion: s.tapdVersion, + Timestamp: failure.Timestamp.UTC(), + Stage: failure.Stage, + Error: failure.Error, + AnchorTxID: failure.AnchorTxID, + VPacketIndex: failure.VPacketIndex, + VPacketOutputIndex: failure.VPacketOutputIndex, + TransferOutputIndex: failure.TransferOutputIndex, + OutputProofFiles: outputNames, + InputProofFiles: inputNames, + } + + metaJSON, err := json.MarshalIndent(metadata, "", " ") + if err != nil { + return fmt.Errorf("marshal diagnostics metadata: %w", err) + } + + metaPath := filepath.Join(eventDir, "metadata.json") + err = os.WriteFile(metaPath, metaJSON, diagnosticsFilePerm) + if err != nil { + return fmt.Errorf("write diagnostics metadata: %w", err) + } + + return nil +} + +// writeArtifacts writes each artifact into eventDir and returns the +// basenames stored on disk. Filenames are sanitized so untrusted labels +// cannot escape the event directory. 
+func writeArtifacts(eventDir, prefix string, + artifacts []ArtifactFile) ([]string, error) { + + if len(artifacts) == 0 { + return nil, nil + } + + writtenNames := make([]string, 0, len(artifacts)) + for idx := range artifacts { + artifact := artifacts[idx] + + fileName := strings.TrimSpace(artifact.FileName) + if fileName == "" { + fileName = fmt.Sprintf("%s-%d.bin", prefix, idx) + } + fileName = sanitizeFileName(fileName) + if fileName == "" { + fileName = fmt.Sprintf("%s-%d.bin", prefix, idx) + } + + artifactPath := filepath.Join(eventDir, fileName) + if err := os.WriteFile( + artifactPath, artifact.Data, diagnosticsFilePerm, + ); err != nil { + return nil, fmt.Errorf( + "write artifact %s: %w", fileName, err, + ) + } + + writtenNames = append(writtenNames, fileName) + } + + return writtenNames, nil +} + +// sanitizeFileName reduces a label to alphanumeric, dot, underscore, and +// hyphen characters so it is safe as a single path component. Leading +// and trailing hyphens are trimmed; the caller substitutes a default +// name when the result is empty. +func sanitizeFileName(name string) string { + sanitized := fileNameSanitizer.ReplaceAllString(name, "-") + return strings.Trim(sanitized, "-") +} + +// cloneFailure returns an independent copy of failure so queued work is +// not affected if the caller mutates slices or pointers afterward. +func cloneFailure(failure ProofValidationFailure) ProofValidationFailure { + clone := failure + clone.OutputProofs = cloneArtifacts(failure.OutputProofs) + clone.InputProofs = cloneArtifacts(failure.InputProofs) + clone.VPacketIndex = cloneIntPtr(failure.VPacketIndex) + clone.VPacketOutputIndex = cloneIntPtr(failure.VPacketOutputIndex) + clone.TransferOutputIndex = cloneIntPtr(failure.TransferOutputIndex) + return clone +} + +// cloneIntPtr returns a distinct pointer with the same value, or nil +// when the input is nil, so shared *int storage cannot race the writer. 
+func cloneIntPtr(value *int) *int { + if value == nil { + return nil + } + + valueCopy := *value + return &valueCopy +} + +// cloneArtifacts deep-copies artifact payloads so the async writer does +// not retain references to caller-owned byte buffers. +func cloneArtifacts(artifacts []ArtifactFile) []ArtifactFile { + if len(artifacts) == 0 { + return nil + } + + clones := make([]ArtifactFile, 0, len(artifacts)) + for idx := range artifacts { + artifact := artifacts[idx] + dataCopy := append([]byte(nil), artifact.Data...) + clones = append(clones, ArtifactFile{ + FileName: artifact.FileName, + Data: dataCopy, + }) + } + + return clones +} diff --git a/diagnostics/service_test.go b/diagnostics/service_test.go new file mode 100644 index 0000000000..ff0cd9d71e --- /dev/null +++ b/diagnostics/service_test.go @@ -0,0 +1,168 @@ +package diagnostics + +import ( + "encoding/json" + "os" + "path/filepath" + "regexp" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestServiceStartCreatesRunDir(t *testing.T) { + t.Parallel() + + rootDir := t.TempDir() + now := time.Unix(1_700_000_000, 0) + + service := newService(rootDir, "", 4, func() time.Time { + return now + }) + + require.NoError(t, service.Start()) + t.Cleanup(func() { + require.NoError(t, service.Stop()) + }) + + runDir := service.RunDir() + require.NotEmpty(t, runDir) + + base := filepath.Base(runDir) + require.Regexp(t, regexp.MustCompile(`^ts\d+-pid\d+$`), base) + + info, err := os.Stat(runDir) + require.NoError(t, err) + require.True(t, info.IsDir()) +} + +func TestServiceWritesFailureArtifacts(t *testing.T) { + t.Parallel() + + service, err := NewService(t.TempDir(), "test-version") + require.NoError(t, err) + require.NoError(t, service.Start()) + t.Cleanup(func() { + require.NoError(t, service.Stop()) + }) + + service.CaptureProofValidationFailure(ProofValidationFailure{ + Stage: StageProofVerificationPostBroadcast, + Error: "verification failed", + OutputProofs: []ArtifactFile{{ + 
FileName: "output-proof.bin", + Data: []byte{1, 2, 3}, + }}, + InputProofs: []ArtifactFile{{ + FileName: "input-proof.bin", + Data: []byte{4, 5}, + }}, + }) + + proofFailuresDir := filepath.Join(service.RunDir(), "proof-failures") + require.Eventually(t, func() bool { + entries, err := os.ReadDir(proofFailuresDir) + if err != nil || len(entries) != 1 { + return false + } + + eventDir := filepath.Join(proofFailuresDir, entries[0].Name()) + _, err = os.Stat(filepath.Join(eventDir, "metadata.json")) + return err == nil + }, time.Second, 20*time.Millisecond) + + entries, err := os.ReadDir(proofFailuresDir) + require.NoError(t, err) + require.Len(t, entries, 1) + + eventDir := filepath.Join(proofFailuresDir, entries[0].Name()) + _, err = os.Stat(filepath.Join(eventDir, "metadata.json")) + require.NoError(t, err) + + metadataBytes, err := os.ReadFile( + filepath.Join(eventDir, "metadata.json"), + ) + require.NoError(t, err) + + var metadata storedFailureMetadata + require.NoError(t, json.Unmarshal(metadataBytes, &metadata)) + require.Equal(t, "test-version", metadata.TapdVersion) + + outputProof, err := os.ReadFile( + filepath.Join(eventDir, "output-proof.bin"), + ) + require.NoError(t, err) + require.Equal(t, []byte{1, 2, 3}, outputProof) + + inputProof, err := os.ReadFile( + filepath.Join(eventDir, "input-proof.bin"), + ) + require.NoError(t, err) + require.Equal(t, []byte{4, 5}, inputProof) +} + +func TestServiceDropsReportsWhenQueueIsFull(t *testing.T) { + t.Parallel() + + service := newService(t.TempDir(), "", 1, time.Now) + require.NoError(t, service.Start()) + t.Cleanup(func() { + require.NoError(t, service.Stop()) + }) + + for idx := 0; idx < 1000; idx++ { + service.CaptureProofValidationFailure(ProofValidationFailure{ + Stage: StageProofVerificationPreBroadcast, + Error: "full queue test", + }) + } + + require.Eventually(t, func() bool { + return service.DroppedReports() > 0 + }, time.Second, 20*time.Millisecond) +} + +func 
TestCloneFailureDeepCopiesPointersAndArtifacts(t *testing.T) { + t.Parallel() + + vPktIdx := 2 + vPktOutIdx := 4 + transferOutIdx := 6 + + failure := ProofValidationFailure{ + VPacketIndex: &vPktIdx, + VPacketOutputIndex: &vPktOutIdx, + TransferOutputIndex: &transferOutIdx, + OutputProofs: []ArtifactFile{{ + FileName: "output-proof.bin", + Data: []byte{1, 2, 3}, + }}, + InputProofs: []ArtifactFile{{ + FileName: "input-proof.bin", + Data: []byte{4, 5, 6}, + }}, + } + + cloned := cloneFailure(failure) + + *failure.VPacketIndex = 10 + *failure.VPacketOutputIndex = 11 + *failure.TransferOutputIndex = 12 + failure.OutputProofs[0].Data[0] = 99 + failure.InputProofs[0].Data[0] = 88 + + require.NotSame(t, failure.VPacketIndex, cloned.VPacketIndex) + require.NotSame( + t, failure.VPacketOutputIndex, cloned.VPacketOutputIndex, + ) + require.NotSame( + t, failure.TransferOutputIndex, cloned.TransferOutputIndex, + ) + + require.Equal(t, 2, *cloned.VPacketIndex) + require.Equal(t, 4, *cloned.VPacketOutputIndex) + require.Equal(t, 6, *cloned.TransferOutputIndex) + require.Equal(t, []byte{1, 2, 3}, cloned.OutputProofs[0].Data) + require.Equal(t, []byte{4, 5, 6}, cloned.InputProofs[0].Data) +} diff --git a/diagnostics/types.go b/diagnostics/types.go new file mode 100644 index 0000000000..5dca9f8c93 --- /dev/null +++ b/diagnostics/types.go @@ -0,0 +1,43 @@ +package diagnostics + +import ( + "time" +) + +const ( + // StageProofVerificationPreBroadcast marks failures from pre-broadcast + // proof validation. + StageProofVerificationPreBroadcast = "pre_broadcast" + + // StageProofVerificationPostBroadcast marks failures from + // post-broadcast proof validation. + StageProofVerificationPostBroadcast = "post_broadcast" +) + +// ArtifactFile is a named binary artifact associated with a proof failure. +type ArtifactFile struct { + FileName string + Data []byte +} + +// ProofValidationFailure is an event emitted when proof validation fails. 
+type ProofValidationFailure struct { + Timestamp time.Time + + Stage string + Error string + + AnchorTxID string + + VPacketIndex *int + VPacketOutputIndex *int + TransferOutputIndex *int + + OutputProofs []ArtifactFile + InputProofs []ArtifactFile +} + +// Recorder accepts proof-validation failures for asynchronous persistence. +type Recorder interface { + CaptureProofValidationFailure(failure ProofValidationFailure) +} diff --git a/itest/backup_test.go b/itest/backup_test.go index dbc9ab2e4a..671197d1c0 100644 --- a/itest/backup_test.go +++ b/itest/backup_test.go @@ -20,8 +20,6 @@ import ( // 3. Bob imports the same backup again — 0 imported (idempotent) func testBackupRestoreGenesis(t *harnessTest) { ctxb := context.Background() - ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout) - defer cancel() // Mint a single asset on Alice. backupAssets := []*mintrpc.MintAssetRequest{ @@ -43,11 +41,13 @@ func testBackupRestoreGenesis(t *harnessTest) { require.Len(t.t, rpcAssets, 1) // Export a compact backup from Alice. + ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout) exportResp, err := t.tapd.ExportAssetWalletBackup( ctxt, &wrpc.ExportAssetWalletBackupRequest{ Mode: wrpc.BackupMode_COMPACT, }, ) + cancel() require.NoError(t.t, err) require.NotEmpty(t.t, exportResp.Backup) @@ -61,18 +61,22 @@ func testBackupRestoreGenesis(t *harnessTest) { }() // First import: 1 asset imported. + ctxt, cancel = context.WithTimeout(ctxb, defaultWaitTimeout) importResp, err := bobTapd.ImportAssetsFromBackup( ctxt, &wrpc.ImportAssetsFromBackupRequest{ Backup: exportResp.Backup, }, ) + cancel() require.NoError(t.t, err) require.Equal(t.t, uint32(1), importResp.NumImported) // Verify the asset matches. 
+ ctxt, cancel = context.WithTimeout(ctxb, defaultWaitTimeout) bobAssets, err := bobTapd.ListAssets( ctxt, &taprpc.ListAssetRequest{}, ) + cancel() require.NoError(t.t, err) require.Len(t.t, bobAssets.Assets, 1) @@ -85,18 +89,22 @@ func testBackupRestoreGenesis(t *harnessTest) { imported.AssetGenesis.Name) // Second import: 0 imported (idempotent). + ctxt, cancel = context.WithTimeout(ctxb, defaultWaitTimeout) importResp2, err := bobTapd.ImportAssetsFromBackup( ctxt, &wrpc.ImportAssetsFromBackupRequest{ Backup: exportResp.Backup, }, ) + cancel() require.NoError(t.t, err) require.Equal(t.t, uint32(0), importResp2.NumImported) // Still exactly 1 asset. + ctxt, cancel = context.WithTimeout(ctxb, defaultWaitTimeout) bobAssetsFinal, err := bobTapd.ListAssets( ctxt, &taprpc.ListAssetRequest{}, ) + cancel() require.NoError(t.t, err) require.Len(t.t, bobAssetsFinal.Assets, 1) @@ -236,18 +244,24 @@ func testBackupRestoreTransferred(t *harnessTest) { require.NoError(t.t, charlieTapd.stop(!*noDelete)) }() + importCtx, importCancel := context.WithTimeout( + ctxb, defaultWaitTimeout*2, + ) charlieImport, err := charlieTapd.ImportAssetsFromBackup( - ctxt, &wrpc.ImportAssetsFromBackupRequest{ + importCtx, &wrpc.ImportAssetsFromBackupRequest{ Backup: rawBackup.Backup, }, ) + importCancel() require.NoError(t.t, err) require.Equal(t.t, uint32(len(mintedAssets)), charlieImport.NumImported) + listCtx, listCancel := context.WithTimeout(ctxb, defaultWaitTimeout) charlieAssets, err := charlieTapd.ListAssets( - ctxt, &taprpc.ListAssetRequest{}, + listCtx, &taprpc.ListAssetRequest{}, ) + listCancel() require.NoError(t.t, err) assertAssetsMatch(t, mintedAssets, charlieAssets.Assets) t.Logf("Charlie (RAW): %d assets imported", @@ -264,18 +278,24 @@ func testBackupRestoreTransferred(t *harnessTest) { require.NoError(t.t, daveTapd.stop(!*noDelete)) }() + importCtx, importCancel = context.WithTimeout( + ctxb, defaultWaitTimeout*2, + ) daveImport, err := daveTapd.ImportAssetsFromBackup( - ctxt, 
&wrpc.ImportAssetsFromBackupRequest{ + importCtx, &wrpc.ImportAssetsFromBackupRequest{ Backup: compactBackup.Backup, }, ) + importCancel() require.NoError(t.t, err) require.Equal(t.t, uint32(len(mintedAssets)), daveImport.NumImported) + listCtx, listCancel = context.WithTimeout(ctxb, defaultWaitTimeout) daveAssets, err := daveTapd.ListAssets( - ctxt, &taprpc.ListAssetRequest{}, + listCtx, &taprpc.ListAssetRequest{}, ) + listCancel() require.NoError(t.t, err) assertAssetsMatch(t, mintedAssets, daveAssets.Assets) t.Logf("Dave (COMPACT): %d assets imported", @@ -316,18 +336,24 @@ func testBackupRestoreTransferred(t *harnessTest) { ) // We explicitly stop Eve later for the stale check. + importCtx, importCancel = context.WithTimeout( + ctxb, defaultWaitTimeout*2, + ) eveImport, err := eveTapd.ImportAssetsFromBackup( - ctxt, &wrpc.ImportAssetsFromBackupRequest{ + importCtx, &wrpc.ImportAssetsFromBackupRequest{ Backup: optimisticBackup.Backup, }, ) + importCancel() require.NoError(t.t, err) require.Equal(t.t, uint32(len(mintedAssets)), eveImport.NumImported) + listCtx, listCancel = context.WithTimeout(ctxb, defaultWaitTimeout) eveAssets, err := eveTapd.ListAssets( - ctxt, &taprpc.ListAssetRequest{}, + listCtx, &taprpc.ListAssetRequest{}, ) + listCancel() require.NoError(t.t, err) assertAssetsMatch(t, mintedAssets, eveAssets.Assets) t.Logf("Eve (OPTIMISTIC): %d assets imported", @@ -372,9 +398,11 @@ func testBackupRestoreTransferred(t *harnessTest) { t.t, t.tapd, aliceRecvBase+len(mintedAssets), ) + listCtx, listCancel = context.WithTimeout(ctxb, defaultWaitTimeout) aliceAssets, err := t.tapd.ListAssets( - ctxt, &taprpc.ListAssetRequest{}, + listCtx, &taprpc.ListAssetRequest{}, ) + listCancel() require.NoError(t.t, err) for _, exp := range mintedAssets { @@ -411,11 +439,15 @@ func testBackupRestoreTransferred(t *harnessTest) { // Re-import Bob's RAW backup. 
Both assets had their anchor // outpoints spent on-chain when Eve sent them to Alice, so // both should be detected as stale and skipped. + staleCtx, staleCancel := context.WithTimeout( + ctxb, defaultWaitTimeout*2, + ) staleImport, err := restoredTapd.ImportAssetsFromBackup( - ctxt, &wrpc.ImportAssetsFromBackupRequest{ + staleCtx, &wrpc.ImportAssetsFromBackupRequest{ Backup: rawBackup.Backup, }, ) + staleCancel() require.NoError(t.t, err) require.Equal(t.t, uint32(0), staleImport.NumImported, "expected 0 imported (both outpoints are spent)") @@ -475,8 +507,6 @@ func assertAssetsMatch(t *harnessTest, expected []*taprpc.Asset, // 5. Verify asset counts and group key presence on both nodes func testBackupRestoreGrouped(t *harnessTest) { ctxb := context.Background() - ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout*4) - defer cancel() // Mint a grouped asset and an ungrouped asset together. mintReqs := []*mintrpc.MintAssetRequest{ @@ -512,18 +542,22 @@ func testBackupRestoreGrouped(t *harnessTest) { AssertNumGroups(t.t, t.tapd, 1) // Export RAW and COMPACT backups. 
+ ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout) rawBackup, err := t.tapd.ExportAssetWalletBackup( ctxt, &wrpc.ExportAssetWalletBackupRequest{ Mode: wrpc.BackupMode_RAW, }, ) + cancel() require.NoError(t.t, err) + ctxt, cancel = context.WithTimeout(ctxb, defaultWaitTimeout) compactBackup, err := t.tapd.ExportAssetWalletBackup( ctxt, &wrpc.ExportAssetWalletBackupRequest{ Mode: wrpc.BackupMode_COMPACT, }, ) + cancel() require.NoError(t.t, err) // === Import RAW on Bob (no federation) === @@ -540,20 +574,24 @@ func testBackupRestoreGrouped(t *harnessTest) { require.NoError(t.t, bobTapd.stop(!*noDelete)) }() + ctxt, cancel = context.WithTimeout(ctxb, defaultWaitTimeout*2) bobImport, err := bobTapd.ImportAssetsFromBackup( ctxt, &wrpc.ImportAssetsFromBackupRequest{ Backup: rawBackup.Backup, }, ) + cancel() require.NoError(t.t, err) require.Equal(t.t, uint32(2), bobImport.NumImported, "both assets should import (grouped + ungrouped)") require.Equal(t.t, uint32(0), bobImport.NumSkipped, "no assets should be skipped") + ctxt, cancel = context.WithTimeout(ctxb, defaultWaitTimeout) bobAssets, err := bobTapd.ListAssets( ctxt, &taprpc.ListAssetRequest{}, ) + cancel() require.NoError(t.t, err) assertAssetsMatch(t, rpcAssets, bobAssets.Assets) @@ -575,20 +613,24 @@ func testBackupRestoreGrouped(t *harnessTest) { require.NoError(t.t, charlieTapd.stop(!*noDelete)) }() + ctxt, cancel = context.WithTimeout(ctxb, defaultWaitTimeout*2) charlieImport, err := charlieTapd.ImportAssetsFromBackup( ctxt, &wrpc.ImportAssetsFromBackupRequest{ Backup: compactBackup.Backup, }, ) + cancel() require.NoError(t.t, err) require.Equal(t.t, uint32(2), charlieImport.NumImported, "both assets should import (grouped + ungrouped)") require.Equal(t.t, uint32(0), charlieImport.NumSkipped, "no assets should be skipped") + ctxt, cancel = context.WithTimeout(ctxb, defaultWaitTimeout) charlieAssets, err := charlieTapd.ListAssets( ctxt, &taprpc.ListAssetRequest{}, ) + cancel() require.NoError(t.t, 
err) assertAssetsMatch(t, rpcAssets, charlieAssets.Assets) diff --git a/itest/custom_channels/breach_test.go b/itest/custom_channels/breach_test.go index 4c41a63044..e21470081d 100644 --- a/itest/custom_channels/breach_test.go +++ b/itest/custom_channels/breach_test.go @@ -109,7 +109,7 @@ func testCustomChannelsBreach(ctx context.Context, // Charlie's DB. fundingTxid, err := chainhash.NewHashFromStr(assetFundResp.Txid) require.NoError(t.t, err) - locateAssetTransfers(t.t, charlie, *fundingTxid) + locateAssetTransfers(t.t, net, charlie, *fundingTxid) // Charlie's balance should reflect that the funding asset is now // excluded from balance reporting by tapd. @@ -191,8 +191,8 @@ func testCustomChannelsBreach(ctx context.Context, // We should be able to find the transfer of the breach for both // parties. - locateAssetTransfers(t.t, charlie, *breachTxid) - locateAssetTransfers(t.t, dave, *breachTxid) + locateAssetTransfers(t.t, net, charlie, *breachTxid) + locateAssetTransfers(t.t, net, dave, *breachTxid) // With the breach transaction mined, Charlie should now have a // transaction in the mempool sweeping *both* commitment outputs. @@ -209,7 +209,7 @@ func testCustomChannelsBreach(ctx context.Context, mineBlocks(t, net, 1, 1) // Charlie should now have a transfer for his justice transaction. - locateAssetTransfers(t.t, charlie, *charlieJusticeTxid[0]) + locateAssetTransfers(t.t, net, charlie, *charlieJusticeTxid[0]) // Charlie's balance should now be the same as before the breach // attempt: the amount he minted at the very start. diff --git a/itest/custom_channels/force_close_test.go b/itest/custom_channels/force_close_test.go index 419a41227f..89c98bd2aa 100644 --- a/itest/custom_channels/force_close_test.go +++ b/itest/custom_channels/force_close_test.go @@ -125,7 +125,7 @@ func testCustomChannelsForceClose(ctx context.Context, // DB. 
fundingTxid, err := chainhash.NewHashFromStr(assetFundResp.Txid) require.NoError(t.t, err) - locateAssetTransfers(t.t, charlie, *fundingTxid) + locateAssetTransfers(t.t, net, charlie, *fundingTxid) // Charlie's balance should reflect that the funding asset is now // excluded from balance reporting by tapd. @@ -241,7 +241,7 @@ func testCustomChannelsForceClose(ctx context.Context, // At this point, a transfer should have been created for Dave's sweep // transaction. - locateAssetTransfers(t.t, dave, daveSweepTxHash) + locateAssetTransfers(t.t, net, dave, daveSweepTxHash) time.Sleep(time.Second * 1) diff --git a/itest/custom_channels/group_tranches_force_close_test.go b/itest/custom_channels/group_tranches_force_close_test.go index e088f11320..dce95efcda 100644 --- a/itest/custom_channels/group_tranches_force_close_test.go +++ b/itest/custom_channels/group_tranches_force_close_test.go @@ -273,7 +273,7 @@ func testCustomChannelsGroupTranchesForceClose(ctx context.Context, t.Logf("Erin sweep txid: %v", erinSweepTxHash) // Erin should now have an asset transfer for her sweep transaction. - locateAssetTransfers(t.t, erin, erinSweepTxHash) + locateAssetTransfers(t.t, net, erin, erinSweepTxHash) assertSpendableBalance( t.t, erin, nil, groupKey, fundingAmount-totalFirstSend, diff --git a/itest/custom_channels/helpers.go b/itest/custom_channels/helpers.go index 5245d60a94..9a910a848a 100644 --- a/itest/custom_channels/helpers.go +++ b/itest/custom_channels/helpers.go @@ -2525,13 +2525,51 @@ func assertSpendableBalance(t *testing.T, client *itest.IntegratedNode, } } +// lndCaughtUpToMiner returns nil once this node's lnd reports synced-to-chain +// and a block height at or above the miner's best height. Tapd learns anchor +// confirmations via lnd's notifier; polling ListTransfers before lnd has the +// mined tip flakes on busy CI. 
+func lndCaughtUpToMiner(net *itest.IntegratedNetworkHarness, + node *itest.IntegratedNode) error { + + _, minerHeight := net.Miner.GetBestBlock() + + ctx, cancel := context.WithTimeout( + context.Background(), 5*time.Second, + ) + defer cancel() + + info, err := node.LightningClient.GetInfo( + ctx, &lnrpc.GetInfoRequest{}, + ) + if err != nil { + return fmt.Errorf("getinfo %s: %w", node.Cfg.Name, err) + } + + if !info.SyncedToChain { + return fmt.Errorf("%s not synced to chain (miner_h=%d)", + node.Cfg.Name, minerHeight) + } + + if int32(info.BlockHeight) < minerHeight { + return fmt.Errorf("%s height %d < miner height %d", + node.Cfg.Name, info.BlockHeight, minerHeight) + } + + return nil +} + // locateAssetTransfers finds and returns the asset transfer for the given // transaction ID. -func locateAssetTransfers(t *testing.T, node *itest.IntegratedNode, - txid chainhash.Hash) *taprpc.AssetTransfer { +func locateAssetTransfers(t *testing.T, net *itest.IntegratedNetworkHarness, + node *itest.IntegratedNode, txid chainhash.Hash) *taprpc.AssetTransfer { var transfer *taprpc.AssetTransfer err := wait.NoError(func() error { + if err := lndCaughtUpToMiner(net, node); err != nil { + return err + } + ctxb := context.Background() forceCloseTransfer, err := node.ListTransfers( ctxb, &taprpc.ListTransfersRequest{ @@ -2550,13 +2588,19 @@ func locateAssetTransfers(t *testing.T, node *itest.IntegratedNode, transfer = forceCloseTransfer.Transfers[0] - if transfer.AnchorTxBlockHash == nil { - return fmt.Errorf("missing anchor block hash, " + - "transfer not confirmed") + blockHashSet := transfer.AnchorTxBlockHash != nil + confirmed := transfer.AnchorTxBlockHeight > 0 || blockHashSet + if !confirmed { + return fmt.Errorf("transfer %v not confirmed yet "+ + "(block_height=%d, block_hash_set=%t)", + txid, + transfer.AnchorTxBlockHeight, + blockHashSet, + ) } return nil - }, ccTransferTimeout) + }, ccTransferConfirmTimeout) require.NoError(t, err) return transfer @@ -3039,7 +3083,7 
@@ func assertForceCloseSweeps(ctx context.Context, // After force closing, Bob should now have a transfer that tracks the // force closed commitment transaction. - locateAssetTransfers(t.t, bob, *closeTxid) + locateAssetTransfers(t.t, net, bob, *closeTxid) t.Logf("Settling Bob's hodl invoice") @@ -3089,8 +3133,8 @@ func assertForceCloseSweeps(ctx context.Context, // than from the earlier mempool checks to avoid RBF mismatches. bobSweepTxHash1 := bobSweepBlocks1[0].Transactions[1].TxHash() bobSweepTxHash2 := bobSweepBlocks2[0].Transactions[1].TxHash() - locateAssetTransfers(t.t, bob, bobSweepTxHash1) - locateAssetTransfers(t.t, bob, bobSweepTxHash2) + locateAssetTransfers(t.t, net, bob, bobSweepTxHash1) + locateAssetTransfers(t.t, net, bob, bobSweepTxHash2) t.Logf("Confirming Bob's remote HTLC success sweep") @@ -3121,7 +3165,7 @@ func assertForceCloseSweeps(ctx context.Context, // Wait for tapd to register the to-local sweep transfer. We use the // txid from the mined block to avoid RBF mismatches. aliceToLocalHash := aliceToLocalBlocks[0].Transactions[1].TxHash() - locateAssetTransfers(t.t, alice, aliceToLocalHash) + locateAssetTransfers(t.t, net, alice, aliceToLocalHash) t.Logf("Confirming Alice's to-local sweep") @@ -3170,7 +3214,7 @@ func assertForceCloseSweeps(ctx context.Context, // Use the txid from the mined block to avoid RBF mismatches. sweepTxHash := sweepBlocks[0].Transactions[1].TxHash() - locateAssetTransfers(t.t, alice, sweepTxHash) + locateAssetTransfers(t.t, net, alice, sweepTxHash) t.Logf("Confirming Alice's second level remote HTLC success sweep") @@ -3204,7 +3248,7 @@ func assertForceCloseSweeps(ctx context.Context, } sweepTxHash = sweepBlocks[0].Transactions[1].TxHash() - locateAssetTransfers(t.t, alice, sweepTxHash) + locateAssetTransfers(t.t, net, alice, sweepTxHash) // With the sweep transaction confirmed, Alice's balance should have // incremented by the amt of the HTLC. 
@@ -3373,7 +3417,7 @@ func assertForceCloseSweeps(ctx context.Context, } sweepTxHash = sweepBlocks[0].Transactions[1].TxHash() - locateAssetTransfers(t.t, alice, sweepTxHash) + locateAssetTransfers(t.t, net, alice, sweepTxHash) return aliceExpectedBalance, bobExpectedBalance } diff --git a/itest/custom_channels/v1_upgrade_test.go b/itest/custom_channels/v1_upgrade_test.go index 4e62d50cc8..d2f26ca2a6 100644 --- a/itest/custom_channels/v1_upgrade_test.go +++ b/itest/custom_channels/v1_upgrade_test.go @@ -100,7 +100,7 @@ func testCustomChannelsV1Upgrade(ctx context.Context, // Charlie's DB. fundingTxid, err := chainhash.NewHashFromStr(assetFundResp.Txid) require.NoError(t.t, err) - locateAssetTransfers(t.t, charlie, *fundingTxid) + locateAssetTransfers(t.t, net, charlie, *fundingTxid) // Charlie's balance should reflect that the funding asset is now // excluded from balance reporting by tapd. @@ -230,9 +230,9 @@ func testCustomChannelsV1Upgrade(ctx context.Context, // We should be able to find the transfer of the breach for both // parties. charlieBreachTransfer := locateAssetTransfers( - t.t, charlie, *breachTxid, + t.t, net, charlie, *breachTxid, ) - locateAssetTransfers(t.t, dave, *breachTxid) + locateAssetTransfers(t.t, net, dave, *breachTxid) require.Len(t.t, charlieBreachTransfer.Outputs, 2) assetOutput := charlieBreachTransfer.Outputs[0] @@ -283,7 +283,7 @@ func testCustomChannelsV1Upgrade(ctx context.Context, // Dave should now have a transfer for his justice transaction. daveJusticeTransfer := locateAssetTransfers( - t.t, dave, *daveJusticeTxid[0], + t.t, net, dave, *daveJusticeTxid[0], ) // Dave should claim all of the asset balance that was put into the diff --git a/itest/custom_channels/vars.go b/itest/custom_channels/vars.go index 567ae4d92f..227e92567d 100644 --- a/itest/custom_channels/vars.go +++ b/itest/custom_channels/vars.go @@ -29,6 +29,11 @@ var ( // parallel tranches, tapd block processing can be slow so we use a // generous timeout. 
ccTransferTimeout = 2 * time.Minute + + // ccTransferConfirmTimeout is the timeout used when polling for + // transfer confirmation metadata after a sweep tx is mined. On busy + // CI runners, this metadata can lag behind block inclusion. + ccTransferConfirmTimeout = 5 * time.Minute ) // lndArgsTemplate contains lnd flags used by all custom channel test nodes. diff --git a/itest/tapd_harness.go b/itest/tapd_harness.go index c6108037a3..8df22f936c 100644 --- a/itest/tapd_harness.go +++ b/itest/tapd_harness.go @@ -11,6 +11,7 @@ import ( "os/exec" "path/filepath" "strings" + "sync" "testing" "time" @@ -124,6 +125,12 @@ type tapdHarness struct { // the stop() method. processDone chan struct{} + // processErr stores an unexpected process exit error captured by the + // background wait goroutine. It is reported in stop(), where using + // testing logs is safe. + processErr error + processErrMu sync.Mutex + // logFile is the log file for this tapd instance's output. logFile *os.File @@ -630,14 +637,19 @@ func (hs *tapdHarness) start(expectErrExit bool) error { hs.cmd.Process.Pid) // Wait for the process to exit in the background. If it exits - // unexpectedly, log the error. Signal processDone when complete - // so stop() can wait for the process to fully exit. + // unexpectedly, capture the error and report it in stop(). Signal + // processDone when complete so stop() can wait for the process to + // fully exit. 
hs.processDone = make(chan struct{}) + hs.processErrMu.Lock() + hs.processErr = nil + hs.processErrMu.Unlock() go func() { err := hs.cmd.Wait() if err != nil && !expectErrExit { - hs.ht.t.Logf("tapd process (name=%v) exited with "+ - "error: %v", hs.cfg.LndNode.Cfg.Name, err) + hs.processErrMu.Lock() + hs.processErr = err + hs.processErrMu.Unlock() } close(hs.processDone) }() @@ -760,6 +772,15 @@ func (hs *tapdHarness) stop(deleteData bool) error { } } + hs.processErrMu.Lock() + processErr := hs.processErr + hs.processErr = nil + hs.processErrMu.Unlock() + if processErr != nil { + hs.ht.t.Logf("tapd process (name=%v) exited with error: %v", + hs.cfg.LndNode.Cfg.Name, processErr) + } + // Close the log file. if hs.logFile != nil { _ = hs.logFile.Close() diff --git a/log.go b/log.go index 8c9d4a9651..5b2496e167 100644 --- a/log.go +++ b/log.go @@ -6,6 +6,7 @@ import ( "github.com/lightninglabs/taproot-assets/authmailbox" "github.com/lightninglabs/taproot-assets/backup" "github.com/lightninglabs/taproot-assets/commitment" + "github.com/lightninglabs/taproot-assets/diagnostics" "github.com/lightninglabs/taproot-assets/healthcheck" "github.com/lightninglabs/taproot-assets/lndservices" "github.com/lightninglabs/taproot-assets/monitoring" @@ -150,6 +151,9 @@ func SetupLoggers(root *build.SubLoggerManager, AddSubLogger( root, backup.Subsystem, interceptor, backup.UseLogger, ) + AddSubLogger( + root, diagnostics.Subsystem, interceptor, diagnostics.UseLogger, + ) } // AddSubLogger is a helper method to conveniently create and register the diff --git a/sample-tapd.conf b/sample-tapd.conf index 70551b6299..c3c697ec70 100644 --- a/sample-tapd.conf +++ b/sample-tapd.conf @@ -36,6 +36,10 @@ ; The directory to store tapd's data within ; datadir=~/.tapd/data +; Root directory for proof-validation diagnostics artifacts. Diagnostics mode is +; disabled unless this is set. 
+; diagnostics-dir= + ; Directory to log output ; logdir=~/.tapd/logs diff --git a/server.go b/server.go index a2ca0dfb3d..6d061121d8 100644 --- a/server.go +++ b/server.go @@ -187,6 +187,18 @@ func (s *Server) initialize(interceptorChain *rpcperms.InterceptorChain) error { } } + // Start diagnostics mode first so startup errors are surfaced before + // sub-systems begin their work. + if s.cfg.DiagnosticsService != nil { + if err := s.cfg.DiagnosticsService.Start(); err != nil { + return fmt.Errorf("unable to start diagnostics "+ + "service: %w", err) + } + + shutdownFuncs["diagnosticsService"] = + s.cfg.DiagnosticsService.Stop + } + // First, we'll start the main batched asset minter. if err := s.cfg.AssetMinter.Start(); err != nil { return fmt.Errorf("unable to start asset minter: %w", err) @@ -886,6 +898,12 @@ func (s *Server) Stop() error { return err } + if s.cfg.DiagnosticsService != nil { + if err := s.cfg.DiagnosticsService.Stop(); err != nil { + return err + } + } + if s.macaroonService != nil { err := s.macaroonService.Stop() if err != nil { diff --git a/tapcfg/config.go b/tapcfg/config.go index fc19a37656..4bd30ff6ee 100644 --- a/tapcfg/config.go +++ b/tapcfg/config.go @@ -398,6 +398,7 @@ type Config struct { DataDir string `long:"datadir" description:"The directory to store tapd's data within"` LogDir string `long:"logdir" description:"Directory to log output."` + DiagnosticsDir string `long:"diagnostics-dir" description:"The root directory for tapd diagnostics artifacts. Diagnostics mode is disabled unless this is set."` MaxLogFiles int `long:"maxlogfiles" hidden:"true" description:"DEPRECATED! Use logging.file.max-files instead. Maximum logfiles to keep (0 for no rotation)"` MaxLogFileSize int `long:"maxlogfilesize" hidden:"true" description:"DEPRECATED! Use logging.file.max-file-size instead. 
Maximum logfile size in MB"` @@ -784,6 +785,7 @@ func ValidateConfig(cfg Config, cfgLogger btclog.Logger) (*Config, error) { cfg.RpcConf.TLSCertPath = CleanAndExpandPath(cfg.RpcConf.TLSCertPath) cfg.RpcConf.TLSKeyPath = CleanAndExpandPath(cfg.RpcConf.TLSKeyPath) cfg.LogDir = CleanAndExpandPath(cfg.LogDir) + cfg.DiagnosticsDir = CleanAndExpandPath(cfg.DiagnosticsDir) cfg.RpcConf.MacaroonPath = CleanAndExpandPath(cfg.RpcConf.MacaroonPath) if cfg.HashMailCourier != nil { @@ -930,6 +932,9 @@ func ValidateConfig(cfg Config, cfgLogger btclog.Logger) (*Config, error) { filepath.Dir(cfg.RpcConf.TLSKeyPath), filepath.Dir(cfg.RpcConf.MacaroonPath), } + if cfg.DiagnosticsDir != "" { + dirs = append(dirs, cfg.DiagnosticsDir) + } for _, dir := range dirs { if err := makeDirectory(dir); err != nil { return nil, err diff --git a/tapcfg/server.go b/tapcfg/server.go index 444a7176cb..4a209ee23f 100644 --- a/tapcfg/server.go +++ b/tapcfg/server.go @@ -15,6 +15,7 @@ import ( "github.com/lightninglabs/taproot-assets/address" "github.com/lightninglabs/taproot-assets/asset" "github.com/lightninglabs/taproot-assets/authmailbox" + "github.com/lightninglabs/taproot-assets/diagnostics" "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/healthcheck" "github.com/lightninglabs/taproot-assets/lndservices" @@ -269,6 +270,17 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, assetStore, proofFileStore, ) + var diagnosticsService *diagnostics.Service + if cfg.DiagnosticsDir != "" { + diagnosticsService, err = diagnostics.NewService( + cfg.DiagnosticsDir, tap.Version(), + ) + if err != nil { + return nil, fmt.Errorf("unable to create diagnostics "+ + "service: %w", err) + } + } + federationMembers := cfg.Universe.FederationServers switch cfg.ChainConf.Network { case "mainnet": @@ -695,6 +707,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, ErrChan: mainErrChan, BurnCommitter: supplyCommitManager, DelegationKeyChecker: addrBook, + 
DiagnosticsRecorder: diagnosticsService, }, ) @@ -836,6 +849,7 @@ func genServerConfig(cfg *Config, cfgLogger btclog.Logger, ProofArchive: proofArchive, AssetWallet: assetWallet, CoinSelect: coinSelect, + DiagnosticsService: diagnosticsService, ChainPorter: chainPorter, DisableSweepOrphanUtxos: cfg.Wallet.DisableSweepOrphanUtxos, FsmDaemonAdapters: lndFsmDaemonAdapters, diff --git a/tapconfig/config.go b/tapconfig/config.go index 8b054778ff..ac52d96674 100644 --- a/tapconfig/config.go +++ b/tapconfig/config.go @@ -9,6 +9,7 @@ import ( "github.com/lightninglabs/lndclient" "github.com/lightninglabs/taproot-assets/address" "github.com/lightninglabs/taproot-assets/authmailbox" + "github.com/lightninglabs/taproot-assets/diagnostics" "github.com/lightninglabs/taproot-assets/healthcheck" "github.com/lightninglabs/taproot-assets/lndservices" "github.com/lightninglabs/taproot-assets/monitoring" @@ -223,6 +224,8 @@ type Config struct { CoinSelect *tapfreighter.CoinSelect + DiagnosticsService *diagnostics.Service + ChainPorter tapfreighter.Porter // DisableSweepOrphanUtxos disables sweeping orphaned UTXOs into diff --git a/tapfreighter/chain_porter.go b/tapfreighter/chain_porter.go index 0dd2dd221b..2345e2f4d4 100644 --- a/tapfreighter/chain_porter.go +++ b/tapfreighter/chain_porter.go @@ -18,6 +18,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lightninglabs/taproot-assets/address" "github.com/lightninglabs/taproot-assets/asset" + "github.com/lightninglabs/taproot-assets/diagnostics" "github.com/lightninglabs/taproot-assets/fn" "github.com/lightninglabs/taproot-assets/proof" "github.com/lightninglabs/taproot-assets/tapgarden" @@ -121,6 +122,10 @@ type ChainPorterConfig struct { // key for a given asset, which is required for creating supply // commitments. DelegationKeyChecker address.DelegationKeyChecker + + // DiagnosticsRecorder is an optional diagnostics sink for proof + // validation failures. 
+ DiagnosticsRecorder diagnostics.Recorder } // ChainPorter is the main sub-system of the tapfreighter package. The porter @@ -621,6 +626,35 @@ func (p *ChainPorter) storeProofs(sendPkg *sendPackage) error { ctx, vCtx, outputProof, ) if err != nil { + if p.cfg.DiagnosticsRecorder != nil { + inputProofArtifacts, inputArtifactsErr := + p.collectInputProofArtifacts( + ctx, + inputsForAsset, + ) + if inputArtifactsErr != nil { + // Diagnostics capture is best-effort + // and must not hide the underlying + // proof verification failure. + log.Warnf("Unable to collect input "+ + "proof artifacts for "+ + "diagnostics "+ + "(output_idx=%d): %v", idx, + inputArtifactsErr) + } + + hash := parcel.AnchorTx.TxHash().String() + p.reportProofValidationFailure( + buildPostBroadcastProofFailureReport( + err, + hash, + idx, + outputProof.Blob, + inputProofArtifacts, + ), + ) + } + return fmt.Errorf("error verifying proof: %w", err) } @@ -1578,6 +1612,29 @@ func (p *ChainPorter) verifyOutputProofPreBroadcast(ctx context.Context, proof.WithSkipTimeLockValidationForFinalProof(), ) if err != nil { + if p.cfg.DiagnosticsRecorder != nil { + inputProofArtifacts, inputArtifactsErr := + p.collectInputProofArtifacts( + ctx, inputsForAsset, + ) + if inputArtifactsErr != nil { + log.Warnf("Unable to collect input proof "+ + "artifacts for diagnostics "+ + "(vpkt_idx=%d, "+ + "output_idx=%d): "+ + "%v", pktIdx, outIdx, + inputArtifactsErr) + } + + p.reportProofValidationFailure( + buildPreBroadcastProofFailureReport( + err, pktIdx, outIdx, + proofFileBuf.Bytes(), + inputProofArtifacts, + ), + ) + } + return fmt.Errorf("output proof verification "+ "failed (vpkt_idx=%d, "+ "output_idx=%d): %w", pktIdx, @@ -1587,6 +1644,125 @@ func (p *ChainPorter) verifyOutputProofPreBroadcast(ctx context.Context, return nil } +// reportProofValidationFailure forwards a constructed failure snapshot to +// the optional diagnostics recorder. 
Non-blocking behavior lives in +// diagnostics.Service.CaptureProofValidationFailure; this method is a +// no-op when no recorder is configured. +func (p *ChainPorter) reportProofValidationFailure( + failure diagnostics.ProofValidationFailure, +) { + + if p.cfg.DiagnosticsRecorder == nil { + return + } + + p.cfg.DiagnosticsRecorder.CaptureProofValidationFailure(failure) +} + +// buildPreBroadcastProofFailureReport builds a +// diagnostics.ProofValidationFailure for the pre_broadcast stage: error +// text, virtual packet indices, the output proof blob, and any input +// proof artifacts gathered for the support bundle. +func buildPreBroadcastProofFailureReport(err error, pktIdx, outIdx int, + outputProofBlob []byte, + inputProofs []diagnostics.ArtifactFile, +) diagnostics.ProofValidationFailure { + + pktIdxCopy := pktIdx + outIdxCopy := outIdx + preBroadcastStage := diagnostics.StageProofVerificationPreBroadcast + outputProofCopy := append([]byte(nil), outputProofBlob...) + + return diagnostics.ProofValidationFailure{ + Stage: preBroadcastStage, + Error: err.Error(), + VPacketIndex: &pktIdxCopy, + VPacketOutputIndex: &outIdxCopy, + OutputProofs: []diagnostics.ArtifactFile{ + { + FileName: "output-proof.bin", + Data: outputProofCopy, + }, + }, + InputProofs: inputProofs, + } +} + +// buildPostBroadcastProofFailureReport builds a +// diagnostics.ProofValidationFailure for the post_broadcast stage after +// the anchor transaction is known, including txid, transfer output +// index, and proof blobs for support. +func buildPostBroadcastProofFailureReport(err error, anchorTxID string, + transferOutputIdx int, outputProofBlob []byte, + inputProofs []diagnostics.ArtifactFile, +) diagnostics.ProofValidationFailure { + + transferOutputIdxCopy := transferOutputIdx + postBroadcastStage := diagnostics.StageProofVerificationPostBroadcast + outputFileName := fmt.Sprintf("output-proof-%d.bin", transferOutputIdx) + outputProofCopy := append([]byte(nil), outputProofBlob...) 
+ + return diagnostics.ProofValidationFailure{ + Stage: postBroadcastStage, + Error: err.Error(), + AnchorTxID: anchorTxID, + TransferOutputIndex: &transferOutputIdxCopy, + OutputProofs: []diagnostics.ArtifactFile{ + { + FileName: outputFileName, + Data: outputProofCopy, + }, + }, + InputProofs: inputProofs, + } +} + +// encodeProofFile serializes a proof file into bytes for diagnostics +// attachment. +func encodeProofFile(inputProofFile *proof.File) ([]byte, error) { + var proofFileBuf bytes.Buffer + err := inputProofFile.Encode(&proofFileBuf) + if err != nil { + return nil, err + } + + return proofFileBuf.Bytes(), nil +} + +// collectInputProofArtifacts fetches and encodes each input proof for +// diagnostics. It fails fast on the first fetch or encode error because +// this runs only after proof verification already failed; returning a +// partial artifact set could mislead support, so callers get nothing +// unless the full set is collected. +func (p *ChainPorter) collectInputProofArtifacts(ctx context.Context, + inputs []asset.PrevID) ([]diagnostics.ArtifactFile, error) { + + artifacts := make([]diagnostics.ArtifactFile, 0, len(inputs)) + + // Fail immediately on any input proof error so we never persist a + // partial set that could be mistaken for a complete bundle. + for idx := range inputs { + inputProofFile, err := p.fetchInputProof(ctx, inputs[idx]) + if err != nil { + return nil, fmt.Errorf("fetch input proof %d: %w", + idx, err) + } + + blob, err := encodeProofFile(inputProofFile) + if err != nil { + return nil, fmt.Errorf("encode input proof %d: %w", + idx, err) + } + + artifacts = append(artifacts, diagnostics.ArtifactFile{ + FileName: fmt.Sprintf("input-proof-%d.bin", idx), + Data: blob, + }) + } + + return artifacts, nil +} + // verifyPacketInputProofs ensures that each virtual packet's inputs reference // a valid Taproot Asset commitment before the package is broadcast. 
func (p *ChainPorter) verifyPacketInputProofs(ctx context.Context, diff --git a/tapfreighter/chain_porter_test.go b/tapfreighter/chain_porter_test.go index 2c6f658e75..7337f50822 100644 --- a/tapfreighter/chain_porter_test.go +++ b/tapfreighter/chain_porter_test.go @@ -1,6 +1,7 @@ package tapfreighter import ( + "errors" "math/rand" "os" "testing" @@ -9,6 +10,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog/v2" "github.com/lightninglabs/taproot-assets/asset" + "github.com/lightninglabs/taproot-assets/diagnostics" "github.com/lightninglabs/taproot-assets/tappsbt" "github.com/stretchr/testify/require" ) @@ -108,3 +110,61 @@ func TestVerifySplitCommitmentWitnesses(t *testing.T) { }) } } + +func TestBuildPreBroadcastProofFailureReport(t *testing.T) { + t.Parallel() + + inputProofs := []diagnostics.ArtifactFile{ + { + FileName: "input-proof-0.bin", + Data: []byte{1, 2, 3}, + }, + } + + report := buildPreBroadcastProofFailureReport( + errors.New("boom"), 4, 7, []byte{9, 9}, inputProofs, + ) + + require.Equal( + t, diagnostics.StageProofVerificationPreBroadcast, + report.Stage, + ) + require.Equal(t, "boom", report.Error) + require.NotNil(t, report.VPacketIndex) + require.Equal(t, 4, *report.VPacketIndex) + require.NotNil(t, report.VPacketOutputIndex) + require.Equal(t, 7, *report.VPacketOutputIndex) + require.Len(t, report.OutputProofs, 1) + require.Equal(t, "output-proof.bin", report.OutputProofs[0].FileName) + require.Equal(t, []byte{9, 9}, report.OutputProofs[0].Data) + require.Len(t, report.InputProofs, 1) +} + +func TestBuildPostBroadcastProofFailureReport(t *testing.T) { + t.Parallel() + + inputProofs := []diagnostics.ArtifactFile{ + { + FileName: "input-proof-0.bin", + Data: []byte{1, 2, 3}, + }, + } + + report := buildPostBroadcastProofFailureReport( + errors.New("verify failed"), "abcd", 2, []byte{7, 8}, + inputProofs, + ) + + require.Equal( + t, diagnostics.StageProofVerificationPostBroadcast, + report.Stage, + ) + require.Equal(t, 
"verify failed", report.Error) + require.Equal(t, "abcd", report.AnchorTxID) + require.NotNil(t, report.TransferOutputIndex) + require.Equal(t, 2, *report.TransferOutputIndex) + require.Len(t, report.OutputProofs, 1) + require.Equal(t, "output-proof-2.bin", report.OutputProofs[0].FileName) + require.Equal(t, []byte{7, 8}, report.OutputProofs[0].Data) + require.Len(t, report.InputProofs, 1) +} diff --git a/tapgarden/planter_test.go b/tapgarden/planter_test.go index 6328586010..c970d2fe11 100644 --- a/tapgarden/planter_test.go +++ b/tapgarden/planter_test.go @@ -1311,11 +1311,13 @@ func testMintingTicker(t *mintingTestHarness) { t.queueSeedlingsInBatch(false, seedlings...) // Next, finalize the pending batch to continue with minting. - _ = t.finalizeBatchAssertFrozen(false) + currentBatch := t.finalizeBatchAssertFrozen(false) + require.NotNil(t, currentBatch) + require.NotNil(t, currentBatch.BatchKey.PubKey) // A single caretaker should have been launched as well. Next, assert // that the batch is already funded. - currentBatch := t.assertBatchProgressing() + currentBatch = t.assertBatchCommitted(currentBatch.BatchKey.PubKey) t.assertBatchGenesisTx(&currentBatch.GenesisPacket.FundedPsbt) // Now that the batch has been ticked, and the caretaker started, there @@ -1413,7 +1415,7 @@ func testMintingCancelFinalize(t *mintingTestHarness) { require.NotNil(t, thirdBatch.BatchKey.PubKey) thirdBatchKey := thirdBatch.BatchKey.PubKey - thirdBatch = t.assertBatchProgressing() + thirdBatch = t.assertBatchCommitted(thirdBatchKey) t.assertBatchGenesisTx(&thirdBatch.GenesisPacket.FundedPsbt) // Now that the batch has been ticked, and the caretaker started, there From 759fd4987d0f5a063007d335737148a0f5c634ec Mon Sep 17 00:00:00 2001 From: "Sergey B." 
Date: Tue, 12 May 2026 10:36:27 +0300 Subject: [PATCH 2/2] docs: updating release notes --- docs/release-notes/release-notes-0.8.0.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/release-notes/release-notes-0.8.0.md b/docs/release-notes/release-notes-0.8.0.md index f34ee0bc20..d4a0cfefff 100644 --- a/docs/release-notes/release-notes-0.8.0.md +++ b/docs/release-notes/release-notes-0.8.0.md @@ -398,6 +398,14 @@ `universe.mbox-cleanup-check-timeout` to configure periodic cleanup of auth mailbox messages whose claimed outpoints have been spent on chain. +- [PR#2060](https://github.com/lightninglabs/taproot-assets/pull/2060) + Add `diagnostics-dir` to enable diagnostics mode. When set, tapd creates + a per-run diagnostics directory and stores proof-validation failure + artifacts (including generated output proofs, related input proofs, and + metadata) for transfer troubleshooting. Failure metadata is captured as an + immutable snapshot, and pre-broadcast reports include collected input-proof + artifacts for consistent diagnostics. + ## Breaking Changes - [PR#1935](https://github.com/lightninglabs/taproot-assets/pull/1935)