/// Step 6: Local IMAP server. /// /// Implements the nine commands skim uses (LOGIN, NOOP, SELECT, FETCH ×2, /// SEARCH, STORE, EXPUNGE, LOGOUT) on a plain-TCP listener on localhost. /// Each accepted connection is handled in its own tokio task. use std::sync::Arc; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::{TcpListener, TcpStream}; use crate::api::{ApiClient, LABEL_INBOX}; use crate::{crypto, SharedState}; // ── Public entry point ──────────────────────────────────────────────────────── pub async fn run(state: SharedState, port: u16) -> Result<(), Box> { let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?; println!("IMAP listening on port {port}"); run_with_listener(state, listener).await } /// Accept connections on an already-bound listener. Used by the embedded bridge /// so ports are guaranteed to be occupied before `start()` returns. pub async fn run_with_listener( state: SharedState, listener: TcpListener, ) -> Result<(), Box> { loop { let (socket, _addr) = listener.accept().await?; let state = Arc::clone(&state); tokio::spawn(async move { if let Err(e) = handle_connection(socket, state).await { eprintln!("IMAP connection error: {e}"); } }); } } // ── Per-connection handler ──────────────────────────────────────────────────── async fn handle_connection( socket: TcpStream, state: SharedState, ) -> Result<(), Box> { let (reader, mut writer) = socket.into_split(); let mut lines = BufReader::new(reader).lines(); let mut authenticated = false; writer .write_all(b"* OK IMAP4rev1 ProtonBridge ready\r\n") .await?; while let Some(raw) = lines.next_line().await? { let line = raw.trim_end(); if line.is_empty() { continue; } // Every IMAP command is "TAG COMMAND [rest…]" let mut parts = line.splitn(3, ' '); let tag = parts.next().unwrap_or("*").to_string(); let cmd = parts.next().unwrap_or("").to_ascii_uppercase(); let rest = parts.next().unwrap_or("").trim().to_string(); let logout = cmd == "LOGOUT"; let response = dispatch(&tag, &cmd, &rest, &state, &mut authenticated).await; // for log_line in response.lines() { // eprintln!("IMAP > {log_line}"); // } writer.write_all(response.as_bytes()).await?; if logout { break; } } Ok(()) } // ── Command dispatcher ──────────────────────────────────────────────────────── async fn dispatch( tag: &str, cmd: &str, rest: &str, state: &SharedState, authenticated: &mut bool, ) -> String { match cmd { "CAPABILITY" => { format!( "* CAPABILITY IMAP4rev1 AUTH=PLAIN\r\n\ {tag} OK CAPABILITY completed\r\n" ) } "LOGIN" => { let mut p = rest.splitn(2, ' '); let _user = unquote(p.next().unwrap_or("")); let pass = unquote(p.next().unwrap_or("")); let local_pw = state.lock().await.config.bridge.local_password.clone(); if pass == local_pw { *authenticated = true; format!("{tag} OK LOGIN completed\r\n") } else { format!("{tag} NO [AUTHENTICATIONFAILED] Invalid credentials\r\n") } } "NOOP" => format!("{tag} OK NOOP completed\r\n"), "LOGOUT" => format!("* BYE Logging out\r\n{tag} OK LOGOUT completed\r\n"), // All remaining commands require authentication. _ if !*authenticated => { format!("{tag} NO Not authenticated\r\n") } "SELECT" => cmd_select(tag, state).await, "EXAMINE" => cmd_select(tag, state).await, // read-only alias "FETCH" => cmd_fetch(tag, rest, state).await, "SEARCH" => cmd_search(tag, rest, state).await, "STORE" => cmd_store(tag, rest, state).await, "EXPUNGE" => cmd_expunge(tag, state).await, _ => format!("{tag} BAD Unknown or unimplemented command\r\n"), } } // ── SELECT ──────────────────────────────────────────────────────────────────── async fn cmd_select(tag: &str, state: &SharedState) -> String { let (http_client, session) = { let st = state.lock().await; (st.http_client.clone(), st.session.clone()) }; let api = ApiClient::new(&http_client, &session); let messages = match api.list_messages(LABEL_INBOX, 0, 50).await { Ok((msgs, _)) => msgs, Err(e) => { eprintln!("SELECT list_messages failed: {e}"); return format!("{tag} NO SELECT failed\r\n"); } }; let count = { let mut st = state.lock().await; st.store.load_all(messages); st.store.count() }; format!( "* {count} EXISTS\r\n\ * 0 RECENT\r\n\ * FLAGS (\\Answered \\Flagged \\Deleted \\Seen \\Draft)\r\n\ * OK [PERMANENTFLAGS (\\Deleted \\Seen \\*)] Limited\r\n\ * OK [UIDVALIDITY 1] UIDs valid\r\n\ * OK [UIDNEXT {next}] Predicted next UID\r\n\ {tag} OK [READ-WRITE] SELECT completed\r\n", next = count + 1, ) } // ── FETCH ───────────────────────────────────────────────────────────────────── async fn cmd_fetch(tag: &str, rest: &str, state: &SharedState) -> String { // rest = "seq_set ITEMS" or "seq_set (ITEMS)" let mut parts = rest.splitn(2, ' '); let seq_set = parts.next().unwrap_or("").to_string(); let items = parts .next() .unwrap_or("") .trim() .trim_matches(|c| c == '(' || c == ')') .to_ascii_uppercase(); if items.contains("HEADER.FIELDS") { fetch_headers(tag, &seq_set, state).await } else { fetch_body(tag, &seq_set, state).await } } async fn fetch_headers(tag: &str, seq_set: &str, state: &SharedState) -> String { // Collect all metadata under a single short lock — no I/O while locked. let rows: Vec<(u32, String, String, String, i64)> = { let st = state.lock().await; let count = st.store.count(); parse_seq_set(seq_set, count) .into_iter() .filter_map(|seq| { st.store.get_meta(seq).map(|m| { ( seq, m.subject.clone(), m.sender.name.clone(), m.sender.address.clone(), m.time, ) }) }) .collect() }; let mut out = Vec::new(); for (seq, subject, name, addr, time) in &rows { let block = format!( "Subject: {subject}\r\nFrom: {name} <{addr}>\r\nDate: {date}\r\n\r\n", date = rfc2822(*time), ); let n = block.as_bytes().len(); out.extend_from_slice( format!( "* {seq} FETCH (BODY[HEADER.FIELDS (SUBJECT FROM DATE)] {{{n}}}\r\n" ) .as_bytes(), ); out.extend_from_slice(block.as_bytes()); out.extend_from_slice(b")\r\n"); } out.extend_from_slice(format!("{tag} OK FETCH completed\r\n").as_bytes()); String::from_utf8_lossy(&out).into_owned() } async fn fetch_body(tag: &str, seq_set: &str, state: &SharedState) -> String { let count = state.lock().await.store.count(); let seqs = parse_seq_set(seq_set, count); let seq = match seqs.first() { Some(&s) => s, None => return format!("{tag} NO invalid sequence\r\n"), }; // Grab proton_id + connection details under a brief lock. let (proton_id, meta_snapshot, http_client, session) = { let st = state.lock().await; let m = match st.store.get_meta(seq) { Some(m) => ( m.subject.clone(), m.sender.name.clone(), m.sender.address.clone(), m.time, ), None => return format!("{tag} NO no such message\r\n"), }; let id = st.store.proton_id(seq).unwrap_or("").to_string(); (id, m, st.http_client.clone(), st.session.clone()) }; // API call — lock is released. let api = ApiClient::new(&http_client, &session); let full_msg = match api.get_message(&proton_id).await { Ok(m) => m, Err(e) => { eprintln!("FETCH get_message failed: {e}"); return format!("{tag} NO fetch failed\r\n"); } }; // Decrypt — re-acquire lock only for the synchronous crypto step. let plaintext = { let st = state.lock().await; let keys: Vec<&crypto::PrivateKey> = st.key_pool.iter().collect(); match crypto::decrypt_body(&full_msg.body, &keys) { Ok(p) => p, Err(e) => { eprintln!("FETCH decrypt_body failed: {e}"); return format!("{tag} NO decrypt failed\r\n"); } } }; let (subject, sender_name, sender_addr, time) = meta_snapshot; let message = format!( "From: {sender_name} <{sender_addr}>\r\n\ Subject: {subject}\r\n\ Date: {date}\r\n\ Content-Type: {mime}; charset=utf-8\r\n\ MIME-Version: 1.0\r\n\ \r\n\ {plaintext}", date = rfc2822(time), mime = full_msg.mime_type, ); let n = message.as_bytes().len(); let mut out = Vec::new(); out.extend_from_slice( format!("* {seq} FETCH (BODY[] {{{n}}}\r\n").as_bytes(), ); out.extend_from_slice(message.as_bytes()); out.extend_from_slice(b")\r\n"); out.extend_from_slice(format!("{tag} OK FETCH completed\r\n").as_bytes()); String::from_utf8_lossy(&out).into_owned() } // ── SEARCH ──────────────────────────────────────────────────────────────────── async fn cmd_search(tag: &str, criteria: &str, state: &SharedState) -> String { // skim sends: OR SUBJECT "query" FROM "query" let query = extract_quoted(criteria).unwrap_or(""); let seqs = state .lock() .await .store .search_subject_or_from(query); let nums: Vec = seqs.iter().map(|n| n.to_string()).collect(); format!( "* SEARCH {}\r\n{tag} OK SEARCH completed\r\n", nums.join(" ") ) } // ── STORE ───────────────────────────────────────────────────────────────────── async fn cmd_store(tag: &str, rest: &str, state: &SharedState) -> String { // rest = "seq_set +FLAGS (\Deleted)" (or similar) let mut parts = rest.splitn(2, ' '); let seq_set = parts.next().unwrap_or("").to_string(); let flags = parts.next().unwrap_or("").to_ascii_uppercase(); if !flags.contains("DELETED") { return format!("{tag} OK STORE completed\r\n"); } let mut st = state.lock().await; let count = st.store.count(); let seqs = parse_seq_set(&seq_set, count); let mut out = String::new(); for seq in seqs { st.store.mark_deleted(seq); out.push_str(&format!("* {seq} FETCH (FLAGS (\\Deleted))\r\n")); } out.push_str(&format!("{tag} OK STORE completed\r\n")); out } // ── EXPUNGE ─────────────────────────────────────────────────────────────────── async fn cmd_expunge(tag: &str, state: &SharedState) -> String { let (proton_ids, removed_seqs, http_client, session) = { let mut st = state.lock().await; let (ids, seqs) = st.store.expunge(); (ids, seqs, st.http_client.clone(), st.session.clone()) }; if !proton_ids.is_empty() { let api = ApiClient::new(&http_client, &session); if let Err(e) = api.delete_messages(&proton_ids).await { eprintln!("EXPUNGE delete_messages failed: {e}"); } } let mut out = String::new(); for seq in &removed_seqs { out.push_str(&format!("* {seq} EXPUNGE\r\n")); } out.push_str(&format!("{tag} OK EXPUNGE completed\r\n")); out } // ── Helpers ─────────────────────────────────────────────────────────────────── fn unquote(s: &str) -> &str { s.strip_prefix('"') .and_then(|s| s.strip_suffix('"')) .unwrap_or(s) } /// Extract the first double-quoted string from `s`. fn extract_quoted(s: &str) -> Option<&str> { let start = s.find('"')? + 1; let end = s[start..].find('"')? + start; Some(&s[start..end]) } /// Parse an IMAP sequence set (e.g. "1:50", "3,5,7", "42", "*") into a Vec of /// seq numbers, clamped to [1, max]. fn parse_seq_set(set: &str, max: u32) -> Vec { if max == 0 { return vec![]; } let mut result = Vec::new(); for part in set.split(',') { let part = part.trim(); if let Some((a, b)) = part.split_once(':') { let start: u32 = if a == "*" { max } else { a.parse().unwrap_or(1) }; let end: u32 = if b == "*" { max } else { b.parse().unwrap_or(max) }; let (lo, hi) = (start.min(end), start.max(end)); for seq in lo..=hi.min(max) { result.push(seq); } } else if part == "*" { result.push(max); } else { let seq: u32 = part.parse().unwrap_or(0); if seq >= 1 && seq <= max { result.push(seq); } } } result } /// Format a Unix timestamp as an RFC 2822 date string. fn rfc2822(unix: i64) -> String { use chrono::{DateTime, Utc}; DateTime::::from_timestamp(unix, 0) .unwrap_or_default() .format("%a, %d %b %Y %H:%M:%S +0000") .to_string() }