Young87

SmartCat's Blog

So happy to code my life!

游戏开发交流QQ群号60398951

当前位置:首页 >跨站数据测试

DelayedQueue基本概念,java基础

DelayQueue: 延迟队列,他继承了AbstractQueue,实现了BlockingQueue,说明他是一个阻塞队列。
DelayQueue中的元素必须实现了Delayed接口(Deleyd接口继承了Comparable接口),队列的头部元素是最先过期的元素,使用了优先级队列保存元素,每次添加都会让最先到达过期的元素在队列头部。
如果此刻队列中没有元素的延时到期了,调用poll方法将会返回null.
当一个元素的getDealy()方法 返回值小于0表示该元素已经过期。DelayQueue中的元素不允许为null.

使用示范:

DelayQueue中的元素必须实现Delayed接口,

class Notify implements Delayed{

    private String msg;
    //什么时候应该执行
    private long time;

    public Notify(String msg, long time) {
        this.msg = msg;
        this.time = time;
    }
    //小于0时,表示已到达延时时间,只有getDelay小于0时,DelayQueu.poll()方法才会返回第一个到期的元素
    @Override
    public long getDelay(TimeUnit unit) {
        //这里的逻辑是:如果延时的时间time比当前时间大,说明
        return unit.convert(time - System.currentTimeMillis() ,TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}

基本原理:

  1. 延时队列存放的是元素只可以存放Delayed类型的元素,存放到元素实际上放到的是优先级队列priorityQueue中.通过元素的compareTo方法来比较,将最先到达延时的元素放到第一个位置。这样在取的时候才会取得最先到达延时的元素
 public static void main(String[] args) throws InterruptedException {
       DelayQueue<Notify> queue = new DelayQueue<>();
       Notify msg1 = new Notify("hello2",System.currentTimeMillis()+2000);
       Notify msg2 =new Notify("hello1",System.currentTimeMillis()+1000);
       Notify msg3 =new Notify("hello3",System.currentTimeMillis()+3000);
     	//已经到达延时,直接获取
       Notify msg4 =new Notify("hello4",System.currentTimeMillis()-3000);
       queue.add(msg1);
       queue.add(msg2);
       queue.add(msg3);
       queue.add(msg4);
       //阻塞获取
       System.out.println(queue.take());
       System.out.println(queue.take());
       System.out.println(queue.take());
       System.out.println(queue.take());
       //延时时间没到的直接返回null 不阻塞
//        System.out.println(queue.poll());
//        System.out.println(queue.poll());
//        System.out.println(queue.poll());
//        System.out.println(queue.poll());
   }

打印结果---------------------
Notify{msg='hello4', time=1607760317852}
Notify{msg='hello1', time=1607760321852}
Notify{msg='hello2', time=1607760322852}
Notify{msg='hello3', time=1607760323852}
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {
	
    private final transient ReentrantLock lock = new ReentrantLock();
    //元素都存放到优先级队列中
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    
    private final Condition available = lock.newCondition();

    public boolean add(E e) {
        return offer(e);
    }
    
    //添加元素 
     public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //实际上就是往优先级队列中添加,根据到期的时间排序
            q.offer(e);
            if (q.peek() == e) {
                
                leader = null;
                //唤醒,让take操作阻塞的线程唤醒
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    
    //从延时队列中获取一个元素,如果第一个元素还没有到达延时时间,直接返回null
     public E poll() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                //到了延时时间,直接从优先队列中弹出
                return q.poll();
        } finally {
            lock.unlock();
        }
    }
    
      public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                //如果没有元素,阻塞等待
                E first = q.peek();
                if (first == null)
                    available.await();
                else {
                //获得剩余的延时时间,小于0表示到期。
                    long delay = first.getDelay(NANOSECONDS);
                    //到达延时,弹出一个
                    if (delay <= 0)
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
}


除特别声明,本站所有文章均为原创,如需转载请以超级链接形式注明出处:SmartCat's Blog

上一篇: Kubernetes 集群搭建 v1.19.3

下一篇: 基于串口通信的标准库函数(SPL)与基于HAL库函数的stm32编程方式的差异

精华推荐