【Golang】io.Pipeのr/wブロック
io.Pipe
io
パッケージの関数 io.Pipe()
は io.Writer
を実装した PipeReader
、 io.Reader
を実装した PipeWriter
ポインタを返します。
PipeWriter.Write()
で書き込みを行うと、その内容を PipeReader.Read()
で読みこむことができます。
pr, pw := io.Pipe() go func(w io.Writer) { s := []byte("string") if _, err := w.Write(s); err != nil { t.Error(err) } }(pw) b := make([]byte, 1024) if _, err := pr.Read(b); err != nil { t.Error(err) } fmt.Println(b) // -> [115 116 114 105 110 103 0 ... ]
PipeReader
PipeWriter
共々内部的には同じ pipe
型ポインタを持っていて、これを介してコンテンツを読み書きしています。
pipe.go
// A PipeReader is the read half of a pipe. type PipeReader struct { p *pipe } ... // A PipeWriter is the write half of a pipe. type PipeWriter struct { p *pipe }
io.Pipe
のr/wブロック
PipeReader.Read()
PipeWriter.Write()
は互いの操作をブロックします。すなわち PipeReader.Read()
は PipeWriter.Write()
が呼ばれるまでブロックされ、 PipeWriter.Write()
もまた PipeReader.Read()
が呼ばれるまでブロックされます。そのため上記の例ではgoroutineで非同期に PipeWriter.Write()
を呼び、 PipeReader.Read()
がブロックされることを防いでいます。
ではgoroutineを使用せず同期的に PipeReader.Read()
あるいは PipeWriter.Write()
を呼び出すとそのままプログラムの実行がstopするのか?と思いきやfatal errorで終了します。
pr, pw := io.Pipe() s := []byte("string") if _, err := pw.Write(s); err != nil { panic(err) } b := make([]byte, 1024) if _, err := pr.Read(b); err != nil { panic(err) } fmt.Println(b)
fatal error: all goroutines are asleep - deadlock! goroutine 1 [select]: io.(*pipe).Write(0xc0000201e0, 0xc00001c09a, 0x6, 0x6, 0x0, 0x0, 0x0) /usr/local/go/src/io/pipe.go:94 +0x1e5 io.(*PipeWriter).Write(...) /usr/local/go/src/io/pipe.go:163
これは pipe
が内部で保持するチャネルが入力待ちの状態に陥り、main goroutine含む全てのgoroutineがstopしてしまうために発生します。
pipe.go
func (p *pipe) Write(b []byte) (n int, err error) { select { case <-p.done: return 0, p.writeCloseError() default: p.wrMu.Lock() defer p.wrMu.Unlock() } for once := true; once || len(b) > 0; once = false { select { case p.wrCh <- b: // ここでp.wrChが出力可能になるまでブロック nw := <-p.rdCh b = b[nw:] n += nw case <-p.done: return n, p.writeCloseError() } } return n, nil } ... func (p *pipe) Read(b []byte) (n int, err error) { select { case <-p.done: return 0, p.readCloseError() default: } select { case bw := <-p.wrCh: // ここで入力可能になるまでブロック nr := copy(b, bw) p.rdCh <- nr return nr, nil case <-p.done: return 0, p.readCloseError() } }
PipeReader.Read()
PipeWriter.Write()
がブロックされるのは、内部のチャネルのブロックによるものでした。
こうして見ると、 io.Pipe
関数はチャネルによるgoroutine間の []byte
I/Oを抽象化するWrapperのようにも感じます。
nits
中間にバッファを介せば、一応同期的にr/w処理を分けることができます。バッファ分のメモリ容量を必要としますが...
buf := bytes.Buffer{} s := []byte("string") if _, err := buf.Write(s); err != nil { t.Error(err) } b := make([]byte, 1024) if _, err := buf.Read(b); err != nil { t.Error(err) } fmt.Println(b)