Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add basic support for multi-cluster replication #494

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ Please note this table only reports end-to-end tests suite coverage, others vers
- [x] Cluster monitoring.
- [x] Complete end2end test suite.
- [x] Archival.
- [x] Multi cluster replication.
- [ ] Auto scaling.
- [ ] Multi cluster replication.

## Contributing

Expand Down
17 changes: 17 additions & 0 deletions api/v1beta1/temporalcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,20 @@ func (s *ClusterArchivalSpec) IsEnabled() bool {
return s != nil && s.Enabled
}

arlyon marked this conversation as resolved.
Show resolved Hide resolved
// ClusterReplicationSpec defines the specifications for replication in the temporal cluster. Using these
// the operator will configure the cluster to replicate data to other clusters. If this object is defined,
// then both properties below must be set.
type ClusterReplicationSpec struct {
	// EnableGlobalNamespace signifies to this node that global namespaces should be enabled.
	// Namespaces that are marked global are replicated across nodes in the cluster.
	// +kubebuilder:validation:Required
	EnableGlobalNamespace bool `json:"enableGlobalNamespace"`
	// InitialFailoverVersion is used to determine the leadership order between nodes in the cluster. The node with the
	// lowest initial failover version will be the primary. This must be unique across the cluster.
	// NOTE(review): presumably this must also be strictly lower than the failover version
	// increment configured in the cluster metadata — confirm and consider webhook validation.
	// +kubebuilder:validation:Required
	InitialFailoverVersion int64 `json:"initialFailoverVersion"`
arlyon marked this conversation as resolved.
Show resolved Hide resolved
}

type ArchivalProviderKind string

