Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions examples/repeater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ async fn main() -> irc::error::Result<()> {
nickname: Some("pickles".to_owned()),
server: Some("chat.freenode.net".to_owned()),
channels: vec!["#rust-spam".to_owned()],
burst_window_length: Some(4),
max_messages_in_burst: Some(4),
flood_penalty_threshold: Some(10_000),
..Default::default()
};

Expand Down
24 changes: 24 additions & 0 deletions src/client/data/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,25 @@ pub struct Config {
/// messages will be delayed automatically as appropriate. In particular, in the past
/// `burst_window_length` seconds, there will never be more than `max_messages_in_burst` messages
/// sent.
#[deprecated(note = "Unimplemented. Use flood_penalty_threshold instead.")]
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub burst_window_length: Option<u32>,
/// The maximum number of messages that can be sent in a burst window before they'll be delayed.
/// Messages are automatically delayed as appropriate.
#[deprecated(note = "Unimplemented. Use flood_penalty_threshold instead.")]
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub max_messages_in_burst: Option<u32>,
/// The penalty threshold in milliseconds for IRC flood protection (RFC 2813 §5.8).
/// Each outgoing message incurs a penalty based on both its byte length and command type,
/// matching the IRC server's own flood detection formula:
/// `(1 + message_bytes / 100) * 1000ms + command_penalty_ms`.
/// When accumulated penalty exceeds this threshold, outgoing messages are delayed until
/// the penalty drains below it. Penalty drains in real-time at 1ms per 1ms elapsed.
///
/// Set to `0` to disable flood protection entirely.
/// Defaults to 10000 (10 seconds), matching the standard IRCd excess flood limit.
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "Option::is_none"))]
pub flood_penalty_threshold: Option<u32>,
/// Whether the client should use NickServ GHOST to reclaim its primary nickname if it is in
/// use. This has no effect if `nick_password` is not set.
#[cfg_attr(feature = "serde", serde(skip_serializing_if = "is_false"))]
Expand Down Expand Up @@ -600,7 +613,9 @@ impl Config {
/// system maintains the invariant that in the past `burst_window_length` seconds, the maximum
/// number of messages sent is `max_messages_in_burst`.
/// This defaults to 8 seconds when not specified.
#[deprecated(note = "Unimplemented. Use flood_penalty_threshold instead.")]
pub fn burst_window_length(&self) -> u32 {
#[allow(deprecated)]
self.burst_window_length.as_ref().cloned().unwrap_or(8)
}

Expand All @@ -609,10 +624,19 @@ impl Config {
/// system maintains the invariant that in the past `burst_window_length` seconds, the maximum
/// number of messages sent is `max_messages_in_burst`.
/// This defaults to 15 messages when not specified.
#[deprecated(note = "Unimplemented. Use flood_penalty_threshold instead.")]
pub fn max_messages_in_burst(&self) -> u32 {
#[allow(deprecated)]
self.max_messages_in_burst.as_ref().cloned().unwrap_or(15)
}

/// Gets the penalty threshold in milliseconds for IRC flood protection.
/// When accumulated penalty exceeds this value, outgoing messages are delayed.
/// Defaults to 10000ms (10 seconds). Set to 0 to disable.
pub fn flood_penalty_threshold(&self) -> u32 {
self.flood_penalty_threshold.unwrap_or(10_000)
}

/// Gets whether or not to attempt nickname reclamation using NickServ GHOST.
/// This defaults to false when not specified.
pub fn should_ghost(&self) -> bool {
Expand Down
265 changes: 258 additions & 7 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -825,17 +825,103 @@ impl Sender {
pub_sender_base!();
}

/// Future to handle outgoing messages.
/// Future to handle outgoing messages with IRC flood protection.
///
/// Note: this is essentially the same as a version of [SendAll](https://github.com/rust-lang-nursery/futures-rs/blob/master/futures-util/src/sink/send_all.rs) that owns it's sink and stream.
/// Implements a penalty-based throttle modeled after IRCd (RFC 2813 §5.8). Each outgoing
/// message incurs a penalty based on both its byte length and command type, matching the
/// server's own flood detection formula. When accumulated penalty exceeds a configurable
/// threshold (default 10s), messages are delayed until the penalty drains below it.
/// Penalty drains in real-time at 1ms per 1ms elapsed.
///
/// Total penalty per message: `(1 + message_bytes / 100) * 1000 + command_penalty_ms`
#[derive(Debug)]
pub struct Outgoing {
sink: SplitSink<Connection, Message>,
stream: UnboundedReceiver<Message>,
buffered: Option<Message>,
/// Accumulated penalty in milliseconds.
penalty: u64,
/// Threshold above which messages are delayed. 0 = disabled.
penalty_threshold: u64,
/// Last time penalty was drained.
last_penalty_check: tokio::time::Instant,
/// Active delay future for throttling.
delay: Option<Pin<Box<tokio::time::Sleep>>>,
}

impl Outgoing {
/// Returns the penalty cost in milliseconds for a given IRC command.
///
/// Values mirror the IRCd penalty model (RFC 2813 §5.8). The server-side
/// implementation charges `(1 + message_bytes / 100)` seconds as a base
/// cost per message, plus a command-specific penalty. We replicate this
/// client-side to stay under the server's flood threshold.
///
/// The total penalty for a message is: `base_penalty(len) + command_penalty`
fn command_penalty(command: &Command) -> u64 {
match command {
// Connection control — never throttled client-side. The server
// does charge 1-2s for PONG, but delaying pong replies risks
// ping timeout disconnects, so we exempt these.
Command::PONG(..) | Command::QUIT(..) | Command::PASS(..) => 0,

// CAP negotiation — must not be throttled during registration
Command::CAP(..) | Command::AUTHENTICATE(..) => 0,

// NICK changes incur 3s on IRCd
Command::NICK(..) => 3000,

// PART is expensive on IRCd (4s)
Command::PART(..) => 4000,

// WHO, NAMES, LIST without args are catastrophic on IRCd (10s).
// With args they're 2s. We can't distinguish no-arg vs arg here
// since the Option is always present in the enum, so we check
// whether the argument is None/empty.
Command::WHO(ref mask, _) => match mask {
None => 10_000,
Some(m) if m.is_empty() => 10_000,
_ => 2000,
},
Command::LIST(ref mask, _) | Command::NAMES(ref mask, _) => match mask {
None => 10_000,
Some(m) if m.is_empty() => 10_000,
_ => 2000,
},

// Other expensive server queries
Command::WHOIS(..) | Command::WHOWAS(..) => 3000,
Command::LINKS(..) | Command::STATS(..) => 3000,
Command::LUSERS(..) | Command::TRACE(..) => 2000,
Command::USERS(..) | Command::MOTD(..) | Command::INFO(..) => 5000,

// PRIVMSG/NOTICE: IRCd charges 1s per target. We approximate
// with a flat 2s since most client sends target a single entity.
Command::PRIVMSG(..) | Command::NOTICE(..) => 2000,

// JOIN, KICK, INVITE, MODE, TOPIC, AWAY, and all others: 2s
_ => 2000,
}
}

/// Returns the base penalty in milliseconds derived from message length.
///
/// Mirrors the IRCd formula: `(1 + message_bytes / 100)` seconds.
/// This ensures long messages incur proportionally higher penalties,
/// matching the server's own flood calculation.
fn length_penalty(message: &Message) -> u64 {
let len = message.to_string().len() as u64;
(1 + len / 100) * 1000
}

/// Drains accumulated penalty based on elapsed real time.
fn drain_penalty(&mut self) {
let now = tokio::time::Instant::now();
let elapsed = now.duration_since(self.last_penalty_check).as_millis() as u64;
self.penalty = self.penalty.saturating_sub(elapsed);
self.last_penalty_check = now;
}

fn try_start_send(
&mut self,
cx: &mut Context<'_>,
Expand Down Expand Up @@ -867,13 +953,62 @@ impl Future for Outgoing {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = &mut *self;

// If we're waiting on a throttle delay, poll it first.
if let Some(ref mut delay) = this.delay {
ready!(delay.as_mut().poll(cx));
this.delay = None;
this.drain_penalty();
}

// Send the message that was buffered during the delay, then flush
// it to TCP immediately. Without this flush, messages accumulate
// in the codec buffer across multiple delay cycles and burst all
// at once when the stream finally goes idle — causing Excess Flood.
if let Some(message) = this.buffered.take() {
ready!(this.try_start_send(cx, message))?
ready!(this.try_start_send(cx, message))?;
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
}

loop {
match this.stream.poll_recv(cx) {
Poll::Ready(Some(message)) => ready!(this.try_start_send(cx, message))?,
Poll::Ready(Some(message)) => {
// Apply penalty-based throttle if enabled.
if this.penalty_threshold > 0 {
let cmd_cost = Self::command_penalty(&message.command);
if cmd_cost > 0 {
let len_cost = Self::length_penalty(&message);
let cost = len_cost + cmd_cost;
this.drain_penalty();
this.penalty += cost;

if this.penalty > this.penalty_threshold {
let excess = this.penalty - this.penalty_threshold;
log::debug!(
"Flood penalty {}ms exceeds threshold {}ms, delaying {}ms.",
this.penalty,
this.penalty_threshold,
excess,
);
this.delay = Some(Box::pin(tokio::time::sleep(
std::time::Duration::from_millis(excess),
)));
// Buffer the message and return Pending so the delay runs.
this.buffered = Some(message);
// Register waker with the delay future.
if let Some(ref mut delay) = this.delay {
let _ = delay.as_mut().poll(cx);
}
// Flush any messages already in the codec buffer before
// sleeping. Without this, a non-delayed message sent
// earlier in this poll cycle would stay buffered until
// the delay expires.
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
return Poll::Pending;
}
}
}
ready!(this.try_start_send(cx, message))?
}
Poll::Ready(None) => {
ready!(Pin::new(&mut this.sink).poll_flush(cx))?;
return Poll::Ready(Ok(()));
Expand Down Expand Up @@ -934,6 +1069,7 @@ impl Client {
let (sink, incoming) = conn.split();

let sender = Sender { tx_outgoing };
let penalty_threshold = config.flood_penalty_threshold() as u64;

Ok(Client {
sender: sender.clone(),
Expand All @@ -943,6 +1079,10 @@ impl Client {
sink,
stream: rx_outgoing,
buffered: None,
penalty: 0,
penalty_threshold,
last_penalty_check: tokio::time::Instant::now(),
delay: None,
}),
#[cfg(test)]
view,
Expand Down Expand Up @@ -1093,15 +1233,15 @@ impl Client {
mod test {
use std::{collections::HashMap, default::Default, thread, time::Duration};

use super::Client;
use super::{Client, Outgoing};
#[cfg(feature = "channel-lists")]
use crate::client::data::User;
use crate::{
client::data::Config,
error::Error,
proto::{
command::Command::{Raw, PRIVMSG},
ChannelMode, IrcCodec, Mode, UserMode,
command::Command::{self, Raw, PRIVMSG},
ChannelMode, IrcCodec, Message, Mode, UserMode,
},
};
use anyhow::Result;
Expand All @@ -1116,6 +1256,7 @@ mod test {
channels: vec!["#test".to_string(), "#test2".to_string()],
user_info: Some("Testing.".to_string()),
use_mock_connection: true,
flood_penalty_threshold: Some(0),
..Default::default()
}
}
Expand Down Expand Up @@ -1974,4 +2115,114 @@ mod test {
);
Ok(())
}

// -- Flood penalty unit tests --

#[test]
fn command_penalty_connection_control_is_zero() {
assert_eq!(
Outgoing::command_penalty(&Command::PONG(String::new(), None)),
0
);
assert_eq!(Outgoing::command_penalty(&Command::QUIT(None)), 0);
assert_eq!(Outgoing::command_penalty(&Command::PASS(String::new())), 0);
}

#[test]
fn command_penalty_cap_negotiation_is_zero() {
assert_eq!(
Outgoing::command_penalty(&Command::CAP(
None,
irc_proto::command::CapSubCommand::LS,
None,
None,
)),
0
);
}

#[test]
fn command_penalty_nick_is_3s() {
assert_eq!(
Outgoing::command_penalty(&Command::NICK("foo".into())),
3000
);
}

#[test]
fn command_penalty_part_is_4s() {
assert_eq!(
Outgoing::command_penalty(&Command::PART("#ch".into(), None)),
4000
);
}

#[test]
fn command_penalty_who_with_mask_is_2s() {
assert_eq!(
Outgoing::command_penalty(&Command::WHO(Some("#chan".into()), None)),
2000
);
}

#[test]
fn command_penalty_who_without_mask_is_10s() {
assert_eq!(Outgoing::command_penalty(&Command::WHO(None, None)), 10_000);
assert_eq!(
Outgoing::command_penalty(&Command::WHO(Some(String::new()), None)),
10_000
);
}

#[test]
fn command_penalty_privmsg_is_2s() {
assert_eq!(
Outgoing::command_penalty(&Command::PRIVMSG("#ch".into(), "hello".into())),
2000
);
}

#[test]
fn command_penalty_catchall_is_2s() {
assert_eq!(
Outgoing::command_penalty(&Command::JOIN("#ch".into(), None, None)),
2000
);
assert_eq!(
Outgoing::command_penalty(&Command::Raw("WHO".into(), vec!["#ch".into()])),
2000
);
}

#[test]
fn length_penalty_short_message() {
// A short PRIVMSG: "PRIVMSG #test :hi\r\n" ≈ 19 bytes → (1 + 19/100) * 1000 = 1000
let msg: Message = "PRIVMSG #test :hi\r\n".parse().unwrap();
assert_eq!(Outgoing::length_penalty(&msg), 1000);
}

#[test]
fn length_penalty_long_message() {
// Build a message > 200 bytes to verify scaling.
let long_text = "x".repeat(200);
let raw = format!("PRIVMSG #test :{}\r\n", long_text);
let msg: Message = raw.parse().unwrap();
let len = msg.to_string().len() as u64;
assert_eq!(Outgoing::length_penalty(&msg), (1 + len / 100) * 1000);
// With ~216 bytes: (1 + 216/100) * 1000 = (1+2)*1000 = 3000
assert!(Outgoing::length_penalty(&msg) >= 3000);
}

#[test]
fn flood_penalty_threshold_disabled_by_zero() {
// Verify that test_config() disables penalty (threshold = 0).
let config = test_config();
assert_eq!(config.flood_penalty_threshold(), 0);
}

#[test]
fn flood_penalty_threshold_default_is_10s() {
let config = Config::default();
assert_eq!(config.flood_penalty_threshold(), 10_000);
}
}