阻塞式队列

生产者每1秒生产一个, 消费者每3秒消费一个


import java.util.concurrent.ArrayBlockingQueue;

/**
 * 阻塞式队列
 */
public class Demo01 {
    public static void main(String[] args) throws Exception {
        ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for(int i=0;i<1000;i++){
                        Thread.sleep(1000);
                        queue.put("数据"+i);
                        System.out.println("生产者生产了数据:"+i);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for(int i=0;i<1000;i++){
                        Thread.sleep(3000);
                        String take = queue.take();
                        System.out.println("消费者消费了数据:"+take);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }).start();
    }
}

ArrayBlockingQueue 和 LinkedBlockingQueue

​ 数组 链表

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * ArrayBlockingQueue
 * LinkedBlockingQueue
 */
public class Demo02 {
    public static void main(String[] args) throws InterruptedException {
        //底层是数组,必须指定容量,且不可改变
        BlockingQueue q1 = new ArrayBlockingQueue(3);
        //底层是链表,可以不指定容量,但是默认容量是Integer.MAX_VALUE
        BlockingQueue q2 = new LinkedBlockingQueue();
        q1.put(1);
        q1.take();
        q2.put(2);
        q2.take();
    }
}

ConcurrentMap

线程安全且效率高的Map集合(通过读锁和写锁实现), HashTable也是线程安全的, 但是效率太低



import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * ConcurrentMap
 */
public class Demo03 {
    public static void main(String[] args) {
        ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
        map.put("110","apple");
        String s = map.get("110");
        System.out.println(s);
    }
}

CountDownlatch

协调多个线程之间的执行顺序, 达成让一个线程等待其他线程达成一定条件后再执行的效果



import java.util.concurrent.CountDownLatch;

/**
 * CountDownLatch
 * 协调多个线程之间的执行顺序
 * 达成让一个线程等待其他线程达成一定条件后再执行的效果
 */