const (
Expand Down Expand Up @@ -1055,6 +1069,9 @@ type TemporalClusterSpec struct {
// Authorization allows authorization configuration for the temporal cluster.
// +optional
Authorization *AuthorizationSpec `json:"authorization,omitempty"`
// Replication allows configuration of multi-cluster replication.
// +optional
Replication *ClusterReplicationSpec `json:"replication,omitempty"`
}

// ServiceStatus reports a service status.
Expand Down
30 changes: 26 additions & 4 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions bundle/manifests/temporal.io_temporalclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2941,6 +2941,17 @@ spec:
- defaultStore
- visibilityStore
type: object
replication:
properties:
enableGlobalNamespace:
type: boolean
initialFailoverVersion:
format: int64
type: integer
required:
- enableGlobalNamespace
- initialFailoverVersion
type: object
services:
description: Services allows customizations for each temporal services
deployment.
Expand Down
18 changes: 18 additions & 0 deletions config/crd/bases/temporal.io_temporalclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2692,6 +2692,24 @@ spec:
- defaultStore
- visibilityStore
type: object
replication:
description: Replication allows configuration of multi-cluster replication.
properties:
enableGlobalNamespace:
description: |-
EnableGlobalNamespace signifies to this node that global namespaces should be enabled.
Namespaces that are marked global are replicated across nodes in the cluster.
type: boolean
initialFailoverVersion:
description: |-
InitialFailoverVersion is used to determine the leadership order between nodes in the cluster. The node with the
lowest initial failover version will be the primary. This must be unique across the cluster.
format: int64
type: integer
required:
- enableGlobalNamespace
- initialFailoverVersion
type: object
services:
description: Services allows customizations for each temporal services deployment.
properties:
Expand Down
75 changes: 75 additions & 0 deletions docs/api/v1beta1.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,20 @@ AuthorizationSpec
<p>Authorization allows authorization configuration for the temporal cluster.</p>
</td>
</tr>
<tr>
<td>
<code>replication</code><br>
<em>
<a href="#temporal.io/v1beta1.ClusterReplicationSpec">
ClusterReplicationSpec
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>Replication allows configuration of multi-cluster replication.</p>
</td>
</tr>
</table>
</td>
</tr>
Expand Down Expand Up @@ -923,6 +937,53 @@ ArchivalSpec
</table>
</div>
</div>
<h3 id="temporal.io/v1beta1.ClusterReplicationSpec">ClusterReplicationSpec
</h3>
<p>
(<em>Appears on:</em>
<a href="#temporal.io/v1beta1.TemporalClusterSpec">TemporalClusterSpec</a>)
</p>
<p>ClusterReplicationSpec defines the specifications for replication in the temporal cluster. Using these
the operator will configure the cluster to replicate data to other clusters. If this object is defined,
then both properties below must be set.</p>
<div class="md-typeset__scrollwrap">
<div class="md-typeset__table">
<table>
<thead>
<tr>
<th>Field</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>enableGlobalNamespace</code><br>
<em>
bool
</em>
</td>
<td>
<p>EnableGlobalNamespace signifies to this node that global namespaces should be enabled.
Namespaces that are marked global are replicated across nodes in the cluster.</p>
</td>
</tr>
<tr>
<td>
<code>initialFailoverVersion</code><br>
<em>
int64
</em>
</td>
<td>
<p>InitialFailoverVersion is used to determine the leadership order between nodes in the cluster. The node with the
lowest initial failover version will be the primary. This must be unique across the cluster.</p>
</td>
</tr>
</tbody>
</table>
</div>
</div>
<h3 id="temporal.io/v1beta1.ConstrainedValue">ConstrainedValue
</h3>
<p>
Expand Down Expand Up @@ -5005,6 +5066,20 @@ AuthorizationSpec
<p>Authorization allows authorization configuration for the temporal cluster.</p>
</td>
</tr>
<tr>
<td>
<code>replication</code><br>
<em>
<a href="#temporal.io/v1beta1.ClusterReplicationSpec">
ClusterReplicationSpec
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>Replication allows configuration of multi-cluster replication.</p>
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
66 changes: 66 additions & 0 deletions docs/features/multi-cluster-replication.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Multi-cluster replication

Temporal supports multi-cluster replication. This feature allows you to replicate specific temporal namespaces to a different temporal cluster. This is useful for disaster recovery, or to have a temporal cluster in a different region for latency reasons, or if you want to upgrade the temporal history shard count.

## How it works

To set up multi-cluster replication using the temporal operator, you must first enable global namespaces on the clusters you wish to support, and then assign them a unique failover version.
This can be configured via the `spec.replication` field of the `TemporalCluster` resource. Temporal operator automatically configures the remaining fields, and currently hard-codes the failover
increment to 10, meaning you can have at most one leader and 9 followers. If a cluster fails, the remaining clusters will elect as the new primary the cluster with the lowest failover version. The original cluster, if it comes back online, will
be assigned a new failover version, which is always the lowest multiple of the failover increment (+ initialFailoverVersion) that is greater than the leader cluster's failover version.

```yaml
apiVersion: temporal.io/v1beta1
kind: TemporalCluster
metadata:
name: prod
spec:
replication:
enableGlobalNamespace: true
initialFailoverVersion: 1
```

For example, in a setup with a leader with `initialFailoverVersion` 1, and a follower with `initialFailoverVersion` 2, since the increment is set to 10 a failure in the leader will flip control to the follower, and increment the leader's failover version to 11.

## Starting replication

Once the two clusters are configured, simply set up connections between them using the temporal CLI. In the future, there may be a way to do this via the operator.

```bash
# port forward to the frontend of the primary cluster
kubectl port-forward primary-frontend 7233:7233

temporal operator cluster upsert --frontend-address secondary-cluster.namespace.svc.cluster.local:7233 --enable-connection true

# port forward to the frontend of the secondary cluster
kubectl port-forward secondary-frontend 7233:7233

temporal operator cluster upsert --frontend-address primary-cluster.namespace.svc.cluster.local:7233 --enable-connection true
```

## Replicating namespaces

Once the clusters are connected, you can replicate namespaces between them. This can be done via the temporal CLI, or via the operator. Simply create a new namespace resource for just the primary clusterRef, and add the secondary cluster to the list of clusters. The namespace will be added to the secondary cluster automatically, with all the same settings, and start receiving updates.

```yaml
apiVersion: temporal.io/v1beta1
kind: TemporalNamespace
metadata:
name: primary-namespace
spec:
clusterRef:
name: primary
clusters:
- primary
- secondary
activeClusterName: secondary
isGlobalNamespace: true
```

> **🚨 Note**: Enabling replication will not automatically replicate old workflows. It only replicates workflows as they are interacted with. For cases like trying to increase the shard count, this is important as you need to make sure each workflow has been evaluated at least once after replication has been set up.

## A mechanism for increasing the history shard count

Since temporal 1.20, replicated clusters do not require the same number of history shards. This means it is a viable method for migrating a cluster that has outgrown its shard count. To do this, simply have your secondary cluster use a higher shard count than the primary cluster. The only requirement is that the shard count on the secondary cluster is an even multiple of the first. So if you have 512 shards on the primary cluster, you can have 1024 shards (or any other multiple) on the secondary cluster, but not 1023.

When replication is complete, simply take down the old cluster, and flip your clients over to the new cluster. This can all be achieved with very little downtime.
40 changes: 27 additions & 13 deletions internal/resource/config/configmap_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,30 @@ func (b *ConfigmapBuilder) buildArchivalConfig() (*config.Archival, *config.Arch
return cfg, namespaceDefaults
}

// buildClusterMetadataConfig builds the cluster metadata section of the temporal
// server configuration. When replication is configured on the TemporalCluster spec,
// the user-provided initial failover version and global namespace flag are used;
// otherwise the defaults apply (failover version 1, global namespaces disabled).
// The returned config always describes a single-member cluster; cross-cluster
// connections are established out-of-band (e.g. via the temporal CLI).
func (b *ConfigmapBuilder) buildClusterMetadataConfig() *cluster.Config {
	// The failover version increment is currently hard-coded. Together with the
	// requirement that initial failover versions are unique, it bounds the number
	// of clusters that can participate in replication.
	const failoverVersionIncrement = 10

	failoverVersion := int64(1)
	enableGlobalNamespace := false

	if replication := b.instance.Spec.Replication; replication != nil {
		failoverVersion = replication.InitialFailoverVersion
		enableGlobalNamespace = replication.EnableGlobalNamespace
	}

	return &cluster.Config{
		EnableGlobalNamespace:    enableGlobalNamespace,
		FailoverVersionIncrement: failoverVersionIncrement,
		// This cluster is both master and current until replication peers are added.
		MasterClusterName:  b.instance.Name,
		CurrentClusterName: b.instance.Name,
		ClusterInformation: map[string]cluster.ClusterInformation{
			b.instance.Name: {
				Enabled:                true,
				InitialFailoverVersion: failoverVersion,
				// NOTE(review): assumes each service reaches its local frontend
				// on 127.0.0.1:7233 — confirm against the service deployment specs.
				RPCAddress: "127.0.0.1:7233",
			},
		},
	}
}

func (b *ConfigmapBuilder) Update(object client.Object) error {
configMap := object.(*corev1.ConfigMap)

Expand All @@ -209,6 +233,8 @@ func (b *ConfigmapBuilder) Update(object client.Object) error {

archivalConfig, archivalNamespaceDefaults := b.buildArchivalConfig()

clusterMetadata := b.buildClusterMetadataConfig()

temporalCfg := config.Config{
Global: config.Global{
Membership: config.Membership{
Expand All @@ -223,19 +249,7 @@ func (b *ConfigmapBuilder) Update(object client.Object) error {
NamespaceDefaults: config.NamespaceDefaults{
Archival: *archivalNamespaceDefaults,
},
ClusterMetadata: &cluster.Config{
EnableGlobalNamespace: false,
FailoverVersionIncrement: 10,
MasterClusterName: b.instance.Name,
CurrentClusterName: b.instance.Name,
ClusterInformation: map[string]cluster.ClusterInformation{
b.instance.Name: {
Enabled: true,
InitialFailoverVersion: 1,
RPCAddress: "127.0.0.1:7233",
},
},
},
ClusterMetadata: clusterMetadata,
Services: map[string]config.Service{
string(primitives.FrontendService): {
RPC: config.RPC{
Expand Down
1 change: 0 additions & 1 deletion pkg/version/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading