一种常见的设计模式,生产者-消费者模式,应用非常的广泛,例如在Java中,就是利用生产者-消费者模式去实现的,使用线程池的时候,就是使用了生产者消费者模式
在并发编程中,不只是线程池,很多地方都用到了生产者-消费者模式,例如 Log4j中也有着相关的模式
关于生产者-消费者模式:其核心就是一个任务队列,生产者生产任务放入队列,消费者从队列中消费并拿取任务
其优点非常的明显,就是解耦性非常高,在整个设计模式中,生产者和消费者没有直接的依赖关系,其之间的通信依靠的任务队列,所以具有非常好的解耦性
其还支持异步消费,能够良好的将读和写的速度差异进行平衡,不适用于普通方法的同步阻塞
为什么能够良好的将速度差异进行平衡呢,假设生产者的速度很慢,消费者的速度很高,1:3的比例,那么可以做到三个生产者对应一个消费者的线程,能够良好的在Java这个重量级语言中,避免过多的创建和销毁线程,只是支持使用适量的线程
甚至可以用于支持批处理,去处理批处理的任务,其应用场景非常的简单,就是一个任务不断的将执行结果放入这个阻塞队列,在消费端将结果批量的接受并执行,接下来是按照这种执行方式的示例代码
// 任务队列
BlockingQueue<Task> bq=new LinkedBlockingQueue<>(2000);
// 启动 5 个消费者线程
// 执行批量任务
void start() {
ExecutorService es=executors.newFixedThreadPool(5);
for (int i=0; i<5; i++) {
es.execute(()->{
try {
while (true) {
// 获取批量任务
List<Task> ts=pollTasks();
// 执行批量任务
execTasks(ts);
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
// 从任务队列中获取批量任务
List<Task> pollTasks()
throws InterruptedException{
List<Task> ts=new LinkedList<>();
// 阻塞式获取一条任务
Task t = bq.take();
while (t != null) {
ts.add(t);
// 非阻塞式获取一条任务
t = bq.poll();
}
return ts;
}
// 批量执行任务
void execTasks(List<Task> ts) {
// 省略具体代码无数
}
在上述代码中,我们先进行了阻塞式的获取数据,这是为了避免上来就进行无限制的循环,减少不必要的内存占用
对应到拿一个现实例子去应用生产者-消费者模式,就是log4j中的刷盘功能,在log4j中,需要将日志信息写入文件中
刷盘的时机为
ERROR级别的日志立即刷盘
累计到500条刷盘
未刷盘时间到达5秒,进行刷盘
这就是一种生产者-消费者模式,将LogMsg放在阻塞队列中,然后消费者从阻塞队列中取出后按照条件写入文件
class Logger {
//任务队列
final BlockingQueue<LogMsg> bq = new BlockingQueue<>();
//flush批量
static final int batchSize = 500;
//只需要一个线程写日志
ExecutorService es = Executors.newFixedThreadPool(1);
//启动写日志线程
void start(){
File file= File.createTempFile(“foo”, “.log”);
final FileWriter writer= new FileWriter(file);
this.es.execute(()->{
try {
//未刷盘日志数量
int curIdx = 0;
long preFT=System.currentTimeMillis();
while (true) {
LogMsg log = bq.poll(5, TimeUnit.SECONDS);
//写日志
if (log != null) {
writer.write(log.toString());
++curIdx;
}
//如果不存在未刷盘数据,则无需刷盘
if (curIdx <= 0) {
continue;
}
//根据规则刷盘
if (log!=null && log.level==LEVEL.ERROR ||
curIdx == batchSize ||
System.currentTimeMillis()-preFT>5000){
writer.flush();
curIdx = 0;
preFT=System.currentTimeMillis();
}
}
}catch(Exception e){
e.printStackTrace();
} finally {
try {
writer.flush();
writer.close();
}catch(IOException e){
e.printStackTrace();
}
}
});
}
//写INFO级别日志
void info(String msg) {
bq.put(new LogMsg(
LEVEL.INFO, msg));
}
//写ERROR级别日志
void error(String msg) {
bq.put(new LogMsg(
LEVEL.ERROR, msg));
}
}
//日志级别
enum LEVEL {
INFO, ERROR
}
class LogMsg {
LEVEL level;
String msg;
//省略构造函数实现
LogMsg(LEVEL lvl, String msg){}
//省略toString()实现
String toString(){}
}
在通常这种日志线程中,可以通过一个毒丸将其写入到消费者,在消费者拿到这个毒丸,就终止自己的运行
总而言之,
消费者-生产者是一个适用非常广泛的执行策略,在分布式的场景中,用于削峰还是批量执行都行,而且在常见的分布式场景下,可以使用MQ来实现消费者-生产者模式