先描述下信号量的意义

1
2
3
4
5
6
7
8
9
10
11
Semaphore是一个计数信号量。 在概念上,信号量维持一组许可证。
如果有必要,每个acquire()都会阻塞,直到有许可证可用,然后取走该许可证。每个release()会添加一个许可证,从而可能唤醒一个被阻塞的获取方。
但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。
信号量通常用于限制可以同时访问某些(物理或逻辑)资源的线程数。
在这里插入图片描述


Semaphore实现的功能就类似有3个停车位,假如有6个人要停车,那么同时只能停多少辆车?
同时只能有3个人能够占用车位;当这3个人中的任何一个人开车离开后,等待的另外3个人中就又有一个人可以来停车了。
另外等待的2个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。
只有一个许可的Semaphore对象(即new Semaphore(1))可以实现互斥锁的功能,并且可以由一个线程获得“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。

批量处理集合util

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
 * Splits a list into consecutive batches of at most {@code batchSize} elements.
 *
 * <p>Two styles are offered: iterate over an instance (every {@link #next()} call yields one
 * batch), or call the static {@link #batchInsert(List, int)}, which delegates to Guava's
 * {@code Iterables.partition}.
 *
 * <p>Author's note (2022-01-22): avoid {@code new DataBatchUtils(list, INIT_NUM)} where possible;
 * the author found that form very slow and recommends {@code DataBatchUtils.batchInsert} instead.
 *
 * <p>An instance keeps a plain, unsynchronized cursor into the source list.
 *
 * @param <E> element type of the source list
 * @author 94391
 */
public class DataBatchUtils<E> implements Iterator<List<E>> {

    /** Maximum number of elements per batch; always positive. */
    private final int batchSize;

    /** Source list; may be {@code null}, which is treated as empty. */
    private final List<E> origin;

    /** Size of {@code origin} captured at construction time (0 for a null list). */
    private final int size;

    /** Position in {@code origin} of the first element of the next batch. */
    private int index = 0;

    /**
     * Creates a batch iterator over {@code origin}.
     *
     * @param origin    list to split; {@code null} is treated as an empty list
     * @param batchSize maximum number of elements per batch
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public DataBatchUtils(List<E> origin, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive, got " + batchSize);
        }
        this.batchSize = batchSize;
        this.origin = origin;
        this.size = (origin == null) ? 0 : origin.size();
    }

    /**
     * @return {@code true} while at least one element has not been handed out yet
     */
    @Override
    public boolean hasNext() {
        return index < size;
    }

    /**
     * Returns the next batch as a new, independent list.
     *
     * <p>Each call allocates its own list, so batches returned earlier stay intact when the
     * caller keeps them around.
     *
     * @return the next batch, holding between 1 and {@code batchSize} elements
     * @throws java.util.NoSuchElementException if all elements have already been returned
     */
    @Override
    public List<E> next() {
        if (!hasNext()) {
            throw new java.util.NoSuchElementException("no more batches");
        }
        // Compare against the remaining count instead of computing index + batchSize,
        // which could overflow for a very large batchSize.
        int count = Math.min(batchSize, size - index);
        List<E> batch = new ArrayList<E>(origin.subList(index, index + count));
        index += count;
        return batch;
    }

    /**
     * Removal is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void remove() {
        throw new UnsupportedOperationException();
    }

    /**
     * Splits {@code origin} into batches using Guava's {@code Iterables.partition}.
     *
     * @param origin    the list to split; {@code null} or empty yields an empty result
     * @param batchSize number of elements per batch (the last batch may be smaller)
     * @param <T>       element type
     * @return the batches, in source order
     * @throws IllegalArgumentException if {@code batchSize} is not positive
     */
    public static <T> List<List<T>> batchInsert(List<T> origin, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive, got " + batchSize);
        }
        List<List<T>> batches = new ArrayList<>();
        // Mirror the constructor, which also treats a null source as empty.
        if (origin == null || origin.isEmpty()) {
            return batches;
        }
        for (List<T> batch : Iterables.partition(origin, batchSize)) {
            batches.add(batch);
        }
        return batches;
    }
}

再来个线程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
/**
 * Fluent builder for {@link ThreadPoolExecutor}.
 *
 * <p>Defaults: core size = available processors, maximum size = twice that, keep-alive of
 * 8 seconds, a bounded {@link ArrayBlockingQueue} of 200 tasks, {@link ThreadPoolExecutor.AbortPolicy}
 * as the rejection handler, and worker threads named {@code threadPoolWorker-<n>}.
 *
 * <p>Unless the caller supplies a queue, every {@link #build()} call creates its own queue, so
 * executors built from the same builder do not share pending tasks.
 *
 * @author 94391
 */
public class ThreadPoolExecutorBuilder {

    /** Capacity of the bounded queue created when the caller supplies none. */
    private static final int DEFAULT_QUEUE_CAPACITY = 200;

    /** Default keep-alive time, in seconds. */
    private static final long DEFAULT_KEEP_ALIVE_SECONDS = 8;

    /** Sequence for default worker-thread names; static, so shared by every builder. */
    private static final AtomicInteger threadId = new AtomicInteger();

    private int corePoolSize;
    private int maximumPoolSize;
    private long keepAliveTime;
    private TimeUnit unit;
    /** Caller-supplied queue; {@code null} means {@link #build()} creates a fresh default queue. */
    private BlockingQueue<Runnable> workQueue;
    private RejectedExecutionHandler handler;
    private ThreadFactory threadFactory;

    /**
     * Creates a builder pre-populated with the default settings.
     */
    public ThreadPoolExecutorBuilder() {
        int processors = Runtime.getRuntime().availableProcessors();

        // Core worker thread count and maximum thread count.
        this.corePoolSize = processors;
        this.maximumPoolSize = 2 * processors;

        // Maximum idle time of a thread (note when this setting takes effect):
        // when the pool holds more than corePoolSize threads, or allowCoreThreadTimeOut is
        // enabled, idle threads are checked against keepAliveTime and destroyed once it elapses.
        this.keepAliveTime = DEFAULT_KEEP_ALIVE_SECONDS;
        this.unit = TimeUnit.SECONDS;

        // Saturation policy; the task queue itself is created lazily in build().
        this.handler = new ThreadPoolExecutor.AbortPolicy();

        // Factory that creates the threads; used here to give workers a recognizable name.
        this.threadFactory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "threadPoolWorker-" + threadId.getAndIncrement());
            }
        };
    }

    /**
     * Creates a new executor from the current settings.
     *
     * @return a new {@link ThreadPoolExecutor}; argument validation is performed by its constructor
     */
    public ThreadPoolExecutor build() {
        BlockingQueue<Runnable> queue = workQueue;
        if (queue == null) {
            // A fresh queue per executor: sharing one queue between pools would let one pool's
            // workers consume tasks submitted to another.
            queue = new ArrayBlockingQueue<Runnable>(DEFAULT_QUEUE_CAPACITY);
        }
        return new ThreadPoolExecutor(
                corePoolSize, maximumPoolSize, keepAliveTime, unit, queue, threadFactory, handler);
    }

    /**
     * Sets the core and maximum pool sizes.
     *
     * @param corePoolSize    number of core threads
     * @param maximumPoolSize maximum number of threads
     * @return this builder
     */
    public ThreadPoolExecutorBuilder setCapacity(int corePoolSize, int maximumPoolSize) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        return this;
    }

    /**
     * Sets how long idle threads are kept alive.
     *
     * @param keepAliveTime idle time limit
     * @param unit          unit of {@code keepAliveTime}
     * @return this builder
     */
    public ThreadPoolExecutorBuilder setKeepAliveTime(long keepAliveTime, TimeUnit unit) {
        this.keepAliveTime = keepAliveTime;
        this.unit = unit;
        return this;
    }

    /**
     * Sets the task queue and the rejection handler.
     *
     * @param workQueue queue holding pending tasks; {@code null} selects a fresh default
     *                  bounded queue for each built executor
     * @param handler   handler invoked when a task cannot be accepted
     * @return this builder
     */
    public ThreadPoolExecutorBuilder setWorkQueueAndRejectHandler(BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        this.workQueue = workQueue;
        this.handler = handler;
        return this;
    }

    /**
     * Sets the factory used to create worker threads.
     *
     * @param threadFactory thread factory
     * @return this builder
     */
    public ThreadPoolExecutorBuilder setThreadFactory(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        return this;
    }
}

