Golang Pipe


原文链接: Golang Pipe
  1. pipe是反向的copy,copy要求提供一对rw,而pipe提供一对rw,两者都是为流服务的。
  2. 简而言之就是,pipe的reader读不到就会阻塞,而普通buffer读不到就认为EOF或者报错。业务里这样的需求太少了哈哈
  3. 假如一个函数要求提供一个writer,他会不定期写一些什么进去,另一个函数要求提供一个reader,会不定期读些什么出来。假如是一般的buffer,读出eof就退出了,而不能等到writer写新的东西进来。


本文摘录了许式伟 《Go,基于连接与组合的语言》部分内容,为了便于理解,我在其后端写了个完整的示例程序帮助理解,这篇文章 一是展示go在并行编程中的伟大,也是理解和学习闭包的活的教科书


让我们从Unix谈起。Go语言与Unix、C语言有着极深的渊源。Go语言的领袖们参与甚至主导了Unix和C语言的设计。Ken Thompson 甚至算得上Unix和C语言的鼻祖。Go语言亦深受Unix和C语言的设计哲学影响。

在Unix世界里,组件就是应用程序(app),每个app可大体抽象为:

输入:stdin(标准输入), params(命令行参数)
输出:stdout(标准输出)
协议:text (data stream)

不同的应用程序(app)如何连接?答案是:管道(pipeline)。在Unix世界中大家对这样的东西已经很熟悉了:

app1 params1 | app2 params2

通过管道(pipeline),可以将一个应用程序的输出(stdout)转换为另一个应用程序的输入(stdin)。更为神奇的一点,是这些应用程序是并行执行的。app1每产生一段输出,立即会被app2所处理。所以管道(pipeline)称得上是最古老,同时也是极其优秀的并行设施,简单而强大。

需要注意的是,Unix世界中不同应用程序直接是松散耦合的。上游app的输出是xml还是json,下游app需要知晓,但并无任何强制的约束。同一输出,不同的下游app,对协议的理解甚至都可能并不相同。例如,上游app输出一段xml文本,对于某个下游app来说,是一颗dom树,但对linecount程序来说只是一个多行的文本,对于英文单词词频统计程序来说,是一篇英文文章。

为了方便理解,我们先尝试在Go语言中模拟整个Unix的管道(pipeline)机制。首先是应用程序(app),我们抽象为:

func(in io.Reader, out io.Writer, args []string)

我们按下图来对应Unix与Go代码的关系:

也就是说,Unix 中的

app1 params1 | app2 params2

对应Go语言中是:

pipe( bind(app1, params1), bind(app2, params2) )

其中,bind 函数实现如下:

func bind(

app func(in io.Reader, out io.Writer, args []string),
args []string

) func(in io.Reader, out io.Writer) {

return func(in io.Reader, out io.Writer) {
    app(in, out, args)
}

}

要理解bind函数,需要先理解“闭包”。Go语言中,应用程序以一个闭包的形式体现。如果你熟悉函数式编程,不难发现,这个bind函数其实就是所谓的柯里化(currying)。

pipe函数如下:

func pipe(

app1 func(in io.Reader, out io.Writer),
app2 func(in io.Reader, out io.Writer)

) func(in io.Reader, out io.Writer) {

return func(in io.Reader, out io.Writer) {
    pr, pw := io.Pipe()
    defer pw.Close()
    go func() {
        defer pr.Close()
        app2(pr, out)
    }()
    app1(in, pw)
}

}

要理解pipe函数,除了“闭包”外,需要知晓defer关键字和goroutine(go关键字)。defer语句会在函数退出时执行(无论是否发生了异常),通常用于资源的清理操作(比如关闭文件句柄等)。有了defer语句,Go语言中的错误处理代码显得非常优雅。在一个正常的函数调用前加上go关键字,就会使得该函数在新的goroutine中并行执行。理解了这些背景,这个pipe函数不难理解,无非是:先创建一个管道,让app1读入数据(in),并向管道的写入端(pw)输出,启动一个新goroutine,让app2从管道的读入端读取数据,并将处理结果输出(out)。这样得到的app就是app1和app2的组合了。

