From fa1d1a6cb78d1968bc63b89f6a53ea01230e7b2e Mon Sep 17 00:00:00 2001 From: Shautvast Date: Sun, 22 Feb 2026 19:13:20 +0100 Subject: [PATCH] =?UTF-8?q?Add=20IMAP=20server=20(step=206)=20=E2=80=94=20?= =?UTF-8?q?all=209=20commands=20implemented?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Plain-TCP listener on 0.0.0.0 (handles both localhost and 127.0.0.1). LOGIN, NOOP, SELECT (reloads inbox), FETCH header+body, SEARCH, STORE, EXPUNGE (deletes on ProtonMail), LOGOUT. FETCH body decrypts messages on demand: brief lock for ID lookup, API call without lock, brief lock again for crypto. RFC 3501 literal format with exact byte counts for imap-crate compatibility. Also: update store.expunge() to return (ids, seqs) in descending order for correct IMAP EXPUNGE response ordering; add chrono for RFC 2822 dates. Co-Authored-By: Claude Sonnet 4.6 --- Cargo.lock | 1 + proton-bridge/Cargo.toml | 1 + proton-bridge/src/imap_server.rs | 398 +++++++++++++++++++++++++++++++ proton-bridge/src/main.rs | 12 +- proton-bridge/src/store.rs | 25 +- 5 files changed, 426 insertions(+), 11 deletions(-) create mode 100644 proton-bridge/src/imap_server.rs diff --git a/Cargo.lock b/Cargo.lock index ef575e3..fed3660 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2953,6 +2953,7 @@ dependencies = [ "base64 0.22.1", "bcrypt", "cfb-mode", + "chrono", "env_logger", "num-bigint", "pgp", diff --git a/proton-bridge/Cargo.toml b/proton-bridge/Cargo.toml index 0d5df4f..deafe9e 100644 --- a/proton-bridge/Cargo.toml +++ b/proton-bridge/Cargo.toml @@ -16,6 +16,7 @@ rand = "0.8" pwhash = "0.3" # bcrypt with caller-supplied salt (used for SRP) bcrypt = "0.15" # reference bcrypt impl for key passphrase derivation pgp = { version = "0.14", default-features = false } # rpgp — OpenPGP decrypt +chrono = "0.4" env_logger = "0.11" aes = "0.8" cfb-mode = "0.8" diff --git a/proton-bridge/src/imap_server.rs b/proton-bridge/src/imap_server.rs new file mode 100644 index 0000000..0f76615 --- /dev/null +++ b/proton-bridge/src/imap_server.rs @@ -0,0 +1,398 @@ +/// Step 6: Local IMAP server. +/// +/// Implements the nine commands skim uses (LOGIN, NOOP, SELECT, FETCH ×2, +/// SEARCH, STORE, EXPUNGE, LOGOUT) on a plain-TCP listener on localhost. +/// Each accepted connection is handled in its own tokio task. +use std::sync::Arc; + +use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; +use tokio::net::{TcpListener, TcpStream}; + +use crate::api::{ApiClient, LABEL_INBOX}; +use crate::{crypto, SharedState}; + +// ── Public entry point ──────────────────────────────────────────────────────── + +pub async fn run(state: SharedState, port: u16) -> Result<(), Box> { + let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?; + println!("IMAP listening on port {port}"); + loop { + let (socket, addr) = listener.accept().await?; + eprintln!("IMAP: connection from {addr}"); + let state = Arc::clone(&state); + tokio::spawn(async move { + if let Err(e) = handle_connection(socket, state).await { + eprintln!("IMAP connection error: {e}"); + } + }); + } +} + +// ── Per-connection handler ──────────────────────────────────────────────────── + +async fn handle_connection( + socket: TcpStream, + state: SharedState, +) -> Result<(), Box> { + let (reader, mut writer) = socket.into_split(); + let mut lines = BufReader::new(reader).lines(); + let mut authenticated = false; + + writer + .write_all(b"* OK IMAP4rev1 ProtonBridge ready\r\n") + .await?; + + while let Some(raw) = lines.next_line().await? { + let line = raw.trim_end(); + if line.is_empty() { + continue; + } + eprintln!("IMAP < {line}"); + + // Every IMAP command is "TAG COMMAND [rest…]" + let mut parts = line.splitn(3, ' '); + let tag = parts.next().unwrap_or("*").to_string(); + let cmd = parts.next().unwrap_or("").to_ascii_uppercase(); + let rest = parts.next().unwrap_or("").trim().to_string(); + + let logout = cmd == "LOGOUT"; + let response = + dispatch(&tag, &cmd, &rest, &state, &mut authenticated).await; + + for log_line in response.lines() { + eprintln!("IMAP > {log_line}"); + } + writer.write_all(response.as_bytes()).await?; + if logout { + break; + } + } + Ok(()) +} + +// ── Command dispatcher ──────────────────────────────────────────────────────── + +async fn dispatch( + tag: &str, + cmd: &str, + rest: &str, + state: &SharedState, + authenticated: &mut bool, +) -> String { + match cmd { + "CAPABILITY" => { + format!( + "* CAPABILITY IMAP4rev1 AUTH=PLAIN\r\n\ + {tag} OK CAPABILITY completed\r\n" + ) + } + "LOGIN" => { + let mut p = rest.splitn(2, ' '); + let _user = unquote(p.next().unwrap_or("")); + let pass = unquote(p.next().unwrap_or("")); + let local_pw = state.lock().await.config.bridge.local_password.clone(); + if pass == local_pw { + *authenticated = true; + format!("{tag} OK LOGIN completed\r\n") + } else { + format!("{tag} NO [AUTHENTICATIONFAILED] Invalid credentials\r\n") + } + } + "NOOP" => format!("{tag} OK NOOP completed\r\n"), + "LOGOUT" => format!("* BYE Logging out\r\n{tag} OK LOGOUT completed\r\n"), + + // All remaining commands require authentication. + _ if !*authenticated => { + format!("{tag} NO Not authenticated\r\n") + } + + "SELECT" => cmd_select(tag, state).await, + "EXAMINE" => cmd_select(tag, state).await, // read-only alias + "FETCH" => cmd_fetch(tag, rest, state).await, + "SEARCH" => cmd_search(tag, rest, state).await, + "STORE" => cmd_store(tag, rest, state).await, + "EXPUNGE" => cmd_expunge(tag, state).await, + + _ => format!("{tag} BAD Unknown or unimplemented command\r\n"), + } +} + +// ── SELECT ──────────────────────────────────────────────────────────────────── + +async fn cmd_select(tag: &str, state: &SharedState) -> String { + let (http_client, session) = { + let st = state.lock().await; + (st.http_client.clone(), st.session.clone()) + }; + let api = ApiClient::new(&http_client, &session); + let messages = match api.list_messages(LABEL_INBOX, 0, 50).await { + Ok((msgs, _)) => msgs, + Err(e) => { + eprintln!("SELECT list_messages failed: {e}"); + return format!("{tag} NO SELECT failed\r\n"); + } + }; + let count = { + let mut st = state.lock().await; + st.store.load_all(messages); + st.store.count() + }; + format!( + "* {count} EXISTS\r\n\ + * 0 RECENT\r\n\ + * FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\ + * OK [PERMANENTFLAGS (\\Deleted \\Seen \\*)] Limited\r\n\ + * OK [UIDVALIDITY 1] UIDs valid\r\n\ + * OK [UIDNEXT {next}] Predicted next UID\r\n\ + {tag} OK [READ-WRITE] SELECT completed\r\n", + next = count + 1, + ) +} + +// ── FETCH ───────────────────────────────────────────────────────────────────── + +async fn cmd_fetch(tag: &str, rest: &str, state: &SharedState) -> String { + // rest = "seq_set ITEMS" or "seq_set (ITEMS)" + let mut parts = rest.splitn(2, ' '); + let seq_set = parts.next().unwrap_or("").to_string(); + let items = parts + .next() + .unwrap_or("") + .trim() + .trim_matches(|c| c == '(' || c == ')') + .to_ascii_uppercase(); + + if items.contains("HEADER.FIELDS") { + fetch_headers(tag, &seq_set, state).await + } else { + fetch_body(tag, &seq_set, state).await + } +} + +async fn fetch_headers(tag: &str, seq_set: &str, state: &SharedState) -> String { + // Collect all metadata under a single short lock — no I/O while locked. + let rows: Vec<(u32, String, String, String, i64)> = { + let st = state.lock().await; + let count = st.store.count(); + parse_seq_set(seq_set, count) + .into_iter() + .filter_map(|seq| { + st.store.get_meta(seq).map(|m| { + ( + seq, + m.subject.clone(), + m.sender.name.clone(), + m.sender.address.clone(), + m.time, + ) + }) + }) + .collect() + }; + + let mut out = Vec::new(); + for (seq, subject, name, addr, time) in &rows { + let block = format!( + "Subject: {subject}\r\nFrom: {name} <{addr}>\r\nDate: {date}\r\n\r\n", + date = rfc2822(*time), + ); + let n = block.as_bytes().len(); + out.extend_from_slice( + format!( + "* {seq} FETCH (BODY[HEADER.FIELDS (SUBJECT FROM DATE)] {{{n}}}\r\n" + ) + .as_bytes(), + ); + out.extend_from_slice(block.as_bytes()); + out.extend_from_slice(b")\r\n"); + } + out.extend_from_slice(format!("{tag} OK FETCH completed\r\n").as_bytes()); + String::from_utf8_lossy(&out).into_owned() +} + +async fn fetch_body(tag: &str, seq_set: &str, state: &SharedState) -> String { + let count = state.lock().await.store.count(); + let seqs = parse_seq_set(seq_set, count); + let seq = match seqs.first() { + Some(&s) => s, + None => return format!("{tag} NO invalid sequence\r\n"), + }; + + // Grab proton_id + connection details under a brief lock. + let (proton_id, meta_snapshot, http_client, session) = { + let st = state.lock().await; + let m = match st.store.get_meta(seq) { + Some(m) => ( + m.subject.clone(), + m.sender.name.clone(), + m.sender.address.clone(), + m.time, + ), + None => return format!("{tag} NO no such message\r\n"), + }; + let id = st.store.proton_id(seq).unwrap_or("").to_string(); + (id, m, st.http_client.clone(), st.session.clone()) + }; + + // API call — lock is released. + let api = ApiClient::new(&http_client, &session); + let full_msg = match api.get_message(&proton_id).await { + Ok(m) => m, + Err(e) => { + eprintln!("FETCH get_message failed: {e}"); + return format!("{tag} NO fetch failed\r\n"); + } + }; + + // Decrypt — re-acquire lock only for the synchronous crypto step. + let plaintext = { + let st = state.lock().await; + let keys: Vec<&crypto::PrivateKey> = st.key_pool.iter().collect(); + match crypto::decrypt_body(&full_msg.body, &keys) { + Ok(p) => p, + Err(e) => { + eprintln!("FETCH decrypt_body failed: {e}"); + return format!("{tag} NO decrypt failed\r\n"); + } + } + }; + + let (subject, sender_name, sender_addr, time) = meta_snapshot; + let message = format!( + "From: {sender_name} <{sender_addr}>\r\n\ + Subject: {subject}\r\n\ + Date: {date}\r\n\ + Content-Type: {mime}; charset=utf-8\r\n\ + MIME-Version: 1.0\r\n\ + \r\n\ + {plaintext}", + date = rfc2822(time), + mime = full_msg.mime_type, + ); + let n = message.as_bytes().len(); + let mut out = Vec::new(); + out.extend_from_slice( + format!("* {seq} FETCH (BODY[] {{{n}}}\r\n").as_bytes(), + ); + out.extend_from_slice(message.as_bytes()); + out.extend_from_slice(b")\r\n"); + out.extend_from_slice(format!("{tag} OK FETCH completed\r\n").as_bytes()); + String::from_utf8_lossy(&out).into_owned() +} + +// ── SEARCH ──────────────────────────────────────────────────────────────────── + +async fn cmd_search(tag: &str, criteria: &str, state: &SharedState) -> String { + // skim sends: OR SUBJECT "query" FROM "query" + let query = extract_quoted(criteria).unwrap_or(""); + let seqs = state + .lock() + .await + .store + .search_subject_or_from(query); + let nums: Vec = seqs.iter().map(|n| n.to_string()).collect(); + format!( + "* SEARCH {}\r\n{tag} OK SEARCH completed\r\n", + nums.join(" ") + ) +} + +// ── STORE ───────────────────────────────────────────────────────────────────── + +async fn cmd_store(tag: &str, rest: &str, state: &SharedState) -> String { + // rest = "seq_set +FLAGS (\Deleted)" (or similar) + let mut parts = rest.splitn(2, ' '); + let seq_set = parts.next().unwrap_or("").to_string(); + let flags = parts.next().unwrap_or("").to_ascii_uppercase(); + + if !flags.contains("DELETED") { + return format!("{tag} OK STORE completed\r\n"); + } + + let mut st = state.lock().await; + let count = st.store.count(); + let seqs = parse_seq_set(&seq_set, count); + let mut out = String::new(); + for seq in seqs { + st.store.mark_deleted(seq); + out.push_str(&format!("* {seq} FETCH (FLAGS (\\Deleted))\r\n")); + } + out.push_str(&format!("{tag} OK STORE completed\r\n")); + out +} + +// ── EXPUNGE ─────────────────────────────────────────────────────────────────── + +async fn cmd_expunge(tag: &str, state: &SharedState) -> String { + let (proton_ids, removed_seqs, http_client, session) = { + let mut st = state.lock().await; + let (ids, seqs) = st.store.expunge(); + (ids, seqs, st.http_client.clone(), st.session.clone()) + }; + + if !proton_ids.is_empty() { + let api = ApiClient::new(&http_client, &session); + if let Err(e) = api.delete_messages(&proton_ids).await { + eprintln!("EXPUNGE delete_messages failed: {e}"); + } + } + + let mut out = String::new(); + for seq in &removed_seqs { + out.push_str(&format!("* {seq} EXPUNGE\r\n")); + } + out.push_str(&format!("{tag} OK EXPUNGE completed\r\n")); + out +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +fn unquote(s: &str) -> &str { + s.strip_prefix('"') + .and_then(|s| s.strip_suffix('"')) + .unwrap_or(s) +} + +/// Extract the first double-quoted string from `s`. +fn extract_quoted(s: &str) -> Option<&str> { + let start = s.find('"')? + 1; + let end = s[start..].find('"')? + start; + Some(&s[start..end]) +} + +/// Parse an IMAP sequence set (e.g. "1:50", "3,5,7", "42", "*") into a Vec of +/// seq numbers, clamped to [1, max]. +fn parse_seq_set(set: &str, max: u32) -> Vec { + if max == 0 { + return vec![]; + } + let mut result = Vec::new(); + for part in set.split(',') { + let part = part.trim(); + if let Some((a, b)) = part.split_once(':') { + let start: u32 = if a == "*" { max } else { a.parse().unwrap_or(1) }; + let end: u32 = if b == "*" { max } else { b.parse().unwrap_or(max) }; + let (lo, hi) = (start.min(end), start.max(end)); + for seq in lo..=hi.min(max) { + result.push(seq); + } + } else if part == "*" { + result.push(max); + } else { + let seq: u32 = part.parse().unwrap_or(0); + if seq >= 1 && seq <= max { + result.push(seq); + } + } + } + result +} + +/// Format a Unix timestamp as an RFC 2822 date string. +fn rfc2822(unix: i64) -> String { + use chrono::{DateTime, Utc}; + DateTime::::from_timestamp(unix, 0) + .unwrap_or_default() + .format("%a, %d %b %Y %H:%M:%S +0000") + .to_string() +} diff --git a/proton-bridge/src/main.rs b/proton-bridge/src/main.rs index 5e864a6..0d24987 100644 --- a/proton-bridge/src/main.rs +++ b/proton-bridge/src/main.rs @@ -2,6 +2,7 @@ mod api; mod auth; mod config; mod crypto; +mod imap_server; mod srp; mod store; @@ -80,7 +81,7 @@ async fn main() { store.load_all(messages); println!("Inbox: {} messages ({total} total)", store.count()); - let _state: SharedState = Arc::new(Mutex::new(BridgeState { + let state: SharedState = Arc::new(Mutex::new(BridgeState { store, key_pool, http_client: client, @@ -88,6 +89,15 @@ async fn main() { config: config.clone(), })); + // Spawn IMAP server. + let imap_state = Arc::clone(&state); + let imap_port = config.bridge.imap_port; + tokio::spawn(async move { + if let Err(e) = imap_server::run(imap_state, imap_port).await { + eprintln!("IMAP server error: {e}"); + } + }); + println!( "Bridge ready — IMAP :{}, SMTP :{} (Ctrl-C to stop)", config.bridge.imap_port, config.bridge.smtp_port diff --git a/proton-bridge/src/store.rs b/proton-bridge/src/store.rs index 1741918..24b4206 100644 --- a/proton-bridge/src/store.rs +++ b/proton-bridge/src/store.rs @@ -71,19 +71,24 @@ impl MessageStore { } } - /// Remove all pending-deleted messages and return their ProtonMail IDs. - /// After this call the Vec indices automatically provide new seq numbers. - pub fn expunge(&mut self) -> Vec { - let removed: Vec = self - .messages - .iter() - .filter(|m| self.deleted_pending.contains(&m.id)) - .map(|m| m.id.clone()) - .collect(); + /// Remove all pending-deleted messages. + /// Returns `(proton_ids, seqs)` where `seqs` are the pre-expunge IMAP + /// sequence numbers in **descending** order (correct for IMAP EXPUNGE + /// responses — highest seq first so client renumbering stays consistent). + pub fn expunge(&mut self) -> (Vec, Vec) { + let mut removed_ids = Vec::new(); + let mut removed_seqs = Vec::new(); + for (i, m) in self.messages.iter().enumerate() { + if self.deleted_pending.contains(&m.id) { + removed_ids.push(m.id.clone()); + removed_seqs.push(i as u32 + 1); + } + } self.messages .retain(|m| !self.deleted_pending.contains(&m.id)); self.deleted_pending.clear(); - removed + removed_seqs.sort_unstable_by(|a, b| b.cmp(a)); // descending + (removed_ids, removed_seqs) } /// Return the seq numbers of messages whose subject, sender name, or