Skip to content
2 changes: 2 additions & 0 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,8 @@ interface Node {
[Throws=NodeError]
void splice_out([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, [ByRef]Address address, u64 splice_amount_sats);
[Throws=NodeError]
void bump_channel_funding_fee([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id);
[Throws=NodeError]
void close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id);
[Throws=NodeError]
void force_close_channel([ByRef]UserChannelId user_channel_id, PublicKey counterparty_node_id, string? reason);
Expand Down
2 changes: 2 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,8 @@ fn build_with_store_internal(
Arc::clone(&pending_payment_store),
));

tx_broadcaster.set_wallet(Arc::downgrade(&wallet));

// Initialize the KeysManager
let cur_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).map_err(|e| {
log_error!(logger, "Failed to get current time: {}", e);
Expand Down
8 changes: 5 additions & 3 deletions src/chain/bitcoind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -571,16 +571,18 @@ impl BitcoindChainSource {
Ok(())
}

pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
pub(crate) async fn process_broadcast_package(
&self, txs: impl IntoIterator<Item = Transaction>,
) {
// While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28
// features, we should eventually switch to use `submitpackage` via the
// `rust-bitcoind-json-rpc` crate rather than just broadcasting individual
// transactions.
for tx in &package {
for tx in txs {
let txid = tx.compute_txid();
let timeout_fut = tokio::time::timeout(
Duration::from_secs(DEFAULT_TX_BROADCAST_TIMEOUT_SECS),
self.api_client.broadcast_transaction(tx),
self.api_client.broadcast_transaction(&tx),
);
match timeout_fut.await {
Ok(res) => match res {
Expand Down
6 changes: 4 additions & 2 deletions src/chain/electrum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,9 @@ impl ElectrumChainSource {
Ok(())
}

pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
pub(crate) async fn process_broadcast_package(
&self, txs: impl IntoIterator<Item = Transaction>,
) {
let electrum_client: Arc<ElectrumRuntimeClient> = if let Some(client) =
self.electrum_runtime_status.read().expect("lock").client().as_ref()
{
Expand All @@ -304,7 +306,7 @@ impl ElectrumChainSource {
return;
};

for tx in package {
for tx in txs {
electrum_client.broadcast(tx).await;
}
}
Expand Down
8 changes: 5 additions & 3 deletions src/chain/esplora.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,14 @@ impl EsploraChainSource {
Ok(())
}

pub(crate) async fn process_broadcast_package(&self, package: Vec<Transaction>) {
for tx in &package {
pub(crate) async fn process_broadcast_package(
&self, txs: impl IntoIterator<Item = Transaction>,
) {
for tx in txs {
let txid = tx.compute_txid();
let timeout_fut = tokio::time::timeout(
Duration::from_secs(self.sync_config.timeouts_config.tx_broadcast_timeout_secs),
self.esplora_client.broadcast(tx),
self.esplora_client.broadcast(&tx),
);
match timeout_fut.await {
Ok(res) => match res {
Expand Down
20 changes: 16 additions & 4 deletions src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::config::{
WALLET_SYNC_INTERVAL_MINIMUM_SECS,
};
use crate::fee_estimator::OnchainFeeEstimator;
use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger};
use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger};
use crate::runtime::Runtime;
use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet};
use crate::{Error, PersistedNodeMetrics};
Expand Down Expand Up @@ -453,15 +453,27 @@ impl ChainSource {
return;
}
Some(next_package) = receiver.recv() => {
let package = match self.tx_broadcaster.classify_package(next_package).await {
Ok(p) => p,
Err(e) => {
log_error!(
tx_bcast_logger,
"Skipping broadcast: failed to persist payment records: {:?}",
e,
);
continue;
},
};
let txs = package.into_iter().map(|(tx, _)| tx);
match &self.kind {
ChainSourceKind::Esplora(esplora_chain_source) => {
esplora_chain_source.process_broadcast_package(next_package).await
esplora_chain_source.process_broadcast_package(txs).await
},
ChainSourceKind::Electrum(electrum_chain_source) => {
electrum_chain_source.process_broadcast_package(next_package).await
electrum_chain_source.process_broadcast_package(txs).await
},
ChainSourceKind::Bitcoind(bitcoind_chain_source) => {
bitcoind_chain_source.process_broadcast_package(next_package).await
bitcoind_chain_source.process_broadcast_package(txs).await
},
}
}
Expand Down
24 changes: 24 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,20 @@ where
);
}

if let Err(e) = self
.wallet
.handle_channel_ready(channel_id, funding_txo.map(|txo| txo.txid))
.await
{
log_error!(
self.logger,
"Failed to graduate funding payment on ChannelReady for channel {}: {:?}",
channel_id,
e,
);
return Err(ReplayEvent());
}

self.liquidity_source
.lsps2_service()
.handle_channel_ready(user_channel_id, &channel_id, &counterparty_node_id)
Expand Down Expand Up @@ -1613,6 +1627,16 @@ where
} => {
log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason);

if let Err(e) = self.wallet.handle_channel_closed(channel_id).await {
log_error!(
self.logger,
"Failed to handle ChannelClosed for channel {}: {:?}",
channel_id,
e,
);
return Err(ReplayEvent());
}

let event = Event::ChannelClosed {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
Expand Down
71 changes: 69 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1647,7 +1647,7 @@ impl Node {
if funding_template.prior_contribution().is_some() {
log_error!(
self.logger,
"Failed to splice channel: a prior splice contribution is pending"
"Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee to bump its fee"
);
return Err(Error::ChannelSplicingFailed);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It says use rbf_channel instead but that function doesn't let us change the amount in/out. Some like use rbf_channel to bump fee would be more accurate. Also would be nice if this had a separate error.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed the message, but I'm not sure it justifies a new error type. Callers will be able to check SpliceDetails once lightningdevkit/rust-lightning#4687 is included. @tnull Any preference?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should that be added to the 0.3 milestone?

}
Expand Down Expand Up @@ -1770,7 +1770,7 @@ impl Node {
if funding_template.prior_contribution().is_some() {
log_error!(
self.logger,
"Failed to splice channel: a prior splice contribution is pending"
"Failed to splice channel: a prior splice contribution is pending; use bump_channel_funding_fee to bump its fee"
);
return Err(Error::ChannelSplicingFailed);
Comment thread
jkczyz marked this conversation as resolved.
}
Expand Down Expand Up @@ -1807,6 +1807,73 @@ impl Node {
}
}

/// Replace a pending splice's funding transaction with a higher-feerate version.
///
/// If a prior splice negotiation is pending, this bumps its feerate via RBF. The prior
/// contribution is reused when possible; otherwise, coin selection is re-run.
///
/// # Experimental API
///
/// This API is experimental and may change in the future.
pub fn bump_channel_funding_fee(
&self, user_channel_id: &UserChannelId, counterparty_node_id: PublicKey,
) -> Result<(), Error> {
let open_channels =
self.channel_manager.list_channels_with_counterparty(&counterparty_node_id);
if let Some(channel_details) =
open_channels.iter().find(|c| c.user_channel_id == user_channel_id.0)
{
let min_feerate =
self.fee_estimator.estimate_fee_rate(ConfirmationTarget::ChannelFunding);
let max_feerate = FeeRate::from_sat_per_kwu(min_feerate.to_sat_per_kwu() * 3 / 2);

let funding_template = self
.channel_manager
.splice_channel(&channel_details.channel_id, &counterparty_node_id)
.map_err(|e| {
log_error!(self.logger, "Failed to RBF channel: {:?}", e);
Error::ChannelSplicingFailed
})?;

if funding_template.min_rbf_feerate().is_none() {
log_error!(self.logger, "Failed to RBF channel: no pending splice to replace");
return Err(Error::ChannelSplicingFailed);
}

let contribution = self
.runtime
.block_on(funding_template.rbf_prior_contribution(
None,
max_feerate,
Arc::clone(&self.wallet),
))
.map_err(|e| {
log_error!(self.logger, "Failed to RBF channel: {}", e);
Error::ChannelSplicingFailed
})?;

self.channel_manager
.funding_contributed(
&channel_details.channel_id,
&counterparty_node_id,
contribution,
None,
)
.map_err(|e| {
log_error!(self.logger, "Failed to RBF channel: {:?}", e);
Error::ChannelSplicingFailed
})
} else {
log_error!(
self.logger,
"Channel not found for user_channel_id {} and counterparty {}",
user_channel_id,
counterparty_node_id
);
Err(Error::ChannelSplicingFailed)
}
}

/// Manually sync the LDK and BDK wallets with the current chain state and update the fee rate
/// cache.
///
Expand Down
45 changes: 43 additions & 2 deletions src/payment/pending_payment_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,25 +6,52 @@
// accordance with one or both of these licenses.

use bitcoin::Txid;
use lightning::chain::chaininterface::FundingCandidate;
use lightning::impl_writeable_tlv_based;
use lightning::ln::channelmanager::PaymentId;

use crate::data_store::{StorableObject, StorableObjectUpdate};
use crate::payment::store::PaymentDetailsUpdate;
use crate::payment::PaymentDetails;

/// Marks an on-chain payment as belonging to an interactive-funding negotiation. The
/// last entry in `candidates` is the currently-broadcast tx; earlier entries are RBF
/// predecessors that may still confirm if reorgs intervene.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct FundingDetails {
/// Every negotiated candidate, oldest first.
pub candidates: Vec<FundingCandidate>,
}

impl_writeable_tlv_based!(FundingDetails, {
(0, candidates, optional_vec),
});

/// Represents a pending payment
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PendingPaymentDetails {
/// The full payment details
pub details: PaymentDetails,
/// Transaction IDs that have replaced or conflict with this payment.
pub conflicting_txids: Vec<Txid>,
/// Set when the payment's transaction is an interactive-funding broadcast (channel
/// open or splice). The record transitions to [`PaymentStatus::Succeeded`] on
/// `ChannelReady` instead of after [`ANTI_REORG_DELAY`] confirmations.
///
/// [`PaymentStatus::Succeeded`]: crate::payment::store::PaymentStatus::Succeeded
/// [`ANTI_REORG_DELAY`]: lightning::chain::channelmonitor::ANTI_REORG_DELAY
pub funding_details: Option<FundingDetails>,
}

impl PendingPaymentDetails {
pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec<Txid>) -> Self {
Self { details, conflicting_txids }
Self { details, conflicting_txids, funding_details: None }
}

pub(crate) fn with_funding_details(
details: PaymentDetails, conflicting_txids: Vec<Txid>, funding_details: FundingDetails,
) -> Self {
Self { details, conflicting_txids, funding_details: Some(funding_details) }
}

/// Convert to finalized payment for the main payment store
Expand All @@ -36,13 +63,15 @@ impl PendingPaymentDetails {
impl_writeable_tlv_based!(PendingPaymentDetails, {
(0, details, required),
(2, conflicting_txids, optional_vec),
(4, funding_details, option),
});

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PendingPaymentDetailsUpdate {
pub id: PaymentId,
pub payment_update: Option<PaymentDetailsUpdate>,
pub conflicting_txids: Option<Vec<Txid>>,
pub funding_details: Option<Option<FundingDetails>>,
}

impl StorableObject for PendingPaymentDetails {
Expand All @@ -68,6 +97,13 @@ impl StorableObject for PendingPaymentDetails {
}
}

if let Some(new_funding_details) = update.funding_details {
if self.funding_details != new_funding_details {
self.funding_details = new_funding_details;
updated = true;
}
}

updated
}

Expand All @@ -89,6 +125,11 @@ impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate {
} else {
Some(value.conflicting_txids.clone())
};
Self { id: value.id(), payment_update: Some(value.details.to_update()), conflicting_txids }
Self {
id: value.id(),
payment_update: Some(value.details.to_update()),
conflicting_txids,
funding_details: Some(value.funding_details.clone()),
}
}
}
Loading