Skip to content

Commit

Permalink
Adjusts scaling logic and comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
Alliballibaba2 committed Jan 10, 2025
1 parent 6a6d040 commit 83e0c08
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 26 deletions.
24 changes: 13 additions & 11 deletions scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,11 @@ func initAutoScaling(numThreads int, maxThreads int) {

func drainAutoScaling() {
scalingMu.Lock()
logger.Debug("shutting down autoscalin", zap.Int("num scaled threads", len(autoScaledThreads)))
scalingMu.Unlock()
}

// turn the first inactive/reserved thread into a regular thread
// AddRegularThread adds one regular PHP thread at runtime if max_threads are not yet reached
func AddRegularThread() (int, error) {
scalingMu.Lock()
defer scalingMu.Unlock()
Expand All @@ -77,7 +78,7 @@ func addRegularThread() (*phpThread, error) {
return thread, nil
}

// remove the last regular thread
// RemoveRegularThread removes one regular PHP thread at runtime, won't remove the last thread
func RemoveRegularThread() (int, error) {
scalingMu.Lock()
defer scalingMu.Unlock()
Expand All @@ -97,7 +98,7 @@ func removeRegularThread() error {
return nil
}

// turn the first inactive/reserved thread into a worker thread
// AddWorkerThread adds one PHP worker thread at runtime if max_threads are not yet reached
func AddWorkerThread(workerFileName string) (int, error) {
worker, ok := workers[workerFileName]
if !ok {
Expand All @@ -119,7 +120,7 @@ func addWorkerThread(worker *worker) (*phpThread, error) {
return thread, nil
}

// remove the last worker thread
// RemoveWorkerThread removes one PHP worker thread at runtime, won't remove the last thread
func RemoveWorkerThread(workerFileName string) (int, error) {
worker, ok := workers[workerFileName]
if !ok {
Expand Down Expand Up @@ -254,14 +255,15 @@ func deactivateThreads() {
continue
}

// TODO: reactivate thread-shutdown once there's a way around leaky PECL extensions like #1299
// if threads are already inactive, shut them down
if thread.state.is(stateInactive) && waitTime > maxThreadIdleTime.Milliseconds() {
logger.Debug("auto-stopping thread", zap.Int("threadIndex", thread.threadIndex))
thread.shutdown()
stoppedThreadCount++
autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
continue
}
//if thread.state.is(stateInactive) && waitTime > maxThreadIdleTime.Milliseconds() {
// logger.Debug("auto-stopping thread", zap.Int("threadIndex", thread.threadIndex))
// thread.shutdown()
// stoppedThreadCount++
// autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...)
// continue
//}
}
}

Expand Down
16 changes: 2 additions & 14 deletions scaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,11 @@ func TestScaleARegularThreadUpAndDown(t *testing.T) {
assert.Equal(t, stateReady, autoScaledThread.state.get())
assert.IsType(t, &regularThread{}, autoScaledThread.handler)

// on the first down-scale, the thread will be marked as inactive
// on down-scale, the thread will be marked as inactive
setLongWaitTime(autoScaledThread)
deactivateThreads()
assert.IsType(t, &inactiveThread{}, autoScaledThread.handler)

// on the second down-scale, the thread will be removed
autoScaledThread.state.waitFor(stateInactive)
setLongWaitTime(autoScaledThread)
deactivateThreads()
assert.Equal(t, stateReserved, autoScaledThread.state.get())

Shutdown()
}

Expand All @@ -51,17 +45,11 @@ func TestScaleAWorkerThreadUpAndDown(t *testing.T) {
scaleWorkerThread(workers[workerPath])
assert.Equal(t, stateReady, autoScaledThread.state.get())

// on the first down-scale, the thread will be marked as inactive
// on down-scale, the thread will be marked as inactive
setLongWaitTime(autoScaledThread)
deactivateThreads()
assert.IsType(t, &inactiveThread{}, autoScaledThread.handler)

// on the second down-scale, the thread will be removed
autoScaledThread.state.waitFor(stateInactive)
setLongWaitTime(autoScaledThread)
deactivateThreads()
assert.Equal(t, stateReserved, autoScaledThread.state.get())

Shutdown()
}

Expand Down
4 changes: 3 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func drainWorkers() {
watcher.DrainWatcher()
}

// RestartWorkers attempts to restart all worker threads gracefully
func RestartWorkers() {
// disallow scaling threads while restarting workers
scalingMu.Lock()
Expand Down Expand Up @@ -124,9 +125,10 @@ func RestartWorkers() {
}
}

// WorkerFileNames returns the list of worker file names
func WorkerFileNames() []string {
workerNames := make([]string, 0, len(workers))
for fileName, _ := range workers {
for fileName := range workers {
workerNames = append(workerNames, fileName)
}
return workerNames
Expand Down

0 comments on commit 83e0c08

Please sign in to comment.