Add IMAP IDLE support (#27)

* Add IMAP IDLE support

This patch adds support for the IMAP IDLE command specified in RFC 2177.
It allows clients to block while waiting for changes to a selected
mailbox, without having to poll the server. This is especially useful
for monitoring a mailbox for new messages.

The API is currently somewhat primitive: users can call `Client::idle()`
to get an IDLE "handle", which they can then call `wait()` on to block
until a change is detected. The implementation also provides
`wait_keepalive()` (which tries to avoid connection timeouts) and
`wait_timeout()` (which allows the user to specify the maximum time to
block).

Further down the line, the handle should probably also implement
`Iterator` to allow clients to watch new events as they happen. This
*could* be implemented today, but given that most other `Client` methods
just return unparsed strings at the moment, I didn't feel like that
enhancement was a high priority. For now, users will have to manually
query the mailbox for what changed.

Fixes #26.

* Avoid unnecessary as_bytes()

* Make wait_keepalive interval configurable

* Avoid ?, which requires Rust 1.13
This commit is contained in:
Jon Gjengset 2017-03-02 17:55:32 -05:00 committed by Matt McCoy
parent 1ef94f86dc
commit e5b4346114

View file

@ -1,6 +1,7 @@
use std::net::{TcpStream, ToSocketAddrs}; use std::net::{TcpStream, ToSocketAddrs};
use openssl::ssl::{SslContext, SslStream}; use openssl::ssl::{SslContext, SslStream};
use std::io::{Read, Write}; use std::io::{self, Read, Write};
use std::time::Duration;
use super::mailbox::Mailbox; use super::mailbox::Mailbox;
use super::authenticator::Authenticator; use super::authenticator::Authenticator;
@ -19,6 +20,149 @@ pub struct Client<T> {
pub debug: bool pub debug: bool
} }
/// `IdleHandle` allows a client to block waiting for changes to the remote mailbox.
///
/// The handle blocks using the IMAP IDLE command specificed in [RFC
/// 2177](https://tools.ietf.org/html/rfc2177).
///
/// As long a the handle is active, the mailbox cannot be otherwise accessed.
pub struct IdleHandle<'a, T: Read + Write + 'a> {
client: &'a mut Client<T>,
keepalive: Duration,
}
/// Must be implemented for a transport in order for a `Client` using that transport to support
/// operations with timeouts.
///
/// Examples of where this is useful is for `IdleHandle::wait_keepalive` and
/// `IdleHandle::wait_timeout`.
pub trait SetReadTimeout {
/// Set the timeout for subsequent reads to the given one.
///
/// If `timeout` is `None`, the read timeout should be removed.
///
/// See also `std::net::TcpStream::set_read_timeout`.
fn set_read_timeout(&mut self, timeout: Option<Duration>) -> Result<()>;
}
impl<'a, T: Read + Write + 'a> IdleHandle<'a, T> {
fn new(client: &'a mut Client<T>) -> Result<Self> {
let mut h = IdleHandle {
client: client,
keepalive: Duration::from_secs(29 * 60),
};
try!(h.init());
Ok(h)
}
fn init(&mut self) -> Result<()> {
// https://tools.ietf.org/html/rfc2177
//
// The IDLE command takes no arguments.
try!(self.client.run_command("IDLE"));
// A tagged response will be sent either
//
// a) if there's an error, or
// b) *after* we send DONE
let tag = format!("{}{} ", TAG_PREFIX, self.client.tag);
let raw_data = try!(self.client.readline());
let line = String::from_utf8(raw_data).unwrap();
if line.starts_with(&tag) {
try!(parse_response(vec![line]));
// We should *only* get a continuation on an error (i.e., it gives BAD or NO).
unreachable!();
} else if !line.starts_with("+") {
return Err(Error::BadResponse(vec![line]));
}
Ok(())
}
fn terminate(&mut self) -> Result<()> {
try!(self.client.write_line(b"DONE"));
let lines = try!(self.client.read_response());
parse_response_ok(lines)
}
/// Block until the selected mailbox changes.
pub fn wait(&mut self) -> Result<()> {
self.client.readline().map(|_| ())
}
/// Cancel the IDLE handle prematurely.
pub fn cancel(self) {
// causes Drop to be called
}
}
impl<'a, T: SetReadTimeout + Read + Write + 'a> IdleHandle<'a, T> {
/// Set the keep-alive interval to use when `wait_keepalive` is called.
///
/// The interval defaults to 29 minutes as dictated by RFC 2177.
pub fn set_keepalive(&mut self, interval: Duration) {
self.keepalive = interval;
}
/// Block until the selected mailbox changes.
///
/// This method differs from `IdleHandle::wait` in that it will periodically refresh the IDLE
/// connection, to prevent the server from timing out our connection. The keepalive interval is
/// set to 29 minutes by default, as dictated by RFC 2177, but can be changed using
/// `set_keepalive`.
///
/// This is the recommended method to use for waiting.
pub fn wait_keepalive(&mut self) -> Result<()> {
// The server MAY consider a client inactive if it has an IDLE command
// running, and if such a server has an inactivity timeout it MAY log
// the client off implicitly at the end of its timeout period. Because
// of that, clients using IDLE are advised to terminate the IDLE and
// re-issue it at least every 29 minutes to avoid being logged off.
// This still allows a client to receive immediate mailbox updates even
// though it need only "poll" at half hour intervals.
try!(self.client.stream.set_read_timeout(Some(self.keepalive)));
match self.wait() {
Err(Error::Io(ref e)) if e.kind() == io::ErrorKind::TimedOut ||
e.kind() == io::ErrorKind::WouldBlock => {
// we need to refresh the IDLE connection
try!(self.terminate());
try!(self.init());
self.wait_keepalive()
}
r => {
try!(self.client.stream.set_read_timeout(None));
r
}
}
}
/// Block until the selected mailbox changes, or until the given amount of time has expired.
pub fn wait_timeout(&mut self, timeout: Duration) -> Result<()> {
try!(self.client.stream.set_read_timeout(Some(timeout)));
let res = self.wait();
try!(self.client.stream.set_read_timeout(None));
res
}
}
impl<'a, T: Read + Write + 'a> Drop for IdleHandle<'a, T> {
fn drop(&mut self) {
self.terminate().expect("IDLE connection did not terminate cleanly");
}
}
impl<'a> SetReadTimeout for TcpStream {
fn set_read_timeout(&mut self, timeout: Option<Duration>) -> Result<()> {
TcpStream::set_read_timeout(self, timeout).map_err(|e| Error::Io(e))
}
}
impl<'a> SetReadTimeout for SslStream<TcpStream> {
fn set_read_timeout(&mut self, timeout: Option<Duration>) -> Result<()> {
self.get_ref().set_read_timeout(timeout).map_err(|e| Error::Io(e))
}
}
impl Client<TcpStream> { impl Client<TcpStream> {
/// Creates a new client. /// Creates a new client.
pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<Client<TcpStream>> { pub fn connect<A: ToSocketAddrs>(addr: A) -> Result<Client<TcpStream>> {
@ -230,6 +374,12 @@ impl<T: Read+Write> Client<T> {
self.run_command_and_parse(&format!("STATUS {} {}", mailbox_name, status_data_items)) self.run_command_and_parse(&format!("STATUS {} {}", mailbox_name, status_data_items))
} }
/// Returns a handle that can be used to block until the state of the currently selected
/// mailbox changes.
pub fn idle(&mut self) -> Result<IdleHandle<T>> {
IdleHandle::new(self)
}
/// Runs a command and checks if it returns OK. /// Runs a command and checks if it returns OK.
pub fn run_command_and_check_ok(&mut self, command: &str) -> Result<()> { pub fn run_command_and_check_ok(&mut self, command: &str) -> Result<()> {
let lines = try!(self.run_command_and_read_response(command)); let lines = try!(self.run_command_and_read_response(command));