Skip to content

Commit

Permalink
Add option for controlling reponse from CapHeader. #1541
Browse files Browse the repository at this point in the history
  • Loading branch information
yang-xiaodong committed Jun 10, 2024
1 parent 797a8dd commit bce8dc7
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 10 deletions.
29 changes: 29 additions & 0 deletions docs/content/user-guide/en/cap/messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,35 @@ public object DeductProductQty(JsonElement param)
}
```

### Controlling callback response

You can inject the `CapHeader` parameter in the subscription method using the `[FromCap]` attribute and utilize its methods to add extra headers to the callback context or terminate the callback.

Example:

```cs
[CapSubscribe("place.order.qty.deducted")]
public object DeductProductQty(JsonElement param, [FromCap] CapHeader header)
{
var orderId = param.GetProperty("OrderId").GetInt32();
var productId = param.GetProperty("ProductId").GetInt32();
var qty = param.GetProperty("Qty").GetInt32();

// Add additional headers to the response message
header.AddResponseHeader("some-message-info", "this is the test");
// Or add a callback to the response
header.AddResponseHeader(DotNetCore.CAP.Messages.Headers.CallbackName, "place.order.qty.deducted-callback");

// If you no longer want to follow the sender's specified callback and want to modify it, use the RewriteCallback method.
header.RewriteCallback("new-callback-name");

// If you want to terminate/stop, or no longer respond to the sender, call RemoveCallback to remove the callback.
header.RemoveCallback();

return new { OrderId = orderId, IsSuccess = true };
}
```

## Heterogeneous system integration

In version 3.0+, we reconstructed the message structure. We used the Header in the message protocol in the message queue to transmit some additional information, so that we can do it in the Body without modifying or packaging the user’s original The message data format and content are sent.
Expand Down
29 changes: 29 additions & 0 deletions docs/content/user-guide/zh/cap/messaging.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,35 @@ public object DeductProductQty(JsonElement param)
}
```

### 控制回调响应

你可以通过 `[FromCap]` 标记在订阅方法中注入 `CapHeader` 参数,并利用其提供的方法来向回调上下文中添加额外的头信息或者终止回调。

示例如下:

```cs
[CapSubscribe("place.order.qty.deducted")]
public object DeductProductQty(JsonElement param, [FromCap] CapHeader header)
{
var orderId = param.GetProperty("OrderId").GetInt32();
var productId = param.GetProperty("ProductId").GetInt32();
var qty = param.GetProperty("Qty").GetInt32();

// 添加额外的头信息到响应消息中
header.AddResponseHeader("some-message-info", "this is the test");
// 或再次添加回调的回调
header.AddResponseHeader(DotNetCore.CAP.Messages.Headers.CallbackName, "place.order.qty.deducted-callback");

// 如果你不再遵从发送着指定的回调,想修改回调,可通过 RewriteCallback 方法修改。
header.RewriteCallback("new-callback-name");

// 如果你想终止/停止,或不再给发送方响应,调用 RemoveCallback 来移除回调。
header.RemoveCallback();

return new { OrderId = orderId, IsSuccess = true };
}
```

## 异构系统集成

在 3.0+ 版本中,我们对消息结构进行了重构,我们利用了消息队列中消息协议中的 Header 来传输一些额外信息,以便于在 Body 中我们可以做到不需要修改或包装使用者的原始消息数据格式和内容进行发送。
Expand Down
31 changes: 31 additions & 0 deletions src/DotNetCore.CAP/CAP.Attribute.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Collections.Generic;
using System.Collections.ObjectModel;
using DotNetCore.CAP.Internal;
using DotNetCore.CAP.Messages;

// ReSharper disable once CheckNamespace
namespace DotNetCore.CAP;
Expand All @@ -29,7 +30,37 @@ public class FromCapAttribute : Attribute

