From e5b43461144a4139a2b69874741ba438cac70439 Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Thu, 2 Mar 2017 17:55:32 -0500 Subject: [PATCH] Add IMAP IDLE support (#27) * Add IMAP IDLE support This patch adds support for the IMAP IDLE command specified in RFC 2177. It allows clients to block while waiting for changes to a selected mailbox, without having to poll the server. This is especially useful for monitoring a mailbox for new messages. The API is currently somewhat primitive: users can call `Client::idle()` to get an IDLE "handle", which they can then call `wait()` on to block until a change is detected. The implementation also provides `wait_keepalive()` (which tries to avoid connection timeouts) and `wait_timeout()` (which allows the user to specify the maximum time to block). Further down the line, the handle should probably also implement `Iterator` to allow clients to watch new events as they happen. This *could* be implemented today, but given that most other `Client` methods just return unparsed strings at the moment, I didn't feel like that enhancement was a high priority. For now, users will have to manually query the mailbox for what changed. Fixes #26. * Avoid unnecessary as_bytes() * Make wait_keepalive interval configurable * Avoid ?, which requires Rust 1.13 --- src/client.rs | 152 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 151 insertions(+), 1 deletion(-) diff --git a/src/client.rs b/src/client.rs index b51784b..0e5faf0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,6 +1,7 @@ use std::net::{TcpStream, ToSocketAddrs}; use openssl::ssl::{SslContext, SslStream}; -use std::io::{Read, Write}; +use std::io::{self, Read, Write}; +use std::time::Duration; use super::mailbox::Mailbox; use super::authenticator::Authenticator; @@ -19,6 +20,149 @@ pub struct Client { pub debug: bool } +/// `IdleHandle` allows a client to block waiting for changes to the remote mailbox. +/// +/// The handle blocks using the IMAP IDLE command specificed in [RFC +/// 2177](https://tools.ietf.org/html/rfc2177). +/// +/// As long a the handle is active, the mailbox cannot be otherwise accessed. +pub struct IdleHandle<'a, T: Read + Write + 'a> { + client: &'a mut Client, + keepalive: Duration, +} + +/// Must be implemented for a transport in order for a `Client` using that transport to support +/// operations with timeouts. +/// +/// Examples of where this is useful is for `IdleHandle::wait_keepalive` and +/// `IdleHandle::wait_timeout`. +pub trait SetReadTimeout { + /// Set the timeout for subsequent reads to the given one. + /// + /// If `timeout` is `None`, the read timeout should be removed. + /// + /// See also `std::net::TcpStream::set_read_timeout`. + fn set_read_timeout(&mut self, timeout: Option) -> Result<()>; +} + +impl<'a, T: Read + Write + 'a> IdleHandle<'a, T> { + fn new(client: &'a mut Client) -> Result { + let mut h = IdleHandle { + client: client, + keepalive: Duration::from_secs(29 * 60), + }; + try!(h.init()); + Ok(h) + } + + fn init(&mut self) -> Result<()> { + // https://tools.ietf.org/html/rfc2177 + // + // The IDLE command takes no arguments. + try!(self.client.run_command("IDLE")); + + // A tagged response will be sent either + // + // a) if there's an error, or + // b) *after* we send DONE + let tag = format!("{}{} ", TAG_PREFIX, self.client.tag); + let raw_data = try!(self.client.readline()); + let line = String::from_utf8(raw_data).unwrap(); + if line.starts_with(&tag) { + try!(parse_response(vec![line])); + // We should *only* get a continuation on an error (i.e., it gives BAD or NO). + unreachable!(); + } else if !line.starts_with("+") { + return Err(Error::BadResponse(vec![line])); + } + + Ok(()) + } + + fn terminate(&mut self) -> Result<()> { + try!(self.client.write_line(b"DONE")); + let lines = try!(self.client.read_response()); + parse_response_ok(lines) + } + + /// Block until the selected mailbox changes. + pub fn wait(&mut self) -> Result<()> { + self.client.readline().map(|_| ()) + } + + /// Cancel the IDLE handle prematurely. + pub fn cancel(self) { + // causes Drop to be called + } +} + +impl<'a, T: SetReadTimeout + Read + Write + 'a> IdleHandle<'a, T> { + /// Set the keep-alive interval to use when `wait_keepalive` is called. + /// + /// The interval defaults to 29 minutes as dictated by RFC 2177. + pub fn set_keepalive(&mut self, interval: Duration) { + self.keepalive = interval; + } + + /// Block until the selected mailbox changes. + /// + /// This method differs from `IdleHandle::wait` in that it will periodically refresh the IDLE + /// connection, to prevent the server from timing out our connection. The keepalive interval is + /// set to 29 minutes by default, as dictated by RFC 2177, but can be changed using + /// `set_keepalive`. + /// + /// This is the recommended method to use for waiting. + pub fn wait_keepalive(&mut self) -> Result<()> { + // The server MAY consider a client inactive if it has an IDLE command + // running, and if such a server has an inactivity timeout it MAY log + // the client off implicitly at the end of its timeout period. Because + // of that, clients using IDLE are advised to terminate the IDLE and + // re-issue it at least every 29 minutes to avoid being logged off. + // This still allows a client to receive immediate mailbox updates even + // though it need only "poll" at half hour intervals. + try!(self.client.stream.set_read_timeout(Some(self.keepalive))); + match self.wait() { + Err(Error::Io(ref e)) if e.kind() == io::ErrorKind::TimedOut || + e.kind() == io::ErrorKind::WouldBlock => { + // we need to refresh the IDLE connection + try!(self.terminate()); + try!(self.init()); + self.wait_keepalive() + } + r => { + try!(self.client.stream.set_read_timeout(None)); + r + } + } + } + + /// Block until the selected mailbox changes, or until the given amount of time has expired. + pub fn wait_timeout(&mut self, timeout: Duration) -> Result<()> { + try!(self.client.stream.set_read_timeout(Some(timeout))); + let res = self.wait(); + try!(self.client.stream.set_read_timeout(None)); + res + } +} + +impl<'a, T: Read + Write + 'a> Drop for IdleHandle<'a, T> { + fn drop(&mut self) { + self.terminate().expect("IDLE connection did not terminate cleanly"); + } +} + +impl<'a> SetReadTimeout for TcpStream { + fn set_read_timeout(&mut self, timeout: Option) -> Result<()> { + TcpStream::set_read_timeout(self, timeout).map_err(|e| Error::Io(e)) + } +} + +impl<'a> SetReadTimeout for SslStream { + fn set_read_timeout(&mut self, timeout: Option) -> Result<()> { + self.get_ref().set_read_timeout(timeout).map_err(|e| Error::Io(e)) + } +} + impl Client { /// Creates a new client. pub fn connect(addr: A) -> Result> { @@ -230,6 +374,12 @@ impl Client { self.run_command_and_parse(&format!("STATUS {} {}", mailbox_name, status_data_items)) } + /// Returns a handle that can be used to block until the state of the currently selected + /// mailbox changes. + pub fn idle(&mut self) -> Result> { + IdleHandle::new(self) + } + /// Runs a command and checks if it returns OK. pub fn run_command_and_check_ok(&mut self, command: &str) -> Result<()> { let lines = try!(self.run_command_and_read_response(command));