Merge pull request #159 from mredaelli/master

Support timing out wait on IDLE
This commit is contained in:
Jon Gjengset 2020-05-12 08:40:39 -04:00 committed by GitHub
commit c2062a5f07
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -31,6 +31,15 @@ pub struct Handle<'a, T: Read + Write> {
done: bool, done: bool,
} }
/// The result of a wait on a [`Handle`]
#[derive(Debug, PartialEq, Eq)]
pub enum WaitOutcome {
/// The wait timed out
TimedOut,
/// The mailbox was modified
MailboxChanged,
}
/// Must be implemented for a transport in order for a `Session` using that transport to support /// Must be implemented for a transport in order for a `Session` using that transport to support
/// operations with timeouts. /// operations with timeouts.
/// ///
@ -91,34 +100,37 @@ impl<'a, T: Read + Write + 'a> Handle<'a, T> {
/// Internal helper that doesn't consume self. /// Internal helper that doesn't consume self.
/// ///
/// This is necessary so that we can keep using the inner `Session` in `wait_keepalive`. /// This is necessary so that we can keep using the inner `Session` in `wait_keepalive`.
fn wait_inner(&mut self) -> Result<()> { fn wait_inner(&mut self, reconnect: bool) -> Result<WaitOutcome> {
let mut v = Vec::new(); let mut v = Vec::new();
loop { loop {
match self.session.readline(&mut v).map(|_| ()) { let result = match self.session.readline(&mut v).map(|_| ()) {
Err(Error::Io(ref e)) Err(Error::Io(ref e))
if e.kind() == io::ErrorKind::TimedOut if e.kind() == io::ErrorKind::TimedOut
|| e.kind() == io::ErrorKind::WouldBlock => || e.kind() == io::ErrorKind::WouldBlock =>
{ {
// we need to refresh the IDLE connection if reconnect {
self.terminate()?; self.terminate()?;
self.init()?; self.init()?;
return self.wait_inner(); return self.wait_inner(reconnect);
}
Ok(WaitOutcome::TimedOut)
} }
r => r, Ok(()) => Ok(WaitOutcome::MailboxChanged),
Err(r) => Err(r),
}?; }?;
// Handle Dovecot's imap_idle_notify_interval message // Handle Dovecot's imap_idle_notify_interval message
if v.eq_ignore_ascii_case(b"* OK Still here\r\n") { if v.eq_ignore_ascii_case(b"* OK Still here\r\n") {
v.clear(); v.clear();
} else { } else {
break Ok(()); break Ok(result);
} }
} }
} }
/// Block until the selected mailbox changes. /// Block until the selected mailbox changes.
pub fn wait(mut self) -> Result<()> { pub fn wait(mut self) -> Result<()> {
self.wait_inner() self.wait_inner(true).map(|_| ())
} }
} }
@ -147,16 +159,26 @@ impl<'a, T: SetReadTimeout + Read + Write + 'a> Handle<'a, T> {
// This still allows a client to receive immediate mailbox updates even // This still allows a client to receive immediate mailbox updates even
// though it need only "poll" at half hour intervals. // though it need only "poll" at half hour intervals.
let keepalive = self.keepalive; let keepalive = self.keepalive;
self.wait_timeout(keepalive) self.timed_wait(keepalive, true).map(|_| ())
} }
/// Block until the selected mailbox changes, or until the given amount of time has expired. /// Block until the selected mailbox changes, or until the given amount of time has expired.
pub fn wait_timeout(mut self, timeout: Duration) -> Result<()> { #[deprecated(note = "use wait_with_timeout instead")]
pub fn wait_timeout(self, timeout: Duration) -> Result<()> {
self.wait_with_timeout(timeout).map(|_| ())
}
/// Block until the selected mailbox changes, or until the given amount of time has expired.
pub fn wait_with_timeout(self, timeout: Duration) -> Result<WaitOutcome> {
self.timed_wait(timeout, false)
}
fn timed_wait(mut self, timeout: Duration, reconnect: bool) -> Result<WaitOutcome> {
self.session self.session
.stream .stream
.get_mut() .get_mut()
.set_read_timeout(Some(timeout))?; .set_read_timeout(Some(timeout))?;
let res = self.wait_inner(); let res = self.wait_inner(reconnect);
let _ = self.session.stream.get_mut().set_read_timeout(None).is_ok(); let _ = self.session.stream.get_mut().set_read_timeout(None).is_ok();
res res
} }