put the indices at the end, less calculations needed for read and write

This commit is contained in:
Shautvast 2024-06-30 13:58:28 +02:00
parent d5823837e4
commit d1883f550f
2 changed files with 51 additions and 77 deletions

View file

@ -8,7 +8,7 @@ import java.util.stream.IntStream;
/** /**
* Circular buffer for variable sized byte arrays. The indices for read and write * Circular buffer for variable sized byte arrays. The indices for read and write
* are also stored in the bytebuffer, making changes visible to any non-java process that is reading. * are also stored in the bytebuffer, making changes visible to any non-java process that is reading.
* * <p>
* Written for a scenario with multiple concurrent writers, and a single reader in a non-java process * Written for a scenario with multiple concurrent writers, and a single reader in a non-java process
* This class itself is Not Threadsafe! It relies on MPSCBufferWriter for multithreaded writes. This queues * This class itself is Not Threadsafe! It relies on MPSCBufferWriter for multithreaded writes. This queues
* byte arrays waiting to be stored in the circular buffer. MPSCBufferWriter starts the only * byte arrays waiting to be stored in the circular buffer. MPSCBufferWriter starts the only
@ -20,15 +20,13 @@ import java.util.stream.IntStream;
* for reader/writer index deal with the offset value, so that the index (as method local variable) does not * for reader/writer index deal with the offset value, so that the index (as method local variable) does not
* include it (ie starting at 0). This simplifies the calculations that include these indices. Same goes for the * include it (ie starting at 0). This simplifies the calculations that include these indices. Same goes for the
* capacity. * capacity.
*
*/ */
//TODO put READ WRITE indices at the end
@SuppressWarnings("StringTemplateMigration") @SuppressWarnings("StringTemplateMigration")
public class CircularByteBuffer { public class CircularByteBuffer {
public static final int READ_POS = 0; private int READ_POS;
public static final int WRITE_POS = 4; private int WRITE_POS;
public static final int PAYLOADSTART_POS = 8; private int capacity;
final ByteBuffer data; final ByteBuffer data;
/** /**
@ -38,9 +36,8 @@ public class CircularByteBuffer {
* @param capacity the capacity of the CircularByteBuffer * @param capacity the capacity of the CircularByteBuffer
*/ */
public CircularByteBuffer(int capacity) { public CircularByteBuffer(int capacity) {
data = ByteBuffer.allocate(capacity + PAYLOADSTART_POS); // 8 extra for the read and write index this.data = ByteBuffer.allocate(capacity + 8); // 8 extra for the read and write index
data.putInt(READ_POS, PAYLOADSTART_POS); initIndices();
data.putInt(WRITE_POS, PAYLOADSTART_POS);
} }
/** /**
@ -52,6 +49,17 @@ public class CircularByteBuffer {
throw new IllegalArgumentException("Max memory size is 65527"); throw new IllegalArgumentException("Max memory size is 65527");
} }
this.data = memory.asByteBuffer(); this.data = memory.asByteBuffer();
initIndices();
}
private void initIndices() {
this.capacity = this.data.capacity() - 8;
READ_POS = this.capacity; // write values after logical capacity position
WRITE_POS = this.capacity + 4;
this.data.putInt(READ_POS, 0);
this.data.putInt(WRITE_POS, 0);
} }
public boolean put(byte[] bytes) { public boolean put(byte[] bytes) {
@ -62,57 +70,55 @@ public class CircularByteBuffer {
int writeIndex = getWriteIndex(); int writeIndex = getWriteIndex();
try { try {
if (writeIndex >= readIndex) { if (writeIndex >= readIndex) {
remaining = capacity() - writeIndex + readIndex; remaining = capacity - writeIndex + readIndex;
} else { } else {
remaining = readIndex - writeIndex; remaining = readIndex - writeIndex;
} }
if (remaining < len + 2) { if (remaining < len + 2) {
return false; return false;
} else { } else {
int remainingUntilEnd = capacity() - writeIndex; int remainingUntilEnd = capacity - writeIndex;
if (remainingUntilEnd < len + 2) { if (remainingUntilEnd < len + 2) {
if (remainingUntilEnd > 1) { if (remainingUntilEnd > 1) {
// we can write the length // we can write the length
putShort(writeIndex, (short) len); this.data.putShort(writeIndex, (short) len);
writeIndex += 2; writeIndex += 2;
remainingUntilEnd -= 2; remainingUntilEnd -= 2;
if (remainingUntilEnd > 0) { if (remainingUntilEnd > 0) {
put(writeIndex, bytes, 0, remainingUntilEnd); this.data.put(writeIndex, bytes, 0, remainingUntilEnd);
} }
writeIndex = 0; writeIndex = 0;
put(writeIndex, bytes, remainingUntilEnd, len); this.data.put(writeIndex, bytes, remainingUntilEnd, len - remainingUntilEnd);
writeIndex += len - remainingUntilEnd; writeIndex += len - remainingUntilEnd;
} else { } else {
// we can write only one byte of the length // we can write only one byte of the length
put(writeIndex, (byte) (len >> 8)); this.data.put(writeIndex, (byte) (len >> 8));
writeIndex = 0; writeIndex = 0;
put(writeIndex, (byte) (len & 0xff)); this.data.put(writeIndex, (byte) (len & 0xff));
writeIndex += 1; writeIndex += 1;
put(writeIndex, bytes); this.data.put(writeIndex, bytes);
writeIndex += len; writeIndex += len;
} }
} else { } else {
putShort(writeIndex, (short) len); this.data.putShort(writeIndex, (short) len);
writeIndex += 2; writeIndex += 2;
put(writeIndex, bytes); this.data.put(writeIndex, bytes);
writeIndex += len; writeIndex += len;
if (writeIndex == capacity()) { if (writeIndex == this.capacity) {
writeIndex = 0; writeIndex = 0;
} }
} }
return true; return true;
} }
} catch (Exception e) {
throw new RuntimeException(e);
} finally { } finally {
setWriteIndex(writeIndex); setWriteIndex(writeIndex);
} }
} }
private int capacity() {
return data.capacity() - PAYLOADSTART_POS;
}
/** /**
* The reader side is provided, for reference and testability only. * The reader side is provided, for reference and testability only.
* In practice, the reader is implemented outside of java * In practice, the reader is implemented outside of java
@ -120,32 +126,32 @@ public class CircularByteBuffer {
public byte[] get() { public byte[] get() {
int readIndex = getReadIndex(); int readIndex = getReadIndex();
try { try {
int remainingUntilEnd = capacity() - readIndex; int remainingUntilEnd = capacity - readIndex;
int len; int len;
if (remainingUntilEnd == 1) { if (remainingUntilEnd == 1) {
byte high = get(readIndex); byte high = this.data.get(readIndex);
readIndex = 0; readIndex = 0;
byte low = get(readIndex); byte low = this.data.get(readIndex);
readIndex += 1; readIndex += 1;
len = high << 8 | low; len = high << 8 | low;
remainingUntilEnd = len; remainingUntilEnd = len;
} else if (remainingUntilEnd == 2) { } else if (remainingUntilEnd == 2) {
len = getShort(readIndex); len = this.data.getShort(readIndex);
readIndex = 0; readIndex = 0;
remainingUntilEnd = 0; remainingUntilEnd = 0;
} else { } else {
len = getShort(readIndex); len = this.data.getShort(readIndex);
readIndex += 2; readIndex += 2;
remainingUntilEnd -= 2; remainingUntilEnd -= 2;
} }
byte[] result = new byte[len]; byte[] result = new byte[len];
if (len <= remainingUntilEnd) { if (len <= remainingUntilEnd) {
get(readIndex, result); this.data.get(readIndex, result);
readIndex += len; readIndex += len;
} else { } else {
get(readIndex, result, 0, remainingUntilEnd); this.data.get(readIndex, result, 0, remainingUntilEnd);
readIndex = 0; readIndex = 0;
get(readIndex, result, remainingUntilEnd, len - remainingUntilEnd); this.data.get(readIndex, result, remainingUntilEnd, len - remainingUntilEnd);
readIndex += len - remainingUntilEnd; readIndex += len - remainingUntilEnd;
} }
return result; return result;
@ -154,52 +160,20 @@ public class CircularByteBuffer {
} }
} }
private void get(int readIndex, byte[] result, int offset, int len) {
data.get(readIndex + PAYLOADSTART_POS, result, offset, len);
}
private void get(int readIndex, byte[] result) {
data.get(readIndex + PAYLOADSTART_POS, result);
}
private short getShort(int readIndex) {
return data.getShort(readIndex + PAYLOADSTART_POS);
}
private byte get(int readIndex) {
return data.get(readIndex + PAYLOADSTART_POS);
}
int getWriteIndex() { int getWriteIndex() {
return this.data.getInt(WRITE_POS) - PAYLOADSTART_POS; return this.data.getInt(WRITE_POS);
} }
void setWriteIndex(int writeIndex) { void setWriteIndex(int writeIndex) {
this.data.putInt(WRITE_POS, writeIndex + PAYLOADSTART_POS); this.data.putInt(WRITE_POS, writeIndex);
} }
int getReadIndex() { int getReadIndex() {
return this.data.getInt(READ_POS) - PAYLOADSTART_POS; return this.data.getInt(READ_POS);
} }
void setReadIndex(int readIndex) { void setReadIndex(int readIndex) {
this.data.putInt(READ_POS, readIndex + PAYLOADSTART_POS); this.data.putInt(READ_POS, readIndex);
}
void putShort(int index, short value) {
this.data.putShort(index + PAYLOADSTART_POS, value);
}
void put(int index, byte value) {
this.data.put(index + PAYLOADSTART_POS, value);
}
void put(int index, byte[] value) {
this.data.put(index + PAYLOADSTART_POS, value);
}
private void put(int writeIndex, byte[] bytes, int offset, int len) {
data.put(writeIndex + PAYLOADSTART_POS, bytes, offset, len - offset);
} }
@Override @Override
@ -208,7 +182,7 @@ public class CircularByteBuffer {
", w=" + ", w=" +
this.data.getInt(WRITE_POS) + this.data.getInt(WRITE_POS) +
", data=" + ", data=" +
IntStream.range(READ_POS, this.data.array().length) IntStream.range(0, capacity)
.map(x -> this.data.array()[x]) .map(x -> this.data.array()[x])
.mapToObj(Integer::toString) .mapToObj(Integer::toString)
.collect(Collectors.joining(",", "[", "]")) + .collect(Collectors.joining(",", "[", "]")) +

View file

@ -14,7 +14,7 @@ class CircularByteBufferTest {
boolean written = buffer.put(bytes); boolean written = buffer.put(bytes);
assertTrue(written); assertTrue(written);
assertArrayEquals(bytes, buffer.get()); assertArrayEquals(bytes, buffer.get());
assertArrayEquals(new byte[]{0, 0, 0, 15, 0, 0, 0, 15, 0, 5, 104, 101, 108, 108, 111, 0, 0}, buffer.data.array()); assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 0, 0, 7, 0, 0, 0, 7}, buffer.data.array());
} }
@ -25,12 +25,12 @@ class CircularByteBufferTest {
buffer.setWriteIndex(7); buffer.setWriteIndex(7);
buffer.setReadIndex(7); buffer.setReadIndex(7);
buffer.put(bytes); buffer.put(bytes);
assertArrayEquals(new byte[]{0, 0, 0, 15, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array()); assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 7, 0, 0, 0, 0}, buffer.data.array());
// buffer.setWriteIndex(0); // buffer.setWriteIndex(0);
// end of setup, situation where writeIndex < readIndex // end of setup, situation where writeIndex < readIndex
boolean written = buffer.put(bytes); boolean written = buffer.put(bytes);
assertTrue(written); assertTrue(written);
assertArrayEquals(new byte[]{0, 0, 0, 15, 0, 0, 0, 15, 0, 5, 104, 101, 108, 108, 111, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array()); assertArrayEquals(new byte[]{0, 5, 104, 101, 108, 108, 111, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 7, 0, 0, 0, 7}, buffer.data.array());
assertEquals(7, buffer.getReadIndex()); assertEquals(7, buffer.getReadIndex());
assertEquals(7, buffer.getWriteIndex()); assertEquals(7, buffer.getWriteIndex());
} }
@ -42,12 +42,12 @@ class CircularByteBufferTest {
buffer.setWriteIndex(6); buffer.setWriteIndex(6);
buffer.setReadIndex(6); buffer.setReadIndex(6);
buffer.put(bytes); buffer.put(bytes);
assertArrayEquals(new byte[]{0, 0, 0, 14, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array()); assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 6, 0, 0, 0, 0}, buffer.data.array());
// end of setup, situation where writeIndex < readIndex // end of setup, situation where writeIndex < readIndex
boolean written = buffer.put(bytes); boolean written = buffer.put(bytes);
assertFalse(written); assertFalse(written);
assertArrayEquals(new byte[]{0, 0, 0, 14, 0, 0, 0, 8, 0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111}, buffer.data.array()); assertArrayEquals(new byte[]{0, 0, 0, 0, 0, 0, 0, 5, 104, 101, 108, 108, 111, 0, 0, 0, 6, 0, 0, 0, 0}, buffer.data.array());
} }
@Test @Test
@ -58,7 +58,7 @@ class CircularByteBufferTest {
buffer.setReadIndex(6); buffer.setReadIndex(6);
boolean written = buffer.put(bytes); boolean written = buffer.put(bytes);
assertTrue(written); assertTrue(written);
assertArrayEquals(new byte[]{0, 0, 0, 14, 0, 0, 0, 12, 101, 108, 108, 111, 0, 0, 0, 5, 104}, buffer.data.array()); assertArrayEquals(new byte[]{101, 108, 108, 111, 0, 0, 0, 5, 104, 0, 0, 0, 6, 0, 0, 0, 4}, buffer.data.array());
assertArrayEquals(bytes, buffer.get()); assertArrayEquals(bytes, buffer.get());
} }
@ -70,7 +70,7 @@ class CircularByteBufferTest {
buffer.setReadIndex(7); buffer.setReadIndex(7);
boolean written = buffer.put(bytes); boolean written = buffer.put(bytes);
assertTrue(written); assertTrue(written);
assertArrayEquals(new byte[]{0, 0, 0, 15, 0, 0, 0, 13, 104, 101, 108, 108, 111, 0, 0, 0, 5}, buffer.data.array()); assertArrayEquals(new byte[]{104, 101, 108, 108, 111, 0, 0, 0, 5, 0, 0, 0, 7, 0, 0, 0, 5}, buffer.data.array());
assertArrayEquals(bytes, buffer.get()); assertArrayEquals(bytes, buffer.get());
} }
@ -82,7 +82,7 @@ class CircularByteBufferTest {
buffer.setReadIndex(8); buffer.setReadIndex(8);
boolean written = buffer.put(bytes); boolean written = buffer.put(bytes);
assertTrue(written); assertTrue(written);
assertArrayEquals(new byte[]{0, 0, 0, 16, 0, 0, 0, 14, 5, 104, 101, 108, 108, 111, 0, 0, 0}, buffer.data.array()); assertArrayEquals(new byte[]{5, 104, 101, 108, 108, 111, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 6}, buffer.data.array());
assertArrayEquals(bytes, buffer.get()); assertArrayEquals(bytes, buffer.get());
} }