public class Demo05 {
    private static CountDownLatch cdl = new CountDownLatch(3);
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                    System.out.println("弟弟买米回来了..");
                    cdl.countDown();//将cdl中的数据减1 -- 2
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    System.out.println("姐姐买菜回来了..");
                    cdl.countDown();//将cdl中的数据减1 -- 1
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                    System.out.println("爸爸买锅回来了..");
                    cdl.countDown();//将cdl中的数据减1 -- 0
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //阻塞,直到cdl中的数据减为0
                    cdl.await();
                    System.out.println("妈妈开始做饭..");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

CyclicBarrier栅栏



import java.util.concurrent.CyclicBarrier;

/**
 * CyclicBarrier栅栏
 */
public class Demo07 {
    private static CyclicBarrier cb = new CyclicBarrier(3);
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                    System.out.println("亚瑟加载游戏完成。。");
                    cb.await();//当前到达栅栏的线程是否达到预定数量,如果没达到则阻塞,达到则所有线程一起放行
                    System.out.println("亚瑟开始游戏。。");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000);
                    System.out.println("安其拉加载游戏完成。。");
                    cb.await();//当前到达栅栏的线程是否达到预定数量,如果没达到则阻塞,达到则所有线程一起放行
                    System.out.println("安其拉开始游戏。。");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                    System.out.println("妲己加载游戏完成。。");
                    cb.await();//当前到达栅栏的线程是否达到预定数量,如果没达到则阻塞,达到则所有线程一起放行
                    System.out.println("妲己开始游戏。。");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

Exchanger 交换机



import java.util.concurrent.Exchanger;

/**
 * Exchanger
 */
public class Demo09 {
    private static Exchanger<String> exchanger = new Exchanger<>();
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000);
                    System.out.println("老张到达交换点..");
                    String s = exchanger.exchange("天王盖地虎");
                    System.out.println("老张交换得到情报:"+s);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                    System.out.println("老李到达交换点..");
                    String s = exchanger.exchange("宝塔镇河妖");
                    System.out.println("老李交换得到情报:"+s);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

Semaphore:信号量

功能1:保护一段重要的代码一次不能超过n个线程访问



import java.util.concurrent.Semaphore;

/**
 * Semaphore:信号量
 * 功能1:保护一段重要的代码一次不能超过n个线程访问
 */
public class Demo10 {
    private static Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();//获取一个信号量,如果获取到继续执行,没有获取到阻塞
                    System.out.println("游客1:开始参观人民大会堂..");
                    Thread.sleep(3000);
                    System.out.println("游客1:结束参观..");
                    semaphore.release();//释放一个信号量
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();//获取一个信号量,如果获取到继续执行,没有获取到阻塞
                    System.out.println("游客2:开始参观人民大会堂..");
                    Thread.sleep(5000);
                    System.out.println("游客2:结束参观..");
                    semaphore.release();//释放一个信号量
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();//获取一个信号量,如果获取到继续执行,没有获取到阻塞
                    System.out.println("游客3:开始参观人民大会堂..");
                    Thread.sleep(4000);
                    System.out.println("游客3:结束参观..");
                    semaphore.release();//释放一个信号量
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();//获取一个信号量,如果获取到继续执行,没有获取到阻塞
                    System.out.println("游客4:开始参观人民大会堂..");
                    Thread.sleep(5000);
                    System.out.println("游客4:结束参观..");
                    semaphore.release();//释放一个信号量
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();//获取一个信号量,如果获取到继续执行,没有获取到阻塞
                    System.out.println("游客5:开始参观人民大会堂..");
                    Thread.sleep(5000);
                    System.out.println("游客5:结束参观..");
                    semaphore.release();//释放一个信号量
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    semaphore.acquire();//获取一个信号量,如果获取到继续执行,没有获取到阻塞
                    System.out.println("游客6:开始参观人民大会堂..");
                    Thread.sleep(5000);
                    System.out.println("游客6:结束参观..");
                    semaphore.release();//释放一个信号量
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

功能2:在两个线程之间发送信号。



import java.util.concurrent.Semaphore;

/**
 * Semaphore 信号量
 * 用法2:在两个线程之间发送信号。
 */
public class Demo11 {
    private static Semaphore semaphore = new Semaphore(0);
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("装卸师傅:装货。。");
                    Thread.sleep(5000);
                    System.out.println("装卸师傅:装好货,发信号。。");
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("司机:在驾驶室等信儿~~");
                    semaphore.acquire();
                    System.out.println("司机:收到信号,发车,速度70迈~~~~");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

ThreadPool 线程池执行流程



import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * ThreadPool 线程池执行流程
 */
public class Demo01 {
    public static void main(String[] args) {
        // 1.创建线程池
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, // 核心线程数
                5,// 最大线程数
                10,// 空闲线程存活时间
                TimeUnit.SECONDS,// 时间单位
                new ArrayBlockingQueue<>(5),// 任务排队用的阻塞式队列
                new RejectedExecutionHandler() {// 拒绝服务助手
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println("线程池已满,拒绝执行"+r);
                    }
                });
        // 2.执行任务
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(true);}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(true);}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(true);}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(true);}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(true);}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(true);}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(true);}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(true);}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(true);}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(true);}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(true);}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(true);}});
        System.out.println(pool);
    }
}



import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * ThreadPool 线程池执行流程
 */
public class Demo02 {
    private static int i = 10;
    public static void main(String[] args) {
        // 1.创建线程池
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, // 核心线程数
                5,// 最大线程数
                1,// 空闲线程存活时间
                TimeUnit.SECONDS,// 时间单位
                new ArrayBlockingQueue<>(5),// 任务排队用的阻塞式队列
                new RejectedExecutionHandler() {// 拒绝服务助手
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println("线程池已满,拒绝执行"+r);
                    }
                });
        // 2.执行任务
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(i>=1){
            System.out.println(1);
        };}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(i>=2){
            System.out.println(2);
        };}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(i>=3){
            System.out.println(3);
        };}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(i>=4){
            System.out.println(4);
        };}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(i>=5){
            System.out.println(5);
        };}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(i>=6){
            System.out.println(6);
        };}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(i>=7){
            System.out.println(7);
        };}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(i>=8){
            System.out.println(8);
        };}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(i>=9){
            System.out.println(9);
        };}});
        System.out.println(pool);
        pool.submit(new Runnable() { @Override public void run() { while(i>=10){
            System.out.println(10);
        };}});
        System.out.println(pool);

        i--;
        System.out.println(pool);
        i--;
        System.out.println(pool);
        i--;
        System.out.println(pool);
        i--;
        System.out.println(pool);
        i--;
        System.out.println(pool);
        i--;
        System.out.println(pool);
        i--;
        System.out.println(pool);
        i--;
        System.out.println(pool);
        i--;
        System.out.println(pool);
        i--;
        System.out.println(pool);
        i--;
        System.out.println(pool);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        i--;
        System.out.println(pool);

    }
}

