goroutine 本质上是大号版的异步执行句柄,比之 nodejs 中的单线程事件循环处理器。之所以在使用 goroutine,感觉不到异步,在于 golang 已经封装了各种异步 io 操作,运行时一旦发现异步 io 状态发生改变,则适时进行 goroutine 切换。让你基本上感觉不到像基于事件编程所带来的直观上的任务执行乱序。
启动 VS 执行
goroutine 这种由运行时控制,构建于线程之上,又比线程粒度小的操作,操作系统一点也不 care。通常分配给线程内存会在 8M 左右,goroutine 则只有 2kb,一个 MB 一个 KB 完全在不同级别,单单性能上就提高很多。结论就是,在任务调度上,goroutine 是弱于线程的,但是在资源消耗上,goroutine 则是极低的。
异步编程为了追求程序的性能,强行的将线性的程序打乱,程序变得非常的混乱与复杂。对程序状态的管理也变得异常困难。接触网络编程的同学,很容易理解启动与执行完全是两个概念。
sum() // 同步执行普通函数,等待它执行完毕 go sum() // 用go关键字启动一个goroutine立即返回,它会异步执行
换而言之,普通函数是即调即用,用 go 关键字修饰的函数只是启动立即返回,在当前线程空闲的时候(main 函数是一个特殊的 goroutine,一旦它没空,意味着你的 gorotuine 哪怕启动了,也是作不出什么妖来。)作异步执行。更直白点,你看到的 goroutine 是它启动的样子,它的异步执行由运行时所决定,groutine 与 goroutine 执行顺序,还是不会忘本,回到异步执行流乱序的常态。
创建 goroutine
示例使用了 select 来阻塞主线程,若使用 sleep,则本身是不知道多长时间是恰到好处。
太长浪费资源,太短则有可能所请求的网站还没来得及响应,主线程就退出了,达不到请求目标网站的目的。
package main import ( "fmt" "io/ioutil" "log" "net/http" "time" ) func responseSize(url string) { fmt.Println("Step1: ", url) response, err := http.Get(url) if err != nil { log.Fatal(err) } fmt.Println("Step2: ", url) defer response.Body.Close() fmt.Println("Step3: ", url) body, err := ioutil.ReadAll(response.Body) if err != nil { log.Fatal(err) } fmt.Println("Step4: ", len(body)) } func main() { go responseSize("https://www.stackoverflow.com") go responseSize("https://www.shiyanlou.com") go responseSize("https://www.baidu.com") select {} }
执行 go run main.go。查看 4 个阶段响应,字节长度信息,以三个响应速度不一,响应资源不等网站为例。每次 print 都有 io 切换(即 goroutine 在运行时指挥下自动切换)。很明显 goroutine 的输出是无序的,与 goroutine 的启动顺序无关。
Step1: https://www.baidu.com Step1: https://www.stackoverflow.com Step1: https://www.shiyanlou.com Step2: https://www.baidu.com Step3: https://www.baidu.com Step4: 227 Step2: https://www.shiyanlou.com Step3: https://www.shiyanlou.com Step4: 196083 Step2: https://www.stackoverflow.com Step3: https://www.stackoverflow.com Step4: 116044
等待 goroutine 执行完毕
sync.WaitGroup 在此是同步阻塞主线程,等待一组 goroutine 执行完毕
Add 方法向 WaitGroup 实例设置默认计数,Done 减少一个,Wait 方法负责阻塞等待其它 goroutine 执行完毕。
... func responseSize(url string) { defer wg.Done() ... } var wg sync.WaitGroup func main() { wg.Add(3) ... wg.Wait() fmt.Println("terminate Program") }
从 goroutine 中取值
goroutine 间通常使用 channel,类似于 linux 中的 pipe 管道,来实现通信。
package main import ( "fmt" "io/ioutil" "log" "net/http" "sync" ) var wg sync.WaitGroup func responseSize(url string, nums chan<- int) { defer wg.Done() response, err := http.Get(url) if err != nil { log.Fatal(err) } defer response.Body.Close() body, err := ioutil.ReadAll(response.Body) if err != nil { log.Fatal(err) } nums <- len(body) // 将值写入通道 } func main() { nums := make(chan int) // 声明通道 wg.Add(3) go responseSize("https://www.stackoverflow.com", nums) fmt.Println(<-nums) // 发送数据 wg.Wait() close(nums) // 关闭通道
控制 goroutine 执行
使用通道可以控制 goroutine 的执行与暂停,通道方便 goroutine 间通信
package main import ( "fmt" "sync" "time" ) var i int func work() { time.Sleep(250 * time.Millisecond) i++ fmt.Println(i) } func routine(command <-chan string, wg *sync.WaitGroup) { defer wg.Done() var status = "Play" for { select { case cmd := <-command: fmt.Println(cmd) switch cmd { case "Stop": return case "Pause": status = "Pause" default: status = "Play" } default: if status == "Play" { work() } } } } func main() { var wg sync.WaitGroup wg.Add(1) command := make(chan string) go routine(command, &wg) time.Sleep(1 * time.Second) command <- "Pause" time.Sleep(1 * time.Second) command <- "Play" time.Sleep(1 * time.Second) command <- "Stop" wg.Wait() }
运行结果
1 2 3 4 Pause Play 5 6 7 8 9 Stop
原子函数修复条件竞争
争用条件是由于对共享资源的不同步访问而引起。原子函数提供了用于同步访问整数和指针的底层锁定机制。同步包下的 atomic 中的函数通过锁定对共享资源的访问来提供支持同步 goroutine 的支持。
package main import ( "fmt" "runtime" "sync" "sync/atomic" ) var ( counter int32 wg sync.WaitGroup ) func main() { wg.Add(3) go increment("python") go increment("java") go increment("Golang") wg.Wait() fmt.Println("Counter:", counter) } func increment(name string) { defer wg.Done() for range name { atomic.AddInt32(&counter, 1) runtime.Gosched() } }
使用原子函数对值时,它会强制一次有且仅一个 goroutine 能够完成值修改操作。当别 goroutine 试图调用任何原子函数时,会自动同步所对应引用的变量。
mutex 定义临界区
mutex 通常被用来定义临界区,确保该区域的代码一次只能被一个 goroutine 访问执行。
package main import ( "fmt" "sync" ) var ( counter int32 wg sync.WaitGroup mutex sync.Mutex ) func main() { wg.Add(3) go increment("Python") go increment("Go Programming Language") go increment("Golang") wg.Wait() fmt.Println("Counter:", counter) } func increment(lang string) { defer wg.Done() for i := 0; i < 3; i++ { mutex.Lock() { fmt.Println(lang) counter++ } mutex.Unlock() } }
去掉锁,执行下面语句,分析数据竞争
go run -race main.go
© 著作权归作者所有
发表评论