阻塞式队列
生产者每1秒生产一个, 消费者每3秒消费一个
import java.util.concurrent.ArrayBlockingQueue;
/**
* 阻塞式队列
*/
public class Demo01 {
public static void main(String[] args) throws Exception {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
new Thread(new Runnable() {
@Override
public void run() {
try {
for(int i=0;i<1000;i++){
Thread.sleep(1000);
queue.put("数据"+i);
System.out.println("生产者生产了数据:"+i);
}
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
for(int i=0;i<1000;i++){
Thread.sleep(3000);
String take = queue.take();
System.out.println("消费者消费了数据:"+take);
}
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}).start();
}
}
ArrayBlockingQueue 和 LinkedBlockingQueue
数组 链表
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* ArrayBlockingQueue
* LinkedBlockingQueue
*/
public class Demo02 {
public static void main(String[] args) throws InterruptedException {
//底层是数组,必须指定容量,且不可改变
BlockingQueue q1 = new ArrayBlockingQueue(3);
//底层是链表,可以不指定容量,但是默认容量是Integer.MAX_VALUE
BlockingQueue q2 = new LinkedBlockingQueue();
q1.put(1);
q1.take();
q2.put(2);
q2.take();
}
}
ConcurrentMap
线程安全且效率高的Map集合(通过读锁和写锁实现), HashTable也是线程安全的, 但是效率太低
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* ConcurrentMap
*/
public class Demo03 {
public static void main(String[] args) {
ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
map.put("110","apple");
String s = map.get("110");
System.out.println(s);
}
}
CountDownlatch
协调多个线程之间的执行顺序, 达成让一个线程等待其他线程达成一定条件后再执行的效果
import java.util.concurrent.CountDownLatch;
/**
* CountDownLatch
* 协调多个线程之间的执行顺序
* 达成让一个线程等待其他线程达成一定条件后再执行的效果
*/
public class Demo05 {
private static CountDownLatch cdl = new CountDownLatch(3);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("弟弟买米回来了..");
cdl.countDown();//将cdl中的数据减1 -- 2
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("姐姐买菜回来了..");
cdl.countDown();//将cdl中的数据减1 -- 1
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
System.out.println("爸爸买锅回来了..");
cdl.countDown();//将cdl中的数据减1 -- 0
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
//阻塞,直到cdl中的数据减为0
cdl.await();
System.out.println("妈妈开始做饭..");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
CyclicBarrier栅栏
import java.util.concurrent.CyclicBarrier;
/**
* CyclicBarrier栅栏
*/
public class Demo07 {
private static CyclicBarrier cb = new CyclicBarrier(3);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("亚瑟加载游戏完成。。");
cb.await();//当前到达栅栏的线程是否达到预定数量,如果没达到则阻塞,达到则所有线程一起放行
System.out.println("亚瑟开始游戏。。");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(3000);
System.out.println("安其拉加载游戏完成。。");
cb.await();//当前到达栅栏的线程是否达到预定数量,如果没达到则阻塞,达到则所有线程一起放行
System.out.println("安其拉开始游戏。。");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
System.out.println("妲己加载游戏完成。。");
cb.await();//当前到达栅栏的线程是否达到预定数量,如果没达到则阻塞,达到则所有线程一起放行
System.out.println("妲己开始游戏。。");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
Exchanger 交换机
import java.util.concurrent.Exchanger;
/**
* Exchanger
*/
public class Demo09 {
private static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
System.out.println("老张到达交换点..");
String s = exchanger.exchange("天王盖地虎");
System.out.println("老张交换得到情报:"+s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5000);
System.out.println("老李到达交换点..");
String s = exchanger.exchange("宝塔镇河妖");
System.out.println("老李交换得到情报:"+s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
Semaphore:信号量
功能1:保护一段重要的代码一次不能超过n个线程访问
import java.util.concurrent.Semaphore;
/**
* Semaphore:信号量
* 功能1:保护一段重要的代码一次不能超过n个线程访问
*/
public class Demo10 {
private static Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//获取一个信号量,如果获取到继续执行,没有获取到阻塞
System.out.println("游客1:开始参观人民大会堂..");
Thread.sleep(3000);
System.out.println("游客1:结束参观..");
semaphore.release();//释放一个信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//获取一个信号量,如果获取到继续执行,没有获取到阻塞
System.out.println("游客2:开始参观人民大会堂..");
Thread.sleep(5000);
System.out.println("游客2:结束参观..");
semaphore.release();//释放一个信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//获取一个信号量,如果获取到继续执行,没有获取到阻塞
System.out.println("游客3:开始参观人民大会堂..");
Thread.sleep(4000);
System.out.println("游客3:结束参观..");
semaphore.release();//释放一个信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//获取一个信号量,如果获取到继续执行,没有获取到阻塞
System.out.println("游客4:开始参观人民大会堂..");
Thread.sleep(5000);
System.out.println("游客4:结束参观..");
semaphore.release();//释放一个信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//获取一个信号量,如果获取到继续执行,没有获取到阻塞
System.out.println("游客5:开始参观人民大会堂..");
Thread.sleep(5000);
System.out.println("游客5:结束参观..");
semaphore.release();//释放一个信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();//获取一个信号量,如果获取到继续执行,没有获取到阻塞
System.out.println("游客6:开始参观人民大会堂..");
Thread.sleep(5000);
System.out.println("游客6:结束参观..");
semaphore.release();//释放一个信号量
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
功能2:在两个线程之间发送信号。
import java.util.concurrent.Semaphore;
/**
* Semaphore 信号量
* 用法2:在两个线程之间发送信号。
*/
public class Demo11 {
private static Semaphore semaphore = new Semaphore(0);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("装卸师傅:装货。。");
Thread.sleep(5000);
System.out.println("装卸师傅:装好货,发信号。。");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("司机:在驾驶室等信儿~~");
semaphore.acquire();
System.out.println("司机:收到信号,发车,速度70迈~~~~");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
ThreadPool 线程池执行流程
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* ThreadPool 线程池执行流程
*/
public class Demo01 {
public static void main(String[] args) {
// 1.创建线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(
3, // 核心线程数
5,// 最大线程数
10,// 空闲线程存活时间
TimeUnit.SECONDS,// 时间单位
new ArrayBlockingQueue<>(5),// 任务排队用的阻塞式队列
new RejectedExecutionHandler() {// 拒绝服务助手
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("线程池已满,拒绝执行"+r);
}
});
// 2.执行任务
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(true);}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(true);}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(true);}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(true);}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(true);}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(true);}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(true);}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(true);}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(true);}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(true);}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(true);}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(true);}});
System.out.println(pool);
}
}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* ThreadPool 线程池执行流程
*/
public class Demo02 {
private static int i = 10;
public static void main(String[] args) {
// 1.创建线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(
3, // 核心线程数
5,// 最大线程数
1,// 空闲线程存活时间
TimeUnit.SECONDS,// 时间单位
new ArrayBlockingQueue<>(5),// 任务排队用的阻塞式队列
new RejectedExecutionHandler() {// 拒绝服务助手
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("线程池已满,拒绝执行"+r);
}
});
// 2.执行任务
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(i>=1){
System.out.println(1);
};}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(i>=2){
System.out.println(2);
};}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(i>=3){
System.out.println(3);
};}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(i>=4){
System.out.println(4);
};}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(i>=5){
System.out.println(5);
};}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(i>=6){
System.out.println(6);
};}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(i>=7){
System.out.println(7);
};}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(i>=8){
System.out.println(8);
};}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(i>=9){
System.out.println(9);
};}});
System.out.println(pool);
pool.submit(new Runnable() { @Override public void run() { while(i>=10){
System.out.println(10);
};}});
System.out.println(pool);
i--;
System.out.println(pool);
i--;
System.out.println(pool);
i--;
System.out.println(pool);
i--;
System.out.println(pool);
i--;
System.out.println(pool);
i--;
System.out.println(pool);
i--;
System.out.println(pool);
i--;
System.out.println(pool);
i--;
System.out.println(pool);
i--;
System.out.println(pool);
i--;
System.out.println(pool);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
i--;
System.out.println(pool);
}
}
ThreadPool 关闭线程池
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* ThreadPool 关闭线程池
*/
public class Demo03 {
public static void main(String[] args) {
// 1.创建线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(
3, // 核心线程数
5,// 最大线程数
5,// 空闲线程存活时间
TimeUnit.SECONDS,// 时间单位
new ArrayBlockingQueue<>(5),// 任务排队用的阻塞式队列
new RejectedExecutionHandler() {// 拒绝服务助手
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("线程池已满,拒绝执行"+r);
}
});
//2.执行任务
pool.submit(new Runnable() {
@Override
public void run() {
while(true){
}
}
});
//3.关闭任务
//--正常关闭线程池,不会立即关闭线程池,而是不再接收新任务,已经在池中的任务仍然会被执行,等到所有任务执行完毕后,线程池才真正关闭
//pool.shutdown();
//--立即关闭线程池,会立即关闭线程池,正在执行的任务可能会被打断,造成无法预估结果的意外情况
pool.shutdownNow();
}
}
ThreadPool 提交任务
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;
/**
* ThreadPool 提交任务
*/
public class Demo04 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1.创建线程池
ThreadPoolExecutor pool = new ThreadPoolExecutor(
3, // 核心线程数
5,// 最大线程数
5,// 空闲线程存活时间
TimeUnit.SECONDS,// 时间单位
new ArrayBlockingQueue<>(5),// 任务排队用的阻塞式队列
new RejectedExecutionHandler() {// 拒绝服务助手
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("线程池已满,拒绝执行"+r);
}
});
//2.提交任务
//---execute(Runnable) 没有返回值
// pool.execute(new Runnable() {
// @Override
// public void run() {
// System.out.println("任务1");
// }
// });
//---submit(Runnable) 会返回Future对象,通过它可以检测任务是否执行完成
// Future<?> future = pool.submit(new Runnable() {
// @Override
// public void run() {
// System.out.println("任务1开始");
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// System.out.println("任务1结束");
// }
// });
// future.get();//此方法将会阻塞,直到future关联的任务执行完成
// System.out.println("future.get()返回了~");
//---submit(Callable) 会返回Future对象,通过它可以获取任务的执行结果,此外还可以得到call()方法执行完后的返回值
// Future<String> future = pool.submit(new Callable<String>() {
// @Override
// public String call() throws Exception {
// System.out.println("任务1开始");
// Thread.sleep(3000);
// System.out.println("任务1结束");
// return "Apple~~~";
// }
// });
// String s = future.get();//此方法将会阻塞,直到future关联的任务执行完成
// System.out.println("future.get()返回了~还带回了任务执行后的返回值~~"+s);
// Set<Callable<String>> set = new HashSet<>();
// set.add(new Callable<String>() {
// @Override
// public String call() throws Exception {
// System.out.println("任务1");
// return "Apple";
// }
// });
// set.add(new Callable<String>() {
// @Override
// public String call() throws Exception {
// System.out.println("任务2");
// return "Banana";
// }
// });
// set.add(new Callable<String>() {
// @Override
// public String call() throws Exception {
// System.out.println("任务3");
// return "Orange";
// }
// });
//---invokeAny
// String x = pool.invokeAny(set); //随机执行集合中的任务,只要有一个结束,就返回结果。此方法将会阻塞,直到有一个任务执行完成
// System.out.println(x);
//---invokeAll
// List<Future<String>> list = pool.invokeAll(set);//执行集合中的所有任务,此方法将会阻塞,直到所有任务执行完成
// list.forEach(future->{
// try {
// System.out.println(future.get());
// } catch (InterruptedException | ExecutionException e) {
// e.printStackTrace();
// }
// });
//3.关闭线程池
pool.shutdown();
}
}
Executors - 工具类,可以快速构建线程池
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
/**
* Executors - 工具类,可以快速构建线程池
*/
public class Demo05 {
public static void main(String[] args) {
/**
创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
5 int最大值 0 无界队列
new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue())
*/
ExecutorService pool = Executors.newScheduledThreadPool(5);
/**
创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。
new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
*/
//ExecutorService pool = Executors.newCachedThreadPool();
/**
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。
new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
*/
//ExecutorService pool = Executors.newFixedThreadPool(5);
/**
创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
*/
//ExecutorService pool = Executors.newSingleThreadExecutor();
}
}
线程并发安全问题
多个线程并发方法同一个资源,造成的问题
/**
* 线程并发安全问题
* 多个线程并发方法同一个资源,造成的问题
*/
public class Demo01 {
private static String name = "小明";
private static String gender = "男";
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
while(true){
synchronized (Demo01.class){
System.out.println("姓名:"+name+"性别:"+gender);
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while(true){
synchronized (Demo01.class){
if(name.equals("小明")){
name = "小花";
gender = "女";
}else{
name = "小明";
gender = "男";
}
}
}
}
}).start();
}
}
Syncronized的缺陷1
本身是一个代码块,必须被完整地包含在单个方法里,无法跨方法操作锁
/**
* Syncronized的缺陷1:
* 本身是一个代码块,必须被完整地包含在单个方法里,无法跨方法操作锁
*/
public class Demo02 {
private static String name = "小明";
private static String gender = "男";
private static boolean isDBOK = false;
public static void saveToDB(){
if(isDBOK){
System.out.println("保存数据到数据库:"+name+","+gender);
}else{
//连接数据库需要很长时间,此时应该释放锁,防止阻塞等待锁的线程!!
//释放锁????
System.out.println("重连数据库,这需要一个小时...");
try {
Thread.sleep(1000*60*60);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
while(true){
synchronized (Demo01.class){
saveToDB();
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while(true){
synchronized (Demo01.class){
if(name.equals("小明")){
name = "小花";
gender = "女";
}else{
name = "小明";
gender = "男";
}
System.out.println("修改数据成功!");
}
}
}
}).start();
}
}
Lock:锁
优点
1.
不需要找额外的锁对象! 它自己就是锁对象
2.
可以在不同的方法内加锁和解锁
3.
有tryLock()方法, 可以尝试加锁, 加不上也不阻塞, 直接返回false
4.
提供了Condition机制, 可以实现线程之间的通信, 协调线程执行顺序
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Lock:锁
* 优点0:不需要找额外的锁对象了!它自己就是锁对象!
*/
public class Demo03 {
private static String name = "小明";
private static String gender = "男";
public static void main(String[] args) {
//锁对象
Lock lock = new ReentrantLock();
new Thread(new Runnable() {
@Override
public void run() {
while(true){
lock.lock();//加锁
System.out.println("姓名:"+name+"性别:"+gender);
lock.unlock();//解锁
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while(true){
lock.lock();//加锁
if(name.equals("小明")){
name = "小花";
gender = "女";
}else{
name = "小明";
gender = "男";
}
lock.unlock();//解锁
}
}
}).start();
}
}
Condition机制
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Lock:锁
* 优点0:不需要找额外的锁对象了!它自己就是锁对象!
* 优点1:可以在不同的方法内加锁和解锁!
* 优点2:有tryLock()方法,可以尝试加锁,加不上也不阻塞,直接返回false
* 优点3:提供了Condition机制,可以实现线程间的通信,协调线程执行顺序
*/
public class Demo08 {
private static String name = "小明";
private static String gender = "男";
private static Lock lock = new ReentrantLock();
private static Condition con1 = lock.newCondition();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
lock.lock();
System.out.println("姓名:"+name+"性别:"+gender);
con1.signal();
con1.await();
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
while(true){
try {
lock.lock();
if(name.equals("小明")){
name = "小花";
gender = "女";
}else{
name = "小明";
gender = "男";
}
con1.signal();
con1.await();
lock.unlock();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
}
}
读写锁
读锁和读锁可以共存
写锁和任和锁不共存
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 读写锁 - 读+读
* 读锁和读锁可以共存
* 读锁和写锁不能共存
*/
public class Demo01 {
private static String name = "小明";
private static String gender = "男";
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
lock.readLock().lock();
System.out.println("t1:"+name);
System.out.println("t1:"+gender);
lock.readLock().unlock();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
lock.readLock().lock();
System.out.println("t2:"+name);
System.out.println("t2:"+gender);
lock.readLock().unlock();
}
}).start();
}
}
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 读写锁 - 写+写
* 读锁和读锁可以共存
* 读锁和写锁不能共存
*/
public class Demo03 {
private static String name = "小明";
private static String gender = "男";
private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
lock.writeLock().lock();
name = "小花";
gender = "女";
lock.writeLock().unlock();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
lock.writeLock().lock();
name = "小黄";
gender = "未知";
lock.writeLock().unlock();
}
}).start();
}
}
原子操作
AtomicInteger
•
int getAndDercement()
以原子方式将当前值减 1
•
int decrementAndGet()
以原子形式将当前值减 1
•
int getAndIncrement()
以原子方式将当前值加 1
•
int incrementAndGet()
以原子方式将当前值加 1
import java.util.concurrent.atomic.AtomicInteger;
/**
* AtomicInteger
* int getAndDecrement()
* 以原子方式将当前值减 1。
* int decrementAndGet()
* 以原子方式将当前值减 1。
* int getAndIncrement()
* 以原子方式将当前值加 1。
* int incrementAndGet()
* 以原子方式将当前值加 1。
*/
public class Demo05 {
private static AtomicInteger ai = new AtomicInteger(0);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
for(int j=0;j<100000;j++){
ai.getAndIncrement();//i++
}
System.out.println("t1结束!");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
for(int j=0;j<100000;j++){
ai.getAndIncrement();//i++
}
System.out.println("t2结束!");
}
}).start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(ai.get());
}
}
boolean compareAndSet(int expect, int update)
如果当前值 == 预期值, 则以原子方式将该值设置为给定的更新值
import java.util.concurrent.atomic.AtomicInteger;
/**
* AtomicInteger
* boolean compareAndSet(int expect, int update)
* 如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
*/
public class Demo06 {
private static AtomicInteger ai = new AtomicInteger(0);
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
boolean flag = ai.compareAndSet(0,666);
System.out.println("t1结束!"+flag);
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
boolean flag = ai.compareAndSet(0, 999);
System.out.println("t2结束!"+flag);
}
}).start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(ai.get());
}
}
AtomticBoolean
/**
* AtomicBoolean
*/
public class Demo08 {
private static boolean flag = true;
public static void main(String[] args) {
for(int i=0;i<10000;i++){
new Thread(new Runnable() {
@Override
public void run() {
if(flag == true){
System.out.println(Thread.currentThread().getName()+"初始化数据库...");
flag = false;
}
}
}).start();
}
}
}
ThreadLocal - 线程局部变量
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
* ThreadLocal - 线程局部变量
*/
public class Demo01{
private static Connection conn = null;
static{
try {
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/lc01","root","root");
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
//用户a
new Thread(new Runnable() {
@Override
public void run() {
String sql = "update user set addr=? where id=?";
try{
conn.setAutoCommit(false);
PreparedStatement ps = conn.prepareStatement(sql);
ps.setString(1,"hz4");
ps.setInt(2,1);
ps.executeUpdate();
conn.commit();
ps.close();
conn.close();
}catch (Exception e){
e.printStackTrace();
try {conn.rollback();} catch (SQLException ex) {ex.printStackTrace();}
}
}
}).start();
//用户b
new Thread(new Runnable() {
@Override
public void run() {
String sql = "update user set addr=? where id=?";
try{
conn.setAutoCommit(false);
PreparedStatement ps = conn.prepareStatement(sql);
ps.setString(1,"gz4");
ps.setInt(2,2);
ps.executeUpdate();
conn.commit();
ps.close();
conn.close();
}catch (Exception e){
e.printStackTrace();
try {conn.rollback();} catch (SQLException ex) {ex.printStackTrace();}
}
}
}).start();
}
}
多线程并发安全问题
•
有并发的线程 - 加锁
•
有个共享的资源 - ThreadLocal
•
有写操作 - scala语言
静态代理和动态代理(mvc AOP底层原理)
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
interface Star{
public void eat();
public void sign();
}
class FBB implements Star{
public void eat(){
System.out.println("FBB吃饭..");
}
public void sign(){
System.out.println("FBB签名..");
}
public void shufa(){
System.out.println("FBB书法..");
}
}
/**
* 代理设计模式 - 静态代理 动态代理 - JDK动态代理 - CGLIB动态代理
* Spring AOP
*/
public class Demo05 {
public static void main(String[] args) {
FBB fbb = new FBB();
//动态代理
Star zqd = (Star) Proxy.newProxyInstance(
//类加载器
fbb.getClass().getClassLoader(),
//要实现的接口
fbb.getClass().getInterfaces(),
//处理器
new InvocationHandler() {
/**
* @param proxy 代理者,张全蛋
* @param method 当前正在调用的方法
* @param args 当前正在调用的方法的参数
* @return 希望给调用者返回什么结果
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("ZQD:你谁啊???");
Object retObj = method.invoke(fbb, args);
System.out.println("ZQD:记录一下...");
return retObj;
}
}
);
zqd.eat();
zqd.sign();
//zqd.shufa();
// //静态代理者
// class ZL implements Star{
// public void eat(){
// System.out.println("ZL:你谁啊???");
// fbb.eat();
// System.out.println("ZL:记录一下...");
// }
// public void sign(){
// System.out.println("ZL:你谁啊???");
// fbb.sign();
// System.out.println("ZL:记录一下...");
// }
// }
// ZL zl = new ZL();
// zl.eat();
// zl.sign();
}
}
类加载控制器
类加载控制器(Class Loading Controller)是一种软件机制,用于控制编写和运行Java程序的类的加载行为。它可以在Java应用程序的运行时动态地加载、改变和卸载Java类,从而实现一些特殊的功能。
类加载控制器通常由ClassLoader类和相关的接口组成,它们能够通过Java虚拟机的类加载、链接、校验等过程,控制Java类的装载和使用。这些类可以对Java应用程序进行全面控制,使得Java应用程序的运行时间更加灵活和自适应。
常见的Java类加载控制器有以下几个:
1.
ClassLoader 类:实现 Java 虚拟机机制中的类加载器,动态地加载、链接和装载Java类,是Java类加载的基础。
2.
URLClassLoader 类:可以从指定的URL路径下动态地加载Java类。可以动态地加载网络上的Java类库。
3.
Class 文件加密器:可以对Java类文件进行加密,保护Java程序的安全性。
4.
Class文件修改器:可以动态地修改Java类文件,实现Java程序的动态更新和升级。
5.
运行时参数设置器:可以设置JVM的运行时参数,优化Java程序的运行性能。
类加载控制器广泛应用于Java应用程序的开发和运行过程中,通过控制类的加载行为,使得Java程序更加灵活和高效。