ThreadPool 关闭线程池



import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * ThreadPool 关闭线程池
 */
public class Demo03 {
    public static void main(String[] args) {
        // 1.创建线程池
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, // 核心线程数
                5,// 最大线程数
                5,// 空闲线程存活时间
                TimeUnit.SECONDS,// 时间单位
                new ArrayBlockingQueue<>(5),// 任务排队用的阻塞式队列
                new RejectedExecutionHandler() {// 拒绝服务助手
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println("线程池已满,拒绝执行"+r);
                    }
                });
        //2.执行任务
        pool.submit(new Runnable() {
            @Override
            public void run() {
                while(true){
                }
            }
        });
        //3.关闭任务
        //--正常关闭线程池,不会立即关闭线程池,而是不再接收新任务,已经在池中的任务仍然会被执行,等到所有任务执行完毕后,线程池才真正关闭
        //pool.shutdown();
        //--立即关闭线程池,会立即关闭线程池,正在执行的任务可能会被打断,造成无法预估结果的意外情况
        pool.shutdownNow();
    }
}

ThreadPool 提交任务



import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.*;

/**
 * ThreadPool 提交任务
 */
public class Demo04 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.创建线程池
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            3, // 核心线程数
            5,// 最大线程数
            5,// 空闲线程存活时间
            TimeUnit.SECONDS,// 时间单位
            new ArrayBlockingQueue<>(5),// 任务排队用的阻塞式队列
            new RejectedExecutionHandler() {// 拒绝服务助手
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    System.out.println("线程池已满,拒绝执行"+r);
                }
            });
        //2.提交任务
        //---execute(Runnable) 没有返回值
//        pool.execute(new Runnable() {
//            @Override
//            public void run() {
//                System.out.println("任务1");
//            }
//        });

        //---submit(Runnable) 会返回Future对象,通过它可以检测任务是否执行完成
//        Future<?> future = pool.submit(new Runnable() {
//            @Override
//            public void run() {
//                System.out.println("任务1开始");
//                try {
//                    Thread.sleep(3000);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
//                System.out.println("任务1结束");
//            }
//        });
//        future.get();//此方法将会阻塞,直到future关联的任务执行完成
//        System.out.println("future.get()返回了~");

        //---submit(Callable)   会返回Future对象,通过它可以获取任务的执行结果,此外还可以得到call()方法执行完后的返回值
//        Future<String> future = pool.submit(new Callable<String>() {
//            @Override
//            public String call() throws Exception {
//                System.out.println("任务1开始");
//                Thread.sleep(3000);
//                System.out.println("任务1结束");
//                return "Apple~~~";
//            }
//        });
//        String s = future.get();//此方法将会阻塞,直到future关联的任务执行完成
//        System.out.println("future.get()返回了~还带回了任务执行后的返回值~~"+s);


//        Set<Callable<String>> set = new HashSet<>();
//        set.add(new Callable<String>() {
//            @Override
//            public String call() throws Exception {
//                System.out.println("任务1");
//                return "Apple";
//            }
//        });
//        set.add(new Callable<String>() {
//            @Override
//            public String call() throws Exception {
//                System.out.println("任务2");
//                return "Banana";
//            }
//        });
//        set.add(new Callable<String>() {
//            @Override
//            public String call() throws Exception {
//                System.out.println("任务3");
//                return "Orange";
//            }
//        });
        //---invokeAny
//        String x = pool.invokeAny(set); //随机执行集合中的任务,只要有一个结束,就返回结果。此方法将会阻塞,直到有一个任务执行完成
//        System.out.println(x);
        //---invokeAll
//        List<Future<String>> list = pool.invokeAll(set);//执行集合中的所有任务,此方法将会阻塞,直到所有任务执行完成
//        list.forEach(future->{
//            try {
//                System.out.println(future.get());
//            } catch (InterruptedException | ExecutionException e) {
//                e.printStackTrace();
//            }
//        });
        //3.关闭线程池
        pool.shutdown();
    }
}

