golang routine using

首先介绍了golang中channel的基本语法以及基本的操作,之后从runtime源码的实现角度从更深的层次上对channel进行了理解,最后整理了在实际项目中所遇到的使用channel的几个实际的场景。

channel的基本使用语法

channel是go语言中的特殊的机制,既可以让这两个函数通过相互传递特定类型的值来进行通信,又可以同步两个并发执行的函数。事实上这也是channel的两个主要功能。

按照通道在初始化时是否有缓冲值,又可以分为缓冲通道与非缓冲通道。通道初始化的时候需要使用make进行,比如make(chan int,10)声明一个缓冲空间为10个int的通道,直接make(chan int)就是声明一个非缓冲通道。(注意,在golang中内建函数make仅适合 slice map channel具体可以参考官方文档(https://golang.org/pkg/builtin/#new))

channel的其他基本操作:
直接采用内建函数close(strChan)就可以关闭通道,应该保证在安全的情况下进行关闭通道的操作;内建函数 len(strChan)可以查看通道中当前有的元素的数量;cap(strChan)可以查看通道的总的容量,总容量一旦初始化之后就不会再发生改变了;使用<- chanInstance 可以将channel中的元素写出来返回一个指向该元素的引用,类似地使用 chanInstance <- elem可以将对应类型的元素写入到channel中;注意element , ok := <-chann 的这种语法, 如果通道被关闭则ok的值会变为false,element的值会变为该通道类型的零值;从通道中迭代取出元素的操作还可以使用 for item:=range channelinstance {}来进行操作,当通道已经被关闭或者没有值可以再接收的话,for循环会立即被结束。

注意几点

  • 无论怎样都不应该在接收端关闭通道,因为无法判断发送端是否还有数据要发送,通道有一个很好的特性,就是发送端关闭通道后,接收端仍然可以正常接受已经存在通道中的数据。
  • 谁启的通道谁最后负责关,如果通道不关闭的话,等待队列(稍候具体介绍)中的gouroutine都没有被释放,可能会导致程序的deadlock。

关于select操作
还需要再强调下select操作的特性,因为channel常常和select操作结合在一起使用:

select {
case <-t1:
...
break
case <-t2:
...
default:
...
}

在go语言中执行select语句的时候,会自动地自上而下地对各个case中的表达式进行判断求值,而且直到所有的求值操作都完成之后才会考虑选其中的某个case去执行。当发现第一个满足选择条件的case的时候,这个case中的语句就会被执行,其他的语句就会被忽略,当有多个case都满足情况的话,系统会根据一个伪随机算法决定哪个case会被执行。default是一个特殊的case,如果没有合适的case的话default中的语句就会被执行。如果select语句中没有加上default语句,那么如果此时没有case符合条件的话,当前goroutine就会一直阻塞在当前的这一条select语句上。因此default条件:对于select而言是必不可少的。

通常select还会和for语句结合在一起来使用,因为单独的select操作只会被选择一次,比如在在某些情况下希望持续不断地使用select从通道中读出信息,此时就需要在for循环中潜逃一层select的操作,具体可以参考使用场景(2)中介绍的两种方法。

channel的底层实现

也算是看了一点golang runtime的源码,由于每个版本的实现都有所不同,罗列具体的源码反而没有太大的意义(况且自己也没有能力全都看明白),这里主要是阐述关键的数据结构和实现思路。这里使用的代码版本是go1.6.2,这一篇的内容讲的比较好,是介绍1.2版本中的channel相关实现。

channel的几个最主要的操作就是通过make创建,之后通过chann<-将内容写入通道缓存以及通过<-chann从通道中写出内容,如果写出的时候通道中已经没有缓存的数据,代码就会阻塞再当前的操作上,所谓阻塞,在代码的层面就相当于执行了类似lock的操作并且陷入了某个循环中,或者是说当前的线程被放入到的等待队列,状态由running变成了waiting不再接受cpu的调度。

channel中的主要结构是一个循环的双向链表,在创建channel的时候,会根据channel中将要存储的元素的类型以及所提交的channel的缓存空间的大小来动态分配对应大小的内存空间。具体的实现代码可以参考runtime/chan.go中的makechan函数。

此外在channel的结构体中还有两个等待队列,即是recvq以及sendq,这两个队列中存储的是一个名为sudog的结构,这个结构是goroutine结构体的封装。recvq队列中存储的sudog表示等待通过该channel接收数据的goroutine实例,已经准备好接收数据但是发送方还没有准备好的gouroutine,存储的是所谓的正在等待的reader,sendq中存储的goroutine是已经发送了数据但是接收方还没有准备好的goroutine,存储的是所谓的正在等待的writer。当channel执行close操作的时候,recvq以及sendq队列中的元素都会被释放掉,状态又waiting变为ready。具体的代码可以参考runtime/chan.go中的closechan函数。

下面通过channel的行为来进一步说明通过channel收发信息时候的同步和异步的操作:比如申请了一个非缓冲通道,也就是说在申请的时候通道中缓冲区的大小为0。此时如果没有将对应的内容写入到channel直接希望从channel中写出内容,会发生什么?根据之前的分析,由于此通道是非缓冲的,send函数在执行writer操作的的时候发现还没有进行reader的读入操作,于是会将自己所在的goroutine的状态变成waiting放入到sendq中,之后等到指定的内容通过同一个channel的recieve操作读入,这个时候会从sendq的队列中取出刚才等待的goroutine然后把内容拷贝到其中,这样就完成了一次接受和发送的操作。反过来也是类似的,现向channel中发送信息,但是用于从channel中接收信息的goroutine没有准备好,同样是先把goroutine的状态变成waiting,直到接收的goroutine准备好之后,完成首发操作。虽然描述起来比较容易理解,但是代码实现上还是很复杂的,具体设计比较多的golang的协程相关的模型,这里不再详细讨论了,具体可以参考runtime/chan.go中的chanrecv以及chansend函数。还需要补充一点。比如对于下面的代码:

func main() {
done := make(chan string, 1)
fmt.Println("before test")
//go func() {
// time.Sleep(time.Second * 10)
//}()
ele, ok := <-done
fmt.Println(ele, ok)
fmt.Println("test")
}

没有写入直接写出,在执行的时候会发生deadlock(golang中检测deadlock的操作的running状态的内核线程数目变为0)。因为代码运行到ele, ok := <-done 这一步,根据之前的分析,当前goroutine已经变成waiting了。接收方,发送方,channel,三个要素齐全之后,一次通信才能完成。将以上的代码改成如下形式:

func main() {
done := make(chan string)
fmt.Println("before test")
go func() {
time.Sleep(time.Second * 2)
done <- "test channel info"
}()
fmt.Println("sender preparation...")
ele, ok := <-done
fmt.Println(ele, ok)
fmt.Println("test")
}
/*
before test
sender preparation...
test channel info true
test
*/

可以看到在sender preparation…执行完成之后,会等待2s,这2s的时候,main函数所在的goroutine实质由于执行到了ele, ok := <-done语句而变成了waiting状态,此时正在运行的goroutine是go func()中新启动的这个goroutine,main函数所在的goroutiine已经在waiting。(这里有个疑问,根据注释time.Sleep执行的是pause goroutine,这个操作不会使得goroutine进入waiting状态吗?看起来此时当前的goroutine就是卡在当前这条语句的位置上,当前goroutine一直运行着并且一直占据着cpu)。2s的时间过后,channel中收到了sender发送来的信息,这个时候将receiver唤起,状态变成running,之后将sender中的信息返回出来,reciever被唤起之后执行完成chanrecv函数中剩余的代码,返回结果。

以上的工作过程就是所谓的“channel的同步通信”的操作。进一步,如果channel中有一定的缓存空间,就可以使用“异步通信”的操作方式了,writer将数据write到缓存中,reader从缓存中读取数据,通信的行为不用等到两者同时就位,此时算是异步的的方式,比如向channel中发送了数据就可以得到了返回结果,但是此时接收者并没有接收到消息。

从操作系统线程通信的角度来讲,channel的这种通信方式本质上是通过共享内存的方式实现的,介绍channel的时候各种资料上都会罗列这么一句话:

“Don’t communicate by sharing memory, share memory by communicating”

除此之外还有CSP模型(Communication Sequential Processes),大致上就是说通信和内存共享的关系,在golang中共享内存是目的,通信是手段,大概是这么个意思,另外一种相对的模型是Actor模式,都是并发模型的实现模模型,自己也没有理解的特别深,这里列一些参考资源:用甘特图描述很清晰(http://arild.github.io/csp-presentation/#11)并发程序需要己解决的问题,其实最后说白了就是临界区的问题,以及不同并发线程之间通信的问题?

channel的几种使用场景

说到底最后还是要落实到代码上,以上的分析应该已经对channel有了相对透彻的了解,这部分通过代码来讨论一些channel的使用场景。

主进程等待goroutine运行结束

package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int, 1)
sign := make(chan byte, 2)
go func() {
for i := 0; i < 5; i++ {
ch <- i
time.Sleep(1 * time.Second)
}
close(ch)
fmt.Println("The channel is closed.")
sign <- 0
}()
go func() {
//这个循环会一直尝试从ch中读取信息出来 即使ch已经被发送端关闭
//但还是可以读信息出来 最后当ok 为false的时候 说明已经没有数据从ch中读出
//跳出循环 注意这种判断方式
for {
fmt.Printf("before extract channel len: %v ,", len(ch))
e, ok := <-ch
fmt.Printf("channel value: %d if extract ok :(%v) after extraction channel len : %v channel cap : %v \n", e, ok, len(ch), cap(ch))
if !ok {
break
}
time.Sleep(2 * time.Second)
}
fmt.Println("Done.")
sign <- 1
}()
//要是不添加两次取值的操作的话 主进程就会马上结束 这里相当于是实现了一个
//同步的操作 等待两个go func都结束之后 再结束主进程 注意这种技巧
<-sign
<-sign
}
/*output:
before extract channel len: 1 ,channel value: 0 if extract ok :(true) after extraction channel len : 0 channel cap : 1
before extract channel len: 1 ,channel value: 1 if extract ok :(true) after extraction channel len : 0 channel cap : 1
before extract channel len: 1 ,channel value: 2 if extract ok :(true) after extraction channel len : 0 channel cap : 1
before extract channel len: 1 ,channel value: 3 if extract ok :(true) after extraction channel len : 0 channel cap : 1
The channel is closed.
before extract channel len: 1 ,channel value: 4 if extract ok :(true) after extraction channel len : 0 channel cap : 1
before extract channel len: 0 ,channel value: 0 if extract ok :(false) after extraction channel len : 0 channel cap : 1
Done.
*/

可以看到在以上的程序中,要是没有最后的两个<-sign操作,在另外的两个goroutine还没有运行结束的时候,main函数就运行结束了,程序也就退出了。

注意,其实实现类似的操作在golang中比较好的一种方式是使用 sync.Waitgroup的相关操作:Wait blocks until the WaitGroup counter is zero,有点类似一个计数器一样,在创建goroutine的时候使用add操作添加记录的数目,goroutine执行完成之后就执行-1的操作,最后通通wait操作,等待所有的goroutine都执行完之后再向下执行主函数的操作。

goroutine调度以及携带参数

这个和channel的使用关联不太大,主要是goroutine调度时候上下文的一些问题,现记录在这里。

package main
import (
"fmt"
"runtime"
)
func main() {
names := []string{"E", "H", "R", "J", "M"}
for _, name := range names {
go func() {
fmt.Printf("Hello , %s \n", name)
}()
}
//要是不添加runtime的话 就不会有内容输出
//因为for循环执行速度太快了 直接循环结束跳出了最后的循环
//之后 for循环中生成的5个go func 会被分别进行调度
runtime.Gosched()
}
/* output
Hello , M
Hello , M
Hello , M
Hello , M
Hello , M
*/
根据代码可以看出,具体循环的时候for循环中的go func 的调度并不是按照想象的那样,一次循环一个go func ,不要对go func的执行时机做任何假设。

优化方案

一种思路是把runtime.Gosched()函数放在每次for循环结束的时候,这样每次for循环之后,都会被重新调度一次,可能会出现正确的结果,并不是每次都准确,比如go func的程序需要运行一段时间,在这段运行的时间之内,可能就已经循环了几个元素过去了。

package main
import (
"fmt"
"runtime"
"time"
)
func main() {
names := []string{"E", "H", "R", "J", "M", "N", "O", "P"}
for _, name := range names {
go func() {
time.Sleep(1000 * time.Nanosecond)
fmt.Printf("Hello , %s \n", name)
}()
runtime.Gosched()
}
}
/* output:
Hello , E
Hello , J
Hello , J
Hello , P
Hello , P
Hello , P
*/

还有一种思路是采用传递参数的方式,就是给goroutine带上了参数,虽然goroutine已经脱离了main函数的控制,但是它已经带上了main函数给的烙印,相当于是某种解耦的感觉,for循环多次就不会影响打印的结果了,此时go func(who string){...}(name) 这里的who string这部分内容相当于是形参,后面的括号中(name)相当于传递的是实参,比如下面代码:

package main
import (
"fmt"
"runtime"
"time"
)
func main() {
names := []string{"E", "H", "R", "J", "M", "N", "O", "P"}
for _, name := range names {
go func(who string) {
time.Sleep(1000 * time.Nanosecond)
fmt.Printf("Hello , %s \n", who)
}(name)
}
runtime.Gosched()
}
/* output:
Hello , E
Hello , H
Hello , R
Hello , J
Hello , M
*/

但是这个方法仍然很有问题,只能保证在函数执行时间很短的时候结果正常,而且不输出重复的内容,如果程序执行时间比较长的话,很有可能main函数会被提前结束,按顺序生成的多个goroutine在cpu那里会不会仍然按照顺序被调度执行?这个仍然不确定?有几个goroutine会不能被正常调度到并且执行,比如像上面的代码的输出样子,而且每次输出的结果也都是不确定的。

使用channel修改

再一细想,使用channel也是能实现以上意图的,毕竟channel的主要功能就是不同goroutine之间进行通信,记住这一点就ok了,比如可以继续改成如下代码:

func main() {
names := []string{"E", "H", "R", "J", "M", "N", "O", "P"}
chastr := make(chan string, 1)
done := make(chan struct{})
go func() {
for {
item, ok := <-chastr
fmt.Println(ok, item)
if !ok {
break
}
}
done <- struct{}{}
}()
for _, name := range names {
chastr <- name
}
close(chastr)
<-done
}

定时等待

编码的时候遇到这样一个场景,服务创建成功之后,需要等待ip被分配,ip被分配好之后,服务才正式部署成功,最后将所有的信息返回给前台,于是打算这样实现,在服务创建成功之后就不断循环,查询ip如果分配成功了就返回,如果超过了时间也返回失败。

第一个例子中退出的方式采用的是标记的思路形式,每次循环结束的时候会检查一下标记看看是否退出,第二个采用的是特殊的语法,直接跳出最外层的循环,注意这种时间控制的实现,还是弄成一个defalt一个case比较好,由于case的调度可能有随机性,因此正常执行的内容放在default的部分,时间控制的那个channel放在某一个case当中。

package main
import (
"fmt"
"time"
)
func main() {
sign := make(chan int)
chtemp := make(chan int, 5)
go func() {
for i := 0; i < 5; i++ {
time.Sleep(time.Millisecond * 300)
chtemp <- i
}
close(chtemp)
}()
var e int
ok := true
//new 一个新的channel返回 注意这里要提前声明好
t := time.After(time.Second)
go func() {
for {
select {
case <-t:
fmt.Println("time out")
ok = false
break
//注意这里是使用 = 而不是 :=
default:
e, ok = <-chtemp
fmt.Printf("value : %v \n", e)
if !ok {
break
}
}
if !ok {
sign <- 1
break
}
}
}()
<-sign
}


//一个时间控制的channel
//注意这个要在循环之外单独声明 否则每次都会分配一个新的 time.After的channel返回过来
t := time.After(time.Second * 10)
//注意这种跳出多层循环的操作方式 要是单层使用break的话 仅仅跳出的是 select 那一层的循环
A:
for {
select {
//如果时间到了 就返回错误信息
case <-t:
log.Println("time out to allocate ip")
//delete the se which deploy failed
a.Ctx.ResponseWriter.Header().Set("Content-Type", "application/json")
http.Error(a.Ctx.ResponseWriter, `{"errorMessage":"`+"deploy error : time out"+`"}`, 406)
break A
//如果时间没到 就是 t 还没有发回信息 select语句就默认跳转到default块中
//执行查找ip是否分配的操作
default:
//log.Println("logout:", <-timeout)
sename := service.ObjectMeta.Labels["name"]
podslist, err := a.Podip(sename)
if err != nil {
log.Println(err.Error())
a.Ctx.ResponseWriter.Header().Set("Content-Type", "application/json")
http.Error(a.Ctx.ResponseWriter, `{"errorMessage":"`+err.Error()+`"}`, 406)
break A
}
if len(podslist) == 0 {
continue
} else {
log.Println("allocation ok ......")
a.Data["json"] = detail
a.ServeJson()
break A
}
}
}

从资源池中获取信息

常常有这样一种场景,把某些信息从旧的资源池中取出来,经过一些加工处理,再放入新的资源池中,这个过程如果按传统的方式就是采用完全串行的方式效率会很低,粒度太粗了,具体的粒度可以细化以每次所取的单位资源为粒度。
比如以书上p339为例,有一个资源池存储这person的信息,将每个person从中取出来,之后进行一些处理,再存到新的资源池中,这里用oldarray以及newarray来模拟旧的和新的资源池:

具体的代码如下:

package main
//参考go 并发编程实战 p337
import (
"log"
"strconv"
"time"
)
type Person struct {
name string
age int
addr string
}
var oldpersonarray = [5]Person{}
var newpersonarray = [5]Person{}
type PersonHandler interface {
Batch(origs <-chan Person) <-chan Person
Handle(orig *Person)
}
//struct 实现了personhandler 接口
type PersonHandlerImpl struct{}
//从origs接收信息 处理之后再返回给新的channel
func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person {
dests := make(chan Person, 100)
go func() {
for {
p, ok := <-origs
if !ok {
close(dests)
break
}
handler.Handle(&p)
log.Printf("old value : %v\n", p)
//time.Sleep(time.Second)
dests <- p
}
}()
return dests
}
//这里要使用引用传递
func (handler PersonHandlerImpl) Handle(orig *Person) {
orig.addr = "new address"
}
func getPersonHandler() PersonHandler {
return &PersonHandlerImpl{}
}
//print the oldpersonarray into the chan<-Person
func fetchPerson(origs chan<- Person) {
for _, v := range oldpersonarray {
//fmt.Printf("get the value : %v %v \n", k, v)
time.Sleep(time.Second)
origs <- v
}
close(origs)
}
//fetch the value from the channel and store it into the newpersonarray
func savePerson(dest <-chan Person) <-chan int {
intChann := make(chan int)
go func() {
index := 0
for {
p, ok := <-dest
if !ok {
break
}
//time.Sleep(time.Second)
log.Printf("new value transfer %v \n", p)
newpersonarray[index] = p
index++
}
intChann <- 1
}()
return intChann
}
func init() {
//使用range的话是值传递 这里要给oldpersonarray赋值进来
tmplen := len(oldpersonarray)
for i := 0; i < tmplen; i++ {
oldpersonarray[i].addr = "old address"
oldpersonarray[i].age = i
oldpersonarray[i].name = strconv.Itoa(i)
}
log.Printf("first print init value : %v\n", oldpersonarray)
}
func main() {
handeler := getPersonHandler()
origs := make(chan Person, 100)
dests := handeler.Batch(origs)
//go func() { fetchPerson(origs) }()
// 不加go func的话 要等这句执行完 才能执行下一句
// 则orgis信息都输出 完全关闭掉 这个时候 从dest接收信息的语句才开始执行
// 所以不会动态输出 这句加上go func的话 就会没隔 1s 动态输出
// 如果将fetchPerson 再往前面放一句 则old value也不会动态输出
fetchPerson(origs)
sign := savePerson(dests)
<-sign
log.Printf("last print new value : %v \n", newpersonarray)
}

整体的结构图如下(todo):

代码结构图 update
代码结构图

代码基本分析:

  • 首先声明一个 PersonHandler 的接口,之后声明一个struct PersonHandlerImpl 将接口中的两个方法都实现了,init函数用于进行oldarray的初始化工作。注意为了减少出错,内部的函数在方声明的时候都是单向的channel。
  • 1,2 fetchperson从oldarray中区数据,并把数据存到origs channel中,注意最后取完数据到通道之后,要由发送方将channel关闭,否则可能造成deadlock。注意在main函数中,如果fech操作没有放到一个goroutine中来执行,就仍然是串行的,相当于是把数据都放入到channel中,另一端才开始取,没发挥出并发的优势。
  • 3,4 Batch函数将person信息从origs中取出来,进行处理后,同时传到dests中,最后将dests返回,注意这里不是全部传入之后才将dests返回,而是新启动一个goroutine执行传入操作,同时将dests返回,注意要主动关闭channel。
  • 5 savePerson操作接收一个<-chann 之后从中接受person信息,将值写入到新的资源池中,最后全部写入结束之后,传一个sign channel给主进程,结束。
  • 总结,在需要动态输出信息的时候,goroutine往往是和channel结合在一起使用。最常见的用法是,一个goroutine负责向channel中写入数据,之后将channel返回,由其他进程取出信息。比如之前写过的一些websocket从前台接受信息,后台处理信息之后再动态返回给前台打出结果的模型,就和这个差不多,总之具体的异步执行流程要理清楚,都有哪些channel,负责传递的信息分别是什么。

参考资源

http://shanks.leanote.com/post/%E6%B7%B1%E5%BA%A6%E5%89%96%E6%9E%90channel
http://skoo.me/go/2013/09/20/go-runtime-channel
http://skoo.me/go/2014/07/09/channel-timeout
http://www.cnblogs.com/hustcat/p/4003729.html
http://www.jianshu.com/p/36e246c6153d
https://blog.altoros.com/golang-part-1-main-concepts-and-project-structure.html
《GO语言学习笔记》

推荐文章