Skip to content

Commit

Permalink
Locks the thread handler on debug status.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alliballibaba2 committed Jan 16, 2025
1 parent 32517fe commit 1d899db
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 16 deletions.
1 change: 0 additions & 1 deletion internal/cpu/cpu_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,5 @@ func ProbeCPUs(probeTime time.Duration, maxCPUUsage float64, abort chan struct{}
elapsedCpuTime := float64(cpuEnd.tv_sec-cpuStart.tv_sec)*1e9 + float64(cpuEnd.tv_nsec-cpuStart.tv_nsec)
cpuUsage := elapsedCpuTime / elapsedTime / float64(cpuCount)

println("CPU usage:", int(cpuUsage*100))
return cpuUsage < maxCPUUsage
}
18 changes: 13 additions & 5 deletions phpthread.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type phpThread struct {
knownVariableKeys map[string]*C.zend_string
requestChan chan *http.Request
drainChan chan struct{}
handlerMu *sync.Mutex
handlerMu sync.Mutex
requestMu sync.Mutex
handler threadHandler
state *threadState
}
Expand All @@ -39,7 +40,6 @@ func newPHPThread(threadIndex int) *phpThread {
return &phpThread{
threadIndex: threadIndex,
requestChan: make(chan *http.Request),
handlerMu: &sync.Mutex{},
state: newThreadState(),
}
}
Expand Down Expand Up @@ -110,13 +110,22 @@ func (thread *phpThread) getActiveRequest() *http.Request {
return thread.handler.getActiveRequest()
}

// get the active request from outside the PHP thread
func (thread *phpThread) getActiveRequestSafely() *http.Request {
thread.handlerMu.Lock()
thread.requestMu.Lock()
r := thread.getActiveRequest()
thread.requestMu.Unlock()
thread.handlerMu.Unlock()
return r
}

// small status message for debugging
func (thread *phpThread) debugStatus() string {
reqState := ""
thread.handlerMu.Lock()
if waitTime := thread.state.waitTime(); waitTime > 0 {
reqState = fmt.Sprintf(", waiting for %dms", waitTime)
} else if r := thread.getActiveRequest(); r != nil {
} else if r := thread.getActiveRequestSafely(); r != nil {
fc := r.Context().Value(contextKey).(*FrankenPHPContext)
path := r.URL.Path
if fc.originalRequest != nil {
Expand All @@ -129,7 +138,6 @@ func (thread *phpThread) debugStatus() string {
reqState = fmt.Sprintf(", handling %s for %dms ", path, sinceMs)
}
}
thread.handlerMu.Unlock()

return fmt.Sprintf("Thread %d (%s%s) %s", thread.threadIndex, thread.state.name(), reqState, thread.handler.name())
}
Expand Down
10 changes: 8 additions & 2 deletions thread-regular.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ func (handler *regularThread) getActiveRequest() *http.Request {
return handler.activeRequest
}

func (handler *regularThread) setActiveRequest(r *http.Request) {
handler.thread.requestMu.Lock()
handler.activeRequest = r
handler.thread.requestMu.Unlock()
}

func (handler *regularThread) name() string {
return "Regular PHP Thread"
}
Expand All @@ -71,7 +77,7 @@ func (handler *regularThread) waitForRequest() string {
case r = <-regularRequestChan:
}

handler.activeRequest = r
handler.setActiveRequest(r)
handler.state.markAsWaiting(false)
fc := r.Context().Value(contextKey).(*FrankenPHPContext)

Expand All @@ -91,7 +97,7 @@ func (handler *regularThread) afterRequest(exitStatus int) {
fc := handler.activeRequest.Context().Value(contextKey).(*FrankenPHPContext)
fc.exitStatus = exitStatus
maybeCloseContext(fc)
handler.activeRequest = nil
handler.setActiveRequest(nil)
}

func handleRequestWithRegularPHPThreads(r *http.Request, fc *FrankenPHPContext) {
Expand Down
25 changes: 17 additions & 8 deletions thread-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func setupWorkerScript(handler *workerThread, worker *worker) {
panic(err)
}

handler.fakeRequest = r
handler.setFakeRequest(r)
if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil {
c.Write(zap.String("worker", worker.fileName), zap.Int("thread", handler.thread.threadIndex))
}
Expand All @@ -109,15 +109,12 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) {
if handler.workerRequest != nil {
fc := handler.workerRequest.Context().Value(contextKey).(*FrankenPHPContext)
maybeCloseContext(fc)
handler.workerRequest = nil
handler.setWorkerRequest(nil)
}

fc := handler.fakeRequest.Context().Value(contextKey).(*FrankenPHPContext)
fc.exitStatus = exitStatus

defer func() {
handler.fakeRequest = nil
}()
handler.setFakeRequest(nil)

// on exit status 0 we just run the worker script again
worker := handler.worker
Expand Down Expand Up @@ -174,7 +171,7 @@ func (handler *workerThread) waitForWorkerRequest() bool {
case r = <-handler.worker.requestChan:
}

handler.workerRequest = r
handler.setWorkerRequest(r)
handler.state.markAsWaiting(false)

if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil {
Expand All @@ -196,6 +193,18 @@ func (handler *workerThread) waitForWorkerRequest() bool {
return true
}

func (handler *workerThread) setWorkerRequest(r *http.Request) {
handler.thread.requestMu.Lock()
handler.workerRequest = r
handler.thread.requestMu.Unlock()
}

func (handler *workerThread) setFakeRequest(r *http.Request) {
handler.thread.requestMu.Lock()
handler.fakeRequest = r
handler.thread.requestMu.Unlock()
}

//export go_frankenphp_worker_handle_request_start
func go_frankenphp_worker_handle_request_start(threadIndex C.uintptr_t) C.bool {
handler := phpThreads[threadIndex].handler.(*workerThread)
Expand All @@ -209,7 +218,7 @@ func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) {
fc := r.Context().Value(contextKey).(*FrankenPHPContext)

maybeCloseContext(fc)
thread.handler.(*workerThread).workerRequest = nil
thread.handler.(*workerThread).setWorkerRequest(nil)

if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil {
c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI))
Expand Down

0 comments on commit 1d899db

Please sign in to comment.