Skip to content

Commit

Permalink
PROTON-1520: C proactor epoll performance. Defer writes until after e…
Browse files Browse the repository at this point in the history
…vent batch processed,

 defer rearm until after write
  • Loading branch information
Cliff Jansen committed Jul 20, 2017
1 parent 10df413 commit fa598ea
Showing 1 changed file with 28 additions and 25 deletions.
53 changes: 28 additions & 25 deletions proton-c/src/proactor/epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,7 @@ struct pn_listener_t {


static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events, bool timeout, bool topup);
static void write_flush(pconnection_t *pc);
static void listener_begin_close(pn_listener_t* l);
static void proactor_add(pcontext_t *ctx);
static bool proactor_remove(pcontext_t *ctx);
Expand Down Expand Up @@ -800,9 +801,13 @@ static void pconnection_forced_shutdown(pconnection_t *pc) {
static pn_event_t *pconnection_batch_next(pn_event_batch_t *batch) {
pconnection_t *pc = batch_pconnection(batch);
pn_event_t *e = pn_connection_driver_next_event(&pc->driver);
if (!e && pc->hog_count < HOG_MAX) {
if (pconnection_process(pc, 0, false, true)) {
e = pn_connection_driver_next_event(&pc->driver);
if (!e) {
write_flush(pc); // May generate transport event
e = pn_connection_driver_next_event(&pc->driver);
if (!e && pc->hog_count < HOG_MAX) {
if (pconnection_process(pc, 0, false, true)) {
e = pn_connection_driver_next_event(&pc->driver);
}
}
}
return e;
Expand Down Expand Up @@ -903,6 +908,23 @@ static bool pconnection_write(pconnection_t *pc, pn_bytes_t wbuf) {
return true;
}

static void write_flush(pconnection_t *pc) {
if (!pc->write_blocked && !pconnection_wclosed(pc)) {
pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
if (wbuf.size > 0) {
if (!pconnection_write(pc, wbuf)) {
psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on write to");
}
}
else {
if (pn_connection_driver_write_closed(&pc->driver)) {
shutdown(pc->psocket.sockfd, SHUT_WR);
pc->write_blocked = true;
}
}
}
}

static void pconnection_connected_lh(pconnection_t *pc);
static void pconnection_maybe_connect_lh(pconnection_t *pc);

Expand Down Expand Up @@ -1001,7 +1023,6 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
pc->current_arm = 0;
pc->new_events = 0;
}
bool unarmed = (pc->current_arm == 0);

if (pc->context.closing && pconnection_is_final(pc)) {
unlock(&pc->context.mutex);
Expand Down Expand Up @@ -1058,36 +1079,17 @@ static pn_event_batch_t *pconnection_process(pconnection_t *pc, uint32_t events,
tick_required = false;
}

while (!pc->write_blocked && !pconnection_wclosed(pc)) {
pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver);
if (wbuf.size > 0) {
if (!pconnection_write(pc, wbuf)) {
psocket_error(&pc->psocket, errno, pc->disconnected ? "disconnected" : "on write to");
}
}
else {
if (pn_connection_driver_write_closed(&pc->driver)) {
shutdown(pc->psocket.sockfd, SHUT_WR);
pc->write_blocked = true;
}
else
break; /* nothing to write until next read/wake/timeout */
}
}

if (topup) {
// If there was anything new to topup, we have it by now.
if (unarmed && pconnection_rearm_check(pc))
pconnection_rearm(pc);
return NULL; // caller already owns the batch
}

if (pconnection_has_event(pc)) {
if (unarmed && pconnection_rearm_check(pc))
pconnection_rearm(pc);
return &pc->batch;
}

write_flush(pc);

lock(&pc->context.mutex);
if (pc->context.closing && pconnection_is_final(pc)) {
unlock(&pc->context.mutex);
Expand Down Expand Up @@ -1608,6 +1610,7 @@ pn_proactor_t *pn_proactor() {

void pn_proactor_free(pn_proactor_t *p) {
// No competing threads, not even a pending timer
p->shutting_down = true;
close(p->epollfd);
p->epollfd = -1;
close(p->eventfd);
Expand Down

0 comments on commit fa598ea

Please sign in to comment.