Golang Pipe
- pipe是反向的copy,copy要求提供一对rw,而pipe提供一对rw,两者都是为流服务的。
- 简而言之就是,pipe的reader读不到就会阻塞,而普通buffer读不到就认为EOF或者报错。业务里这样的需求太少了哈哈
- 假如一个函数要求提供一个writer,他会不定期写一些什么进去,另一个函数要求提供一个reader,会不定期读些什么出来。假如是一般的buffer,读出eof就退出了,而不能等到writer写新的东西进来。
本文摘录了许式伟 《Go,基于连接与组合的语言》部分内容,为了便于理解,我在其后端写了个完整的示例程序帮助理解,这篇文章 一是展示go在并行编程中的伟大,也是理解和学习闭包的活的教科书
让我们从Unix谈起。Go语言与Unix、C语言有着极深的渊源。Go语言的领袖们参与甚至主导了Unix和C语言的设计。Ken Thompson 甚至算得上Unix和C语言的鼻祖。Go语言亦深受Unix和C语言的设计哲学影响。
在Unix世界里,组件就是应用程序(app),每个app可大体抽象为:
输入:stdin(标准输入), params(命令行参数)
输出:stdout(标准输出)
协议:text (data stream)
不同的应用程序(app)如何连接?答案是:管道(pipeline)。在Unix世界中大家对这样的东西已经很熟悉了:
app1 params1 | app2 params2
通过管道(pipeline),可以将一个应用程序的输出(stdout)转换为另一个应用程序的输入(stdin)。更为神奇的一点,是这些应用程序是并行执行的。app1每产生一段输出,立即会被app2所处理。所以管道(pipeline)称得上是最古老,同时也是极其优秀的并行设施,简单而强大。
需要注意的是,Unix世界中不同应用程序直接是松散耦合的。上游app的输出是xml还是json,下游app需要知晓,但并无任何强制的约束。同一输出,不同的下游app,对协议的理解甚至都可能并不相同。例如,上游app输出一段xml文本,对于某个下游app来说,是一颗dom树,但对linecount程序来说只是一个多行的文本,对于英文单词词频统计程序来说,是一篇英文文章。
为了方便理解,我们先尝试在Go语言中模拟整个Unix的管道(pipeline)机制。首先是应用程序(app),我们抽象为:
func(in io.Reader, out io.Writer, args []string)
我们按下图来对应Unix与Go代码的关系:
也就是说,Unix 中的
app1 params1 | app2 params2
对应Go语言中是:
pipe( bind(app1, params1), bind(app2, params2) )
其中,bind 函数实现如下:
func bind(
app func(in io.Reader, out io.Writer, args []string),
args []string
) func(in io.Reader, out io.Writer) {
return func(in io.Reader, out io.Writer) {
app(in, out, args)
}
}
要理解bind函数,需要先理解“闭包”。Go语言中,应用程序以一个闭包的形式体现。如果你熟悉函数式编程,不难发现,这个bind函数其实就是所谓的柯里化(currying)。
pipe函数如下:
func pipe(
app1 func(in io.Reader, out io.Writer),
app2 func(in io.Reader, out io.Writer)
) func(in io.Reader, out io.Writer) {
return func(in io.Reader, out io.Writer) {
pr, pw := io.Pipe()
defer pw.Close()
go func() {
defer pr.Close()
app2(pr, out)
}()
app1(in, pw)
}
}
要理解pipe函数,除了“闭包”外,需要知晓defer关键字和goroutine(go关键字)。defer语句会在函数退出时执行(无论是否发生了异常),通常用于资源的清理操作(比如关闭文件句柄等)。有了defer语句,Go语言中的错误处理代码显得非常优雅。在一个正常的函数调用前加上go关键字,就会使得该函数在新的goroutine中并行执行。理解了这些背景,这个pipe函数不难理解,无非是:先创建一个管道,让app1读入数据(in),并向管道的写入端(pw)输出,启动一个新goroutine,让app2从管道的读入端读取数据,并将处理结果输出(out)。这样得到的app就是app1和app2的组合了。
你甚至可以对多个app进行组合:
func pipe(apps ...func(in io.Reader, out io.Writer)) func(in io.Reader, out io.Writer) {
if len(apps) == 0 { return nil }
app := apps[0]
for i := 1; i < len(apps); i++ {
app1, app2 := app, apps[i]
app = func(in io.Reader, out io.Writer) {
pr, pw := io.Pipe()
defer pw.Close()
go func() {
defer pr.Close()
app2(pr, out)
}()
app1(in, pw)
}
}
return app
}
我们举个比较实际的例子,假设我们有2个应用程序tar(打包)、gzip(压缩):
func tar(io.Reader, out io.Writer, files []string)
func gzip(in io.Reader, out io.Writer)
那么打包并压缩的代码是:
pipe( bind(tar, files), gzip )(nil, out)
通过对管道(pipeline)的模拟我们可以看出,Go语言对并行支持是非常强大的,这主要得益于Go的轻量级进程(goroutine)。
实例程序,帮助理解管道:
package main
import (
"io"
"os"
"bufio"
"bytes"
"fmt"
"strconv"
)
//bind函数主要是用来为pipe函数整合用的,通过将闭包将函数签名变成pipe所需的样子
//返回一个函数闭包,将一个函数字面量app和字符串slice 传入其中
func bind(app func(in io.Reader, out io.Writer, args []string), args []string) func(in io.Reader, out io.Writer) {
return func(in io.Reader, out io.Writer) {
app(in, out, args)
}
}
//将两个函数插入到管道的中间,调用者只需调用pipe返回的函数字面量,并传入管道的首尾两端,即可实现管道
//返回一个新的函数闭包
func pipe(app1 func(in io.Reader, out io.Writer), app2 func(in io.Reader, out io.Writer)) func(in io.Reader, out io.Writer) {
return func(in io.Reader, out io.Writer) {
pr, pw := io.Pipe()
defer pw.Close()
go func() {
defer pr.Close()
app2(pr, out)
}()
app1(in, pw)
}
}
//读取args slice的每个字符串,将其作为文件名,读取文件,并在文件的每一行首部加上行号,写入到out中
//此处in没有使用到,主要是为了保证管道定义的一致性
func app1(in io.Reader, out io.Writer, args []string) {
for _, v := range args {
//fmt.Println(v)
file, err := os.Open(v)
if err != nil {
continue
}
defer file.Close()
buf := bufio.NewReader(file)
for i:=1; ;i++{
line, err := buf.ReadBytes('\n')
if err != nil {
break
}
linenum := strconv.Itoa(i)
nline := []byte(linenum + " ")
nline = append(nline, line...)
out.Write(nline)
}
}
}
//app2 主要是将字节流转化为大写,中文可能会有点问题,不过主要是演示用,重在理解思想
//read from in, convert byte to Upper ,write the result to out
func app2(in io.Reader, out io.Writer) {
rd := bufio.NewReader(in)
p := make([]byte, 10)
for {
n, _ := rd.Read(p)
if n == 0 {
break
}
t := bytes.ToUpper(p[:n])
out.Write(t)
}
}
func main() {
args := os.Args[1:]
for _, v := range args {
fmt.Println(v)
}
p := pipe(bind(app1, args), app2)
p(os.Stdin, os.Stdout)
}