Works by

Ren's blog

@rennnosuke_rk 技術ブログです

【Golang】io.Pipeのr/wブロック

io.Pipe

io パッケージの関数 io.Pipe()io.Writer を実装した PipeReaderio.Reader を実装した PipeWriter ポインタを返します。 PipeWriter.Write() で書き込みを行うと、その内容を PipeReader.Read() で読みこむことができます。

pr, pw := io.Pipe()

go func(w io.Writer) {
    s := []byte("string")
    if _, err := w.Write(s); err != nil {
        t.Error(err)
    }
}(pw)

b := make([]byte, 1024)
if _, err := pr.Read(b); err != nil {
    t.Error(err)
}
fmt.Println(b) // -> [115 116 114 105 110 103 0 ... ]

PipeReader PipeWriter 共々内部的には同じ pipe 型ポインタを持っていて、これを介してコンテンツを読み書きしています。

pipe.go
// A PipeReader is the read half of a pipe.
type PipeReader struct {
    p *pipe
}
...
// A PipeWriter is the write half of a pipe.
type PipeWriter struct {
    p *pipe
}

io.Pipe のr/wブロック

PipeReader.Read() PipeWriter.Write() は互いの操作をブロックします。すなわち PipeReader.Read()PipeWriter.Write() が呼ばれるまでブロックされ、 PipeWriter.Write() もまた PipeReader.Read() が呼ばれるまでブロックされます。そのため上記の例ではgoroutineで非同期に PipeWriter.Write() を呼び、 PipeReader.Read() がブロックされることを防いでいます。

ではgoroutineを使用せず同期的に PipeReader.Read() あるいは PipeWriter.Write() を呼び出すとそのままプログラムの実行がstopするのか?と思いきやfatal errorで終了します。

pr, pw := io.Pipe()

s := []byte("string")
if _, err := pw.Write(s); err != nil {
    panic(err)
}

b := make([]byte, 1024)
if _, err := pr.Read(b); err != nil {
    panic(err)
}
fmt.Println(b)
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [select]:
io.(*pipe).Write(0xc0000201e0, 0xc00001c09a, 0x6, 0x6, 0x0, 0x0, 0x0)
        /usr/local/go/src/io/pipe.go:94 +0x1e5
io.(*PipeWriter).Write(...)
        /usr/local/go/src/io/pipe.go:163

これは pipe が内部で保持するチャネルが入力待ちの状態に陥り、main goroutine含む全てのgoroutineがstopしてしまうために発生します。

pipe.go
func (p *pipe) Write(b []byte) (n int, err error) {
    select {
    case <-p.done:
        return 0, p.writeCloseError()
    default:
        p.wrMu.Lock()
        defer p.wrMu.Unlock()
    }

    for once := true; once || len(b) > 0; once = false {
        select {
        case p.wrCh <- b: // ここでp.wrChが出力可能になるまでブロック
            nw := <-p.rdCh
            b = b[nw:]
            n += nw
        case <-p.done:
            return n, p.writeCloseError()
        }
    }
    return n, nil
}

...

func (p *pipe) Read(b []byte) (n int, err error) {
    select {
    case <-p.done:
        return 0, p.readCloseError()
    default:
    }

    select {
    case bw := <-p.wrCh: // ここで入力可能になるまでブロック
        nr := copy(b, bw)
        p.rdCh <- nr
        return nr, nil
    case <-p.done:
        return 0, p.readCloseError()
    }
}

PipeReader.Read() PipeWriter.Write() がブロックされるのは、内部のチャネルのブロックによるものでした。 こうして見ると、 io.Pipe 関数はチャネルによるgoroutine間の []byte I/Oを抽象化するWrapperのようにも感じます。

nits

中間にバッファを介せば、一応同期的にr/w処理を分けることができます。バッファ分のメモリ容量を必要としますが...

buf := bytes.Buffer{}

s := []byte("string")
if _, err := buf.Write(s); err != nil {
    t.Error(err)
}

b := make([]byte, 1024)
if _, err := buf.Read(b); err != nil {
    t.Error(err)
}
fmt.Println(b)

参考文献

golang.org