Skip to content

Commit

Permalink
PROTON-28xx: Reorganize dispositions representation
Browse files Browse the repository at this point in the history
The natural representation for dispositions is a union because different
types of disposition can only have quite different pieces of data.
  • Loading branch information
astitcher committed Aug 13, 2024
1 parent 38213cb commit e812ed8
Show file tree
Hide file tree
Showing 6 changed files with 316 additions and 176 deletions.
103 changes: 63 additions & 40 deletions c/src/core/emitters.h
Original file line number Diff line number Diff line change
Expand Up @@ -614,58 +614,81 @@ static inline void emit_condition(pni_emitter_t* emitter, pni_compound_context*
}
}

static inline void emit_received_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_received_disposition_t *disposition) {
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_uint(emitter, &compound, disposition->section_number);
emit_ulong(emitter, &compound, disposition->section_offset);
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
}

static inline void emit_rejected_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_rejected_disposition_t *disposition) {
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_condition(emitter, &compound, &disposition->condition);
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
}

static inline void emit_modified_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_modified_disposition_t *disposition){
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_bool(emitter, &compound, disposition->failed);
emit_bool(emitter, &compound, disposition->undeliverable);
emit_copy_or_raw(emitter, &compound, disposition->annotations, disposition->annotations_raw);
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
}

static inline void emit_custom_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_custom_disposition_t *disposition){
emit_descriptor(emitter, compound0, disposition->type);
if ((disposition->data && pn_data_size(disposition->data) == 0) ||
(!disposition->data && disposition->data_raw.size == 0)) {
emit_list0(emitter, compound0);
return;
}
pni_compound_context c = make_compound();
emit_copy_or_raw(emitter, &c, disposition->data, disposition->data_raw);
compound0->count++;
}

static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_disposition_t *disposition)
{
if (!disposition || !disposition->type) {
if (!disposition || pn_disposition_type(disposition)==PN_DISP_EMPTY) {
emit_null(emitter, compound0);
return;
}

emit_descriptor(emitter, compound0, disposition->type);
switch (disposition->type) {
case PN_RECEIVED:
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_uint(emitter, &compound, disposition->section_number);
emit_ulong(emitter, &compound, disposition->section_offset);
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
case PN_DISP_RECEIVED:
emit_descriptor(emitter, compound0, AMQP_DESC_RECEIVED);
emit_received_disposition(emitter, compound0, &disposition->u.s_received);
return;
case PN_DISP_ACCEPTED:
emit_descriptor(emitter, compound0, AMQP_DESC_ACCEPTED);
emit_list0(emitter, compound0);
return;
case PN_ACCEPTED:
case PN_RELEASED:
case PN_DISP_RELEASED:
emit_descriptor(emitter, compound0, AMQP_DESC_RELEASED);
emit_list0(emitter, compound0);
return;
case PN_REJECTED:
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_condition(emitter, &compound, &disposition->condition);
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
case PN_DISP_REJECTED:
emit_descriptor(emitter, compound0, AMQP_DESC_REJECTED);
emit_rejected_disposition(emitter, compound0, &disposition->u.s_rejected);
return;
case PN_MODIFIED:
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_bool(emitter, &compound, disposition->failed);
emit_bool(emitter, &compound, disposition->undeliverable);
emit_copy_or_raw(emitter, &compound, disposition->annotations, disposition->annotations_raw);
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
case PN_DISP_MODIFIED:
emit_descriptor(emitter, compound0, AMQP_DESC_MODIFIED);
emit_modified_disposition(emitter, compound0, &disposition->u.s_modified);
return;
default:
if ((disposition->data && pn_data_size(disposition->data) == 0) ||
(!disposition->data && disposition->data_raw.size == 0)) {
emit_list0(emitter, compound0);
return;
}
pni_compound_context c = make_compound();
emit_copy_or_raw(emitter, &c, disposition->data, disposition->data_raw);
compound0->count++;
case PN_DISP_CUSTOM:
emit_custom_disposition(emitter, compound0, &disposition->u.s_custom);
return;
}
}
Expand Down
41 changes: 35 additions & 6 deletions c/src/core/engine-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,17 +328,46 @@ struct pn_link_t {
bool more_pending;
};

struct pn_disposition_t {
typedef enum pn_disposition_type_t {
PN_DISP_EMPTY = 0,
PN_DISP_CUSTOM = 1,
PN_DISP_RECEIVED = PN_RECEIVED,
PN_DISP_ACCEPTED = PN_ACCEPTED,
PN_DISP_REJECTED = PN_REJECTED,
PN_DISP_RELEASED = PN_RELEASED,
PN_DISP_MODIFIED = PN_MODIFIED,
} pn_disposition_type_t;

typedef struct pn_received_disposition_t {
uint64_t section_offset;
uint32_t section_number;
} pn_received_disposition_t;

typedef struct pn_rejected_disposition_t {
pn_condition_t condition;
uint64_t type;
pn_data_t *data;
pn_bytes_t data_raw;
} pn_rejected_disposition_t;

typedef struct pn_modified_disposition_t {
pn_data_t *annotations;
pn_bytes_t annotations_raw;
uint64_t section_offset;
uint32_t section_number;
bool failed;
bool undeliverable;
} pn_modified_disposition_t;

typedef struct pn_custom_disposition_t {
pn_data_t *data;
uint64_t type;
pn_bytes_t data_raw;
} pn_custom_disposition_t;

struct pn_disposition_t {
union {
struct pn_received_disposition_t s_received;
struct pn_rejected_disposition_t s_rejected;
struct pn_modified_disposition_t s_modified;
struct pn_custom_disposition_t s_custom;
} u;
uint16_t type;
bool settled;
};

Expand Down
Loading

0 comments on commit e812ed8

Please sign in to comment.