correct circular buffer

This commit is contained in:
Sander Hautvast 2024-06-28 16:13:48 +02:00
parent 824e9ac712
commit 5a8f35fc29
4 changed files with 303 additions and 0 deletions

View file

@ -0,0 +1,110 @@
package com.github.shautvast.exceptional;
import java.nio.ByteBuffer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
* Circular buffer for variable sized byte arrays
* The singlethread version
*/
@SuppressWarnings("StringTemplateMigration")
public class CircularByteBuffer {
final ByteBuffer data;
int readIndex = 0;
int writeIndex = 0;
public CircularByteBuffer(int capacity) {
data = ByteBuffer.allocate(capacity);
}
public boolean put(byte[] bytes) {
int len = bytes.length;
int remaining;
// check capacity for bytes to insert
if (writeIndex >= readIndex) {
remaining = data.capacity() - writeIndex + readIndex;
} else {
remaining = readIndex - writeIndex;
}
if (remaining < len + 2) {
return false;
} else {
int remainingUntilEnd = data.capacity() - writeIndex;
if (remainingUntilEnd < len + 2) {
if (remainingUntilEnd > 1) {
// we can write the length
data.putShort(writeIndex, (short) len);
writeIndex += 2;
remainingUntilEnd -= 2;
if (remainingUntilEnd > 0) {
data.put(writeIndex, bytes, 0, remainingUntilEnd);
}
writeIndex = 0;
data.put(writeIndex, bytes, remainingUntilEnd, len - remainingUntilEnd);
writeIndex += len - remainingUntilEnd;
} else {
// we can write only one byte of the length
data.put(writeIndex, (byte) (len >> 8));
writeIndex = 0;
data.put(writeIndex, (byte) (len & 0xff));
writeIndex += 1;
data.put(writeIndex, bytes);
writeIndex += len;
}
} else {
data.putShort(writeIndex, (short) len);
writeIndex += 2;
data.put(writeIndex, bytes);
writeIndex += len;
}
return true;
}
}
public byte[] get() {
int remainingUntilEnd = data.capacity() - readIndex;
int len;
if (remainingUntilEnd == 1) {
byte high = data.get(readIndex);
readIndex = 0;
byte low = data.get(readIndex);
readIndex = 1;
len = high << 8 | low;
remainingUntilEnd = len;
} else if (remainingUntilEnd == 2) {
len = data.getShort(readIndex);
readIndex = 0;
remainingUntilEnd = 0;
} else {
len = data.getShort(readIndex);
readIndex += 2;
remainingUntilEnd -= 2;
}
byte[] result = new byte[len];
if (len <= remainingUntilEnd) {
data.get(readIndex, result);
readIndex += len;
} else {
data.get(readIndex, result, 0, remainingUntilEnd);
readIndex = 0;
data.get(readIndex, result, remainingUntilEnd, len - remainingUntilEnd);
readIndex += len - remainingUntilEnd;
}
return result;
}
@Override
public String toString() {
return "CircularBuffer {r=" + this.readIndex +
", w=" +
this.writeIndex +
", data=" +
IntStream.range(0, this.data.array().length)
.map(x -> this.data.array()[x])
.mapToObj(Integer::toString)
.collect(Collectors.joining(",", "[", "]")) +
"}";
}
}

View file

@ -0,0 +1,50 @@
package com.github.shautvast.exceptional;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Enables multithreaded writing, while keeping CircularByteBuffer simpler (only suitable for single-threaded writing)
*/
public class MPSCBufferWriter implements AutoCloseable {
private static final ConcurrentLinkedDeque<byte[]> writeQueue = new ConcurrentLinkedDeque<>(); // unbounded
private static final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final AtomicBoolean active = new AtomicBoolean(false);
private final CircularByteBuffer buffer;
public MPSCBufferWriter(CircularByteBuffer buffer) {
this.buffer = buffer;
startWriteQueueListener();
}
private void startWriteQueueListener() {
active.set(true);
executorService.submit(() -> {
while (active.get()) {
var element = writeQueue.pollFirst();
if (element != null) {
while (!buffer.put(element) && active.get()) {
Thread.yield();
}
}
}
});
}
public void put(byte[] bytes) {
writeQueue.offerLast(bytes);
}
/**
* Shuts down the background thread
*/
@Override
public void close() {
active.set(false);
executorService.close();
}
}

View file

