yeah I think this is better

This commit is contained in:
Shautvast 2024-07-01 11:29:11 +02:00
parent 56555e765c
commit de7381ac31
3 changed files with 67 additions and 95 deletions

View file

@ -1,20 +1,19 @@
package com.github.shautvast.exceptional;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.foreign.*;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Enables multithreaded writing, while keeping CircularByteBuffer simpler (only suitable for single-threaded writing)
*/
public class MPSCBufferWriter implements AutoCloseable {
private static Linker linker;
private static SymbolLookup rustlib;
private static final ConcurrentLinkedDeque<byte[]> writeQueue = new ConcurrentLinkedDeque<>(); // unbounded
private static Linker linker;
private static SymbolLookup rustlib;
private static final LinkedBlockingDeque<byte[]> writeQueue = new LinkedBlockingDeque<>(); // unbounded
private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicBoolean active = new AtomicBoolean(false);
@ -26,44 +25,40 @@ public class MPSCBufferWriter implements AutoCloseable {
active.set(true);
executorService.submit(() -> {
// maybe test again with this part of the code somewhere else
// maybe test again with this part of the code somewhere else. Did have issues when setting this up in the main thread, but need to investigate.
// setup of native memory ringbuffer
var arena = Arena.ofConfined();
var ringbufferMemory = arena.allocate(32768);
var ringbufferMemory = arena.allocate(0xffff);
var buffer = new CircularByteBuffer(ringbufferMemory);
arena = Arena.ofConfined();
linker = Linker.nativeLinker();
//TODO relative path, or configurable
rustlib = SymbolLookup.libraryLookup("/Users/Shautvast/dev/exceptional/rustlib/target/debug/librustlib.dylib", arena);
MemorySegment create = rustlib.find("create_ring_buffer").orElseThrow();
var createHandle = linker.downcallHandle(create, FunctionDescriptor.ofVoid(
MemorySegment create = rustlib.find("buffer_updated").orElseThrow();
var updateHandle = linker.downcallHandle(create, FunctionDescriptor.ofVoid(
ValueLayout.ADDRESS
));
try {
createHandle.invoke(ringbufferMemory);
} catch (Throwable e) {
throw new RuntimeException(e);
}
// start polling from the queue and offer elements to the ringbuffer
byte[] element;
while (active.get()) {
var element = writeQueue.pollFirst();
if (element != null) {
while (!buffer.put(element) && active.get()) {
try {
Thread.sleep(1); // TODO remove the sleep
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
element = writeQueue.takeFirst(); // blocking read ie. efficient wait loop
while (!buffer.put(element) && active.get())
; // busy loop supposed to be just 1 iteration, also depends on load and buffer size (TBD)
// once the buffer is updated we can signal an update to the rust lib, so it will read the next element
updateHandle.invoke(ringbufferMemory); // the update call is not supposed to cause a lot of overhead (TBD)
// and this setup prevents thread sync issues and unnecessary waits
//
// the memory is allocated only once, we just pass the pointer every time
// that is the simplest way on the rust side
} catch (Throwable e) {
throw new RuntimeException(e);
}
}
});

View file

@ -6,7 +6,7 @@ import java.util.concurrent.TimeUnit;
class ExceptionLoggerTest {
// @Test
@Test
void test() throws InterruptedException {
ExceptionLogger.log(new Throwable());
TimeUnit.SECONDS.sleep(30);

View file

@ -1,82 +1,59 @@
use std::ffi::c_char;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
mod throwable;
const CAPACITY: isize = 32760;
const READ: isize = 32760;
const WRITE: isize = 32764;
#[no_mangle]
pub extern "C" fn create_ring_buffer(uc: *mut c_char) {
let p = uc as usize; //cast to usize makes it Send, so we can pass it to the thread
thread::spawn(move || {
pub extern "C" fn buffer_updated(buffer: *mut c_char) {
let mut read_pos = get_u32(buffer, READ) as isize;
let raw_string = p as *mut c_char; // cast back to *mut c_char
let mut remaining = CAPACITY - read_pos;
let len = if remaining == 1 {
let byte_high = get_u8(buffer, read_pos);
read_pos = 0;
let byte_low = get_u8(buffer, read_pos);
read_pos += 1;
let l = (byte_high as u16) << 8 | byte_low as u16;
remaining = l as isize;
l
} else if remaining == 2 {
let l = get_u16(buffer, read_pos);
read_pos = 0;
remaining = 0;
l
} else {
let l = get_u16(buffer, read_pos);
read_pos += 2;
remaining -= 2;
l
} as isize;
let mut read_pos = get_u32(raw_string, READ) as isize;
let mut write_pos = get_u32(raw_string, WRITE) as isize;
loop {
// TODO something with tight loops
while read_pos == write_pos {
sleep(Duration::from_millis(1)); // hard to do this otherwise (better), because the other side is not rust, right??
read_pos = get_u32(raw_string, READ) as isize;
write_pos = get_u32(raw_string, WRITE) as isize;
}
let mut remaining = CAPACITY - read_pos;
let len = if remaining == 1 {
let byte_high = get_u8(raw_string, read_pos);
read_pos = 0;
let byte_low = get_u8(raw_string, read_pos);
read_pos += 1;
let l = (byte_high as u16) << 8 | byte_low as u16;
remaining = l as isize;
l
} else if remaining == 2 {
let l = get_u16(raw_string, read_pos);
read_pos = 0;
remaining = 0;
l
} else {
let l = get_u16(raw_string, read_pos);
read_pos += 2;
remaining -= 2;
l
} as isize;
let mut result = Vec::with_capacity(len as usize);
if len <= remaining {
// this.data.get(readIndex, result);
for i in 0..len {
unsafe { result.push(*raw_string.offset(read_pos + i) as u8); }
}
read_pos += len;
} else {
for i in 0..remaining {
unsafe { result.push(*raw_string.offset(read_pos + i) as u8); }
}
read_pos = 0;
for i in 0..len - remaining {
unsafe { result.push(*raw_string.offset(i) as u8); }
}
read_pos += len - remaining;
}
put_u32(raw_string, READ, read_pos as u32);
let string = String::from_utf8(result);
if let Ok(json) = string {
println!("receiving {}", json);
// let error: throwable::Throwable = serde_json::from_str(json).unwrap();
// println!("{:?}", error);
} else {
println!("not ok");
}
let mut result = Vec::with_capacity(len as usize);
if len <= remaining {
for i in 0..len {
unsafe { result.push(*buffer.offset(read_pos + i) as u8); }
}
});
read_pos += len;
} else {
for i in 0..remaining {
unsafe { result.push(*buffer.offset(read_pos + i) as u8); }
}
read_pos = 0;
for i in 0..len - remaining {
unsafe { result.push(*buffer.offset(i) as u8); }
}
read_pos += len - remaining;
}
put_u32(buffer, READ, read_pos as u32);
let string = String::from_utf8(result);
if let Ok(json) = string {
println!("receiving {}", json);
} else {
println!("not ok");
}
}
fn get_u8(s: *const c_char, pos: isize) -> u8 {