Fix window update, reduce memory utilization and improve performance #50
Conversation
* flow control was assuming a `Read` consumed the entire buffer
* flow control fix reduces memory utilization when receiving large streams of data
* use timer pool to reduce allocations
* use static handler function pointer to avoid closure allocations for every frame
This also addresses issue #43.

This gist shows real-world numbers streaming a lot of data between two nodes.
```diff
 		return ErrInvalidMsgType
 	}

 	// Invoke the handler
-	if err := handler(hdr); err != nil {
+	if err := handlers[mt](s, hdr); err != nil {
```
I don't really like this change. Yes, it's fewer lines of code, but it's also much more susceptible to subtle bugs where you forget to change the if condition above. With the switch statement, adding or removing a case only affects one place, while after this change you need to update both the if statement and the lookup table.
I made this change to avoid allocations for every message that is handled.

I tried this approach first, however it still resulted in allocations (which was surprising):

```go
switch mt {
case typeData:
	handler = (*Session).handleStreamMessage
case typeWindowUpdate:
	handler = (*Session).handleStreamMessage
...
}

if err := handler(s, hdr)
```
See commit e7f9152
Are you sure it allocates? I can't seem to produce any similar code that actually allocates: https://gist.github.com/erikdubbelboer/53f4bc902563293ffa9e3a351ff4a149
If it really allocates, I would have turned handlers into a map so you can easily add and remove things while keeping the if the same. But of course this would be a bit slower than what you wrote now. As long as there are enough tests, I guess your typeMax solution is also good.
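For reference, a rough sketch of the map-based alternative described here (illustrative names and signatures only; this is not code from the PR):

```go
// Hypothetical handler map keyed by message type. A missing entry takes
// the place of the explicit upper-bound check, at the cost of a map lookup.
var handlers = map[uint8]func(*Session, header) error{
	typeData:         (*Session).handleStreamMessage,
	typeWindowUpdate: (*Session).handleStreamMessage,
	typePing:         (*Session).handlePing,
	typeGoAway:       (*Session).handleGoAway,
}

// handleMessage is a hypothetical dispatch helper using the map above.
func (s *Session) handleMessage(hdr header) error {
	handler, ok := handlers[hdr.MsgType()]
	if !ok {
		return ErrInvalidMsgType
	}
	return handler(s, hdr)
}
```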
I will add an upper message type constant, so that the `if` bounds check does not need to change when a message type is added.

* typeData is always the minimum given the spec
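A minimal sketch of how such a range check could look, assuming the new upper-bound constant is named `typeMax` (illustrative, not the exact patch):

```go
// Hypothetical range check: typeData is the lowest message type per the
// spec, and typeMax is the assumed new upper-bound constant.
mt := hdr.MsgType()
if mt < typeData || mt > typeMax {
	return ErrInvalidMsgType
}

if err := handlers[mt](s, hdr); err != nil {
	return err
}
```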
Here is the original code, where you will notice the call to `runtime.newobject`:

```
 431    .     .     // Switch on the type
 432    .     .     switch hdr.MsgType() {
 433    .     .     case typeData:
 434    3MB   3MB       handler = s.handleStreamMessage
        .     .             1109973: MOVQ AX, 0(SP)                     session.go:434
        3MB   3MB           1109977: CALL runtime.newobject(SB)         session.go:434
        .     .             110997c: MOVQ 0x8(SP), AX                   session.go:434
        .     .             1109981: LEAQ 0x11f98(IP), CX               session.go:434
        .     .             1109988: MOVQ CX, 0(AX)                     session.go:434
        .     .             110998b: TESTB AL, 0(AX)                    session.go:434
        .     .             110998d: MOVL 0x14a4dd(IP), DX              session.go:434
        .     .             1109993: LEAQ 0x8(AX), BX                   session.go:434
        .     .             1109997: TESTL DX, DX                       session.go:434
        .     .             1109999: JNE 0x1109a05                      session.go:434
        .     .             110999b: MOVQ 0xb8(SP), BX                  session.go:434
        .     .             11099a3: MOVQ BX, 0x8(AX)                   session.go:434
        .     .             11099a7: MOVQ 0x50(SP), CX                  session.go:434
        .     .             1109a0d: MOVQ BX, 0(SP)                     session.go:434
        .     .             1109a11: MOVQ 0xb8(SP), AX                  session.go:434
        .     .             1109a19: MOVQ AX, 0x8(SP)                   session.go:434
        .     .             1109a1e: CALL runtime.writebarrierptr(SB)   session.go:434
        .     .             1109a23: MOVQ 0x80(SP), AX                  session.go:434
        .     .             1109a2b: LEAQ 0x11eee(IP), CX               session.go:434
        .     .             1109a32: MOVQ 0xb8(SP), BX                  session.go:434
        .     .             1109a3a: JMP 0x11099a7                      session.go:434
        .     .             1109a3f: LEAQ 0x3d0da(IP), AX               session.go:434
```
```diff
 )

+var (
+	timerPool = &sync.Pool{
```
This doesn't need to be a pointer but that doesn't really matter.
I think I found the reason it allocates and your change will still make it allocate:

```go
import (
	"testing"
)

type X struct {
}

func (x *X) switchFunc(a *int) {
}

func BenchmarkSwitch(b *testing.B) {
	x := X{}
	handlers := []func(*X, *int){
		0: (*X).switchFunc,
	}
	for i := 0; i < b.N; i++ {
		a := 1
		x.switchFunc(&a) // Doesn't allocate because it can do proper escape analysis because it knows what switchFunc does.
		h := x.switchFunc
		h(nil) // still doesn't allocate because nil.
		h(&a)  // Allocates because it can't do proper escape analysis for 'a' because it doesn't know what code h executes.
		j := handlers[0]
		j(&x, nil) // Doesn't allocate because nil.
		j(&x, &a)  // Still allocates for the same reason above.
	}
}
```

So the only way to write this to not make it allocate is to write it out fully:

```go
switch hdr.MsgType() {
case typeData:
	if err := s.handleStreamMessage(hdr); err != nil {
		return err
	}
case typeWindowUpdate:
	if err := s.handleStreamMessage(hdr); err != nil {
		return err
	}
case typeGoAway:
	if err := s.handleGoAway(hdr); err != nil {
		return err
	}
case typePing:
	if err := s.handlePing(hdr); err != nil {
		return err
	}
default:
	return ErrInvalidMsgType
}
```
@erikdubbelboer it is not allocating because of a value passed to the function; it is allocating because the original logic was creating a closure that captures the receiver `s`. Here is the disassembly of the new code:

```
if err := handlers[mt](s, hdr); err != nil {
  0x1109a80  488b35995a1200    MOVQ 0x125a99(IP), SI
  0x1109a87  488b3d9a5a1200    MOVQ 0x125a9a(IP), DI
  // bounds check of handlers slice
  0x1109a8e  0fb6db            MOVZX BL, BX
  0x1109a91  4839fb            CMPQ DI, BX
  // and jump to `panicindex` if BX is outside the range
  0x1109a94  0f83e5020000      JAE 0x1109d7f
  // find address of function pointer in handlers array for mt; SI contains address of handlers slice
  0x1109a9a  488b14de          MOVQ 0(SI)(BX*8), DX
  0x1109a9e  488b9c2498000000  MOVQ 0x98(SP), BX
  // load hdr slice onto stack
  0x1109aa6  48891c24          MOVQ BX, 0(SP)
  0x1109aaa  48894c2408        MOVQ CX, 0x8(SP)
  0x1109aaf  4889442410        MOVQ AX, 0x10(SP)
  0x1109ab4  488b742448        MOVQ 0x48(SP), SI
  0x1109ab9  4889742418        MOVQ SI, 0x18(SP)
  // load function pointer of Stream handler into DI
  0x1109abe  488b3a            MOVQ 0(DX), DI
  // call handler
  0x1109ac1  ffd7              CALL DI
```
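To make the distinction being debated here concrete, a small self-contained example (stand-in names, not the yamux types): a method value binds its receiver, while a method expression takes the receiver as an explicit first argument and can live in a table that is built once.

```go
package main

import "fmt"

// Session is a stand-in type to illustrate the point; it is not the yamux Session.
type Session struct{ id int }

func (s *Session) handleData(n int) error { fmt.Println("data", s.id, n); return nil }
func (s *Session) handlePing(n int) error { fmt.Println("ping", s.id, n); return nil }

// A package-level table of method expressions is built once; indexing it
// does not create a per-call func value.
var handlers = []func(*Session, int) error{
	0: (*Session).handleData,
	1: (*Session).handlePing,
}

func main() {
	s := &Session{id: 7}

	// Method value: binds the receiver s. Assigning it to a variable can
	// force a heap-allocated func value, which matches the
	// runtime.newobject call in the profile above.
	h := s.handleData
	_ = h(1)

	// Method expression via the table: the receiver is passed explicitly,
	// so no receiver-capturing value is created per message.
	_ = handlers[1](s, 2)
}
```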
Why would it create a closure? That's not how closures work as far as I know. A closure is when the code of the function is defined there as well, so it can use the scope around it:

```go
func a() {
	b := 1
	c := func() { // c is now a closure that captures b.
		println(b)
	}
	_ = c
}
```

I'm 99% sure that your version still allocates because it can't know what the code will do with `hdr`, so it needs to allocate it on the heap. The allocation probably just happens before the dump you posted. Try my code:

```go
package main

import (
	"testing"
)

var g []byte

type X struct {
}

func (x *X) switchFunc(a []byte) {
	//g = a // Uncommenting this will always make it allocate as it always escapes.
}

func BenchmarkSwitch(b *testing.B) {
	x := &X{}
	handlers := []func(*X, []byte){
		0: (*X).switchFunc,
	}
	_ = handlers

	for i := 0; i < b.N; i++ {
		a := make([]byte, 12)

		x.switchFunc(a) // Doesn't allocate because it can do proper escape analysis because it knows what switchFunc does.

		h := x.switchFunc
		h(nil) // Still doesn't allocate because nil.
		//h(a) // Allocates because it can't do proper escape analysis for 'a' because it doesn't know what code h executes.

		j := handlers[0]
		j(x, nil) // Doesn't allocate because nil.
		//j(x, a) // Still allocates for the same reason above.

		//handlers[0](x, a) // Still allocates for the same reason above.

		// Doesn't allocate because nil, even though it's a closure. The Go compiler is smart.
		k := func() {
			x.switchFunc(nil)
		}
		k()

		// This allocates because it allocates a closure? Or the closure prevents escape analysis?
		//l := func() {
		//	x.switchFunc(a)
		//}
		//l()
	}
}
```
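One way to settle this kind of disagreement without reading assembly is `testing.AllocsPerRun`; here is a small sketch using the same stand-in types as the benchmark above. The expected results in the comments are a guess based on the escape-analysis behaviour discussed here, not measured output.

```go
package main

import (
	"fmt"
	"testing"
)

type X struct{}

func (x *X) switchFunc(a []byte) {}

func main() {
	x := &X{}
	handlers := []func(*X, []byte){0: (*X).switchFunc}

	direct := testing.AllocsPerRun(1000, func() {
		a := make([]byte, 12)
		x.switchFunc(a) // Direct call: the compiler can see the callee, so 'a' should stay on the stack.
	})
	indirect := testing.AllocsPerRun(1000, func() {
		a := make([]byte, 12)
		handlers[0](x, a) // Indirect call: escape analysis likely gives up, so 'a' may be heap-allocated each run.
	})
	fmt.Println("direct:", direct, "indirect:", indirect)
}
```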
It appears to be a problem with escape analysis. The following will allocate memory:

```go
package methods

import (
	"fmt"
	"math/rand"
	"testing"
)

type foo struct {
	s string
}

func (f *foo) handle1(v string) bool {
	fmt.Println(v)
	return true
}

func (f *foo) handle2(v string) bool {
	fmt.Println(f.s + v)
	return true
}

func (f *foo) methodValue(i int) {
	var handler func(string) bool

	for {
		switch i {
		case 0:
			handler = f.handle1
		case 1:
			handler = f.handle2
		default:
			panic("bad")
		}

		handler("hello")
		return
	}
}

func TestFooMethodValue(t *testing.T) {
	f := &foo{}
	f.methodValue(rand.Int() & 1)
}
```

Move the `var handler` declaration inside the `for` loop and it no longer allocates:

```go
func (f *foo) methodValue(i int) {
	for {
		var handler func(string) bool

		switch i {
		case 0:
			handler = f.handle1
		case 1:
			handler = f.handle2
		default:
			panic("bad")
		}

		handler("hello")
		return
	}
}
```
Running the test confirms it: escape analysis failed to determine that this copy did not escape, hence the heap allocation. The escape analysis output changes accordingly once the declaration is moved inside the loop.
I see you were right, I was looking at the wrong allocation. Your way indeed saves one allocation, but I still think writing out the full switch is better. Your new solution still suffers from the issue where it can't do proper escape analysis on `hdr`. (The original code also prevented proper escape analysis on `hdr`.)
In this case we want `hdr` to be allocated once and reused for every message.
If it's on the stack it can still be reused, since the frame for the receive loop stays alive for as long as the loop is running.
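Either way, one way to sidestep the question is to allocate the header buffer once, before the loop, so even a pessimistic escape analysis costs a single allocation per connection rather than one per frame. A sketch, using assumed names (`header`, `headerSize`, `s.bufRead`, `handlers`) rather than the exact yamux code:

```go
// Sketch: one header buffer per connection, reused for every frame.
hdr := header(make([]byte, headerSize))
for {
	// Read the next frame header into the reused buffer.
	if _, err := io.ReadFull(s.bufRead, hdr); err != nil {
		return err
	}
	// Dispatch on the message type.
	if err := handlers[hdr.MsgType()](s, hdr); err != nil {
		return err
	}
}
```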
@stuartcarnie Thanks for the patch. I tried it out, with a small change to the large benchmark to send 512MB rather than 100MB; results are pasted below. Overall, this is looking really good. I'll note that I didn't quite see a 15% speedup; it was more around 8%. The memory efficiency gains look really good: it went from allocating 7MB to only needing 0.3MB to send 512MB.

Master branch:

Your changes:

I've got a couple of questions in the code review; other than that this is looking good.
```go
defer func() {
	timer.Stop()
	timerPool.Put(t)
}()
```
Seems like `sendNoWait` could use the same timer pool too?
Yes, we could use the timer pool for `sendNoWait` too.
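For context, a self-contained sketch of the timer-pool pattern being discussed (illustrative names and structure, not the exact yamux code):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errTimeout = errors.New("timeout")

// Pool of stopped timers, so each timed operation reuses a timer instead
// of allocating a new one with time.NewTimer or time.After.
var timerPool = &sync.Pool{
	New: func() interface{} {
		t := time.NewTimer(time.Hour)
		t.Stop()
		return t
	},
}

// doWithTimeout runs fn in a goroutine and waits for it to finish or for
// the timeout to fire, returning the timer to the pool afterwards.
func doWithTimeout(timeout time.Duration, fn func() error) error {
	t := timerPool.Get().(*time.Timer)
	t.Reset(timeout)
	defer func() {
		if !t.Stop() {
			// Drain a possibly buffered fire so the next Reset is clean.
			select {
			case <-t.C:
			default:
			}
		}
		timerPool.Put(t)
	}()

	errCh := make(chan error, 1)
	go func() { errCh <- fn() }()

	select {
	case err := <-errCh:
		return err
	case <-t.C:
		return errTimeout
	}
}

func main() {
	err := doWithTimeout(50*time.Millisecond, func() error {
		time.Sleep(10 * time.Millisecond)
		return nil
	})
	fmt.Println(err) // <nil>
}
```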
```diff
@@ -238,18 +238,25 @@ func (s *Stream) sendWindowUpdate() error {

 	// Determine the delta update
 	max := s.session.config.MaxStreamWindowSize
-	delta := max - atomic.LoadUint32(&s.recvWindow)
```
Why did the atomic load go away? I don't have too much context around this code, but it seems like the receive lock `recvLock` has a lot more contention since many other places use it, whereas previously this was using an atomic load, a potentially faster lower-level primitive, to read `recvWindow`. Can this new logic for `bufLen` be rewritten to use `atomic.LoadUint32` for both `recvWindow` and `recvBuf.Len()` so we don't need the `recvLock` mutex?
Hi @preetapan, thanks for the feedback.

We must synchronize access to the `recvWindow` and `recvBuf` values because they are related. The previous (incorrect) logic only looked at `recvWindow` to determine the value of the window update after a `Read` operation. The bug surfaced when a `Read` does not completely drain `recvBuf`: we must then take the remaining bytes into consideration when calculating the final window update value, and therefore must take the lock.
If you sample the `recvBuf` size while running the benchmark, you should see it stay bounded with this change.
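A minimal sketch of the idea (illustrative, not the exact patch): the delta has to account for bytes still sitting in `recvBuf`, so both values are read under `recvLock`.

```go
// Sketch: compute the window update while holding recvLock so recvWindow
// and the bytes still buffered in recvBuf are observed consistently.
// Assumes the invariant recvWindow + bufLen <= max.
s.recvLock.Lock()
var bufLen uint32
if s.recvBuf != nil {
	bufLen = uint32(s.recvBuf.Len())
}
delta := (max - bufLen) - s.recvWindow

// Skip the update if it is not yet worth sending a frame.
if delta < (max/2) && flags == 0 {
	s.recvLock.Unlock()
	return nil
}

s.recvWindow += delta
s.recvLock.Unlock()
```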
@stuartcarnie thanks for the explanation. I pushed just the benchmark to master with some debug output. I saw the recvBuf grow up to 16MB when sending 512MB, and indeed with your changes it caps out at 2x the window size, or 0.5MB.
OpenStream did not properly clean up if the `sendWindowUpdate` call failed. Calling `closeStream` correctly drains the inflight SYN semaphore and removes the stream from the `streams` map.
Fix race condition when SYN is successfully sent
Log errors that are not yamux consistency issues as WARN
```diff
@@ -374,7 +374,7 @@ func (s *Session) send() {
 			for sent < len(ready.Hdr) {
 				n, err := s.conn.Write(ready.Hdr[sent:])
 				if err != nil {
-					s.logger.Printf("[ERR] yamux: Failed to write header: %v", err)
+					s.logger.Printf("[WARN] yamux: Failed to write header: %v", err)
```
Why exactly should it be a warning and not an error? If I understand correctly this will completely stop the yamux connection. I would think with just a warning things would be able to recover and continue to work, not stop completely.
@erikdubbelboer none of those additional commits should have appeared in this PR. We are maintaining our own fork.

An `error` is still returned to the client; this only changes a log message. I consider yamux consistency issues bugs that should be logged as errors. An example of a consistency issue would be `yamux: SYN tracking out of sync`, which highlights a bug in yamux. The network connection being closed, causing a write to fail, is not a yamux bug; it is still reported as an `error` to the client code, but logging it as an error generates unnecessary log noise.
In that case, should it log anything at all? Maybe it should just return the error and let the application log something if it wants. I think libraries should only log things when they can't tell their user that something happened (by returning an error, for example).
I definitely agree. The logging within the yamux package should be restricted to yamux internal consistency issues. Logging a warning because the network connection has been closed is redundant, since the error will be passed to the caller.
I am closing this PR, as a number of additional commits have unintentionally been pulled in.
Are you going to open a new PR? These changes seem very useful.
@stuartcarnie There is definite interest in these changes -- it should be easy enough to force-push this branch and put the other changes into other PRs. Are you planning on following up?
@jefferai @erikdubbelboer @stuartcarnie I made a new PR that includes the relevant changes (with commit credits to @stuartcarnie): #53
Thanks @preetapan