diff --git a/examples/repeater.rs b/examples/repeater.rs index 5738abcb..bcfdda99 100644 --- a/examples/repeater.rs +++ b/examples/repeater.rs @@ -7,8 +7,7 @@ async fn main() -> irc::error::Result<()> { nickname: Some("pickles".to_owned()), server: Some("chat.freenode.net".to_owned()), channels: vec!["#rust-spam".to_owned()], - burst_window_length: Some(4), - max_messages_in_burst: Some(4), + flood_penalty_threshold: Some(10_000), ..Default::default() }; diff --git a/src/client/data/config.rs b/src/client/data/config.rs index 9f65b2bc..d5d7cddb 100644 --- a/src/client/data/config.rs +++ b/src/client/data/config.rs @@ -178,12 +178,25 @@ pub struct Config { /// messages will be delayed automatically as appropriate. In particular, in the past /// `burst_window_length` seconds, there will never be more than `max_messages_in_burst` messages /// sent. + #[deprecated(note = "Unimplemented. Use flood_penalty_threshold instead.")] #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] pub burst_window_length: Option, /// The maximum number of messages that can be sent in a burst window before they'll be delayed. /// Messages are automatically delayed as appropriate. + #[deprecated(note = "Unimplemented. Use flood_penalty_threshold instead.")] #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] pub max_messages_in_burst: Option, + /// The penalty threshold in milliseconds for IRC flood protection (RFC 2813 §5.8). + /// Each outgoing message incurs a penalty based on both its byte length and command type, + /// matching the IRC server's own flood detection formula: + /// `(1 + message_bytes / 100) * 1000ms + command_penalty_ms`. + /// When accumulated penalty exceeds this threshold, outgoing messages are delayed until + /// the penalty drains below it. Penalty drains in real-time at 1ms per 1ms elapsed. + /// + /// Set to `0` to disable flood protection entirely. + /// Defaults to 10000 (10 seconds), matching the standard IRCd excess flood limit. + #[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))] + pub flood_penalty_threshold: Option, /// Whether the client should use NickServ GHOST to reclaim its primary nickname if it is in /// use. This has no effect if `nick_password` is not set. #[cfg_attr(feature = "serde", serde(skip_serializing_if = "is_false"))] @@ -600,7 +613,9 @@ impl Config { /// system maintains the invariant that in the past `burst_window_length` seconds, the maximum /// number of messages sent is `max_messages_in_burst`. /// This defaults to 8 seconds when not specified. + #[deprecated(note = "Unimplemented. Use flood_penalty_threshold instead.")] pub fn burst_window_length(&self) -> u32 { + #[allow(deprecated)] self.burst_window_length.as_ref().cloned().unwrap_or(8) } @@ -609,10 +624,19 @@ impl Config { /// system maintains the invariant that in the past `burst_window_length` seconds, the maximum /// number of messages sent is `max_messages_in_burst`. /// This defaults to 15 messages when not specified. + #[deprecated(note = "Unimplemented. Use flood_penalty_threshold instead.")] pub fn max_messages_in_burst(&self) -> u32 { + #[allow(deprecated)] self.max_messages_in_burst.as_ref().cloned().unwrap_or(15) } + /// Gets the penalty threshold in milliseconds for IRC flood protection. + /// When accumulated penalty exceeds this value, outgoing messages are delayed. + /// Defaults to 10000ms (10 seconds). Set to 0 to disable. + pub fn flood_penalty_threshold(&self) -> u32 { + self.flood_penalty_threshold.unwrap_or(10_000) + } + /// Gets whether or not to attempt nickname reclamation using NickServ GHOST. /// This defaults to false when not specified. pub fn should_ghost(&self) -> bool { diff --git a/src/client/mod.rs b/src/client/mod.rs index 8c3f08c6..e0819d9c 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -825,17 +825,103 @@ impl Sender { pub_sender_base!(); } -/// Future to handle outgoing messages. +/// Future to handle outgoing messages with IRC flood protection. /// -/// Note: this is essentially the same as a version of [SendAll](https://github.com/rust-lang-nursery/futures-rs/blob/master/futures-util/src/sink/send_all.rs) that owns it's sink and stream. +/// Implements a penalty-based throttle modeled after IRCd (RFC 2813 §5.8). Each outgoing +/// message incurs a penalty based on both its byte length and command type, matching the +/// server's own flood detection formula. When accumulated penalty exceeds a configurable +/// threshold (default 10s), messages are delayed until the penalty drains below it. +/// Penalty drains in real-time at 1ms per 1ms elapsed. +/// +/// Total penalty per message: `(1 + message_bytes / 100) * 1000 + command_penalty_ms` #[derive(Debug)] pub struct Outgoing { sink: SplitSink, stream: UnboundedReceiver, buffered: Option, + /// Accumulated penalty in milliseconds. + penalty: u64, + /// Threshold above which messages are delayed. 0 = disabled. + penalty_threshold: u64, + /// Last time penalty was drained. + last_penalty_check: tokio::time::Instant, + /// Active delay future for throttling. + delay: Option>>, } impl Outgoing { + /// Returns the penalty cost in milliseconds for a given IRC command. + /// + /// Values mirror the IRCd penalty model (RFC 2813 §5.8). The server-side + /// implementation charges `(1 + message_bytes / 100)` seconds as a base + /// cost per message, plus a command-specific penalty. We replicate this + /// client-side to stay under the server's flood threshold. + /// + /// The total penalty for a message is: `base_penalty(len) + command_penalty` + fn command_penalty(command: &Command) -> u64 { + match command { + // Connection control — never throttled client-side. The server + // does charge 1-2s for PONG, but delaying pong replies risks + // ping timeout disconnects, so we exempt these. + Command::PONG(..) | Command::QUIT(..) | Command::PASS(..) => 0, + + // CAP negotiation — must not be throttled during registration + Command::CAP(..) | Command::AUTHENTICATE(..) => 0, + + // NICK changes incur 3s on IRCd + Command::NICK(..) => 3000, + + // PART is expensive on IRCd (4s) + Command::PART(..) => 4000, + + // WHO, NAMES, LIST without args are catastrophic on IRCd (10s). + // With args they're 2s. We can't distinguish no-arg vs arg here + // since the Option is always present in the enum, so we check + // whether the argument is None/empty. + Command::WHO(ref mask, _) => match mask { + None => 10_000, + Some(m) if m.is_empty() => 10_000, + _ => 2000, + }, + Command::LIST(ref mask, _) | Command::NAMES(ref mask, _) => match mask { + None => 10_000, + Some(m) if m.is_empty() => 10_000, + _ => 2000, + }, + + // Other expensive server queries + Command::WHOIS(..) | Command::WHOWAS(..) => 3000, + Command::LINKS(..) | Command::STATS(..) => 3000, + Command::LUSERS(..) | Command::TRACE(..) => 2000, + Command::USERS(..) | Command::MOTD(..) | Command::INFO(..) => 5000, + + // PRIVMSG/NOTICE: IRCd charges 1s per target. We approximate + // with a flat 2s since most client sends target a single entity. + Command::PRIVMSG(..) | Command::NOTICE(..) => 2000, + + // JOIN, KICK, INVITE, MODE, TOPIC, AWAY, and all others: 2s + _ => 2000, + } + } + + /// Returns the base penalty in milliseconds derived from message length. + /// + /// Mirrors the IRCd formula: `(1 + message_bytes / 100)` seconds. + /// This ensures long messages incur proportionally higher penalties, + /// matching the server's own flood calculation. + fn length_penalty(message: &Message) -> u64 { + let len = message.to_string().len() as u64; + (1 + len / 100) * 1000 + } + + /// Drains accumulated penalty based on elapsed real time. + fn drain_penalty(&mut self) { + let now = tokio::time::Instant::now(); + let elapsed = now.duration_since(self.last_penalty_check).as_millis() as u64; + self.penalty = self.penalty.saturating_sub(elapsed); + self.last_penalty_check = now; + } + fn try_start_send( &mut self, cx: &mut Context<'_>, @@ -867,13 +953,62 @@ impl Future for Outgoing { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let this = &mut *self; + // If we're waiting on a throttle delay, poll it first. + if let Some(ref mut delay) = this.delay { + ready!(delay.as_mut().poll(cx)); + this.delay = None; + this.drain_penalty(); + } + + // Send the message that was buffered during the delay, then flush + // it to TCP immediately. Without this flush, messages accumulate + // in the codec buffer across multiple delay cycles and burst all + // at once when the stream finally goes idle — causing Excess Flood. if let Some(message) = this.buffered.take() { - ready!(this.try_start_send(cx, message))? + ready!(this.try_start_send(cx, message))?; + ready!(Pin::new(&mut this.sink).poll_flush(cx))?; } loop { match this.stream.poll_recv(cx) { - Poll::Ready(Some(message)) => ready!(this.try_start_send(cx, message))?, + Poll::Ready(Some(message)) => { + // Apply penalty-based throttle if enabled. + if this.penalty_threshold > 0 { + let cmd_cost = Self::command_penalty(&message.command); + if cmd_cost > 0 { + let len_cost = Self::length_penalty(&message); + let cost = len_cost + cmd_cost; + this.drain_penalty(); + this.penalty += cost; + + if this.penalty > this.penalty_threshold { + let excess = this.penalty - this.penalty_threshold; + log::debug!( + "Flood penalty {}ms exceeds threshold {}ms, delaying {}ms.", + this.penalty, + this.penalty_threshold, + excess, + ); + this.delay = Some(Box::pin(tokio::time::sleep( + std::time::Duration::from_millis(excess), + ))); + // Buffer the message and return Pending so the delay runs. + this.buffered = Some(message); + // Register waker with the delay future. + if let Some(ref mut delay) = this.delay { + let _ = delay.as_mut().poll(cx); + } + // Flush any messages already in the codec buffer before + // sleeping. Without this, a non-delayed message sent + // earlier in this poll cycle would stay buffered until + // the delay expires. + ready!(Pin::new(&mut this.sink).poll_flush(cx))?; + return Poll::Pending; + } + } + } + ready!(this.try_start_send(cx, message))? + } Poll::Ready(None) => { ready!(Pin::new(&mut this.sink).poll_flush(cx))?; return Poll::Ready(Ok(())); @@ -934,6 +1069,7 @@ impl Client { let (sink, incoming) = conn.split(); let sender = Sender { tx_outgoing }; + let penalty_threshold = config.flood_penalty_threshold() as u64; Ok(Client { sender: sender.clone(), @@ -943,6 +1079,10 @@ impl Client { sink, stream: rx_outgoing, buffered: None, + penalty: 0, + penalty_threshold, + last_penalty_check: tokio::time::Instant::now(), + delay: None, }), #[cfg(test)] view, @@ -1093,15 +1233,15 @@ impl Client { mod test { use std::{collections::HashMap, default::Default, thread, time::Duration}; - use super::Client; + use super::{Client, Outgoing}; #[cfg(feature = "channel-lists")] use crate::client::data::User; use crate::{ client::data::Config, error::Error, proto::{ - command::Command::{Raw, PRIVMSG}, - ChannelMode, IrcCodec, Mode, UserMode, + command::Command::{self, Raw, PRIVMSG}, + ChannelMode, IrcCodec, Message, Mode, UserMode, }, }; use anyhow::Result; @@ -1116,6 +1256,7 @@ mod test { channels: vec!["#test".to_string(), "#test2".to_string()], user_info: Some("Testing.".to_string()), use_mock_connection: true, + flood_penalty_threshold: Some(0), ..Default::default() } } @@ -1974,4 +2115,114 @@ mod test { ); Ok(()) } + + // -- Flood penalty unit tests -- + + #[test] + fn command_penalty_connection_control_is_zero() { + assert_eq!( + Outgoing::command_penalty(&Command::PONG(String::new(), None)), + 0 + ); + assert_eq!(Outgoing::command_penalty(&Command::QUIT(None)), 0); + assert_eq!(Outgoing::command_penalty(&Command::PASS(String::new())), 0); + } + + #[test] + fn command_penalty_cap_negotiation_is_zero() { + assert_eq!( + Outgoing::command_penalty(&Command::CAP( + None, + irc_proto::command::CapSubCommand::LS, + None, + None, + )), + 0 + ); + } + + #[test] + fn command_penalty_nick_is_3s() { + assert_eq!( + Outgoing::command_penalty(&Command::NICK("foo".into())), + 3000 + ); + } + + #[test] + fn command_penalty_part_is_4s() { + assert_eq!( + Outgoing::command_penalty(&Command::PART("#ch".into(), None)), + 4000 + ); + } + + #[test] + fn command_penalty_who_with_mask_is_2s() { + assert_eq!( + Outgoing::command_penalty(&Command::WHO(Some("#chan".into()), None)), + 2000 + ); + } + + #[test] + fn command_penalty_who_without_mask_is_10s() { + assert_eq!(Outgoing::command_penalty(&Command::WHO(None, None)), 10_000); + assert_eq!( + Outgoing::command_penalty(&Command::WHO(Some(String::new()), None)), + 10_000 + ); + } + + #[test] + fn command_penalty_privmsg_is_2s() { + assert_eq!( + Outgoing::command_penalty(&Command::PRIVMSG("#ch".into(), "hello".into())), + 2000 + ); + } + + #[test] + fn command_penalty_catchall_is_2s() { + assert_eq!( + Outgoing::command_penalty(&Command::JOIN("#ch".into(), None, None)), + 2000 + ); + assert_eq!( + Outgoing::command_penalty(&Command::Raw("WHO".into(), vec!["#ch".into()])), + 2000 + ); + } + + #[test] + fn length_penalty_short_message() { + // A short PRIVMSG: "PRIVMSG #test :hi\r\n" ≈ 19 bytes → (1 + 19/100) * 1000 = 1000 + let msg: Message = "PRIVMSG #test :hi\r\n".parse().unwrap(); + assert_eq!(Outgoing::length_penalty(&msg), 1000); + } + + #[test] + fn length_penalty_long_message() { + // Build a message > 200 bytes to verify scaling. + let long_text = "x".repeat(200); + let raw = format!("PRIVMSG #test :{}\r\n", long_text); + let msg: Message = raw.parse().unwrap(); + let len = msg.to_string().len() as u64; + assert_eq!(Outgoing::length_penalty(&msg), (1 + len / 100) * 1000); + // With ~216 bytes: (1 + 216/100) * 1000 = (1+2)*1000 = 3000 + assert!(Outgoing::length_penalty(&msg) >= 3000); + } + + #[test] + fn flood_penalty_threshold_disabled_by_zero() { + // Verify that test_config() disables penalty (threshold = 0). + let config = test_config(); + assert_eq!(config.flood_penalty_threshold(), 0); + } + + #[test] + fn flood_penalty_threshold_default_is_10s() { + let config = Config::default(); + assert_eq!(config.flood_penalty_threshold(), 10_000); + } }