diff --git a/.github/actions/watcher/action.yaml b/.github/actions/watcher/action.yaml index 2025f65c2..df09bef20 100644 --- a/.github/actions/watcher/action.yaml +++ b/.github/actions/watcher/action.yaml @@ -19,7 +19,7 @@ runs: name: Compile e-dant/watcher run: | mkdir watcher - gh release download --repo e-dant/watcher -A tar.gz -O - | tar -xz -C watcher --strip-components 1 + gh release download 0.13.2 --repo e-dant/watcher -A tar.gz -O - | tar -xz -C watcher --strip-components 1 cd watcher cmake -S . -B build -DCMAKE_BUILD_TYPE=Release cmake --build build diff --git a/caddy/admin.go b/caddy/admin.go new file mode 100644 index 000000000..2be765b01 --- /dev/null +++ b/caddy/admin.go @@ -0,0 +1,65 @@ +package caddy + +import ( + "encoding/json" + "fmt" + "github.com/caddyserver/caddy/v2" + "github.com/dunglas/frankenphp" + "net/http" +) + +type FrankenPHPAdmin struct{} + +// if the id starts with "admin.api" the module will register AdminRoutes via module.Routes() +func (FrankenPHPAdmin) CaddyModule() caddy.ModuleInfo { + return caddy.ModuleInfo{ + ID: "admin.api.frankenphp", + New: func() caddy.Module { return new(FrankenPHPAdmin) }, + } +} + +// EXPERIMENTAL: These routes are not yet stable and may change in the future. +func (admin FrankenPHPAdmin) Routes() []caddy.AdminRoute { + return []caddy.AdminRoute{ + { + Pattern: "/frankenphp/workers/restart", + Handler: caddy.AdminHandlerFunc(admin.restartWorkers), + }, + { + Pattern: "/frankenphp/threads", + Handler: caddy.AdminHandlerFunc(admin.threads), + }, + } +} + +func (admin *FrankenPHPAdmin) restartWorkers(w http.ResponseWriter, r *http.Request) error { + if r.Method != http.MethodPost { + return admin.error(http.StatusMethodNotAllowed, fmt.Errorf("method not allowed")) + } + + frankenphp.RestartWorkers() + caddy.Log().Info("workers restarted from admin api") + admin.success(w, "workers restarted successfully\n") + + return nil +} + +func (admin *FrankenPHPAdmin) threads(w http.ResponseWriter, r *http.Request) error { + debugState := frankenphp.DebugState() + prettyJson, err := json.MarshalIndent(debugState, "", " ") + if err != nil { + return admin.error(http.StatusInternalServerError, err) + } + + return admin.success(w, string(prettyJson)) +} + +func (admin *FrankenPHPAdmin) success(w http.ResponseWriter, message string) error { + w.WriteHeader(http.StatusOK) + _, err := w.Write([]byte(message)) + return err +} + +func (admin *FrankenPHPAdmin) error(statusCode int, err error) error { + return caddy.APIError{HTTPStatus: statusCode, Err: err} +} diff --git a/caddy/admin_test.go b/caddy/admin_test.go new file mode 100644 index 000000000..4a970ffda --- /dev/null +++ b/caddy/admin_test.go @@ -0,0 +1,215 @@ +package caddy_test + +import ( + "encoding/json" + "io" + "net/http" + "sync" + "testing" + + "github.com/caddyserver/caddy/v2/caddytest" + "github.com/dunglas/frankenphp" + "github.com/stretchr/testify/assert" +) + +func TestRestartWorkerViaAdminApi(t *testing.T) { + tester := caddytest.NewTester(t) + tester.InitServer(` + { + skip_install_trust + admin localhost:2999 + http_port `+testPort+` + + frankenphp { + worker ../testdata/worker-with-counter.php 1 + } + } + + localhost:`+testPort+` { + route { + root ../testdata + rewrite worker-with-counter.php + php + } + } + `, "caddyfile") + + tester.AssertGetResponse("http://localhost:"+testPort+"/", http.StatusOK, "requests:1") + tester.AssertGetResponse("http://localhost:"+testPort+"/", http.StatusOK, "requests:2") + + assertAdminResponse(t, tester, "POST", "workers/restart", http.StatusOK, "workers restarted successfully\n") + + tester.AssertGetResponse("http://localhost:"+testPort+"/", http.StatusOK, "requests:1") +} + +func TestShowTheCorrectThreadDebugStatus(t *testing.T) { + tester := caddytest.NewTester(t) + tester.InitServer(` + { + skip_install_trust + admin localhost:2999 + http_port `+testPort+` + + frankenphp { + num_threads 3 + max_threads 6 + worker ../testdata/worker-with-counter.php 1 + worker ../testdata/index.php 1 + } + } + + localhost:`+testPort+` { + route { + root ../testdata + rewrite worker-with-counter.php + php + } + } + `, "caddyfile") + + debugState := getDebugState(t, tester) + + // assert that the correct threads are present in the thread info + assert.Equal(t, debugState.ThreadDebugStates[0].State, "ready") + assert.Contains(t, debugState.ThreadDebugStates[1].Name, "worker-with-counter.php") + assert.Contains(t, debugState.ThreadDebugStates[2].Name, "index.php") + assert.Equal(t, debugState.ReservedThreadCount, 3) + assert.Len(t, debugState.ThreadDebugStates, 3) +} + +func TestAutoScaleWorkerThreads(t *testing.T) { + wg := sync.WaitGroup{} + maxTries := 10 + requestsPerTry := 200 + tester := caddytest.NewTester(t) + tester.InitServer(` + { + skip_install_trust + admin localhost:2999 + http_port `+testPort+` + + frankenphp { + max_threads 10 + num_threads 2 + worker ../testdata/sleep.php 1 + } + } + + localhost:`+testPort+` { + route { + root ../testdata + rewrite sleep.php + php + } + } + `, "caddyfile") + + // spam an endpoint that simulates IO + endpoint := "http://localhost:" + testPort + "/?sleep=2&work=1000" + amountOfThreads := len(getDebugState(t, tester).ThreadDebugStates) + + // try to spawn the additional threads by spamming the server + for tries := 0; tries < maxTries; tries++ { + wg.Add(requestsPerTry) + for i := 0; i < requestsPerTry; i++ { + go func() { + tester.AssertGetResponse(endpoint, http.StatusOK, "slept for 2 ms and worked for 1000 iterations") + wg.Done() + }() + } + wg.Wait() + + amountOfThreads = len(getDebugState(t, tester).ThreadDebugStates) + if amountOfThreads > 2 { + break + } + } + + // assert that there are now more threads than before + assert.NotEqual(t, amountOfThreads, 2) +} + +// Note this test requires at least 2x40MB available memory for the process +func TestAutoScaleRegularThreadsOnAutomaticThreadLimit(t *testing.T) { + wg := sync.WaitGroup{} + maxTries := 10 + requestsPerTry := 200 + tester := caddytest.NewTester(t) + tester.InitServer(` + { + skip_install_trust + admin localhost:2999 + http_port `+testPort+` + + frankenphp { + max_threads auto + num_threads 1 + php_ini memory_limit 40M # a reasonable limit for the test + } + } + + localhost:`+testPort+` { + route { + root ../testdata + php + } + } + `, "caddyfile") + + // spam an endpoint that simulates IO + endpoint := "http://localhost:" + testPort + "/sleep.php?sleep=2&work=1000" + amountOfThreads := len(getDebugState(t, tester).ThreadDebugStates) + + // try to spawn the additional threads by spamming the server + for tries := 0; tries < maxTries; tries++ { + wg.Add(requestsPerTry) + for i := 0; i < requestsPerTry; i++ { + go func() { + tester.AssertGetResponse(endpoint, http.StatusOK, "slept for 2 ms and worked for 1000 iterations") + wg.Done() + }() + } + wg.Wait() + + amountOfThreads = len(getDebugState(t, tester).ThreadDebugStates) + if amountOfThreads > 1 { + break + } + } + + // assert that there are now more threads present + assert.NotEqual(t, amountOfThreads, 1) +} + +func assertAdminResponse(t *testing.T, tester *caddytest.Tester, method string, path string, expectedStatus int, expectedBody string) { + adminUrl := "http://localhost:2999/frankenphp/" + r, err := http.NewRequest(method, adminUrl+path, nil) + assert.NoError(t, err) + if expectedBody == "" { + _ = tester.AssertResponseCode(r, expectedStatus) + return + } + _, _ = tester.AssertResponse(r, expectedStatus, expectedBody) +} + +func getAdminResponseBody(t *testing.T, tester *caddytest.Tester, method string, path string) string { + adminUrl := "http://localhost:2999/frankenphp/" + r, err := http.NewRequest(method, adminUrl+path, nil) + assert.NoError(t, err) + resp := tester.AssertResponseCode(r, http.StatusOK) + defer resp.Body.Close() + bytes, err := io.ReadAll(resp.Body) + assert.NoError(t, err) + + return string(bytes) +} + +func getDebugState(t *testing.T, tester *caddytest.Tester) frankenphp.FrankenPHPDebugState { + threadStates := getAdminResponseBody(t, tester, "GET", "threads") + + var debugStates frankenphp.FrankenPHPDebugState + err := json.Unmarshal([]byte(threadStates), &debugStates) + assert.NoError(t, err) + + return debugStates +} diff --git a/caddy/caddy.go b/caddy/caddy.go index 19657578d..c72db244e 100644 --- a/caddy/caddy.go +++ b/caddy/caddy.go @@ -27,9 +27,12 @@ import ( const defaultDocumentRoot = "public" +var iniError = errors.New("'php_ini' must be in the format: php_ini \"\" \"\"") + func init() { caddy.RegisterModule(FrankenPHPApp{}) caddy.RegisterModule(FrankenPHPModule{}) + caddy.RegisterModule(FrankenPHPAdmin{}) httpcaddyfile.RegisterGlobalOption("frankenphp", parseGlobalOption) @@ -56,8 +59,12 @@ type workerConfig struct { type FrankenPHPApp struct { // NumThreads sets the number of PHP threads to start. Default: 2x the number of available CPUs. NumThreads int `json:"num_threads,omitempty"` + // MaxThreads limits how many threads can be started at runtime. Default 2x NumThreads + MaxThreads int `json:"max_threads,omitempty"` // Workers configures the worker scripts to start. Workers []workerConfig `json:"workers,omitempty"` + // Overwrites the default php ini configuration + PhpIni map[string]string `json:"php_ini,omitempty"` } // CaddyModule returns the Caddy module information. @@ -72,7 +79,13 @@ func (f *FrankenPHPApp) Start() error { repl := caddy.NewReplacer() logger := caddy.Log() - opts := []frankenphp.Option{frankenphp.WithNumThreads(f.NumThreads), frankenphp.WithLogger(logger), frankenphp.WithMetrics(metrics)} + opts := []frankenphp.Option{ + frankenphp.WithNumThreads(f.NumThreads), + frankenphp.WithMaxThreads(f.MaxThreads), + frankenphp.WithLogger(logger), + frankenphp.WithMetrics(metrics), + frankenphp.WithPhpIni(f.PhpIni), + } for _, w := range f.Workers { opts = append(opts, frankenphp.WithWorkers(repl.ReplaceKnown(w.FileName, ""), w.Num, w.Env, w.Watch)) } @@ -118,6 +131,58 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } f.NumThreads = v + case "max_threads": + if !d.NextArg() { + return d.ArgErr() + } + + if d.Val() == "auto" { + f.MaxThreads = -1 + continue + } + + v, err := strconv.ParseUint(d.Val(), 10, 32) + if err != nil { + return err + } + + f.MaxThreads = int(v) + case "php_ini": + parseIniLine := func(d *caddyfile.Dispenser) error { + key := d.Val() + if !d.NextArg() { + return iniError + } + if f.PhpIni == nil { + f.PhpIni = make(map[string]string) + } + f.PhpIni[key] = d.Val() + if d.NextArg() { + return iniError + } + + return nil + } + + isBlock := false + for d.NextBlock(1) { + isBlock = true + err := parseIniLine(d) + if err != nil { + return err + } + } + + if !isBlock { + if !d.NextArg() { + return iniError + } + err := parseIniLine(d) + if err != nil { + return err + } + } + case "worker": wc := workerConfig{} if d.NextArg() { @@ -184,6 +249,10 @@ func (f *FrankenPHPApp) UnmarshalCaddyfile(d *caddyfile.Dispenser) error { } } + if f.MaxThreads > 0 && f.NumThreads > 0 && f.MaxThreads < f.NumThreads { + return errors.New("'max_threads' must be greater than or equal to 'num_threads'") + } + return nil } diff --git a/caddy/caddy_test.go b/caddy/caddy_test.go index 71880fb5a..b9b65e9e6 100644 --- a/caddy/caddy_test.go +++ b/caddy/caddy_test.go @@ -616,3 +616,71 @@ func TestAllDefinedServerVars(t *testing.T) { expectedBody, ) } + +func TestPHPIniConfiguration(t *testing.T) { + tester := caddytest.NewTester(t) + tester.InitServer(` + { + skip_install_trust + admin localhost:2999 + http_port `+testPort+` + + frankenphp { + num_threads 2 + worker ../testdata/ini.php 1 + php_ini max_execution_time 100 + php_ini memory_limit 10000000 + } + } + + localhost:`+testPort+` { + route { + root ../testdata + php + } + } + `, "caddyfile") + + testSingleIniConfiguration(tester, "max_execution_time", "100") + testSingleIniConfiguration(tester, "memory_limit", "10000000") +} + +func TestPHPIniBlockConfiguration(t *testing.T) { + tester := caddytest.NewTester(t) + tester.InitServer(` + { + skip_install_trust + admin localhost:2999 + http_port `+testPort+` + + frankenphp { + num_threads 1 + php_ini { + max_execution_time 15 + memory_limit 20000000 + } + } + } + + localhost:`+testPort+` { + route { + root ../testdata + php + } + } + `, "caddyfile") + + testSingleIniConfiguration(tester, "max_execution_time", "15") + testSingleIniConfiguration(tester, "memory_limit", "20000000") +} + +func testSingleIniConfiguration(tester *caddytest.Tester, key string, value string) { + // test twice to ensure the ini setting is not lost + for i := 0; i < 2; i++ { + tester.AssertGetResponse( + "http://localhost:"+testPort+"/ini.php?key="+key, + http.StatusOK, + key+":"+value, + ) + } +} diff --git a/caddy/watcher_test.go b/caddy/watcher_test.go index aad782c54..63801b870 100644 --- a/caddy/watcher_test.go +++ b/caddy/watcher_test.go @@ -19,7 +19,7 @@ func TestWorkerWithInactiveWatcher(t *testing.T) { frankenphp { worker { - file ../testdata/worker-with-watcher.php + file ../testdata/worker-with-counter.php num 1 watch ./**/*.php } @@ -28,7 +28,7 @@ func TestWorkerWithInactiveWatcher(t *testing.T) { localhost:`+testPort+` { root ../testdata - rewrite worker-with-watcher.php + rewrite worker-with-counter.php php } `, "caddyfile") diff --git a/debugstate.go b/debugstate.go new file mode 100644 index 000000000..c6d412b93 --- /dev/null +++ b/debugstate.go @@ -0,0 +1,67 @@ +package frankenphp + +import ( + "net/http" + "time" +) + +type ThreadDebugState struct { + Index int + Name string + State string + IsHandlingRequest bool + Path string + InRequestSinceMilliseconds int64 + WaitingSinceMilliseconds int64 +} + +type FrankenPHPDebugState struct { + ThreadDebugStates []ThreadDebugState + ReservedThreadCount int +} + +// EXPERIMENTAL: DebugState prints the state of all PHP threads - debugging purposes only +func DebugState() FrankenPHPDebugState { + fullState := FrankenPHPDebugState{ + ThreadDebugStates: make([]ThreadDebugState, 0, len(phpThreads)), + ReservedThreadCount: 0, + } + for _, thread := range phpThreads { + if thread.state.is(stateReserved) { + fullState.ReservedThreadCount++ + continue + } + fullState.ThreadDebugStates = append(fullState.ThreadDebugStates, threadDebugState(thread)) + } + + return fullState +} + +// threadDebugState creates a small jsonable status message for debugging purposes +func threadDebugState(thread *phpThread) ThreadDebugState { + debugState := ThreadDebugState{ + Index: thread.threadIndex, + Name: thread.handler.name(), + State: thread.state.name(), + WaitingSinceMilliseconds: thread.state.waitTime(), + } + + var r *http.Request + if r = thread.getActiveRequestSafely(); r == nil { + return debugState + } + + fc := r.Context().Value(contextKey).(*FrankenPHPContext) + if fc.originalRequest != nil { + debugState.Path = fc.originalRequest.URL.Path + } else { + debugState.Path = r.URL.Path + } + + if fc.responseWriter != nil { + debugState.IsHandlingRequest = true + debugState.InRequestSinceMilliseconds = time.Since(fc.startedAt).Milliseconds() + } + + return debugState +} diff --git a/dev.Dockerfile b/dev.Dockerfile index 9493e92d2..974ea7b1f 100644 --- a/dev.Dockerfile +++ b/dev.Dockerfile @@ -71,7 +71,8 @@ WORKDIR /usr/local/src/watcher RUN git clone https://github.com/e-dant/watcher . && \ cmake -S . -B build -DCMAKE_BUILD_TYPE=Release && \ cmake --build build/ && \ - cmake --install build + cmake --install build && \ + ldconfig WORKDIR /go/src/app COPY . . diff --git a/docs/config.md b/docs/config.md index 07034aefe..720f102c6 100644 --- a/docs/config.md +++ b/docs/config.md @@ -51,6 +51,8 @@ Optionally, the number of threads to create and [worker scripts](worker.md) to s { frankenphp { num_threads # Sets the number of PHP threads to start. Default: 2x the number of available CPUs. + max_threads # Limits the number of additional PHP threads that can be started at runtime. Default: num_threads. Can be set to 'auto'. + php_ini # Set a php.ini directive. Can be used several times to set multiple directives. worker { file # Sets the path to the worker script. num # Sets the number of PHP threads to start, defaults to 2x the number of available CPUs. @@ -227,6 +229,23 @@ To load [additional PHP configuration files](https://www.php.net/manual/en/confi the `PHP_INI_SCAN_DIR` environment variable can be used. When set, PHP will load all the file with the `.ini` extension present in the given directories. +You can also change the PHP configuration using the `php_ini` directive in the `Caddyfile`: + +```caddyfile +{ + frankenphp { + php_ini memory_limit 256M + + # or + + php_ini { + memory_limit 256M + max_execution_time 15 + } + } +} +``` + ## Enable the Debug Mode When using the Docker image, set the `CADDY_GLOBAL_OPTIONS` environment variable to `debug` to enable the debug mode: diff --git a/docs/performance.md b/docs/performance.md index 39fc97638..6e6d55e6a 100644 --- a/docs/performance.md +++ b/docs/performance.md @@ -16,6 +16,16 @@ To find the right values, it's best to run load tests simulating real traffic. To configure the number of threads, use the `num_threads` option of the `php_server` and `php` directives. To change the number of workers, use the `num` option of the `worker` section of the `frankenphp` directive. +### `max_threads` + +While it's always better to know exactly what your traffic will look like, real-life applications tend to be more +unpredictable. The `max_threads` allows FrankenPHP to automatically spawn additional threads at runtime up to the specified limit. +`max_threads` can help you +figure out how many threads you need to handle your traffic and can make the server more resilient to latency spikes. +If set to `auto`, the limit will be estimated based on the `memory_limit` in your `php.ini`. If not able to do so, +`auto` will instead default to 2x `num_threads`. +`max_threads is similar to PHP FPM's [pm.max_children](https://www.php.net/manual/en/install.fpm.configuration.php#pm.max-children). + ## Worker Mode Enabling [the worker mode](worker.md) dramatically improves performance, diff --git a/docs/ru/CONTRIBUTING.md b/docs/ru/CONTRIBUTING.md index 045dc1b04..d3e4e6f59 100644 --- a/docs/ru/CONTRIBUTING.md +++ b/docs/ru/CONTRIBUTING.md @@ -211,4 +211,4 @@ strace -e 'trace=!futex,epoll_ctl,epoll_pwait,tgkill,rt_sigreturn' -p 1 5. Создайте Pull Request с переводом. 6. В [репозитории сайта](https://github.com/dunglas/frankenphp-website/tree/main) скопируйте и переведите файлы в папках `content/`, `data/` и `i18n/`. 7. Переведите значения в созданных YAML-файлах. -8. Откройте Pull Request в репозитории сайта. \ No newline at end of file +8. Откройте Pull Request в репозитории сайта. diff --git a/docs/ru/README.md b/docs/ru/README.md index f66040514..abcfb3eb8 100644 --- a/docs/ru/README.md +++ b/docs/ru/README.md @@ -83,4 +83,4 @@ frankenphp php-cli /path/to/your/script.php * [WordPress](https://github.com/StephenMiracle/frankenwp) * [Drupal](https://github.com/dunglas/frankenphp-drupal) * [Joomla](https://github.com/alexandreelise/frankenphp-joomla) -* [TYPO3](https://github.com/ochorocho/franken-typo3) \ No newline at end of file +* [TYPO3](https://github.com/ochorocho/franken-typo3) diff --git a/docs/ru/config.md b/docs/ru/config.md index 943e591e0..d1f948a30 100644 --- a/docs/ru/config.md +++ b/docs/ru/config.md @@ -229,4 +229,4 @@ docker run -v $PWD:/app/public \ -e CADDY_GLOBAL_OPTIONS=debug \ -p 80:80 -p 443:443 -p 443:443/udp \ dunglas/frankenphp -``` \ No newline at end of file +``` diff --git a/docs/ru/docker.md b/docs/ru/docker.md index c107d6787..dcf0fa46e 100644 --- a/docs/ru/docker.md +++ b/docs/ru/docker.md @@ -77,6 +77,7 @@ FROM dunglas/frankenphp AS runner # Заменяем официальный бинарный файл на пользовательский с добавленными модулями COPY --from=builder /usr/local/bin/frankenphp /usr/local/bin/frankenphp ``` + Образ `builder`, предоставляемый FrankenPHP, содержит скомпилированную версию `libphp`. [Образы builder](https://hub.docker.com/r/dunglas/frankenphp/tags?name=builder) доступны для всех версий FrankenPHP и PHP, как для Debian, так и для Alpine. @@ -197,4 +198,4 @@ Docker-образы обновляются: Сборка запускается автоматически при каждом коммите в основную ветку GitHub-репозитория Теги с префиксом `latest*` указывают на актуальное состояние ветки `main`. -Также доступны теги в формате `sha-`. \ No newline at end of file +Также доступны теги в формате `sha-`. diff --git a/docs/ru/early-hints.md b/docs/ru/early-hints.md index ae12def66..e27c3b194 100644 --- a/docs/ru/early-hints.md +++ b/docs/ru/early-hints.md @@ -18,4 +18,4 @@ echo <<<'HTML' HTML; ``` -Early Hints поддерживается как в обычном, так и в [worker режиме](worker.md). \ No newline at end of file +Early Hints поддерживается как в обычном, так и в [worker режиме](worker.md). diff --git a/docs/ru/github-actions.md b/docs/ru/github-actions.md index 9517381fd..aea7a80ef 100644 --- a/docs/ru/github-actions.md +++ b/docs/ru/github-actions.md @@ -27,4 +27,4 @@ 1. Создайте новый тег в репозитории. 2. GitHub Actions соберёт образ и выполнит тесты. 3. Если сборка пройдёт успешно, образ будет отправлен в реестр с именем тега (например, `v1.2.3` и `v1.2` будут созданы). -4. Также будет обновлён тег `latest`. \ No newline at end of file +4. Также будет обновлён тег `latest`. diff --git a/docs/ru/known-issues.md b/docs/ru/known-issues.md index 6c7834410..af92ba4b7 100644 --- a/docs/ru/known-issues.md +++ b/docs/ru/known-issues.md @@ -137,4 +137,4 @@ error:0A000086:SSL routines::certificate verify failed # Установите переменные окружения для TLS-сертификатов export SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt export SSL_CERT_DIR=/etc/ssl/certs -``` \ No newline at end of file +``` diff --git a/docs/ru/laravel.md b/docs/ru/laravel.md index 115d23ae0..b0274fc4e 100644 --- a/docs/ru/laravel.md +++ b/docs/ru/laravel.md @@ -164,7 +164,7 @@ php artisan octane:frankenphp Можно даже упаковать приложения Laravel Octane как автономный бинарный файл! -Для этого [установите Octane правильно](#laravel-octane) и следуйте шагам, описанным в [предыдущем разделе](#laravel-приложения-как-standalone-бинарники). +Для этого [установите Octane правильно](#laravel-octane) и следуйте шагам, описанным в предыдущем разделе. Затем, чтобы запустить FrankenPHP в worker-режиме через Octane, выполните: @@ -173,4 +173,4 @@ PATH="$PWD:$PATH" frankenphp php-cli artisan octane:frankenphp ``` > [!CAUTION] -> Для работы команды автономный бинарник **обязательно** должен быть назван `frankenphp`, так как Octane требует наличия программы с именем `frankenphp` в PATH. \ No newline at end of file +> Для работы команды автономный бинарник **обязательно** должен быть назван `frankenphp`, так как Octane требует наличия программы с именем `frankenphp` в PATH. diff --git a/docs/ru/metrics.md b/docs/ru/metrics.md index 684604ac9..19f1160ac 100644 --- a/docs/ru/metrics.md +++ b/docs/ru/metrics.md @@ -12,4 +12,4 @@ - `frankenphp_total_threads`: Общее количество потоков PHP. - `frankenphp_busy_threads`: Количество потоков PHP, которые в данный момент обрабатывают запрос (работающие worker-скрипты всегда используют поток). -Для метрик worker-скриптов плейсхолдер `[worker]` заменяется на путь к Worker-скрипту, указанному в Caddyfile. \ No newline at end of file +Для метрик worker-скриптов плейсхолдер `[worker]` заменяется на путь к Worker-скрипту, указанному в Caddyfile. diff --git a/docs/ru/production.md b/docs/ru/production.md index be189fc6c..529b4a142 100644 --- a/docs/ru/production.md +++ b/docs/ru/production.md @@ -122,4 +122,4 @@ docker compose up -d --wait ## Деплой на несколько узлов Если вам нужно развернуть приложение на кластер машин, используйте [Docker Swarm](https://docs.docker.com/engine/swarm/stack-deploy/), который совместим с предоставленными файлами Compose. -Для деплоя на Kubernetes ознакомьтесь с [Helm-чартом API Platform](https://api-platform.com/docs/deployment/kubernetes/), который использует FrankenPHP. \ No newline at end of file +Для деплоя на Kubernetes ознакомьтесь с [Helm-чартом API Platform](https://api-platform.com/docs/deployment/kubernetes/), который использует FrankenPHP. diff --git a/docs/ru/worker.md b/docs/ru/worker.md index 40e3df696..e94a540ca 100644 --- a/docs/ru/worker.md +++ b/docs/ru/worker.md @@ -156,4 +156,4 @@ $handler = static function () use ($workerServer) { }; // ... -``` \ No newline at end of file +``` diff --git a/docs/worker.md b/docs/worker.md index ca121ef97..5935bfac0 100644 --- a/docs/worker.md +++ b/docs/worker.md @@ -128,6 +128,16 @@ A workaround to using this type of code in worker mode is to restart the worker The previous worker snippet allows configuring a maximum number of request to handle by setting an environment variable named `MAX_REQUESTS`. +### Restart Workers Manually + +While it's possible to restart workers [on file changes](config.md#watching-for-file-changes), it's also possible to restart all workers +gracefully via the [Caddy admin API](https://caddyserver.com/docs/api). If the admin is enabled in your +[Caddyfile](config.md#caddyfile-config), you can ping the restart endpoint with a simple POST request like this: + +```console +curl -X POST http://localhost:2019/frankenphp/workers/restart +``` + ### Worker Failures If a worker script crashes with a non-zero exit code, FrankenPHP will restart it with an exponential backoff strategy. diff --git a/frankenphp.c b/frankenphp.c index 143c5ac83..a7a272338 100644 --- a/frankenphp.c +++ b/frankenphp.c @@ -78,7 +78,7 @@ typedef struct frankenphp_server_context { bool finished; } frankenphp_server_context; -__thread bool should_filter_var = 0; +bool should_filter_var = 0; __thread frankenphp_server_context *local_ctx = NULL; __thread uintptr_t thread_index; __thread zval *os_environment = NULL; @@ -772,7 +772,8 @@ frankenphp_register_variable_from_request_info(zend_string *zKey, char *value, frankenphp_register_trusted_var(zKey, value, strlen(value), Z_ARRVAL_P(track_vars_array)); } else if (must_be_present) { - frankenphp_register_trusted_var(zKey, "", 0, Z_ARRVAL_P(track_vars_array)); + frankenphp_register_trusted_var(zKey, NULL, 0, + Z_ARRVAL_P(track_vars_array)); } } @@ -918,12 +919,6 @@ static void *php_thread(void *arg) { local_ctx = malloc(sizeof(frankenphp_server_context)); - /* check if a default filter is set in php.ini and only filter if - * it is, this is deprecated and will be removed in PHP 9 */ - char *default_filter; - cfg_get_string("filter.default", &default_filter); - should_filter_var = default_filter != NULL; - // loop until Go signals to stop char *scriptName = NULL; while ((scriptName = go_frankenphp_before_script_execution(thread_index))) { @@ -985,10 +980,22 @@ static void *php_main(void *arg) { memcpy(frankenphp_sapi_module.ini_entries, HARDCODED_INI, sizeof(HARDCODED_INI)); #endif +#else + /* overwrite php.ini with custom user settings */ + char *php_ini_overrides = go_get_custom_php_ini(); + if (php_ini_overrides != NULL) { + frankenphp_sapi_module.ini_entries = php_ini_overrides; + } #endif frankenphp_sapi_module.startup(&frankenphp_sapi_module); + /* check if a default filter is set in php.ini and only filter if + * it is, this is deprecated and will be removed in PHP 9 */ + char *default_filter; + cfg_get_string("filter.default", &default_filter); + should_filter_var = default_filter != NULL; + go_frankenphp_main_thread_is_ready(); /* channel closed, shutdown gracefully */ @@ -1257,3 +1264,5 @@ int frankenphp_reset_opcache(void) { } return 0; } + +int frankenphp_get_current_memory_limit() { return PG(memory_limit); } diff --git a/frankenphp.go b/frankenphp.go index 3367351ee..29213c7ae 100644 --- a/frankenphp.go +++ b/frankenphp.go @@ -63,8 +63,7 @@ var ( ScriptExecutionError = errors.New("error during PHP script execution") NotRunningError = errors.New("FrankenPHP is not running. For proper configuration visit: https://frankenphp.dev/docs/config/#caddyfile-config") - requestChan chan *http.Request - isRunning bool + isRunning bool loggerMu sync.RWMutex logger *zap.Logger @@ -141,7 +140,8 @@ func clientHasClosed(r *http.Request) bool { // NewRequestWithContext creates a new FrankenPHP request context. func NewRequestWithContext(r *http.Request, opts ...RequestOption) (*http.Request, error) { fc := &FrankenPHPContext{ - done: make(chan interface{}), + done: make(chan interface{}), + startedAt: time.Now(), } for _, o := range opts { if err := o(fc); err != nil { @@ -244,7 +244,7 @@ func Config() PHPConfig { // MaxThreads is internally used during tests. It is written to, but never read and may go away in the future. var MaxThreads int -func calculateMaxThreads(opt *opt) (int, int, error) { +func calculateMaxThreads(opt *opt) (int, int, int, error) { maxProcs := runtime.GOMAXPROCS(0) * 2 var numWorkers int @@ -266,13 +266,17 @@ func calculateMaxThreads(opt *opt) (int, int, error) { opt.numThreads = maxProcs } } else if opt.numThreads <= numWorkers { - return opt.numThreads, numWorkers, NotEnoughThreads + return opt.numThreads, numWorkers, opt.maxThreads, NotEnoughThreads + } + + if opt.maxThreads < opt.numThreads && opt.maxThreads > 0 { + opt.maxThreads = opt.numThreads } metrics.TotalThreads(opt.numThreads) MaxThreads = opt.numThreads - return opt.numThreads, numWorkers, nil + return opt.numThreads, numWorkers, opt.maxThreads, nil } // Init starts the PHP runtime and the configured workers. @@ -312,7 +316,7 @@ func Init(options ...Option) error { metrics = opt.metrics } - totalThreadCount, workerThreadCount, err := calculateMaxThreads(opt) + totalThreadCount, workerThreadCount, maxThreadCount, err := calculateMaxThreads(opt) if err != nil { return err } @@ -332,11 +336,13 @@ func Init(options ...Option) error { logger.Warn(`ZTS is not enabled, only 1 thread will be available, recompile PHP using the "--enable-zts" configuration option or performance will be degraded`) } - requestChan = make(chan *http.Request, opt.numThreads) - if err := initPHPThreads(totalThreadCount); err != nil { + mainThread, err := initPHPThreads(totalThreadCount, maxThreadCount, opt.phpIni) + if err != nil { return err } + regularRequestChan = make(chan *http.Request, totalThreadCount-workerThreadCount) + regularThreads = make([]*phpThread, 0, totalThreadCount-workerThreadCount) for i := 0; i < totalThreadCount-workerThreadCount; i++ { thread := getInactivePHPThread() convertToRegularThread(thread) @@ -346,8 +352,10 @@ func Init(options ...Option) error { return err } + initAutoScaling(mainThread) + if c := logger.Check(zapcore.InfoLevel, "FrankenPHP started 🐘"); c != nil { - c.Write(zap.String("php_version", Version().Version), zap.Int("num_threads", totalThreadCount)) + c.Write(zap.String("php_version", Version().Version), zap.Int("num_threads", mainThread.numThreads), zap.Int("max_threads", mainThread.maxThreads)) } if EmbeddedAppPath != "" { if c := logger.Check(zapcore.InfoLevel, "embedded PHP app 📦"); c != nil { @@ -365,17 +373,18 @@ func Shutdown() { } drainWorkers() + drainAutoScaling() drainPHPThreads() + metrics.Shutdown() - requestChan = nil // Remove the installed app if EmbeddedAppPath != "" { _ = os.RemoveAll(EmbeddedAppPath) } - logger.Debug("FrankenPHP shut down") isRunning = false + logger.Debug("FrankenPHP shut down") } func getLogger() *zap.Logger { @@ -468,7 +477,6 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error } fc.responseWriter = responseWriter - fc.startedAt = time.Now() // Detect if a worker is available to handle this request if worker, ok := workers[fc.scriptFilename]; ok { @@ -476,15 +484,8 @@ func ServeHTTP(responseWriter http.ResponseWriter, request *http.Request) error return nil } - metrics.StartRequest() - - select { - case <-mainThread.done: - case requestChan <- request: - <-fc.done - } - - metrics.StopRequest() + // If no worker was availabe send the request to non-worker threads + handleRequestWithRegularPHPThreads(request, fc) return nil } diff --git a/frankenphp.h b/frankenphp.h index fb9a093f1..b8c255040 100644 --- a/frankenphp.h +++ b/frankenphp.h @@ -71,6 +71,7 @@ void frankenphp_register_variable_safe(char *key, char *var, size_t val_len, zend_string *frankenphp_init_persistent_string(const char *string, size_t len); void frankenphp_release_zend_string(zend_string *z_string); int frankenphp_reset_opcache(void); +int frankenphp_get_current_memory_limit(); void frankenphp_register_single(zend_string *z_key, char *value, size_t val_len, zval *track_vars_array); diff --git a/internal/cpu/cpu_unix.go b/internal/cpu/cpu_unix.go new file mode 100644 index 000000000..4d1821522 --- /dev/null +++ b/internal/cpu/cpu_unix.go @@ -0,0 +1,35 @@ +package cpu + +// #include +import "C" +import ( + "runtime" + "time" +) + +var cpuCount = runtime.GOMAXPROCS(0) + +// ProbeCPUs probes the CPU usage of the process +// if CPUs are not busy, most threads are likely waiting for I/O, so we should scale +// if CPUs are already busy we won't gain much by scaling and want to avoid the overhead of doing so +func ProbeCPUs(probeTime time.Duration, maxCPUUsage float64, abort chan struct{}) bool { + var cpuStart, cpuEnd C.struct_timespec + + // note: clock_gettime is a POSIX function + // on Windows we'd need to use QueryPerformanceCounter instead + start := time.Now() + C.clock_gettime(C.CLOCK_PROCESS_CPUTIME_ID, &cpuStart) + + select { + case <-abort: + return false + case <-time.After(probeTime): + } + + C.clock_gettime(C.CLOCK_PROCESS_CPUTIME_ID, &cpuEnd) + elapsedTime := float64(time.Since(start).Nanoseconds()) + elapsedCpuTime := float64(cpuEnd.tv_sec-cpuStart.tv_sec)*1e9 + float64(cpuEnd.tv_nsec-cpuStart.tv_nsec) + cpuUsage := elapsedCpuTime / elapsedTime / float64(cpuCount) + + return cpuUsage < maxCPUUsage +} diff --git a/internal/cpu/cpu_windows.go b/internal/cpu/cpu_windows.go new file mode 100644 index 000000000..d09d55241 --- /dev/null +++ b/internal/cpu/cpu_windows.go @@ -0,0 +1,15 @@ +package cpu + +import ( + "time" +) + +// ProbeCPUs fallback that always determines that the CPU limits are not reached +func ProbeCPUs(probeTime time.Duration, maxCPUUsage float64, abort chan struct{}) bool { + select { + case <-abort: + return false + case <-time.After(probeTime): + return true + } +} diff --git a/internal/memory/memory_linux.go b/internal/memory/memory_linux.go new file mode 100644 index 000000000..fc95d199d --- /dev/null +++ b/internal/memory/memory_linux.go @@ -0,0 +1,13 @@ +package memory + +import "syscall" + +func TotalSysMemory() uint64 { + sysInfo := &syscall.Sysinfo_t{} + err := syscall.Sysinfo(sysInfo) + if err != nil { + return 0 + } + + return uint64(sysInfo.Totalram) * uint64(sysInfo.Unit) +} diff --git a/internal/memory/memory_others.go b/internal/memory/memory_others.go new file mode 100644 index 000000000..a06af7b1a --- /dev/null +++ b/internal/memory/memory_others.go @@ -0,0 +1,8 @@ +//go:build !linux + +package memory + +// TotalSysMemory returns 0 if the total system memory cannot be determined +func TotalSysMemory() uint64 { + return 0 +} diff --git a/metrics.go b/metrics.go index 467e1a5dc..946383ae1 100644 --- a/metrics.go +++ b/metrics.go @@ -40,6 +40,10 @@ type Metrics interface { // StartWorkerRequest collects started worker requests StartWorkerRequest(name string) Shutdown() + QueuedWorkerRequest(name string) + DequeuedWorkerRequest(name string) + QueuedRequest() + DequeuedRequest() } type nullMetrics struct{} @@ -74,6 +78,13 @@ func (n nullMetrics) StartWorkerRequest(string) { func (n nullMetrics) Shutdown() { } +func (n nullMetrics) QueuedWorkerRequest(name string) {} + +func (n nullMetrics) DequeuedWorkerRequest(name string) {} + +func (n nullMetrics) QueuedRequest() {} +func (n nullMetrics) DequeuedRequest() {} + type PrometheusMetrics struct { registry prometheus.Registerer totalThreads prometheus.Counter @@ -85,6 +96,8 @@ type PrometheusMetrics struct { workerRestarts map[string]prometheus.Counter workerRequestTime map[string]prometheus.Counter workerRequestCount map[string]prometheus.Counter + workerQueueDepth map[string]prometheus.Gauge + queueDepth prometheus.Gauge mu sync.Mutex } @@ -213,6 +226,15 @@ func (m *PrometheusMetrics) TotalWorkers(name string, _ int) { }) m.registry.MustRegister(m.workerRequestCount[identity]) } + + if _, ok := m.workerQueueDepth[identity]; !ok { + m.workerQueueDepth[identity] = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "frankenphp", + Subsystem: subsystem, + Name: "worker_queue_depth", + }) + m.registry.MustRegister(m.workerQueueDepth[identity]) + } } func (m *PrometheusMetrics) TotalThreads(num int) { @@ -244,9 +266,32 @@ func (m *PrometheusMetrics) StartWorkerRequest(name string) { m.busyWorkers[name].Inc() } +func (m *PrometheusMetrics) QueuedWorkerRequest(name string) { + if _, ok := m.workerQueueDepth[name]; !ok { + return + } + m.workerQueueDepth[name].Inc() +} + +func (m *PrometheusMetrics) DequeuedWorkerRequest(name string) { + if _, ok := m.workerQueueDepth[name]; !ok { + return + } + m.workerQueueDepth[name].Dec() +} + +func (m *PrometheusMetrics) QueuedRequest() { + m.queueDepth.Inc() +} + +func (m *PrometheusMetrics) DequeuedRequest() { + m.queueDepth.Dec() +} + func (m *PrometheusMetrics) Shutdown() { m.registry.Unregister(m.totalThreads) m.registry.Unregister(m.busyThreads) + m.registry.Unregister(m.queueDepth) for _, g := range m.totalWorkers { m.registry.Unregister(g) @@ -276,6 +321,10 @@ func (m *PrometheusMetrics) Shutdown() { m.registry.Unregister(g) } + for _, g := range m.workerQueueDepth { + m.registry.Unregister(g) + } + m.totalThreads = prometheus.NewCounter(prometheus.CounterOpts{ Name: "frankenphp_total_threads", Help: "Total number of PHP threads", @@ -291,9 +340,15 @@ func (m *PrometheusMetrics) Shutdown() { m.workerRestarts = map[string]prometheus.Counter{} m.workerCrashes = map[string]prometheus.Counter{} m.readyWorkers = map[string]prometheus.Gauge{} + m.workerQueueDepth = map[string]prometheus.Gauge{} + m.queueDepth = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "frankenphp_queue_depth", + Help: "Number of regular queued requests", + }) m.registry.MustRegister(m.totalThreads) m.registry.MustRegister(m.busyThreads) + m.registry.MustRegister(m.queueDepth) } func getWorkerNameForMetrics(name string) string { @@ -325,10 +380,16 @@ func NewPrometheusMetrics(registry prometheus.Registerer) *PrometheusMetrics { workerRestarts: map[string]prometheus.Counter{}, workerCrashes: map[string]prometheus.Counter{}, readyWorkers: map[string]prometheus.Gauge{}, + workerQueueDepth: map[string]prometheus.Gauge{}, + queueDepth: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "frankenphp_queue_depth", + Help: "Number of regular queued requests", + }), } m.registry.MustRegister(m.totalThreads) m.registry.MustRegister(m.busyThreads) + m.registry.MustRegister(m.queueDepth) return m } diff --git a/metrics_test.go b/metrics_test.go index 052226993..a35e0576c 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -40,6 +40,7 @@ func createPrometheusMetrics() *PrometheusMetrics { workerRequestCount: make(map[string]prometheus.Counter), workerCrashes: make(map[string]prometheus.Counter), workerRestarts: make(map[string]prometheus.Counter), + workerQueueDepth: make(map[string]prometheus.Gauge), readyWorkers: make(map[string]prometheus.Gauge), mu: sync.Mutex{}, } diff --git a/options.go b/options.go index 724e75c8b..6d1a5ebd2 100644 --- a/options.go +++ b/options.go @@ -12,9 +12,11 @@ type Option func(h *opt) error // If you change this, also update the Caddy module and the documentation. type opt struct { numThreads int + maxThreads int workers []workerOpt logger *zap.Logger metrics Metrics + phpIni map[string]string } type workerOpt struct { @@ -33,6 +35,14 @@ func WithNumThreads(numThreads int) Option { } } +func WithMaxThreads(maxThreads int) Option { + return func(o *opt) error { + o.maxThreads = maxThreads + + return nil + } +} + func WithMetrics(m Metrics) Option { return func(o *opt) error { o.metrics = m @@ -58,3 +68,11 @@ func WithLogger(l *zap.Logger) Option { return nil } } + +// WithPhpIni configures user defined PHP ini settings. +func WithPhpIni(overrides map[string]string) Option { + return func(o *opt) error { + o.phpIni = overrides + return nil + } +} diff --git a/phpmainthread.go b/phpmainthread.go index ddc4778eb..dfb753be9 100644 --- a/phpmainthread.go +++ b/phpmainthread.go @@ -3,10 +3,13 @@ package frankenphp // #include "frankenphp.h" import "C" import ( + "fmt" "sync" + "github.com/dunglas/frankenphp/internal/memory" "github.com/dunglas/frankenphp/internal/phpheaders" "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) // represents the main PHP thread @@ -15,6 +18,8 @@ type phpMainThread struct { state *threadState done chan struct{} numThreads int + maxThreads int + phpIni map[string]string commonHeaders map[string]*C.zend_string knownServerKeys map[string]*C.zend_string } @@ -24,63 +29,72 @@ var ( mainThread *phpMainThread ) -// reserve a fixed number of PHP threads on the Go side -func initPHPThreads(numThreads int) error { +// initPHPThreads starts the main PHP thread, +// a fixed number of inactive PHP threads +// and reserves a fixed number of possible PHP threads +func initPHPThreads(numThreads int, numMaxThreads int, phpIni map[string]string) (*phpMainThread, error) { mainThread = &phpMainThread{ state: newThreadState(), done: make(chan struct{}), numThreads: numThreads, + maxThreads: numMaxThreads, + phpIni: phpIni, } - phpThreads = make([]*phpThread, numThreads) - // initialize all threads as inactive + // initialize the first thread // this needs to happen before starting the main thread // since some extensions access environment variables on startup - for i := 0; i < numThreads; i++ { - phpThreads[i] = newPHPThread(i) - convertToInactiveThread(phpThreads[i]) - } + // the threadIndex on the main thread defaults to 0 -> phpThreads[0].Pin(...) + initialThread := newPHPThread(0) + phpThreads = []*phpThread{initialThread} if err := mainThread.start(); err != nil { - return err + return nil, err + } + + // initialize all other threads + phpThreads = make([]*phpThread, mainThread.maxThreads) + phpThreads[0] = initialThread + for i := 1; i < mainThread.maxThreads; i++ { + phpThreads[i] = newPHPThread(i) } // start the underlying C threads ready := sync.WaitGroup{} ready.Add(numThreads) - for _, thread := range phpThreads { + for i := 0; i < numThreads; i++ { + thread := phpThreads[i] go func() { - if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) { - logger.Panic("unable to create thread", zap.Int("threadIndex", thread.threadIndex)) - } - thread.state.waitFor(stateInactive) + thread.boot() ready.Done() }() } ready.Wait() - return nil + return mainThread, nil } func drainPHPThreads() { doneWG := sync.WaitGroup{} doneWG.Add(len(phpThreads)) - for _, thread := range phpThreads { - thread.handlerMu.Lock() - _ = thread.state.requestSafeStateChange(stateShuttingDown) - close(thread.drainChan) - } + mainThread.state.set(stateShuttingDown) close(mainThread.done) for _, thread := range phpThreads { + // shut down all reserved threads + if thread.state.compareAndSwap(stateReserved, stateDone) { + doneWG.Done() + continue + } + // shut down all active threads go func(thread *phpThread) { - thread.state.waitFor(stateDone) - thread.handlerMu.Unlock() + thread.shutdown() doneWG.Done() }(thread) } + doneWG.Wait() - mainThread.state.set(stateShuttingDown) - mainThread.state.waitFor(stateDone) + mainThread.state.set(stateDone) + mainThread.state.waitFor(stateReserved) phpThreads = nil } @@ -88,6 +102,7 @@ func (mainThread *phpMainThread) start() error { if C.frankenphp_new_main_thread(C.int(mainThread.numThreads)) != 0 { return MainThreadCreationError } + mainThread.state.waitFor(stateReady) // cache common request headers as zend_strings (HTTP_ACCEPT, HTTP_USER_AGENT, etc.) @@ -111,16 +126,74 @@ func getInactivePHPThread() *phpThread { return thread } } - panic("not enough threads reserved") + + for _, thread := range phpThreads { + if thread.state.compareAndSwap(stateReserved, stateBootRequested) { + thread.boot() + return thread + } + } + + return nil +} + +func getPHPThreadAtState(state stateID) *phpThread { + for _, thread := range phpThreads { + if thread.state.is(state) { + return thread + } + } + return nil } //export go_frankenphp_main_thread_is_ready func go_frankenphp_main_thread_is_ready() { + mainThread.setAutomaticMaxThreads() + if mainThread.maxThreads < mainThread.numThreads { + mainThread.maxThreads = mainThread.numThreads + } + mainThread.state.set(stateReady) - mainThread.state.waitFor(stateShuttingDown) + mainThread.state.waitFor(stateDone) +} + +// max_threads = auto +// setAutomaticMaxThreads estimates the amount of threads based on php.ini and system memory_limit +// If unable to get the system's memory limit, simply double num_threads +func (mainThread *phpMainThread) setAutomaticMaxThreads() { + if mainThread.maxThreads >= 0 { + return + } + perThreadMemoryLimit := int64(C.frankenphp_get_current_memory_limit()) + totalSysMemory := memory.TotalSysMemory() + if perThreadMemoryLimit <= 0 || totalSysMemory == 0 { + mainThread.maxThreads = mainThread.numThreads * 2 + return + } + maxAllowedThreads := totalSysMemory / uint64(perThreadMemoryLimit) + mainThread.maxThreads = int(maxAllowedThreads) + + if c := logger.Check(zapcore.DebugLevel, "Automatic thread limit"); c != nil { + c.Write(zap.Int("perThreadMemoryLimitMB", int(perThreadMemoryLimit/1024/1024)), zap.Int("maxThreads", mainThread.maxThreads)) + } } //export go_frankenphp_shutdown_main_thread func go_frankenphp_shutdown_main_thread() { - mainThread.state.set(stateDone) + mainThread.state.set(stateReserved) +} + +//export go_get_custom_php_ini +func go_get_custom_php_ini() *C.char { + if mainThread.phpIni == nil { + return nil + } + + // pass the php.ini overrides to PHP before startup + // TODO: if needed this would also be possible on a per-thread basis + overrides := "" + for k, v := range mainThread.phpIni { + overrides += fmt.Sprintf("%s=%s\n", k, v) + } + return C.CString(overrides) } diff --git a/phpmainthread_test.go b/phpmainthread_test.go index a5de26f93..2a1705c03 100644 --- a/phpmainthread_test.go +++ b/phpmainthread_test.go @@ -18,8 +18,9 @@ import ( var testDataPath, _ = filepath.Abs("./testdata") func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) { - logger = zap.NewNop() // the logger needs to not be nil - assert.NoError(t, initPHPThreads(1)) // reserve 1 thread + logger = zap.NewNop() // the logger needs to not be nil + _, err := initPHPThreads(1, 1, nil) // boot 1 thread + assert.NoError(t, err) assert.Len(t, phpThreads, 1) assert.Equal(t, 0, phpThreads[0].threadIndex) @@ -31,7 +32,8 @@ func TestStartAndStopTheMainThreadWithOneInactiveThread(t *testing.T) { func TestTransitionRegularThreadToWorkerThread(t *testing.T) { logger = zap.NewNop() - assert.NoError(t, initPHPThreads(1)) + _, err := initPHPThreads(1, 1, nil) + assert.NoError(t, err) // transition to regular thread convertToRegularThread(phpThreads[0]) @@ -54,7 +56,8 @@ func TestTransitionRegularThreadToWorkerThread(t *testing.T) { func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) { logger = zap.NewNop() - assert.NoError(t, initPHPThreads(1)) + _, err := initPHPThreads(1, 1, nil) + assert.NoError(t, err) firstWorker := getDummyWorker("transition-worker-1.php") secondWorker := getDummyWorker("transition-worker-2.php") @@ -76,43 +79,39 @@ func TestTransitionAThreadBetween2DifferentWorkers(t *testing.T) { assert.Nil(t, phpThreads) } +// try all possible handler transitions +// takes around 200ms and is supposed to force race conditions func TestTransitionThreadsWhileDoingRequests(t *testing.T) { numThreads := 10 numRequestsPerThread := 100 - isRunning := atomic.Bool{} - isRunning.Store(true) + isDone := atomic.Bool{} wg := sync.WaitGroup{} worker1Path := testDataPath + "/transition-worker-1.php" worker2Path := testDataPath + "/transition-worker-2.php" assert.NoError(t, Init( WithNumThreads(numThreads), - WithWorkers(worker1Path, 1, map[string]string{"ENV1": "foo"}, []string{}), - WithWorkers(worker2Path, 1, map[string]string{"ENV1": "foo"}, []string{}), + WithWorkers(worker1Path, 1, map[string]string{}, []string{}), + WithWorkers(worker2Path, 1, map[string]string{}, []string{}), WithLogger(zap.NewNop()), )) - // randomly transition threads between regular, inactive and 2 worker threads - go func() { - for { - for i := 0; i < numThreads; i++ { - switch rand.IntN(4) { - case 0: - convertToRegularThread(phpThreads[i]) - case 1: - convertToWorkerThread(phpThreads[i], workers[worker1Path]) - case 2: - convertToWorkerThread(phpThreads[i], workers[worker2Path]) - case 3: - convertToInactiveThread(phpThreads[i]) - } - time.Sleep(time.Millisecond) - if !isRunning.Load() { - return + // try all possible permutations of transition, transition every ms + transitions := allPossibleTransitions(worker1Path, worker2Path) + for i := 0; i < numThreads; i++ { + go func(thread *phpThread, start int) { + for { + for j := start; j < len(transitions); j++ { + if isDone.Load() { + return + } + transitions[j](thread) + time.Sleep(time.Millisecond) } + start = 0 } - } - }() + }(phpThreads[i], i) + } // randomly do requests to the 3 endpoints wg.Add(numThreads) @@ -132,8 +131,9 @@ func TestTransitionThreadsWhileDoingRequests(t *testing.T) { }(i) } + // we are finished as soon as all 1000 requests are done wg.Wait() - isRunning.Store(false) + isDone.Store(true) Shutdown() } @@ -176,3 +176,20 @@ func assertRequestBody(t *testing.T, url string, expected string) { body, _ := io.ReadAll(resp.Body) assert.Equal(t, expected, string(body)) } + +// create a mix of possible transitions of workers and regular threads +func allPossibleTransitions(worker1Path string, worker2Path string) []func(*phpThread) { + return []func(*phpThread){ + convertToRegularThread, + func(thread *phpThread) { thread.shutdown() }, + func(thread *phpThread) { + if thread.state.is(stateReserved) { + thread.boot() + } + }, + func(thread *phpThread) { convertToWorkerThread(thread, workers[worker1Path]) }, + convertToInactiveThread, + func(thread *phpThread) { convertToWorkerThread(thread, workers[worker2Path]) }, + convertToInactiveThread, + } +} diff --git a/phpthread.go b/phpthread.go index ebe8d8419..d66b43053 100644 --- a/phpthread.go +++ b/phpthread.go @@ -7,23 +7,26 @@ import ( "runtime" "sync" "unsafe" + + "go.uber.org/zap" ) // representation of the actual underlying PHP thread // identified by the index in the phpThreads slice type phpThread struct { runtime.Pinner - threadIndex int requestChan chan *http.Request drainChan chan struct{} - handlerMu *sync.Mutex + handlerMu sync.Mutex + requestMu sync.Mutex handler threadHandler state *threadState } // interface that defines how the callbacks from the C thread should be handled type threadHandler interface { + name() string beforeScriptExecution() string afterScriptExecution(exitStatus int) getActiveRequest() *http.Request @@ -32,21 +35,55 @@ type threadHandler interface { func newPHPThread(threadIndex int) *phpThread { return &phpThread{ threadIndex: threadIndex, - drainChan: make(chan struct{}), requestChan: make(chan *http.Request), - handlerMu: &sync.Mutex{}, state: newThreadState(), } } +// boot starts the underlying PHP thread +func (thread *phpThread) boot() { + // thread must be in reserved state to boot + if !thread.state.compareAndSwap(stateReserved, stateBooting) && !thread.state.compareAndSwap(stateBootRequested, stateBooting) { + logger.Panic("thread is not in reserved state: " + thread.state.name()) + return + } + + // boot threads as inactive + thread.handlerMu.Lock() + thread.handler = &inactiveThread{thread: thread} + thread.drainChan = make(chan struct{}) + thread.handlerMu.Unlock() + + // start the actual posix thread - TODO: try this with go threads instead + if !C.frankenphp_new_php_thread(C.uintptr_t(thread.threadIndex)) { + logger.Panic("unable to create thread", zap.Int("threadIndex", thread.threadIndex)) + } + thread.state.waitFor(stateInactive) +} + +// shutdown the underlying PHP thread +func (thread *phpThread) shutdown() { + if !thread.state.requestSafeStateChange(stateShuttingDown) { + // already shutting down or done + return + } + close(thread.drainChan) + thread.state.waitFor(stateDone) + thread.drainChan = make(chan struct{}) + + // threads go back to the reserved state from which they can be booted again + if mainThread.state.is(stateReady) { + thread.state.set(stateReserved) + } +} + // change the thread handler safely // must be called from outside the PHP thread func (thread *phpThread) setHandler(handler threadHandler) { - logger.Debug("setHandler") thread.handlerMu.Lock() defer thread.handlerMu.Unlock() if !thread.state.requestSafeStateChange(stateTransitionRequested) { - // no state change allowed == shutdown + // no state change allowed == shutdown or done return } close(thread.drainChan) @@ -69,6 +106,16 @@ func (thread *phpThread) getActiveRequest() *http.Request { return thread.handler.getActiveRequest() } +// get the active request from outside the PHP thread +func (thread *phpThread) getActiveRequestSafely() *http.Request { + thread.handlerMu.Lock() + thread.requestMu.Lock() + r := thread.getActiveRequest() + thread.requestMu.Unlock() + thread.handlerMu.Unlock() + return r +} + // Pin a string that is not null-terminated // PHP's zend_string may contain null-bytes func (thread *phpThread) pinString(s string) *C.char { @@ -95,6 +142,7 @@ func go_frankenphp_before_script_execution(threadIndex C.uintptr_t) *C.char { if scriptName == "" { return nil } + // return the name of the PHP script that should be executed return thread.pinCString(scriptName) } diff --git a/scaling.go b/scaling.go new file mode 100644 index 000000000..84429ba55 --- /dev/null +++ b/scaling.go @@ -0,0 +1,260 @@ +package frankenphp + +//#include "frankenphp.h" +//#include +import "C" +import ( + "errors" + "sync" + "sync/atomic" + "time" + + "github.com/dunglas/frankenphp/internal/cpu" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + // requests have to be stalled for at least this amount of time before scaling + minStallTime = 5 * time.Millisecond + // time to check for CPU usage before scaling a single thread + cpuProbeTime = 120 * time.Millisecond + // do not scale over this amount of CPU usage + maxCpuUsageForScaling = 0.8 + // upscale stalled threads every x milliseconds + upscaleCheckTime = 100 * time.Millisecond + // downscale idle threads every x seconds + downScaleCheckTime = 5 * time.Second + // max amount of threads stopped in one iteration of downScaleCheckTime + maxTerminationCount = 10 + // autoscaled threads waiting for longer than this time are downscaled + maxThreadIdleTime = 5 * time.Second +) + +var ( + scaleChan chan *FrankenPHPContext + autoScaledThreads = []*phpThread{} + scalingMu = new(sync.RWMutex) + disallowScaling = atomic.Bool{} + + MaxThreadsReachedError = errors.New("max amount of overall threads reached") + CannotRemoveLastThreadError = errors.New("cannot remove last thread") + WorkerNotFoundError = errors.New("worker not found for given filename") +) + +func initAutoScaling(mainThread *phpMainThread) { + if mainThread.maxThreads <= mainThread.numThreads { + scaleChan = nil + return + } + + scalingMu.Lock() + scaleChan = make(chan *FrankenPHPContext) + maxScaledThreads := mainThread.maxThreads - mainThread.numThreads + autoScaledThreads = make([]*phpThread, 0, maxScaledThreads) + scalingMu.Unlock() + + go startUpscalingThreads(maxScaledThreads, scaleChan, mainThread.done) + go startDownScalingThreads(mainThread.done) +} + +func drainAutoScaling() { + scalingMu.Lock() + if c := logger.Check(zapcore.DebugLevel, "shutting down autoscaling"); c != nil { + c.Write(zap.Int("autoScaledThreads", len(autoScaledThreads))) + } + scalingMu.Unlock() +} + +func addRegularThread() (*phpThread, error) { + thread := getInactivePHPThread() + if thread == nil { + return nil, MaxThreadsReachedError + } + convertToRegularThread(thread) + thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) + return thread, nil +} + +func removeRegularThread() error { + regularThreadMu.RLock() + if len(regularThreads) <= 1 { + regularThreadMu.RUnlock() + return CannotRemoveLastThreadError + } + thread := regularThreads[len(regularThreads)-1] + regularThreadMu.RUnlock() + thread.shutdown() + return nil +} + +func addWorkerThread(worker *worker) (*phpThread, error) { + thread := getInactivePHPThread() + if thread == nil { + return nil, MaxThreadsReachedError + } + convertToWorkerThread(thread, worker) + thread.state.waitFor(stateReady, stateShuttingDown, stateReserved) + return thread, nil +} + +func removeWorkerThread(worker *worker) error { + worker.threadMutex.RLock() + if len(worker.threads) <= 1 { + worker.threadMutex.RUnlock() + return CannotRemoveLastThreadError + } + thread := worker.threads[len(worker.threads)-1] + worker.threadMutex.RUnlock() + thread.shutdown() + + return nil +} + +// scaleWorkerThread adds a worker PHP thread automatically +func scaleWorkerThread(worker *worker) { + scalingMu.Lock() + defer scalingMu.Unlock() + + if !mainThread.state.is(stateReady) { + return + } + + // probe CPU usage before scaling + if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) { + return + } + + thread, err := addWorkerThread(worker) + if err != nil { + if c := logger.Check(zapcore.WarnLevel, "could not increase max_threads, consider raising this limit"); c != nil { + c.Write(zap.String("worker", worker.fileName), zap.Error(err)) + } + return + } + + autoScaledThreads = append(autoScaledThreads, thread) +} + +// scaleRegularThread adds a regular PHP thread automatically +func scaleRegularThread() { + scalingMu.Lock() + defer scalingMu.Unlock() + + if !mainThread.state.is(stateReady) { + return + } + + // probe CPU usage before scaling + if !cpu.ProbeCPUs(cpuProbeTime, maxCpuUsageForScaling, mainThread.done) { + return + } + + thread, err := addRegularThread() + if err != nil { + if c := logger.Check(zapcore.WarnLevel, "could not increase max_threads, consider raising this limit"); c != nil { + c.Write(zap.Error(err)) + } + return + } + + autoScaledThreads = append(autoScaledThreads, thread) +} + +func startUpscalingThreads(maxScaledThreads int, scale chan *FrankenPHPContext, done chan struct{}) { + for { + scalingMu.Lock() + scaledThreadCount := len(autoScaledThreads) + scalingMu.Unlock() + if scaledThreadCount >= maxScaledThreads { + // we have reached max_threads, check again later + select { + case <-done: + return + case <-time.After(downScaleCheckTime): + continue + } + } + + select { + case fc := <-scale: + timeSinceStalled := time.Since(fc.startedAt) + + // if the request has not been stalled long enough, wait and repeat + if timeSinceStalled < minStallTime { + select { + case <-done: + return + case <-time.After(minStallTime - timeSinceStalled): + continue + } + } + + // if the request has been stalled long enough, scale + if worker, ok := workers[fc.scriptFilename]; ok { + scaleWorkerThread(worker) + } else { + scaleRegularThread() + } + case <-done: + return + } + } +} + +func startDownScalingThreads(done chan struct{}) { + for { + select { + case <-done: + return + case <-time.After(downScaleCheckTime): + deactivateThreads() + } + } +} + +// deactivateThreads checks all threads and removes those that have been inactive for too long +func deactivateThreads() { + stoppedThreadCount := 0 + scalingMu.Lock() + defer scalingMu.Unlock() + for i := len(autoScaledThreads) - 1; i >= 0; i-- { + thread := autoScaledThreads[i] + + // the thread might have been stopped otherwise, remove it + if thread.state.is(stateReserved) { + autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) + continue + } + + waitTime := thread.state.waitTime() + if stoppedThreadCount > maxTerminationCount || waitTime == 0 { + continue + } + + // convert threads to inactive if they have been idle for too long + if thread.state.is(stateReady) && waitTime > maxThreadIdleTime.Milliseconds() { + if c := logger.Check(zapcore.DebugLevel, "auto-converting thread to inactive"); c != nil { + c.Write(zap.Int("threadIndex", thread.threadIndex)) + } + convertToInactiveThread(thread) + stoppedThreadCount++ + autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) + + continue + } + + // TODO: Completely stopping threads is more memory efficient + // Some PECL extensions like #1296 will prevent threads from fully stopping (they leak memory) + // Reactivate this if there is a better solution or workaround + //if thread.state.is(stateInactive) && waitTime > maxThreadIdleTime.Milliseconds() { + // if c := logger.Check(zapcore.DebugLevel, "auto-stopping thread"); c != nil { + // c.Write(zap.Int("threadIndex", thread.threadIndex)) + // } + // thread.shutdown() + // stoppedThreadCount++ + // autoScaledThreads = append(autoScaledThreads[:i], autoScaledThreads[i+1:]...) + // continue + //} + } +} diff --git a/scaling_test.go b/scaling_test.go new file mode 100644 index 000000000..883757c41 --- /dev/null +++ b/scaling_test.go @@ -0,0 +1,60 @@ +package frankenphp + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestScaleARegularThreadUpAndDown(t *testing.T) { + assert.NoError(t, Init( + WithNumThreads(1), + WithMaxThreads(2), + WithLogger(zap.NewNop()), + )) + + autoScaledThread := phpThreads[1] + + // scale up + scaleRegularThread() + assert.Equal(t, stateReady, autoScaledThread.state.get()) + assert.IsType(t, ®ularThread{}, autoScaledThread.handler) + + // on down-scale, the thread will be marked as inactive + setLongWaitTime(autoScaledThread) + deactivateThreads() + assert.IsType(t, &inactiveThread{}, autoScaledThread.handler) + + Shutdown() +} + +func TestScaleAWorkerThreadUpAndDown(t *testing.T) { + workerPath := testDataPath + "/transition-worker-1.php" + assert.NoError(t, Init( + WithNumThreads(2), + WithMaxThreads(3), + WithWorkers(workerPath, 1, map[string]string{}, []string{}), + WithLogger(zap.NewNop()), + )) + + autoScaledThread := phpThreads[2] + + // scale up + scaleWorkerThread(workers[workerPath]) + assert.Equal(t, stateReady, autoScaledThread.state.get()) + + // on down-scale, the thread will be marked as inactive + setLongWaitTime(autoScaledThread) + deactivateThreads() + assert.IsType(t, &inactiveThread{}, autoScaledThread.handler) + + Shutdown() +} + +func setLongWaitTime(thread *phpThread) { + thread.state.mu.Lock() + thread.state.waitingSince = time.Now().Add(-time.Hour) + thread.state.mu.Unlock() +} diff --git a/state.go b/state.go index 001213282..37061c241 100644 --- a/state.go +++ b/state.go @@ -2,19 +2,21 @@ package frankenphp import ( "slices" - "strconv" "sync" + "time" ) type stateID uint8 const ( - // lifecycle states of a thread - stateBooting stateID = iota + // livecycle states of a thread + stateReserved stateID = iota + stateBooting + stateBootRequested stateShuttingDown stateDone - // these states are safe to transition from at any time + // these states are 'stable' and safe to transition from at any time stateInactive stateReady @@ -28,10 +30,27 @@ const ( stateTransitionComplete ) +var stateNames = map[stateID]string{ + stateReserved: "reserved", + stateBooting: "booting", + stateInactive: "inactive", + stateReady: "ready", + stateShuttingDown: "shutting down", + stateDone: "done", + stateRestarting: "restarting", + stateYielding: "yielding", + stateTransitionRequested: "transition requested", + stateTransitionInProgress: "transition in progress", + stateTransitionComplete: "transition complete", +} + type threadState struct { currentState stateID mu sync.RWMutex subscribers []stateSubscriber + // how long threads have been waiting in stable states + waitingSince time.Time + isWaiting bool } type stateSubscriber struct { @@ -41,7 +60,7 @@ type stateSubscriber struct { func newThreadState() *threadState { return &threadState{ - currentState: stateBooting, + currentState: stateReserved, subscribers: []stateSubscriber{}, mu: sync.RWMutex{}, } @@ -68,8 +87,7 @@ func (ts *threadState) compareAndSwap(compareTo stateID, swapTo stateID) bool { } func (ts *threadState) name() string { - // TODO: return the actual name for logging/metrics - return "state:" + strconv.Itoa(int(ts.get())) + return stateNames[ts.get()] } func (ts *threadState) get() stateID { @@ -123,8 +141,8 @@ func (ts *threadState) waitFor(states ...stateID) { func (ts *threadState) requestSafeStateChange(nextState stateID) bool { ts.mu.Lock() switch ts.currentState { - // disallow state changes if shutting down - case stateShuttingDown, stateDone: + // disallow state changes if shutting down or done + case stateShuttingDown, stateDone, stateReserved: ts.mu.Unlock() return false // ready and inactive are safe states to transition from @@ -140,3 +158,26 @@ func (ts *threadState) requestSafeStateChange(nextState stateID) bool { ts.waitFor(stateReady, stateInactive, stateShuttingDown) return ts.requestSafeStateChange(nextState) } + +// markAsWaiting hints that the thread reached a stable state and is waiting for requests or shutdown +func (ts *threadState) markAsWaiting(isWaiting bool) { + ts.mu.Lock() + if isWaiting { + ts.isWaiting = true + ts.waitingSince = time.Now() + } else { + ts.isWaiting = false + } + ts.mu.Unlock() +} + +// waitTime returns the time since the thread is waiting in a stable state in ms +func (ts *threadState) waitTime() int64 { + ts.mu.RLock() + waitTime := int64(0) + if ts.isWaiting { + waitTime = time.Now().UnixMilli() - ts.waitingSince.UnixMilli() + } + ts.mu.RUnlock() + return waitTime +} diff --git a/testdata/ini.php b/testdata/ini.php new file mode 100644 index 000000000..dfbcd365b --- /dev/null +++ b/testdata/ini.php @@ -0,0 +1,7 @@ +/go/src/app/testdata/performance/flamegraph.svg diff --git a/testdata/performance/hanging-requests.js b/testdata/performance/hanging-requests.js new file mode 100644 index 000000000..db191fdef --- /dev/null +++ b/testdata/performance/hanging-requests.js @@ -0,0 +1,28 @@ +import http from 'k6/http' + +/** + * It is not uncommon for external services to hang for a long time. + * Make sure the server is resilient in such cases and doesn't hang as well. + */ +export const options = { + stages: [ + { duration: '20s', target: 100 }, + { duration: '20s', target: 500 }, + { duration: '20s', target: 0 } + ], + thresholds: { + http_req_failed: ['rate<0.01'] + } +} + +/* global __ENV */ +export default function () { + // 2% chance for a request that hangs for 15s + if (Math.random() < 0.02) { + http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=15000&work=10000&output=100`) + return + } + + // a regular request + http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=5&work=10000&output=100`) +} diff --git a/testdata/performance/hello-world.js b/testdata/performance/hello-world.js new file mode 100644 index 000000000..f0499fede --- /dev/null +++ b/testdata/performance/hello-world.js @@ -0,0 +1,20 @@ +import http from 'k6/http' + +/** + * 'Hello world' tests the raw server performance. + */ +export const options = { + stages: [ + { duration: '5s', target: 100 }, + { duration: '20s', target: 400 }, + { duration: '5s', target: 0 } + ], + thresholds: { + http_req_failed: ['rate<0.01'] + } +} + +/* global __ENV */ +export default function () { + http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php`) +} diff --git a/testdata/performance/k6.Caddyfile b/testdata/performance/k6.Caddyfile new file mode 100644 index 000000000..44d5a54b9 --- /dev/null +++ b/testdata/performance/k6.Caddyfile @@ -0,0 +1,20 @@ +{ + frankenphp { + max_threads {$MAX_THREADS} + num_threads {$NUM_THREADS} + worker { + file /go/src/app/testdata/{$WORKER_FILE:sleep.php} + num {$WORKER_THREADS} + } + } +} + +:80 { + route { + root /go/src/app/testdata + php { + root /go/src/app/testdata + enable_root_symlink false + } + } +} diff --git a/testdata/performance/perf-test.sh b/testdata/performance/perf-test.sh new file mode 100755 index 000000000..50740f999 --- /dev/null +++ b/testdata/performance/perf-test.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +# install the dev.Dockerfile, build the app and run k6 tests + +docker build -t frankenphp-dev -f dev.Dockerfile . + +export "CADDY_HOSTNAME=http://host.docker.internal" + +select filename in ./testdata/performance/*.js; do + read -r -p "How many worker threads? " workerThreads + read -r -p "How many max threads? " maxThreads + + numThreads=$((workerThreads + 1)) + + docker run --cap-add=SYS_PTRACE --security-opt seccomp=unconfined \ + -p 8125:80 \ + -v "$PWD:/go/src/app" \ + --name load-test-container \ + -e "MAX_THREADS=$maxThreads" \ + -e "WORKER_THREADS=$workerThreads" \ + -e "NUM_THREADS=$numThreads" \ + -itd \ + frankenphp-dev \ + sh /go/src/app/testdata/performance/start-server.sh + + docker exec -d load-test-container sh /go/src/app/testdata/performance/flamegraph.sh + + sleep 10 + + docker run --entrypoint "" -it -v .:/app -w /app \ + --add-host "host.docker.internal:host-gateway" \ + grafana/k6:latest \ + k6 run -e "CADDY_HOSTNAME=$CADDY_HOSTNAME:8125" "./$filename" + + docker exec load-test-container curl "http://localhost:2019/frankenphp/threads" + + docker stop load-test-container + docker rm load-test-container +done diff --git a/testdata/performance/performance-testing.md b/testdata/performance/performance-testing.md new file mode 100644 index 000000000..e13d4ee58 --- /dev/null +++ b/testdata/performance/performance-testing.md @@ -0,0 +1,19 @@ +# Running Load tests + +To run load tests with k6 you need to have Docker and Bash installed. +Go the root of this repository and run: + +```sh +bash testdata/performance/perf-test.sh +``` + +This will build the `frankenphp-dev` docker image and run it under the name 'load-test-container' +in the background. Additionally, it will run the `grafana/k6` container and you'll be able to choose +the load test you want to run. A `flamegraph.svg` will be created in the `testdata/performance` directory. + +If the load test has stopped prematurely, you might have to remove the container manually: + +```sh +docker stop load-test-container +docker rm load-test-container +``` diff --git a/testdata/performance/start-server.sh b/testdata/performance/start-server.sh new file mode 100755 index 000000000..998c148cc --- /dev/null +++ b/testdata/performance/start-server.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +# build and run FrankenPHP with the k6.Caddyfile +cd /go/src/app/caddy/frankenphp && + go build --buildvcs=false && + cd ../../testdata/performance && + /go/src/app/caddy/frankenphp/frankenphp run -c k6.Caddyfile diff --git a/testdata/performance/timeouts.js b/testdata/performance/timeouts.js new file mode 100644 index 000000000..775c36441 --- /dev/null +++ b/testdata/performance/timeouts.js @@ -0,0 +1,32 @@ +import http from 'k6/http' + +/** + * Databases or external resources can sometimes become unavailable for short periods of time. + * Make sure the server can recover quickly from periods of unavailability. + * This simulation swaps between a hanging and a working server every 10 seconds. + */ +export const options = { + stages: [ + { duration: '20s', target: 100 }, + { duration: '20s', target: 500 }, + { duration: '20s', target: 0 } + ], + thresholds: { + http_req_failed: ['rate<0.01'] + } +} + +/* global __ENV */ +export default function () { + const tenSecondInterval = Math.floor(new Date().getSeconds() / 10) + const shouldHang = tenSecondInterval % 2 === 0 + + // every 10 seconds requests lead to a max_execution-timeout + if (shouldHang) { + http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=50000`) + return + } + + // every other 10 seconds the resource is back + http.get(`${__ENV.CADDY_HOSTNAME}/sleep.php?sleep=5&work=30000&output=100`) +} diff --git a/testdata/sleep.php b/testdata/sleep.php new file mode 100644 index 000000000..174a28294 --- /dev/null +++ b/testdata/sleep.php @@ -0,0 +1,29 @@ + 0) { + usleep($sleep * 1000); + } + + // simulate output + for ($k = 0; $k < $output; $k++) { + echo "slept for $sleep ms and worked for $work iterations"; + } + } +}; diff --git a/testdata/worker-with-watcher.php b/testdata/worker-with-counter.php similarity index 100% rename from testdata/worker-with-watcher.php rename to testdata/worker-with-counter.php diff --git a/thread-regular.go b/thread-regular.go deleted file mode 100644 index 88d72106b..000000000 --- a/thread-regular.go +++ /dev/null @@ -1,78 +0,0 @@ -package frankenphp - -// #include "frankenphp.h" -import "C" -import ( - "net/http" -) - -// representation of a non-worker PHP thread -// executes PHP scripts in a web context -// implements the threadHandler interface -type regularThread struct { - state *threadState - thread *phpThread - activeRequest *http.Request -} - -func convertToRegularThread(thread *phpThread) { - thread.setHandler(®ularThread{ - thread: thread, - state: thread.state, - }) -} - -// beforeScriptExecution returns the name of the script or an empty string on shutdown -func (handler *regularThread) beforeScriptExecution() string { - switch handler.state.get() { - case stateTransitionRequested: - return handler.thread.transitionToNewHandler() - case stateTransitionComplete: - handler.state.set(stateReady) - return handler.waitForRequest() - case stateReady: - return handler.waitForRequest() - case stateShuttingDown: - // signal to stop - return "" - } - panic("unexpected state: " + handler.state.name()) -} - -// return true if the worker should continue to run -func (handler *regularThread) afterScriptExecution(exitStatus int) { - handler.afterRequest(exitStatus) -} - -func (handler *regularThread) getActiveRequest() *http.Request { - return handler.activeRequest -} - -func (handler *regularThread) waitForRequest() string { - select { - case <-handler.thread.drainChan: - // go back to beforeScriptExecution - return handler.beforeScriptExecution() - - case r := <-requestChan: - handler.activeRequest = r - fc := r.Context().Value(contextKey).(*FrankenPHPContext) - - if err := updateServerContext(handler.thread, r, true, false); err != nil { - rejectRequest(fc.responseWriter, err.Error()) - handler.afterRequest(0) - // go back to beforeScriptExecution - return handler.beforeScriptExecution() - } - - // set the scriptFilename that should be executed - return fc.scriptFilename - } -} - -func (handler *regularThread) afterRequest(exitStatus int) { - fc := handler.activeRequest.Context().Value(contextKey).(*FrankenPHPContext) - fc.exitStatus = exitStatus - maybeCloseContext(fc) - handler.activeRequest = nil -} diff --git a/thread-inactive.go b/threadinactive.go similarity index 67% rename from thread-inactive.go rename to threadinactive.go index 7c4810c71..c0404a8d4 100644 --- a/thread-inactive.go +++ b/threadinactive.go @@ -6,15 +6,13 @@ import ( // representation of a thread with no work assigned to it // implements the threadHandler interface +// each inactive thread weighs around ~350KB +// keeping threads at 'inactive' will consume more memory, but allow a faster transition type inactiveThread struct { thread *phpThread } func convertToInactiveThread(thread *phpThread) { - if thread.handler == nil { - thread.handler = &inactiveThread{thread: thread} - return - } thread.setHandler(&inactiveThread{thread: thread}) } @@ -26,8 +24,11 @@ func (handler *inactiveThread) beforeScriptExecution() string { return thread.transitionToNewHandler() case stateBooting, stateTransitionComplete: thread.state.set(stateInactive) + // wait for external signal to start or shut down + thread.state.markAsWaiting(true) thread.state.waitFor(stateTransitionRequested, stateShuttingDown) + thread.state.markAsWaiting(false) return handler.beforeScriptExecution() case stateShuttingDown: // signal to stop @@ -36,10 +37,14 @@ func (handler *inactiveThread) beforeScriptExecution() string { panic("unexpected state: " + thread.state.name()) } -func (thread *inactiveThread) afterScriptExecution(exitStatus int) { +func (handler *inactiveThread) afterScriptExecution(exitStatus int) { panic("inactive threads should not execute scripts") } -func (thread *inactiveThread) getActiveRequest() *http.Request { - panic("inactive threads have no requests") +func (handler *inactiveThread) getActiveRequest() *http.Request { + return nil +} + +func (handler *inactiveThread) name() string { + return "Inactive PHP Thread" } diff --git a/threadregular.go b/threadregular.go new file mode 100644 index 000000000..ce0778c78 --- /dev/null +++ b/threadregular.go @@ -0,0 +1,145 @@ +package frankenphp + +import ( + "net/http" + "sync" +) + +// representation of a non-worker PHP thread +// executes PHP scripts in a web context +// implements the threadHandler interface +type regularThread struct { + state *threadState + thread *phpThread + activeRequest *http.Request +} + +var ( + regularThreads []*phpThread + regularThreadMu = &sync.RWMutex{} + regularRequestChan chan *http.Request +) + +func convertToRegularThread(thread *phpThread) { + thread.setHandler(®ularThread{ + thread: thread, + state: thread.state, + }) + attachRegularThread(thread) +} + +// beforeScriptExecution returns the name of the script or an empty string on shutdown +func (handler *regularThread) beforeScriptExecution() string { + switch handler.state.get() { + case stateTransitionRequested: + detachRegularThread(handler.thread) + return handler.thread.transitionToNewHandler() + case stateTransitionComplete: + handler.state.set(stateReady) + return handler.waitForRequest() + case stateReady: + return handler.waitForRequest() + case stateShuttingDown: + detachRegularThread(handler.thread) + // signal to stop + return "" + } + panic("unexpected state: " + handler.state.name()) +} + +// return true if the worker should continue to run +func (handler *regularThread) afterScriptExecution(exitStatus int) { + handler.afterRequest(exitStatus) +} + +func (handler *regularThread) getActiveRequest() *http.Request { + return handler.activeRequest +} + +func (handler *regularThread) setActiveRequest(r *http.Request) { + handler.thread.requestMu.Lock() + handler.activeRequest = r + handler.thread.requestMu.Unlock() +} + +func (handler *regularThread) name() string { + return "Regular PHP Thread" +} + +func (handler *regularThread) waitForRequest() string { + handler.state.markAsWaiting(true) + + var r *http.Request + select { + case <-handler.thread.drainChan: + // go back to beforeScriptExecution + return handler.beforeScriptExecution() + case r = <-regularRequestChan: + } + + handler.setActiveRequest(r) + handler.state.markAsWaiting(false) + fc := r.Context().Value(contextKey).(*FrankenPHPContext) + + if err := updateServerContext(handler.thread, r, true, false); err != nil { + rejectRequest(fc.responseWriter, err.Error()) + handler.afterRequest(0) + handler.thread.Unpin() + // go back to beforeScriptExecution + return handler.beforeScriptExecution() + } + + // set the scriptFilename that should be executed + return fc.scriptFilename +} + +func (handler *regularThread) afterRequest(exitStatus int) { + fc := handler.activeRequest.Context().Value(contextKey).(*FrankenPHPContext) + fc.exitStatus = exitStatus + maybeCloseContext(fc) + handler.setActiveRequest(nil) +} + +func handleRequestWithRegularPHPThreads(r *http.Request, fc *FrankenPHPContext) { + metrics.StartRequest() + select { + case regularRequestChan <- r: + // a thread was available to handle the request immediately + <-fc.done + metrics.StopRequest() + return + default: + // no thread was available + } + + // if no thread was available, mark the request as queued and fan it out to all threads + metrics.QueuedRequest() + for { + select { + case regularRequestChan <- r: + metrics.DequeuedRequest() + <-fc.done + metrics.StopRequest() + return + case scaleChan <- fc: + // the request has triggered scaling, continue to wait for a thread + } + } +} + +func attachRegularThread(thread *phpThread) { + regularThreadMu.Lock() + regularThreads = append(regularThreads, thread) + regularThreadMu.Unlock() +} + +func detachRegularThread(thread *phpThread) { + regularThreadMu.Lock() + for i, t := range regularThreads { + if t == thread { + regularThreads = append(regularThreads[:i], regularThreads[i+1:]...) + break + } + } + regularThreadMu.Unlock() +} diff --git a/thread-worker.go b/threadworker.go similarity index 89% rename from thread-worker.go rename to threadworker.go index e087de0a8..acd1481a7 100644 --- a/thread-worker.go +++ b/threadworker.go @@ -52,6 +52,7 @@ func (handler *workerThread) beforeScriptExecution() string { setupWorkerScript(handler, handler.worker) return handler.worker.fileName case stateShuttingDown: + handler.worker.detachThread(handler.thread) // signal to stop return "" } @@ -70,6 +71,10 @@ func (handler *workerThread) getActiveRequest() *http.Request { return handler.fakeRequest } +func (handler *workerThread) name() string { + return "Worker PHP Thread - " + handler.worker.fileName +} + func setupWorkerScript(handler *workerThread, worker *worker) { handler.backoff.wait() metrics.StartWorker(worker.fileName) @@ -93,7 +98,7 @@ func setupWorkerScript(handler *workerThread, worker *worker) { panic(err) } - handler.fakeRequest = r + handler.setFakeRequest(r) if c := logger.Check(zapcore.DebugLevel, "starting"); c != nil { c.Write(zap.String("worker", worker.fileName), zap.Int("thread", handler.thread.threadIndex)) } @@ -105,15 +110,12 @@ func tearDownWorkerScript(handler *workerThread, exitStatus int) { if handler.workerRequest != nil { fc := handler.workerRequest.Context().Value(contextKey).(*FrankenPHPContext) maybeCloseContext(fc) - handler.workerRequest = nil + handler.setWorkerRequest(nil) } fc := handler.fakeRequest.Context().Value(contextKey).(*FrankenPHPContext) fc.exitStatus = exitStatus - - defer func() { - handler.fakeRequest = nil - }() + handler.setFakeRequest(nil) // on exit status 0 we just run the worker script again worker := handler.worker @@ -152,6 +154,8 @@ func (handler *workerThread) waitForWorkerRequest() bool { metrics.ReadyWorker(handler.worker.fileName) } + handler.state.markAsWaiting(true) + var r *http.Request select { case <-handler.thread.drainChan: @@ -159,8 +163,9 @@ func (handler *workerThread) waitForWorkerRequest() bool { c.Write(zap.String("worker", handler.worker.fileName)) } - // execute opcache_reset if the restart was triggered by the watcher - if watcherIsEnabled && handler.state.is(stateRestarting) { + // flush the opcache when restarting due to watcher or admin api + // note: this is done right before frankenphp_handle_request() returns 'false' + if handler.state.is(stateRestarting) { C.frankenphp_reset_opcache() } @@ -169,7 +174,8 @@ func (handler *workerThread) waitForWorkerRequest() bool { case r = <-handler.worker.requestChan: } - handler.workerRequest = r + handler.setWorkerRequest(r) + handler.state.markAsWaiting(false) if c := logger.Check(zapcore.DebugLevel, "request handling started"); c != nil { c.Write(zap.String("worker", handler.worker.fileName), zap.String("url", r.RequestURI)) @@ -192,6 +198,18 @@ func (handler *workerThread) waitForWorkerRequest() bool { return true } +func (handler *workerThread) setWorkerRequest(r *http.Request) { + handler.thread.requestMu.Lock() + handler.workerRequest = r + handler.thread.requestMu.Unlock() +} + +func (handler *workerThread) setFakeRequest(r *http.Request) { + handler.thread.requestMu.Lock() + handler.fakeRequest = r + handler.thread.requestMu.Unlock() +} + // go_frankenphp_worker_handle_request_start is called at the start of every php request served. // //export go_frankenphp_worker_handle_request_start @@ -210,7 +228,6 @@ func go_frankenphp_finish_worker_request(threadIndex C.uintptr_t) { maybeCloseContext(fc) thread.handler.(*workerThread).workerRequest = nil - thread.handler.(*workerThread).inRequest = false if c := fc.logger.Check(zapcore.DebugLevel, "request handling finished"); c != nil { c.Write(zap.String("worker", fc.scriptFilename), zap.String("url", r.RequestURI)) diff --git a/watcher_test.go b/watcher_test.go index ac95929e8..afd4e8c88 100644 --- a/watcher_test.go +++ b/watcher_test.go @@ -28,7 +28,7 @@ func TestWorkersShouldReloadOnMatchingPattern(t *testing.T) { runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, i int) { requestBodyHasReset := pollForWorkerReset(t, handler, maxTimesToPollForChanges) assert.True(t, requestBodyHasReset) - }, &testOptions{nbParallelRequests: 1, nbWorkers: 1, workerScript: "worker-with-watcher.php", watch: watch}) + }, &testOptions{nbParallelRequests: 1, nbWorkers: 1, workerScript: "worker-with-counter.php", watch: watch}) } func TestWorkersShouldNotReloadOnExcludingPattern(t *testing.T) { @@ -37,19 +37,19 @@ func TestWorkersShouldNotReloadOnExcludingPattern(t *testing.T) { runTest(t, func(handler func(http.ResponseWriter, *http.Request), _ *httptest.Server, i int) { requestBodyHasReset := pollForWorkerReset(t, handler, minTimesToPollForChanges) assert.False(t, requestBodyHasReset) - }, &testOptions{nbParallelRequests: 1, nbWorkers: 1, workerScript: "worker-with-watcher.php", watch: watch}) + }, &testOptions{nbParallelRequests: 1, nbWorkers: 1, workerScript: "worker-with-counter.php", watch: watch}) } func pollForWorkerReset(t *testing.T, handler func(http.ResponseWriter, *http.Request), limit int) bool { // first we make an initial request to start the request counter - body := fetchBody("GET", "http://example.com/worker-with-watcher.php", handler) + body := fetchBody("GET", "http://example.com/worker-with-counter.php", handler) assert.Equal(t, "requests:1", body) // now we spam file updates and check if the request counter resets for i := 0; i < limit; i++ { updateTestFile("./testdata/files/test.txt", "updated", t) time.Sleep(pollingTime * time.Millisecond) - body := fetchBody("GET", "http://example.com/worker-with-watcher.php", handler) + body := fetchBody("GET", "http://example.com/worker-with-counter.php", handler) if body == "requests:1" { return true } diff --git a/worker.go b/worker.go index bbb44c195..b366e37ea 100644 --- a/worker.go +++ b/worker.go @@ -56,7 +56,8 @@ func initWorkers(opt []workerOpt) error { return nil } - if err := watcher.InitWatcher(directoriesToWatch, restartWorkers, getLogger()); err != nil { + watcherIsEnabled = true + if err := watcher.InitWatcher(directoriesToWatch, RestartWorkers, getLogger()); err != nil { return err } @@ -89,7 +90,12 @@ func drainWorkers() { watcher.DrainWatcher() } -func restartWorkers() { +// RestartWorkers attempts to restart all workers gracefully +func RestartWorkers() { + // disallow scaling threads while restarting workers + scalingMu.Lock() + defer scalingMu.Unlock() + ready := sync.WaitGroup{} threadsToRestart := make([]*phpThread, 0) for _, worker := range workers { @@ -97,7 +103,8 @@ func restartWorkers() { ready.Add(len(worker.threads)) for _, thread := range worker.threads { if !thread.state.requestSafeStateChange(stateRestarting) { - // no state change allowed = shutdown + // no state change allowed == thread is shutting down + // we'll proceed to restart all other threads anyways continue } close(thread.drainChan) @@ -143,6 +150,14 @@ func (worker *worker) detachThread(thread *phpThread) { worker.threadMutex.Unlock() } +func (worker *worker) countThreads() int { + worker.threadMutex.RLock() + l := len(worker.threads) + worker.threadMutex.RUnlock() + + return l +} + func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) { metrics.StartWorkerRequest(fc.scriptFilename) @@ -156,13 +171,22 @@ func (worker *worker) handleRequest(r *http.Request, fc *FrankenPHPContext) { metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt)) return default: + // thread is busy, continue } } worker.threadMutex.RUnlock() - // if no thread was available, fan the request out to all threads - // TODO: theoretically there could be autoscaling of threads here - worker.requestChan <- r - <-fc.done - metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt)) + // if no thread was available, mark the request as queued and apply the scaling strategy + metrics.QueuedWorkerRequest(fc.scriptFilename) + for { + select { + case worker.requestChan <- r: + metrics.DequeuedWorkerRequest(fc.scriptFilename) + <-fc.done + metrics.StopWorkerRequest(worker.fileName, time.Since(fc.startedAt)) + return + case scaleChan <- fc: + // the request has triggered scaling, continue to wait for a thread + } + } }