package com.kuang.unsafe;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* 同理可证:ConcurrentModificationException并发修改异常
* 解决方法:
* //1.Set<String> set = Collections.synchronizedSet(new HashSet<>());工具类的写法
* //2.Set<String> set = new CopyOnWriteArraySet<>();
*/
public class SetTest {
public static void main(String[] args) {
// Set<String> set = new HashSet<>();
// Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 1; i <= 30; i++) {
new Thread(()->{
set.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(set);
},String.valueOf(i)).start();
}
}
}
hashSet底层是什么??
public HashSet() {
map = new HashMap<>();
}
//add Set本质是map key是无法重复的
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
//PRESENT是常量固定的值 不会变
private static final Object PRESENT = new Object();
Map不安全
回顾map的基本操作
package com.kuang.unsafe;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
//ConcurrentModificationException 并发修改异常
public class MapTest {
public static void main(String[] args) {
//map是这样用的吗? 不是 工作中不用HashMap
//默认等价于什么?? new HashMap<>(16,0.75);
//HashMap<String, String> map = new HashMap<>();
Map<String,String> map = new ConcurrentHashMap<>();
for (int i = 1; i < 30; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
//加载因子 初始化容量
}
}
7.Callable
1.可以有返回值
2.可以抛出异常
3.方法不同 run()/ call()
代码测试
package com.kuang.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.ReentrantLock;
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//new Thread(new Runnable()).start();//Runnable()) 和FutureTask<V>()两者等价
//new Thread(new FutureTask<V>(Callable)).start();//FutureTask可以调用(Callable)
//new Thread().start()是可以启动Callable
new Thread().start();//怎么启动Callable
MyThread thread = new MyThread();
FutureTask<Integer> futureTask = new FutureTask<>(thread);//适配类
new Thread(futureTask,"A").start();
Integer o = (Integer) futureTask.get();//获取Callable返回结果
System.out.println(o);
}
}
class MyThread implements Callable<Integer> {
package com.kuang.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.locks.ReentrantLock;
public class CallableTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//new Thread(new Runnable()).start();//Runnable()) 和FutureTask<V>()两者等价
//new Thread(new FutureTask<V>(Callable)).start();//FutureTask可以调用(Callable)
//new Thread().start()是可以启动Callable
new Thread().start();//怎么启动Callable
MyThread thread = new MyThread();
FutureTask<Integer> futureTask = new FutureTask<>(thread);//适配类
new Thread(futureTask,"A").start();//A为线程
new Thread(futureTask,"B").start();//B为线程 打印的结果只有一个call 因为结果会被缓存,效率高
Integer o = (Integer) futureTask.get();//获取Callable返回结果
//这个get方法可能会产生阻塞,把它放在最后
//或者使用异步通信来处理
System.out.println(o);
}
}
class MyThread implements Callable<Integer> {
细节:
1.缓存
2.结果可能需要等待,会阻塞!
8 常用的辅助类(必会)
8.1 CountDownLatch
package com.kuang.add;
import java.util.concurrent.CountDownLatch;
//计数器
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
//总数是6的倒计时, 必须要执行任务的时候,再使用
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"Go out");
countDownLatch.countDown();//数量减一
},String.valueOf(i)).start();
}
countDownLatch.await();//等待计数器归零,然后再向下执行
System.out.println("Close Door");
}
}
减法计数器
原理:
-
countDownLatch.countDown();//数量减一
-
countDownLatch.await();//等待计数器归零,然后再向下执行
每次有线程调用countDown()数量减一。假设计数器变为0,countDownLatch.await();就会被唤醒,继续执行!
8.2 CyclicBarrier
加法计数器
package com.kuang.add;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
/**
* 集齐7颗龙珠召唤神龙
*/
//召唤龙珠的线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙成功!");
});
for (int i = 1; i <=7; i++) {
final int temp = i;
//lambda能操作到i吗?
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"搜集"+temp+"个龙珠");
try {
cyclicBarrier.await();//等待7个线程结束
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
8.3 Semaphore
package com.kuang.add;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
//线程数量: 类比于停车位 3个停车位 限流的时候可以用
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <=6 ; i++) {
new Thread(()->{
//acquire() 得到
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"抢到车位");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//release() 释放
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
原理:
semaphore.acquire();//获取
,假设已经满了,等待,等待释放为止!
semaphore.release();//释放
,会将当前的信号量释放+1,然后唤醒等待的线程!
作用:多个共享资源互斥使用!并发限流,控制最大的线程数。
9读写锁
ReadWriteLock
package com.kuang.rw;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* 独占锁(写锁)一次只能被一个线程占有
* 共享锁(读锁) 多个线程可以同时占有
* ReadWriteLockDemo
* 读-读 可以共存
* 读-写 不能共存
* 写-写 不能共存
*/
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCacheLock myCache = new MyCacheLock();
//写入 lambda表达式无法访问外部变量,只有通过fianl变量去进行中间转换
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.put(temp+"",temp+"");
},String.valueOf(i)).start();
}
//读取
for (int i = 1; i <= 5; i++) {
final int temp = i;
new Thread(()->{
myCache.get(temp+"");
},String.valueOf(i)).start();
}
}
}
class MyCacheLock{
private volatile Map<String,Object> map = new HashMap<>();
//读写锁,更加细粒度的控制
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
//存,写的过程 只希望同时只有一个线程写
public void put(String key,Object value){
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+"写入"+key);
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"写入OK");
} catch (Exception e) {
e.printStackTrace();
} finally {