我们将讲述几个可以再实际项目中使用的包,分别展示了不同的并发模式,以及内部如何使用并发和通道,学习一下如何利用这个包简化并发程序的编写,虽然都是自我封装的包,但是不容小觑

runner包,是利用通道来监视程序的执行时间,如果时间太长,同样可以使用runner来终止程序

而且可以作为cron作业执行

首先是整体的一个代码示例

// Example is provided with help by Gabriel Aszalos.

// Package runner manages the running and lifetime of a process.

package runner

import (

“errors”

“os”

“os/signal”

“time”

)

// 指定时间内完成一组任务

// 如果发送了中断信号,就结束这种任务

type Runner struct {

// 等待从系统发送的信号,收发os.Signal接口类型的值

interrupt chan os.Signal

// 通道报告处理完成,用来被执行任务的goroutine来发送任务完成的信息

//如果任务出错,那么会返回一个error类型的值,如果没有出错,会返回一个nil作为error接口值

complete chan error

// 报告任务超时,如果收到了一个时间的值,那么就会清理状态并停止工作

timeout <-chan time.Time

// 切片保存一组索引顺序执行的函数

tasks []func(int)

}

// 在超时的时候返回

var ErrTimeout = errors.New(“received timeout”)

// 在收到操作系统的中断事件时返回

var ErrInterrupt = errors.New(“received interrupt”)

// 创建方法,返回一个新的Runner

func New(d time.Duration) *Runner {

return &Runner{

//初始化为容量为1的通道

interrupt: make(chan os.Signal, 1),

//无缓冲的通道,方便main函数安全的接收

complete: make(chan error),

//利用timer,在指定时间到达了之后,发送一个Time的值

timeout: time.After(d),

}

}

// 增加函数的方法

//接收一个int类型的函数,…代表多个,可变长参数

func (r *Runner) Add(tasks …func(int)) {

r.tasks = append(r.tasks, tasks…)

}

// 上帝函数,主流程,执行所有任务,监控通道

func (r *Runner) Start() error {

// 等待接收所有的信号

signal.Notify(r.interrupt, os.Interrupt)

// 用不同的goroutine执行不同的任务

go func() {

r.complete <- r.run()

}()

select {

// 任务完成时候处理信号

case err := <-r.complete:

return err

// 处理操作的信号

case <-r.timeout:

return ErrTimeout

}

}

//对应的run方法.执行每一个已经注册的任务

func (r *Runner) run() error {

for id, task := range r.tasks {

// 检测操作系统的中断信号

if r.gotInterrupt() {

return ErrInterrupt

}

// 迭代执行已经注册的任务

task(id)

}

return nil

}

// 检测是否收到了中断信号,和select语句的用法

func (r *Runner) gotInterrupt() bool {

select {

// 中断事件触发时候发出的信号

case <-r.interrupt:

// 停止接收

signal.Stop(r.interrupt)

return true

// 如果没有收到中断,走默认的信号,返回false

default:

return false

}

}

上面代码实现的函数主要是做到在分配的时间内完成工作,正常终止

程序没有完成的话,就自我中断

如果收到了中断的事件,那么就试图清理状态并且停止工作

代码的细节如下

首先是Runner的Struct

// 指定时间内完成一组任务

// 如果发送了中断信号,就结束这种任务

type Runner struct {

// 等待从系统发送的信号,收发os.Signal接口类型的值

interrupt chan os.Signal

// 通道报告处理完成,用来被执行任务的goroutine来发送任务完成的信息

//如果任务出错,那么会返回一个error类型的值,如果没有出错,会返回一个nil作为error接口值

complete chan error

// 报告任务超时,如果收到了一个时间的值,那么就会清理状态并停止工作

timeout <-chan time.Time

// 切片保存一组索引顺序执行的函数

tasks []func(int)

}

声明了3条通道,来管理不同的声明周期

interrupt接收os.Signal接口发来的终止

然后第二个字段被命名为complete,收发error接口类型的通道

最后是一个事件的值,我们利用time.Time来进行处理,内部自我调用了after函数,会在一定的时间后从隧道推出来一个值

然后是一个切片,保存了函数的切片

这是要执行的函数

然后是一个用于创建的函数

// 创建方法,返回一个新的Runner

func New(d time.Duration) *Runner {

return &Runner{

//初始化为容量为1的通道

interrupt: make(chan os.Signal, 1),

//无缓冲的通道,方便main函数安全的接收

complete: make(chan error),

//利用timer,在指定时间到达了之后,发送一个Time的值

timeout: time.After(d),

}

}

上面是一个New的工厂函数,接收一个time.Duration类型的值,然后返回一个Runner类型的指针,创建一个Runner类型的指针,初始化各个通道,task切片即使是nil也可以继续使用

然后是可变参数的展示

func (r *Runner) Add(tasks …func(int)) {

r.tasks = append(r.tasks,tasks..)

}

我们接受了一个tasks的可变参数,然后加到切片上

run函数是真正的执行函数

//对应的run方法.执行每一个已经注册的任务

func (r *Runner) run() error {

for id, task := range r.tasks {

// 检测操作系统的中断信号

if r.gotInterrupt() {

return ErrInterrupt

}

// 迭代执行已经注册的任务

task(id)

}

return nil

}

迭代这个tasks切片,顺序执行每个函数,函数在执行之前利用gorInterrupt来检查是否有从操作系统接受的事件

最后是Start函数

// 上帝函数,主流程,执行所有任务,监控通道

func (r *Runner) Start() error {

// 等待接收所有的信号

signal.Notify(r.interrupt, os.Interrupt)

// 用不同的goroutine执行不同的任务

go func() {

r.complete <- r.run()

}()

select {

// 任务完成时候处理信号

case err := <-r.complete:

return err

// 处理操作的信号

case <-r.timeout:

return ErrTimeout

}

}

我们设置了一个匿名函数,单独运行run

然后等待complete的返回

然后利用select来等待返回

如果是complete返回的,就直接返回err

如果是timeout返回的,就返回Errtimeout这个常量

接下来我们看下main函数

// This sample program demonstrates how to use a channel to

// monitor the amount of time the program is running and terminate

// the program if it runs too long.

package main

import (

“../../runner”

“log”

“os”

“time”

)

// timeout is the number of second the program has to finish.

const timeout = 10 * time.Second

// main is the entry point for the program.

func main() {

log.Println(“Starting work.”)

// Create a new timer value for this run.

r := runner.New(timeout)

// Add the tasks to be run.

r.Add(createTask(), createTask(), createTask())

// Run the tasks and handle the result.

if err := r.Start(); err != nil {

switch err {

case runner.ErrTimeout:

log.Println(“Terminating due to timeout.”)

os.Exit(1)

case runner.ErrInterrupt:

log.Println(“Terminating due to interrupt.”)

os.Exit(2)

default:

log.Println(“this is end”)

os.Exit(2)

}

}

log.Println(“Process ended.”)

}

// createTask returns an example task that sleeps for the specified

// number of seconds based on the id.

func createTask() func(int) {

return func(id int) {

log.Printf(“Processor – Task #%d.”, id)

time.Sleep(time.Duration(id) * time.Second)

}

}

我们声明了一个Runner,然后加入了几个函数

函数createTask函数中,我们只是休眠了一段时间

然后我们调用了Start.让主函数串行等待Start的返回

最后根据Start返回的变量来判断是否出现了错误,并进行退出

发表评论

邮箱地址不会被公开。 必填项已用*标注