Fix CircularBuffer and add unit tests (#3411)
This commit is contained in:
parent
e96f567bfc
commit
471d2c0b5d
@ -1,132 +1,61 @@
|
||||
package com.thealgorithms.datastructures.buffers;
|
||||
|
||||
import java.util.Random;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class CircularBuffer {
|
||||
public class CircularBuffer<Item> {
|
||||
private final Item[] buffer;
|
||||
private final CircularPointer putPointer;
|
||||
private final CircularPointer getPointer;
|
||||
private final AtomicInteger size = new AtomicInteger(0);
|
||||
|
||||
private char[] _buffer;
|
||||
public final int _buffer_size;
|
||||
private int _write_index = 0;
|
||||
private int _read_index = 0;
|
||||
private AtomicInteger _readable_data = new AtomicInteger(0);
|
||||
|
||||
public CircularBuffer(int buffer_size) {
|
||||
if (!IsPowerOfTwo(buffer_size)) {
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
this._buffer_size = buffer_size;
|
||||
_buffer = new char[buffer_size];
|
||||
public CircularBuffer(int size) {
|
||||
//noinspection unchecked
|
||||
this.buffer = (Item[]) new Object[size];
|
||||
this.putPointer = new CircularPointer(0, size);
|
||||
this.getPointer = new CircularPointer(0, size);
|
||||
}
|
||||
|
||||
private boolean IsPowerOfTwo(int i) {
|
||||
return (i & (i - 1)) == 0;
|
||||
public boolean isEmpty() {
|
||||
return size.get() == 0;
|
||||
}
|
||||
|
||||
private int getTrueIndex(int i) {
|
||||
return i % _buffer_size;
|
||||
public boolean isFull() {
|
||||
return size.get() == buffer.length;
|
||||
}
|
||||
|
||||
public Character readOutChar() {
|
||||
Character result = null;
|
||||
public Item get() {
|
||||
if (isEmpty())
|
||||
return null;
|
||||
|
||||
// if we have data to read
|
||||
if (_readable_data.get() > 0) {
|
||||
result = Character.valueOf(_buffer[getTrueIndex(_read_index)]);
|
||||
_readable_data.decrementAndGet();
|
||||
_read_index++;
|
||||
Item item = buffer[getPointer.getAndIncrement()];
|
||||
size.decrementAndGet();
|
||||
return item;
|
||||
}
|
||||
|
||||
return result;
|
||||
public boolean put(Item item) {
|
||||
if (isFull())
|
||||
return false;
|
||||
|
||||
buffer[putPointer.getAndIncrement()] = item;
|
||||
size.incrementAndGet();
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean writeToCharBuffer(char c) {
|
||||
boolean result = false;
|
||||
private static class CircularPointer {
|
||||
private int pointer;
|
||||
private final int max;
|
||||
|
||||
// if we can write to the buffer
|
||||
if (_readable_data.get() < _buffer_size) {
|
||||
// write to buffer
|
||||
_buffer[getTrueIndex(_write_index)] = c;
|
||||
_readable_data.incrementAndGet();
|
||||
_write_index++;
|
||||
result = true;
|
||||
public CircularPointer(int pointer, int max) {
|
||||
this.pointer = pointer;
|
||||
this.max = max;
|
||||
}
|
||||
|
||||
return result;
|
||||
public int getAndIncrement() {
|
||||
if (pointer == max)
|
||||
pointer = 0;
|
||||
int tmp = pointer;
|
||||
pointer++;
|
||||
return tmp;
|
||||
}
|
||||
|
||||
private static class TestWriteWorker implements Runnable {
|
||||
|
||||
String _alphabet = "abcdefghijklmnopqrstuvwxyz0123456789";
|
||||
Random _random = new Random();
|
||||
CircularBuffer _buffer;
|
||||
|
||||
public TestWriteWorker(CircularBuffer cb) {
|
||||
this._buffer = cb;
|
||||
}
|
||||
|
||||
private char getRandomChar() {
|
||||
return _alphabet.charAt(_random.nextInt(_alphabet.length()));
|
||||
}
|
||||
|
||||
public void run() {
|
||||
while (!Thread.interrupted()) {
|
||||
if (!_buffer.writeToCharBuffer(getRandomChar())) {
|
||||
Thread.yield();
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestReadWorker implements Runnable {
|
||||
|
||||
CircularBuffer _buffer;
|
||||
|
||||
public TestReadWorker(CircularBuffer cb) {
|
||||
this._buffer = cb;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
System.out.println("Printing Buffer:");
|
||||
while (!Thread.interrupted()) {
|
||||
Character c = _buffer.readOutChar();
|
||||
if (c != null) {
|
||||
System.out.print(c.charValue());
|
||||
} else {
|
||||
Thread.yield();
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
System.out.println();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
int buffer_size = 1024;
|
||||
// create circular buffer
|
||||
CircularBuffer cb = new CircularBuffer(buffer_size);
|
||||
|
||||
// create threads that read and write the buffer.
|
||||
Thread write_thread = new Thread(new TestWriteWorker(cb));
|
||||
Thread read_thread = new Thread(new TestReadWorker(cb));
|
||||
read_thread.start();
|
||||
write_thread.start();
|
||||
|
||||
// wait some amount of time
|
||||
Thread.sleep(10000);
|
||||
|
||||
// interrupt threads and exit
|
||||
write_thread.interrupt();
|
||||
read_thread.interrupt();
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,127 @@
|
||||
package com.thealgorithms.datastructures.buffers;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.RepeatedTest;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicIntegerArray;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
class CircularBufferTest {
|
||||
private static final int BUFFER_SIZE = 10;
|
||||
private CircularBuffer<Integer> buffer;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
buffer = new CircularBuffer<>(BUFFER_SIZE);
|
||||
}
|
||||
|
||||
@Test
|
||||
void isEmpty() {
|
||||
assertTrue(buffer.isEmpty());
|
||||
buffer.put(generateInt());
|
||||
assertFalse(buffer.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
void isFull() {
|
||||
assertFalse(buffer.isFull());
|
||||
buffer.put(generateInt());
|
||||
assertFalse(buffer.isFull());
|
||||
|
||||
for (int i = 1; i < BUFFER_SIZE; i++)
|
||||
buffer.put(generateInt());
|
||||
assertTrue(buffer.isFull());
|
||||
}
|
||||
|
||||
@Test
|
||||
void get() {
|
||||
assertNull(buffer.get());
|
||||
for (int i = 0; i < 100; i++)
|
||||
buffer.put(i);
|
||||
for (int i = 0; i < BUFFER_SIZE; i++)
|
||||
assertEquals(i, buffer.get());
|
||||
assertNull(buffer.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
void put() {
|
||||
for (int i = 0; i < BUFFER_SIZE; i++)
|
||||
assertTrue(buffer.put(generateInt()));
|
||||
assertFalse(buffer.put(generateInt()));
|
||||
}
|
||||
|
||||
@RepeatedTest(1000)
|
||||
void concurrentTest() throws InterruptedException {
|
||||
final int numberOfThreadsForProducers = 3;
|
||||
final int numberOfThreadsForConsumers = 2;
|
||||
final int numberOfItems = 300;
|
||||
final CountDownLatch producerCountDownLatch = new CountDownLatch(numberOfItems);
|
||||
final CountDownLatch consumerCountDownLatch = new CountDownLatch(numberOfItems);
|
||||
final AtomicIntegerArray resultAtomicArray = new AtomicIntegerArray(numberOfItems);
|
||||
|
||||
// We are running 2 ExecutorService simultaneously 1 - producer, 2 - consumer
|
||||
// Run producer threads to populate buffer.
|
||||
ExecutorService putExecutors = Executors.newFixedThreadPool(numberOfThreadsForProducers);
|
||||
putExecutors.execute(() -> {
|
||||
while (producerCountDownLatch.getCount() > 0) {
|
||||
int count = (int) producerCountDownLatch.getCount();
|
||||
boolean put = buffer.put(count);
|
||||
while (!put) put = buffer.put(count);
|
||||
producerCountDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// Run consumer threads to retrieve the data from buffer.
|
||||
ExecutorService getExecutors = Executors.newFixedThreadPool(numberOfThreadsForConsumers);
|
||||
getExecutors.execute(() -> {
|
||||
while (consumerCountDownLatch.getCount() > 0) {
|
||||
int count = (int) consumerCountDownLatch.getCount();
|
||||
Integer item = buffer.get();
|
||||
while (item == null) item = buffer.get();
|
||||
resultAtomicArray.set(count - 1, item);
|
||||
consumerCountDownLatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
producerCountDownLatch.await();
|
||||
consumerCountDownLatch.await();
|
||||
putExecutors.shutdown();
|
||||
getExecutors.shutdown();
|
||||
shutDownExecutorSafely(putExecutors);
|
||||
shutDownExecutorSafely(getExecutors);
|
||||
|
||||
List<Integer> resultArray = getSortedListFrom(resultAtomicArray);
|
||||
for (int i = 0; i < numberOfItems; i++) {
|
||||
int expectedItem = i + 1;
|
||||
assertEquals(expectedItem, resultArray.get(i));
|
||||
}
|
||||
}
|
||||
|
||||
private int generateInt() {
|
||||
return ThreadLocalRandom.current().nextInt(0, 100);
|
||||
}
|
||||
|
||||
private void shutDownExecutorSafely(ExecutorService executorService) {
|
||||
try {
|
||||
if (!executorService.awaitTermination(1_000, TimeUnit.MILLISECONDS))
|
||||
executorService.shutdownNow();
|
||||
} catch (InterruptedException e) {
|
||||
executorService.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
public List<Integer> getSortedListFrom(AtomicIntegerArray atomicArray) {
|
||||
int length = atomicArray.length();
|
||||
ArrayList<Integer> result = new ArrayList<>(length);
|
||||
for (int i = 0; i < length; i++)
|
||||
result.add(atomicArray.get(i));
|
||||
result.sort(Comparator.comparingInt(o -> o));
|
||||
return result;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user