DelayedQueue基本概念,java基础
日期: 2020-12-12 分类: 跨站数据测试 432次阅读
DelayQueue: 延迟队列,他继承了AbstractQueue,实现了BlockingQueue,说明他是一个阻塞队列。
DelayQueue中的元素必须实现了Delayed接口(Deleyd接口继承了Comparable接口),队列的头部元素是最先过期的元素,使用了优先级队列保存元素,每次添加都会让最先到达过期的元素在队列头部。
如果此刻队列中没有元素的延时到期了,调用poll方法将会返回null.
当一个元素的getDealy()方法 返回值小于0表示该元素已经过期。DelayQueue中的元素不允许为null.
使用示范:
DelayQueue中的元素必须实现Delayed接口,
class Notify implements Delayed{
private String msg;
//什么时候应该执行
private long time;
public Notify(String msg, long time) {
this.msg = msg;
this.time = time;
}
//小于0时,表示已到达延时时间,只有getDelay小于0时,DelayQueu.poll()方法才会返回第一个到期的元素
@Override
public long getDelay(TimeUnit unit) {
//这里的逻辑是:如果延时的时间time比当前时间大,说明
return unit.convert(time - System.currentTimeMillis() ,TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
基本原理:
- 延时队列存放的是元素只可以存放Delayed类型的元素,存放到元素实际上放到的是优先级队列priorityQueue中.通过元素的compareTo方法来比较,将最先到达延时的元素放到第一个位置。这样在取的时候才会取得最先到达延时的元素
public static void main(String[] args) throws InterruptedException {
DelayQueue<Notify> queue = new DelayQueue<>();
Notify msg1 = new Notify("hello2",System.currentTimeMillis()+2000);
Notify msg2 =new Notify("hello1",System.currentTimeMillis()+1000);
Notify msg3 =new Notify("hello3",System.currentTimeMillis()+3000);
//已经到达延时,直接获取
Notify msg4 =new Notify("hello4",System.currentTimeMillis()-3000);
queue.add(msg1);
queue.add(msg2);
queue.add(msg3);
queue.add(msg4);
//阻塞获取
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
System.out.println(queue.take());
//延时时间没到的直接返回null 不阻塞
// System.out.println(queue.poll());
// System.out.println(queue.poll());
// System.out.println(queue.poll());
// System.out.println(queue.poll());
}
打印结果---------------------
Notify{msg='hello4', time=1607760317852}
Notify{msg='hello1', time=1607760321852}
Notify{msg='hello2', time=1607760322852}
Notify{msg='hello3', time=1607760323852}
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private final transient ReentrantLock lock = new ReentrantLock();
//元素都存放到优先级队列中
private final PriorityQueue<E> q = new PriorityQueue<E>();
private final Condition available = lock.newCondition();
public boolean add(E e) {
return offer(e);
}
//添加元素
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//实际上就是往优先级队列中添加,根据到期的时间排序
q.offer(e);
if (q.peek() == e) {
leader = null;
//唤醒,让take操作阻塞的线程唤醒
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
//从延时队列中获取一个元素,如果第一个元素还没有到达延时时间,直接返回null
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
//到了延时时间,直接从优先队列中弹出
return q.poll();
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//如果没有元素,阻塞等待
E first = q.peek();
if (first == null)
available.await();
else {
//获得剩余的延时时间,小于0表示到期。
long delay = first.getDelay(NANOSECONDS);
//到达延时,弹出一个
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
}
除特别声明,本站所有文章均为原创,如需转载请以超级链接形式注明出处:SmartCat's Blog
标签:java java
精华推荐