@ -0,0 +1,124 @@
package com.github.shautvast.exceptional;
import org.junit.jupiter.api.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.*;
class CircularByteBufferTest {
@Test
void testPutAndGet() {
CircularByteBuffer buffer = new CircularByteBuffer(9);
byte[] bytes = "hello".getBytes(UTF_8);
boolean written = buffer.put(bytes);
assertTrue(written);
assertArrayEquals(bytes, buffer.get());
assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 0}, buffer.data.array());
}
@Test
void testPutFitsBeforeGet() {
CircularByteBuffer buffer = new CircularByteBuffer(14);
byte[] bytes = "hello".getBytes(UTF_8);
buffer.writeIndex = 7;
buffer.readIndex = 7;
buffer.put(bytes);
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array());
buffer.writeIndex = 0;
// end of setup, situation where writeIndex < readIndex
boolean written = buffer.put(bytes);
assertTrue(written);
assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array());
}
@Test
void testPutFitsNotBeforeGet() {
CircularByteBuffer buffer = new CircularByteBuffer(13);
byte[] bytes = "hello".getBytes(UTF_8);
buffer.writeIndex = 6;
buffer.readIndex = 6;
buffer.put(bytes);
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array());
buffer.writeIndex = 0;
// end of setup, situation where writeIndex < readIndex
boolean written = buffer.put(bytes);
assertFalse(written);
assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array());
}
@Test
void testWrapAroundPutLenAndOneCharBeforeWrap() {
CircularByteBuffer buffer = new CircularByteBuffer(9);
byte[] bytes = "hello".getBytes(UTF_8);
buffer.writeIndex = 6;
buffer.readIndex = 6;
boolean written = buffer.put(bytes);
assertTrue(written);
assertArrayEquals(new byte[]{101, 108, 108, 111, 0, 0, 0, 5, 104}, buffer.data.array());
assertArrayEquals(bytes, buffer.get());
}
@Test
void testWrapAroundPutLenBeforeWrap() {
CircularByteBuffer buffer = new CircularByteBuffer(9);
byte[] bytes = "hello".getBytes(UTF_8);
buffer.writeIndex = 7;
buffer.readIndex = 7;
boolean written = buffer.put(bytes);
assertTrue(written);
assertArrayEquals(new byte[]{104, 101, 108, 108, 111, 0, 0, 0, 5}, buffer.data.array());
assertArrayEquals(bytes, buffer.get());
}
@Test
void testWrapAroundPutLenSplitBeforeWrap() {
CircularByteBuffer buffer = new CircularByteBuffer(9);
byte[] bytes = "hello".getBytes(UTF_8);
buffer.writeIndex = 8;
buffer.readIndex = 8;
boolean written = buffer.put(bytes);
assertTrue(written);
assertArrayEquals(new byte[]{5, 104, 101, 108, 108, 111, 0, 0, 0}, buffer.data.array());
assertArrayEquals(bytes, buffer.get());
}
@Test
void testNoFreeSpace() {
CircularByteBuffer buffer = new CircularByteBuffer(9);
byte[] bytes = "hello".getBytes(UTF_8);
boolean written1 = buffer.put(bytes);
assertTrue(written1);
boolean written2 = buffer.put(bytes);
assertFalse(written2); // no space left
}
@Test
void testFreeSpaceReclaimed() {
CircularByteBuffer buffer = new CircularByteBuffer(9);
assertEquals(0, buffer.readIndex);
assertEquals(0, buffer.writeIndex);
byte[] bytes = "hello".getBytes(UTF_8);
boolean written1 = buffer.put(bytes);
assertTrue(written1);
assertEquals(0, buffer.readIndex);
assertEquals(7, buffer.writeIndex);
assertArrayEquals(bytes, buffer.get());
assertEquals(7, buffer.readIndex);
assertEquals(7, buffer.writeIndex);
boolean written2 = buffer.put(bytes);
assertTrue(written2); // the read has freed space
assertEquals(7, buffer.readIndex);
assertEquals(5, buffer.writeIndex);
assertArrayEquals(bytes, buffer.get());
assertEquals(5, buffer.readIndex);
assertEquals(5, buffer.writeIndex);
}
}

View file

@ -0,0 +1,19 @@
package com.github.shautvast.exceptional;
import org.junit.jupiter.api.Test;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
class MPSCBufferWriterTest {
@Test
void test() {
CircularByteBuffer buffer = new CircularByteBuffer(9);
try (MPSCBufferWriter writer = new MPSCBufferWriter(buffer)) {
byte[] bytes = "cow".getBytes(UTF_8);
writer.put(bytes);
writer.put(bytes);
}
}
}