Java并发16:Semaphore


Java并发16:Semaphore

信号量由计算机科学家迪杰斯特拉(Dijkstra)于 1965 年提出,之后 15 年信号量一直都是并发编程领域的终结者。直到 1980 年管程才被提出,才有第二选择。目前几乎所有支持并发编程的语言都支持信号量机制,所以学好信号量很有必要。

信号量模型

一个计数器,一个等待队列,三个方法。

计数器和等待队列对外透明,通过信号量中的3个方法访问,分别是:

  • init():设置计数器初始值。
  • down():计数器的值减 1;如果计数器的值小于 0,当前线程将被阻塞,否则当前线程可以继续执行。
  • up():计数器的值加 1;如果计数器的值小于或者等于 0,唤醒等待队列中的一个线程,并将其从等待队列中移除。

这3个方法都是由信号量模型实现,Java中信号量模型由 java.util.concurrent.Semaphore 实现的,Semaphore 这个类能够保证这三个方法都是原子操作。

信号量模型图

简单示例

class Semaphore{
  // 计数器
  int count;
  // 等待队列
  Queue queue;
  // 初始化操作
  Semaphore(int c){
    this.count=c;
  }
  // 调用down() count减1  acquire()
  void down(){
    this.count--;
    if(this.count<0){
      //将当前线程插入等待队列
      //阻塞当前线程
    }
  }
  // 调用up() count+1  release()
  void up(){
    this.count++;
    //<=0说明有等待线程
    if(this.count<=0) {
      //移除等待队列中的某个线程T
      //唤醒线程T
    }
  }
}
static int count;
//初始化信号量 初始值为1
static final Semaphore s = new Semaphore(1);
//用信号量保证互斥    首先要明确acquire和release是原子操作
static void addOne() {
  // 调用后 count = 0 如再有线程调用方法,count=-1,新线程阻塞
  s.acquire();
  try {
    count+=1;
  } finally {
    //调用后 count+1 不一定为1
    s.release();
  }
}

限流器

从上面可以看到Semaphore相比Lock可以让多个线程访问临界区。

比如对象池的设计,要保证同一时间内<=N(对象池大小)个线程访问对象池。

class ObjPool<T, R> {
  final List<T> pool;
  // 用信号量实现限流器
  final Semaphore sem;
  // 构造函数 size即计数器初始值 t即池中的对象
  ObjPool(int size, T t){
    //线程安全的池
    pool = new Vector<T>(){};
    //初始化池
    for(int i=0; i<size; i++){
      pool.add(t);
    }
    //信号量
    sem = new Semaphore(size);
  }
  // 利用对象池的对象,调用func
  R exec(Function<T,R> func) {
    T t = null;
    try {
        semaphore.acquire();
        //从对象池取出
        t = pool.remove(0);
        return function.apply(t);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }finally {
        //释放资源还回对象池
        pool.add(t);
        semaphore.release();
    }
    return null;
  }
}

public class Go {
    public static void main(String[] args) {
        // 创建对象池
        ObjPool<String, String> pool = new ObjPool<String, String>(10, "do method");
        for (int i = 0; i < 20; i++) {
            new Thread(new Task(pool, String.valueOf(i))).start();
        }
    }

    static class Task implements Runnable {
        private final ObjPool<String, String> pool;

        public Task(ObjPool pool, String s) {
            this.pool = pool;
        }

        @Override
        public void run() {
            // 通过对象池获取t,之后执行 
            pool.execute(t -> {
                System.out.println(t);
                return t;
            });
        }
    }
}

思路主要体现在execute()的设计上。

思考

Vector换成Arraylist是否可以?

不可以。多线程同时在操作集合,用Arraylist不安全,会导致t可能为null

1、acquire;2、apply;3、finally release;4、return

别人的总结

我理解的和管程相比,信号量可以实现的独特功能就是同时允许多个线程进入临界区,但是信号量不能做的就是同时唤醒多个线程去争抢锁,只能唤醒一个阻塞中的线程,而且信号量模型是没有Condition的概念的,即阻塞线程被醒了直接就运行了而不会去检查此时临界条件是否已经不满足了,基于此考虑信号量模型才会设计出只能让一个线程被唤醒,否则就会出现因为缺少Condition检查而带来的线程安全问题。正因为缺失了Condition,所以用信号量来实现阻塞队列就很麻烦,因为要自己实现类似Condition的逻辑。

——CCC

进入临界区的N个线程不安全。add/remove都是不安全的。

remove()举例, ArrayListremove()源码:

>public E remove(int index) {
>rangeCheck(index);
>modCount++;
>// 假设两个线程 t1,t2都执行到这一步,t1让出cpu,t2执行
>E oldValue = elementData(index);
>//此步t1继续执行,t1,t2拿到的oldValue是一样的,两个线程能拿到同一个对象,明显线程不安全
>int numMoved = size - index - 1;
>if (numMoved > 0)
System.arraycopy(elementData, index+1, elementData, index,
         numMoved);
>elementData[--size] = null; // clear to let GC do its work
>return oldValue;
>}

——Presley


文章作者: Wendell
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Wendell !
评论
  目录