一种常见的设计模式,生产者-消费者模式,应用非常的广泛,例如在Java中,就是利用生产者-消费者模式去实现的,使用线程池的时候,就是使用了生产者消费者模式

在并发编程中,不只是线程池,很多地方都用到了生产者-消费者模式,例如 Log4j中也有着相关的模式

关于生产者-消费者模式:其核心就是一个任务队列,生产者生产任务放入队列,消费者从队列中消费并拿取任务

图片

其优点非常的明显,就是解耦性非常高,在整个设计模式中,生产者和消费者没有直接的依赖关系,其之间的通信依靠的任务队列,所以具有非常好的解耦性

其还支持异步消费,能够良好的将读和写的速度差异进行平衡,不适用于普通方法的同步阻塞

为什么能够良好的将速度差异进行平衡呢,假设生产者的速度很慢,消费者的速度很高,1:3的比例,那么可以做到三个生产者对应一个消费者的线程,能够良好的在Java这个重量级语言中,避免过多的创建和销毁线程,只是支持使用适量的线程

甚至可以用于支持批处理,去处理批处理的任务,其应用场景非常的简单,就是一个任务不断的将执行结果放入这个阻塞队列,在消费端将结果批量的接受并执行,接下来是按照这种执行方式的示例代码

// 任务队列

BlockingQueue<Task> bq=new LinkedBlockingQueue<>(2000);

// 启动 5 个消费者线程

// 执行批量任务

void start() {

ExecutorService es=executors.newFixedThreadPool(5);

for (int i=0; i<5; i++) {

es.execute(()->{

try {

while (true) {

// 获取批量任务

List<Task> ts=pollTasks();

// 执行批量任务

execTasks(ts);

}

} catch (Exception e) {

e.printStackTrace();

}

});

}

}

// 从任务队列中获取批量任务

List<Task> pollTasks()

throws InterruptedException{

List<Task> ts=new LinkedList<>();

// 阻塞式获取一条任务

Task t = bq.take();

while (t != null) {

ts.add(t);

// 非阻塞式获取一条任务

t = bq.poll();

}

return ts;

}

// 批量执行任务

void execTasks(List<Task> ts) {

// 省略具体代码无数

}

在上述代码中,我们先进行了阻塞式的获取数据,这是为了避免上来就进行无限制的循环,减少不必要的内存占用

对应到拿一个现实例子去应用生产者-消费者模式,就是log4j中的刷盘功能,在log4j中,需要将日志信息写入文件中

刷盘的时机为

ERROR级别的日志立即刷盘

累计到500条刷盘

未刷盘时间到达5秒,进行刷盘

这就是一种生产者-消费者模式,将LogMsg放在阻塞队列中,然后消费者从阻塞队列中取出后按照条件写入文件

class Logger {

//任务队列

final BlockingQueue<LogMsg> bq = new BlockingQueue<>();

//flush批量

static final int batchSize = 500;

//只需要一个线程写日志

ExecutorService es = Executors.newFixedThreadPool(1);

//启动写日志线程

void start(){

File file= File.createTempFile(“foo”, “.log”);

final FileWriter writer= new FileWriter(file);

this.es.execute(()->{

try {

//未刷盘日志数量

int curIdx = 0;

long preFT=System.currentTimeMillis();

while (true) {

LogMsg log = bq.poll(5, TimeUnit.SECONDS);

//写日志

if (log != null) {

writer.write(log.toString());

++curIdx;

}

//如果不存在未刷盘数据,则无需刷盘

if (curIdx <= 0) {

continue;

}

//根据规则刷盘

if (log!=null && log.level==LEVEL.ERROR ||

curIdx == batchSize ||

System.currentTimeMillis()-preFT>5000){

writer.flush();

curIdx = 0;

preFT=System.currentTimeMillis();

}

}

}catch(Exception e){

e.printStackTrace();

} finally {

try {

writer.flush();

writer.close();

}catch(IOException e){

e.printStackTrace();

}

}

});

}

//写INFO级别日志

void info(String msg) {

bq.put(new LogMsg(

LEVEL.INFO, msg));

}

//写ERROR级别日志

void error(String msg) {

bq.put(new LogMsg(

LEVEL.ERROR, msg));

}

}

//日志级别

enum LEVEL {

INFO, ERROR

}

class LogMsg {

LEVEL level;

String msg;

//省略构造函数实现

LogMsg(LEVEL lvl, String msg){}

//省略toString()实现

String toString(){}

}

在通常这种日志线程中,可以通过一个毒丸将其写入到消费者,在消费者拿到这个毒丸,就终止自己的运行

总而言之,

消费者-生产者是一个适用非常广泛的执行策略,在分布式的场景中,用于削峰还是批量执行都行,而且在常见的分布式场景下,可以使用MQ来实现消费者-生产者模式

发表评论

邮箱地址不会被公开。 必填项已用*标注