Skip to content

Commit

Permalink
PROTON-????: Remove pn_data_t operations from frame codec
Browse files Browse the repository at this point in the history
We only convert to pn_data_t if the application requests the data
as a pn_data_t.

- renamed message rest of message pn_data members deprecated
- Added some more useful emmiter/consumer code
- parse message directly to raw bytes
- parse error conditions without pn_data_t
- Avoid using pn_amqp_decode_*C when not really needed
- Tidied up message codec
  • Loading branch information
astitcher committed Jul 12, 2024
1 parent 2296d3c commit d5fb231
Show file tree
Hide file tree
Showing 10 changed files with 655 additions and 415 deletions.
15 changes: 2 additions & 13 deletions c/src/core/consumers.h
Original file line number Diff line number Diff line change
Expand Up @@ -680,17 +680,6 @@ static inline bool consume_described_maybe_type_anything(pni_consumer_t* consume
return *qtype;
}

static inline bool consume_copy(pni_consumer_t *consumer, pn_data_t *data) {
size_t iposition = consumer->position;
uint8_t type;
bool tq = consume_single_value(consumer, &type);
if (!tq || type==PNE_NULL) return false;

pn_bytes_t value = {.size = consumer->position-iposition, .start = (const char*)consumer->output_start+iposition};
ssize_t err = pn_data_decode(data, value.start, value.size);
return err>=0 && err==(ssize_t)value.size;
}

