想象一个队列,你可以放任务进去,并且设置每个任务要延迟几分钟后执行。
这里的关键点就是延迟执行,如何保证每个任务按延时时间执行呢?可以把任务按延时时间从小到大排序,每次取第一个,依次取下去直到队列为空。假设取出的第一个任务到时间T才能执行,那么sleep(T-currentTime)之后就可以执行,即:
1 2 3 4 5
| while(queue.size > 0) task <--- queue.peak() remain <--- task.runTime - currentTime sleep(remain) process(task)
|
其中queue是一个优先队列,它的数据结构是小堆,最上面的一个值最小,每次插入一个元素的时间复杂度为O(logN),即便已有10^9个元素,再插入一个元素也不过移动40个左右元素,所以性能完全没有问题。
在多线程情况下,Java已有现有的实现,DelayQueue是一个延迟优先队列,它的队列元素都必须实现Delayed接口。下面分析它是如何实现延时执行的。
Delayed
1 2
| long getDelay(TimeUnit unit) int compareTo(Delayed o)
|
这两个方法都是为DelayQueue服务的。
DelayQueue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) return q.poll(); first = null; if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }
|
由此可见,take()方法永远不会返回null,除非你queue.add(null)。
整个DelayQueue并没有太难的东西,但是却有它很巧妙的地方。利用一个leader标志来防止task被重复执行,在线程await时将first置null。
30分钟订单未支付自动取消,这个原本是道面试题,我首先想的是轮询,但是无论怎么设计都感觉很费效率。之所以没想到用优先队列是因为我隐约觉得任务列表是一个链表结构,订单如果到2亿怎么办,我还没见过2亿一个的数组。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package com.entity;
public class Order { private long id; private Status status; private long createTime;
public Order(long id, long createTime) { this.id = id; this.status = Status.UNPAYED; this.createTime = createTime; }
public enum Status { UNPAYED, UNCONFIRMED, FINISHED, CANCELED }
public long getId() { return id; }
public void setId(long id) { this.id = id; }
public Status getStatus() { return status; }
public void setStatus(Status status) { this.status = status; }
public long getCreateTime() { return createTime; }
public void setCreateTime(long createTime) { this.createTime = createTime; } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package com.domain;
import com.entity.Order;
import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit;
public class OrderService { public static long DELAY_TIME_IN_MILLIS = 2 * 1000; private DelayQueue<OrderTask> que = new DelayQueue<>();
public static void main(String[] args) { OrderService service = new OrderService(); service.placeAnOrder(new Order(1, System.currentTimeMillis() + 10)); service.placeAnOrder(new Order(2, System.currentTimeMillis()));
while (true) { System.out.println(service.getExpiredOrder()); } }
public void placeAnOrder(Order order) { que.add(new OrderTask(order)); }
public long getExpiredOrder() { try { return que.take().orderId; } catch (InterruptedException e) { throw new RuntimeException(e); } }
class OrderTask implements Delayed { private long orderId; private long expiredTime;
public OrderTask(Order order) { this.orderId = order.getId(); this.expiredTime = order.getCreateTime() + DELAY_TIME_IN_MILLIS; }
@Override public long getDelay(TimeUnit unit) { return unit.convert(expiredTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }
@Override public int compareTo(Delayed o) { return expiredTime > ((OrderTask) o).expiredTime ? 1 : -1; } } }
|