From c262d86dde371d8118285ee4783beeed83bf8c27 Mon Sep 17 00:00:00 2001 From: Lutz Bichler Date: Mon, 22 Jul 2019 12:20:50 +0200 Subject: [PATCH] capicxx-someip-runtime 3.1.12.17 --- CHANGES | 20 +++++++ include/CommonAPI/SomeIP/Connection.hpp | 1 - include/CommonAPI/SomeIP/InputStream.hpp | 8 +-- src/CommonAPI/SomeIP/Connection.cpp | 28 +++++----- src/CommonAPI/SomeIP/InputStream.cpp | 68 ++++++++++-------------- src/CommonAPI/SomeIP/Proxy.cpp | 50 ++++++++--------- 6 files changed, 93 insertions(+), 82 deletions(-) diff --git a/CHANGES b/CHANGES index 73b458b..a39ad02 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,26 @@ Changes ======= +v3.1.12.17 +- Fix race condition which could occur when doing calls on an + unavailable proxy. The race could lead to waiting the complete + specified timeout before sending out the request instead of sending + it out directly if the proxy got available in the meantime. + +v3.1.12.16 +- Fix handling of broken UTF16 surrogate pairs + +v3.1.12.14 +- Ensure vsomeip message delivery via mainloop thread + on synchronous calls. + +v3.1.12.13 +- Deregister dispatch sources later on connection destruction. +- Improve handling of spurious wakeups caused by system time + adjustments. +- Improve robustness of Inputstream regarding malformed messages + containing Variants. + v3.1.12.12 - Fix deregistration of events upon service deregistration - Improve error handling on send errors diff --git a/include/CommonAPI/SomeIP/Connection.hpp b/include/CommonAPI/SomeIP/Connection.hpp index 7d5c058..a26d294 100644 --- a/include/CommonAPI/SomeIP/Connection.hpp +++ b/include/CommonAPI/SomeIP/Connection.hpp @@ -313,7 +313,6 @@ class Connection: std::map, std::shared_ptr> subscriptionStates_; - std::mutex availabilityCalledMutex_; std::map> availabilityCalled_; std::mutex requestedServicesMutex_; diff --git a/include/CommonAPI/SomeIP/InputStream.hpp b/include/CommonAPI/SomeIP/InputStream.hpp index 2d34980..315cdc3 100644 --- a/include/CommonAPI/SomeIP/InputStream.hpp +++ b/include/CommonAPI/SomeIP/InputStream.hpp @@ -177,7 +177,7 @@ class InputStream: public CommonAPI::InputStream { bitAlign(); uint32_t serial; - _readBitValue(serial, 32, false); + errorOccurred_ = _readBitValue(serial, 32, false); if (!hasError()) { _value = Polymorphic_Struct::create(serial); _value->template readValue(*this, _depl); @@ -200,8 +200,8 @@ class InputStream: public CommonAPI::InputStream { DeleteVisitor visitor(_value.valueStorage_); ApplyVoidVisitor, Variant, Types_... >::visit(visitor, _value); - _value.valueType_ = 0; } + _value.valueType_ = 0; uint32_t itsSize; uint32_t itsType; @@ -218,7 +218,7 @@ class InputStream: public CommonAPI::InputStream { readValue(itsSize, unionLengthWidth, true); } - if(!itsType) + if (!itsType) errorOccurred_ = true; if (!hasError()) { @@ -427,7 +427,7 @@ class InputStream: public CommonAPI::InputStream { _value.clear(); - _readBitValue(itsSize, 32, false); + errorOccurred_ = _readBitValue(itsSize, 32, false); while (itsSize > 0) { size_t remainingBeforeRead = remaining_; diff --git a/src/CommonAPI/SomeIP/Connection.cpp b/src/CommonAPI/SomeIP/Connection.cpp index 3404cbd..d9feb9c 100644 --- a/src/CommonAPI/SomeIP/Connection.cpp +++ b/src/CommonAPI/SomeIP/Connection.cpp @@ -40,9 +40,9 @@ void Connection::receive(const std::shared_ptr &_message) { (_message->get_message_type() < vsomeip::message_type_e::MT_NOTIFICATION ? commDirectionType::STUBRECEIVE : commDirectionType::PROXYRECEIVE); - // avoid blocking the mainloop + // avoid blocking the mainloop when a synchronous call in a callback was done bool isSendAndBlockAnswer = false; - { + if (itsDirection == commDirectionType::PROXYRECEIVE) { std::lock_guard itsLock(sendReceiveMutex_); if(_message->get_message_type() != vsomeip::message_type_e::MT_NOTIFICATION && sendAndBlockAnswers_.find(_message->get_session()) != sendAndBlockAnswers_.end()) { @@ -175,7 +175,7 @@ void Connection::onConnectionEvent(state_type_e state) { void Connection::onAvailabilityChange(service_id_t _service, instance_id_t _instance, bool _is_available) { { - std::lock_guard itsLock(availabilityCalledMutex_); + std::lock_guard itsLock(availabilityMutex_); auto its_service = availabilityCalled_.find(_service); if (its_service != availabilityCalled_.end()) { auto its_instance = its_service->second.find(_instance); @@ -184,7 +184,6 @@ void Connection::onAvailabilityChange(service_id_t _service, instance_id_t _inst queueSubscriptionStatusHandler(_service, _instance); } } - availabilityCalled_[_service][_instance] = true; } if (auto lockedContext = mainLoopContext_.lock()) { std::shared_ptr avbl_queue_entry = std::make_shared( @@ -256,7 +255,7 @@ void Connection::handleAvailabilityChange(const service_id_t _service, std::weak_ptr, void*>> itsHandlers; { - std::unique_lock itsLock(availabilityMutex_); + std::lock_guard itsLock(availabilityMutex_); auto foundService = availabilityHandlers_.find(_service); if (foundService != availabilityHandlers_.end()) { auto foundInstance = foundService->second.find(_instance); @@ -271,6 +270,7 @@ void Connection::handleAvailabilityChange(const service_id_t _service, itsHandlers.push_back(h.second); } } + availabilityCalled_[_service][_instance] = true; } for (auto h : itsHandlers) { @@ -278,7 +278,6 @@ void Connection::handleAvailabilityChange(const service_id_t _service, std::get<0>(h)(itsProxy, _service, _instance, _is_available, std::get<2>(h)); } - } void Connection::dispatch() { @@ -374,6 +373,10 @@ Connection::Connection(const std::string &_name) } Connection::~Connection() { + if (auto lockedContext = mainLoopContext_.lock()) { + lockedContext->deregisterDispatchSource(dispatchSource_); + lockedContext->deregisterWatch(watch_); + } bool shouldDisconnect(false); { std::lock_guard itsLock(connectionMutex_); @@ -422,10 +425,6 @@ bool Connection::connect(bool) { } void Connection::doDisconnect() { - if (auto lockedContext = mainLoopContext_.lock()) { - lockedContext->deregisterDispatchSource(dispatchSource_); - lockedContext->deregisterWatch(watch_); - } if (asyncAnswersCleanupThread_) { { std::lock_guard lg(cleanupMutex_); @@ -575,17 +574,18 @@ Message Connection::sendMessageWithReplyAndBlock( break; } waitStatus = sendAndBlockCondition_.wait_until(lock, elapsed); - if (waitStatus == std::cv_status::timeout || itsAnswer.first->second) + if (itsAnswer.first->second || (waitStatus == std::cv_status::timeout && (elapsed < std::chrono::steady_clock::now()))) break; } while (!itsAnswer.first->second); // If there was an answer (thus, we did not run into the timeout), // move it to itsResult - if (waitStatus != std::cv_status::timeout) { + if (itsAnswer.first->second) { itsResult = std::move(itsAnswer.first->second); - sendAndBlockAnswers_.erase(itsAnswer.first); } + sendAndBlockAnswers_.erase(itsAnswer.first); + return itsResult; } @@ -736,7 +736,7 @@ Connection::isAvailable(const Address &_address) { } { bool availabilityCalled(false); - std::lock_guard itsLock(availabilityCalledMutex_); + std::lock_guard itsLock(availabilityMutex_); auto its_service = availabilityCalled_.find(_address.getService()); if (its_service != availabilityCalled_.end()) { auto its_instance = its_service->second.find(_address.getInstance()); diff --git a/src/CommonAPI/SomeIP/InputStream.cpp b/src/CommonAPI/SomeIP/InputStream.cpp index 66cbd36..c507f09 100644 --- a/src/CommonAPI/SomeIP/InputStream.cpp +++ b/src/CommonAPI/SomeIP/InputStream.cpp @@ -184,19 +184,19 @@ InputStream& InputStream::readValue(uint32_t &_value, const uint8_t &_width, con case 1: { uint8_t temp; - _readBitValue(temp, 8, false); + errorOccurred_ = _readBitValue(temp, 8, false); _value = temp; } break; case 2: { uint16_t temp; - _readBitValue(temp, 16, false); + errorOccurred_ = _readBitValue(temp, 16, false); _value = temp; } break; case 4: - _readBitValue(_value, 32, false); + errorOccurred_ = _readBitValue(_value, 32, false); break; default: errorOccurred_ = true; @@ -244,46 +244,36 @@ InputStream& InputStream::readValue(std::string &_value, const StringDeployment size_t length = 0; if (encoder->checkBom(data, itsSize, _depl->stringEncoding_)) { - switch (_depl->stringEncoding_) - { - case StringEncoding::UTF16BE: - while (itsSize > 1 && (data[itsSize - 1] != 0x00 || data[itsSize - 2] != 0x00)) - itsSize--; - - if (itsSize % 2 != 0) { - errorOccurred_ = true; - } - - if(!hasError()) { - encoder->utf16To8((byte_t *) data, BIG_ENDIAN, itsSize - 2, status, &bytes, length); + const StringEncoding encoding = _depl->stringEncoding_; + if (encoding == StringEncoding::UTF8) { + if (data[itsSize - 1] != 0x00) { + errorOccurred_ = true; + } else { + status = EncodingStatus::SUCCESS; + } + + bytes = (byte_t *) data; + } else { // UTF16BE + UTF16LE + while (itsSize > 1 && (data[itsSize - 1] != 0x00 || data[itsSize - 2] != 0x00)) + itsSize--; + + if (itsSize % 2 != 0) { + errorOccurred_ = true; + } + + if(!hasError()) { + const int endianess = (encoding == StringEncoding::UTF16BE) ? BIG_ENDIAN : LITTLE_ENDIAN; + encoder->utf16To8((byte_t *) data, endianess, itsSize - 2, status, &bytes, length); + if (length) { itsSize = static_cast(length); - } - break; - - case StringEncoding::UTF16LE: - while (itsSize > 1 && (data[itsSize - 1] != 0x00 || data[itsSize - 2] != 0x00)) - itsSize--; - - if (itsSize % 2 != 0) { + status = EncodingStatus::SUCCESS; + } else { errorOccurred_ = true; + delete[] bytes; + bytes = NULL; } - - if(!hasError()) { - encoder->utf16To8((byte_t *) data, LITTLE_ENDIAN, itsSize - 2, status, &bytes, length); - itsSize = static_cast(length); - } - break; - - default: - if (data[itsSize - 1] != 0x00) { - errorOccurred_ = true; - } - - bytes = (byte_t *) data; - break; + } } - - status = EncodingStatus::SUCCESS; } else { status = EncodingStatus::INVALID_BOM; } diff --git a/src/CommonAPI/SomeIP/Proxy.cpp b/src/CommonAPI/SomeIP/Proxy.cpp index e6a165d..1abffa2 100644 --- a/src/CommonAPI/SomeIP/Proxy.cpp +++ b/src/CommonAPI/SomeIP/Proxy.cpp @@ -58,9 +58,9 @@ void Proxy::availabilityTimeoutThreadHandler() const { // their required data in a list of tuples. typedef std::tuple< isAvailableAsyncCallback, - std::promise, AvailabilityStatus, - std::chrono::steady_clock::time_point + std::chrono::steady_clock::time_point, + std::list::iterator > CallbackData_t; std::list callbacks; @@ -100,25 +100,25 @@ void Proxy::availabilityTimeoutThreadHandler() const { std::chrono::steady_clock::time_point timepoint_; if(isAvailable()) { availabilityMutex_.lock(); - callbacks.push_back(std::make_tuple(callback, std::move(std::get<2>(*it)), + callbacks.push_back(std::make_tuple(callback, AvailabilityStatus::AVAILABLE, - timepoint_)); + timepoint_, it)); } else { availabilityMutex_.lock(); - callbacks.push_back(std::make_tuple(callback, std::move(std::get<2>(*it)), + callbacks.push_back(std::make_tuple(callback, AvailabilityStatus::NOT_AVAILABLE, - timepoint_)); + timepoint_, it)); } - it = timeouts_.erase(it); + ++it; availabilityMutex_.unlock(); } else { //timeout not expired if(isAvailable()) { availabilityMutex_.lock(); - callbacks.push_back(std::make_tuple(callback, std::move(std::get<2>(*it)), + callbacks.push_back(std::make_tuple(callback, AvailabilityStatus::AVAILABLE, - minTimeout)); - it = timeouts_.erase(it); + minTimeout, it)); + ++it; availabilityMutex_.unlock(); } else { ++it; @@ -131,7 +131,9 @@ void Proxy::availabilityTimeoutThreadHandler() const { if(firstIteration) { firstIteration = false; - continue; + if (!isAvailable()) { + continue; + } } //timeout not expired @@ -142,10 +144,10 @@ void Proxy::availabilityTimeoutThreadHandler() const { if(isAvailable()) { availabilityMutex_.lock(); - callbacks.push_back(std::make_tuple(callback, std::move(std::get<2>(*it)), + callbacks.push_back(std::make_tuple(callback, AvailabilityStatus::AVAILABLE, - minTimeout)); - it = timeouts_.erase(it); + minTimeout, it)); + ++it; availabilityMutex_.unlock(); } else { ++it; @@ -164,26 +166,29 @@ void Proxy::availabilityTimeoutThreadHandler() const { auto it = callbacks.begin(); while(it != callbacks.end()) { callback = std::get<0>(*it); - avStatus = std::get<2>(*it); + avStatus = std::get<1>(*it); // compute remaining timeout now = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now(); - remainingTimeout = (int)std::chrono::duration_cast(std::get<3>(*it) - now).count(); + remainingTimeout = (int)std::chrono::duration_cast(std::get<2>(*it) - now).count(); if(remainingTimeout < 0) remainingTimeout = 0; threadLock.unlock(); - std::get<1>(*it).set_value(avStatus); callback(avStatus, remainingTimeout); threadLock.lock(); - it = callbacks.erase(it); + ++it; } //cancel thread timeoutsMutex_.lock(); + for (const auto& cb : callbacks) { + timeouts_.erase(std::get<3>(cb)); + } + callbacks.clear(); if(timeouts_.size() == 0 && callbacks.size() == 0) finish = true; timeoutsMutex_.unlock(); @@ -340,9 +345,6 @@ std::future Proxy::isAvailableAsync( isAvailableAsyncCallback _callback, const CommonAPI::CallInfo *_info) const { - std::promise promise; - std::future future = promise.get_future(); - //set timeout point auto timeoutPoint = (std::chrono::steady_clock::time_point) std::chrono::steady_clock::now() + std::chrono::milliseconds(_info->timeout_); @@ -366,7 +368,7 @@ std::future Proxy::isAvailableAsync( } } //add new timeout - timeouts_.push_back(std::make_tuple(timeoutPoint, _callback, std::move(promise))); + timeouts_.push_back(std::make_tuple(timeoutPoint, _callback, std::promise())); //start availability thread if(!isAvailabilityTimeoutThread) @@ -375,7 +377,7 @@ std::future Proxy::isAvailableAsync( shared_from_this())); } else { //add timeout - timeouts_.push_back(std::make_tuple(timeoutPoint, _callback, std::move(promise))); + timeouts_.push_back(std::make_tuple(timeoutPoint, _callback, std::promise())); } timeoutsMutex_.unlock(); @@ -384,7 +386,7 @@ std::future Proxy::isAvailableAsync( availabilityTimeoutCondition_.notify_all(); availabilityTimeoutThreadMutex_.unlock(); - return future; + return std::future(); } AvailabilityStatus Proxy::getAvailabilityStatus() const {