From b054513ef81be18bcda53f78c797dd2cb35f012f Mon Sep 17 00:00:00 2001 From: "Sergey B." Date: Wed, 13 May 2026 15:40:10 +0300 Subject: [PATCH 1/2] fix(tapfreighter): recover stranded sweeper parcels without restart --- itest/custom_channels/force_close_test.go | 4 +- .../group_tranches_force_close_test.go | 4 +- itest/custom_channels/helpers.go | 148 ++++++++++-- itest/custom_channels/vars.go | 9 +- tapchannel/aux_closer.go | 8 +- tapchannel/aux_sweeper.go | 4 +- tapfreighter/chain_porter.go | 222 ++++++++++++++++-- tapfreighter/chain_porter_test.go | 121 ++++++++++ 8 files changed, 470 insertions(+), 50 deletions(-) diff --git a/itest/custom_channels/force_close_test.go b/itest/custom_channels/force_close_test.go index 419a41227f..ad223bf0e6 100644 --- a/itest/custom_channels/force_close_test.go +++ b/itest/custom_channels/force_close_test.go @@ -235,7 +235,9 @@ func testCustomChannelsForceClose(ctx context.Context, // This'll sweep his non-delay commitment output. We use the txid from // the mined block (not the mempool) to avoid RBF mismatches. daveSweepBlocks := mineBlocks(t, net, 1, 1) - daveSweepTxHash := daveSweepBlocks[0].Transactions[1].TxHash() + daveSweepTxHash := resolveMinedTransferTxid( + t.t, dave, daveSweepBlocks[0], + ) t.Logf("Dave sweep txid: %v", daveSweepTxHash) diff --git a/itest/custom_channels/group_tranches_force_close_test.go b/itest/custom_channels/group_tranches_force_close_test.go index e088f11320..4778283f7f 100644 --- a/itest/custom_channels/group_tranches_force_close_test.go +++ b/itest/custom_channels/group_tranches_force_close_test.go @@ -268,7 +268,9 @@ func testCustomChannelsGroupTranchesForceClose(ctx context.Context, // Now we'll mine a block to confirm Erin's sweep transaction. We use // the txid from the mined block to avoid RBF mismatches. erinSweepBlocks := mineBlocks(t, net, 1, 1) - erinSweepTxHash := erinSweepBlocks[0].Transactions[1].TxHash() + erinSweepTxHash := resolveMinedTransferTxid( + t.t, erin, erinSweepBlocks[0], + ) t.Logf("Erin sweep txid: %v", erinSweepTxHash) diff --git a/itest/custom_channels/helpers.go b/itest/custom_channels/helpers.go index 5245d60a94..97f7cb12a7 100644 --- a/itest/custom_channels/helpers.go +++ b/itest/custom_channels/helpers.go @@ -1384,7 +1384,11 @@ func waitForSendEvent(t *testing.T, for { sendEvent, err := sendEvents.Recv() - require.NoError(t, err) + if err != nil { + t.Logf("send event stream ended before state %v: %v", + expectedState, err) + return + } if sendEvent.SendState == expectedState.String() { return @@ -1422,14 +1426,14 @@ func closeAssetChannelAndAssert(t *ccHarnessTest, ctxt, cancel := context.WithTimeout(ctxb, wait.DefaultTimeout) defer cancel() - closeStream, _, err := net.CloseChannel(local, chanPoint, false) - require.NoError(t.t, err) - sendEvents, err := local.SubscribeSendEvents( ctxt, &taprpc.SubscribeSendEventsRequest{}, ) require.NoError(t.t, err) + closeStream, _, err := net.CloseChannel(local, chanPoint, false) + require.NoError(t.t, err) + assertWaitingCloseChannelAssetData(t.t, local, chanPoint) assertWaitingCloseChannelAssetData(t.t, remote, chanPoint) @@ -2530,8 +2534,17 @@ func assertSpendableBalance(t *testing.T, client *itest.IntegratedNode, func locateAssetTransfers(t *testing.T, node *itest.IntegratedNode, txid chainhash.Hash) *taprpc.AssetTransfer { - var transfer *taprpc.AssetTransfer - err := wait.NoError(func() error { + var ( + transfer *taprpc.AssetTransfer + lastErr error + transferCount int + blockHashSet bool + blockHeight uint32 + blockHeightHint uint32 + pollInterval = 200 * time.Millisecond + ) + + require.Eventually(t, func() bool { ctxb := context.Background() forceCloseTransfer, err := node.ListTransfers( ctxb, &taprpc.ListTransfersRequest{ @@ -2539,29 +2552,112 @@ func locateAssetTransfers(t *testing.T, node *itest.IntegratedNode, }, ) if err != nil { - return fmt.Errorf("unable to list %v transfers: %w", + lastErr = fmt.Errorf("unable to list %v transfers: %w", node.Cfg.Name, err) + return false } - if len(forceCloseTransfer.Transfers) != 1 { - return fmt.Errorf("%v is expecting %d transfers, "+ - "has %d", node.Cfg.Name, 1, - len(forceCloseTransfer.Transfers)) + + transferCount = len(forceCloseTransfer.Transfers) + if transferCount != 1 { + lastErr = fmt.Errorf("%v expected %d transfers, got %d", + node.Cfg.Name, 1, transferCount) + return false } transfer = forceCloseTransfer.Transfers[0] + blockHashSet = transfer.AnchorTxBlockHash != nil + blockHeight = transfer.AnchorTxBlockHeight + blockHeightHint = transfer.AnchorTxHeightHint - if transfer.AnchorTxBlockHash == nil { - return fmt.Errorf("missing anchor block hash, " + - "transfer not confirmed") + // Anchor tx confirmation metadata is populated asynchronously. + if !blockHashSet || blockHeight == 0 || blockHeightHint == 0 { + lastErr = fmt.Errorf("transfer not confirmed yet") + return false } - return nil - }, ccTransferTimeout) - require.NoError(t, err) + lastErr = nil + return true + }, ccTransferConfirmTimeout, pollInterval, + "failed to locate confirmed transfer for %v (node=%v): "+ + "transfers=%d block_hash_set=%v block_height=%d "+ + "height_hint=%d last_err=%v", + txid, node.Cfg.Name, transferCount, blockHashSet, blockHeight, + blockHeightHint, lastErr, + ) return transfer } +// resolveMinedTransferTxid returns the transaction ID from a mined block that +// is associated with an asset transfer for the given node. +func resolveMinedTransferTxid(t *testing.T, node *itest.IntegratedNode, + block *wire.MsgBlock) chainhash.Hash { + + t.Helper() + + require.Greater( + t, len(block.Transactions), 1, + "expected at least one non-coinbase transaction in block", + ) + + candidateTxids := make([]chainhash.Hash, 0, len(block.Transactions)-1) + for i := 1; i < len(block.Transactions); i++ { + candidateTxids = append( + candidateTxids, block.Transactions[i].TxHash(), + ) + } + + var ( + matchedTxid chainhash.Hash + matchCount int + lastErr error + ) + + require.Eventually(t, func() bool { + matchCount = 0 + lastErr = nil + + ctxb := context.Background() + for _, txid := range candidateTxids { + transfers, err := node.ListTransfers( + ctxb, &taprpc.ListTransfersRequest{ + AnchorTxid: txid.String(), + }, + ) + if err != nil { + lastErr = fmt.Errorf( + "unable to list %v transfers: %w", + node.Cfg.Name, err, + ) + return false + } + + if len(transfers.Transfers) == 0 { + continue + } + + matchedTxid = txid + matchCount++ + } + + if matchCount != 1 { + lastErr = fmt.Errorf( + "expected 1 matching transfer tx, got %d", + matchCount, + ) + return false + } + + return true + }, ccTransferConfirmTimeout, 200*time.Millisecond, + "failed to resolve transfer tx in mined block for %v: "+ + "candidates=%v matches=%d last_err=%v", + node.Cfg.Name, candidateTxids, matchCount, lastErr, + ) + + return matchedTxid +} + // --------------------------------------------------------------------------- // HTLC event helpers // --------------------------------------------------------------------------- @@ -3087,8 +3183,12 @@ func assertForceCloseSweeps(ctx context.Context, // Wait for tapd to process the confirmed sweep transactions before // checking balances. We extract the txid from the mined blocks rather // than from the earlier mempool checks to avoid RBF mismatches. - bobSweepTxHash1 := bobSweepBlocks1[0].Transactions[1].TxHash() - bobSweepTxHash2 := bobSweepBlocks2[0].Transactions[1].TxHash() + bobSweepTxHash1 := resolveMinedTransferTxid( + t.t, bob, bobSweepBlocks1[0], + ) + bobSweepTxHash2 := resolveMinedTransferTxid( + t.t, bob, bobSweepBlocks2[0], + ) locateAssetTransfers(t.t, bob, bobSweepTxHash1) locateAssetTransfers(t.t, bob, bobSweepTxHash2) @@ -3120,7 +3220,9 @@ func assertForceCloseSweeps(ctx context.Context, // Wait for tapd to register the to-local sweep transfer. We use the // txid from the mined block to avoid RBF mismatches. - aliceToLocalHash := aliceToLocalBlocks[0].Transactions[1].TxHash() + aliceToLocalHash := resolveMinedTransferTxid( + t.t, alice, aliceToLocalBlocks[0], + ) locateAssetTransfers(t.t, alice, aliceToLocalHash) t.Logf("Confirming Alice's to-local sweep") @@ -3169,7 +3271,7 @@ func assertForceCloseSweeps(ctx context.Context, } // Use the txid from the mined block to avoid RBF mismatches. - sweepTxHash := sweepBlocks[0].Transactions[1].TxHash() + sweepTxHash := resolveMinedTransferTxid(t.t, alice, sweepBlocks[0]) locateAssetTransfers(t.t, alice, sweepTxHash) t.Logf("Confirming Alice's second level remote HTLC success sweep") @@ -3203,7 +3305,7 @@ func assertForceCloseSweeps(ctx context.Context, sweepBlocks = mineBlocks(t, net, 1, 1) } - sweepTxHash = sweepBlocks[0].Transactions[1].TxHash() + sweepTxHash = resolveMinedTransferTxid(t.t, alice, sweepBlocks[0]) locateAssetTransfers(t.t, alice, sweepTxHash) // With the sweep transaction confirmed, Alice's balance should have @@ -3372,7 +3474,7 @@ func assertForceCloseSweeps(ctx context.Context, sweepBlocks = mineBlocks(t, net, 1, 1) } - sweepTxHash = sweepBlocks[0].Transactions[1].TxHash() + sweepTxHash = resolveMinedTransferTxid(t.t, alice, sweepBlocks[0]) locateAssetTransfers(t.t, alice, sweepTxHash) return aliceExpectedBalance, bobExpectedBalance diff --git a/itest/custom_channels/vars.go b/itest/custom_channels/vars.go index 567ae4d92f..cb00777612 100644 --- a/itest/custom_channels/vars.go +++ b/itest/custom_channels/vars.go @@ -24,11 +24,14 @@ var ( ccShortTimeout = time.Second * 10 - // ccTransferTimeout is the timeout used when waiting for an asset - // transfer to appear as confirmed. On CI runners with multiple + // ccTransferTimeout is the timeout for transfer-related payment RPCs. + ccTransferTimeout = 2 * time.Minute + + // ccTransferConfirmTimeout is the timeout used when waiting for an + // asset transfer to appear as confirmed. On CI runners with multiple // parallel tranches, tapd block processing can be slow so we use a // generous timeout. - ccTransferTimeout = 2 * time.Minute + ccTransferConfirmTimeout = 5 * time.Minute ) // lndArgsTemplate contains lnd flags used by all custom channel test nodes. diff --git a/tapchannel/aux_closer.go b/tapchannel/aux_closer.go index d41f58edc0..3341425a91 100644 --- a/tapchannel/aux_closer.go +++ b/tapchannel/aux_closer.go @@ -650,7 +650,8 @@ func (a *AuxChanCloser) ShutdownBlob( func shipChannelTxn(txSender tapfreighter.Porter, chanTx *wire.MsgTx, outputCommitments tappsbt.OutputCommitments, vPkts []*tappsbt.VPacket, closeFee int64, - anchorTxHeightHint fn.Option[uint32]) error { + anchorTxHeightHint fn.Option[uint32], + skipAnchorTxBroadcast bool) error { chanTxPsbt, err := tapsend.PrepareAnchoringTemplate(vPkts) if err != nil { @@ -679,7 +680,7 @@ func shipChannelTxn(txSender tapfreighter.Porter, chanTx *wire.MsgTx, } parcelLabel := fmt.Sprintf("channel-tx-%s", chanTx.TxHash().String()) preSignedParcel := tapfreighter.NewPreAnchoredParcel( - vPkts, nil, closeAnchor, false, parcelLabel, + vPkts, nil, closeAnchor, skipAnchorTxBroadcast, parcelLabel, anchorTxHeightHint, ) _, err = txSender.RequestShipment(preSignedParcel) @@ -794,6 +795,7 @@ func (a *AuxChanCloser) FinalizeClose(desc types.AuxCloseDesc, // as the transaction is being broadcast now. return shipChannelTxn( a.cfg.TxSender, closeTx, closeInfo.outputCommitments, - closeInfo.vPackets, closeInfo.closeFee, fn.None[uint32](), + closeInfo.vPackets, closeInfo.closeFee, + fn.None[uint32](), false, ) } diff --git a/tapchannel/aux_sweeper.go b/tapchannel/aux_sweeper.go index 5af5255b8f..e199b6e7bb 100644 --- a/tapchannel/aux_sweeper.go +++ b/tapchannel/aux_sweeper.go @@ -1731,7 +1731,7 @@ func (a *AuxSweeper) importCommitTx(req lnwallet.ResolutionReq, return shipChannelTxn( a.cfg.TxSender, req.CommitTx, outCommitments, vPackets, - int64(req.CommitFee), heightHint, + int64(req.CommitFee), heightHint, true, ) } @@ -2590,7 +2590,7 @@ func (a *AuxSweeper) registerAndBroadcastSweep(req *sweep.BumpRequest, // and ship the transaction. return shipChannelTxn( a.cfg.TxSender, sweepTx, outCommitments, allVpkts, int64(fee), - heightHint, + heightHint, true, ) } diff --git a/tapfreighter/chain_porter.go b/tapfreighter/chain_porter.go index 0dd2dd221b..97f97d62b0 100644 --- a/tapfreighter/chain_porter.go +++ b/tapfreighter/chain_porter.go @@ -36,6 +36,19 @@ const ( // we expect a send fragment to be valid for after its claimed outpoint // has been spent. This is roughly equivalent to 90 days. DefaultSendFragmentExpiryDelta = 12_960 + + // postDeliveryRetryBaseDelay is the initial backoff delay used when a + // parcel fails after we already returned the response to the caller. + postDeliveryRetryBaseDelay = time.Second + + // postDeliveryRetryMaxDelay is the maximum retry delay used for + // recoverable post-delivery failures. + postDeliveryRetryMaxDelay = 30 * time.Second + + // postDeliveryRetryMaxAttempts is the maximum number of retry + // attempts for recoverable post-delivery failures before the parcel is + // treated as permanently failed. + postDeliveryRetryMaxAttempts uint8 = 100 ) // VerifiedProofImporter is used to import verified proofs into the local proof @@ -145,6 +158,13 @@ type ChainPorter struct { // subscriberMtx guards the subscribers map. subscriberMtx sync.Mutex + // postDeliveryRetryMtx guards postDeliveryRetryAttempts. + postDeliveryRetryMtx sync.Mutex + + // postDeliveryRetryAttempts tracks retry attempts for recoverable + // post-delivery errors by anchor txid. + postDeliveryRetryAttempts map[chainhash.Hash]uint8 + *fn.ContextGuard } @@ -155,9 +175,10 @@ func NewChainPorter(cfg *ChainPorterConfig) *ChainPorter { map[uint64]*fn.EventReceiver[fn.Event], ) return &ChainPorter{ - cfg: cfg, - outboundParcels: make(chan Parcel), - subscribers: subscribers, + cfg: cfg, + outboundParcels: make(chan Parcel), + subscribers: subscribers, + postDeliveryRetryAttempts: make(map[chainhash.Hash]uint8), ContextGuard: &fn.ContextGuard{ DefaultTimeout: tapgarden.DefaultTimeout, Quit: make(chan struct{}), @@ -379,12 +400,28 @@ func (p *ChainPorter) advanceState(pkg *sendPackage, kit *parcelKit) { stateToExecute := pkg.SendState updatedPkg, err := p.stateStep(*pkg) if err != nil { - kit.errChan <- err + failedPkg := pkg + if updatedPkg != nil { + failedPkg = updatedPkg + } + + isRecoverable := p.schedulePostDeliveryRetry( + failedPkg, stateToExecute, err, + ) + if !isRecoverable { + p.clearPostDeliveryRetry(failedPkg) + p.reportParcelError(failedPkg, kit, err) + } + log.Errorf("Error evaluating state (%v): %v", pkg.SendState, err) + sendEventPkg := *pkg + if failedPkg != nil { + sendEventPkg = *failedPkg + } p.publishSubscriberEvent(newAssetSendErrorEvent( - err, stateToExecute, *pkg, + err, stateToExecute, sendEventPkg, )) return @@ -396,18 +433,166 @@ func (p *ChainPorter) advanceState(pkg *sendPackage, kit *parcelKit) { stateToExecute, *updatedPkg, )) + pkg = updatedPkg + // Exit the loop once the state machine has executed its final // state. - if pkg.SendState == SendStateComplete { + if stateToExecute == SendStateComplete { + p.clearPostDeliveryRetry(pkg) + log.Infof("ChainPorter completed state machine for "+ "parcel (anchor_txid=%v)", - updatedPkg.OutboundPkg.AnchorTx.TxHash()) + pkg.OutboundPkg.AnchorTx.TxHash()) return } + } +} - pkg = updatedPkg +// reportParcelError attempts to report an error for a failed parcel. +// +// For caller-initiated parcels, this blocks to preserve the existing +// synchronous error delivery behavior of RequestShipment. For background +// parcels (such as resumed parcels), the send is non-blocking to prevent +// goroutine stalls when there is no active listener. +func (p *ChainPorter) reportParcelError(pkg *sendPackage, kit *parcelKit, + err error) { + + if kit == nil || kit.errChan == nil { + return + } + + if pkg != nil && pkg.Parcel != nil { + select { + case kit.errChan <- err: + case <-p.Quit: + } + + return + } + + select { + case kit.errChan <- err: + default: + } +} + +// isRecoverablePostDeliveryState returns true if a state can be retried after +// the caller has already received a response. +func isRecoverablePostDeliveryState(state SendState) bool { + switch state { + case SendStateWaitTxConf: + fallthrough + case SendStateStorePostAnchorTxConf: + fallthrough + case SendStateTransferProofs: + return true + + default: + return false + } +} + +// nextPostDeliveryRetryDelay returns the next capped exponential backoff delay +// for a post-delivery parcel retry. +func nextPostDeliveryRetryDelay(attempt uint8) time.Duration { + delay := postDeliveryRetryBaseDelay + + for i := uint8(0); i < attempt; i++ { + delay *= 2 + if delay >= postDeliveryRetryMaxDelay { + return postDeliveryRetryMaxDelay + } + } + + return delay +} + +// schedulePostDeliveryRetry schedules a pending parcel retry when a recoverable +// post-delivery state fails. +func (p *ChainPorter) schedulePostDeliveryRetry(pkg *sendPackage, + failedState SendState, stateErr error) bool { + + switch { + case !isRecoverablePostDeliveryState(failedState): + return false + + case pkg == nil || pkg.OutboundPkg == nil: + return false + + case pkg.OutboundPkg.AnchorTx == nil: + return false + } + + anchorTxID := pkg.OutboundPkg.AnchorTx.TxHash() + + p.postDeliveryRetryMtx.Lock() + attempt := p.postDeliveryRetryAttempts[anchorTxID] + if attempt >= postDeliveryRetryMaxAttempts { + delete(p.postDeliveryRetryAttempts, anchorTxID) + p.postDeliveryRetryMtx.Unlock() + + log.Errorf("Recoverable send state failure exceeded retry "+ + "limit, marking parcel as permanently failed "+ + "(anchor_txid=%v, state=%v, max_retry_attempts=%d): %v", + anchorTxID, failedState, postDeliveryRetryMaxAttempts, + stateErr) + + return false + } + + nextAttempt := attempt + 1 + p.postDeliveryRetryAttempts[anchorTxID] = nextAttempt + p.postDeliveryRetryMtx.Unlock() + + delay := nextPostDeliveryRetryDelay(attempt) + log.Warnf( + "Recoverable send state failure, retrying pending parcel "+ + "(anchor_txid=%v, state=%v, retry_attempt=%d, "+ + "retry_delay=%v): %v", + anchorTxID, failedState, nextAttempt, delay, stateErr) + + pendingParcel := NewPendingParcel(pkg.OutboundPkg) + + go func() { + timer := time.NewTimer(delay) + defer timer.Stop() + + select { + case <-timer.C: + pendingRetry := Parcel(pendingParcel) + if !fn.SendOrQuit( + p.outboundParcels, pendingRetry, p.Quit, + ) { + + return + } + + log.Warnf("Re-queued pending parcel after recoverable "+ + "failure (anchor_txid=%v, retry_attempt=%d)", + anchorTxID, nextAttempt) + + case <-p.Quit: + return + } + }() + + return true +} + +// clearPostDeliveryRetry removes any retry bookkeeping for a parcel. +func (p *ChainPorter) clearPostDeliveryRetry(pkg *sendPackage) { + if pkg == nil || pkg.OutboundPkg == nil || + pkg.OutboundPkg.AnchorTx == nil { + + return } + + anchorTxID := pkg.OutboundPkg.AnchorTx.TxHash() + + p.postDeliveryRetryMtx.Lock() + delete(p.postDeliveryRetryAttempts, anchorTxID) + p.postDeliveryRetryMtx.Unlock() } // waitForTransferTxConf waits for the confirmation of the final transaction @@ -2022,6 +2207,17 @@ func (p *ChainPorter) stateStep(currentPkg sendPackage) (*sendPackage, error) { "disk: %w", err) } + ctx, cancel = p.WithCtxQuitNoTimeout() + defer cancel() + + err = p.importLocalAddresses(ctx, currentPkg.OutboundPkg) + if err != nil { + p.unlockInputs(ctx, ¤tPkg) + + return nil, fmt.Errorf("unable to import local "+ + "addresses: %w", err) + } + // If skip flag is set—bypass anchor broadcast and advance to // the confirmation wait state. if currentPkg.OutboundPkg.SkipAnchorTxBroadcast { @@ -2044,20 +2240,12 @@ func (p *ChainPorter) stateStep(currentPkg sendPackage) (*sendPackage, error) { ctx, cancel := p.WithCtxQuitNoTimeout() defer cancel() - err := p.importLocalAddresses(ctx, currentPkg.OutboundPkg) - if err != nil { - p.unlockInputs(ctx, ¤tPkg) - - return nil, fmt.Errorf("unable to import local "+ - "addresses: %w", err) - } - txHash := currentPkg.OutboundPkg.AnchorTx.TxHash() log.Infof("Broadcasting new transfer tx, txid=%v", txHash) // With the public key imported, we can now broadcast to the // network. - err = p.cfg.ChainBridge.PublishTransaction( + err := p.cfg.ChainBridge.PublishTransaction( ctx, currentPkg.OutboundPkg.AnchorTx, TransferTxLabel, ) switch { diff --git a/tapfreighter/chain_porter_test.go b/tapfreighter/chain_porter_test.go index 2c6f658e75..d100d497f2 100644 --- a/tapfreighter/chain_porter_test.go +++ b/tapfreighter/chain_porter_test.go @@ -1,6 +1,7 @@ package tapfreighter import ( + "errors" "math/rand" "os" "testing" @@ -17,6 +18,22 @@ func TestRunChainPorter(t *testing.T) { t.Parallel() } +func newTestChainPorter() *ChainPorter { + porter := NewChainPorter(&ChainPorterConfig{}) + porter.outboundParcels = make(chan Parcel, 1) + + return porter +} + +func newTestSendPackage(state SendState) *sendPackage { + return &sendPackage{ + SendState: state, + OutboundPkg: &OutboundParcel{ + AnchorTx: wire.NewMsgTx(2), + }, + } +} + func init() { rand.Seed(time.Now().Unix()) @@ -108,3 +125,107 @@ func TestVerifySplitCommitmentWitnesses(t *testing.T) { }) } } + +func TestAdvanceStateNonBlockingErrSignalBackgroundParcel(t *testing.T) { + t.Parallel() + + porter := newTestChainPorter() + pkg := newTestSendPackage(SendStateStartHandleAddrParcel) + kit := &parcelKit{ + errChan: make(chan error), + } + + done := make(chan struct{}) + go func() { + defer close(done) + + porter.advanceState(pkg, kit) + }() + + select { + case <-done: + case <-time.After(200 * time.Millisecond): + t.Fatalf("advanceState blocked on background error delivery") + } +} + +func TestAdvanceStatePermanentFailureClearsRetryBookkeeping(t *testing.T) { + t.Parallel() + + porter := newTestChainPorter() + pkg := newTestSendPackage(SendStateStartHandleAddrParcel) + pkg.Parcel = NewPendingParcel(pkg.OutboundPkg) + + txID := pkg.OutboundPkg.AnchorTx.TxHash() + porter.postDeliveryRetryAttempts[txID] = 2 + + kit := &parcelKit{ + errChan: make(chan error, 1), + } + + porter.advanceState(pkg, kit) + + select { + case err := <-kit.errChan: + require.Error(t, err) + case <-time.After(200 * time.Millisecond): + t.Fatalf("expected terminal error") + } + + _, exists := porter.postDeliveryRetryAttempts[txID] + require.False(t, exists) +} + +func TestSchedulePostDeliveryRetryIncrementsAndRequeues(t *testing.T) { + t.Parallel() + + porter := newTestChainPorter() + defer close(porter.Quit) + + pkg := newTestSendPackage(SendStateTransferProofs) + txID := pkg.OutboundPkg.AnchorTx.TxHash() + + recoverable := porter.schedulePostDeliveryRetry( + pkg, SendStateTransferProofs, errors.New("recoverable failure"), + ) + require.True(t, recoverable) + require.EqualValues(t, 1, porter.postDeliveryRetryAttempts[txID]) + + select { + case retryParcel := <-porter.outboundParcels: + pendingParcel, ok := retryParcel.(*PendingParcel) + require.True(t, ok) + require.Equal( + t, SendStateBroadcast, pendingParcel.pkg().SendState, + ) + + case <-time.After(1500 * time.Millisecond): + t.Fatalf("expected pending parcel to be re-queued") + } +} + +func TestSchedulePostDeliveryRetryMaxAttemptsStopsRetrying(t *testing.T) { + t.Parallel() + + porter := newTestChainPorter() + defer close(porter.Quit) + + pkg := newTestSendPackage(SendStateTransferProofs) + txID := pkg.OutboundPkg.AnchorTx.TxHash() + + porter.postDeliveryRetryAttempts[txID] = postDeliveryRetryMaxAttempts + + recoverable := porter.schedulePostDeliveryRetry( + pkg, SendStateTransferProofs, errors.New("still failing"), + ) + require.False(t, recoverable) + + _, exists := porter.postDeliveryRetryAttempts[txID] + require.False(t, exists) + + select { + case <-porter.outboundParcels: + t.Fatalf("did not expect pending parcel re-queue") + case <-time.After(100 * time.Millisecond): + } +} From 76f2208c21e6075579577b4a795eebed37ce2d5a Mon Sep 17 00:00:00 2001 From: "Sergey B." Date: Wed, 13 May 2026 15:49:27 +0300 Subject: [PATCH 2/2] chore: updated release notes --- docs/release-notes/release-notes-0.8.0.md | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/release-notes/release-notes-0.8.0.md b/docs/release-notes/release-notes-0.8.0.md index 954472d313..45c3e16b8a 100644 --- a/docs/release-notes/release-notes-0.8.0.md +++ b/docs/release-notes/release-notes-0.8.0.md @@ -102,6 +102,14 @@ fixes inverted sort direction in `AssetRoots`, `AssetLeafKeys`, and `QueryEvents` universe RPCs. +* [PR#2132](https://github.com/lightninglabs/taproot-assets/pull/2132) + fixes a chain porter failure mode where sweep parcels could become + stranded after `SendStateWaitTxConf` responded to the caller but a + later state failed. Recoverable post-delivery failures now requeue + pending parcels with backoff in-process, and sweeper-originated + channel transactions skip redundant porter broadcasts while still + importing local addresses. + # New Features ## Functional Enhancements