From 1e42f1225fd7e5dac0a1beb6186fdaadad86ada8 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Tue, 29 Oct 2024 16:44:18 -0400 Subject: [PATCH] Add transaction disposition --- c/include/proton/disposition.h | 19 +++++++ c/src/core/emitters.h | 17 +++++++ c/src/core/engine-internal.h | 7 +++ c/src/core/engine.c | 56 ++++++++++++++++++++ c/src/core/transport.c | 14 +++++ c/tools/codec-generator/specs.json | 2 + python/cproton.h | 7 +++ python/cproton.py | 14 ++++- python/examples/dst_db | Bin 0 -> 12288 bytes python/examples/src_db | Bin 0 -> 12288 bytes python/proton/__init__.py | 3 +- python/proton/_delivery.py | 76 ++++++++++++++++++++++++++-- python/tests/proton_tests/engine.py | 22 +++++++- sasl_conf/proton-server.conf | 4 ++ sasl_conf/proton.sasldb | Bin 0 -> 16384 bytes 15 files changed, 234 insertions(+), 7 deletions(-) create mode 100644 python/examples/dst_db create mode 100644 python/examples/src_db create mode 100644 sasl_conf/proton-server.conf create mode 100644 sasl_conf/proton.sasldb diff --git a/c/include/proton/disposition.h b/c/include/proton/disposition.h index db0d1cea6..4965b2fd6 100644 --- a/c/include/proton/disposition.h +++ b/c/include/proton/disposition.h @@ -90,6 +90,12 @@ typedef struct pn_disposition_t pn_disposition_t; */ #define PN_MODIFIED (0x0000000000000027) +/** + * The PN_TRANSACTIONAL_STATE delivery state is a non terminal state + * indicating the transactional state of a delivery. + */ +#define PN_TRANSACTIONAL_STATE (0x0000000000000034) + /** * Get the type of a disposition. * @@ -234,6 +240,7 @@ PN_EXTERN pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition); typedef struct pn_received_disposition_t pn_received_disposition_t; typedef struct pn_rejected_disposition_t pn_rejected_disposition_t; typedef struct pn_modified_disposition_t pn_modified_disposition_t; +typedef struct pn_transactional_disposition_t pn_transactional_disposition_t; typedef struct pn_custom_disposition_t pn_custom_disposition_t; /** @@ -254,6 +261,7 @@ PN_EXTERN pn_custom_disposition_t *pn_custom_disposition(pn_disposition_t *dispo PN_EXTERN pn_received_disposition_t *pn_received_disposition(pn_disposition_t *disposition); PN_EXTERN pn_rejected_disposition_t *pn_rejected_disposition(pn_disposition_t *disposition); PN_EXTERN pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition); +PN_EXTERN pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition); /** * Access the disposition as a raw pn_data_t. @@ -383,6 +391,17 @@ PN_EXTERN void pn_modified_disposition_set_undeliverable(pn_modified_disposition */ PN_EXTERN pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *disposition); + +PN_EXTERN pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition); +PN_EXTERN void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id); +PN_EXTERN uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition); + +/** + * Set the provisional outcome of the message if the transaction is committed successfully. + * Only terminal disposition states are allowed (PN_ACCEPTED, PN_REJECTED, PN_RELEASED, PN_MODIFIED) + */ +PN_EXTERN void pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t *disposition, uint64_t outcome); + /** * @} */ diff --git a/c/src/core/emitters.h b/c/src/core/emitters.h index 2cd59a64c..f20bdff10 100644 --- a/c/src/core/emitters.h +++ b/c/src/core/emitters.h @@ -647,6 +647,19 @@ static inline void emit_modified_disposition(pni_emitter_t* emitter, pni_compoun } } +static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_disposition_t *disposition); + +static inline void emit_transactional_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_transactional_disposition_t *disposition){ + for (bool small_encoding = true; ; small_encoding = false) { + pni_compound_context c = emit_list(emitter, compound0, small_encoding, true); + pni_compound_context compound = c; + emit_binary_bytes(emitter, &compound, disposition->id); + emit_raw(emitter, &compound, disposition->outcome_raw); + emit_end_list(emitter, &compound, small_encoding); + if (encode_succeeded(emitter, &compound)) break; + } +} + static inline void emit_custom_disposition(pni_emitter_t* emitter, pni_compound_context* compound0, pn_custom_disposition_t *disposition){ emit_descriptor(emitter, compound0, disposition->type); if ((disposition->data && pn_data_size(disposition->data) == 0) || @@ -687,6 +700,10 @@ static inline void emit_disposition(pni_emitter_t* emitter, pni_compound_context emit_descriptor(emitter, compound0, AMQP_DESC_MODIFIED); emit_modified_disposition(emitter, compound0, &disposition->u.s_modified); return; + case PN_DISP_TRANSACTIONAL: + emit_descriptor(emitter, compound0, AMQP_DESC_TRANSACTIONAL_STATE); + emit_transactional_disposition(emitter, compound0, &disposition->u.s_transactional); + return; case PN_DISP_CUSTOM: emit_custom_disposition(emitter, compound0, &disposition->u.s_custom); return; diff --git a/c/src/core/engine-internal.h b/c/src/core/engine-internal.h index da563813a..f10e6a8f0 100644 --- a/c/src/core/engine-internal.h +++ b/c/src/core/engine-internal.h @@ -341,6 +341,7 @@ typedef enum pn_disposition_type_t { PN_DISP_REJECTED = PN_REJECTED, PN_DISP_RELEASED = PN_RELEASED, PN_DISP_MODIFIED = PN_MODIFIED, + PN_DISP_TRANSACTIONAL = PN_TRANSACTIONAL_STATE, } pn_disposition_type_t; struct pn_received_disposition_t { @@ -359,6 +360,11 @@ struct pn_modified_disposition_t { bool undeliverable; }; +struct pn_transactional_disposition_t { + pn_bytes_t id; + pn_bytes_t outcome_raw; +}; + struct pn_custom_disposition_t { pn_data_t *data; pn_bytes_t data_raw; @@ -370,6 +376,7 @@ struct pn_disposition_t { struct pn_received_disposition_t s_received; struct pn_rejected_disposition_t s_rejected; struct pn_modified_disposition_t s_modified; + struct pn_transactional_disposition_t s_transactional; struct pn_custom_disposition_t s_custom; } u; uint16_t type; diff --git a/c/src/core/engine.c b/c/src/core/engine.c index 7ccb308b9..d7fb091d4 100644 --- a/c/src/core/engine.c +++ b/c/src/core/engine.c @@ -27,6 +27,7 @@ #include "consumers.h" #include "core/frame_consumers.h" #include "emitters.h" +#include "core/frame_generators.h" #include "fixed_string.h" #include "framing.h" #include "memory.h" @@ -1597,6 +1598,10 @@ static void pn_disposition_finalize(pn_disposition_t *ds) pn_data_free(ds->u.s_custom.data); pn_bytes_free(ds->u.s_custom.data_raw); break; + case PN_DISP_TRANSACTIONAL: + pn_bytes_free(ds->u.s_transactional.id); + pn_bytes_free(ds->u.s_transactional.outcome_raw); + break; } } @@ -1868,6 +1873,9 @@ void pni_disposition_to_raw(pn_disposition_t *disposition) { case PN_DISP_MODIFIED: emit_modified_disposition(&emitter, &compound, &disposition->u.s_modified); break; + case PN_DISP_TRANSACTIONAL: + emit_transactional_disposition(&emitter, &compound, &disposition->u.s_transactional); + break; } if (type != PN_DISP_EMPTY) { @@ -2005,6 +2013,13 @@ pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition return &disposition->u.s_modified; } +pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition) +{ + if (disposition->type==PN_DISP_EMPTY) disposition->type = PN_DISP_TRANSACTIONAL; + else if (disposition->type!=PN_DISP_TRANSACTIONAL) return NULL; + return &disposition->u.s_transactional; +} + pn_data_t *pn_custom_disposition_data(pn_custom_disposition_t *disposition) { assert(disposition); @@ -2084,6 +2099,44 @@ pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *dispos return disposition->annotations; } +pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition) +{ + assert(disposition); + return disposition->id; +} + +void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id) +{ + assert(disposition); + pn_bytes_free(disposition->id); + disposition->id = pn_bytes_dup(id); +} + +uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition) +{ + assert(disposition); + if (disposition->outcome_raw.size) { + bool qtype = false; + uint64_t type; + pn_amqp_decode_DQLq(disposition->outcome_raw, &qtype, &type); + if (qtype) { + return type; + } + } + return PN_DISP_EMPTY; +} + +void pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t *disposition, uint64_t type) +{ + assert(disposition); + // Generate a described LIST0 directly - this needs a max of 11 bytes + char outcome_scratch[11]; + pn_rwbytes_t scratch = {.size=sizeof(outcome_scratch), .start=outcome_scratch}; + pn_bytes_t outcome_raw = pn_amqp_encode_DLEe(&scratch, type); + pn_bytes_free(disposition->outcome_raw); + disposition->outcome_raw = pn_bytes_dup(outcome_raw); +} + pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery) { if (delivery) { @@ -2420,6 +2473,7 @@ void pn_delivery_update(pn_delivery_t *delivery, uint64_t state) case PN_RECEIVED: case PN_MODIFIED: case PN_RELEASED: + case PN_TRANSACTIONAL_STATE: break; default: delivery->local.u.s_custom.type = state; @@ -2434,6 +2488,7 @@ void pn_delivery_update(pn_delivery_t *delivery, uint64_t state) case PN_RECEIVED: case PN_MODIFIED: case PN_RELEASED: + case PN_TRANSACTIONAL_STATE: delivery->local.type = state; break; default: @@ -2804,6 +2859,7 @@ const char *pn_disposition_type_name(uint64_t d) { case PN_REJECTED: return "rejected"; case PN_RELEASED: return "released"; case PN_MODIFIED: return "modified"; + case PN_TRANSACTIONAL_STATE: return "transactional_state"; default: return "unknown"; } } diff --git a/c/src/core/transport.c b/c/src/core/transport.c index 892e430c1..98d015e63 100644 --- a/c/src/core/transport.c +++ b/c/src/core/transport.c @@ -1602,6 +1602,20 @@ static void pni_amqp_decode_disposition (uint64_t type, pn_bytes_t disp_data, pn } break; } + case AMQP_DESC_TRANSACTIONAL_STATE: { + pn_bytes_t id; + bool qoutcome; + pn_bytes_t outcome_raw; + pn_amqp_decode_DqEzQRe(disp_data, &id, &qoutcome, &outcome_raw); + disp->type = PN_DISP_TRANSACTIONAL; + pn_bytes_free(disp->u.s_transactional.id); + disp->u.s_transactional.id = pn_bytes_dup(id); + disp->u.s_transactional.outcome_raw = (pn_bytes_t){0, NULL}; + if (qoutcome) { + disp->u.s_transactional.outcome_raw = pn_bytes_dup(outcome_raw); + } + break; + } default: { pn_bytes_t data_raw = (pn_bytes_t){0, NULL}; pn_amqp_decode_DqR(disp_data, &data_raw); diff --git a/c/tools/codec-generator/specs.json b/c/tools/codec-generator/specs.json index dddf6d6f8..82b4a013f 100644 --- a/c/tools/codec-generator/specs.json +++ b/c/tools/codec-generator/specs.json @@ -2,6 +2,7 @@ "fill_specs": [ "R", "DLR", + "DL[]", "DL[c]", "DL[?HIIII]", "DL[?IIII?I?I?In?o]", @@ -44,6 +45,7 @@ "D.[s]", "D.[z]", "D.[Bz]", + "D.[z?R]", "D.[R]", "D?L.", "D?L?." diff --git a/python/cproton.h b/python/cproton.h index 59fc89944..76b9d87ab 100644 --- a/python/cproton.h +++ b/python/cproton.h @@ -410,11 +410,13 @@ typedef struct pn_custom_disposition_t pn_custom_disposition_t; typedef struct pn_received_disposition_t pn_received_disposition_t; typedef struct pn_rejected_disposition_t pn_rejected_disposition_t; typedef struct pn_modified_disposition_t pn_modified_disposition_t; +typedef struct pn_transactional_disposition_t pn_transactional_disposition_t; pn_custom_disposition_t *pn_custom_disposition(pn_disposition_t *disposition); pn_received_disposition_t *pn_received_disposition(pn_disposition_t *disposition); pn_rejected_disposition_t *pn_rejected_disposition(pn_disposition_t *disposition); pn_modified_disposition_t *pn_modified_disposition(pn_disposition_t *disposition); +pn_transactional_disposition_t *pn_transactional_disposition(pn_disposition_t *disposition); void pn_custom_disposition_set_type(pn_custom_disposition_t *disposition, uint64_t type); uint64_t pn_custom_disposition_get_type(pn_custom_disposition_t *disposition); @@ -429,6 +431,10 @@ void pn_modified_disposition_set_failed(pn_modified_disposition_t *disposition, _Bool pn_modified_disposition_is_undeliverable(pn_modified_disposition_t *disposition); void pn_modified_disposition_set_undeliverable(pn_modified_disposition_t *disposition, _Bool undeliverable); pn_data_t *pn_modified_disposition_annotations(pn_modified_disposition_t *disposition); +pn_bytes_t pn_transactional_disposition_get_id(pn_transactional_disposition_t *disposition); +void pn_transactional_disposition_set_id(pn_transactional_disposition_t *disposition, pn_bytes_t id); +uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposition_t *disposition); +void pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t *disposition, uint64_t outcome); int pn_error_code(pn_error_t *error); const char *pn_error_text(pn_error_t *error); @@ -656,6 +662,7 @@ int pn_transport_unbind(pn_transport_t *transport); #define PN_REJECTED ... #define PN_RELEASED ... #define PN_MODIFIED ... +#define PN_TRANSACTIONAL_STATE ... // Default message priority #define PN_DEFAULT_PRIORITY ... diff --git a/python/cproton.py b/python/cproton.py index 0e772f4ea..fe1eca037 100644 --- a/python/cproton.py +++ b/python/cproton.py @@ -54,7 +54,7 @@ PN_SSL_RESUME_UNKNOWN, PN_SSL_SHA1, PN_SSL_SHA256, PN_SSL_SHA512, PN_SSL_VERIFY_PEER, PN_SSL_VERIFY_PEER_NAME, PN_STRING, PN_SYMBOL, PN_TARGET, PN_TIMEOUT, PN_TIMER_TASK, PN_TIMESTAMP, PN_TRACE_DRV, - PN_TRACE_FRM, PN_TRACE_OFF, PN_TRACE_RAW, PN_TRANSPORT, + PN_TRACE_FRM, PN_TRACE_OFF, PN_TRACE_RAW, PN_TRANSACTIONAL_STATE, PN_TRANSPORT, PN_TRANSPORT_CLOSED, PN_TRANSPORT_ERROR, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_TAIL_CLOSED, PN_UBYTE, PN_UINT, PN_ULONG, PN_UNSPECIFIED, PN_USHORT, PN_UUID, PN_VERSION_MAJOR, PN_VERSION_MINOR, @@ -145,7 +145,9 @@ pn_terminus_properties, pn_terminus_set_distribution_mode, pn_terminus_set_durability, pn_terminus_set_dynamic, pn_terminus_set_expiry_policy, pn_terminus_set_timeout, - pn_terminus_set_type, pn_transport, pn_transport_attachments, + pn_terminus_set_type, pn_transactional_disposition, + pn_transactional_disposition_get_outcome_type, + pn_transactional_disposition_set_outcome_type, pn_transport, pn_transport_attachments, pn_transport_bind, pn_transport_capacity, pn_transport_close_head, pn_transport_close_tail, pn_transport_closed, pn_transport_condition, pn_transport_connection, pn_transport_error, @@ -773,3 +775,11 @@ def pn_ssl_get_peer_hostname(ssl, size): def pn_ssl_set_peer_hostname(ssl, hostname): return lib.pn_ssl_set_peer_hostname(ssl, string2utf8(hostname)) + + +def pn_transactional_disposition_get_id(disp): + return bytes2pybytes(lib.pn_transactional_disposition_get_id(disp)) + + +def pn_transactional_disposition_set_id(disp, id): + return lib.pn_transactional_disposition_set_id(disp, py2bytes(id)) diff --git a/python/examples/dst_db b/python/examples/dst_db new file mode 100644 index 0000000000000000000000000000000000000000..6b9f701b279eb1ab58f4eb0a27f42e4f74000a71 GIT binary patch literal 12288 zcmeI&NsJU#0LJm^X#}nJzV~n)p+O1}onacReF0P&EQV$lhNcImalvM~89RX)nI6!q zd%PJJxS6=!Jh+{_xLrKC9lW_E9^Ilx*B&!tK1sNFFy_5=pZvN|*^X=)m`Bryn*Y}TG zEFB!^w4CaAJs-|%&P1&`R;e|dk-XtlChG^P2LIB`UCxzenmg0pVSZOu5Rf`y;moCK(EoZ`bB-KK35;Ax7DladG(Zf zSS_fwI;@6Oss`0sCGac0!x#7n@8C7OfT!^Y&fo-&U>`DUK?zd+Ccl?o%8%u{@^$&5 zd`3Ph&&nA&C986~9FprK#P8w<@s;>QyeHleFNtTxW8!{suV{$U%wlIARGCd41UED+rGTg{av6-3R24;*+ z%m~*rLu_OQaF{+eFg;wybg`c3XIRHfQDP=o%Z#yx8R1%Hh<;{(YnVQ+W_q}a>EcSF zm*EO#ip!Y^Rx@L0W(36y0cHTn^dXoYY^EzOD_R@=ev8UWNvT{#O5{pXEH5ENas?@r zMN%O9NWLtPyv6l@LH}&^erA{grhqA63YY?>fGJ=Km;$DNDPRhi0;a(Kr+_FFONFJ+ H23mgtK~efZ literal 0 HcmV?d00001 diff --git a/python/examples/src_db b/python/examples/src_db new file mode 100644 index 0000000000000000000000000000000000000000..630939f314d7dd1aa21776c5d90886a049f397e9 GIT binary patch literal 12288 zcmeI&K}*9h6bJB^Run2yZ#(qzF2cr&7r%g2f(*AhYXp0e+D0jCt!t}ykNX6EP``kw zbfANs9pj%g>D>0V2S_$ literal 0 HcmV?d00001 diff --git a/python/proton/__init__.py b/python/proton/__init__.py index 7c87b9bfd..0286a4e6e 100644 --- a/python/proton/__init__.py +++ b/python/proton/__init__.py @@ -37,7 +37,7 @@ from ._data import UNDESCRIBED, Array, Data, Described, char, symbol, timestamp, ubyte, ushort, uint, ulong, \ byte, short, int32, float32, decimal32, decimal64, decimal128, AnnotationDict, PropertyDict, SymbolList from ._delivery import Delivery, Disposition, DispositionType, GeneralDisposition, RejectedDisposition, \ - ModifiedDisposition, ReceivedDisposition + ModifiedDisposition, ReceivedDisposition, TransactionalDisposition from ._endpoints import Endpoint, Connection, Session, Link, Receiver, Sender, Terminus from ._events import Collector, Event, EventType from ._exceptions import ProtonException, MessageException, DataException, TransportException, \ @@ -89,6 +89,7 @@ "Terminus", "Timeout", "Interrupt", + "TransactionalDisposition", "Transport", "TransportException", "Url", diff --git a/python/proton/_delivery.py b/python/proton/_delivery.py index d5b929a39..b59726f0a 100644 --- a/python/proton/_delivery.py +++ b/python/proton/_delivery.py @@ -17,9 +17,9 @@ # under the License. # -from cproton import (PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, PN_REJECTED, PN_RELEASED, pn_delivery_abort, - pn_delivery_aborted, pn_delivery_attachments, pn_delivery_link, pn_delivery_local, - pn_delivery_local_state, +from cproton import (PN_ACCEPTED, PN_MODIFIED, PN_RECEIVED, PN_REJECTED, PN_RELEASED, PN_TRANSACTIONAL_STATE, + pn_delivery_abort, pn_delivery_aborted, pn_delivery_attachments, pn_delivery_link, + pn_delivery_local, pn_delivery_local_state, pn_delivery_partial, pn_delivery_pending, pn_delivery_readable, pn_delivery_remote, pn_delivery_remote_state, pn_delivery_settle, pn_delivery_settled, pn_delivery_tag, pn_delivery_update, pn_delivery_updated, @@ -44,6 +44,11 @@ pn_modified_disposition_is_undeliverable, pn_modified_disposition_set_undeliverable, pn_modified_disposition_annotations, + pn_transactional_disposition, + pn_transactional_disposition_get_id, + pn_transactional_disposition_set_id, + pn_transactional_disposition_get_outcome_type, + pn_transactional_disposition_set_outcome_type, isnull) from ._condition import cond2obj, obj2cond, Condition @@ -98,6 +103,12 @@ class DispositionType(IntEnum): delivery being settled. """ + TRANSACTIONAL_STATE = PN_TRANSACTIONAL_STATE + """ + A non-terminal delivery state indicating the transactional + state of a delivery + """ + @classmethod def or_int(cls, i: int) -> Union[int, Self]: return cls(i) if i in cls._value2member_map_ else i @@ -119,6 +130,7 @@ class Disposition: REJECTED = DispositionType.REJECTED RELEASED = DispositionType.RELEASED MODIFIED = DispositionType.MODIFIED + TRANSACTIONAL_STATE = DispositionType.TRANSACTIONAL_STATE class RemoteDisposition(Disposition): @@ -133,6 +145,8 @@ def __new__(cls, delivery_impl): return super().__new__(RemoteRejectedDisposition) elif state == cls.MODIFIED: return super().__new__(RemoteModifiedDisposition) + elif state == cls.TRANSACTIONAL_STATE: + return super().__new__(RemoteTransactionalDisposition) else: return super().__new__(RemoteGeneralDisposition) @@ -237,6 +251,29 @@ def apply_to(self, local_disposition: 'LocalDisposition'): ModifiedDisposition(self._failed, self._undeliverable, self._annotations).apply_to(local_disposition) +class RemoteTransactionalDisposition(RemoteDisposition): + + def __init__(self, delivery_impl): + impl = pn_transactional_disposition(pn_delivery_remote(delivery_impl)) + self._id = pn_transactional_disposition_get_id(impl) + self._outcome_type = pn_transactional_disposition_get_outcome_type(impl) + + @property + def type(self) -> Union[int, DispositionType]: + return Disposition.TRANSACTIONAL_STATE + + @property + def id(self): + return self._id + + @property + def outcome_type(self): + return self._outcome_type + + def apply_to(self, local_disposition: 'LocalDisposition'): + TransactionalDisposition(self._id, self._outcome_type).apply_to(local_disposition) + + class LocalDisposition(Disposition): def __init__(self, delivery_impl): @@ -430,6 +467,39 @@ def apply_to(self, local_disposition: LocalDisposition): obj2dat(self._annotations, pn_modified_disposition_annotations(disp)) +class TransactionalDisposition(LocalDisposition): + + def __init__(self, id, outcome_type=None): + self._id = id + self._outcome_type = outcome_type + + @property + def type(self) -> Union[int, DispositionType]: + return Disposition.TRANSACTIONAL_STATE + + @property + def id(self): + return self._id + + @id.setter + def id(self, id): + self._id = id + + @property + def outcome_type(self): + return self._outcome_type + + @outcome_type.setter + def outcome_type(self, type): + self._outcome_type = type + + def apply_to(self, local_disposition: LocalDisposition): + disp = pn_transactional_disposition(local_disposition._impl) + pn_transactional_disposition_set_id(disp, self._id) + if self._outcome_type: + pn_transactional_disposition_set_outcome_type(disp, self._outcome_type) + + class Delivery(Wrapper): """ Tracks and/or records the delivery of a message over a link. diff --git a/python/tests/proton_tests/engine.py b/python/tests/proton_tests/engine.py index f628ba761..e4d3b40e4 100644 --- a/python/tests/proton_tests/engine.py +++ b/python/tests/proton_tests/engine.py @@ -24,7 +24,7 @@ from proton import Array, Condition, Collector, Connection, Data, Delivery, Disposition, DispositionType, Endpoint, \ Event, GeneralDisposition, Link, ModifiedDisposition, PropertyDict, ReceivedDisposition, RejectedDisposition, \ - SASL, SessionException, SymbolList, Terminus, Transport, UNDESCRIBED, symbol + SASL, SessionException, SymbolList, Terminus, TransactionalDisposition, Transport, UNDESCRIBED, symbol from proton.reactor import Container from . import common @@ -2378,6 +2378,23 @@ def check(self, dlv: Delivery): assert dlv.remote.data == self._data, (dlv.data, self._data) +class TransactionalTester(DispositionTester): + def __init__(self, id, outcome_type): + self._id = id + self._outcome_type = outcome_type + super().__init__(Disposition.TRANSACTIONAL_STATE) + + def apply(self, dlv: Delivery): + dlv.local = TransactionalDisposition(self._id, self._outcome_type) + dlv.update() + + def check(self, dlv: Delivery): + assert dlv.remote_state == self._type + assert dlv.remote.type == self._type + assert dlv.remote.id == self._id + assert dlv.remote.outcome_type == self._outcome_type + + class DeliveryTest(Test): def tearDown(self): @@ -2456,6 +2473,9 @@ def testNewModified(self): self._testDisposition(NewModifiedTester(failed=True, undeliverable=True, annotations={"key": "value"})) + def testTransactional(self): + self._testDisposition(TransactionalTester(id=b'1324xxx', outcome_type=Disposition.ACCEPTED)) + def testCustom(self): self._testDisposition(CustomTester(0x12345, [1, 2, 3])) diff --git a/sasl_conf/proton-server.conf b/sasl_conf/proton-server.conf new file mode 100644 index 000000000..15f683a44 --- /dev/null +++ b/sasl_conf/proton-server.conf @@ -0,0 +1,4 @@ + +sasldb_path: /home/andrew/Work/proton/src-jj/sasl_conf/proton.sasldb +mech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS + \ No newline at end of file diff --git a/sasl_conf/proton.sasldb b/sasl_conf/proton.sasldb new file mode 100644 index 0000000000000000000000000000000000000000..d6a270e357bc9ac858bcc2c5324387c8d304d73f GIT binary patch literal 16384 zcmeI&u?fOJ7>40rE3Fg+vCIf|wpP}5mJS??KOT*3()z#(jO19xx^LGN-WoS>Bv zR9+$ZLn23b`0m5P({|RAu1Iz0VP@OSota~E61mvH836w$*d)w1u{i^f zY-#RW6&!Hz_Xl|(59GlYAP>F<{XibbgD+5T9**ZjP!@46?sxG(FW=G!`>tMRJn^&w zIK1PaHfm)0DsX+(*Nf@D^uO$@s@t+WRq0C1^)xJsb(%++o7MAWn8XiHqj(XPiJITP c<-al42q1s}0tg_000IagfB*srAn