diff --git a/bench_test.go b/bench_test.go index 18c903c..5fc1c55 100644 --- a/bench_test.go +++ b/bench_test.go @@ -1,7 +1,6 @@ package yamux import ( - "fmt" "testing" ) @@ -85,7 +84,6 @@ func BenchmarkSendRecvLarge(b *testing.B) { client, server := testClientServer() defer client.Close() defer server.Close() - const sendSize = 512 * 1024 * 1024 const recvSize = 4 * 1024 @@ -107,9 +105,6 @@ func BenchmarkSendRecvLarge(b *testing.B) { b.Fatalf("err: %v", err) } } - - fmt.Printf("Capacity of rcv buffer = %v, length of rcv window = %v\n", stream.recvBuf.Cap(), stream.recvWindow) - } close(recvDone) }() diff --git a/session_test.go b/session_test.go index 0b4200e..53ae5ee 100644 --- a/session_test.go +++ b/session_test.go @@ -376,7 +376,12 @@ func TestSendData_Large(t *testing.T) { defer client.Close() defer server.Close() - data := make([]byte, 512*1024) + const ( + sendSize = 250 * 1024 * 1024 + recvSize = 4 * 1024 + ) + + data := make([]byte, sendSize) for idx := range data { data[idx] = byte(idx % 256) } @@ -390,16 +395,17 @@ func TestSendData_Large(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } - - buf := make([]byte, 4*1024) - for i := 0; i < 128; i++ { + var sz int + buf := make([]byte, recvSize) + for i := 0; i < sendSize/recvSize; i++ { n, err := stream.Read(buf) if err != nil { t.Fatalf("err: %v", err) } - if n != 4*1024 { + if n != recvSize { t.Fatalf("short read: %d", n) } + sz += n for idx := range buf { if buf[idx] != byte(idx%256) { t.Fatalf("bad: %v %v %v", i, idx, buf[idx]) @@ -410,6 +416,8 @@ func TestSendData_Large(t *testing.T) { if err := stream.Close(); err != nil { t.Fatalf("err: %v", err) } + + t.Logf("cap=%d, n=%d\n", stream.recvBuf.Cap(), sz) }() go func() { @@ -439,7 +447,7 @@ func TestSendData_Large(t *testing.T) { }() select { case <-doneCh: - case <-time.After(time.Second): + case <-time.After(5 * time.Second): panic("timeout") } } @@ -1026,6 +1034,60 @@ func TestSession_WindowUpdateWriteDuringRead(t *testing.T) { wg.Wait() } +func TestSession_PartialReadWindowUpdate(t *testing.T) { + client, server := testClientServerConfig(testConfNoKeepAlive()) + defer client.Close() + defer server.Close() + + var wg sync.WaitGroup + wg.Add(1) + + // Choose a huge flood size that we know will result in a window update. + flood := int64(client.config.MaxStreamWindowSize) + var wr *Stream + + // The server will accept a new stream and then flood data to it. + go func() { + defer wg.Done() + + var err error + wr, err = server.AcceptStream() + if err != nil { + t.Fatalf("err: %v", err) + } + defer wr.Close() + + if wr.sendWindow != client.config.MaxStreamWindowSize { + t.Fatalf("sendWindow: exp=%d, got=%d", client.config.MaxStreamWindowSize, wr.sendWindow) + } + + n, err := wr.Write(make([]byte, flood)) + if err != nil { + t.Fatalf("err: %v", err) + } + if int64(n) != flood { + t.Fatalf("short write: %d", n) + } + if wr.sendWindow != 0 { + t.Fatalf("sendWindow: exp=%d, got=%d", 0, wr.sendWindow) + } + }() + + stream, err := client.OpenStream() + if err != nil { + t.Fatalf("err: %v", err) + } + defer stream.Close() + + wg.Wait() + + _, err = stream.Read(make([]byte, flood/2+1)) + + if exp := uint32(flood/2 + 1); wr.sendWindow != exp { + t.Errorf("sendWindow: exp=%d, got=%d", exp, wr.sendWindow) + } +} + func TestSession_sendNoWait_Timeout(t *testing.T) { client, server := testClientServerConfig(testConfNoKeepAlive()) defer client.Close()