Skip to content

Commit 2b0689c

Browse files
committed
fix: wire up DashPay payment history to wallet transactions
The Payment History screen was always empty because it only read from the dashpay_payments database table, which was only populated when payments were sent through the app's DashPay UI. The process_incoming_payment() function existed but was never called. Changes: - Add scan_wallet_transactions_for_dashpay_payments() that cross- references SPV wallet transactions against DashPay address mappings and saves matches to the dashpay_payments table (idempotent) - Call it from LoadPaymentHistory backend task for immediate retroactive detection when the user opens the Payment History screen - Call it during SPV reconcile for ongoing automatic detection - Fix ContactDetailsScreen to load per-contact payment history from DB (was initialized empty and never populated) Closes #688
1 parent 768a834 commit 2b0689c

5 files changed

Lines changed: 311 additions & 32 deletions

File tree

src/backend_task/dashpay.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,40 @@ impl AppContext {
171171
),
172172
DashPayTask::LoadPaymentHistory { identity } => {
173173
let identity_id = identity.identity.id();
174+
175+
// Retroactively scan wallet transactions for DashPay payments
176+
// that may not yet be in the dashpay_payments table.
177+
// Clone transactions and drop the read lock before scanning
178+
// to reduce lock contention.
179+
// Iterate all associated wallets — an identity can have multiple.
180+
for wallet_arc in identity.associated_wallets.values() {
181+
let wallet_txs = match wallet_arc.read() {
182+
Ok(guard) => guard.transactions.clone(),
183+
Err(_) => continue,
184+
};
185+
186+
if wallet_txs.is_empty() {
187+
continue;
188+
}
189+
190+
match incoming_payments::scan_wallet_transactions_for_dashpay_payments(
191+
self,
192+
&identity_id,
193+
&wallet_txs,
194+
) {
195+
Ok(n) if n > 0 => {
196+
tracing::info!(
197+
"Retroactively discovered {} DashPay payment(s) from wallet transactions",
198+
n
199+
);
200+
}
201+
Err(e) => {
202+
tracing::warn!("Wallet transaction scan failed: {}", e);
203+
}
204+
_ => {}
205+
}
206+
}
207+
174208
let records = payments::load_payment_history(self, &identity_id, None).await?;
175209

176210
let network_str = self.network.to_string();

src/backend_task/dashpay/incoming_payments.rs

Lines changed: 162 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,42 @@ pub fn match_transaction_to_contact(
282282
.map_err(|e| format!("Failed to lookup address: {}", e))
283283
}
284284

285+
/// Record a single incoming DashPay payment in the database.
286+
///
287+
/// Saves the payment record and updates the highest receive index for the
288+
/// contact. This is the shared core of [`process_incoming_payment`] (real-time
289+
/// SPV path) and [`scan_wallet_transactions_for_dashpay_payments`] (retroactive
290+
/// scan).
291+
fn record_incoming_payment(
292+
app_context: &AppContext,
293+
tx_id: &str,
294+
owner_id: &Identifier,
295+
contact_id: &Identifier,
296+
amount_duffs: u64,
297+
address_index: u32,
298+
) -> Result<(), String> {
299+
app_context
300+
.db
301+
.save_payment(
302+
tx_id,
303+
contact_id, // from contact
304+
owner_id, // to us
305+
amount_duffs as i64,
306+
None, // memo — not available for incoming
307+
"received",
308+
)
309+
.map_err(|e| format!("Failed to save payment: {}", e))?;
310+
311+
// update_highest_receive_index uses MAX() internally, so calling
312+
// unconditionally is safe and idempotent.
313+
app_context
314+
.db
315+
.update_highest_receive_index(owner_id, contact_id, address_index + 1)
316+
.map_err(|e| format!("Failed to update receive index: {}", e))?;
317+
318+
Ok(())
319+
}
320+
285321
/// Process an incoming transaction that was detected by SPV
286322
/// This should be called when WalletEvent::TransactionReceived is received
287323
pub async fn process_incoming_payment(
@@ -298,31 +334,15 @@ pub async fn process_incoming_payment(
298334

299335
let (owner_id, contact_id, address_index) = mapping;
300336

301-
// Update the highest receive index if needed
302-
let current_indices = app_context
303-
.db
304-
.get_contact_address_indices(&owner_id, &contact_id)
305-
.map_err(|e| format!("Failed to get address indices: {}", e))?;
306-
307-
if address_index >= current_indices.highest_receive_index {
308-
app_context
309-
.db
310-
.update_highest_receive_index(&owner_id, &contact_id, address_index + 1)
311-
.map_err(|e| format!("Failed to update receive index: {}", e))?;
312-
}
313-
314-
// Save the payment record
315-
app_context
316-
.db
317-
.save_payment(
318-
tx_id,
319-
&contact_id, // from contact
320-
&owner_id, // to us
321-
amount_duffs as i64,
322-
None, // memo - not available for incoming
323-
"received",
324-
)
325-
.map_err(|e| format!("Failed to save payment: {}", e))?;
337+
// Record the payment and update address index
338+
record_incoming_payment(
339+
app_context,
340+
tx_id,
341+
&owner_id,
342+
&contact_id,
343+
amount_duffs,
344+
address_index,
345+
)?;
326346

327347
Ok(Some(IncomingPaymentInfo {
328348
tx_id: tx_id.to_string(),
@@ -345,6 +365,123 @@ pub struct IncomingPaymentInfo {
345365
pub address_index: u32,
346366
}
347367

368+
/// Scan wallet transactions against DashPay address mappings and save any
369+
/// matched payments to the `dashpay_payments` table.
370+
///
371+
/// This is the retroactive counterpart of [`process_incoming_payment`]: it
372+
/// iterates over *all* wallet transactions (already synced via SPV) and checks
373+
/// every output address against the stored address-mapping table. Payments
374+
/// that are already recorded (by tx_id) are skipped, so calling this function
375+
/// repeatedly is safe and idempotent.
376+
///
377+
/// It is designed to be called from:
378+
/// - the SPV reconcile flow (after wallet transactions are updated), and
379+
/// - the `LoadPaymentHistory` backend task (for immediate retroactive
380+
/// detection when the user opens the Payment History screen).
381+
pub fn scan_wallet_transactions_for_dashpay_payments(
382+
app_context: &AppContext,
383+
identity_id: &Identifier,
384+
transactions: &[crate::model::wallet::WalletTransaction],
385+
) -> Result<usize, String> {
386+
use dash_sdk::dpp::dashcore::Address;
387+
use std::collections::HashSet;
388+
389+
// Build a lookup set of addresses → (owner_id, contact_id, address_index)
390+
let mappings = app_context
391+
.db
392+
.get_all_dashpay_address_mappings(identity_id)
393+
.map_err(|e| format!("Failed to load DashPay address mappings: {}", e))?;
394+
395+
if mappings.is_empty() {
396+
return Ok(0);
397+
}
398+
399+
// address string → (contact_id, address_index)
400+
let address_map: std::collections::HashMap<String, (Identifier, u32)> = mappings
401+
.into_iter()
402+
.map(|(addr_str, contact_id, idx)| (addr_str, (contact_id, idx)))
403+
.collect();
404+
405+
// Collect already-known tx_ids to avoid duplicates.
406+
// We load up to 10,000 recent payments for client-side dedup; for identities
407+
// with more history the DB's `tx_id UNIQUE` constraint provides server-side
408+
// protection (at the cost of a benign constraint-violation log on collisions).
409+
let existing_payments = app_context
410+
.db
411+
.load_payment_history(identity_id, 10_000)
412+
.unwrap_or_default();
413+
let known_tx_ids: HashSet<String> = existing_payments.iter().map(|p| p.tx_id.clone()).collect();
414+
415+
let network = app_context.network;
416+
let mut saved = 0usize;
417+
418+
for wtx in transactions {
419+
let txid_str = wtx.txid.to_string();
420+
if known_tx_ids.contains(&txid_str) {
421+
continue;
422+
}
423+
424+
// Check every output for a matching DashPay address
425+
for output in &wtx.transaction.output {
426+
let addr = match Address::from_script(&output.script_pubkey, network) {
427+
Ok(a) => a,
428+
Err(_) => continue,
429+
};
430+
431+
let addr_str = addr.to_string();
432+
if let Some((contact_id, address_index)) = address_map.get(&addr_str) {
433+
// The address_map contains receive-side addresses (addresses contacts
434+
// use to pay us). For outgoing transactions, matching an output to
435+
// these addresses does NOT reliably mean we sent to the contact —
436+
// skip outgoing txs in the retroactive scan.
437+
if !wtx.is_incoming() {
438+
continue;
439+
}
440+
441+
let amount_duffs = output.value;
442+
443+
if let Err(e) = record_incoming_payment(
444+
app_context,
445+
&txid_str,
446+
identity_id,
447+
contact_id,
448+
amount_duffs,
449+
*address_index,
450+
) {
451+
tracing::warn!(
452+
tx_id = %txid_str,
453+
error = %e,
454+
"Failed to save scanned DashPay payment"
455+
);
456+
} else {
457+
saved += 1;
458+
tracing::info!(
459+
tx_id = %txid_str,
460+
contact = %contact_id.to_string(Encoding::Base58),
461+
amount = amount_duffs,
462+
direction = "received",
463+
address_index = address_index,
464+
"Saved DashPay payment from wallet transaction scan"
465+
);
466+
}
467+
468+
// One match per transaction is enough (avoid double-counting)
469+
break;
470+
}
471+
}
472+
}
473+
474+
if saved > 0 {
475+
tracing::info!(
476+
identity = %identity_id.to_string(Encoding::Base58),
477+
new_payments = saved,
478+
"DashPay wallet transaction scan complete"
479+
);
480+
}
481+
482+
Ok(saved)
483+
}
484+
348485
#[cfg(test)]
349486
mod tests {
350487
use super::*;

src/context/wallet_lifecycle.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -779,11 +779,47 @@ impl AppContext {
779779
.map_err(|e| e.to_string())?;
780780
}
781781

782-
if let Some(wref) = wallets_guard.get(seed_hash)
782+
// Collect identity IDs and clone transactions while the write lock
783+
// is held, then drop the lock before scanning to reduce contention.
784+
let identity_ids: Vec<dash_sdk::platform::Identifier> = if let Some(wref) =
785+
wallets_guard.get(seed_hash)
783786
&& let Ok(mut wallet) = wref.write()
784787
&& !wallet_transactions.is_empty()
785788
{
786-
wallet.set_transactions(wallet_transactions);
789+
wallet.set_transactions(wallet_transactions.clone());
790+
use dash_sdk::dpp::identity::accessors::IdentityGettersV0;
791+
wallet.identities.values().map(|id| id.id()).collect()
792+
} else {
793+
vec![]
794+
};
795+
796+
// Scan wallet transactions for DashPay payments and save any
797+
// matches to the dashpay_payments table. This bridges the gap
798+
// between the SPV wallet (which knows about on-chain txs) and
799+
// the DashPay payment history (which only knew about payments
800+
// initiated through the app UI).
801+
for identity_id in &identity_ids {
802+
match crate::backend_task::dashpay::incoming_payments::scan_wallet_transactions_for_dashpay_payments(
803+
self,
804+
identity_id,
805+
&wallet_transactions,
806+
) {
807+
Ok(n) if n > 0 => {
808+
tracing::info!(
809+
identity = %identity_id,
810+
new_payments = n,
811+
"SPV reconcile: discovered DashPay payments from wallet transactions"
812+
);
813+
}
814+
Err(e) => {
815+
tracing::debug!(
816+
identity = %identity_id,
817+
error = %e,
818+
"SPV reconcile: DashPay payment scan failed"
819+
);
820+
}
821+
_ => {}
822+
}
787823
}
788824
}
789825

src/database/dashpay.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -528,7 +528,7 @@ impl crate::database::Database {
528528
payment_type: &str,
529529
) -> rusqlite::Result<i64> {
530530
let sql = "
531-
INSERT INTO dashpay_payments
531+
INSERT OR IGNORE INTO dashpay_payments
532532
(tx_id, from_identity_id, to_identity_id, amount, memo, payment_type)
533533
VALUES (?1, ?2, ?3, ?4, ?5, ?6)
534534
";
@@ -600,6 +600,48 @@ impl crate::database::Database {
600600
Ok(payments)
601601
}
602602

603+
/// Load payment history filtered to a specific contact relationship.
604+
/// Returns payments where both identity_id and contact_id are involved
605+
/// (either as sender or receiver), ordered by most recent first.
606+
pub fn load_payment_history_for_contact(
607+
&self,
608+
identity_id: &Identifier,
609+
contact_id: &Identifier,
610+
limit: u32,
611+
) -> rusqlite::Result<Vec<StoredPayment>> {
612+
let conn = self.conn.lock().unwrap();
613+
let mut stmt = conn.prepare(
614+
"SELECT id, tx_id, from_identity_id, to_identity_id, amount, memo,
615+
payment_type, status, created_at, confirmed_at
616+
FROM dashpay_payments
617+
WHERE (from_identity_id = ?1 OR to_identity_id = ?1)
618+
AND (from_identity_id = ?2 OR to_identity_id = ?2)
619+
ORDER BY created_at DESC
620+
LIMIT ?3",
621+
)?;
622+
623+
let identity_bytes = identity_id.to_buffer().to_vec();
624+
let contact_bytes = contact_id.to_buffer().to_vec();
625+
let payments = stmt
626+
.query_map(params![identity_bytes, contact_bytes, limit], |row| {
627+
Ok(StoredPayment {
628+
id: row.get(0)?,
629+
tx_id: row.get(1)?,
630+
from_identity_id: row.get(2)?,
631+
to_identity_id: row.get(3)?,
632+
amount: row.get(4)?,
633+
memo: row.get(5)?,
634+
payment_type: row.get(6)?,
635+
status: row.get(7)?,
636+
created_at: row.get(8)?,
637+
confirmed_at: row.get(9)?,
638+
})
639+
})?
640+
.collect::<Result<Vec<_>, _>>()?;
641+
642+
Ok(payments)
643+
}
644+
603645
/// Delete all DashPay data for a specific identity
604646
pub fn delete_dashpay_data_for_identity(
605647
&self,

0 commit comments

Comments
 (0)