From 580ca9f94ca0fe7b4fb5b9f6072ab2c60f62bea6 Mon Sep 17 00:00:00 2001 From: vividsnow Date: Sat, 31 Aug 2024 14:10:10 +0300 Subject: [PATCH] * http/1.1 keepalive support for chunked(streaming) responses * max_connection_reqs to control requests per keepalive connection * tweak some tests for better cpantesters matrix --- Changes | 6 ++++ Feersum.xs | 68 ++++++++++++++++++++++++++++++++----------- lib/Feersum.pm | 2 +- lib/Feersum/Runner.pm | 10 +++++-- t/05-streaming.t | 20 +++++++++++-- t/11-runner.t | 5 +++- t/13-pre-fork.t | 4 ++- t/51-psgi-streaming.t | 24 +++++++++++++-- t/65-keepalive.t | 49 ++++++++++++++++++++++++++++++- t/Utils.pm | 2 +- 10 files changed, 161 insertions(+), 29 deletions(-) diff --git a/Changes b/Changes index 9661501..4e0234e 100644 --- a/Changes +++ b/Changes @@ -1,11 +1,17 @@ Revision history for Perl extension Feersum +1.501 Sat Aug 31 09:10:55 2024 -0200 + - http/1.1 keepalive support for chunked(streaming) responses + - max_connection_reqs to control requests per keepalive connection + - tweak some tests for better cpantesters matrix + 1.500 Tue Aug 20 18:10:55 2024 -0200 Features: - native interface: access specific parts of request - http/1.1 keepalive support - http/1.1 date header - defer accept, accept4 + Backward incompatibly: - remove adobe flash policy support diff --git a/Feersum.xs b/Feersum.xs index e8febf6..820e0d3 100644 --- a/Feersum.xs +++ b/Feersum.xs @@ -212,6 +212,7 @@ struct feer_conn { enum feer_respond_state responding; enum feer_receive_state receiving; bool is_keepalive; + int reqs; unsigned int in_callback; unsigned int is_http11:1; @@ -302,6 +303,7 @@ static SV *shutdown_cb_cv = NULL; static bool shutting_down = 0; static int active_conns = 0; static double read_timeout = READ_TIMEOUT; +static unsigned int max_connection_reqs = 0; static SV *feer_server_name = NULL; static SV *feer_server_port = NULL; @@ -672,6 +674,7 @@ new_feer_conn (EV_P_ int conn_fd, struct sockaddr *sa) c->responding = RESPOND_NOT_STARTED; c->receiving = RECEIVE_HEADERS; c->is_keepalive = 0; + c->reqs = 0; ev_io_init(&c->read_ev_io, try_conn_read, conn_fd, EV_READ); c->read_ev_io.data = (void *)c; @@ -1129,6 +1132,7 @@ try_conn_read(EV_P_ ev_io *w, int revents) else goto try_read_again_reset_timer; } #endif + if (process_request_headers(c, ret)) goto try_read_again_reset_timer; else @@ -1307,6 +1311,7 @@ process_request_headers (struct feer_conn *c, int body_offset) c->is_http11 = (req->minor_version == 1); c->is_keepalive = is_keepalive && c->is_http11; + c->reqs++; change_receiving_state(c, RECEIVE_BODY); @@ -1388,16 +1393,16 @@ process_request_headers (struct feer_conn *c, int body_offset) unlikely(str_case_eq("connection", 10, hdr->name, hdr->name_len))) { if (likely(c->is_http11) - && likely(str_case_eq("close", 5, hdr->value, hdr->value_len)) - && c->is_keepalive) + && likely(c->is_keepalive) + && likely(str_case_eq("close", 5, hdr->value, hdr->value_len))) { c->is_keepalive = 0; trace("setting conn %d to close after response\n", c->fd); } else if ( likely(!c->is_http11) - && likely(str_case_eq("keep-alive", 10, hdr->value, hdr->value_len)) - && !c->is_keepalive) + && likely(is_keepalive) + && str_case_eq("keep-alive", 10, hdr->value, hdr->value_len)) { c->is_keepalive = 1; trace("setting conn %d to keep after response\n", c->fd); @@ -1406,6 +1411,11 @@ process_request_headers (struct feer_conn *c, int body_offset) // TODO: support "Transfer-Encoding: chunked" bodies } + if (max_connection_reqs > 0 && c->reqs >= max_connection_reqs) { + c->is_keepalive = 0; + trace("reached max requests per connection (%d), will close after response\n", max_connection_reqs); + } + if (likely(next_req_follows)) goto got_it_all; // optimize for GET else if (likely(got_content_length)) goto got_cl; @@ -1494,6 +1504,7 @@ respond_with_server_error (struct feer_conn *c, const char *msg, STRLEN msg_len, stop_read_timer(c); change_responding_state(c, RESPOND_SHUTDOWN); change_receiving_state(c, RECEIVE_SHUTDOWN); + if (c->is_keepalive) c->is_keepalive = 0; conn_write_ready(c); } @@ -1980,11 +1991,24 @@ feersum_start_response (pTHX_ struct feer_conn *c, SV *message, AV *headers, add_crlf_to_wbuf(c); } + if (likely(c->is_http11)) { + #ifdef DATE_HEADER + generate_date_header(); + add_const_to_wbuf(c, DATE_BUF, DATE_HEADER_LENGTH); + #endif + if (unlikely(!c->is_keepalive)) + add_const_to_wbuf(c, "Connection: close" CRLF, 19); + } else if (unlikely(c->is_keepalive) && !streaming) + add_const_to_wbuf(c, "Connection: keep-alive" CRLF, 24); + if (streaming) { if (c->is_http11) add_const_to_wbuf(c, "Transfer-Encoding: chunked" CRLFx2, 30); - else - add_const_to_wbuf(c, "Connection: close" CRLFx2, 21); + else { + add_crlf_to_wbuf(c); + // cant do keep-alive for streaming http/1.0 since client completes read on close + if (unlikely(c->is_keepalive)) c->is_keepalive = 0; + } } conn_write_ready(c); @@ -2019,17 +2043,6 @@ feersum_write_whole_body (pTHX_ struct feer_conn *c, SV *body) body_is_string = 1; } - if (likely(c->is_http11)) { - #ifdef DATE_HEADER - generate_date_header(); - #endif - add_const_to_wbuf(c, DATE_BUF, DATE_HEADER_LENGTH); - if (unlikely(!c->is_keepalive)) - add_const_to_wbuf(c, "Connection: close" CRLF, 19); - } - else if (unlikely(c->is_keepalive)) - add_const_to_wbuf(c, "Connection: keep-alive" CRLF, 24); - SV *cl_sv; // content-length future struct iovec *cl_iov; if (likely(c->auto_cl)) @@ -2586,6 +2599,27 @@ set_keepalive (SV *self, SV *set) is_keepalive = SvTRUE(set); } +unsigned int +max_connection_reqs (SV *self, ...) + PROTOTYPE: $;$ + CODE: +{ + if (items <= 1) { + RETVAL = max_connection_reqs; + } + else if (items == 2) { + SV *num = ST(1); + NV new_max_connection_reqs = SvIV(num); + if (!(new_max_connection_reqs >= 0)) { + croak("must set a positive value"); + } + trace("set max requests per connection %d\n", (unsigned int)new_max_connection_reqs); + max_connection_reqs = (unsigned int) new_max_connection_reqs; + } +} + OUTPUT: + RETVAL + void DESTROY (SV *self) PPCODE: diff --git a/lib/Feersum.pm b/lib/Feersum.pm index 88445df..36b7c8b 100644 --- a/lib/Feersum.pm +++ b/lib/Feersum.pm @@ -5,7 +5,7 @@ use warnings; use EV (); use Carp (); -our $VERSION = '1.500'; +our $VERSION = '1.501'; require Feersum::Connection; require Feersum::Connection::Handle; diff --git a/lib/Feersum/Runner.pm b/lib/Feersum/Runner.pm index 6041382..4ca57ea 100644 --- a/lib/Feersum/Runner.pm +++ b/lib/Feersum/Runner.pm @@ -72,10 +72,12 @@ sub _prepare { $f->use_socket($sock); if (my $opts = $self->{options}) { - $self->{$_} = delete $opts->{$_} for grep $opts->{$_}, qw/pre_fork keepalive read_timeout/; + $self->{$_} = delete $opts->{$_} for grep defined($opts->{$_}), + qw/pre_fork keepalive read_timeout max_connection_reqs/; } - $f->set_keepalive($_) for grep $_, delete $self->{keepalive}; + $f->set_keepalive($_) for grep defined, delete $self->{keepalive}; $f->read_timeout($_) for grep $_, delete $self->{read_timeout}; + $f->max_connection_reqs($_) for grep $_, delete $self->{max_connection_reqs}; $self->{endjinn} = $f; return; @@ -235,6 +237,10 @@ Enable/disable http keepalive requests. Set read/keepalive timeout in seconds. +=item max_connection_reqs + +Set max requests per connection in case of keepalive - 0(default) for unlimited. + =item quiet Don't be so noisy. (default: on) diff --git a/t/05-streaming.t b/t/05-streaming.t index 4f0c925..07e2057 100644 --- a/t/05-streaming.t +++ b/t/05-streaming.t @@ -4,8 +4,8 @@ use strict; use constant HARDER => $ENV{RELEASE_TESTING} ? 10 : 1; use constant CLIENTS_11 => HARDER * 2; use constant CLIENTS_10 => HARDER * 2; -use constant CLIENTS => CLIENTS_11 + CLIENTS_10; -use Test::More tests => 7 + 21 * CLIENTS_11 + 22 * CLIENTS_10; +use constant CLIENTS => CLIENTS_11 * 2 + CLIENTS_10; +use Test::More tests => 7 + 44 * CLIENTS_11 + 22 * CLIENTS_10; use Test::Fatal; use lib 't'; use Utils; @@ -101,6 +101,7 @@ is exception { sub client { my $cnum = sprintf("%04d",shift); my $is_chunked = shift || 0; + my $is_keepalive = shift || 0; $cv->begin; my $h; $h = simple_client GET => '/foo', name => $cnum, @@ -110,6 +111,7 @@ sub client { "Accept" => "*/*", 'X-Client' => $cnum, }, + $is_chunked && $is_keepalive ? (keepalive => 1) : (), sub { my ($body, $headers) = @_; is $headers->{Status}, 200, "$cnum got 200" @@ -117,11 +119,20 @@ sub client { if ($is_chunked) { is $headers->{HTTPVersion}, '1.1'; is $headers->{'transfer-encoding'}, "chunked", "$cnum got chunked!"; + if (!$is_keepalive) { + is $headers->{'connection'}, 'close', "$cnum conn close"; + } else { + ok !exists($headers->{'connection'}), 'conn keep'; + } } else { is $headers->{HTTPVersion}, '1.0'; ok !exists $headers->{'transfer-encoding'}, "$cnum not chunked!"; - is $headers->{'connection'}, 'close', "$cnum conn closed"; + if ($is_keepalive) { + is $headers->{'connection'}, 'keep-alive', "$cnum conn keep"; + } else { + ok !exists($headers->{'connection'}), 'conn close'; + } } is_deeply [split /\n/,$body], [ "$cnum Hello streaming world! chunk one", @@ -141,6 +152,9 @@ sub client { client(1000+$_,1) for (1..CLIENTS_11); client(2000+$_,0) for (1..CLIENTS_10); # HTTP/1.0 style +$evh->set_keepalive(1); +client(1000+$_,1,1) for (1..CLIENTS_11); + $cv->recv; is $started, CLIENTS, 'handlers started'; is $finished, CLIENTS, 'handlers finished'; diff --git a/t/11-runner.t b/t/11-runner.t index 386e69d..770dead 100644 --- a/t/11-runner.t +++ b/t/11-runner.t @@ -4,6 +4,7 @@ use strict; use Test::More; use utf8; use lib 't'; use Utils; +use File::Spec::Functions 'rel2abs'; BEGIN { plan skip_all => "Need Test::TCP 1.06 to run this test" @@ -28,6 +29,8 @@ plan tests => 15; ok -f 'eg/app.feersum' && -r _, "found eg/app.feersum"; ok -f 'eg/chat.feersum' && -r _, "found eg/chat.feersum"; +note(my $app_path = rel2abs('eg/app.feersum')); + test_tcp( client => sub { my $port = shift; @@ -50,7 +53,7 @@ test_tcp( my $runner; eval { - my $app = do 'eg/app.feersum'; + my $app = do $app_path; ok $app, "did the app"; $runner = Feersum::Runner->new( listen => ["localhost:$port"], diff --git a/t/13-pre-fork.t b/t/13-pre-fork.t index 637f1bf..5d77ba1 100644 --- a/t/13-pre-fork.t +++ b/t/13-pre-fork.t @@ -7,6 +7,7 @@ use constant CLIENTS => HARDER ? 30 : 4; use Test::More tests => 4 + CLIENTS*3; use utf8; use lib 't'; use Utils; +use File::Spec::Functions 'rel2abs'; use_ok 'Feersum::Runner'; @@ -29,6 +30,7 @@ sub simple_get { }; } +note(my $app_path = rel2abs('eg/app.feersum')); my $pid = fork; die "can't fork: $!" unless defined $pid; if (!$pid) { @@ -37,7 +39,7 @@ if (!$pid) { my $runner = Feersum::Runner->new( listen => ["localhost:$port"], server_starter => 1, - app_file => 'eg/app.feersum', + app_file => $app_path, pre_fork => NUM_FORK, quiet => 1, ); diff --git a/t/51-psgi-streaming.t b/t/51-psgi-streaming.t index 7d8d0ff..0cce25d 100644 --- a/t/51-psgi-streaming.t +++ b/t/51-psgi-streaming.t @@ -1,7 +1,7 @@ #!perl use warnings; use strict; -use Test::More tests => 36; +use Test::More tests => 48; use lib 't'; use Utils; BEGIN { use_ok('Feersum') }; @@ -116,6 +116,7 @@ using_writer: { is $headers->{'Status'}, 200, "Response OK"; is $headers->{'content-type'}, 'application/json', "... is JSON"; is $headers->{'transfer-encoding'}, 'chunked', '... was chunked'; + is $headers->{'connection'}, 'close', '... close'; is $body, q({"message":"O hai 2"}), "... correct de-chunked body"; $cv->end; undef $h; @@ -131,7 +132,7 @@ using_writer_and_1_0: { is $headers->{'Status'}, 200, "Response OK"; is $headers->{'content-type'}, 'application/json', "... is JSON"; ok !$headers->{'transfer-encoding'}, '... was not chunked'; - is $headers->{'connection'}, 'close', '... got close'; + isnt $headers->{'connection'}, 'keep-alive', '... got close'; is $body, q({"message":"O hai 3"}), "... correct body"; $cv->end; undef $h2; @@ -139,4 +140,23 @@ using_writer_and_1_0: { $cv->recv; } +$evh->set_keepalive(1); + +using_writer_and_1_1: { + my $cv = AE::cv; + $cv->begin; + my $h2; $h2 = simple_client GET => '/', proto => '1.1', keepalive => 1, sub { + my ($body, $headers) = @_; + is $headers->{'Status'}, 200, "Response OK"; + is $headers->{'content-type'}, 'application/json', "... is JSON"; + ok $headers->{'transfer-encoding'}, '... not chunked'; + isnt $headers->{'connection'}, 'close', '... keep'; + is $body, q({"message":"O hai 4"}), "... correct de-chunked body"; + $cv->end; + undef $h2; + }; + $cv->recv; +} + + pass "all done app 2"; diff --git a/t/65-keepalive.t b/t/65-keepalive.t index 7e51a70..446897a 100644 --- a/t/65-keepalive.t +++ b/t/65-keepalive.t @@ -24,7 +24,7 @@ plan skip_all => "can't create tmp socket path" unlink $sock_path; -plan tests => 23; +plan tests => 34; pass 'using sock path '.$sock_path; @@ -35,6 +35,7 @@ if ($pid == 0) { # child listen => [$sock_path], keepalive => 1, read_timeout => 1, + max_connection_reqs => 4, app => sub { my $r = shift; pass 'got request http/1.'.($r->is_http11 ? 1 : 0); @@ -157,6 +158,50 @@ if ($pid == 0) { # child $cv->recv; undef $hdl; + # max_connection_reqs + $socket = IO::Socket::UNIX->new( + Peer => $sock_path, + Type => SOCK_STREAM, + ) or warn $!; + ok $socket, 'client ok'; + ok $socket->blocking(0), 'unblock socket'; + + $cv = AE::cv; + $cv->begin; + + my ($request_count, $send_request) = (0); + $hdl = AnyEvent::Handle->new( + fh => $socket, + on_error => sub { + fail 'error in connection for max_connection_reqs test'; + $hdl->destroy; + $cv->send; + }, + on_eof => sub { + pass 'server closed connection after max requests'; + $hdl->destroy; + $cv->send; + }, + timeout => 2 + ); + + $send_request = sub { + $request_count++; + $hdl->push_write("GET / HTTP/1.1\015\012\015\012"); + $hdl->push_read(line => "\015\012\015\012" => sub { + if ($request_count < 4) { + unlike $_[1], qr(Connection: close), "request $request_count: no close header"; + $send_request->(); + } elsif ($request_count == 4) { + like $_[1], qr(Connection: close), 'request 4: connection close header'; + $hdl->on_read(sub {}); + } + }); + }; + $send_request->(); + $cv->recv; + undef $hdl; + pass 'server killing'; kill 3, $pid; # QUIT waitpid $pid, 0; @@ -164,3 +209,5 @@ if ($pid == 0) { # child } else { die $!; }; + +# max_connection_reqs diff --git a/t/Utils.pm b/t/Utils.pm index 64a04d8..f9c9e59 100644 --- a/t/Utils.pm +++ b/t/Utils.pm @@ -200,7 +200,7 @@ sub simple_client ($$;@) { } # HTTP/1.1 default is 'keep-alive' - $headers->{'Connection'} ||= 'close'; + $headers->{'Connection'} ||= 'close' if $proto eq '1.1' && !$opts{keepalive}; my $head = join($CRLF, map {$_.': '.$headers->{$_}} sort keys %$headers);