Executors - 工具类,可以快速构建线程池



import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
 * Executors - 工具类,可以快速构建线程池
 */
public class Demo05 {
    public static void main(String[] args) {
        /**
         创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
                                    5           int最大值         0                    无界队列
         new ThreadPoolExecutor(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue())
         */
        ExecutorService pool = Executors.newScheduledThreadPool(5);

        /**
           创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
           对于执行很多短期异步任务的程序而言,这些线程池通常可提高程序性能。
         new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
         */
        //ExecutorService pool = Executors.newCachedThreadPool();

        /**
         创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。
         在任意点,在大多数 nThreads 线程会处于处理任务的活动状态。
         new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
         */
        //ExecutorService pool = Executors.newFixedThreadPool(5);

        /**
         创建一个使用单个 worker 线程的 Executor,以无界队列方式来运行该线程。
         new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())
         */
        //ExecutorService pool = Executors.newSingleThreadExecutor();
    }
}

线程并发安全问题

多个线程并发方法同一个资源,造成的问题



/**
 * 线程并发安全问题
 *  多个线程并发方法同一个资源,造成的问题
 */
public class Demo01 {
    private static String name = "小明";
    private static String gender = "男";
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    synchronized (Demo01.class){
                        System.out.println("姓名:"+name+"性别:"+gender);
                    }
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    synchronized (Demo01.class){
                        if(name.equals("小明")){
                            name = "小花";
                            gender = "女";
                        }else{
                            name = "小明";
                            gender = "男";
                        }
                    }
                }
            }
        }).start();
    }
}

Syncronized的缺陷1

本身是一个代码块,必须被完整地包含在单个方法里,无法跨方法操作锁



/**
 * Syncronized的缺陷1:
 *  本身是一个代码块,必须被完整地包含在单个方法里,无法跨方法操作锁
 */
public class Demo02 {
    private static String name = "小明";
    private static String gender = "男";

    private static boolean isDBOK = false;

    public static void saveToDB(){
        if(isDBOK){
            System.out.println("保存数据到数据库:"+name+","+gender);
        }else{
            //连接数据库需要很长时间,此时应该释放锁,防止阻塞等待锁的线程!!
            //释放锁????
            System.out.println("重连数据库,这需要一个小时...");
            try {
                Thread.sleep(1000*60*60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    synchronized (Demo01.class){
                        saveToDB();
                    }
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    synchronized (Demo01.class){
                        if(name.equals("小明")){
                            name = "小花";
                            gender = "女";
                        }else{
                            name = "小明";
                            gender = "男";
                        }
                        System.out.println("修改数据成功!");
                    }
                }
            }
        }).start();
    }
}

Lock:锁

优点

  1. 1.

    不需要找额外的锁对象! 它自己就是锁对象

  2. 2.

    可以在不同的方法内加锁和解锁

  3. 3.

    有tryLock()方法, 可以尝试加锁, 加不上也不阻塞, 直接返回false

  4. 4.

    提供了Condition机制, 可以实现线程之间的通信, 协调线程执行顺序



import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Lock:锁
 * 优点0:不需要找额外的锁对象了!它自己就是锁对象!
 */
public class Demo03 {
    private static String name = "小明";
    private static String gender = "男";

    public static void main(String[] args) {
        //锁对象
        Lock lock = new ReentrantLock();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    lock.lock();//加锁
                    System.out.println("姓名:"+name+"性别:"+gender);
                    lock.unlock();//解锁
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    lock.lock();//加锁
                    if(name.equals("小明")){
                        name = "小花";
                        gender = "女";
                    }else{
                        name = "小明";
                        gender = "男";
                    }
                    lock.unlock();//解锁
                }
            }
        }).start();
    }
}

Condition机制



import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Lock:锁
 * 优点0:不需要找额外的锁对象了!它自己就是锁对象!
 * 优点1:可以在不同的方法内加锁和解锁!
 * 优点2:有tryLock()方法,可以尝试加锁,加不上也不阻塞,直接返回false
 * 优点3:提供了Condition机制,可以实现线程间的通信,协调线程执行顺序
 */
public class Demo08 {
    private static String name = "小明";
    private static String gender = "男";

