added code for chapter 8 and 10

This commit is contained in:
Sander Hautvast 2019-09-23 12:38:07 +02:00
parent 8267d31d31
commit 645d004454
8 changed files with 262 additions and 0 deletions

View file

@ -69,3 +69,14 @@ This code shows the many ways you historically have to make the thread stop for
- [`Shutdown`](src/chapter7/Shutdown.java) and [`ShutdownNow`](src/chapter7/ShutdownNow.java) Show the difference between both methods. There are still a lot of tasks in queue when shutdown runs, and they will still be executed. After shutdownNow() there are less dots indicating earlier stop. The currently running task is being interrupted while sleeping.
- [`Finalizing`](src/chapter7/Finalizing.java) The single Finalizer thread is busy finalizing, only until the JVM exits. If you have resources to be closed outside the process, you have no guarantee that that will actually happen. @Deprecated
- [`OnShutdown`](src/chapter7/OnShutdown.java) Shutdown hooks are still a valid way to do things before the JVM exits.
## Chapter8 Applying Thread Pools
- [`Submissions`](src/chapter8/Submissions.java) As long as you submit _Runnables_ to a single workerthread, they end up in the queue and are run in order. But adding _Callables_ and hence _Futures_ can lead to deadlocks if tasks are interdependent. The outcome of the outer task is dependent in the inner task, but the inner cannot run as long as the outer is running. So the intended value is never printed.
- [`ThreadLocals`](src/chapter8/ThreadLocals.java) I'm open to suggestions here, because I think when an uncaught exception occurs in a task, the thread will be replaced.The code in java.util.concurrent.ThreadPoolExecutor (lines 1142-1160 and 994-1020) does indeed suggest that. But the string _'Value is not set'_ is never printed, implying the worker thread is never actually replaced. Beware though that this may always happen.
- [`BlockingQueue`](src/chapter8/BlockingQueues.java) juc.BlockingQueue can be used to pass objects to other threads. The take() method blocks until an object becomes available.
- [`SynchronousQueues`](src/chapter8/SynchronousQueues.java) A juc.SynchronousQueue offers a way to hand over a single object between threads. This is an optimization in that it uses no queue. The receiving thread must always call take() before the value can be handed over. Therefore this code raises the IllegalStateException 'Queue full'
- [`ParallelStreams`](src/chapter8/ParallelStreams.java) Shows multithreaded task execution using parallel streams. A funny thing is that the main thread is also used as a worker, which makes good sense when you think of it.
- [`CustomThreadPoolExecutor`](src/chapter8/CustomThreadPoolExecutor.java) Executors can be extended using provided hooks. This code (inspired by listing 8.9) shows how to capture task execution times.
## chapter 10 Avoiding Liveness Hazards
- [`Bank`](src/chapter10/Bank.java) Proves the point made in listing 10.2 concerning deadlock due to dynamic lock-reordering. The from account and to account will very likely be the same (but reversed) in 2 different threads. Locking on the accounts must always happen in the same order to avoid deadlock. The cure for this is in the book in listing 10.3.

80
src/chapter10/Bank.java Normal file
View file

@ -0,0 +1,80 @@
package chapter10;
import java.math.BigDecimal;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
public class Bank {
public static final int N_ACCOUNTS = 10;
private static final List<Account> accounts = new CopyOnWriteArrayList<>();
public static void main(String[] args) {
createAccounts();
ExecutorService threadPool = Executors.newFixedThreadPool(2);
//do transfers in 2 threads
threadPool.submit(() -> doMoneyTransfers());
threadPool.submit(() -> doMoneyTransfers());
}
private static void doMoneyTransfers() {
Random random = new Random();
while (true) {
Account from = accounts.get(random.nextInt(N_ACCOUNTS));
Account to = accounts.get(random.nextInt(N_ACCOUNTS));
if (from != to) {
transferMoney(from, to, new BigDecimal(random.nextInt(100)));
}
}
}
private static void createAccounts() {
Random random = new Random();
IntStream.rangeClosed(0, N_ACCOUNTS).forEach(i -> {
accounts.add(new Account(String.format("#%s", i), new BigDecimal(random.nextInt(2000))));
});
}
private static void transferMoney(Account from, Account to, BigDecimal amount) {
System.out.printf("transfer Eur %s from %s to %s%n", amount, from, to);
synchronized (from) {
synchronized (to) {
from.debit(amount);
to.credit(amount);
System.out.printf("Eur %s transferred%n", amount);
}
}
}
static class Account {
private String accountNumber;
private BigDecimal amount;
public Account(String number, BigDecimal amount) {
this.accountNumber = number;
this.amount = amount;
}
public void debit(BigDecimal debitAmount) {
this.amount.subtract(debitAmount);
}
public void credit(BigDecimal creditAmount) {
this.amount.add(creditAmount);
}
@Override
public String toString() {
return this.accountNumber;
}
}
}

