Tomcat作为web服务器,对于每个客户端的请求将给予处理响应,但对于一台机器而言,访问请求的总流量有高峰期且服务器有物理极限,为了保证web服务器不被冲垮我们需要采取一些措施进行保护预防,需要稍微说明的此处的流量更多的是指套接字的连接数,通过控制套接字连接个数来控制流量。其中一种有效的方法就是采取流量控制,它就像在流量的入口增加了一道闸门,闸门的大小决定了流量的大小,一旦达到最大流量将关闭闸门停止接收直到有空闲通道。
创建一个流量控制器没思路?还是考虑下并发框架AQS吧,通过控制同步器的状态即可实现,如果忘了AQS框架相关知识请移步到前面多线程章节,具体思路是先初始化状态值,然后每到来一个socket就将状态加1,每关闭一个socket将状态减1,如此一来一旦状态值小于0则由AQS机制将停止对socket的接收,直到某个socket处理完释放。我们把思路拆成两部分,一是创建一个支持计数的控制器,另一个是将此控制器嵌入处理流程中。
① 控制器,整个过程是根据AQS推荐的自定义同步器的做法进行,但并没有使用AQS自带的状态变量,而是另外引入一个AtomicLong类型的count变量用于计数,其本质是一样的,不必过于纠结。控制器主要通过countUpOrAwait和countDown两个方法实现控制效果。
public class LimitLatch {
private class Sync extends AbstractQueuedSynchronizer {
public Sync() {}
@Override
protected int tryAcquireShared(int ignored) {
long newCount = count.incrementAndGet();
if (newCount > limit) {
count.decrementAndGet();
return -1;
} else {
return 1;
}
}
@Override
protected boolean tryReleaseShared(int arg) {
count.decrementAndGet();
return true;
}
}
private final Sync sync;
private final AtomicLong count;
private volatile long limit;
public LimitLatch(long limit) {
this.limit = limit;
this.count = new AtomicLong(0);
this.sync = new Sync();
}
public void countUpOrAwait() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public long countDown() {
sync.releaseShared(0);
long result = getCount();
return result;
}
}
② 流程嵌入控制器,伪代码如下,在接收socket前累加计数器,对socket的数据处理则交由另外的线程,它处理需要一段时间,假如这段时间又有1000个请求socket,则第1000个请求会导致停止程序阻塞,而唤醒的条件是线程池中的工作线程处理完其中一个socket并执行countDown操作。tomcat默认的大小为1000。
LimitLatch limitLatch = new LimitLatch(1000);
创建ServerSocket;
limitLatch.countUpOrAwait();//这里可能阻塞
Socket socket = ServerSocket.accept();
从线程池中获取一个空闲工作线程处理socket,处理完关闭socket并执行limitLatch.countDown();