Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/release-notes/release-notes-0.8.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@
fixes inverted sort direction in `AssetRoots`, `AssetLeafKeys`, and
`QueryEvents` universe RPCs.

* [PR#2132](https://github.com/lightninglabs/taproot-assets/pull/2132)
fixes a chain porter failure mode where sweep parcels could become
stranded after `SendStateWaitTxConf` responded to the caller but a
later state failed. Recoverable post-delivery failures now requeue
pending parcels with backoff in-process, and sweeper-originated
channel transactions skip redundant porter broadcasts while still
importing local addresses.

# New Features

## Functional Enhancements
Expand Down
4 changes: 3 additions & 1 deletion itest/custom_channels/force_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,9 @@ func testCustomChannelsForceClose(ctx context.Context,
// This'll sweep his non-delay commitment output. We use the txid from
// the mined block (not the mempool) to avoid RBF mismatches.
daveSweepBlocks := mineBlocks(t, net, 1, 1)
daveSweepTxHash := daveSweepBlocks[0].Transactions[1].TxHash()
daveSweepTxHash := resolveMinedTransferTxid(
t.t, dave, daveSweepBlocks[0],
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could use more context on the design decisions here. The old code failed with RBF (replace-by-fee) because it assumed the coinbase transaction at index 0, and sweep transaction is at index 1 in the block. And there's only one non-coinbase transaction in the block. The RBF sweep transactions invalidated this, or if multiple transactions are mined in the same block.

The new approach with resolveMinedTransferTxid finds which transaction in the block tapd know is a transfer. So decouples the tests from block transaction ordering, RBF replacements, multiple transactions in a block and tapd timing issues.


t.Logf("Dave sweep txid: %v", daveSweepTxHash)

Expand Down
4 changes: 3 additions & 1 deletion itest/custom_channels/group_tranches_force_close_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ func testCustomChannelsGroupTranchesForceClose(ctx context.Context,
// Now we'll mine a block to confirm Erin's sweep transaction. We use
// the txid from the mined block to avoid RBF mismatches.
erinSweepBlocks := mineBlocks(t, net, 1, 1)
erinSweepTxHash := erinSweepBlocks[0].Transactions[1].TxHash()
erinSweepTxHash := resolveMinedTransferTxid(
t.t, erin, erinSweepBlocks[0],
)

t.Logf("Erin sweep txid: %v", erinSweepTxHash)

Expand Down
148 changes: 125 additions & 23 deletions itest/custom_channels/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1384,7 +1384,11 @@ func waitForSendEvent(t *testing.T,

for {
sendEvent, err := sendEvents.Recv()
require.NoError(t, err)
if err != nil {
t.Logf("send event stream ended before state %v: %v",
expectedState, err)
return
}

if sendEvent.SendState == expectedState.String() {
return
Expand Down Expand Up @@ -1422,14 +1426,14 @@ func closeAssetChannelAndAssert(t *ccHarnessTest,
ctxt, cancel := context.WithTimeout(ctxb, wait.DefaultTimeout)
defer cancel()

closeStream, _, err := net.CloseChannel(local, chanPoint, false)
require.NoError(t.t, err)

sendEvents, err := local.SubscribeSendEvents(
ctxt, &taprpc.SubscribeSendEventsRequest{},
)
require.NoError(t.t, err)

closeStream, _, err := net.CloseChannel(local, chanPoint, false)
require.NoError(t.t, err)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving the close after the SubscribeSendEvents fixes a race condition:

  1. CloseChannel starts the close process
  2. tapd processes close, fires SendStateComplete event
  3. SubscribeSendEvents called — but event already fired
  4. waitForSendEvent blocks forever waiting for missed event

By subscribing first, all events from the close operation are captured. This implements the standard "subscribe before action" pattern

So it's good we could use more comments IMO

assertWaitingCloseChannelAssetData(t.t, local, chanPoint)
assertWaitingCloseChannelAssetData(t.t, remote, chanPoint)

Expand Down Expand Up @@ -2530,38 +2534,130 @@ func assertSpendableBalance(t *testing.T, client *itest.IntegratedNode,
func locateAssetTransfers(t *testing.T, node *itest.IntegratedNode,
txid chainhash.Hash) *taprpc.AssetTransfer {

var transfer *taprpc.AssetTransfer
err := wait.NoError(func() error {
var (
transfer *taprpc.AssetTransfer
lastErr error
transferCount int
blockHashSet bool
blockHeight uint32
blockHeightHint uint32
pollInterval = 200 * time.Millisecond
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: 200 millis gets used in a few different spots, consider intorducing a constant

)

require.Eventually(t, func() bool {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will get us more detailed failure messages about the state and make the testing more robust

ctxb := context.Background()
forceCloseTransfer, err := node.ListTransfers(
ctxb, &taprpc.ListTransfersRequest{
AnchorTxid: txid.String(),
},
)
if err != nil {
return fmt.Errorf("unable to list %v transfers: %w",
lastErr = fmt.Errorf("unable to list %v transfers: %w",
node.Cfg.Name, err)
return false
}
if len(forceCloseTransfer.Transfers) != 1 {
return fmt.Errorf("%v is expecting %d transfers, "+
"has %d", node.Cfg.Name, 1,
len(forceCloseTransfer.Transfers))

transferCount = len(forceCloseTransfer.Transfers)
if transferCount != 1 {
lastErr = fmt.Errorf("%v expected %d transfers, got %d",
node.Cfg.Name, 1, transferCount)
return false
}

transfer = forceCloseTransfer.Transfers[0]
blockHashSet = transfer.AnchorTxBlockHash != nil
blockHeight = transfer.AnchorTxBlockHeight
blockHeightHint = transfer.AnchorTxHeightHint

if transfer.AnchorTxBlockHash == nil {
return fmt.Errorf("missing anchor block hash, " +
"transfer not confirmed")
// Anchor tx confirmation metadata is populated asynchronously.
if !blockHashSet || blockHeight == 0 || blockHeightHint == 0 {
lastErr = fmt.Errorf("transfer not confirmed yet")
return false
}

return nil
}, ccTransferTimeout)
require.NoError(t, err)
lastErr = nil
return true
}, ccTransferConfirmTimeout, pollInterval,
"failed to locate confirmed transfer for %v (node=%v): "+
"transfers=%d block_hash_set=%v block_height=%d "+
"height_hint=%d last_err=%v",
txid, node.Cfg.Name, transferCount, blockHashSet, blockHeight,
blockHeightHint, lastErr,
)

return transfer
}

// resolveMinedTransferTxid returns the transaction ID from a mined block that
// is associated with an asset transfer for the given node.
func resolveMinedTransferTxid(t *testing.T, node *itest.IntegratedNode,
block *wire.MsgBlock) chainhash.Hash {

t.Helper()

require.Greater(
t, len(block.Transactions), 1,
"expected at least one non-coinbase transaction in block",
)

candidateTxids := make([]chainhash.Hash, 0, len(block.Transactions)-1)
for i := 1; i < len(block.Transactions); i++ {
candidateTxids = append(
candidateTxids, block.Transactions[i].TxHash(),
)
}

var (
matchedTxid chainhash.Hash
matchCount int
lastErr error
)

require.Eventually(t, func() bool {
matchCount = 0
lastErr = nil

ctxb := context.Background()
for _, txid := range candidateTxids {
transfers, err := node.ListTransfers(
ctxb, &taprpc.ListTransfersRequest{
AnchorTxid: txid.String(),
},
)
if err != nil {
lastErr = fmt.Errorf(
"unable to list %v transfers: %w",
node.Cfg.Name, err,
)
return false
}

if len(transfers.Transfers) == 0 {
continue
}

matchedTxid = txid
matchCount++
}

if matchCount != 1 {
lastErr = fmt.Errorf(
"expected 1 matching transfer tx, got %d",
matchCount,
)
return false
}

return true
}, ccTransferConfirmTimeout, 200*time.Millisecond,
"failed to resolve transfer tx in mined block for %v: "+
"candidates=%v matches=%d last_err=%v",
node.Cfg.Name, candidateTxids, matchCount, lastErr,
)

return matchedTxid
}

// ---------------------------------------------------------------------------
// HTLC event helpers
// ---------------------------------------------------------------------------
Expand Down Expand Up @@ -3087,8 +3183,12 @@ func assertForceCloseSweeps(ctx context.Context,
// Wait for tapd to process the confirmed sweep transactions before
// checking balances. We extract the txid from the mined blocks rather
// than from the earlier mempool checks to avoid RBF mismatches.
bobSweepTxHash1 := bobSweepBlocks1[0].Transactions[1].TxHash()
bobSweepTxHash2 := bobSweepBlocks2[0].Transactions[1].TxHash()
bobSweepTxHash1 := resolveMinedTransferTxid(
t.t, bob, bobSweepBlocks1[0],
)
bobSweepTxHash2 := resolveMinedTransferTxid(
t.t, bob, bobSweepBlocks2[0],
)
locateAssetTransfers(t.t, bob, bobSweepTxHash1)
locateAssetTransfers(t.t, bob, bobSweepTxHash2)

Expand Down Expand Up @@ -3120,7 +3220,9 @@ func assertForceCloseSweeps(ctx context.Context,

// Wait for tapd to register the to-local sweep transfer. We use the
// txid from the mined block to avoid RBF mismatches.
aliceToLocalHash := aliceToLocalBlocks[0].Transactions[1].TxHash()
aliceToLocalHash := resolveMinedTransferTxid(
t.t, alice, aliceToLocalBlocks[0],
)
locateAssetTransfers(t.t, alice, aliceToLocalHash)

t.Logf("Confirming Alice's to-local sweep")
Expand Down Expand Up @@ -3169,7 +3271,7 @@ func assertForceCloseSweeps(ctx context.Context,
}

// Use the txid from the mined block to avoid RBF mismatches.
sweepTxHash := sweepBlocks[0].Transactions[1].TxHash()
sweepTxHash := resolveMinedTransferTxid(t.t, alice, sweepBlocks[0])
locateAssetTransfers(t.t, alice, sweepTxHash)

t.Logf("Confirming Alice's second level remote HTLC success sweep")
Expand Down Expand Up @@ -3203,7 +3305,7 @@ func assertForceCloseSweeps(ctx context.Context,
sweepBlocks = mineBlocks(t, net, 1, 1)
}

sweepTxHash = sweepBlocks[0].Transactions[1].TxHash()
sweepTxHash = resolveMinedTransferTxid(t.t, alice, sweepBlocks[0])
locateAssetTransfers(t.t, alice, sweepTxHash)

// With the sweep transaction confirmed, Alice's balance should have
Expand Down Expand Up @@ -3372,7 +3474,7 @@ func assertForceCloseSweeps(ctx context.Context,
sweepBlocks = mineBlocks(t, net, 1, 1)
}

sweepTxHash = sweepBlocks[0].Transactions[1].TxHash()
sweepTxHash = resolveMinedTransferTxid(t.t, alice, sweepBlocks[0])
locateAssetTransfers(t.t, alice, sweepTxHash)

return aliceExpectedBalance, bobExpectedBalance
Expand Down
9 changes: 6 additions & 3 deletions itest/custom_channels/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,14 @@ var (

ccShortTimeout = time.Second * 10

// ccTransferTimeout is the timeout used when waiting for an asset
// transfer to appear as confirmed. On CI runners with multiple
// ccTransferTimeout is the timeout for transfer-related payment RPCs.
ccTransferTimeout = 2 * time.Minute

// ccTransferConfirmTimeout is the timeout used when waiting for an
// asset transfer to appear as confirmed. On CI runners with multiple
// parallel tranches, tapd block processing can be slow so we use a
// generous timeout.
ccTransferTimeout = 2 * time.Minute
ccTransferConfirmTimeout = 5 * time.Minute
)

// lndArgsTemplate contains lnd flags used by all custom channel test nodes.
Expand Down
8 changes: 5 additions & 3 deletions tapchannel/aux_closer.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,8 @@ func (a *AuxChanCloser) ShutdownBlob(
func shipChannelTxn(txSender tapfreighter.Porter, chanTx *wire.MsgTx,
outputCommitments tappsbt.OutputCommitments,
vPkts []*tappsbt.VPacket, closeFee int64,
anchorTxHeightHint fn.Option[uint32]) error {
anchorTxHeightHint fn.Option[uint32],
skipAnchorTxBroadcast bool) error {

chanTxPsbt, err := tapsend.PrepareAnchoringTemplate(vPkts)
if err != nil {
Expand Down Expand Up @@ -679,7 +680,7 @@ func shipChannelTxn(txSender tapfreighter.Porter, chanTx *wire.MsgTx,
}
parcelLabel := fmt.Sprintf("channel-tx-%s", chanTx.TxHash().String())
preSignedParcel := tapfreighter.NewPreAnchoredParcel(
vPkts, nil, closeAnchor, false, parcelLabel,
vPkts, nil, closeAnchor, skipAnchorTxBroadcast, parcelLabel,
anchorTxHeightHint,
)
_, err = txSender.RequestShipment(preSignedParcel)
Expand Down Expand Up @@ -794,6 +795,7 @@ func (a *AuxChanCloser) FinalizeClose(desc types.AuxCloseDesc,
// as the transaction is being broadcast now.
return shipChannelTxn(
a.cfg.TxSender, closeTx, closeInfo.outputCommitments,
closeInfo.vPackets, closeInfo.closeFee, fn.None[uint32](),
closeInfo.vPackets, closeInfo.closeFee,
fn.None[uint32](), false,
)
}
4 changes: 2 additions & 2 deletions tapchannel/aux_sweeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -1731,7 +1731,7 @@ func (a *AuxSweeper) importCommitTx(req lnwallet.ResolutionReq,

return shipChannelTxn(
a.cfg.TxSender, req.CommitTx, outCommitments, vPackets,
int64(req.CommitFee), heightHint,
int64(req.CommitFee), heightHint, true,
)
}

Expand Down Expand Up @@ -2590,7 +2590,7 @@ func (a *AuxSweeper) registerAndBroadcastSweep(req *sweep.BumpRequest,
// and ship the transaction.
return shipChannelTxn(
a.cfg.TxSender, sweepTx, outCommitments, allVpkts, int64(fee),
heightHint,
heightHint, true,
)
}

Expand Down
Loading
Loading