public class CapHeader : ReadOnlyDictionary<string, string?>
{
internal IDictionary<string, string?>? ResponseHeader { get; set; }

public CapHeader(IDictionary<string, string?> dictionary) : base(dictionary)
{
}

/// <summary>
/// When a callbackName is specified from publish message, use this method to add an additional header.
/// </summary>
/// <param name="key">The response header key.</param>
/// <param name="value">The response header value.</param>
public void AddResponseHeader(string key, string? value)
{
ResponseHeader ??= new Dictionary<string, string?>();
ResponseHeader[key] = value;
}

/// <summary>
/// When a callbackName is specified from publish message, use this method to abort the callback.
/// </summary>
public void RemoveCallback()
{
Dictionary.Remove(Headers.CallbackName);
}

/// <summary>
/// When a callbackName is specified from Publish message, use this method to rewrite the callback name.
/// </summary>
/// <param name="callbackName"></param>
public void RewriteCallback(string callbackName)
{
Dictionary[Headers.CallbackName] = callbackName;
}
}
7 changes: 6 additions & 1 deletion src/DotNetCore.CAP/Internal/ConsumerExecutedResult.cs
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
// Copyright (c) .NET Core Community. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System.Collections.Generic;

namespace DotNetCore.CAP.Internal;

public class ConsumerExecutedResult
{
public ConsumerExecutedResult(object? result, string msgId, string? callbackName)
public ConsumerExecutedResult(object? result, string msgId, string? callbackName, IDictionary<string, string?>? callbackHeader)
{
Result = result;
MessageId = msgId;
CallbackName = callbackName;
CallbackHeader = callbackHeader;
}

public object? Result { get; set; }

public string MessageId { get; set; }

public string? CallbackName { get; set; }

public IDictionary<string, string?>? CallbackHeader { get; set; }
}
14 changes: 6 additions & 8 deletions src/DotNetCore.CAP/Internal/ISubscribeExector.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,15 @@ private async Task InvokeConsumerMethodAsync(MediumMessage message, ConsumerExec

if (!string.IsNullOrEmpty(ret.CallbackName))
{
var header = new Dictionary<string, string?>
{
[Headers.CorrelationId] = message.Origin.GetId(),
[Headers.CorrelationSequence] = (message.Origin.GetCorrelationSequence() + 1).ToString()
};
ret.CallbackHeader ??= new Dictionary<string, string?>();
ret.CallbackHeader[Headers.CorrelationId] = message.Origin.GetId();
ret.CallbackHeader[Headers.CorrelationSequence] = (message.Origin.GetCorrelationSequence() + 1).ToString();

if(message.Origin.Headers.TryGetValue(Headers.TraceParent, out var traceparent))
header[Headers.TraceParent] = traceparent;
if (message.Origin.Headers.TryGetValue(Headers.TraceParent, out var traceparent))
ret.CallbackHeader[Headers.TraceParent] = traceparent;

await _provider.GetRequiredService<ICapPublisher>()
.PublishAsync(ret.CallbackName, ret.Result, header, cancellationToken).ConfigureAwait(false);
.PublishAsync(ret.CallbackName, ret.Result, ret.CallbackHeader, cancellationToken).ConfigureAwait(false);
}
}
catch (OperationCanceledException)
Expand Down
4 changes: 3 additions & 1 deletion src/DotNetCore.CAP/Internal/ISubscribeInvoker.Default.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,14 @@ public async Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context,
var message = context.DeliverMessage;
var parameterDescriptors = context.ConsumerDescriptor.Parameters;
var executeParameters = new object?[parameterDescriptors.Count];
var headerIndex = 0;
for (var i = 0; i < parameterDescriptors.Count; i++)
{
var parameterDescriptor = parameterDescriptors[i];
if (parameterDescriptor.IsFromCap)
{
executeParameters[i] = GetCapProvidedParameter(parameterDescriptor, message, cancellationToken);
headerIndex = i;
}
else
{
Expand Down Expand Up @@ -123,7 +125,7 @@ public async Task<ConsumerExecutedResult> InvokeAsync(ConsumerContext context,
}
}

return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName());
return new ConsumerExecutedResult(resultObj, message.GetId(), message.GetCallbackName(), (executeParameters[headerIndex] as CapHeader)?.ResponseHeader);
}

private static object GetCapProvidedParameter(ParameterDescriptor parameterDescriptor, Message message,
Expand Down

0 comments on commit bce8dc7

Please sign in to comment.