我们将讲述几个可以再实际项目中使用的包,分别展示了不同的并发模式,以及内部如何使用并发和通道,学习一下如何利用这个包简化并发程序的编写,虽然都是自我封装的包,但是不容小觑
runner包,是利用通道来监视程序的执行时间,如果时间太长,同样可以使用runner来终止程序
而且可以作为cron作业执行
首先是整体的一个代码示例
// Example is provided with help by Gabriel Aszalos.
// Package runner manages the running and lifetime of a process.
package runner
import (
“errors”
“os”
“os/signal”
“time”
)
// 指定时间内完成一组任务
// 如果发送了中断信号,就结束这种任务
type Runner struct {
// 等待从系统发送的信号,收发os.Signal接口类型的值
interrupt chan os.Signal
// 通道报告处理完成,用来被执行任务的goroutine来发送任务完成的信息
//如果任务出错,那么会返回一个error类型的值,如果没有出错,会返回一个nil作为error接口值
complete chan error
// 报告任务超时,如果收到了一个时间的值,那么就会清理状态并停止工作
timeout <-chan time.Time
// 切片保存一组索引顺序执行的函数
tasks []func(int)
}
// 在超时的时候返回
var ErrTimeout = errors.New(“received timeout”)
// 在收到操作系统的中断事件时返回
var ErrInterrupt = errors.New(“received interrupt”)
// 创建方法,返回一个新的Runner
func New(d time.Duration) *Runner {
return &Runner{
//初始化为容量为1的通道
interrupt: make(chan os.Signal, 1),
//无缓冲的通道,方便main函数安全的接收
complete: make(chan error),
//利用timer,在指定时间到达了之后,发送一个Time的值
timeout: time.After(d),
}
}
// 增加函数的方法
//接收一个int类型的函数,…代表多个,可变长参数
func (r *Runner) Add(tasks …func(int)) {
r.tasks = append(r.tasks, tasks…)
}
// 上帝函数,主流程,执行所有任务,监控通道
func (r *Runner) Start() error {
// 等待接收所有的信号
signal.Notify(r.interrupt, os.Interrupt)
// 用不同的goroutine执行不同的任务
go func() {
r.complete <- r.run()
}()
select {
// 任务完成时候处理信号
case err := <-r.complete:
return err
// 处理操作的信号
case <-r.timeout:
return ErrTimeout
}
}
//对应的run方法.执行每一个已经注册的任务
func (r *Runner) run() error {
for id, task := range r.tasks {
// 检测操作系统的中断信号
if r.gotInterrupt() {
return ErrInterrupt
}
// 迭代执行已经注册的任务
task(id)
}
return nil
}
// 检测是否收到了中断信号,和select语句的用法
func (r *Runner) gotInterrupt() bool {
select {
// 中断事件触发时候发出的信号
case <-r.interrupt:
// 停止接收
signal.Stop(r.interrupt)
return true
// 如果没有收到中断,走默认的信号,返回false
default:
return false
}
}
上面代码实现的函数主要是做到在分配的时间内完成工作,正常终止
程序没有完成的话,就自我中断
如果收到了中断的事件,那么就试图清理状态并且停止工作
代码的细节如下
首先是Runner的Struct
// 指定时间内完成一组任务
// 如果发送了中断信号,就结束这种任务
type Runner struct {
// 等待从系统发送的信号,收发os.Signal接口类型的值
interrupt chan os.Signal
// 通道报告处理完成,用来被执行任务的goroutine来发送任务完成的信息
//如果任务出错,那么会返回一个error类型的值,如果没有出错,会返回一个nil作为error接口值
complete chan error
// 报告任务超时,如果收到了一个时间的值,那么就会清理状态并停止工作
timeout <-chan time.Time
// 切片保存一组索引顺序执行的函数
tasks []func(int)
}
声明了3条通道,来管理不同的声明周期
interrupt接收os.Signal接口发来的终止
然后第二个字段被命名为complete,收发error接口类型的通道
最后是一个事件的值,我们利用time.Time来进行处理,内部自我调用了after函数,会在一定的时间后从隧道推出来一个值
然后是一个切片,保存了函数的切片
这是要执行的函数
然后是一个用于创建的函数
// 创建方法,返回一个新的Runner
func New(d time.Duration) *Runner {
return &Runner{
//初始化为容量为1的通道
interrupt: make(chan os.Signal, 1),
//无缓冲的通道,方便main函数安全的接收
complete: make(chan error),
//利用timer,在指定时间到达了之后,发送一个Time的值
timeout: time.After(d),
}
}
上面是一个New的工厂函数,接收一个time.Duration类型的值,然后返回一个Runner类型的指针,创建一个Runner类型的指针,初始化各个通道,task切片即使是nil也可以继续使用
然后是可变参数的展示
func (r *Runner) Add(tasks …func(int)) {
r.tasks = append(r.tasks,tasks..)
}
我们接受了一个tasks的可变参数,然后加到切片上
run函数是真正的执行函数
//对应的run方法.执行每一个已经注册的任务
func (r *Runner) run() error {
for id, task := range r.tasks {
// 检测操作系统的中断信号
if r.gotInterrupt() {
return ErrInterrupt
}
// 迭代执行已经注册的任务
task(id)
}
return nil
}
迭代这个tasks切片,顺序执行每个函数,函数在执行之前利用gorInterrupt来检查是否有从操作系统接受的事件
最后是Start函数
// 上帝函数,主流程,执行所有任务,监控通道
func (r *Runner) Start() error {
// 等待接收所有的信号
signal.Notify(r.interrupt, os.Interrupt)
// 用不同的goroutine执行不同的任务
go func() {
r.complete <- r.run()
}()
select {
// 任务完成时候处理信号
case err := <-r.complete:
return err
// 处理操作的信号
case <-r.timeout:
return ErrTimeout
}
}
我们设置了一个匿名函数,单独运行run
然后等待complete的返回
然后利用select来等待返回
如果是complete返回的,就直接返回err
如果是timeout返回的,就返回Errtimeout这个常量
接下来我们看下main函数
// This sample program demonstrates how to use a channel to
// monitor the amount of time the program is running and terminate
// the program if it runs too long.
package main
import (
“../../runner”
“log”
“os”
“time”
)
// timeout is the number of second the program has to finish.
const timeout = 10 * time.Second
// main is the entry point for the program.
func main() {
log.Println(“Starting work.”)
// Create a new timer value for this run.
r := runner.New(timeout)
// Add the tasks to be run.
r.Add(createTask(), createTask(), createTask())
// Run the tasks and handle the result.
if err := r.Start(); err != nil {
switch err {
case runner.ErrTimeout:
log.Println(“Terminating due to timeout.”)
os.Exit(1)
case runner.ErrInterrupt:
log.Println(“Terminating due to interrupt.”)
os.Exit(2)
default:
log.Println(“this is end”)
os.Exit(2)
}
}
log.Println(“Process ended.”)
}
// createTask returns an example task that sleeps for the specified
// number of seconds based on the id.
func createTask() func(int) {
return func(id int) {
log.Printf(“Processor – Task #%d.”, id)
time.Sleep(time.Duration(id) * time.Second)
}
}
我们声明了一个Runner,然后加入了几个函数
函数createTask函数中,我们只是休眠了一段时间
然后我们调用了Start.让主函数串行等待Start的返回
最后根据Start返回的变量来判断是否出现了错误,并进行退出