    private static Lock lock = new ReentrantLock();
    private static Condition con1 = lock.newCondition();

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        lock.lock();
                        System.out.println("姓名:"+name+"性别:"+gender);
                        con1.signal();
                        con1.await();
                        lock.unlock();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        lock.lock();
                        if(name.equals("小明")){
                            name = "小花";
                            gender = "女";
                        }else{
                            name = "小明";
                            gender = "男";
                        }
                        con1.signal();
                        con1.await();
                        lock.unlock();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }
}

读写锁

读锁和读锁可以共存

写锁和任和锁不共存



import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 读写锁 - 读+读
 * 读锁和读锁可以共存
 * 读锁和写锁不能共存
 */
public class Demo01 {
    private static String name = "小明";
    private static String gender = "男";

    private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public static void main(String[] args) {

        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.readLock().lock();
                System.out.println("t1:"+name);
                System.out.println("t1:"+gender);
                lock.readLock().unlock();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.readLock().lock();
                System.out.println("t2:"+name);
                System.out.println("t2:"+gender);
                lock.readLock().unlock();
            }
        }).start();
    }
}



import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 读写锁 - 写+写
 * 读锁和读锁可以共存
 * 读锁和写锁不能共存
 */
public class Demo03 {
    private static String name = "小明";
    private static String gender = "男";

    private static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.writeLock().lock();
                name = "小花";
                gender = "女";
                lock.writeLock().unlock();
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.writeLock().lock();
                name = "小黄";
                gender = "未知";
                lock.writeLock().unlock();
            }
        }).start();
    }
}

原子操作

AtomicInteger

  • int getAndDercement()

    以原子方式将当前值减 1

  • int decrementAndGet()

    以原子形式将当前值减 1

  • int getAndIncrement()

    以原子方式将当前值加 1

  • int incrementAndGet()

    以原子方式将当前值加 1



import java.util.concurrent.atomic.AtomicInteger;

/**
 * AtomicInteger
 *  int getAndDecrement()
 *           以原子方式将当前值减 1。
 *  int decrementAndGet()
 *           以原子方式将当前值减 1。
 *  int getAndIncrement()
 *           以原子方式将当前值加 1。
 *  int incrementAndGet()
 *           以原子方式将当前值加 1。
 */
public class Demo05 {
    private static AtomicInteger ai = new AtomicInteger(0);
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                for(int j=0;j<100000;j++){
                    ai.getAndIncrement();//i++
                }
                System.out.println("t1结束!");
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for(int j=0;j<100000;j++){
                    ai.getAndIncrement();//i++
                }
                System.out.println("t2结束!");
            }
        }).start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(ai.get());
    }
}

boolean compareAndSet(int expect, int update)

如果当前值 == 预期值, 则以原子方式将该值设置为给定的更新值



import java.util.concurrent.atomic.AtomicInteger;

/**
 * AtomicInteger
 *  boolean compareAndSet(int expect, int update)
 *           如果当前值 == 预期值,则以原子方式将该值设置为给定的更新值。
 */
public class Demo06 {
    private static AtomicInteger ai = new AtomicInteger(0);
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                boolean flag = ai.compareAndSet(0,666);
                System.out.println("t1结束!"+flag);
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                boolean flag = ai.compareAndSet(0, 999);
                System.out.println("t2结束!"+flag);
            }
        }).start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(ai.get());
    }
}

AtomticBoolean



/**
 * AtomicBoolean
 */
public class Demo08 {
    private static boolean flag = true;
    public static void main(String[] args) {
        for(int i=0;i<10000;i++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    if(flag == true){
                        System.out.println(Thread.currentThread().getName()+"初始化数据库...");
                        flag = false;
                    }
                }
            }).start();
        }
    }
}

ThreadLocal - 线程局部变量



import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * ThreadLocal - 线程局部变量
 */
