Commit

feat: add basic support for multi-cluster replication

arlyon committed Mar 12, 2024
1 parent 9ab3d1e commit 26f5f1a
Showing 6 changed files with 129 additions and 14 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -92,8 +92,8 @@ Please note this table only reports end-to-end tests suite coverage, others vers
- [x] Cluster monitoring.
- [x] Complete end2end test suite.
- [x] Archival.
- [x] Multi cluster replication.
- [ ] Auto scaling.
- [ ] Multi cluster replication.
## Contributing
17 changes: 17 additions & 0 deletions api/v1beta1/temporalcluster_types.go
@@ -844,6 +844,20 @@ func (s *ClusterArchivalSpec) IsEnabled() bool {
return s != nil && s.Enabled
}

// ClusterReplicationSpec defines the specifications for replication in the temporal cluster. Using these,
// the operator will configure the cluster to replicate data to other clusters. If this object is defined,
// then both properties below must be set.
type ClusterReplicationSpec struct {
// EnableGlobalNamespace signifies to this cluster that global namespaces should be enabled.
// Namespaces that are marked global are replicated across the connected clusters.
// +kubebuilder:validation:Required
EnableGlobalNamespace bool `json:"enableGlobalNamespace"`
// InitialFailoverVersion is used to determine the leadership order between the connected clusters. The cluster with the
// lowest initial failover version will be the primary. This value must be unique across the connected clusters.
// +kubebuilder:validation:Required
InitialFailoverVersion int64 `json:"initialFailoverVersion"`
}

type ArchivalProviderKind string

const (
@@ -1045,6 +1059,9 @@ type TemporalClusterSpec struct {
// Authorization allows authorization configuration for the temporal cluster.
// +optional
Authorization *AuthorizationSpec `json:"authorization,omitempty"`
// Replication allows configuration of multi-cluster replication.
// +optional
Replication *ClusterReplicationSpec `json:"replication,omitempty"`
}

// ServiceStatus reports a service status.
11 changes: 11 additions & 0 deletions bundle/manifests/temporal.io_temporalclusters.yaml
@@ -2700,6 +2700,17 @@ spec:
- defaultStore
- visibilityStore
type: object
replication:
properties:
enableGlobalNamespace:
type: boolean
initialFailoverVersion:
format: int64
type: integer
required:
- enableGlobalNamespace
- initialFailoverVersion
type: object
services:
description: Services allows customizations for each temporal services
deployment.
11 changes: 11 additions & 0 deletions config/crd/bases/temporal.io_temporalclusters.yaml
@@ -2062,6 +2062,17 @@ spec:
- defaultStore
- visibilityStore
type: object
replication:
properties:
enableGlobalNamespace:
type: boolean
initialFailoverVersion:
format: int64
type: integer
required:
- enableGlobalNamespace
- initialFailoverVersion
type: object
services:
description: Services allows customizations for each temporal services deployment.
properties:
62 changes: 62 additions & 0 deletions docs/features/multi-cluster-replication.md
@@ -0,0 +1,62 @@
# Multi-cluster replication

Temporal supports multi-cluster replication. This feature allows you to replicate specific temporal namespaces to a different temporal cluster, which is useful for disaster recovery, for running a cluster in another region to reduce latency, or for migrating to a higher history shard count.

## How it works

To set up multi-cluster replication using the temporal operator, you must first enable global namespaces on the clusters you wish to connect, and then assign each of them a unique initial failover version.
This is configured via `spec.replication` on the `TemporalCluster` resource. The temporal operator automatically configures the remaining fields, and currently hard-codes the failover version
increment to 10. If a cluster fails, the remaining clusters will elect a new primary cluster: the one with the lowest failover version. If the original cluster comes back online, it will
be assigned a new failover version, which is always the lowest multiple of the failover increment (plus its `initialFailoverVersion`) that is greater than the leader cluster's failover version
(a sketch of this calculation follows the example below).

```yaml
apiVersion: temporal.io/v1beta1
kind: TemporalCluster
metadata:
name: prod
spec:
replication:
enableGlobalNamespace: true
initialFailoverVersion: 1
```
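
A minimal sketch of the failover version calculation described above, assuming the hard-coded increment of 10; the helper name `nextFailoverVersion` is illustrative and not part of the operator.

```go
package main

import "fmt"

// nextFailoverVersion returns the lowest value of the form
// initial + k*increment that is strictly greater than the current
// leader's failover version, mirroring the rule described above.
func nextFailoverVersion(initial, increment, leaderVersion int64) int64 {
	v := initial
	for v <= leaderVersion {
		v += increment
	}
	return v
}

func main() {
	// A cluster with initialFailoverVersion 1 rejoining while the leader
	// is at failover version 32 comes back at version 41.
	fmt.Println(nextFailoverVersion(1, 10, 32)) // 41
}
```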
## Starting replication
Once the two clusters are configured, simply set up connections between them using the temporal CLI. In the future, there may be a way to do this via the operator.
```bash
# port forward to the frontend of the primary cluster
kubectl port-forward svc/primary-frontend 7233:7233

temporal operator cluster upsert --frontend-address secondary-cluster.namespace.svc.cluster.local:7233 --enable-connection true

# port forward to the frontend of the secondary cluster
kubectl port-forward svc/secondary-frontend 7233:7233

temporal operator cluster upsert --frontend-address primary-cluster.namespace.svc.cluster.local:7233 --enable-connection true
```

## Replicating namespaces

Once the clusters are connected, you can replicate namespaces between them. This can be done via the temporal CLI, or via the operator. With the operator, create a `TemporalNamespace` resource whose `clusterRef` points at the primary cluster, and list both clusters under `clusters`. The namespace will be added to the secondary cluster automatically, with all the same settings, and will start receiving updates.

```yaml
apiVersion: temporal.io/v1beta1
kind: TemporalNamespace
metadata:
name: primary-namespace
spec:
clusterRef:
name: primary
clusters:
- primary
- secondary
activeClusterName: secondary
isGlobalNamespace: true
```
## A mechanism for increasing the history shard count
Since temporal 1.20, replicated clusters do not require the same number of history shards. This makes replication a viable method for migrating a cluster that has outgrown its shard count. To do this, give your secondary cluster a higher shard count than the primary cluster. The only requirement is that the shard count on the secondary cluster is an exact multiple of the primary's: if you have 512 shards on the primary cluster, you can have 1024 shards (or any other multiple) on the secondary cluster, but not 1023.
When replication is complete, simply take down the old cluster and flip your clients over to the new cluster. This can all be achieved with very little downtime.
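
A minimal sketch of the shard-count constraint described above; `validShardCounts` is illustrative only and not part of the operator.

```go
package main

import "fmt"

// validShardCounts reports whether the secondary cluster's history shard
// count is an exact multiple of the primary's, as required for replication.
func validShardCounts(primary, secondary int32) bool {
	return primary > 0 && secondary >= primary && secondary%primary == 0
}

func main() {
	fmt.Println(validShardCounts(512, 1024)) // true
	fmt.Println(validShardCounts(512, 1023)) // false
}
```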
40 changes: 27 additions & 13 deletions internal/resource/config/configmap_builder.go
@@ -199,6 +199,30 @@ func (b *ConfigmapBuilder) buildArchivalConfig() (*config.Archival, *config.Arch
return cfg, namespaceDefaults
}

func (b *ConfigmapBuilder) buildClusterMetadataConfig() *cluster.Config {
failoverVersion := int64(1)
enableGlobalNamespace := false

if b.instance.Spec.Replication != nil {
failoverVersion = b.instance.Spec.Replication.InitialFailoverVersion
enableGlobalNamespace = b.instance.Spec.Replication.EnableGlobalNamespace
}

return &cluster.Config{
EnableGlobalNamespace: enableGlobalNamespace,
FailoverVersionIncrement: 10,
MasterClusterName: b.instance.Name,
CurrentClusterName: b.instance.Name,
ClusterInformation: map[string]cluster.ClusterInformation{
b.instance.Name: {
Enabled: true,
InitialFailoverVersion: failoverVersion,
RPCAddress: "127.0.0.1:7233",
},
},
}
}

func (b *ConfigmapBuilder) Update(object client.Object) error {
configMap := object.(*corev1.ConfigMap)

@@ -209,6 +233,8 @@ func (b *ConfigmapBuilder) Update(object client.Object) error {

archivalConfig, archivalNamespaceDefaults := b.buildArchivalConfig()

clusterMetadata := b.buildClusterMetadataConfig()

temporalCfg := config.Config{
Global: config.Global{
Membership: config.Membership{
@@ -223,19 +249,7 @@ func (b *ConfigmapBuilder) Update(object client.Object) error {
NamespaceDefaults: config.NamespaceDefaults{
Archival: *archivalNamespaceDefaults,
},
ClusterMetadata: &cluster.Config{
EnableGlobalNamespace: false,
FailoverVersionIncrement: 10,
MasterClusterName: b.instance.Name,
CurrentClusterName: b.instance.Name,
ClusterInformation: map[string]cluster.ClusterInformation{
b.instance.Name: {
Enabled: true,
InitialFailoverVersion: 1,
RPCAddress: "127.0.0.1:7233",
},
},
},
ClusterMetadata: clusterMetadata,
Services: map[string]config.Service{
string(primitives.FrontendService): {
RPC: config.RPC{
