From d1883f550fe2c40f3a20c76f01acc56aad509e0e Mon Sep 17 00:00:00 2001 From: Shautvast Date: Sun, 30 Jun 2024 13:58:28 +0200 Subject: [PATCH] put the indices at the end, less calculations needed for read and write --- .../exceptional/CircularByteBuffer.java | 112 +++++++----------- .../exceptional/CircularByteBufferTest.java | 16 +-- 2 files changed, 51 insertions(+), 77 deletions(-) diff --git a/lib/src/main/java/com/github/shautvast/exceptional/CircularByteBuffer.java b/lib/src/main/java/com/github/shautvast/exceptional/CircularByteBuffer.java index 1cd4edd..02c56a6 100644 --- a/lib/src/main/java/com/github/shautvast/exceptional/CircularByteBuffer.java +++ b/lib/src/main/java/com/github/shautvast/exceptional/CircularByteBuffer.java @@ -8,7 +8,7 @@ import java.util.stream.IntStream; /** * Circular buffer for variable sized byte arrays. The indices for read and write * are also stored in the bytebuffer, making changes visible to any non-java process that is reading. - * + *

* Written for a scenario with multiple concurrent writers, and a single reader in a non-java process * This class itself is Not Threadsafe! It relies on MPSCBufferWriter for multithreaded writes. This queues * byte arrays waiting to be stored in the circular buffer. MPSCBufferWriter starts the only @@ -20,15 +20,13 @@ import java.util.stream.IntStream; * for reader/writer index deal with the offset value, so that the index (as method local variable) does not * include it (ie starting at 0). This simplifies the calculations that include these indices. Same goes for the * capacity. - * */ -//TODO put READ WRITE indices at the end @SuppressWarnings("StringTemplateMigration") public class CircularByteBuffer { - public static final int READ_POS = 0; - public static final int WRITE_POS = 4; - public static final int PAYLOADSTART_POS = 8; + private int READ_POS; + private int WRITE_POS; + private int capacity; final ByteBuffer data; /** @@ -38,9 +36,8 @@ public class CircularByteBuffer { * @param capacity the capacity of the CircularByteBuffer */ public CircularByteBuffer(int capacity) { - data = ByteBuffer.allocate(capacity + PAYLOADSTART_POS); // 8 extra for the read and write index - data.putInt(READ_POS, PAYLOADSTART_POS); - data.putInt(WRITE_POS, PAYLOADSTART_POS); + this.data = ByteBuffer.allocate(capacity + 8); // 8 extra for the read and write index + initIndices(); } /** @@ -52,6 +49,17 @@ public class CircularByteBuffer { throw new IllegalArgumentException("Max memory size is 65527"); } this.data = memory.asByteBuffer(); + initIndices(); + } + + private void initIndices() { + this.capacity = this.data.capacity() - 8; + READ_POS = this.capacity; // write values after logical capacity position + WRITE_POS = this.capacity + 4; + + this.data.putInt(READ_POS, 0); + this.data.putInt(WRITE_POS, 0); + } public boolean put(byte[] bytes) { @@ -62,57 +70,55 @@ public class CircularByteBuffer { int writeIndex = getWriteIndex(); try { if (writeIndex >= readIndex) { - remaining = capacity() - writeIndex + readIndex; + remaining = capacity - writeIndex + readIndex; } else { remaining = readIndex - writeIndex; } if (remaining < len + 2) { return false; } else { - int remainingUntilEnd = capacity() - writeIndex; + int remainingUntilEnd = capacity - writeIndex; if (remainingUntilEnd < len + 2) { if (remainingUntilEnd > 1) { // we can write the length - putShort(writeIndex, (short) len); + this.data.putShort(writeIndex, (short) len); writeIndex += 2; remainingUntilEnd -= 2; if (remainingUntilEnd > 0) { - put(writeIndex, bytes, 0, remainingUntilEnd); + this.data.put(writeIndex, bytes, 0, remainingUntilEnd); } writeIndex = 0; - put(writeIndex, bytes, remainingUntilEnd, len); + this.data.put(writeIndex, bytes, remainingUntilEnd, len - remainingUntilEnd); writeIndex += len - remainingUntilEnd; } else { // we can write only one byte of the length - put(writeIndex, (byte) (len >> 8)); + this.data.put(writeIndex, (byte) (len >> 8)); writeIndex = 0; - put(writeIndex, (byte) (len & 0xff)); + this.data.put(writeIndex, (byte) (len & 0xff)); writeIndex += 1; - put(writeIndex, bytes); + this.data.put(writeIndex, bytes); writeIndex += len; } } else { - putShort(writeIndex, (short) len); + this.data.putShort(writeIndex, (short) len); writeIndex += 2; - put(writeIndex, bytes); + this.data.put(writeIndex, bytes); writeIndex += len; - if (writeIndex == capacity()) { + if (writeIndex == this.capacity) { writeIndex = 0; } } return true; } + } catch (Exception e) { + throw new RuntimeException(e); } finally { setWriteIndex(writeIndex); } } - private int capacity() { - return data.capacity() - PAYLOADSTART_POS; - } - /** * The reader side is provided, for reference and testability only. * In practice, the reader is implemented outside of java @@ -120,32 +126,32 @@ public class CircularByteBuffer { public byte[] get() { int readIndex = getReadIndex(); try { - int remainingUntilEnd = capacity() - readIndex; + int remainingUntilEnd = capacity - readIndex; int len; if (remainingUntilEnd == 1) { - byte high = get(readIndex); + byte high = this.data.get(readIndex); readIndex = 0; - byte low = get(readIndex); + byte low = this.data.get(readIndex); readIndex += 1; len = high << 8 | low; remainingUntilEnd = len; } else if (remainingUntilEnd == 2) { - len = getShort(readIndex); + len = this.data.getShort(readIndex); readIndex = 0; remainingUntilEnd = 0; } else { - len = getShort(readIndex); + len = this.data.getShort(readIndex); readIndex += 2; remainingUntilEnd -= 2; } byte[] result = new byte[len]; if (len <= remainingUntilEnd) { - get(readIndex, result); + this.data.get(readIndex, result); readIndex += len; } else { - get(readIndex, result, 0, remainingUntilEnd); + this.data.get(readIndex, result, 0, remainingUntilEnd); readIndex = 0; - get(readIndex, result, remainingUntilEnd, len - remainingUntilEnd); + this.data.get(readIndex, result, remainingUntilEnd, len - remainingUntilEnd); readIndex += len - remainingUntilEnd; } return result; @@ -154,52 +160,20 @@ public class CircularByteBuffer { } } - private void get(int readIndex, byte[] result, int offset, int len) { - data.get(readIndex + PAYLOADSTART_POS, result, offset, len); - } - - private void get(int readIndex, byte[] result) { - data.get(readIndex + PAYLOADSTART_POS, result); - } - - private short getShort(int readIndex) { - return data.getShort(readIndex + PAYLOADSTART_POS); - } - - private byte get(int readIndex) { - return data.get(readIndex + PAYLOADSTART_POS); - } - int getWriteIndex() { - return this.data.getInt(WRITE_POS) - PAYLOADSTART_POS; + return this.data.getInt(WRITE_POS); } void setWriteIndex(int writeIndex) { - this.data.putInt(WRITE_POS, writeIndex + PAYLOADSTART_POS); + this.data.putInt(WRITE_POS, writeIndex); } int getReadIndex() { - return this.data.getInt(READ_POS) - PAYLOADSTART_POS; + return this.data.getInt(READ_POS); } void setReadIndex(int readIndex) { - this.data.putInt(READ_POS, readIndex + PAYLOADSTART_POS); - } - - void putShort(int index, short value) { - this.data.putShort(index + PAYLOADSTART_POS, value); - } - - void put(int index, byte value) { - this.data.put(index + PAYLOADSTART_POS, value); - } - - void put(int index, byte[] value) { - this.data.put(index + PAYLOADSTART_POS, value); - } - - private void put(int writeIndex, byte[] bytes, int offset, int len) { - data.put(writeIndex + PAYLOADSTART_POS, bytes, offset, len - offset); + this.data.putInt(READ_POS, readIndex); } @Override @@ -208,7 +182,7 @@ public class CircularByteBuffer { ", w=" + this.data.getInt(WRITE_POS) + ", data=" + - IntStream.range(READ_POS, this.data.array().length) + IntStream.range(0, capacity) .map(x -> this.data.array()[x]) .mapToObj(Integer::toString) .collect(Collectors.joining(",", "[", "]")) + diff --git a/lib/src/test/java/com/github/shautvast/exceptional/CircularByteBufferTest.java b/lib/src/test/java/com/github/shautvast/exceptional/CircularByteBufferTest.java index b5ae3cb..19a024e 100644 --- a/lib/src/test/java/com/github/shautvast/exceptional/CircularByteBufferTest.java +++ b/lib/src/test/java/com/github/shautvast/exceptional/CircularByteBufferTest.java @@ -14,7 +14,7 @@ class CircularByteBufferTest { boolean written = buffer.put(bytes); assertTrue(written); assertArrayEquals(bytes, buffer.get()); - assertArrayEquals(new byte[]{0, 0, 0, 15, 0, 0, 0, 15, 0, 5, 104, 101, 108, 108, 111, 0, 0}, buffer.data.array()); + assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 0, 0, 7, 0, 0, 0, 7}, buffer.data.array()); } @@ -25,12 +25,12 @@ class CircularByteBufferTest { buffer.setWriteIndex(7); buffer.setReadIndex(7); buffer.put(bytes); - assertArrayEquals(new byte[]{0, 0, 0, 15, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array()); + assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 7, 0, 0, 0, 0}, buffer.data.array()); // buffer.setWriteIndex(0); // end of setup, situation where writeIndex < readIndex boolean written = buffer.put(bytes); assertTrue(written); - assertArrayEquals(new byte[]{0, 0, 0, 15, 0, 0, 0, 15, 0, 5, 104, 101, 108, 108, 111, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array()); + assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 7, 0, 0, 0, 7}, buffer.data.array()); assertEquals(7, buffer.getReadIndex()); assertEquals(7, buffer.getWriteIndex()); } @@ -42,12 +42,12 @@ class CircularByteBufferTest { buffer.setWriteIndex(6); buffer.setReadIndex(6); buffer.put(bytes); - assertArrayEquals(new byte[]{0, 0, 0, 14, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array()); + assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 6, 0, 0, 0, 0}, buffer.data.array()); // end of setup, situation where writeIndex < readIndex boolean written = buffer.put(bytes); assertFalse(written); - assertArrayEquals(new byte[]{0, 0, 0, 14, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array()); + assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 6, 0, 0, 0, 0}, buffer.data.array()); } @Test @@ -58,7 +58,7 @@ class CircularByteBufferTest { buffer.setReadIndex(6); boolean written = buffer.put(bytes); assertTrue(written); - assertArrayEquals(new byte[]{0, 0, 0, 14, 0, 0, 0, 12, 101, 108, 108, 111, 0, 0, 0, 5, 104}, buffer.data.array()); + assertArrayEquals(new byte[]{101, 108, 108, 111, 0, 0, 0, 5, 104, 0, 0, 0, 6, 0, 0, 0, 4}, buffer.data.array()); assertArrayEquals(bytes, buffer.get()); } @@ -70,7 +70,7 @@ class CircularByteBufferTest { buffer.setReadIndex(7); boolean written = buffer.put(bytes); assertTrue(written); - assertArrayEquals(new byte[]{0, 0, 0, 15, 0, 0, 0, 13, 104, 101, 108, 108, 111, 0, 0, 0, 5}, buffer.data.array()); + assertArrayEquals(new byte[]{104, 101, 108, 108, 111, 0, 0, 0, 5, 0, 0, 0, 7, 0, 0, 0, 5}, buffer.data.array()); assertArrayEquals(bytes, buffer.get()); } @@ -82,7 +82,7 @@ class CircularByteBufferTest { buffer.setReadIndex(8); boolean written = buffer.put(bytes); assertTrue(written); - assertArrayEquals(new byte[]{0, 0, 0, 16, 0, 0, 0, 14, 5, 104, 101, 108, 108, 111, 0, 0, 0}, buffer.data.array()); + assertArrayEquals(new byte[]{5, 104, 101, 108, 108, 111, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 6}, buffer.data.array()); assertArrayEquals(bytes, buffer.get()); }