Skip to content

Commit

Permalink
http1.1: pipelining
Browse files Browse the repository at this point in the history
  • Loading branch information
vividsnow committed Jan 17, 2025
1 parent a221b6d commit e7fded6
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 5 deletions.
3 changes: 3 additions & 0 deletions Changes
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
Revision history for Perl extension Feersum

1.504 Sat Jan 18 19:14:15 2024 -0200
- http/1.1 pipelining support

1.503 Tue Sep 17 12:14:15 2024 -0200
- start listening on re-forked child
- try fix test for some older perls on freebsd
Expand Down
29 changes: 25 additions & 4 deletions Feersum.xs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,8 @@ struct feer_conn {
unsigned int is_http11:1;
unsigned int poll_write_cb_is_io_handle:1;
unsigned int auto_cl:1;

ssize_t pipelined;
};

enum feer_header_norm_style {
Expand Down Expand Up @@ -675,6 +677,7 @@ new_feer_conn (EV_P_ int conn_fd, struct sockaddr *sa)
c->receiving = RECEIVE_HEADERS;
c->is_keepalive = 0;
c->reqs = 0;
c->pipelined = 0;

ev_io_init(&c->read_ev_io, try_conn_read, conn_fd, EV_READ);
c->read_ev_io.data = (void *)c;
Expand Down Expand Up @@ -1036,8 +1039,16 @@ try_write_shutdown:
Safefree(c->req);
}
c->req = NULL;
start_read_watcher(c);
restart_read_timer(c);
ssize_t pipelined = SvCUR(c->rbuf);
if (unlikely(pipelined > 0 && c->is_http11)) {
trace3("connections has pipelined data on %d\n", c->fd);
c->pipelined = pipelined;
try_conn_read(EV_A, &c->read_ev_io, 0);
} else {
c->pipelined = 0;
start_read_watcher(c);
restart_read_timer(c);
}
trace3("connections active on %d\n", c->fd);
} else {
trace3("write SHUTDOWN %d, refcnt=%d, state=%d\n", c->fd, SvREFCNT(c->self), c->responding);
Expand Down Expand Up @@ -1076,6 +1087,9 @@ try_conn_read(EV_P_ ev_io *w, int revents)
{
dCONN;
SvREFCNT_inc_void_NN(c->self);
ssize_t got_n = 0;

if (unlikely(c->pipelined)) goto pipelined;

// if it's marked readable EV suggests we simply try read it. Otherwise it
// is stopped and we should ditch this connection.
Expand All @@ -1089,7 +1103,7 @@ try_conn_read(EV_P_ ev_io *w, int revents)

trace("try read %d\n",w->fd);

if (likely(!c->rbuf)) { // likely = optimize for small requests
if (unlikely(!c->rbuf)) { // likely = optimize for keepalive requests
trace("init rbuf for %d\n",w->fd);
c->rbuf = newSV(READ_INIT_FACTOR*READ_BUFSZ + 1);
SvPOK_on(c->rbuf);
Expand All @@ -1105,7 +1119,7 @@ try_conn_read(EV_P_ ev_io *w, int revents)
}

char *cur = SvPVX(c->rbuf) + SvCUR(c->rbuf);
ssize_t got_n = read(w->fd, cur, space_free);
got_n = read(w->fd, cur, space_free);

if (unlikely(got_n <= 0)) {
if (unlikely(got_n == 0)) {
Expand All @@ -1120,6 +1134,13 @@ try_conn_read(EV_P_ ev_io *w, int revents)

trace("read %d %"Ssz_df"\n", w->fd, (Ssz)got_n);
SvCUR(c->rbuf) += got_n;
goto try_parse;

pipelined:
got_n = c->pipelined;
c->pipelined = 0;

try_parse:
// likely = optimize for small requests
if (likely(c->receiving <= RECEIVE_HEADERS)) {
int ret = try_parse_http(c, (size_t)got_n);
Expand Down
2 changes: 1 addition & 1 deletion lib/Feersum.pm
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use warnings;
use EV ();
use Carp ();

our $VERSION = '1.503';
our $VERSION = '1.504';

require Feersum::Connection;
require Feersum::Connection::Handle;
Expand Down
105 changes: 105 additions & 0 deletions t/66-pipelining.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#!perl
use warnings;
use strict;
use Test::More tests => 21;
use Test::Fatal;
use utf8;
use lib 't'; use Utils;

BEGIN { use_ok('Feersum') };

my ($listen_socket, $port) = get_listen_socket();
ok $listen_socket, "made listen socket";
ok $listen_socket->fileno, "has a fileno";

my $evh = Feersum->new();

# Enable keep-alive which is needed for pipelining
$evh->set_keepalive(1);

my $request_count = 0;
$evh->request_handler(sub {
my $r = shift;
isa_ok $r, 'Feersum::Connection', 'got an object!';

$request_count++;
my $path = $r->path;

# Add small delays to verify ordering
my @res = (
200,
['Content-Type' => 'text/plain'],
["Response $request_count: $path"]
);
if ($path eq '/test2') {
my $w; $w = AE::timer 0.1, 0.1, sub {
undef $w;
$r->send_response(@res);
};
} else { $r->send_response(@res) }
});

is exception {
$evh->use_socket($listen_socket);
}, undef, 'assigned socket';

my $cv = AE::cv;
$cv->begin;

# Create connection using AnyEvent::Handle
my @responses;
my $h; $h = AnyEvent::Handle->new(
connect => ['localhost', $port],
timeout => 5,
on_error => sub {
my ($h, $fatal, $msg) = @_;
fail "client error: $msg";
$cv->send;
},
on_eof => sub {
# Done handling all responses
my $responses = join "\n\n", @responses;
like($responses, qr/Response 1: \/test1.*Response 2: \/test2.*Response 3: \/test3/s,
'Got all responses in correct order');

# Verify each response had proper headers
is(scalar(@responses), 3, 'Got expected number of response parts'); # 3 headers + 3 bodies + trailing empty

my @parts = map { split /\r\n\r\n/ } @responses;

# First response
like($parts[0], qr/^HTTP\/1\.1 200 OK/, 'First response has correct status');
like($parts[0], qr/Content-Type: text\/plain/, 'First response has content type');
is($parts[1], "Response 1: /test1", 'First response has correct body');

# Second response
like($parts[2], qr/^HTTP\/1\.1 200 OK/, 'Second response has correct status');
like($parts[2], qr/Content-Type: text\/plain/, 'Second response has content type');
is($parts[3], "Response 2: /test2", 'Second response has correct body');

# Third response
like($parts[4], qr/^HTTP\/1\.1 200 OK/, 'Third response has correct status');
like($parts[4], qr/Content-Type: text\/plain/, 'Third response has content type');
like($parts[4], qr/Connection: close/, 'Third response has Connection: close');
is($parts[5], "Response 3: /test3", 'Third response has correct body');

is($request_count, 3, 'Handled all three requests');
$cv->end;
$h->destroy;
},
on_read => sub {
last unless my $len = length(my $buf = $_[0]->rbuf);
push @responses, $buf;
substr $_[0]->rbuf, 0, $len, '';
}
);

# Send three pipelined requests at once
$h->push_write(
"GET /test1 HTTP/1.1\r\nHost: localhost\r\n\r\n" .
"GET /test2 HTTP/1.1\r\nHost: localhost\r\n\r\n" .
"GET /test3 HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n"
);

$cv->recv;
pass "all done";

0 comments on commit e7fded6

Please sign in to comment.