Skip to content

Commit

Permalink
Add transaction disposition
Browse files Browse the repository at this point in the history
  • Loading branch information
astitcher committed Nov 21, 2024
1 parent 75ad0d9 commit 1e42f12
Show file tree
Hide file tree
Showing 15 changed files with 234 additions and 7 deletions.
19 changes: 19 additions & 0 deletions c/include/proton/disposition.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ typedef struct pn_disposition_t pn_disposition_t;
*/
#define PN_MODIFIED (0x0000000000000027)

/**
* The PN_TRANSACTIONAL_STATE delivery state is a non terminal state
* indicating the transactional state of a delivery.
*/
#define PN_TRANSACTIONAL_STATE (0x0000000000000034)

/**
* Get the type of a disposition.
*
Expand Down Expand Up @@ -234,6 +240,7 @@ PN_EXTERN pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition);
typedef struct pn_received_disposition_t pn_received_disposition_t;
typedef struct pn_rejected_disposition_t pn_rejected_disposition_t;
typedef struct pn_modified_disposition_t pn_modified_disposition_t;
typedef struct pn_transactional_disposition_t pn_transactional_disposition_t;
typedef struct pn_custom_disposition_t pn_custom_disposition_t;

/**
Expand All @@ -254,6 +261,7 @@ PN_EXTERN pn_custom_disposition_t *pn_custom_disposition(pn_disposition_t *dispo
PN_EXTERN pn_received_disposition_t *pn_received_disposition(pn_disposition_t *disposition);
PN_EXTERN pn_rejected_disposition_t *pn_rejected_disposition(pn_disposition_t *disposition);
PN_EXTERN pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition);
PN_EXTERN pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition);

/**
* Access the disposition as a raw pn_data_t.
Expand Down Expand Up @@ -383,6 +391,17 @@ PN_EXTERN void pn_modified_disposition_set_undeliverable(pn_modified_disposition
*/
PN_EXTERN pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *disposition);


PN_EXTERN pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition);
PN_EXTERN void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id);
PN_EXTERN uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition);

/**
* Set the provisional outcome of the message if the transaction is committed successfully.
* Only terminal disposition states are allowed (PN_ACCEPTED, PN_REJECTED, PN_RELEASED, PN_MODIFIED)
*/
PN_EXTERN void pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t *disposition, uint64_t outcome);

/**
* @}
*/
Expand Down
17 changes: 17 additions & 0 deletions c/src/core/emitters.h
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,19 @@ static inline void emit_modified_disposition(pni_emitter_t* emitter, pni_compoun
}
}

static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_disposition_t *disposition);

static inline void emit_transactional_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_transactional_disposition_t *disposition){
for (bool small_encoding = true; ; small_encoding = false) {
pni_compound_context c = emit_list(emitter, compound0, small_encoding, true);
pni_compound_context compound = c;
emit_binary_bytes(emitter, &compound, disposition->id);
emit_raw(emitter, &compound, disposition->outcome_raw);
emit_end_list(emitter, &compound, small_encoding);
if (encode_succeeded(emitter, &compound)) break;
}
}

static inline void emit_custom_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_custom_disposition_t *disposition){
emit_descriptor(emitter, compound0, disposition->type);
if ((disposition->data && pn_data_size(disposition->data) == 0) ||
Expand Down Expand Up @@ -687,6 +700,10 @@ static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context
emit_descriptor(emitter, compound0, AMQP_DESC_MODIFIED);
emit_modified_disposition(emitter, compound0, &disposition->u.s_modified);
return;
case PN_DISP_TRANSACTIONAL:
emit_descriptor(emitter, compound0, AMQP_DESC_TRANSACTIONAL_STATE);
emit_transactional_disposition(emitter, compound0, &disposition->u.s_transactional);
return;
case PN_DISP_CUSTOM:
emit_custom_disposition(emitter, compound0, &disposition->u.s_custom);
return;
Expand Down
7 changes: 7 additions & 0 deletions c/src/core/engine-internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,7 @@ typedef enum pn_disposition_type_t {
PN_DISP_REJECTED = PN_REJECTED,
PN_DISP_RELEASED = PN_RELEASED,
PN_DISP_MODIFIED = PN_MODIFIED,
PN_DISP_TRANSACTIONAL = PN_TRANSACTIONAL_STATE,
} pn_disposition_type_t;

struct pn_received_disposition_t {
Expand All @@ -359,6 +360,11 @@ struct pn_modified_disposition_t {
bool undeliverable;
};

struct pn_transactional_disposition_t {
pn_bytes_t id;
pn_bytes_t outcome_raw;
};

struct pn_custom_disposition_t {
pn_data_t *data;
pn_bytes_t data_raw;
Expand All @@ -370,6 +376,7 @@ struct pn_disposition_t {
struct pn_received_disposition_t s_received;
struct pn_rejected_disposition_t s_rejected;
struct pn_modified_disposition_t s_modified;
struct pn_transactional_disposition_t s_transactional;
struct pn_custom_disposition_t s_custom;
} u;
uint16_t type;
Expand Down
56 changes: 56 additions & 0 deletions c/src/core/engine.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "consumers.h"
#include "core/frame_consumers.h"
#include "emitters.h"
#include "core/frame_generators.h"
#include "fixed_string.h"
#include "framing.h"
#include "memory.h"
Expand Down Expand Up @@ -1597,6 +1598,10 @@ static void pn_disposition_finalize(pn_disposition_t *ds)
pn_data_free(ds->u.s_custom.data);
pn_bytes_free(ds->u.s_custom.data_raw);
break;
case PN_DISP_TRANSACTIONAL:
pn_bytes_free(ds->u.s_transactional.id);
pn_bytes_free(ds->u.s_transactional.outcome_raw);
break;
}
}

Expand Down Expand Up @@ -1868,6 +1873,9 @@ void pni_disposition_to_raw(pn_disposition_t *disposition) {
case PN_DISP_MODIFIED:
emit_modified_disposition(&emitter, &compound, &disposition->u.s_modified);
break;
case PN_DISP_TRANSACTIONAL:
emit_transactional_disposition(&emitter, &compound, &disposition->u.s_transactional);
break;
}

if (type != PN_DISP_EMPTY) {
Expand Down Expand Up @@ -2005,6 +2013,13 @@ pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition
return &disposition->u.s_modified;
}

pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition)
{
if (disposition->type==PN_DISP_EMPTY) disposition->type = PN_DISP_TRANSACTIONAL;
else if (disposition->type!=PN_DISP_TRANSACTIONAL) return NULL;
return &disposition->u.s_transactional;
}

pn_data_t *pn_custom_disposition_data(pn_custom_disposition_t *disposition)
{
assert(disposition);
Expand Down Expand Up @@ -2084,6 +2099,44 @@ pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *dispos
return disposition->annotations;
}

pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition)
{
assert(disposition);
return disposition->id;
}

void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id)
{
assert(disposition);
pn_bytes_free(disposition->id);
disposition->id = pn_bytes_dup(id);
}

uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition)
{
assert(disposition);
if (disposition->outcome_raw.size) {
bool qtype = false;
uint64_t type;
pn_amqp_decode_DQLq(disposition->outcome_raw, &qtype, &type);
if (qtype) {
return type;
}
}
return PN_DISP_EMPTY;
}

void pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t *disposition, uint64_t type)
{
assert(disposition);
// Generate a described LIST0 directly - this needs a max of 11 bytes
char outcome_scratch[11];
pn_rwbytes_t scratch = {.size=sizeof(outcome_scratch), .start=outcome_scratch};
pn_bytes_t outcome_raw = pn_amqp_encode_DLEe(&scratch, type);
pn_bytes_free(disposition->outcome_raw);
disposition->outcome_raw = pn_bytes_dup(outcome_raw);
}

pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery)
{
if (delivery) {
Expand Down Expand Up @@ -2420,6 +2473,7 @@ void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
case PN_RECEIVED:
case PN_MODIFIED:
case PN_RELEASED:
case PN_TRANSACTIONAL_STATE:
break;
default:
delivery->local.u.s_custom.type = state;
Expand All @@ -2434,6 +2488,7 @@ void pn_delivery_update(pn_delivery_t *delivery, uint64_t state)
case PN_RECEIVED:
case PN_MODIFIED:
case PN_RELEASED:
case PN_TRANSACTIONAL_STATE:
delivery->local.type = state;
break;
default:
Expand Down Expand Up @@ -2804,6 +2859,7 @@ const char *pn_disposition_type_name(uint64_t d) {
case PN_REJECTED: return "rejected";
case PN_RELEASED: return "released";
case PN_MODIFIED: return "modified";
case PN_TRANSACTIONAL_STATE: return "transactional_state";
default: return "unknown";
}
}
Expand Down
14 changes: 14 additions & 0 deletions c/src/core/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -1602,6 +1602,20 @@ static void pni_amqp_decode_disposition (uint64_t type, pn_bytes_t disp_data, pn
}
break;
}
case AMQP_DESC_TRANSACTIONAL_STATE: {
pn_bytes_t id;
bool qoutcome;
pn_bytes_t outcome_raw;
pn_amqp_decode_DqEzQRe(disp_data, &id, &qoutcome, &outcome_raw);
disp->type = PN_DISP_TRANSACTIONAL;
pn_bytes_free(disp->u.s_transactional.id);
disp->u.s_transactional.id = pn_bytes_dup(id);
disp->u.s_transactional.outcome_raw = (pn_bytes_t){0, NULL};
if (qoutcome) {
disp->u.s_transactional.outcome_raw = pn_bytes_dup(outcome_raw);
}
break;
}
default: {
pn_bytes_t data_raw = (pn_bytes_t){0, NULL};
pn_amqp_decode_DqR(disp_data, &data_raw);
Expand Down
2 changes: 2 additions & 0 deletions c/tools/codec-generator/specs.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"fill_specs": [
"R",
"DLR",
"DL[]",
"DL[c]",
"DL[?HIIII]",
"DL[?IIII?I?I?In?o]",
Expand Down Expand Up @@ -44,6 +45,7 @@
"D.[s]",
"D.[z]",
"D.[Bz]",
"D.[z?R]",
"D.[R]",
"D?L.",
"D?L?."
Expand Down
7 changes: 7 additions & 0 deletions python/cproton.h
Original file line number Diff line number Diff line change
Expand Up @@ -410,11 +410,13 @@ typedef struct pn_custom_disposition_t pn_custom_disposition_t;
typedef struct pn_received_disposition_t pn_received_disposition_t;
typedef struct pn_rejected_disposition_t pn_rejected_disposition_t;
typedef struct pn_modified_disposition_t pn_modified_disposition_t;
typedef struct pn_transactional_disposition_t pn_transactional_disposition_t;

pn_custom_disposition_t *pn_custom_disposition(pn_disposition_t *disposition);
pn_received_disposition_t *pn_received_disposition(pn_disposition_t *disposition);
pn_rejected_disposition_t *pn_rejected_disposition(pn_disposition_t *disposition);
pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition);
pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition);

void pn_custom_disposition_set_type(pn_custom_disposition_t *disposition, uint64_t type);
uint64_t pn_custom_disposition_get_type(pn_custom_disposition_t *disposition);
Expand All @@ -429,6 +431,10 @@ void pn_modified_disposition_set_failed(pn_modified_disposition_t *disposition,
_Bool pn_modified_disposition_is_undeliverable(pn_modified_disposition_t *disposition);
void pn_modified_disposition_set_undeliverable(pn_modified_disposition_t *disposition, _Bool undeliverable);
pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *disposition);
pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition);
void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id);
uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition);
void pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t *disposition, uint64_t outcome);

