From de7381ac3123491a293edf539979b790d9d7e8f6 Mon Sep 17 00:00:00 2001 From: Shautvast Date: Mon, 1 Jul 2024 11:29:11 +0200 Subject: [PATCH] yeah I think this is better --- .../exceptional/MPSCBufferWriter.java | 47 ++++---- .../exceptional/ExceptionLoggerTest.java | 2 +- rustlib/src/lib.rs | 113 +++++++----------- 3 files changed, 67 insertions(+), 95 deletions(-) diff --git a/lib/src/main/java/com/github/shautvast/exceptional/MPSCBufferWriter.java b/lib/src/main/java/com/github/shautvast/exceptional/MPSCBufferWriter.java index d563f07..62bf167 100644 --- a/lib/src/main/java/com/github/shautvast/exceptional/MPSCBufferWriter.java +++ b/lib/src/main/java/com/github/shautvast/exceptional/MPSCBufferWriter.java @@ -1,20 +1,19 @@ package com.github.shautvast.exceptional; -import com.fasterxml.jackson.databind.ObjectMapper; - import java.lang.foreign.*; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; /** * Enables multithreaded writing, while keeping CircularByteBuffer simpler (only suitable for single-threaded writing) */ public class MPSCBufferWriter implements AutoCloseable { - private static Linker linker; - private static SymbolLookup rustlib; - private static final ConcurrentLinkedDeque writeQueue = new ConcurrentLinkedDeque<>(); // unbounded + private static Linker linker; + private static SymbolLookup rustlib; + private static final LinkedBlockingDeque writeQueue = new LinkedBlockingDeque<>(); // unbounded private static final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final AtomicBoolean active = new AtomicBoolean(false); @@ -26,44 +25,40 @@ public class MPSCBufferWriter implements AutoCloseable { active.set(true); executorService.submit(() -> { - // maybe test again with this part of the code somewhere else + // maybe test again with this part of the code somewhere else. Did have issues when setting this up in the main thread, but need to investigate. // setup of native memory ringbuffer var arena = Arena.ofConfined(); - var ringbufferMemory = arena.allocate(32768); + var ringbufferMemory = arena.allocate(0xffff); var buffer = new CircularByteBuffer(ringbufferMemory); arena = Arena.ofConfined(); linker = Linker.nativeLinker(); //TODO relative path, or configurable rustlib = SymbolLookup.libraryLookup("/Users/Shautvast/dev/exceptional/rustlib/target/debug/librustlib.dylib", arena); - MemorySegment create = rustlib.find("create_ring_buffer").orElseThrow(); - var createHandle = linker.downcallHandle(create, FunctionDescriptor.ofVoid( + MemorySegment create = rustlib.find("buffer_updated").orElseThrow(); + var updateHandle = linker.downcallHandle(create, FunctionDescriptor.ofVoid( ValueLayout.ADDRESS )); - try { - createHandle.invoke(ringbufferMemory); - } catch (Throwable e) { - throw new RuntimeException(e); - } // start polling from the queue and offer elements to the ringbuffer + byte[] element; while (active.get()) { - var element = writeQueue.pollFirst(); - if (element != null) { - while (!buffer.put(element) && active.get()) { - try { - Thread.sleep(1); // TODO remove the sleep - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - } - } try { - Thread.sleep(1); - } catch (InterruptedException e) { + element = writeQueue.takeFirst(); // blocking read ie. efficient wait loop + while (!buffer.put(element) && active.get()) + ; // busy loop supposed to be just 1 iteration, also depends on load and buffer size (TBD) + + // once the buffer is updated we can signal an update to the rust lib, so it will read the next element + updateHandle.invoke(ringbufferMemory); // the update call is not supposed to cause a lot of overhead (TBD) + // and this setup prevents thread sync issues and unnecessary waits + // + // the memory is allocated only once, we just pass the pointer every time + // that is the simplest way on the rust side + } catch (Throwable e) { throw new RuntimeException(e); } + } }); diff --git a/lib/src/test/java/com/github/shautvast/exceptional/ExceptionLoggerTest.java b/lib/src/test/java/com/github/shautvast/exceptional/ExceptionLoggerTest.java index b995400..b5f4e2b 100644 --- a/lib/src/test/java/com/github/shautvast/exceptional/ExceptionLoggerTest.java +++ b/lib/src/test/java/com/github/shautvast/exceptional/ExceptionLoggerTest.java @@ -6,7 +6,7 @@ import java.util.concurrent.TimeUnit; class ExceptionLoggerTest { -// @Test + @Test void test() throws InterruptedException { ExceptionLogger.log(new Throwable()); TimeUnit.SECONDS.sleep(30); diff --git a/rustlib/src/lib.rs b/rustlib/src/lib.rs index 1c92bac..2dced31 100644 --- a/rustlib/src/lib.rs +++ b/rustlib/src/lib.rs @@ -1,82 +1,59 @@ use std::ffi::c_char; -use std::thread; -use std::thread::sleep; -use std::time::Duration; mod throwable; const CAPACITY: isize = 32760; const READ: isize = 32760; -const WRITE: isize = 32764; #[no_mangle] -pub extern "C" fn create_ring_buffer(uc: *mut c_char) { - let p = uc as usize; //cast to usize makes it Send, so we can pass it to the thread - thread::spawn(move || { +pub extern "C" fn buffer_updated(buffer: *mut c_char) { + let mut read_pos = get_u32(buffer, READ) as isize; - let raw_string = p as *mut c_char; // cast back to *mut c_char + let mut remaining = CAPACITY - read_pos; + let len = if remaining == 1 { + let byte_high = get_u8(buffer, read_pos); + read_pos = 0; + let byte_low = get_u8(buffer, read_pos); + read_pos += 1; + let l = (byte_high as u16) << 8 | byte_low as u16; + remaining = l as isize; + l + } else if remaining == 2 { + let l = get_u16(buffer, read_pos); + read_pos = 0; + remaining = 0; + l + } else { + let l = get_u16(buffer, read_pos); + read_pos += 2; + remaining -= 2; + l + } as isize; - let mut read_pos = get_u32(raw_string, READ) as isize; - let mut write_pos = get_u32(raw_string, WRITE) as isize; - loop { - - // TODO something with tight loops - while read_pos == write_pos { - sleep(Duration::from_millis(1)); // hard to do this otherwise (better), because the other side is not rust, right?? - read_pos = get_u32(raw_string, READ) as isize; - write_pos = get_u32(raw_string, WRITE) as isize; - } - - let mut remaining = CAPACITY - read_pos; - let len = if remaining == 1 { - let byte_high = get_u8(raw_string, read_pos); - read_pos = 0; - let byte_low = get_u8(raw_string, read_pos); - read_pos += 1; - let l = (byte_high as u16) << 8 | byte_low as u16; - remaining = l as isize; - l - } else if remaining == 2 { - let l = get_u16(raw_string, read_pos); - read_pos = 0; - remaining = 0; - l - } else { - let l = get_u16(raw_string, read_pos); - read_pos += 2; - remaining -= 2; - l - } as isize; - - let mut result = Vec::with_capacity(len as usize); - if len <= remaining { - // this.data.get(readIndex, result); - for i in 0..len { - unsafe { result.push(*raw_string.offset(read_pos + i) as u8); } - } - read_pos += len; - } else { - for i in 0..remaining { - unsafe { result.push(*raw_string.offset(read_pos + i) as u8); } - } - read_pos = 0; - for i in 0..len - remaining { - unsafe { result.push(*raw_string.offset(i) as u8); } - } - read_pos += len - remaining; - } - put_u32(raw_string, READ, read_pos as u32); - - let string = String::from_utf8(result); - if let Ok(json) = string { - println!("receiving {}", json); - // let error: throwable::Throwable = serde_json::from_str(json).unwrap(); - // println!("{:?}", error); - } else { - println!("not ok"); - } + let mut result = Vec::with_capacity(len as usize); + if len <= remaining { + for i in 0..len { + unsafe { result.push(*buffer.offset(read_pos + i) as u8); } } - }); + read_pos += len; + } else { + for i in 0..remaining { + unsafe { result.push(*buffer.offset(read_pos + i) as u8); } + } + read_pos = 0; + for i in 0..len - remaining { + unsafe { result.push(*buffer.offset(i) as u8); } + } + read_pos += len - remaining; + } + put_u32(buffer, READ, read_pos as u32); + + let string = String::from_utf8(result); + if let Ok(json) = string { + println!("receiving {}", json); + } else { + println!("not ok"); + } } fn get_u8(s: *const c_char, pos: isize) -> u8 {