信号量是一种线程同步结构,可用于在线程之间发送信号以避免信号丢失,或像使用锁一样保护临界区。 Java 5在java.util.concurrent包中附带了信号量实现,因此你不必实现自己的信号量。 尽管如此,了解其实现和使用背后的理论还是很有用的。
简单的信号量
如下是一个简单的信号量实现:
public class Semaphore {
private boolean signal = false;
public synchronized void take() {
this.signal = true;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(!this.signal) wait();
this.signal = false;
}
}
take()方法发送一个信号,该信号存储在信号量内部。 release()方法等待信号。 收到信号标志后,将再次将其清除,并退出release()方法。
使用这样的信号量可以避免信号丢失。 用take()代替notify(),以及用release()代替wait()。 如果对take()的调用发生在对release()的调用之前,则调用release()的线程仍会知道take()被调用了,因为信号存储在内部的signal变量中。 使用wait()和notify()不会存储信号。
当使用信号量进行信号传递时,方法名take()和release()似乎有点奇怪。 该名称源于信号量作为锁来使用,如本文后面所述。 在这种情况下,该方法名更有意义。
使用信号量传递信号
如下是一个简化示例,两个线程使用信号量互相发信号:
Semaphore semaphore = new Semaphore();
SendingThread sender = new SendingThread(semaphore);
ReceivingThread receiver = new ReceivingThread(semaphore);
receiver.start();
sender.start();
public class SendingThread {
Semaphore semaphore = null;
public SendingThread(Semaphore semaphore){
this.semaphore = semaphore;
}
public void run(){
while(true){
//do something, then signal
this.semaphore.take();
}
}
}
public class RecevingThread {
Semaphore semaphore = null;
public ReceivingThread(Semaphore semaphore){
this.semaphore = semaphore;
}
public void run(){
while(true){
this.semaphore.release();
//receive signal, then do something...
}
}
}
计数信号量
上一节中的Semaphore实现不计算take()方法发送信号的次数。 我们可以更改Semaphore来做到这一点。 这称为计数信号量。 如下是计数信号量的简单实现:
public class CountingSemaphore {
private int signals = 0;
public synchronized void take() {
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(this.signals == 0) wait();
this.signals--;
}
}
有界信号量
CoutingSemaphore没有存储信号数量的上限。 我们可以将信号量的实现更改为有上限的,如下所示:
public class BoundedSemaphore {
private int signals = 0;
private int bound = 0;
public BoundedSemaphore(int upperBound){
this.bound = upperBound;
}
public synchronized void take() throws InterruptedException{
while(this.signals == bound) wait();
this.signals++;
this.notify();
}
public synchronized void release() throws InterruptedException{
while(this.signals == 0) wait();
this.signals--;
this.notify();
}
}
请注意,如果信号数量等于上限,则take()方法现在会阻塞。 如果BoundedSemaphore已达到其信号上限,则调用take()的线程不再允许传递其信号,直到某个线程调用release()。
将信号量用作锁
可以将有界信号量用作锁。 为此,请将上限设置为1,并调用take()和release()来保护临界区。 如下是一个例子:
BoundedSemaphore semaphore = new BoundedSemaphore(1);
...
semaphore.take();
try{
//critical section
} finally {
semaphore.release();
}
与使用信号相比,方法take()和release()现在由同一线程调用。 由于只允许一个线程使用信号量,因此所有调用take()的其他线程都将被阻塞,直到调用release()为止。 由于总是先调用take(),因此release()的调用永远不会阻塞。
你还可以使用有界信号量来限制代码段中允许的线程数。 例如,在上面的示例中,如果将BoundedSemaphore的限制设置为5,情况会怎么样呢? 那样的话,一次允许5个线程进入临界区。 但是,你必须确保这5个线程的线程操作不会冲突,否则应用程序将出错。
从finally块内部调用release()方法以确保即使从临界区抛出异常也能被调用到。