int pn_error_code(pn_error_t *error);
const char *pn_error_text(pn_error_t *error);
Expand Down Expand Up @@ -656,6 +662,7 @@ int pn_transport_unbind(pn_transport_t *transport);
#define PN_REJECTED ...
#define PN_RELEASED ...
#define PN_MODIFIED ...
#define PN_TRANSACTIONAL_STATE ...

// Default message priority
#define PN_DEFAULT_PRIORITY ...
Expand Down
14 changes: 12 additions & 2 deletions python/cproton.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@
PN_SSL_RESUME_UNKNOWN, PN_SSL_SHA1, PN_SSL_SHA256, PN_SSL_SHA512,
PN_SSL_VERIFY_PEER, PN_SSL_VERIFY_PEER_NAME, PN_STRING, PN_SYMBOL,
PN_TARGET, PN_TIMEOUT, PN_TIMER_TASK, PN_TIMESTAMP, PN_TRACE_DRV,
PN_TRACE_FRM, PN_TRACE_OFF, PN_TRACE_RAW, PN_TRANSPORT,
PN_TRACE_FRM, PN_TRACE_OFF, PN_TRACE_RAW, PN_TRANSACTIONAL_STATE, PN_TRANSPORT,
PN_TRANSPORT_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED,
PN_TRANSPORT_TAIL_CLOSED, PN_UBYTE, PN_UINT, PN_ULONG, PN_UNSPECIFIED,
PN_USHORT, PN_UUID, PN_VERSION_MAJOR, PN_VERSION_MINOR,
Expand Down Expand Up @@ -145,7 +145,9 @@
pn_terminus_properties, pn_terminus_set_distribution_mode,
pn_terminus_set_durability, pn_terminus_set_dynamic,
pn_terminus_set_expiry_policy, pn_terminus_set_timeout,
pn_terminus_set_type, pn_transport, pn_transport_attachments,
pn_terminus_set_type, pn_transactional_disposition,
pn_transactional_disposition_get_outcome_type,
pn_transactional_disposition_set_outcome_type, pn_transport, pn_transport_attachments,
pn_transport_bind, pn_transport_capacity, pn_transport_close_head,
pn_transport_close_tail, pn_transport_closed, pn_transport_condition,
pn_transport_connection, pn_transport_error,
Expand Down Expand Up @@ -773,3 +775,11 @@ def pn_ssl_get_peer_hostname(ssl, size):

def pn_ssl_set_peer_hostname(ssl, hostname):
return lib.pn_ssl_set_peer_hostname(ssl, string2utf8(hostname))


def pn_transactional_disposition_get_id(disp):
return bytes2pybytes(lib.pn_transactional_disposition_get_id(disp))


def pn_transactional_disposition_set_id(disp, id):
return lib.pn_transactional_disposition_set_id(disp, py2bytes(id))
Binary file added python/examples/dst_db
Binary file not shown.
Binary file added python/examples/src_db
Binary file not shown.
3 changes: 2 additions & 1 deletion python/proton/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from ._data import UNDESCRIBED, Array, Data, Described, char, symbol, timestamp, ubyte, ushort, uint, ulong, \
byte, short, int32, float32, decimal32, decimal64, decimal128, AnnotationDict, PropertyDict, SymbolList
from ._delivery import Delivery, Disposition, DispositionType, GeneralDisposition, RejectedDisposition, \
ModifiedDisposition, ReceivedDisposition
ModifiedDisposition, ReceivedDisposition, TransactionalDisposition
from ._endpoints import Endpoint, Connection, Session, Link, Receiver, Sender, Terminus
from ._events import Collector, Event, EventType
from ._exceptions import ProtonException, MessageException, DataException, TransportException, \
Expand Down Expand Up @@ -89,6 +89,7 @@
"Terminus",
"Timeout",
"Interrupt",
"TransactionalDisposition",
"Transport",
"TransportException",
"Url",
Expand Down
Loading

0 comments on commit 1e42f12

Please sign in to comment.