View file

@ -0,0 +1,27 @@
package chapter8;
import java.util.concurrent.*;
public class BlockingQueues {
public static void main(String[] args) {
BlockingQueue<String> synchronousQueue = new ArrayBlockingQueue<>(10);
ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
executor.submit(() -> {
System.out.println("waiting for something...");
try {
System.out.println(synchronousQueue.take());
System.out.println(synchronousQueue.take());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
executor.schedule(() -> {
synchronousQueue.add("here it is");
synchronousQueue.add("here it is again");
}, 2, TimeUnit.SECONDS);
executor.shutdown();
}
}

View file

@ -0,0 +1,55 @@
package chapter8;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class CustomThreadPoolExecutor {
private static final List<Long> executionTimes = new CopyOnWriteArrayList<>();
public static void main(String[] args) throws InterruptedException {
System.out.println("start handing tasks");
ThreadPoolExecutor executor = newTimingExecutor();
Random random = new Random();
IntStream.rangeClosed(0, 1).forEach(__ ->
executor.submit(() -> {
try {
int time = random.nextInt(1000);
System.out.printf("Task working for %d milliseconds%n", time);
TimeUnit.MILLISECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
TimeUnit.SECONDS.sleep(2);
System.out.printf("There were %d tasks with the following durations (in milliseconds): %s", executionTimes.size(), executionTimes);
executor.shutdown();
}
private static ThreadPoolExecutor newTimingExecutor() {
return new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>()) {
private final ThreadLocal<Long> starttime = new ThreadLocal<>();
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
starttime.set(System.currentTimeMillis());
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
long endtime = System.currentTimeMillis();
long executiontime = endtime - starttime.get();
executionTimes.add(executiontime);
}
};
}
}

View file

@ -0,0 +1,14 @@
package chapter8;
import java.util.stream.IntStream;
public class ParallelStreams {
public static void main(String[] args) {
IntStream.range(0, 8)
.parallel()
.forEach((i) -> {
System.out.printf("thread %s, value %d%n",Thread.currentThread().getName(),i);
});
}
}

View file

@ -0,0 +1,19 @@
package chapter8;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Submissions {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> future = executor.submit(() -> {
return executor.submit(() -> "value").get();
});
System.out.println(future.get());
}
}

View file

@ -0,0 +1,32 @@
package chapter8;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
public class SynchronousQueues {
public static void main(String[] args) {
SynchronousQueue<String> synchronousQueue = new SynchronousQueue<>();
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
synchronousQueue.add("here is your value");
Thread thread = new Thread(() -> {
System.out.println("waiting for something...");
try {
System.out.println(synchronousQueue.take());
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
thread.setDaemon(true);
thread.start();
executor.schedule(() -> {
synchronousQueue.add("here it is");
}, 2, TimeUnit.SECONDS);
executor.shutdown();
}
}

View file

@ -0,0 +1,24 @@
package chapter8;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadLocals {
private static final ThreadLocal<String> HOLDER = new ThreadLocal<>();
public static void main(String[] args) throws InterruptedException {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
threadPool.submit(() -> {
HOLDER.set("value");
throw new RuntimeException("foo");
});
threadPool.submit(() -> {
String value = HOLDER.get();
if (!"value".equals(value)) {
System.out.println("Value is not set");
}
});
}
}