static inline bool consume_described_maybe_type_raw(pni_consumer_t *consumer, bool *qtype, uint64_t *type, pn_bytes_t *raw) {
pni_consumer_t subconsumer;
*qtype = consume_described_ulong_descriptor(consumer, &subconsumer, type);
Expand All @@ -704,9 +693,9 @@ static inline bool consume_described_maybe_type_maybe_anything(pni_consumer_t *c
return *qtype && *qanything;
}

static inline bool consume_described_copy(pni_consumer_t *consumer, pn_data_t *data) {
static inline bool consume_described_raw(pni_consumer_t *consumer, pn_bytes_t *raw) {
pni_consumer_t subconsumer;
return consume_described(consumer, &subconsumer) && consume_copy(&subconsumer, data);
return consume_described(consumer, &subconsumer) && consume_raw(&subconsumer, raw);
}

static inline bool consume_string(pni_consumer_t *consumer, pn_bytes_t *string) {
Expand Down
128 changes: 92 additions & 36 deletions c/src/core/emitters.h
Original file line number Diff line number Diff line change
Expand Up @@ -558,49 +558,105 @@ static inline void emit_raw(pni_emitter_t* emitter, pni_compound_context* compou
compound->count++;
}

static inline void emit_multiple(pni_emitter_t* emitter, pni_compound_context* compound, pn_data_t* data) {
if (!data || pn_data_size(data) == 0) {
emit_null(emitter, compound);
// Keep this here as a placeholder until we do something more intelligent
static inline void emit_multiple(pni_emitter_t* emitter, pni_compound_context* compound, const pn_bytes_t bytes) {
emit_raw(emitter, compound, bytes);
}

static inline void emit_described_type_raw(pni_emitter_t* emitter, pni_compound_context* compound, uint64_t descriptor, const pn_bytes_t bytes) {
emit_descriptor(emitter, compound, descriptor);
pni_compound_context c = make_compound();
emit_raw(emitter, &c, bytes);
// Can only be a single item (probably a list though)
compound->count++;
}

static inline void emit_condition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_condition_t* condition) {
if (!condition || !condition->name || !pn_string_get(condition->name)) {
emit_null(emitter, compound0);
return;
}

pn_handle_t point = pn_data_point(data);
pn_data_rewind(data);
// Rewind and position to first node so data type is defined.
pn_data_next(data);

if (pn_data_type(data) == PN_ARRAY) {
switch (pn_data_get_array(data)) {
case 0:
emit_null(emitter, compound);
pn_data_restore(data, point);
return;
case 1:
emit_accumulated_nulls(emitter, compound);
pn_data_enter(data);
pn_data_narrow(data);
pni_emitter_data(emitter, data);
pn_data_widen(data);
break;
default:
emit_accumulated_nulls(emitter, compound);
pni_emitter_data(emitter, data);
emit_descriptor(emitter, compound0, ERROR);
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
pn_bytes_t name_bytes = pn_string_bytes(condition->name);
if (name_bytes.size==0) {
emit_null(emitter, &compound);
} else {
emit_symbol_bytes(emitter, &compound, name_bytes);
}
} else {
emit_accumulated_nulls(emitter, compound);
pni_emitter_data(emitter, data);
pn_bytes_t description_bytes = pn_string_bytes(condition->description);
if (description_bytes.size==0) {
emit_null(emitter, &compound);
} else {
emit_string_bytes(emitter, &compound, description_bytes);
}
if (condition->info) {
emit_copy(emitter, &compound, condition->info);
} else {
emit_raw(emitter, &compound, condition->info_raw);
}
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}

compound->count++;
pn_data_restore(data, point);
}

static inline void emit_described_type_copy(pni_emitter_t* emitter, pni_compound_context* compound, uint64_t descriptor, pn_data_t* data) {
emit_descriptor(emitter, compound, descriptor);
pni_compound_context c = make_compound();
emit_copy(emitter, &c, data);
// Can only be a single item (probably a list though)
compound->count++;
static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_disposition_t *disposition)
{
if (!disposition || !disposition->type) {
emit_null(emitter, compound0);
return;
}

emit_descriptor(emitter, compound0, disposition->type);
switch (disposition->type) {
case PN_RECEIVED:
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_uint(emitter, &compound, disposition->section_number);
emit_ulong(emitter, &compound, disposition->section_offset);
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
return;
case PN_ACCEPTED:
case PN_RELEASED:
return;
case PN_REJECTED:
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_condition(emitter, &compound, &disposition->condition);
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
return;
case PN_MODIFIED:
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_bool(emitter, &compound, disposition->failed);
emit_bool(emitter, &compound, disposition->undeliverable);
if (disposition->annotations) {
emit_copy(emitter, &compound, disposition->annotations);
} else {
emit_raw(emitter, &compound, disposition->annotations_raw);
}
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
return;
default:
if (disposition->data) {
emit_copy(emitter, compound0, disposition->data);
} else {
emit_raw(emitter, compound0, disposition->data_raw);
}
return;
}
}

#endif // PROTON_EMITTERS_H
23 changes: 19 additions & 4 deletions c/src/core/engine-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER} pn_endpo
typedef struct pn_endpoint_t pn_endpoint_t;

struct pn_condition_t {
pn_bytes_t info_raw;
pn_string_t *name;
pn_string_t *description;
pn_data_t *info;
Expand Down Expand Up @@ -136,10 +137,9 @@ struct pn_transport_t {
pn_connection_t *connection; // reference counted
char *remote_container;
char *remote_hostname;
pn_data_t *remote_offered_capabilities;
pn_data_t *remote_desired_capabilities;
pn_data_t *remote_properties;
pn_data_t *disp_data;
pn_bytes_t remote_offered_capabilities_raw;
pn_bytes_t remote_desired_capabilities_raw;
pn_bytes_t remote_properties_raw;
// DEFAULT_MAX_FRAME_SIZE see PROTON-2640
#define PN_DEFAULT_MAX_FRAME_SIZE (32*1024)
uint32_t local_max_frame;
Expand Down Expand Up @@ -243,9 +243,15 @@ struct pn_connection_t {
pn_string_t *auth_user;
pn_string_t *authzid;
pn_string_t *auth_password;
pn_bytes_t offered_capabilities_raw;
pn_bytes_t desired_capabilities_raw;
pn_bytes_t properties_raw;
pn_data_t *offered_capabilities;
pn_data_t *desired_capabilities;
pn_data_t *properties;
pn_data_t *remote_offered_capabilities;
pn_data_t *remote_desired_capabilities;
pn_data_t *remote_properties;
pn_collector_t *collector;
pn_record_t *context;
pn_list_t *delivery_pool;
Expand All @@ -270,6 +276,10 @@ struct pn_session_t {

struct pn_terminus_t {
pn_string_t *address;
pn_bytes_t properties_raw;
pn_bytes_t capabilities_raw;
pn_bytes_t outcomes_raw;
pn_bytes_t filter_raw;
pn_data_t *properties;
pn_data_t *capabilities;
pn_data_t *outcomes;
Expand Down Expand Up @@ -297,7 +307,9 @@ struct pn_link_t {
pn_delivery_t *current;
pn_record_t *context;
pn_data_t *properties;
pn_bytes_t properties_raw;
pn_data_t *remote_properties;
pn_bytes_t remote_properties_raw;
size_t unsettled_count;
uint64_t max_message_size;
uint64_t remote_max_message_size;
Expand All @@ -320,7 +332,9 @@ struct pn_disposition_t {
pn_condition_t condition;
uint64_t type;
pn_data_t *data;
pn_bytes_t data_raw;
pn_data_t *annotations;
pn_bytes_t annotations_raw;
uint64_t section_offset;
uint32_t section_number;
bool failed;
Expand Down Expand Up @@ -357,6 +371,7 @@ struct pn_delivery_t {
#define PN_SET_REMOTE(OLD, NEW) \
(OLD) = ((OLD) & PN_LOCAL_MASK) | (NEW)

pn_link_t *pn_link_new(int type, pn_session_t *session, pn_string_t *name);
void pn_link_dump(pn_link_t *link);

void pn_dump(pn_connection_t *conn);
Expand Down
Loading

0 comments on commit d5fb231

Please sign in to comment.