From 8383b47f35c91d98118a59755b9831ddc5c5e21f Mon Sep 17 00:00:00 2001 From: Jon Gjengset Date: Wed, 27 Sep 2017 17:38:51 -0400 Subject: [PATCH] Use BufStream for better I/O handling (#39) * Use bufstream for better read/write * Read with length 0 == EOF * Adapt read_delay test to write one chat at a time * Add test for eof reads * Neater interface for MockStream --- Cargo.toml | 1 + src/client.rs | 82 +++++++++++++++++++++++++--------------------- src/error.rs | 10 ++++++ src/lib.rs | 1 + src/mock_stream.rs | 59 +++++++++++++++++++-------------- 5 files changed, 92 insertions(+), 61 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b1d5c3d..d9b5aa7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ path = "src/lib.rs" [dependencies] openssl = "0.9" regex = "0.2" +bufstream = "0.1" [dev-dependencies] base64 = "0.2" diff --git a/src/client.rs b/src/client.rs index 177ba5f..6c8ea46 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,6 +2,7 @@ use std::net::{TcpStream, ToSocketAddrs}; use openssl::ssl::{SslConnector, SslStream}; use std::io::{self, Read, Write}; use std::time::Duration; +use bufstream::BufStream; use super::mailbox::Mailbox; use super::authenticator::Authenticator; @@ -15,8 +16,8 @@ const CR: u8 = 0x0d; const LF: u8 = 0x0a; /// Stream to interface with the IMAP server. This interface is only for the command stream. -pub struct Client { - stream: T, +pub struct Client { + stream: BufStream, tag: u32, pub debug: bool, } @@ -121,7 +122,7 @@ impl<'a, T: SetReadTimeout + Read + Write + 'a> IdleHandle<'a, T> { // re-issue it at least every 29 minutes to avoid being logged off. // This still allows a client to receive immediate mailbox updates even // though it need only "poll" at half hour intervals. - try!(self.client.stream.set_read_timeout(Some(self.keepalive))); + try!(self.client.stream.get_mut().set_read_timeout(Some(self.keepalive))); match self.wait() { Err(Error::Io(ref e)) if e.kind() == io::ErrorKind::TimedOut || e.kind() == io::ErrorKind::WouldBlock => { @@ -131,7 +132,7 @@ impl<'a, T: SetReadTimeout + Read + Write + 'a> IdleHandle<'a, T> { self.wait_keepalive() } r => { - try!(self.client.stream.set_read_timeout(None)); + try!(self.client.stream.get_mut().set_read_timeout(None)); r } } @@ -139,9 +140,9 @@ impl<'a, T: SetReadTimeout + Read + Write + 'a> IdleHandle<'a, T> { /// Block until the selected mailbox changes, or until the given amount of time has expired. pub fn wait_timeout(&mut self, timeout: Duration) -> Result<()> { - try!(self.client.stream.set_read_timeout(Some(timeout))); + try!(self.client.stream.get_mut().set_read_timeout(Some(timeout))); let res = self.wait(); - try!(self.client.stream.set_read_timeout(None)); + try!(self.client.stream.get_mut().set_read_timeout(None)); res } } @@ -191,7 +192,7 @@ impl Client { ) -> Result>> { // TODO This needs to be tested self.run_command_and_check_ok("STARTTLS")?; - SslConnector::connect(&ssl_connector, domain, self.stream) + SslConnector::connect(&ssl_connector, domain, try!(self.stream.into_inner())) .map(Client::new) .map_err(Error::Ssl) } @@ -224,7 +225,7 @@ impl Client { /// Creates a new client with the underlying stream. pub fn new(stream: T) -> Client { Client { - stream: stream, + stream: BufStream::new(stream), tag: INITIAL_TAG, debug: false, } @@ -428,6 +429,7 @@ impl Client { } try!(self.stream.write_all(content)); try!(self.stream.write_all(b"\r\n")); + try!(self.stream.flush()); self.read_response() } @@ -477,22 +479,17 @@ impl Client { } fn readline(&mut self) -> Result> { + use std::io::BufRead; let mut line_buffer: Vec = Vec::new(); - while line_buffer.len() < 2 || - (line_buffer[line_buffer.len() - 1] != LF && line_buffer[line_buffer.len() - 2] != CR) - { - let byte_buffer: &mut [u8] = &mut [0]; - let n = try!(self.stream.read(byte_buffer)); - if n > 0 { - line_buffer.push(byte_buffer[0]); - } + if try!(self.stream.read_until(LF, &mut line_buffer)) == 0 { + return Err(Error::ConnectionLost); } if self.debug { - let mut line = line_buffer.clone(); // Remove CRLF - line.truncate(line_buffer.len() - 2); - print!("S: {}\n", String::from_utf8(line).unwrap()); + let len = line_buffer.len(); + let line = &line_buffer[..(len - 2)]; + print!("S: {}\n", String::from_utf8_lossy(line)); } Ok(line_buffer) @@ -507,6 +504,7 @@ impl Client { fn write_line(&mut self, buf: &[u8]) -> Result<()> { try!(self.stream.write_all(buf)); try!(self.stream.write_all(&[CR, LF])); + try!(self.stream.flush()); if self.debug { print!("C: {}\n", String::from_utf8(buf.to_vec()).unwrap()); } @@ -547,17 +545,27 @@ mod tests { fn readline_delay_read() { let greeting = "* OK Dovecot ready.\r\n"; let expected_response: String = greeting.to_string(); - let mock_stream = MockStream::new_read_delay(greeting.as_bytes().to_vec()); + let mock_stream = MockStream::default().with_buf(greeting.as_bytes().to_vec()).with_delay(); let mut client = Client::new(mock_stream); let actual_response = String::from_utf8(client.readline().unwrap()).unwrap(); assert_eq!(expected_response, actual_response); } + #[test] + fn readline_eof() { + let mock_stream = MockStream::default().with_eof(); + let mut client = Client::new(mock_stream); + if let Err(Error::ConnectionLost) = client.readline() { + } else { + unreachable!("EOF read did not return connection lost"); + } + } + #[test] #[should_panic] fn readline_err() { // TODO Check the error test - let mock_stream = MockStream::new_err(); + let mock_stream = MockStream::default().with_err(); let mut client = Client::new(mock_stream); client.readline().unwrap(); } @@ -565,7 +573,7 @@ mod tests { #[test] fn create_command() { let base_command = "CHECK"; - let mock_stream = MockStream::new(Vec::new()); + let mock_stream = MockStream::default(); let mut imap_stream = Client::new(mock_stream); let expected_command = format!("a1 {}", base_command); @@ -593,7 +601,7 @@ mod tests { let mut client = Client::new(mock_stream); client.login(username, password).unwrap(); assert!( - client.stream.written_buf == command.as_bytes().to_vec(), + client.stream.get_ref().written_buf == command.as_bytes().to_vec(), "Invalid login command" ); } @@ -606,7 +614,7 @@ mod tests { let mut client = Client::new(mock_stream); client.logout().unwrap(); assert!( - client.stream.written_buf == command.as_bytes().to_vec(), + client.stream.get_ref().written_buf == command.as_bytes().to_vec(), "Invalid logout command" ); } @@ -627,7 +635,7 @@ mod tests { .rename(current_mailbox_name, new_mailbox_name) .unwrap(); assert!( - client.stream.written_buf == command.as_bytes().to_vec(), + client.stream.get_ref().written_buf == command.as_bytes().to_vec(), "Invalid rename command" ); } @@ -641,7 +649,7 @@ mod tests { let mut client = Client::new(mock_stream); client.subscribe(mailbox).unwrap(); assert!( - client.stream.written_buf == command.as_bytes().to_vec(), + client.stream.get_ref().written_buf == command.as_bytes().to_vec(), "Invalid subscribe command" ); } @@ -655,7 +663,7 @@ mod tests { let mut client = Client::new(mock_stream); client.unsubscribe(mailbox).unwrap(); assert!( - client.stream.written_buf == command.as_bytes().to_vec(), + client.stream.get_ref().written_buf == command.as_bytes().to_vec(), "Invalid unsubscribe command" ); } @@ -667,7 +675,7 @@ mod tests { let mut client = Client::new(mock_stream); client.expunge().unwrap(); assert!( - client.stream.written_buf == b"a1 EXPUNGE\r\n".to_vec(), + client.stream.get_ref().written_buf == b"a1 EXPUNGE\r\n".to_vec(), "Invalid expunge command" ); } @@ -679,7 +687,7 @@ mod tests { let mut client = Client::new(mock_stream); client.check().unwrap(); assert!( - client.stream.written_buf == b"a1 CHECK\r\n".to_vec(), + client.stream.get_ref().written_buf == b"a1 CHECK\r\n".to_vec(), "Invalid check command" ); } @@ -710,7 +718,7 @@ mod tests { let mut client = Client::new(mock_stream); let mailbox = client.examine(mailbox_name).unwrap(); assert!( - client.stream.written_buf == command.as_bytes().to_vec(), + client.stream.get_ref().written_buf == command.as_bytes().to_vec(), "Invalid examine command" ); assert!(mailbox == expected_mailbox, "Unexpected mailbox returned"); @@ -744,7 +752,7 @@ mod tests { let mut client = Client::new(mock_stream); let mailbox = client.select(mailbox_name).unwrap(); assert!( - client.stream.written_buf == command.as_bytes().to_vec(), + client.stream.get_ref().written_buf == command.as_bytes().to_vec(), "Invalid select command" ); assert!(mailbox == expected_mailbox, "Unexpected mailbox returned"); @@ -760,7 +768,7 @@ mod tests { let mut client = Client::new(mock_stream); let capabilities = client.capability().unwrap(); assert!( - client.stream.written_buf == b"a1 CAPABILITY\r\n".to_vec(), + client.stream.get_ref().written_buf == b"a1 CAPABILITY\r\n".to_vec(), "Invalid capability command" ); assert!( @@ -778,7 +786,7 @@ mod tests { let mut client = Client::new(mock_stream); client.create(mailbox_name).unwrap(); assert!( - client.stream.written_buf == command.as_bytes().to_vec(), + client.stream.get_ref().written_buf == command.as_bytes().to_vec(), "Invalid create command" ); } @@ -792,7 +800,7 @@ mod tests { let mut client = Client::new(mock_stream); client.delete(mailbox_name).unwrap(); assert!( - client.stream.written_buf == command.as_bytes().to_vec(), + client.stream.get_ref().written_buf == command.as_bytes().to_vec(), "Invalid delete command" ); } @@ -804,7 +812,7 @@ mod tests { let mut client = Client::new(mock_stream); client.noop().unwrap(); assert!( - client.stream.written_buf == b"a1 NOOP\r\n".to_vec(), + client.stream.get_ref().written_buf == b"a1 NOOP\r\n".to_vec(), "Invalid noop command" ); } @@ -816,7 +824,7 @@ mod tests { let mut client = Client::new(mock_stream); client.close().unwrap(); assert!( - client.stream.written_buf == b"a1 CLOSE\r\n".to_vec(), + client.stream.get_ref().written_buf == b"a1 CLOSE\r\n".to_vec(), "Invalid close command" ); } @@ -897,7 +905,7 @@ mod tests { let mut client = Client::new(MockStream::new(resp)); let _ = op(&mut client, seq, query); assert!( - client.stream.written_buf == line.as_bytes().to_vec(), + client.stream.get_ref().written_buf == line.as_bytes().to_vec(), "Invalid command" ); } diff --git a/src/error.rs b/src/error.rs index 9acbbd5..a653834 100644 --- a/src/error.rs +++ b/src/error.rs @@ -5,6 +5,7 @@ use std::error::Error as StdError; use std::net::TcpStream; use openssl::ssl::HandshakeError as SslError; +use bufstream::IntoInnerError as BufError; pub type Result = result::Result; @@ -19,6 +20,8 @@ pub enum Error { BadResponse(Vec), /// A NO response from the IMAP server. NoResponse(Vec), + /// The connection was terminated unexpectedly. + ConnectionLost, // Error parsing a server response. Parse(ParseError), // Error appending a mail @@ -31,6 +34,12 @@ impl From for Error { } } +impl From> for Error { + fn from(err: BufError) -> Error { + Error::Io(err.into()) + } +} + impl From> for Error { fn from(err: SslError) -> Error { Error::Ssl(err) @@ -55,6 +64,7 @@ impl StdError for Error { Error::Parse(ref e) => e.description(), Error::BadResponse(_) => "Bad Response", Error::NoResponse(_) => "No Response", + Error::ConnectionLost => "Connection lost", Error::Append => "Could not append mail to mailbox", } } diff --git a/src/lib.rs b/src/lib.rs index 18ef667..1a7687a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,6 +4,7 @@ //! imap is a IMAP client for Rust. extern crate openssl; +extern crate bufstream; extern crate regex; pub mod authenticator; diff --git a/src/mock_stream.rs b/src/mock_stream.rs index 5316c23..580311e 100644 --- a/src/mock_stream.rs +++ b/src/mock_stream.rs @@ -6,45 +6,52 @@ pub struct MockStream { read_pos: usize, pub written_buf: Vec, err_on_read: bool, + eof_on_read: bool, read_delay: usize, } -impl MockStream { - pub fn new(read_buf: Vec) -> MockStream { - MockStream { - read_buf: read_buf, - read_pos: 0, - written_buf: Vec::new(), - err_on_read: false, - read_delay: 0, - } - } - - pub fn new_err() -> MockStream { +impl Default for MockStream { + fn default() -> Self { MockStream { read_buf: Vec::new(), read_pos: 0, written_buf: Vec::new(), - err_on_read: true, + err_on_read: false, + eof_on_read: false, read_delay: 0, } } +} - pub fn new_read_delay(read_buf: Vec) -> MockStream { - MockStream { - read_buf: read_buf, - read_pos: 0, - written_buf: Vec::new(), - err_on_read: false, - read_delay: 1, - } +impl MockStream { + pub fn new(read_buf: Vec) -> MockStream { + MockStream::default().with_buf(read_buf) + } + + pub fn with_buf(mut self, read_buf: Vec) -> MockStream { + self.read_buf = read_buf; + self + } + + pub fn with_eof(mut self) -> MockStream { + self.eof_on_read = true; + self + } + + pub fn with_err(mut self) -> MockStream { + self.err_on_read = true; + self + } + + pub fn with_delay(mut self) -> MockStream { + self.read_delay = 1; + self } } impl Read for MockStream { fn read(&mut self, buf: &mut [u8]) -> Result { - if self.read_delay > 0 { - self.read_delay -= 1; + if self.eof_on_read { return Ok(0); } if self.err_on_read { @@ -53,7 +60,11 @@ impl Read for MockStream { if self.read_pos >= self.read_buf.len() { return Err(Error::new(ErrorKind::UnexpectedEof, "EOF")); } - let write_len = min(buf.len(), self.read_buf.len() - self.read_pos); + let mut write_len = min(buf.len(), self.read_buf.len() - self.read_pos); + if self.read_delay > 0 { + self.read_delay -= 1; + write_len = min(write_len, 1); + } let max_pos = self.read_pos + write_len; for x in self.read_pos..max_pos { buf[x - self.read_pos] = self.read_buf[x];