你甚至可以对多个app进行组合:

func pipe(apps ...func(in io.Reader, out io.Writer)) func(in io.Reader, out io.Writer) {

    if len(apps) == 0 { return nil }
    app := apps[0]
    for i := 1; i < len(apps); i++ {
        app1, app2 := app, apps[i]
        app = func(in io.Reader, out io.Writer) {
            pr, pw := io.Pipe()
            defer pw.Close()
            go func() {
                defer pr.Close()
                app2(pr, out)
            }()
            app1(in, pw)
        }
    }
    return app
}

我们举个比较实际的例子,假设我们有2个应用程序tar(打包)、gzip(压缩):

func tar(io.Reader, out io.Writer, files []string)

func gzip(in io.Reader, out io.Writer)

那么打包并压缩的代码是:

pipe( bind(tar, files), gzip )(nil, out)

通过对管道(pipeline)的模拟我们可以看出,Go语言对并行支持是非常强大的,这主要得益于Go的轻量级进程(goroutine)。

实例程序,帮助理解管道:


    package main  
      
    import (  
        "io"  
        "os"  
        "bufio"  
        "bytes"  
        "fmt"  
        "strconv"  
    )  
      
    //bind函数主要是用来为pipe函数整合用的,通过将闭包将函数签名变成pipe所需的样子  
    //返回一个函数闭包,将一个函数字面量app和字符串slice 传入其中  
    func bind(app func(in io.Reader, out io.Writer, args []string), args []string) func(in io.Reader, out io.Writer) {  
        return func(in io.Reader, out io.Writer) {  
            app(in, out, args)  
        }  
    }  
      
    //将两个函数插入到管道的中间,调用者只需调用pipe返回的函数字面量,并传入管道的首尾两端,即可实现管道  
    //返回一个新的函数闭包  
    func pipe(app1 func(in io.Reader, out io.Writer), app2 func(in io.Reader, out io.Writer)) func(in io.Reader, out io.Writer) {  
        return func(in io.Reader, out io.Writer) {  
            pr, pw := io.Pipe()  
            defer pw.Close()  
            go func() {  
                defer pr.Close()  
                app2(pr, out)  
            }()  
            app1(in, pw)  
        }  
    }  
      
    //读取args slice的每个字符串,将其作为文件名,读取文件,并在文件的每一行首部加上行号,写入到out中  
    //此处in没有使用到,主要是为了保证管道定义的一致性  
    func app1(in io.Reader, out io.Writer, args []string) {  
        for _, v := range args {  
            //fmt.Println(v)  
            file, err := os.Open(v)  
            if err != nil {  
                continue  
            }  
            defer file.Close()  
            buf := bufio.NewReader(file)  
            for i:=1; ;i++{  
                line, err := buf.ReadBytes('\n')  
                if err != nil {  
                    break  
                }  
                linenum := strconv.Itoa(i)  
                nline := []byte(linenum + " ")  
                nline = append(nline, line...)  
                out.Write(nline)  
            }  
        }  
    }  


    //app2 主要是将字节流转化为大写,中文可能会有点问题,不过主要是演示用,重在理解思想  
    //read from in, convert byte to Upper ,write the result to out  
    func app2(in io.Reader, out io.Writer) {  
        rd := bufio.NewReader(in)  
        p := make([]byte, 10)  
        for {  
            n, _ := rd.Read(p)  
            if n == 0 {  
                break  
            }  
            t := bytes.ToUpper(p[:n])  
            out.Write(t)  
        }  
    }  
      
    func main() {  
        args := os.Args[1:]  
        for _, v := range args {  
            fmt.Println(v)  
        }  
        p := pipe(bind(app1, args), app2)  
        p(os.Stdin, os.Stdout)  
    }  
`