From 56555e765c2e57184ab8d8d0542c7daceeb7ac56 Mon Sep 17 00:00:00 2001 From: Shautvast Date: Mon, 1 Jul 2024 11:04:49 +0200 Subject: [PATCH] ok, it's working, but I think I can improve the design --- .../exceptional/CircularByteBuffer.java | 51 +++++--- .../exceptional/ExceptionLogger.java | 28 +--- .../exceptional/MPSCBufferWriter.java | 44 ++++++- .../exceptional/CircularByteBufferTest.java | 46 ++++--- .../exceptional/ExceptionLoggerTest.java | 7 +- .../exceptional/MPSCBufferWriterTest.java | 21 +-- .../shautvast/exceptional/RingBufferTest.java | 9 +- rustlib/src/lib.rs | 120 ++++++++++++++++-- 8 files changed, 232 insertions(+), 94 deletions(-) diff --git a/lib/src/main/java/com/github/shautvast/exceptional/CircularByteBuffer.java b/lib/src/main/java/com/github/shautvast/exceptional/CircularByteBuffer.java index 02c56a6..b7c6181 100644 --- a/lib/src/main/java/com/github/shautvast/exceptional/CircularByteBuffer.java +++ b/lib/src/main/java/com/github/shautvast/exceptional/CircularByteBuffer.java @@ -24,8 +24,8 @@ import java.util.stream.IntStream; @SuppressWarnings("StringTemplateMigration") public class CircularByteBuffer { - private int READ_POS; - private int WRITE_POS; + private int readStartPos; + private int writeStartPos; private int capacity; final ByteBuffer data; @@ -54,11 +54,11 @@ public class CircularByteBuffer { private void initIndices() { this.capacity = this.data.capacity() - 8; - READ_POS = this.capacity; // write values after logical capacity position - WRITE_POS = this.capacity + 4; + readStartPos = this.capacity; // write values after logical capacity position + writeStartPos = this.capacity + 4; - this.data.putInt(READ_POS, 0); - this.data.putInt(WRITE_POS, 0); + this.data.putInt(readStartPos, 0); + this.data.putInt(writeStartPos, 0); } @@ -113,7 +113,8 @@ public class CircularByteBuffer { return true; } } catch (Exception e) { - throw new RuntimeException(e); + e.printStackTrace(); + return false; } finally { setWriteIndex(writeIndex); } @@ -121,10 +122,15 @@ public class CircularByteBuffer { /** * The reader side is provided, for reference and testability only. - * In practice, the reader is implemented outside of java + * In practice, the reader is implemented outside of java, see rustlib module */ public byte[] get() { int readIndex = getReadIndex(); + int writeIndex = getWriteIndex(); + if (readIndex == writeIndex) { + return null; + } + try { int remainingUntilEnd = capacity - readIndex; int len; @@ -161,31 +167,40 @@ public class CircularByteBuffer { } int getWriteIndex() { - return this.data.getInt(WRITE_POS); + return this.data.getInt(writeStartPos); } void setWriteIndex(int writeIndex) { - this.data.putInt(WRITE_POS, writeIndex); + this.data.putInt(writeStartPos, writeIndex); } int getReadIndex() { - return this.data.getInt(READ_POS); + return this.data.getInt(readStartPos); } void setReadIndex(int readIndex) { - this.data.putInt(READ_POS, readIndex); + this.data.putInt(readStartPos, readIndex); } @Override public String toString() { - return "CircularByteBuffer {r=" + this.data.getInt(READ_POS) + + return "CircularByteBuffer {r=" + this.data.getInt(readStartPos) + ", w=" + - this.data.getInt(WRITE_POS) + + this.data.getInt(writeStartPos) + ", data=" + - IntStream.range(0, capacity) - .map(x -> this.data.array()[x]) - .mapToObj(Integer::toString) - .collect(Collectors.joining(",", "[", "]")) + + bytesToString(this.data.array()) + "}"; } + + public static String bytesToString(byte[] bytes) { + if (bytes == null) { + return "null"; + } + return IntStream.range(0, bytes.length) + .map(x -> bytes[x]) + .mapToObj(Integer::toString) + .collect(Collectors.joining(",", "[", "]")); + } + + } diff --git a/lib/src/main/java/com/github/shautvast/exceptional/ExceptionLogger.java b/lib/src/main/java/com/github/shautvast/exceptional/ExceptionLogger.java index 052eafb..27b9d74 100644 --- a/lib/src/main/java/com/github/shautvast/exceptional/ExceptionLogger.java +++ b/lib/src/main/java/com/github/shautvast/exceptional/ExceptionLogger.java @@ -2,38 +2,16 @@ package com.github.shautvast.exceptional; import com.fasterxml.jackson.databind.ObjectMapper; -import java.lang.foreign.*; -import java.lang.invoke.MethodHandle; - @SuppressWarnings("unused") // this code is called from the instrumented code public class ExceptionLogger { - private static final Arena arena = Arena.ofConfined(); - private static final MemorySegment ringbufferMemory = arena.allocate(4096); - private static final Linker linker = Linker.nativeLinker(); - // //TODO relative path, or configurable - private static final SymbolLookup rustlib = SymbolLookup.libraryLookup("/Users/Shautvast/dev/exceptional/rustlib/target/debug/librustlib.dylib", arena); - private final static MethodHandle logNative; - private final static ObjectMapper objectMapper = new ObjectMapper(); - private final static MPSCBufferWriter bufferWriter; + private final static ObjectMapper objectMapper = new ObjectMapper();; + private final static MPSCBufferWriter bufferWriter=new MPSCBufferWriter(); - static { - MemorySegment logFunction = rustlib.find("log_java_exception").orElseThrow(); - logNative = linker.downcallHandle(logFunction, FunctionDescriptor.ofVoid( - ValueLayout.ADDRESS - )); - CircularByteBuffer buffer = new CircularByteBuffer(ringbufferMemory); - bufferWriter = new MPSCBufferWriter(buffer); - } - - // how does this behave in a multithreaded context?? - // probably need a ringbuffer with fixed memory to make this work efficiently public static void log(Throwable throwable) { try { // use json for now because of ease of integration if (throwable != null) { - String json = objectMapper.writeValueAsString(throwable); - var data = arena.allocateFrom(json); // reuse instead of reallocating? - logNative.invoke(data); // invoke the rust function + bufferWriter.put(objectMapper.writeValueAsBytes(throwable)); } } catch (Throwable e) { e.printStackTrace(System.err); diff --git a/lib/src/main/java/com/github/shautvast/exceptional/MPSCBufferWriter.java b/lib/src/main/java/com/github/shautvast/exceptional/MPSCBufferWriter.java index 3728807..d563f07 100644 --- a/lib/src/main/java/com/github/shautvast/exceptional/MPSCBufferWriter.java +++ b/lib/src/main/java/com/github/shautvast/exceptional/MPSCBufferWriter.java @@ -1,5 +1,8 @@ package com.github.shautvast.exceptional; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.lang.foreign.*; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -9,27 +12,58 @@ import java.util.concurrent.atomic.AtomicBoolean; * Enables multithreaded writing, while keeping CircularByteBuffer simpler (only suitable for single-threaded writing) */ public class MPSCBufferWriter implements AutoCloseable { - + private static Linker linker; + private static SymbolLookup rustlib; private static final ConcurrentLinkedDeque writeQueue = new ConcurrentLinkedDeque<>(); // unbounded private static final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final AtomicBoolean active = new AtomicBoolean(false); - private final CircularByteBuffer buffer; - public MPSCBufferWriter(CircularByteBuffer buffer) { - this.buffer = buffer; + public MPSCBufferWriter() { startWriteQueueListener(); } private void startWriteQueueListener() { active.set(true); + executorService.submit(() -> { + // maybe test again with this part of the code somewhere else + + // setup of native memory ringbuffer + var arena = Arena.ofConfined(); + var ringbufferMemory = arena.allocate(32768); + var buffer = new CircularByteBuffer(ringbufferMemory); + + arena = Arena.ofConfined(); + linker = Linker.nativeLinker(); + //TODO relative path, or configurable + rustlib = SymbolLookup.libraryLookup("/Users/Shautvast/dev/exceptional/rustlib/target/debug/librustlib.dylib", arena); + MemorySegment create = rustlib.find("create_ring_buffer").orElseThrow(); + var createHandle = linker.downcallHandle(create, FunctionDescriptor.ofVoid( + ValueLayout.ADDRESS + )); + try { + createHandle.invoke(ringbufferMemory); + } catch (Throwable e) { + throw new RuntimeException(e); + } + + // start polling from the queue and offer elements to the ringbuffer while (active.get()) { var element = writeQueue.pollFirst(); if (element != null) { while (!buffer.put(element) && active.get()) { - Thread.yield(); + try { + Thread.sleep(1); // TODO remove the sleep + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } }); diff --git a/lib/src/test/java/com/github/shautvast/exceptional/CircularByteBufferTest.java b/lib/src/test/java/com/github/shautvast/exceptional/CircularByteBufferTest.java index 19a024e..7329db5 100644 --- a/lib/src/test/java/com/github/shautvast/exceptional/CircularByteBufferTest.java +++ b/lib/src/test/java/com/github/shautvast/exceptional/CircularByteBufferTest.java @@ -9,7 +9,7 @@ class CircularByteBufferTest { @Test void testPutAndGet() { - CircularByteBuffer buffer = new CircularByteBuffer(9); + var buffer = new CircularByteBuffer(9); byte[] bytes = "hello".getBytes(UTF_8); boolean written = buffer.put(bytes); assertTrue(written); @@ -17,18 +17,24 @@ class CircularByteBufferTest { assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 0, 0, 7, 0, 0, 0, 7}, buffer.data.array()); } + @Test + void testJustGet() { + var buffer = new CircularByteBuffer(8); + System.out.println(CircularByteBuffer.bytesToString(buffer.get())); + } + @Test void testPutFitsBeforeGet() { - CircularByteBuffer buffer = new CircularByteBuffer(14); - byte[] bytes = "hello".getBytes(UTF_8); + var buffer = new CircularByteBuffer(14); + var bytes = "hello".getBytes(UTF_8); buffer.setWriteIndex(7); buffer.setReadIndex(7); buffer.put(bytes); assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 7, 0, 0, 0, 0}, buffer.data.array()); // buffer.setWriteIndex(0); // end of setup, situation where writeIndex < readIndex - boolean written = buffer.put(bytes); + var written = buffer.put(bytes); assertTrue(written); assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 7, 0, 0, 0, 7}, buffer.data.array()); assertEquals(7, buffer.getReadIndex()); @@ -37,8 +43,8 @@ class CircularByteBufferTest { @Test void testPutFitsNotBeforeGet() { - CircularByteBuffer buffer = new CircularByteBuffer(13); - byte[] bytes = "hello".getBytes(UTF_8); + var buffer = new CircularByteBuffer(13); + var bytes = "hello".getBytes(UTF_8); buffer.setWriteIndex(6); buffer.setReadIndex(6); buffer.put(bytes); @@ -52,8 +58,8 @@ class CircularByteBufferTest { @Test void testWrapAroundPutLenAndOneCharBeforeWrap() { - CircularByteBuffer buffer = new CircularByteBuffer(9); - byte[] bytes = "hello".getBytes(UTF_8); + var buffer = new CircularByteBuffer(9); + var bytes = "hello".getBytes(UTF_8); buffer.setWriteIndex(6); buffer.setReadIndex(6); boolean written = buffer.put(bytes); @@ -64,11 +70,11 @@ class CircularByteBufferTest { @Test void testWrapAroundPutLenBeforeWrap() { - CircularByteBuffer buffer = new CircularByteBuffer(9); - byte[] bytes = "hello".getBytes(UTF_8); + var buffer = new CircularByteBuffer(9); + var bytes = "hello".getBytes(UTF_8); buffer.setWriteIndex(7); buffer.setReadIndex(7); - boolean written = buffer.put(bytes); + var written = buffer.put(bytes); assertTrue(written); assertArrayEquals(new byte[]{104, 101, 108, 108, 111, 0, 0, 0, 5, 0, 0, 0, 7, 0, 0, 0, 5}, buffer.data.array()); assertArrayEquals(bytes, buffer.get()); @@ -76,11 +82,11 @@ class CircularByteBufferTest { @Test void testWrapAroundPutLenSplitBeforeWrap() { - CircularByteBuffer buffer = new CircularByteBuffer(9); - byte[] bytes = "hello".getBytes(UTF_8); + var buffer = new CircularByteBuffer(9); + var bytes = "hello".getBytes(UTF_8); buffer.setWriteIndex(8); buffer.setReadIndex(8); - boolean written = buffer.put(bytes); + var written = buffer.put(bytes); assertTrue(written); assertArrayEquals(new byte[]{5, 104, 101, 108, 108, 111, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 6}, buffer.data.array()); assertArrayEquals(bytes, buffer.get()); @@ -88,8 +94,8 @@ class CircularByteBufferTest { @Test void testNoFreeSpace() { - CircularByteBuffer buffer = new CircularByteBuffer(9); - byte[] bytes = "hello".getBytes(UTF_8); + var buffer = new CircularByteBuffer(9); + var bytes = "hello".getBytes(UTF_8); boolean written1 = buffer.put(bytes); assertTrue(written1); boolean written2 = buffer.put(bytes); @@ -98,12 +104,12 @@ class CircularByteBufferTest { @Test void testFreeSpaceReclaimed() { - CircularByteBuffer buffer = new CircularByteBuffer(9); + var buffer = new CircularByteBuffer(9); assertEquals(0, buffer.getReadIndex()); assertEquals(0, buffer.getWriteIndex()); - byte[] bytes = "hello".getBytes(UTF_8); - boolean written1 = buffer.put(bytes); + var bytes = "hello".getBytes(UTF_8); + var written1 = buffer.put(bytes); assertTrue(written1); assertEquals(0, buffer.getReadIndex()); assertEquals(7, buffer.getWriteIndex()); @@ -112,7 +118,7 @@ class CircularByteBufferTest { assertEquals(7, buffer.getReadIndex()); assertEquals(7, buffer.getWriteIndex()); - boolean written2 = buffer.put(bytes); + var written2 = buffer.put(bytes); assertTrue(written2); // the read has freed space assertEquals(7, buffer.getReadIndex()); assertEquals(5, buffer.getWriteIndex()); diff --git a/lib/src/test/java/com/github/shautvast/exceptional/ExceptionLoggerTest.java b/lib/src/test/java/com/github/shautvast/exceptional/ExceptionLoggerTest.java index 8ed57a2..b995400 100644 --- a/lib/src/test/java/com/github/shautvast/exceptional/ExceptionLoggerTest.java +++ b/lib/src/test/java/com/github/shautvast/exceptional/ExceptionLoggerTest.java @@ -2,13 +2,14 @@ package com.github.shautvast.exceptional; import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.*; +import java.util.concurrent.TimeUnit; class ExceptionLoggerTest { - @Test - void test(){ +// @Test + void test() throws InterruptedException { ExceptionLogger.log(new Throwable()); + TimeUnit.SECONDS.sleep(30); } } \ No newline at end of file diff --git a/lib/src/test/java/com/github/shautvast/exceptional/MPSCBufferWriterTest.java b/lib/src/test/java/com/github/shautvast/exceptional/MPSCBufferWriterTest.java index 7eafc03..bcfc678 100644 --- a/lib/src/test/java/com/github/shautvast/exceptional/MPSCBufferWriterTest.java +++ b/lib/src/test/java/com/github/shautvast/exceptional/MPSCBufferWriterTest.java @@ -2,18 +2,23 @@ package com.github.shautvast.exceptional; import org.junit.jupiter.api.Test; +import java.lang.foreign.Arena; + import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertArrayEquals; class MPSCBufferWriterTest { - @Test - void test() { - CircularByteBuffer buffer = new CircularByteBuffer(9); - try (MPSCBufferWriter writer = new MPSCBufferWriter(buffer)) { - byte[] bytes = "cow".getBytes(UTF_8); - writer.put(bytes); - writer.put(bytes); - } +// @Test + void test() throws InterruptedException { + var arena = Arena.ofConfined(); + var ringbufferMemory = arena.allocate(4096); +// var buffer = new CircularByteBuffer(ringbufferMemory); + MPSCBufferWriter writer = new MPSCBufferWriter(); + byte[] bytes = "cow".getBytes(UTF_8); + writer.put(bytes); + writer.put(bytes); + Thread.sleep(10000); + writer.close(); } } \ No newline at end of file diff --git a/lib/src/test/java/com/github/shautvast/exceptional/RingBufferTest.java b/lib/src/test/java/com/github/shautvast/exceptional/RingBufferTest.java index 897c6ea..fcc9f3a 100644 --- a/lib/src/test/java/com/github/shautvast/exceptional/RingBufferTest.java +++ b/lib/src/test/java/com/github/shautvast/exceptional/RingBufferTest.java @@ -11,14 +11,15 @@ class RingBufferTest { @Test void testWriteAndRead() { - RingBuffer ringBuffer = new RingBuffer(MemorySegment.ofArray(new byte[16])); - ringBuffer.startReader(x -> System.out.println("read " + new String(x, StandardCharsets.UTF_8))); +// var ringBuffer = new CircularByteBuffer(MemorySegment.ofArray(new byte[16])); + var writer = new MPSCBufferWriter(); + +// writer.startReader(x -> System.out.println("read " + new String(x, StandardCharsets.UTF_8))); for (int i = 0; i < 10; i++) { System.out.println("put " + i + " in ring buffer"); byte[] testdata = ("test" + i).getBytes(StandardCharsets.UTF_8); - ringBuffer.write(testdata); + writer.put(testdata); } - ringBuffer.drain(); } } diff --git a/rustlib/src/lib.rs b/rustlib/src/lib.rs index a7f876d..1c92bac 100644 --- a/rustlib/src/lib.rs +++ b/rustlib/src/lib.rs @@ -1,17 +1,115 @@ +use std::ffi::c_char; +use std::thread; +use std::thread::sleep; +use std::time::Duration; + mod throwable; -use std::ffi::{c_char, CStr}; +const CAPACITY: isize = 32760; +const READ: isize = 32760; +const WRITE: isize = 32764; #[no_mangle] -pub extern "C" fn log_java_exception(raw_string: *const c_char) { - let c_str = unsafe { CStr::from_ptr(raw_string) }; - let string = c_str.to_str(); - if let Ok(json) = string { - println!("receiving {}", json); - let error: throwable::Throwable = serde_json::from_str(json).unwrap(); - println!("{:?}", error); - } +pub extern "C" fn create_ring_buffer(uc: *mut c_char) { + let p = uc as usize; //cast to usize makes it Send, so we can pass it to the thread + thread::spawn(move || { + + let raw_string = p as *mut c_char; // cast back to *mut c_char + + let mut read_pos = get_u32(raw_string, READ) as isize; + let mut write_pos = get_u32(raw_string, WRITE) as isize; + loop { + + // TODO something with tight loops + while read_pos == write_pos { + sleep(Duration::from_millis(1)); // hard to do this otherwise (better), because the other side is not rust, right?? + read_pos = get_u32(raw_string, READ) as isize; + write_pos = get_u32(raw_string, WRITE) as isize; + } + + let mut remaining = CAPACITY - read_pos; + let len = if remaining == 1 { + let byte_high = get_u8(raw_string, read_pos); + read_pos = 0; + let byte_low = get_u8(raw_string, read_pos); + read_pos += 1; + let l = (byte_high as u16) << 8 | byte_low as u16; + remaining = l as isize; + l + } else if remaining == 2 { + let l = get_u16(raw_string, read_pos); + read_pos = 0; + remaining = 0; + l + } else { + let l = get_u16(raw_string, read_pos); + read_pos += 2; + remaining -= 2; + l + } as isize; + + let mut result = Vec::with_capacity(len as usize); + if len <= remaining { + // this.data.get(readIndex, result); + for i in 0..len { + unsafe { result.push(*raw_string.offset(read_pos + i) as u8); } + } + read_pos += len; + } else { + for i in 0..remaining { + unsafe { result.push(*raw_string.offset(read_pos + i) as u8); } + } + read_pos = 0; + for i in 0..len - remaining { + unsafe { result.push(*raw_string.offset(i) as u8); } + } + read_pos += len - remaining; + } + put_u32(raw_string, READ, read_pos as u32); + + let string = String::from_utf8(result); + if let Ok(json) = string { + println!("receiving {}", json); + // let error: throwable::Throwable = serde_json::from_str(json).unwrap(); + // println!("{:?}", error); + } else { + println!("not ok"); + } + } + }); } -#[cfg(test)] -mod tests {} +fn get_u8(s: *const c_char, pos: isize) -> u8 { + unsafe { *s.offset(pos) as u8 } +} + + +fn get_u16(s: *const c_char, pos: isize) -> u16 { + let mut b: [u8; 2] = [0; 2]; + unsafe { + b[0] = *s.offset(pos) as u8; + b[1] = *s.offset(pos + 1) as u8; + } + u16::from_be_bytes(b) +} + +fn get_u32(s: *mut c_char, pos: isize) -> u32 { + let mut b: [u8; 4] = [0; 4]; + unsafe { + b[0] = *s.offset(pos) as u8; + b[1] = *s.offset(pos + 1) as u8; + b[2] = *s.offset(pos + 2) as u8; + b[3] = *s.offset(pos + 3) as u8; + } + u32::from_be_bytes(b) +} + +fn put_u32(s: *mut c_char, pos: isize, value: u32) { + let bytes = u32::to_be_bytes(value); + unsafe { + *s.offset(pos) = bytes[0] as c_char; + *s.offset(pos + 1) = bytes[1] as c_char; + *s.offset(pos + 2) = bytes[2] as c_char; + *s.offset(pos + 3) = bytes[3] as c_char; + } +} \ No newline at end of file