ok, it's working, but I think I can improve the design

This commit is contained in:
Shautvast 2024-07-01 11:04:49 +02:00
parent d1883f550f
commit 56555e765c
8 changed files with 232 additions and 94 deletions

View file

@ -24,8 +24,8 @@ import java.util.stream.IntStream;
@SuppressWarnings("StringTemplateMigration") @SuppressWarnings("StringTemplateMigration")
public class CircularByteBuffer { public class CircularByteBuffer {
private int READ_POS; private int readStartPos;
private int WRITE_POS; private int writeStartPos;
private int capacity; private int capacity;
final ByteBuffer data; final ByteBuffer data;
@ -54,11 +54,11 @@ public class CircularByteBuffer {
private void initIndices() { private void initIndices() {
this.capacity = this.data.capacity() - 8; this.capacity = this.data.capacity() - 8;
READ_POS = this.capacity; // write values after logical capacity position readStartPos = this.capacity; // write values after logical capacity position
WRITE_POS = this.capacity + 4; writeStartPos = this.capacity + 4;
this.data.putInt(READ_POS, 0); this.data.putInt(readStartPos, 0);
this.data.putInt(WRITE_POS, 0); this.data.putInt(writeStartPos, 0);
} }
@ -113,7 +113,8 @@ public class CircularByteBuffer {
return true; return true;
} }
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); e.printStackTrace();
return false;
} finally { } finally {
setWriteIndex(writeIndex); setWriteIndex(writeIndex);
} }
@ -121,10 +122,15 @@ public class CircularByteBuffer {
/** /**
* The reader side is provided, for reference and testability only. * The reader side is provided, for reference and testability only.
* In practice, the reader is implemented outside of java * In practice, the reader is implemented outside of java, see rustlib module
*/ */
public byte[] get() { public byte[] get() {
int readIndex = getReadIndex(); int readIndex = getReadIndex();
int writeIndex = getWriteIndex();
if (readIndex == writeIndex) {
return null;
}
try { try {
int remainingUntilEnd = capacity - readIndex; int remainingUntilEnd = capacity - readIndex;
int len; int len;
@ -161,31 +167,40 @@ public class CircularByteBuffer {
} }
int getWriteIndex() { int getWriteIndex() {
return this.data.getInt(WRITE_POS); return this.data.getInt(writeStartPos);
} }
void setWriteIndex(int writeIndex) { void setWriteIndex(int writeIndex) {
this.data.putInt(WRITE_POS, writeIndex); this.data.putInt(writeStartPos, writeIndex);
} }
int getReadIndex() { int getReadIndex() {
return this.data.getInt(READ_POS); return this.data.getInt(readStartPos);
} }
void setReadIndex(int readIndex) { void setReadIndex(int readIndex) {
this.data.putInt(READ_POS, readIndex); this.data.putInt(readStartPos, readIndex);
} }
@Override @Override
public String toString() { public String toString() {
return "CircularByteBuffer {r=" + this.data.getInt(READ_POS) + return "CircularByteBuffer {r=" + this.data.getInt(readStartPos) +
", w=" + ", w=" +
this.data.getInt(WRITE_POS) + this.data.getInt(writeStartPos) +
", data=" + ", data=" +
IntStream.range(0, capacity) bytesToString(this.data.array()) +
.map(x -> this.data.array()[x])
.mapToObj(Integer::toString)
.collect(Collectors.joining(",", "[", "]")) +
"}"; "}";
} }
public static String bytesToString(byte[] bytes) {
if (bytes == null) {
return "null";
}
return IntStream.range(0, bytes.length)
.map(x -> bytes[x])
.mapToObj(Integer::toString)
.collect(Collectors.joining(",", "[", "]"));
}
} }

View file

@ -2,38 +2,16 @@ package com.github.shautvast.exceptional;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.foreign.*;
import java.lang.invoke.MethodHandle;
@SuppressWarnings("unused") // this code is called from the instrumented code @SuppressWarnings("unused") // this code is called from the instrumented code
public class ExceptionLogger { public class ExceptionLogger {
private static final Arena arena = Arena.ofConfined(); private final static ObjectMapper objectMapper = new ObjectMapper();;
private static final MemorySegment ringbufferMemory = arena.allocate(4096); private final static MPSCBufferWriter bufferWriter=new MPSCBufferWriter();
private static final Linker linker = Linker.nativeLinker();
// //TODO relative path, or configurable
private static final SymbolLookup rustlib = SymbolLookup.libraryLookup("/Users/Shautvast/dev/exceptional/rustlib/target/debug/librustlib.dylib", arena);
private final static MethodHandle logNative;
private final static ObjectMapper objectMapper = new ObjectMapper();
private final static MPSCBufferWriter bufferWriter;
static {
MemorySegment logFunction = rustlib.find("log_java_exception").orElseThrow();
logNative = linker.downcallHandle(logFunction, FunctionDescriptor.ofVoid(
ValueLayout.ADDRESS
));
CircularByteBuffer buffer = new CircularByteBuffer(ringbufferMemory);
bufferWriter = new MPSCBufferWriter(buffer);
}
// how does this behave in a multithreaded context??
// probably need a ringbuffer with fixed memory to make this work efficiently
public static void log(Throwable throwable) { public static void log(Throwable throwable) {
try { try {
// use json for now because of ease of integration // use json for now because of ease of integration
if (throwable != null) { if (throwable != null) {
String json = objectMapper.writeValueAsString(throwable); bufferWriter.put(objectMapper.writeValueAsBytes(throwable));
var data = arena.allocateFrom(json); // reuse instead of reallocating?
logNative.invoke(data); // invoke the rust function
} }
} catch (Throwable e) { } catch (Throwable e) {
e.printStackTrace(System.err); e.printStackTrace(System.err);

View file

@ -1,5 +1,8 @@
package com.github.shautvast.exceptional; package com.github.shautvast.exceptional;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.foreign.*;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -9,28 +12,59 @@ import java.util.concurrent.atomic.AtomicBoolean;
* Enables multithreaded writing, while keeping CircularByteBuffer simpler (only suitable for single-threaded writing) * Enables multithreaded writing, while keeping CircularByteBuffer simpler (only suitable for single-threaded writing)
*/ */
public class MPSCBufferWriter implements AutoCloseable { public class MPSCBufferWriter implements AutoCloseable {
private static Linker linker;
private static SymbolLookup rustlib;
private static final ConcurrentLinkedDeque<byte[]> writeQueue = new ConcurrentLinkedDeque<>(); // unbounded private static final ConcurrentLinkedDeque<byte[]> writeQueue = new ConcurrentLinkedDeque<>(); // unbounded
private static final ExecutorService executorService = Executors.newSingleThreadExecutor(); private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicBoolean active = new AtomicBoolean(false); private final AtomicBoolean active = new AtomicBoolean(false);
private final CircularByteBuffer buffer;
public MPSCBufferWriter(CircularByteBuffer buffer) { public MPSCBufferWriter() {
this.buffer = buffer;
startWriteQueueListener(); startWriteQueueListener();
} }
private void startWriteQueueListener() { private void startWriteQueueListener() {
active.set(true); active.set(true);
executorService.submit(() -> { executorService.submit(() -> {
// maybe test again with this part of the code somewhere else
// setup of native memory ringbuffer
var arena = Arena.ofConfined();
var ringbufferMemory = arena.allocate(32768);
var buffer = new CircularByteBuffer(ringbufferMemory);
arena = Arena.ofConfined();
linker = Linker.nativeLinker();
//TODO relative path, or configurable
rustlib = SymbolLookup.libraryLookup("/Users/Shautvast/dev/exceptional/rustlib/target/debug/librustlib.dylib", arena);
MemorySegment create = rustlib.find("create_ring_buffer").orElseThrow();
var createHandle = linker.downcallHandle(create, FunctionDescriptor.ofVoid(
ValueLayout.ADDRESS
));
try {
createHandle.invoke(ringbufferMemory);
} catch (Throwable e) {
throw new RuntimeException(e);
}
// start polling from the queue and offer elements to the ringbuffer
while (active.get()) { while (active.get()) {
var element = writeQueue.pollFirst(); var element = writeQueue.pollFirst();
if (element != null) { if (element != null) {
while (!buffer.put(element) && active.get()) { while (!buffer.put(element) && active.get()) {
Thread.yield(); try {
Thread.sleep(1); // TODO remove the sleep
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
} }
} }
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}); });
} }

View file

@ -9,7 +9,7 @@ class CircularByteBufferTest {
@Test @Test
void testPutAndGet() { void testPutAndGet() {
CircularByteBuffer buffer = new CircularByteBuffer(9); var buffer = new CircularByteBuffer(9);
byte[] bytes = "hello".getBytes(UTF_8); byte[] bytes = "hello".getBytes(UTF_8);
boolean written = buffer.put(bytes); boolean written = buffer.put(bytes);
assertTrue(written); assertTrue(written);
@ -17,18 +17,24 @@ class CircularByteBufferTest {
assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 0, 0, 7, 0, 0, 0, 7}, buffer.data.array()); assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 0, 0, 7, 0, 0, 0, 7}, buffer.data.array());
} }
@Test
void testJustGet() {
var buffer = new CircularByteBuffer(8);
System.out.println(CircularByteBuffer.bytesToString(buffer.get()));
}
@Test @Test
void testPutFitsBeforeGet() { void testPutFitsBeforeGet() {
CircularByteBuffer buffer = new CircularByteBuffer(14); var buffer = new CircularByteBuffer(14);
byte[] bytes = "hello".getBytes(UTF_8); var bytes = "hello".getBytes(UTF_8);
buffer.setWriteIndex(7); buffer.setWriteIndex(7);
buffer.setReadIndex(7); buffer.setReadIndex(7);
buffer.put(bytes); buffer.put(bytes);
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 7, 0, 0, 0, 0}, buffer.data.array()); assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 7, 0, 0, 0, 0}, buffer.data.array());
// buffer.setWriteIndex(0); // buffer.setWriteIndex(0);
// end of setup, situation where writeIndex < readIndex // end of setup, situation where writeIndex < readIndex
boolean written = buffer.put(bytes); var written = buffer.put(bytes);
assertTrue(written); assertTrue(written);
assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 7, 0, 0, 0, 7}, buffer.data.array()); assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 7, 0, 0, 0, 7}, buffer.data.array());
assertEquals(7, buffer.getReadIndex()); assertEquals(7, buffer.getReadIndex());
@ -37,8 +43,8 @@ class CircularByteBufferTest {
@Test @Test
void testPutFitsNotBeforeGet() { void testPutFitsNotBeforeGet() {
CircularByteBuffer buffer = new CircularByteBuffer(13); var buffer = new CircularByteBuffer(13);
byte[] bytes = "hello".getBytes(UTF_8); var bytes = "hello".getBytes(UTF_8);
buffer.setWriteIndex(6); buffer.setWriteIndex(6);
buffer.setReadIndex(6); buffer.setReadIndex(6);
buffer.put(bytes); buffer.put(bytes);
@ -52,8 +58,8 @@ class CircularByteBufferTest {
@Test @Test
void testWrapAroundPutLenAndOneCharBeforeWrap() { void testWrapAroundPutLenAndOneCharBeforeWrap() {
CircularByteBuffer buffer = new CircularByteBuffer(9); var buffer = new CircularByteBuffer(9);
byte[] bytes = "hello".getBytes(UTF_8); var bytes = "hello".getBytes(UTF_8);
buffer.setWriteIndex(6); buffer.setWriteIndex(6);
buffer.setReadIndex(6); buffer.setReadIndex(6);
boolean written = buffer.put(bytes); boolean written = buffer.put(bytes);
@ -64,11 +70,11 @@ class CircularByteBufferTest {
@Test @Test
void testWrapAroundPutLenBeforeWrap() { void testWrapAroundPutLenBeforeWrap() {
CircularByteBuffer buffer = new CircularByteBuffer(9); var buffer = new CircularByteBuffer(9);
byte[] bytes = "hello".getBytes(UTF_8); var bytes = "hello".getBytes(UTF_8);
buffer.setWriteIndex(7); buffer.setWriteIndex(7);
buffer.setReadIndex(7); buffer.setReadIndex(7);
boolean written = buffer.put(bytes); var written = buffer.put(bytes);
assertTrue(written); assertTrue(written);
assertArrayEquals(new byte[]{104, 101, 108, 108, 111, 0, 0, 0, 5, 0, 0, 0, 7, 0, 0, 0, 5}, buffer.data.array()); assertArrayEquals(new byte[]{104, 101, 108, 108, 111, 0, 0, 0, 5, 0, 0, 0, 7, 0, 0, 0, 5}, buffer.data.array());
assertArrayEquals(bytes, buffer.get()); assertArrayEquals(bytes, buffer.get());
@ -76,11 +82,11 @@ class CircularByteBufferTest {
@Test @Test
void testWrapAroundPutLenSplitBeforeWrap() { void testWrapAroundPutLenSplitBeforeWrap() {
CircularByteBuffer buffer = new CircularByteBuffer(9); var buffer = new CircularByteBuffer(9);
byte[] bytes = "hello".getBytes(UTF_8); var bytes = "hello".getBytes(UTF_8);
buffer.setWriteIndex(8); buffer.setWriteIndex(8);
buffer.setReadIndex(8); buffer.setReadIndex(8);
boolean written = buffer.put(bytes); var written = buffer.put(bytes);
assertTrue(written); assertTrue(written);
assertArrayEquals(new byte[]{5, 104, 101, 108, 108, 111, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 6}, buffer.data.array()); assertArrayEquals(new byte[]{5, 104, 101, 108, 108, 111, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 6}, buffer.data.array());
assertArrayEquals(bytes, buffer.get()); assertArrayEquals(bytes, buffer.get());
@ -88,8 +94,8 @@ class CircularByteBufferTest {
@Test @Test
void testNoFreeSpace() { void testNoFreeSpace() {
CircularByteBuffer buffer = new CircularByteBuffer(9); var buffer = new CircularByteBuffer(9);
byte[] bytes = "hello".getBytes(UTF_8); var bytes = "hello".getBytes(UTF_8);
boolean written1 = buffer.put(bytes); boolean written1 = buffer.put(bytes);
assertTrue(written1); assertTrue(written1);
boolean written2 = buffer.put(bytes); boolean written2 = buffer.put(bytes);
@ -98,12 +104,12 @@ class CircularByteBufferTest {
@Test @Test
void testFreeSpaceReclaimed() { void testFreeSpaceReclaimed() {
CircularByteBuffer buffer = new CircularByteBuffer(9); var buffer = new CircularByteBuffer(9);
assertEquals(0, buffer.getReadIndex()); assertEquals(0, buffer.getReadIndex());
assertEquals(0, buffer.getWriteIndex()); assertEquals(0, buffer.getWriteIndex());
byte[] bytes = "hello".getBytes(UTF_8); var bytes = "hello".getBytes(UTF_8);
boolean written1 = buffer.put(bytes); var written1 = buffer.put(bytes);
assertTrue(written1); assertTrue(written1);
assertEquals(0, buffer.getReadIndex()); assertEquals(0, buffer.getReadIndex());
assertEquals(7, buffer.getWriteIndex()); assertEquals(7, buffer.getWriteIndex());
@ -112,7 +118,7 @@ class CircularByteBufferTest {
assertEquals(7, buffer.getReadIndex()); assertEquals(7, buffer.getReadIndex());
assertEquals(7, buffer.getWriteIndex()); assertEquals(7, buffer.getWriteIndex());
boolean written2 = buffer.put(bytes); var written2 = buffer.put(bytes);
assertTrue(written2); // the read has freed space assertTrue(written2); // the read has freed space
assertEquals(7, buffer.getReadIndex()); assertEquals(7, buffer.getReadIndex());
assertEquals(5, buffer.getWriteIndex()); assertEquals(5, buffer.getWriteIndex());

View file

@ -2,13 +2,14 @@ package com.github.shautvast.exceptional;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*; import java.util.concurrent.TimeUnit;
class ExceptionLoggerTest { class ExceptionLoggerTest {
@Test // @Test
void test(){ void test() throws InterruptedException {
ExceptionLogger.log(new Throwable()); ExceptionLogger.log(new Throwable());
TimeUnit.SECONDS.sleep(30);
} }
} }

View file

@ -2,18 +2,23 @@ package com.github.shautvast.exceptional;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.lang.foreign.Arena;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertArrayEquals;
class MPSCBufferWriterTest { class MPSCBufferWriterTest {
@Test // @Test
void test() { void test() throws InterruptedException {
CircularByteBuffer buffer = new CircularByteBuffer(9); var arena = Arena.ofConfined();
try (MPSCBufferWriter writer = new MPSCBufferWriter(buffer)) { var ringbufferMemory = arena.allocate(4096);
// var buffer = new CircularByteBuffer(ringbufferMemory);
MPSCBufferWriter writer = new MPSCBufferWriter();
byte[] bytes = "cow".getBytes(UTF_8); byte[] bytes = "cow".getBytes(UTF_8);
writer.put(bytes); writer.put(bytes);
writer.put(bytes); writer.put(bytes);
} Thread.sleep(10000);
writer.close();
} }
} }

View file

@ -11,14 +11,15 @@ class RingBufferTest {
@Test @Test
void testWriteAndRead() { void testWriteAndRead() {
RingBuffer ringBuffer = new RingBuffer(MemorySegment.ofArray(new byte[16])); // var ringBuffer = new CircularByteBuffer(MemorySegment.ofArray(new byte[16]));
ringBuffer.startReader(x -> System.out.println("read " + new String(x, StandardCharsets.UTF_8))); var writer = new MPSCBufferWriter();
// writer.startReader(x -> System.out.println("read " + new String(x, StandardCharsets.UTF_8)));
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
System.out.println("put " + i + " in ring buffer"); System.out.println("put " + i + " in ring buffer");
byte[] testdata = ("test" + i).getBytes(StandardCharsets.UTF_8); byte[] testdata = ("test" + i).getBytes(StandardCharsets.UTF_8);
ringBuffer.write(testdata); writer.put(testdata);
} }
ringBuffer.drain();
} }
} }

View file

@ -1,17 +1,115 @@
use std::ffi::c_char;
use std::thread;
use std::thread::sleep;
use std::time::Duration;
mod throwable; mod throwable;
use std::ffi::{c_char, CStr}; const CAPACITY: isize = 32760;
const READ: isize = 32760;
const WRITE: isize = 32764;
#[no_mangle] #[no_mangle]
pub extern "C" fn log_java_exception(raw_string: *const c_char) { pub extern "C" fn create_ring_buffer(uc: *mut c_char) {
let c_str = unsafe { CStr::from_ptr(raw_string) }; let p = uc as usize; //cast to usize makes it Send, so we can pass it to the thread
let string = c_str.to_str(); thread::spawn(move || {
let raw_string = p as *mut c_char; // cast back to *mut c_char
let mut read_pos = get_u32(raw_string, READ) as isize;
let mut write_pos = get_u32(raw_string, WRITE) as isize;
loop {
// TODO something with tight loops
while read_pos == write_pos {
sleep(Duration::from_millis(1)); // hard to do this otherwise (better), because the other side is not rust, right??
read_pos = get_u32(raw_string, READ) as isize;
write_pos = get_u32(raw_string, WRITE) as isize;
}
let mut remaining = CAPACITY - read_pos;
let len = if remaining == 1 {
let byte_high = get_u8(raw_string, read_pos);
read_pos = 0;
let byte_low = get_u8(raw_string, read_pos);
read_pos += 1;
let l = (byte_high as u16) << 8 | byte_low as u16;
remaining = l as isize;
l
} else if remaining == 2 {
let l = get_u16(raw_string, read_pos);
read_pos = 0;
remaining = 0;
l
} else {
let l = get_u16(raw_string, read_pos);
read_pos += 2;
remaining -= 2;
l
} as isize;
let mut result = Vec::with_capacity(len as usize);
if len <= remaining {
// this.data.get(readIndex, result);
for i in 0..len {
unsafe { result.push(*raw_string.offset(read_pos + i) as u8); }
}
read_pos += len;
} else {
for i in 0..remaining {
unsafe { result.push(*raw_string.offset(read_pos + i) as u8); }
}
read_pos = 0;
for i in 0..len - remaining {
unsafe { result.push(*raw_string.offset(i) as u8); }
}
read_pos += len - remaining;
}
put_u32(raw_string, READ, read_pos as u32);
let string = String::from_utf8(result);
if let Ok(json) = string { if let Ok(json) = string {
println!("receiving {}", json); println!("receiving {}", json);
let error: throwable::Throwable = serde_json::from_str(json).unwrap(); // let error: throwable::Throwable = serde_json::from_str(json).unwrap();
println!("{:?}", error); // println!("{:?}", error);
} else {
println!("not ok");
} }
}
});
} }
#[cfg(test)] fn get_u8(s: *const c_char, pos: isize) -> u8 {
mod tests {} unsafe { *s.offset(pos) as u8 }
}
fn get_u16(s: *const c_char, pos: isize) -> u16 {
let mut b: [u8; 2] = [0; 2];
unsafe {
b[0] = *s.offset(pos) as u8;
b[1] = *s.offset(pos + 1) as u8;
}
u16::from_be_bytes(b)
}
fn get_u32(s: *mut c_char, pos: isize) -> u32 {
let mut b: [u8; 4] = [0; 4];
unsafe {
b[0] = *s.offset(pos) as u8;
b[1] = *s.offset(pos + 1) as u8;
b[2] = *s.offset(pos + 2) as u8;
b[3] = *s.offset(pos + 3) as u8;
}
u32::from_be_bytes(b)
}
fn put_u32(s: *mut c_char, pos: isize, value: u32) {
let bytes = u32::to_be_bytes(value);
unsafe {
*s.offset(pos) = bytes[0] as c_char;
*s.offset(pos + 1) = bytes[1] as c_char;
*s.offset(pos + 2) = bytes[2] as c_char;
*s.offset(pos + 3) = bytes[3] as c_char;
}
}