feat: add basic support for multi-cluster replication
arlyon committed Sep 4, 2023
1 parent 9a00278 commit e2d9509
Showing 6 changed files with 125 additions and 14 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -88,8 +88,8 @@ Please note this table only reports end-to-end tests suite coverage, others vers
 - [x] Cluster monitoring.
 - [x] Complete end2end test suite.
 - [x] Archival.
+- [x] Multi cluster replication.
 - [ ] Auto scaling.
-- [ ] Multi cluster replication.
 ## Contributing
7 changes: 7 additions & 0 deletions api/v1beta1/temporalcluster_types.go
@@ -799,6 +799,11 @@ func (s *ClusterArchivalSpec) IsEnabled() bool {
    return s != nil && s.Enabled
}

// ClusterReplicationSpec configures multi-cluster replication.
type ClusterReplicationSpec struct {
    // EnableGlobalNamespace enables the global namespace feature required for replication.
    EnableGlobalNamespace bool `json:"enableGlobalNamespace"`
    // InitialFailoverVersion is this cluster's failover version; it must be unique among connected clusters.
    InitialFailoverVersion int64 `json:"initialFailoverVersion"`
}

type ArchivalProviderKind string

const (
@@ -956,6 +961,8 @@ type TemporalClusterSpec struct {
    // Archival allows Workflow Execution Event Histories and Visibility data backups for the temporal cluster.
    // +optional
    Archival *ClusterArchivalSpec `json:"archival,omitempty"`

    // Replication allows the cluster to take part in multi-cluster replication.
    // +optional
    Replication *ClusterReplicationSpec `json:"replication,omitempty"`
}

// ServiceStatus reports a service status.
11 changes: 11 additions & 0 deletions bundle/manifests/temporal.io_temporalclusters.yaml
@@ -2584,6 +2584,17 @@ spec:
                - defaultStore
                - visibilityStore
                type: object
              replication:
                properties:
                  enableGlobalNamespace:
                    type: boolean
                  initialFailoverVersion:
                    format: int64
                    type: integer
                required:
                - enableGlobalNamespace
                - initialFailoverVersion
                type: object
              services:
                description: Services allows customizations for each temporal services
                  deployment.
11 changes: 11 additions & 0 deletions config/crd/bases/temporal.io_temporalclusters.yaml
@@ -2004,6 +2004,17 @@ spec:
                - defaultStore
                - visibilityStore
                type: object
              replication:
                properties:
                  enableGlobalNamespace:
                    type: boolean
                  initialFailoverVersion:
                    format: int64
                    type: integer
                required:
                - enableGlobalNamespace
                - initialFailoverVersion
                type: object
              services:
                description: Services allows customizations for each temporal services deployment.
                properties:
62 changes: 62 additions & 0 deletions docs/features/multi-cluster-replication.md
@@ -0,0 +1,62 @@
# Multi-cluster replication

Temporal supports multi-cluster replication, which lets you replicate specific Temporal namespaces to another Temporal cluster. This is useful for disaster recovery, for running a cluster in another region to reduce latency, or for migrating to a cluster with a larger history shard count.

## How it works

To set up multi-cluster replication with the Temporal operator, first enable global namespaces on each cluster you wish to connect, and assign each cluster a unique failover version. Both settings live under `spec.replication` of the `TemporalCluster` resource. The operator configures the remaining cluster-metadata fields automatically, and currently hard-codes the failover increment to 10. If a cluster fails, the remaining clusters elect a new primary: the cluster with the lowest failover version. If the original cluster later comes back online, it is assigned a new failover version, which is always the lowest multiple of the failover increment (plus its `initialFailoverVersion`) that is greater than the leader cluster's failover version.

```yaml
apiVersion: temporal.io/v1beta1
kind: TemporalCluster
metadata:
  name: prod
spec:
  replication:
    enableGlobalNamespace: true
    initialFailoverVersion: 1
```
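
For intuition, here is a minimal sketch of the failover-version rule described above, assuming the hard-coded increment of 10. This is an illustration only, not code from the operator or from Temporal itself:

```go
package main

import "fmt"

// nextFailoverVersion computes the smallest value of the form
// n*increment + initialFailoverVersion that is greater than the current
// leader's failover version. This mirrors the rule applied to a cluster
// rejoining after an outage.
func nextFailoverVersion(leaderVersion, increment, initialFailoverVersion int64) int64 {
    v := initialFailoverVersion
    for v <= leaderVersion {
        v += increment
    }
    return v
}

func main() {
    // A cluster with initialFailoverVersion 1 rejoining while the leader
    // is at failover version 25 comes back at 31 (3*10 + 1).
    fmt.Println(nextFailoverVersion(25, 10, 1))
}
```
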
## Starting replication

Once both clusters are configured, connect them to each other using the Temporal CLI. In the future, the operator may manage these connections itself.

```bash
# port-forward to the frontend of the primary cluster
kubectl port-forward svc/primary-frontend 7233:7233

temporal operator cluster upsert --frontend-address secondary-cluster.namespace.svc.cluster.local:7233 --enable-connection=true

# port-forward to the frontend of the secondary cluster
# (stop the previous port-forward first)
kubectl port-forward svc/secondary-frontend 7233:7233

temporal operator cluster upsert --frontend-address primary-cluster.namespace.svc.cluster.local:7233 --enable-connection=true
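
# optionally verify the connections; both clusters should appear in the list
temporal operator cluster list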
```

## Replicating namespaces

Once the clusters are connected, you can replicate namespaces between them, either with the Temporal CLI or through the operator. With the operator, create a `TemporalNamespace` resource whose `clusterRef` points at the primary cluster, and list both clusters under `clusters`. The namespace is then created on the secondary cluster automatically, with the same settings, and starts receiving updates; `activeClusterName` selects which of the two is active for the namespace.

```yaml
apiVersion: temporal.io/v1beta1
kind: TemporalNamespace
metadata:
  name: primary-namespace
spec:
  clusterRef:
    name: primary
  clusters:
    - primary
    - secondary
  activeClusterName: secondary
  isGlobalNamespace: true
```
## A mechanism for increasing the history shard count

Since Temporal 1.20, replicated clusters no longer need the same number of history shards, which makes replication a viable way to migrate a cluster that has outgrown its shard count. To do this, give the secondary cluster a higher shard count than the primary. The only requirement is that the secondary's shard count is an exact multiple of the primary's: with 512 shards on the primary, the secondary can have 1024 shards (or any other multiple), but not 1023.

When replication has caught up, take down the old cluster and point your clients at the new one. This can all be achieved with very little downtime.
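
The multiple rule is easy to check mechanically. Below is a hypothetical helper that encodes it; it is illustration only and not something the operator ships:

```go
package main

import "fmt"

// validShardIncrease reports whether secondaryShards is a valid history
// shard count for a cluster replicating from a primary with primaryShards
// shards, per the exact-multiple rule described above.
func validShardIncrease(primaryShards, secondaryShards int32) bool {
    return primaryShards > 0 &&
        secondaryShards >= primaryShards &&
        secondaryShards%primaryShards == 0
}

func main() {
    fmt.Println(validShardIncrease(512, 1024)) // true
    fmt.Println(validShardIncrease(512, 1023)) // false
}
```
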
46 changes: 33 additions & 13 deletions internal/resource/config/configmap_builder.go
@@ -197,6 +197,33 @@ func (b *ConfigmapBuilder) buildArchivalConfig() (*config.Archival, *config.Arch
    return cfg, namespaceDefaults, nil
}

// buildClusterMetadataConfig builds the cluster metadata section of the
// Temporal server configuration. Without a replication spec, the cluster
// is configured as a standalone, non-global cluster.
func (b *ConfigmapBuilder) buildClusterMetadataConfig() (*cluster.Config, error) {
    var failoverVersion int64
    enableGlobalNamespace := false
    if b.instance.Spec.Replication != nil {
        failoverVersion = b.instance.Spec.Replication.InitialFailoverVersion
        enableGlobalNamespace = b.instance.Spec.Replication.EnableGlobalNamespace
    }

    cfg := &cluster.Config{
        EnableGlobalNamespace:    enableGlobalNamespace,
        FailoverVersionIncrement: 10,
        MasterClusterName:        b.instance.Name,
        CurrentClusterName:       b.instance.Name,
        ClusterInformation: map[string]cluster.ClusterInformation{
            b.instance.Name: {
                Enabled:                true,
                InitialFailoverVersion: failoverVersion,
                RPCAddress:             "127.0.0.1:7233",
            },
        },
    }

    return cfg, nil
}

func (b *ConfigmapBuilder) Update(object client.Object) error {
    configMap := object.(*corev1.ConfigMap)

@@ -210,6 +237,11 @@ func (b *ConfigmapBuilder) Update(object client.Object) error {
return fmt.Errorf("can't build archival config: %w", err)
}

clusterMetadata, err := b.buildClusterMetadataConfig()
if err != nil {
return fmt.Errorf("can't build cluster metadata config: %w", err)
}

    temporalCfg := config.Config{
        Global: config.Global{
            Membership: config.Membership{
@@ -223,19 +255,7 @@ func (b *ConfigmapBuilder) Update(object client.Object) error {
         NamespaceDefaults: config.NamespaceDefaults{
             Archival: *archivalNamespaceDefaults,
         },
-        ClusterMetadata: &cluster.Config{
-            EnableGlobalNamespace:    false,
-            FailoverVersionIncrement: 10,
-            MasterClusterName:        b.instance.Name,
-            CurrentClusterName:       b.instance.Name,
-            ClusterInformation: map[string]cluster.ClusterInformation{
-                b.instance.Name: {
-                    Enabled:                true,
-                    InitialFailoverVersion: 1,
-                    RPCAddress:             "127.0.0.1:7233",
-                },
-            },
-        },
+        ClusterMetadata: clusterMetadata,
         Services: map[string]config.Service{
             string(primitives.FrontendService): {
                 RPC: config.RPC{