public class Demo01{
    private static Connection conn = null;
    static{
        try {
            Class.forName("com.mysql.jdbc.Driver");
            conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/lc01","root","root");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

        public static void main(String[] args) {
        //用户a
        new Thread(new Runnable() {
            @Override
            public void run() {
                String sql = "update user set addr=? where id=?";
                try{
                    conn.setAutoCommit(false);
                    PreparedStatement ps = conn.prepareStatement(sql);
                    ps.setString(1,"hz4");
                    ps.setInt(2,1);
                    ps.executeUpdate();
                    conn.commit();
                    ps.close();
                    conn.close();
                }catch (Exception e){
                    e.printStackTrace();
                    try {conn.rollback();} catch (SQLException ex) {ex.printStackTrace();}
                }
            }
        }).start();
        //用户b
        new Thread(new Runnable() {
            @Override
            public void run() {
                String sql = "update user set addr=? where id=?";
                try{
                    conn.setAutoCommit(false);
                    PreparedStatement ps = conn.prepareStatement(sql);
                    ps.setString(1,"gz4");
                    ps.setInt(2,2);
                    ps.executeUpdate();
                    conn.commit();
                    ps.close();
                    conn.close();
                }catch (Exception e){
                    e.printStackTrace();
                    try {conn.rollback();} catch (SQLException ex) {ex.printStackTrace();}
                }
            }
        }).start();
    }
}

多线程并发安全问题

  • 有并发的线程 - 加锁

  • 有个共享的资源 - ThreadLocal

  • 有写操作 - scala语言

静态代理和动态代理(mvc AOP底层原理)



import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

interface Star{
    public void eat();
    public void sign();
}

class FBB implements Star{
    public void eat(){
        System.out.println("FBB吃饭..");
    }
    public void sign(){
        System.out.println("FBB签名..");
    }
    public void shufa(){
        System.out.println("FBB书法..");
    }
}

/**
 * 代理设计模式 - 静态代理 动态代理 - JDK动态代理 - CGLIB动态代理
 * Spring AOP
 */
public class Demo05 {
    public static void main(String[] args) {
        FBB fbb = new FBB();

        //动态代理
        Star zqd = (Star) Proxy.newProxyInstance(
                //类加载器
                fbb.getClass().getClassLoader(),
                //要实现的接口
                fbb.getClass().getInterfaces(),
                //处理器
                new InvocationHandler() {
                    /**
                     * @param proxy 代理者,张全蛋
                     * @param method 当前正在调用的方法
                     * @param args 当前正在调用的方法的参数
                     * @return 希望给调用者返回什么结果
                     */
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        System.out.println("ZQD:你谁啊???");
                        Object retObj = method.invoke(fbb, args);
                        System.out.println("ZQD:记录一下...");
                        return retObj;
                    }
                }
        );

        zqd.eat();
        zqd.sign();
        //zqd.shufa();

//        //静态代理者
//        class ZL implements Star{
//            public void eat(){
//                System.out.println("ZL:你谁啊???");
//                fbb.eat();
//                System.out.println("ZL:记录一下...");
//            }
//            public void sign(){
//                System.out.println("ZL:你谁啊???");
//                fbb.sign();
//                System.out.println("ZL:记录一下...");
//            }
//        }
//        ZL zl = new ZL();
//        zl.eat();
//        zl.sign();
    }
}

类加载控制器

类加载控制器(Class Loading Controller)是一种软件机制,用于控制编写和运行Java程序的类的加载行为。它可以在Java应用程序的运行时动态地加载、改变和卸载Java类,从而实现一些特殊的功能。

类加载控制器通常由ClassLoader类和相关的接口组成,它们能够通过Java虚拟机的类加载、链接、校验等过程,控制Java类的装载和使用。这些类可以对Java应用程序进行全面控制,使得Java应用程序的运行时间更加灵活和自适应。

常见的Java类加载控制器有以下几个:

  1. 1.

    ClassLoader 类:实现 Java 虚拟机机制中的类加载器,动态地加载、链接和装载Java类,是Java类加载的基础。

  2. 2.

    URLClassLoader 类:可以从指定的URL路径下动态地加载Java类。可以动态地加载网络上的Java类库。

  3. 3.

    Class 文件加密器:可以对Java类文件进行加密,保护Java程序的安全性。

  4. 4.

    Class文件修改器:可以动态地修改Java类文件,实现Java程序的动态更新和升级。

  5. 5.

    运行时参数设置器:可以设置JVM的运行时参数,优化Java程序的运行性能。

类加载控制器广泛应用于Java应用程序的开发和运行过程中,通过控制类的加载行为,使得Java程序更加灵活和高效。