diff --git a/lib/src/main/java/com/github/shautvast/exceptional/CircularByteBuffer.java b/lib/src/main/java/com/github/shautvast/exceptional/CircularByteBuffer.java new file mode 100644 index 0000000..7030046 --- /dev/null +++ b/lib/src/main/java/com/github/shautvast/exceptional/CircularByteBuffer.java @@ -0,0 +1,110 @@ +package com.github.shautvast.exceptional; + +import java.nio.ByteBuffer; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Circular buffer for variable sized byte arrays + * The singlethread version + */ +@SuppressWarnings("StringTemplateMigration") +public class CircularByteBuffer { + + final ByteBuffer data; + int readIndex = 0; + int writeIndex = 0; + + public CircularByteBuffer(int capacity) { + data = ByteBuffer.allocate(capacity); + } + + public boolean put(byte[] bytes) { + int len = bytes.length; + int remaining; + // check capacity for bytes to insert + if (writeIndex >= readIndex) { + remaining = data.capacity() - writeIndex + readIndex; + } else { + remaining = readIndex - writeIndex; + } + if (remaining < len + 2) { + return false; + } else { + int remainingUntilEnd = data.capacity() - writeIndex; + if (remainingUntilEnd < len + 2) { + if (remainingUntilEnd > 1) { + // we can write the length + data.putShort(writeIndex, (short) len); + writeIndex += 2; + remainingUntilEnd -= 2; + if (remainingUntilEnd > 0) { + data.put(writeIndex, bytes, 0, remainingUntilEnd); + } + writeIndex = 0; + data.put(writeIndex, bytes, remainingUntilEnd, len - remainingUntilEnd); + writeIndex += len - remainingUntilEnd; + } else { + // we can write only one byte of the length + data.put(writeIndex, (byte) (len >> 8)); + writeIndex = 0; + data.put(writeIndex, (byte) (len & 0xff)); + writeIndex += 1; + data.put(writeIndex, bytes); + writeIndex += len; + } + } else { + data.putShort(writeIndex, (short) len); + writeIndex += 2; + data.put(writeIndex, bytes); + writeIndex += len; + } + return true; + } + } + + public byte[] get() { + int remainingUntilEnd = data.capacity() - readIndex; + int len; + if (remainingUntilEnd == 1) { + byte high = data.get(readIndex); + readIndex = 0; + byte low = data.get(readIndex); + readIndex = 1; + len = high << 8 | low; + remainingUntilEnd = len; + } else if (remainingUntilEnd == 2) { + len = data.getShort(readIndex); + readIndex = 0; + remainingUntilEnd = 0; + } else { + len = data.getShort(readIndex); + readIndex += 2; + remainingUntilEnd -= 2; + } + byte[] result = new byte[len]; + if (len <= remainingUntilEnd) { + data.get(readIndex, result); + readIndex += len; + } else { + data.get(readIndex, result, 0, remainingUntilEnd); + readIndex = 0; + data.get(readIndex, result, remainingUntilEnd, len - remainingUntilEnd); + readIndex += len - remainingUntilEnd; + } + return result; + } + + @Override + public String toString() { + return "CircularBuffer {r=" + this.readIndex + + ", w=" + + this.writeIndex + + ", data=" + + IntStream.range(0, this.data.array().length) + .map(x -> this.data.array()[x]) + .mapToObj(Integer::toString) + .collect(Collectors.joining(",", "[", "]")) + + "}"; + } +} diff --git a/lib/src/main/java/com/github/shautvast/exceptional/MPSCBufferWriter.java b/lib/src/main/java/com/github/shautvast/exceptional/MPSCBufferWriter.java new file mode 100644 index 0000000..3728807 --- /dev/null +++ b/lib/src/main/java/com/github/shautvast/exceptional/MPSCBufferWriter.java @@ -0,0 +1,50 @@ +package com.github.shautvast.exceptional; + +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Enables multithreaded writing, while keeping CircularByteBuffer simpler (only suitable for single-threaded writing) + */ +public class MPSCBufferWriter implements AutoCloseable { + + private static final ConcurrentLinkedDeque writeQueue = new ConcurrentLinkedDeque<>(); // unbounded + private static final ExecutorService executorService = Executors.newSingleThreadExecutor(); + private final AtomicBoolean active = new AtomicBoolean(false); + private final CircularByteBuffer buffer; + + public MPSCBufferWriter(CircularByteBuffer buffer) { + this.buffer = buffer; + startWriteQueueListener(); + } + + private void startWriteQueueListener() { + active.set(true); + executorService.submit(() -> { + while (active.get()) { + var element = writeQueue.pollFirst(); + if (element != null) { + while (!buffer.put(element) && active.get()) { + Thread.yield(); + } + } + } + + }); + } + + public void put(byte[] bytes) { + writeQueue.offerLast(bytes); + } + + /** + * Shuts down the background thread + */ + @Override + public void close() { + active.set(false); + executorService.close(); + } +} diff --git a/lib/src/test/java/com/github/shautvast/exceptional/CircularByteBufferTest.java b/lib/src/test/java/com/github/shautvast/exceptional/CircularByteBufferTest.java new file mode 100644 index 0000000..1ef927c --- /dev/null +++ b/lib/src/test/java/com/github/shautvast/exceptional/CircularByteBufferTest.java @@ -0,0 +1,124 @@ +package com.github.shautvast.exceptional; + +import org.junit.jupiter.api.Test; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.*; + +class CircularByteBufferTest { + + @Test + void testPutAndGet() { + CircularByteBuffer buffer = new CircularByteBuffer(9); + byte[] bytes = "hello".getBytes(UTF_8); + boolean written = buffer.put(bytes); + assertTrue(written); + assertArrayEquals(bytes, buffer.get()); + assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 0}, buffer.data.array()); + } + + @Test + void testPutFitsBeforeGet() { + CircularByteBuffer buffer = new CircularByteBuffer(14); + byte[] bytes = "hello".getBytes(UTF_8); + buffer.writeIndex = 7; + buffer.readIndex = 7; + buffer.put(bytes); + assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array()); + buffer.writeIndex = 0; + // end of setup, situation where writeIndex < readIndex + boolean written = buffer.put(bytes); + assertTrue(written); + assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array()); + } + + @Test + void testPutFitsNotBeforeGet() { + CircularByteBuffer buffer = new CircularByteBuffer(13); + byte[] bytes = "hello".getBytes(UTF_8); + buffer.writeIndex = 6; + buffer.readIndex = 6; + buffer.put(bytes); + assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array()); + buffer.writeIndex = 0; + // end of setup, situation where writeIndex < readIndex + boolean written = buffer.put(bytes); + assertFalse(written); + assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array()); + } + + @Test + void testWrapAroundPutLenAndOneCharBeforeWrap() { + CircularByteBuffer buffer = new CircularByteBuffer(9); + byte[] bytes = "hello".getBytes(UTF_8); + buffer.writeIndex = 6; + buffer.readIndex = 6; + boolean written = buffer.put(bytes); + assertTrue(written); + assertArrayEquals(new byte[]{101, 108, 108, 111, 0, 0, 0, 5, 104}, buffer.data.array()); + assertArrayEquals(bytes, buffer.get()); + } + + @Test + void testWrapAroundPutLenBeforeWrap() { + CircularByteBuffer buffer = new CircularByteBuffer(9); + byte[] bytes = "hello".getBytes(UTF_8); + buffer.writeIndex = 7; + buffer.readIndex = 7; + boolean written = buffer.put(bytes); + assertTrue(written); + assertArrayEquals(new byte[]{104, 101, 108, 108, 111, 0, 0, 0, 5}, buffer.data.array()); + assertArrayEquals(bytes, buffer.get()); + } + + @Test + void testWrapAroundPutLenSplitBeforeWrap() { + CircularByteBuffer buffer = new CircularByteBuffer(9); + byte[] bytes = "hello".getBytes(UTF_8); + buffer.writeIndex = 8; + buffer.readIndex = 8; + boolean written = buffer.put(bytes); + assertTrue(written); + assertArrayEquals(new byte[]{5, 104, 101, 108, 108, 111, 0, 0, 0}, buffer.data.array()); + assertArrayEquals(bytes, buffer.get()); + } + + @Test + void testNoFreeSpace() { + CircularByteBuffer buffer = new CircularByteBuffer(9); + byte[] bytes = "hello".getBytes(UTF_8); + boolean written1 = buffer.put(bytes); + assertTrue(written1); + boolean written2 = buffer.put(bytes); + assertFalse(written2); // no space left + } + + @Test + void testFreeSpaceReclaimed() { + CircularByteBuffer buffer = new CircularByteBuffer(9); + assertEquals(0, buffer.readIndex); + assertEquals(0, buffer.writeIndex); + + byte[] bytes = "hello".getBytes(UTF_8); + boolean written1 = buffer.put(bytes); + assertTrue(written1); + assertEquals(0, buffer.readIndex); + assertEquals(7, buffer.writeIndex); + + assertArrayEquals(bytes, buffer.get()); + assertEquals(7, buffer.readIndex); + assertEquals(7, buffer.writeIndex); + + boolean written2 = buffer.put(bytes); + assertTrue(written2); // the read has freed space + assertEquals(7, buffer.readIndex); + assertEquals(5, buffer.writeIndex); + + + assertArrayEquals(bytes, buffer.get()); + assertEquals(5, buffer.readIndex); + assertEquals(5, buffer.writeIndex); + } + + +} \ No newline at end of file diff --git a/lib/src/test/java/com/github/shautvast/exceptional/MPSCBufferWriterTest.java b/lib/src/test/java/com/github/shautvast/exceptional/MPSCBufferWriterTest.java new file mode 100644 index 0000000..7eafc03 --- /dev/null +++ b/lib/src/test/java/com/github/shautvast/exceptional/MPSCBufferWriterTest.java @@ -0,0 +1,19 @@ +package com.github.shautvast.exceptional; + +import org.junit.jupiter.api.Test; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; + +class MPSCBufferWriterTest { + + @Test + void test() { + CircularByteBuffer buffer = new CircularByteBuffer(9); + try (MPSCBufferWriter writer = new MPSCBufferWriter(buffer)) { + byte[] bytes = "cow".getBytes(UTF_8); + writer.put(bytes); + writer.put(bytes); + } + } +} \ No newline at end of file