测试下(仅允许10条线程同时进行)

  • private final static Semaphore SEMAPHORE = new Semaphore(10); // created with 10 permits
  • private final static ThreadPoolExecutor THREAD_POOL_EXECUTOR = new ThreadPoolExecutorBuilder().build(); // executor built with the builder's default settings
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // Submit 100 tasks to the pool; each task must hold a SEMAPHORE permit while it processes
    // the list, so the guarded section runs in at most as many tasks as there are permits.
    for (int i = 0; i < 100; i++) {
        THREAD_POOL_EXECUTOR.execute(new Runnable() {
            @Override
            public void run() {
                // Acquire before entering the try/finally that releases: if acquire() is
                // interrupted no permit was taken, so nothing may be released.
                try {
                    SEMAPHORE.acquire();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    // Restore the interrupt status so the pool can observe it.
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    if (CollectionUtils.isNotEmpty(list)) {
                        // Discouraged alternative (iterator form):
                        // DataBatchUtils<String> batchUtils = new DataBatchUtils<>(list, batchSize);
                        // while (batchUtils.hasNext()) {
                        //     List<String> batch = batchUtils.next();
                        //     ... process batch ...
                        // }

                        // Preferred: split into batches up front.
                        // batchSize = number of elements per batch, chosen by the caller.
                        List<List<String>> collectLists = DataBatchUtils.<String>batchInsert(list, batchSize);
                        for (List<String> collectList : collectLists) {
                            try {
                                // ... process collectList here (work that may throw IOException) ...
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                } finally {
                    // Reached only when a permit was actually acquired above.
                    SEMAPHORE.release();
                }
            }
        });
    }