From 17017e907efcfb40ef55c8e52ca6149d676375c7 Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Fri, 6 Sep 2024 11:40:57 -0700
Subject: [PATCH] Allow half-closed reads and test against TCP/TLS connections
 (#131)

* test against tls.Conns, not pipes

Specifically to debug hashicorp/nomad#23305, but tests should probably
run against multiple net.Conn implementations anyway, as yamux is
sensitive to net.Conn behaviors such as concurrent Read|Write|Close and
what errors are returned.

* locally closed streams can still read

Effectively reverts 912e29616f9b41d9cbe0d35bc76bd43ef6f11943

Reasons to revert to locally closed streams being readable:

1. Matches libp2p's yamux fork:
   https://github.com/libp2p/go-yamux/blob/master/stream.go#L95-L96
2. Both yamux and SPDY make it clear that a locally closed stream
   cannot send more data.
3. SPDY explicitly supports unidirectional streams where one peer is
   closed (readonly) from the beginning:
   https://www.chromium.org/spdy/spdy-protocol/spdy-protocol-draft3-1/#46-unidirectional-streams

* test: fix timing when using tcp

TestSession_PartialReadWindowUpdate asserts that large sends can cause
window updates. When using io.Pipes the server receives the window
update nearly synchronously during the client's Read call. Using tcp
sockets adds just enough latency to reliably lose the race for
observing the window update on the server. Added a sleep to ensure the
server has time to read and apply the window update from the client.

I also added a number of comments and non-functional cleanups. The
time.Sleep() is the only functional change.

* test: expand connection types tested

Expanded tests to use TCP and TLS. Sorry for the huge diff, but I think
this makes yamux's test suite much more realistic. There are quite a
few new tools:

- testConnPipe is the original io.Pipe based testing tool. Some tests
  still require it due to the ability to easily pause the data flow.
- testConnTCP and testConnTLS create TCP and TLS connections for yamux
  to use. This introduces more realistic timing issues, buffering, and
  subtle differences in error messages.
- testConnTypes is a helper to run subtests against *all* of the above
  connection types, as well as ensuring that reversing the
  client/server sockets doesn't impact yamux (it didn't!).

I didn't convert every test to it since it adds some time and noise to
test runs.

I also tried to formalize (client, server) as a parameter ordering.
There was a mix of orderings. Those roles are rarely meaningful to
yamux, but meaningful names make debugging easier than numbered
variables.
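To make the restored half-close semantics concrete, here is a minimal
sketch (not part of the diff; an established client/server pair of
*yamux.Session values is assumed and error handling is elided):

    clientStream, _ := client.Open()
    clientStream.Write([]byte("a"))

    serverStream, _ := server.Accept()
    serverStream.Close() // half close: no further local writes allowed

    buf := make([]byte, 4)
    n, _ := serverStream.Read(buf) // reads still drain peer data: n == 1
    // Read only returns io.EOF after the peer closes its side as well.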
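The testConnTypes helper runs a test body once per connection type
(plus reversed variants). A hypothetical TestSomething is sketched here
only to show the intended usage pattern; the converted tests in the
diff are the real examples:

    func TestSomething(t *testing.T) {
        testConnTypes(t, func(t testing.TB, clientConn, serverConn io.ReadWriteCloser) {
            client, server := testClientServerConfig(t, clientConn, serverConn, testConf(), testConf())
            // assertions against the client and server sessions go here
            _, _ = client, server
        })
    }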
---
 session_test.go    | 552 ++++++++++++++++++++++++++++++---------------
 stream.go          |   4 +-
 testdata/README.md |   7 +
 testdata/cert.pem  |  18 ++
 testdata/key.pem   |  28 +++
 5 files changed, 429 insertions(+), 180 deletions(-)
 create mode 100644 testdata/README.md
 create mode 100644 testdata/cert.pem
 create mode 100644 testdata/key.pem

diff --git a/session_test.go b/session_test.go
index 46e9233..d95780a 100644
--- a/session_test.go
+++ b/session_test.go
@@ -3,6 +3,7 @@ package yamux
 import (
 	"bytes"
 	"context"
+	"crypto/tls"
 	"errors"
 	"fmt"
 	"io"
@@ -67,7 +68,7 @@ func (p *pipeConn) Close() error {
 	return p.writer.Close()
 }
 
-func testConn() (io.ReadWriteCloser, io.ReadWriteCloser) {
+func testConnPipe(testing.TB) (io.ReadWriteCloser, io.ReadWriteCloser) {
 	read1, write1 := io.Pipe()
 	read2, write2 := io.Pipe()
 	conn1 := &pipeConn{reader: read1, writer: write2}
@@ -75,6 +76,144 @@ func testConn() (io.ReadWriteCloser, io.ReadWriteCloser) {
 	return conn1, conn2
 }
 
+func testConnTCP(t testing.TB) (io.ReadWriteCloser, io.ReadWriteCloser) {
+	l, err := net.ListenTCP("tcp", nil)
+	if err != nil {
+		t.Fatalf("error creating listener: %v", err)
+	}
+	t.Cleanup(func() { _ = l.Close() })
+
+	network := l.Addr().Network()
+	addr := l.Addr().String()
+
+	var server net.Conn
+	errCh := make(chan error, 1)
+	go func() {
+		defer close(errCh)
+		var err error
+		server, err = l.Accept()
+		if err != nil {
+			errCh <- err
+			return
+		}
+	}()
+
+	t.Logf("Connecting to %s: %s", network, addr)
+	client, err := net.DialTimeout(network, addr, 10*time.Second)
+	if err != nil {
+		t.Fatalf("error dialing tcp listener: %v", err)
+	}
+	t.Cleanup(func() { _ = client.Close() })
+
+	if err := <-errCh; err != nil {
+		t.Fatalf("error creating tcp server: %v", err)
+	}
+	t.Cleanup(func() { _ = server.Close() })
+
+	return client, server
+}
+
+func testConnTLS(t testing.TB) (io.ReadWriteCloser, io.ReadWriteCloser) {
+	cert, err := tls.LoadX509KeyPair("testdata/cert.pem", "testdata/key.pem")
+	if err != nil {
+		t.Fatalf("error loading certificate: %v", err)
+	}
+
+	l, err := net.ListenTCP("tcp", nil)
+	if err != nil {
+		t.Fatalf("error creating listener: %v", err)
+	}
+	t.Cleanup(func() { _ = l.Close() })
+
+	var server net.Conn
+	errCh := make(chan error, 1)
+	go func() {
+		defer close(errCh)
+		conn, err := l.Accept()
+		if err != nil {
+			errCh <- err
+			return
+		}
+
+		server = tls.Server(conn, &tls.Config{
+			Certificates: []tls.Certificate{cert},
+		})
+	}()
+
+	t.Logf("Connecting to %s: %s", l.Addr().Network(), l.Addr())
+	client, err := net.DialTimeout(l.Addr().Network(), l.Addr().String(), 10*time.Second)
+	if err != nil {
+		t.Fatalf("error dialing tls listener: %v", err)
+	}
+	t.Cleanup(func() { _ = client.Close() })
+
+	tlsClient := tls.Client(client, &tls.Config{
+		// InsecureSkipVerify is safe to use here since this is only for tests.
+		InsecureSkipVerify: true,
+	})
+
+	if err := <-errCh; err != nil {
+		t.Fatalf("error creating tls server: %v", err)
+	}
+	t.Cleanup(func() { _ = server.Close() })
+
+	return tlsClient, server
+}
+
+// connTypeFunc is a func that returns a client and server connection for
+// testing, like testConnTLS.
+//
+// See connTypeTest.
+type connTypeFunc func(t testing.TB) (io.ReadWriteCloser, io.ReadWriteCloser)
+
+// connTypeTest is a test case for a specific conn type.
+//
+// See testConnTypes.
+type connTypeTest struct {
+	Name  string
+	Conns connTypeFunc
+}
+
+// testConnTypes runs subtests of the given testFunc against multiple
+// connection types.
+func testConnTypes(t *testing.T, testFunc func(t testing.TB, client, server io.ReadWriteCloser)) {
+	reverse := func(f connTypeFunc) connTypeFunc {
+		return func(t testing.TB) (io.ReadWriteCloser, io.ReadWriteCloser) {
+			c, s := f(t)
+			return s, c
+		}
+	}
+	cases := []connTypeTest{
+		{
+			Name:  "Pipes",
+			Conns: testConnPipe,
+		},
+		{
+			Name:  "TCP",
+			Conns: testConnTCP,
+		},
+		{
+			Name:  "TCP_Reverse",
+			Conns: reverse(testConnTCP),
+		},
+		{
+			Name:  "TLS",
+			Conns: testConnTLS,
+		},
+		{
+			Name:  "TLS_Reverse",
+			Conns: reverse(testConnTLS),
+		},
+	}
+	for i := range cases {
+		tc := cases[i]
+		t.Run(tc.Name, func(t *testing.T) {
+			client, server := tc.Conns(t)
+			testFunc(t, client, server)
+		})
+	}
+}
+
 func testConf() *Config {
 	conf := DefaultConfig()
 	conf.AcceptBacklog = 64
@@ -97,24 +236,30 @@ func testConfNoKeepAlive() *Config {
 }
 
 func testClientServer(t testing.TB) (*Session, *Session) {
-	return testClientServerConfig(t, testConf(), testConf())
+	client, server := testConnTLS(t)
+	return testClientServerConfig(t, client, server, testConf(), testConf())
 }
 
-func testClientServerConfig(t testing.TB, serverConf, clientConf *Config) (*Session, *Session) {
-	conn1, conn2 := testConn()
+func testClientServerConfig(
+	t testing.TB,
+	clientConn, serverConn io.ReadWriteCloser,
+	clientConf, serverConf *Config,
+) (clientSession *Session, serverSession *Session) {
 
-	client, err := Client(conn1, clientConf)
+	var err error
+
+	clientSession, err = Client(clientConn, clientConf)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	t.Cleanup(func() { _ = client.Close() })
+	t.Cleanup(func() { _ = clientSession.Close() })
 
-	server, err := Server(conn2, serverConf)
+	serverSession, err = Server(serverConn, serverConf)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
-	t.Cleanup(func() { _ = server.Close() })
-	return client, server
+	t.Cleanup(func() { _ = serverSession.Close() })
+	return clientSession, serverSession
 }
 
 func TestPing(t *testing.T) {
@@ -139,7 +284,8 @@ func TestPing(t *testing.T) {
 
 func TestPing_Timeout(t *testing.T) {
 	conf := testConfNoKeepAlive()
-	client, server := testClientServerConfig(t, conf.Clone(), conf.Clone())
+	clientPipe, serverPipe := testConnPipe(t)
+	client, server := testClientServerConfig(t, clientPipe, serverPipe, conf.Clone(), conf.Clone())
 
 	// Prevent the client from responding
 	clientConn := client.conn.(*pipeConn)
@@ -179,38 +325,40 @@ func TestPing_Timeout(t *testing.T) {
 }
 
 func TestCloseBeforeAck(t *testing.T) {
-	cfg := testConf()
-	cfg.AcceptBacklog = 8
-	client, server := testClientServerConfig(t, cfg, cfg.Clone())
+	testConnTypes(t, func(t testing.TB, clientConn, serverConn io.ReadWriteCloser) {
+		cfg := testConf()
+		cfg.AcceptBacklog = 8
+		client, server := testClientServerConfig(t, clientConn, serverConn, cfg, cfg.Clone())
 
-	for i := 0; i < 8; i++ {
-		s, err := client.OpenStream()
-		if err != nil {
-			t.Fatal(err)
+		for i := 0; i < 8; i++ {
+			s, err := client.OpenStream()
+			if err != nil {
+				t.Fatal(err)
+			}
+			s.Close()
 		}
-		s.Close()
-	}
 
-	for i := 0; i < 8; i++ {
-		s, err := server.AcceptStream()
-		if err != nil {
-			t.Fatal(err)
+		for i := 0; i < 8; i++ {
+			s, err := server.AcceptStream()
+			if err != nil {
+				t.Fatal(err)
+			}
+			s.Close()
 		}
-		s.Close()
-	}
 
-	errCh := make(chan error, 1)
-	go func() {
-		s, err := client.OpenStream()
-		if err != nil {
-			errCh <- err
-			return
-		}
-		s.Close()
-		errCh <- nil
-	}()
+		errCh := make(chan error, 1)
+		go func() {
+			s, err := client.OpenStream()
+			if err != nil {
+				errCh <- err
+				return
+			}
+			s.Close()
+			errCh <- nil
+		}()
+
-	drainErrorsUntil(t, errCh, 1, time.Second*5, "timed out trying to open stream")
+		drainErrorsUntil(t, errCh, 1, time.Second*5, "timed out trying to open stream")
+	})
 }
 
 func TestAccept(t *testing.T) {
@@ -252,44 +400,59 @@ func TestAccept(t *testing.T) {
 
 func TestOpenStreamTimeout(t *testing.T) {
 	const timeout = 25 * time.Millisecond
 
-	serverConf := testConf()
-	serverConf.StreamOpenTimeout = timeout
+	testConnTypes(t, func(t testing.TB, clientConn, serverConn io.ReadWriteCloser) {
+		serverConf := testConf()
+		serverConf.StreamOpenTimeout = timeout
 
-	clientConf := serverConf.Clone()
-	clientLogs := captureLogs(clientConf)
+		clientConf := serverConf.Clone()
+		clientLogs := captureLogs(clientConf)
 
-	client, _ := testClientServerConfig(t, serverConf, clientConf)
+		client, _ := testClientServerConfig(t, clientConn, serverConn, clientConf, serverConf)
 
-	// Open a single stream without a server to acknowledge it.
-	s, err := client.OpenStream()
-	if err != nil {
-		t.Fatal(err)
-	}
+		// Open a single stream without a server to acknowledge it.
+		s, err := client.OpenStream()
+		if err != nil {
+			t.Fatal(err)
+		}
 
-	// Sleep for longer than the stream open timeout.
-	// Since no ACKs are received, the stream and session should be closed.
-	time.Sleep(timeout * 5)
+		// Sleep for longer than the stream open timeout.
+		// Since no ACKs are received, the stream and session should be closed.
+		time.Sleep(timeout * 5)
 
-	if !clientLogs.match([]string{"[ERR] yamux: aborted stream open (destination=yamux:remote): i/o deadline reached"}) {
-		t.Fatalf("server log incorect: %v", clientLogs.logs())
-	}
+		// Support multiple underlying connection types
+		var dest string
+		switch conn := clientConn.(type) {
+		case net.Conn:
+			dest = conn.RemoteAddr().String()
+		case *pipeConn:
+			dest = "yamux:remote"
+		default:
+			t.Fatalf("unsupported connection type %T - please update test", conn)
+		}
+		exp := fmt.Sprintf("[ERR] yamux: aborted stream open (destination=%s): i/o deadline reached", dest)
 
-	s.stateLock.Lock()
-	state := s.state
-	s.stateLock.Unlock()
+		if !clientLogs.match([]string{exp}) {
+			t.Fatalf("server log incorrect: %v\nexpected: %v", clientLogs.logs(), exp)
+		}
 
-	if state != streamClosed {
-		t.Fatalf("stream should have been closed")
-	}
-	if !client.IsClosed() {
-		t.Fatalf("session should have been closed")
-	}
+		s.stateLock.Lock()
+		state := s.state
+		s.stateLock.Unlock()
+
+		if state != streamClosed {
+			t.Fatalf("stream should have been closed")
+		}
+		if !client.IsClosed() {
+			t.Fatalf("session should have been closed")
+		}
+	})
 }
 
 func TestClose_closeTimeout(t *testing.T) {
 	conf := testConf()
 	conf.StreamCloseTimeout = 10 * time.Millisecond
-	client, server := testClientServerConfig(t, conf, conf.Clone())
+	clientConn, serverConn := testConnTLS(t)
+	client, server := testClientServerConfig(t, clientConn, serverConn, conf, conf.Clone())
 
 	if client.NumStreams() != 0 {
 		t.Fatalf("bad")
@@ -365,6 +528,7 @@ func TestSendData_Small(t *testing.T) {
 
 	errCh := make(chan error, 2)
 
+	// Accept an incoming client and perform some reads before closing
 	go func() {
 		stream, err := server.AcceptStream()
 		if err != nil {
@@ -401,6 +565,7 @@ func TestSendData_Small(t *testing.T) {
 		errCh <- nil
 	}()
 
+	// Open a client and perform some writes before closing
 	go func() {
 		stream, err := client.Open()
 		if err != nil {
@@ -432,13 +597,16 @@ func TestSendData_Small(t *testing.T) {
 		errCh <- nil
 	}()
 
-	drainErrorsUntil(t, errCh, 2, time.Second, "timeout")
+	drainErrorsUntil(t, errCh, 2, 5*time.Second, "timeout")
 
-	if client.NumStreams() != 0 {
- t.Fatalf("bad") + // Give client and server a second to receive FINs and close streams + time.Sleep(time.Second) + + if n := client.NumStreams(); n != 0 { + t.Errorf("expected 0 client streams but found %d", n) } - if server.NumStreams() != 0 { - t.Fatalf("bad") + if n := server.NumStreams(); n != 0 { + t.Errorf("expected 0 server streams but found %d", n) } } @@ -706,55 +874,60 @@ func TestManyStreams_PingPong(t *testing.T) { drainErrorsUntil(t, errCh, 2*streams, 0, "") } +// TestHalfClose asserts that half closed streams can still read. func TestHalfClose(t *testing.T) { - client, server := testClientServer(t) + testConnTypes(t, func(t testing.TB, clientConn, serverConn io.ReadWriteCloser) { + client, server := testClientServerConfig(t, clientConn, serverConn, testConf(), testConf()) - stream, err := client.Open() - if err != nil { - t.Fatalf("err: %v", err) - } - if _, err = stream.Write([]byte("a")); err != nil { - t.Fatalf("err: %v", err) - } + clientStream, err := client.Open() + if err != nil { + t.Fatalf("err: %v", err) + } + if _, err = clientStream.Write([]byte("a")); err != nil { + t.Fatalf("err: %v", err) + } - stream2, err := server.Accept() - if err != nil { - t.Fatalf("err: %v", err) - } - stream2.Close() // Half close + serverStream, err := server.Accept() + if err != nil { + t.Fatalf("err: %v", err) + } + serverStream.Close() // Half close - buf := make([]byte, 4) - n, err := stream2.Read(buf) - if err != nil { - t.Fatalf("err: %v", err) - } - if n != 1 { - t.Fatalf("bad: %v", n) - } + // Server reads 1 byte written by Client + buf := make([]byte, 4) + n, err := serverStream.Read(buf) + if err != nil { + t.Fatalf("err: %v", err) + } + if n != 1 { + t.Fatalf("bad: %v", n) + } - // Send more - if _, err = stream.Write([]byte("bcd")); err != nil { - t.Fatalf("err: %v", err) - } - stream.Close() + // Send more + if _, err = clientStream.Write([]byte("bcd")); err != nil { + t.Fatalf("err: %v", err) + } + clientStream.Close() - // Read after close - n, err = stream2.Read(buf) - if err != nil { - t.Fatalf("err: %v", err) - } - if n != 3 { - t.Fatalf("bad: %v", n) - } + // Read after close always returns the bytes written but may or may not + // receive the EOF. 
+		n, err = serverStream.Read(buf)
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		if n != 3 {
+			t.Fatalf("bad: %v", n)
+		}
 
-	// EOF after close
-	n, err = stream2.Read(buf)
-	if err != io.EOF {
-		t.Fatalf("err: %v", err)
-	}
-	if n != 0 {
-		t.Fatalf("bad: %v", n)
-	}
+		// EOF after close
+		n, err = serverStream.Read(buf)
+		if err != io.EOF {
+			t.Fatalf("err: %v", err)
+		}
+		if n != 0 {
+			t.Fatalf("bad: %v", n)
+		}
+	})
 }
 
 func TestHalfCloseSessionShutdown(t *testing.T) {
@@ -1019,26 +1192,29 @@ func TestBacklogExceeded(t *testing.T) {
 }
 
 func TestKeepAlive(t *testing.T) {
-	client, server := testClientServer(t)
+	testConnTypes(t, func(t testing.TB, clientConn, serverConn io.ReadWriteCloser) {
+		client, server := testClientServerConfig(t, clientConn, serverConn, testConf(), testConf())
 
-	time.Sleep(200 * time.Millisecond)
+		// Give keepalives time to happen
+		time.Sleep(200 * time.Millisecond)
 
-	// Ping value should increase
-	client.pingLock.Lock()
-	defer client.pingLock.Unlock()
-	if client.pingID == 0 {
-		t.Fatalf("should ping")
-	}
+		// Ping value should increase
+		client.pingLock.Lock()
+		defer client.pingLock.Unlock()
+		if client.pingID == 0 {
+			t.Fatalf("should ping")
+		}
 
-	server.pingLock.Lock()
-	defer server.pingLock.Unlock()
-	if server.pingID == 0 {
-		t.Fatalf("should ping")
-	}
+		server.pingLock.Lock()
+		defer server.pingLock.Unlock()
+		if server.pingID == 0 {
+			t.Fatalf("should ping")
+		}
+	})
 }
 
 func TestKeepAlive_Timeout(t *testing.T) {
-	conn1, conn2 := testConn()
+	conn1, conn2 := testConnPipe(t)
 
 	clientConf := testConf()
 	clientConf.ConnectionWriteTimeout = time.Hour // We're testing keep alives, not connection writes
@@ -1092,7 +1268,8 @@ func TestLargeWindow(t *testing.T) {
 	conf := DefaultConfig()
 	conf.MaxStreamWindowSize *= 2
 
-	client, server := testClientServerConfig(t, conf, conf.Clone())
+	clientConn, serverConn := testConnTLS(t)
+	client, server := testClientServerConfig(t, clientConn, serverConn, conf, conf.Clone())
 
 	stream, err := client.Open()
 	if err != nil {
@@ -1241,7 +1418,8 @@ func TestBacklogExceeded_Accept(t *testing.T) {
 
 func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
 	conf := testConfNoKeepAlive()
-	client, server := testClientServerConfig(t, conf, conf.Clone())
+	clientConn, serverConn := testConnPipe(t)
+	client, server := testClientServerConfig(t, clientConn, serverConn, conf, conf.Clone())
 
 	// Choose a huge flood size that we know will result in a window update.
 	flood := int64(client.config.MaxStreamWindowSize) - 1
@@ -1280,7 +1458,7 @@ func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
 	}
 	defer stream.Close()
 
-	conn := client.conn.(*pipeConn)
+	conn := clientConn.(*pipeConn)
 	conn.writeBlocker.Lock()
 	defer conn.writeBlocker.Unlock()
 
@@ -1296,73 +1474,85 @@ func TestSession_WindowUpdateWriteDuringRead(t *testing.T) {
 	drainErrorsUntil(t, errCh, 2, 0, "")
 }
 
+// TestSession_PartialReadWindowUpdate asserts that when a client performs a
+// partial read it updates the server's send window.
 func TestSession_PartialReadWindowUpdate(t *testing.T) {
-	conf := testConfNoKeepAlive()
+	testConnTypes(t, func(t testing.TB, clientConn, serverConn io.ReadWriteCloser) {
+		conf := testConfNoKeepAlive()
 
-	client, server := testClientServerConfig(t, conf, conf.Clone())
+		client, server := testClientServerConfig(t, clientConn, serverConn, conf, conf.Clone())
 
-	errCh := make(chan error, 1)
+		errCh := make(chan error, 1)
 
-	// Choose a huge flood size that we know will result in a window update.
-	flood := int64(client.config.MaxStreamWindowSize)
-	var wr *Stream
+		// Choose a huge flood size that we know will result in a window update.
+		flood := int64(client.config.MaxStreamWindowSize)
+		var wr *Stream
 
-	// The server will accept a new stream and then flood data to it.
-	go func() {
-		var err error
-		wr, err = server.AcceptStream()
-		if err != nil {
-			errCh <- err
-			return
-		}
-		defer wr.Close()
+		// The server will accept a new stream and then flood data to it.
+		go func() {
+			var err error
+			wr, err = server.AcceptStream()
+			if err != nil {
+				errCh <- err
+				return
+			}
+			defer wr.Close()
 
-		window := atomic.LoadUint32(&wr.sendWindow)
-		if window != client.config.MaxStreamWindowSize {
-			errCh <- fmt.Errorf("sendWindow: exp=%d, got=%d", client.config.MaxStreamWindowSize, window)
-			return
-		}
+			window := atomic.LoadUint32(&wr.sendWindow)
+			if window != client.config.MaxStreamWindowSize {
+				errCh <- fmt.Errorf("sendWindow: exp=%d, got=%d", client.config.MaxStreamWindowSize, window)
+				return
+			}
 
-		n, err := wr.Write(make([]byte, flood))
-		if err != nil {
+			n, err := wr.Write(make([]byte, flood))
+			if err != nil {
+				errCh <- err
+				return
+			}
+			if int64(n) != flood {
+				errCh <- fmt.Errorf("short write: %d", n)
+				return
+			}
+			window = atomic.LoadUint32(&wr.sendWindow)
+			if window != 0 {
+				errCh <- fmt.Errorf("sendWindow: exp=%d, got=%d", 0, window)
+				return
+			}
 			errCh <- err
-			return
-		}
-		if int64(n) != flood {
-			errCh <- fmt.Errorf("short write: %d", n)
-			return
-		}
-		window = atomic.LoadUint32(&wr.sendWindow)
-		if window != 0 {
-			errCh <- fmt.Errorf("sendWindow: exp=%d, got=%d", 0, window)
-			return
+		}()
+
+		stream, err := client.OpenStream()
+		if err != nil {
+			t.Fatalf("err: %v", err)
 		}
-		errCh <- err
-	}()
+		defer stream.Close()
 
-	stream, err := client.OpenStream()
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
-	defer stream.Close()
+		drainErrorsUntil(t, errCh, 1, 0, "")
 
-	drainErrorsUntil(t, errCh, 1, 0, "")
+		// Only read part of the flood
+		partialReadSize := flood/2 + 1
+		_, err = stream.Read(make([]byte, partialReadSize))
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
 
-	_, err = stream.Read(make([]byte, flood/2+1))
-	if err != nil {
-		t.Fatalf("err: %v", err)
-	}
+		// Wait for window update to be applied by server. Should be "instant" but CI
+		// can be slow.
+		time.Sleep(2 * time.Second)
 
-	window := atomic.LoadUint32(&wr.sendWindow)
-	if exp := uint32(flood/2 + 1); window != exp {
-		t.Fatalf("sendWindow: exp=%d, got=%d", exp, window)
-	}
+		// Assert server received window update
+		window := atomic.LoadUint32(&wr.sendWindow)
+		if exp := uint32(partialReadSize); window != exp {
+			t.Fatalf("sendWindow: exp=%d, got=%d", exp, window)
+		}
+	})
 }
 
 func TestSession_sendNoWait_Timeout(t *testing.T) {
 	conf := testConfNoKeepAlive()
-	client, server := testClientServerConfig(t, conf, conf.Clone())
+	clientConn, serverConn := testConnPipe(t)
+	client, server := testClientServerConfig(t, clientConn, serverConn, conf, conf.Clone())
 
 	errCh := make(chan error, 2)
 
@@ -1386,7 +1576,7 @@ func TestSession_sendNoWait_Timeout(t *testing.T) {
 	}
 	defer stream.Close()
 
-	conn := client.conn.(*pipeConn)
+	conn := clientConn.(*pipeConn)
 	conn.writeBlocker.Lock()
 	defer conn.writeBlocker.Unlock()
 
@@ -1412,7 +1602,8 @@ func TestSession_sendNoWait_Timeout(t *testing.T) {
 
 func TestSession_PingOfDeath(t *testing.T) {
 	conf := testConfNoKeepAlive()
-	client, server := testClientServerConfig(t, conf, conf.Clone())
+	clientConn, serverConn := testConnPipe(t)
+	client, server := testClientServerConfig(t, clientConn, serverConn, conf, conf.Clone())
 
 	errCh := make(chan error, 2)
 
@@ -1485,7 +1676,8 @@ func TestSession_PingOfDeath(t *testing.T) {
 
 func TestSession_ConnectionWriteTimeout(t *testing.T) {
 	conf := testConfNoKeepAlive()
-	client, server := testClientServerConfig(t, conf, conf.Clone())
+	clientConn, serverConn := testConnPipe(t)
+	client, server := testClientServerConfig(t, clientConn, serverConn, conf, conf.Clone())
 
 	errCh := make(chan error, 2)
 
@@ -1509,7 +1701,7 @@ func TestSession_ConnectionWriteTimeout(t *testing.T) {
 	}
 	defer stream.Close()
 
-	conn := client.conn.(*pipeConn)
+	conn := clientConn.(*pipeConn)
 	conn.writeBlocker.Lock()
 	defer conn.writeBlocker.Unlock()
 
@@ -1556,6 +1748,8 @@ func TestCancelAccept(t *testing.T) {
 	drainErrorsUntil(t, errCh, 1, 0, "")
 }
 
+// drainErrorsUntil receives `expect` errors from errCh within `timeout`. Fails
+// on any non-nil errors.
 func drainErrorsUntil(t testing.TB, errCh chan error, expect int, timeout time.Duration, msg string) {
 	t.Helper()
 	start := time.Now()
diff --git a/stream.go b/stream.go
index 0db2af2..31168d9 100644
--- a/stream.go
+++ b/stream.go
@@ -95,10 +95,12 @@ func (s *Stream) StreamID() uint32 {
 func (s *Stream) Read(b []byte) (n int, err error) {
 	defer asyncNotify(s.recvNotifyCh)
 START:
+
+	// If the stream is closed and there's no data buffered, return EOF
 	s.stateLock.Lock()
 	switch s.state {
 	case streamLocalClose:
-		fallthrough
+		// LocalClose only prohibits further local writes. Handle reads normally.
 	case streamRemoteClose:
 		fallthrough
 	case streamClosed:
diff --git a/testdata/README.md b/testdata/README.md
new file mode 100644
index 0000000..91a8338
--- /dev/null
+++ b/testdata/README.md
@@ -0,0 +1,7 @@
+Test certificates generated with:
+
+```
+go run $(go env GOROOT)/src/crypto/tls/generate_cert.go --host example.com
+```
+
+Requires a bash-like shell and Go installed.
diff --git a/testdata/cert.pem b/testdata/cert.pem
new file mode 100644
index 0000000..77bbaab
--- /dev/null
+++ b/testdata/cert.pem
@@ -0,0 +1,18 @@
+-----BEGIN CERTIFICATE-----
+MIIC/DCCAeSgAwIBAgIRAI8YOah8fp9JcV8YbON8488wDQYJKoZIhvcNAQELBQAw
+EjEQMA4GA1UEChMHQWNtZSBDbzAeFw0yNDA4MTQyMzE0MTJaFw0yNTA4MTQyMzE0
+MTJaMBIxEDAOBgNVBAoTB0FjbWUgQ28wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw
+ggEKAoIBAQC5cvLaZRsKScA/JBgIlXTs3uJj7KLOLwGuRxk3sUQ7aREoqlC1bGR8
+wN5mYu9Yso6dEOWJBSXybtSpH60AtGnepAKAra4IDNfLbNmWi+13lyqD/BgIpWoP
+Lww6MDHLxNP1+U4hQ2mcOw/hueaSMUahXhjTTTVHq+cLQ9eBxk7b/mcxzHLS6he+
+0zS2QsQ7p/R5RN5QTALZNoHTgXh7Wou/ynmCHNVzaAOdGIvDTSi6fBdqNgvWrY0w
+WcZePVcTiZ2y/4TKEugXqu6RdO1C3rtAFEWCn9q+RyNl0MCcbxo+n3xljy9y3HTW
+0qdpYg2wZJKuRBRlsr0D72pEg+OTd4gNAgMBAAGjTTBLMA4GA1UdDwEB/wQEAwIF
+oDATBgNVHSUEDDAKBggrBgEFBQcDATAMBgNVHRMBAf8EAjAAMBYGA1UdEQQPMA2C
+C2V4YW1wbGUuY29tMA0GCSqGSIb3DQEBCwUAA4IBAQBvjm0Y6uS285qhh9Ae/4+f
+/nc9KVECHCt0w4CAVmkoCULncOLPnfDRgfn0S2jveBeD/1916egnRljYdqHaE/1G
+/DHo3b45uC77dCGZzCKl7GC50GOUdirHxNiS99xCPM2rWmoada+v5Oe3kcCBXlJ4
+KeDffE7EGo8ACzO5ziKMbR8oThaFrOXIPtUYUFInURbu9VKfRzkLzXNGBZ1WgVZ6
+i9McZImuKnKLZJ1e3SlX3PcZwoBYbumaIG1XFx0K4FCO+QsZNOtLPIzA+aVdtFii
+f5nn4CxXJ/SGhwnjbJE4lS7vH0JlzVIX5rHEYB4jL8d7TApXVgJ0L/0wgFvQ0bCv
+-----END CERTIFICATE-----
diff --git a/testdata/key.pem b/testdata/key.pem
new file mode 100644
index 0000000..579ae5f
--- /dev/null
+++ b/testdata/key.pem
@@ -0,0 +1,28 @@
+-----BEGIN PRIVATE KEY-----
+MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC5cvLaZRsKScA/
+JBgIlXTs3uJj7KLOLwGuRxk3sUQ7aREoqlC1bGR8wN5mYu9Yso6dEOWJBSXybtSp
+H60AtGnepAKAra4IDNfLbNmWi+13lyqD/BgIpWoPLww6MDHLxNP1+U4hQ2mcOw/h
+ueaSMUahXhjTTTVHq+cLQ9eBxk7b/mcxzHLS6he+0zS2QsQ7p/R5RN5QTALZNoHT
+gXh7Wou/ynmCHNVzaAOdGIvDTSi6fBdqNgvWrY0wWcZePVcTiZ2y/4TKEugXqu6R
+dO1C3rtAFEWCn9q+RyNl0MCcbxo+n3xljy9y3HTW0qdpYg2wZJKuRBRlsr0D72pE
+g+OTd4gNAgMBAAECggEAdVXvppM2KqpDQzAZLMUzt/PGFidRU1eWnqhJol08qMJv
+ouUwL7onUm/Nx8ZtXheL+IEKWkmxmtTZJTDvi3SbT81B8Bzz8g/+Ma3rdj+Ovo4c
+zmmg40eV9Yl1GRQJTb55xjY5Yv5+QeV0xQOUiYc4Az3AQ2GkhnaTtyLzph7NIo+d
+n4jiNJIlCP2wO3hyIaIVtQXyoGAwNWHTrJsShGaNi9C32SEqzSzuNF94lnGJ4U9M
+7yLl1GD+uhz6V0q3eY8CHK6g8HOaj1ukMlIz+qZdsY252qpEgmY21kvoUJ9Awjjn
+3dAtR7aX+YFtMdAD8rCPHS00lmSlGOEMcg8m07EHmQKBgQDEyHIKvls9RGRfbMbR
++GQ0HyQu2ACbHu6sc8baZZpr9h0pQJOObvPhVWkTcMpVZqlfJZ3X08WHhBBmTZkQ
+F4K1nAs+Ps7U04cBDph3eGGfe+roQSVcVARmTq4is1SOQMtSpnKyRMS1HLFp9sR0
+03m9a43pmnkT45m/BxFp6kCMxwKBgQDxQV5yH0YBfT5i8T63vDs/f9nvxgRODC5d
+0rdgJBd1R4VPTmI5Cbcfa3IY4H5nMgh90x9T7xu209ywp75TYS/Q119eElTcj5tX
+xhDitw052F2ZA90nuCsyXQq+01zLeRuhMQQx3HbmgoVDNNJJBRfERakhM8z4FDVi
+a5FDrDQoiwKBgBZg6T85iKy+A2Aqwa2NPvACfp3pKKB7cw8fl4Ssu1P9yDExy9YN
+3iRJD0sLr6bopuhQIdQynCseJLNNrdN7qPy4QzsP73ualqbTHxmvEgMOF5fUGMiY
+MWvlFL6TgFExIy5CCZcmZOxn1/FCA/N5PUYCXkArtgtB/fEQf7V402BvAoGBAN3g
+oazRaD/cYLD8cBLo0ZCf0955veHNwCLXtYB9EPnycf8y9pDAh6Mk3QVWCcp8sGSP
+82LtKA7oIDJzw03JtwEZ4oKQ120VwediqIrpkQdfHw2oCRALh+bEvSotF02mryt6
++gGlYdCzvz3E6ZTwUyBWdKqtileptkMy7KFRUZLrAoGAVHMhb1CJWIHLOSNWBHU3
+U0Aq9kY3sK/iECKRa0e7qe8gK/hSm61q3RyyYsrrsp8nyPhuvvoJ61AyD9sf4bWn
+4lQalf69PpfdM3Kr9wgu3B8UG15RYAgu9mEp4f5ys/lB0kdcoNhXHm/omuEI7xho
+0TzPD2rJfUl/Jce1oLGoPL8=
+-----END PRIVATE KEY-----