From 0a76afffeff6f786579c18247c0f086ccf0cc8cc Mon Sep 17 00:00:00 2001 From: Joseph McWilliams Date: Wed, 13 Nov 2024 14:59:01 -0800 Subject: [PATCH] Add Kafka Ingestion Spec Submission - Implemented functionality to submit Kafka ingestion specs - Fixed integration tests - Updated RBAC rules to include Secret verbs for authenticated API usage --- chart/templates/rbac_manager.yaml | 7 +++ controllers/ingestion/reconciler.go | 47 ++++++++++++++-- e2e/configs/kafka-ingestion.yaml | 74 ++++++++++++++++++++++++++ e2e/configs/minio-tenant-override.yaml | 3 +- e2e/e2e.sh | 17 ++++-- examples/kafka-ingestion.yaml | 74 ++++++++++++++++++++++++++ 6 files changed, 213 insertions(+), 9 deletions(-) create mode 100644 e2e/configs/kafka-ingestion.yaml create mode 100644 examples/kafka-ingestion.yaml diff --git a/chart/templates/rbac_manager.yaml b/chart/templates/rbac_manager.yaml index 3ead46f5..ae52e5ac 100644 --- a/chart/templates/rbac_manager.yaml +++ b/chart/templates/rbac_manager.yaml @@ -178,6 +178,13 @@ rules: - get - list - watch +- apiGroups: + - "" + resources: + - secrets + verbs: + - get + - list {{- end }} {{- end }} diff --git a/controllers/ingestion/reconciler.go b/controllers/ingestion/reconciler.go index 02276fa3..1fa46543 100644 --- a/controllers/ingestion/reconciler.go +++ b/controllers/ingestion/reconciler.go @@ -125,13 +125,16 @@ func (r *DruidIngestionReconciler) CreateOrUpdate( // check if task id does not exist in status if di.Status.TaskId == "" && di.Status.CurrentIngestionSpec == "" { // if does not exist create task - postHttp := internalhttp.NewHTTPClient( &http.Client{}, &auth, ) - respCreateTask, err := postHttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false), []byte(di.Spec.Ingestion.Spec)) + respCreateTask, err := postHttp.Do( + http.MethodPost, + getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false), + []byte(di.Spec.Ingestion.Spec), + ) if err != nil { return controllerutil.OperationResultNone, err @@ -203,7 +206,11 @@ func (r *DruidIngestionReconciler) CreateOrUpdate( &auth, ) - respUpdateSpec, err := postHttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false), []byte(di.Spec.Ingestion.Spec)) + respUpdateSpec, err := postHttp.Do( + http.MethodPost, + getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false), + []byte(di.Spec.Ingestion.Spec), + ) if err != nil { return controllerutil.OperationResultNone, err } @@ -290,6 +297,13 @@ func getPath( } case v1alpha1.HadoopIndexHadoop: case v1alpha1.Kafka: + if httpMethod == http.MethodGet { + return makeSupervisorGetTaskPath(svcName, taskId) + } else if httpMethod == http.MethodPost && !shutDownTask { + return makeSupervisorCreateUpdateTaskPath(svcName) + } else if shutDownTask { + return makeSupervisorShutDownTaskPath(svcName, taskId) + } case v1alpha1.Kinesis: case v1alpha1.QueryControllerSQL: default: @@ -312,8 +326,21 @@ func makeRouterGetTaskPath(svcName, taskId string) string { return svcName + "/druid/indexer/v1/task/" + taskId } +func makeSupervisorCreateUpdateTaskPath(svcName string) string { + return svcName + "/druid/indexer/v1/supervisor" +} + +func makeSupervisorShutDownTaskPath(svcName, taskId string) string { + return svcName + "/druid/indexer/v1/supervisor/" + taskId + "/shutdown" +} + +func makeSupervisorGetTaskPath(svcName, taskId string) string { + return svcName + "/druid/indexer/v1/supervisor/" + taskId +} + type taskHolder struct { - Task string `json:"task"` + Task string `json:"task"` // tasks + ID string `json:"id"` // supervisor } func getTaskIdFromResponse(resp string) (string, error) { @@ -321,7 +348,17 @@ func getTaskIdFromResponse(resp string) (string, error) { if err := json.Unmarshal([]byte(resp), &task); err != nil { return "", err } - return task.Task, nil + + // check both fields and return the appropriate value + // tasks use different field names than supervisors + if task.Task != "" { + return task.Task, nil + } + if task.ID != "" { + return task.ID, nil + } + + return "", errors.New("task id not found") } func (r *DruidIngestionReconciler) getRouterSvcUrl(namespace, druidClusterName string) (string, error) { diff --git a/e2e/configs/kafka-ingestion.yaml b/e2e/configs/kafka-ingestion.yaml new file mode 100644 index 00000000..468ae90c --- /dev/null +++ b/e2e/configs/kafka-ingestion.yaml @@ -0,0 +1,74 @@ +apiVersion: druid.apache.org/v1alpha1 +kind: DruidIngestion +metadata: + labels: + app.kubernetes.io/name: druidingestion + app.kubernetes.io/instance: druidingestion-sample + name: kafka-1 +spec: + suspend: false + druidCluster: tiny-cluster + ingestion: + type: kafka + spec: |- + { + "type": "kafka", + "spec": { + "dataSchema": { + "dataSource": "metrics-kafka", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [], + "dimensionExclusions": [ + "timestamp", + "value" + ] + }, + "metricsSpec": [ + { + "name": "count", + "type": "count" + }, + { + "name": "value_sum", + "fieldName": "value", + "type": "doubleSum" + }, + { + "name": "value_min", + "fieldName": "value", + "type": "doubleMin" + }, + { + "name": "value_max", + "fieldName": "value", + "type": "doubleMax" + } + ], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": "NONE" + } + }, + "ioConfig": { + "topic": "metrics", + "inputFormat": { + "type": "json" + }, + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + }, + "taskCount": 1, + "replicas": 1, + "taskDuration": "PT1H" + }, + "tuningConfig": { + "type": "kafka", + "maxRowsPerSegment": 5000000 + } + } + } diff --git a/e2e/configs/minio-tenant-override.yaml b/e2e/configs/minio-tenant-override.yaml index a51b419c..611ecd05 100644 --- a/e2e/configs/minio-tenant-override.yaml +++ b/e2e/configs/minio-tenant-override.yaml @@ -1,6 +1,7 @@ tenant: pools: - - servers: 1 + - name: "minio" + servers: 1 volumesPerServer: 1 certificate: requestAutoCert: false diff --git a/e2e/e2e.sh b/e2e/e2e.sh index 74d12b90..de90104b 100755 --- a/e2e/e2e.sh +++ b/e2e/e2e.sh @@ -4,7 +4,7 @@ set -x # Get Kind go install sigs.k8s.io/kind@v0.21.0 # minio statefulset name -MINIO_STS_NAME=myminio-ss-0 +MINIO_STS_NAME=myminio-minio # druid namespace NAMESPACE=druid # fmt code @@ -23,8 +23,6 @@ make docker-build-local-test make docker-push-local-test # try to install the CRD with make make install -# delete the crd -make uninstall # install druid-operator make helm-install-druid-operator # install minio-operator and tenant @@ -64,6 +62,19 @@ sleep 30 # wait for the manager to submit the ingestion task taskId=`kubectl get druidingestion -n druid wikipedia-ingestion --template={{.status.taskId}}` make deploy-testingestionjob TASK_ID=$taskId +# Running a test Kafka DruidIngestion resource and wait for the task to be submitted +kubectl apply -f e2e/configs/kafka-ingestion.yaml -n ${NAMESPACE} +sleep 30 # wait for the manager to submit the ingestion task + +# Verify the supervisor task has been created +supervisorTaskId=`kubectl get druidingestion -n druid kafka-1 --template={{.status.taskId}}` +if [ -z "$supervisorTaskId" ]; then + echo "Failed to get supervisor task ID" + exit 1 +else + echo "Supervisor task ID: $supervisorTaskId" +fi + # Delete old druid kubectl delete -f e2e/configs/druid-cr.yaml -n ${NAMESPACE} for d in $(kubectl get pods -n ${NAMESPACE} -l app=druid -l druid_cr=tiny-cluster -o name) diff --git a/examples/kafka-ingestion.yaml b/examples/kafka-ingestion.yaml new file mode 100644 index 00000000..468ae90c --- /dev/null +++ b/examples/kafka-ingestion.yaml @@ -0,0 +1,74 @@ +apiVersion: druid.apache.org/v1alpha1 +kind: DruidIngestion +metadata: + labels: + app.kubernetes.io/name: druidingestion + app.kubernetes.io/instance: druidingestion-sample + name: kafka-1 +spec: + suspend: false + druidCluster: tiny-cluster + ingestion: + type: kafka + spec: |- + { + "type": "kafka", + "spec": { + "dataSchema": { + "dataSource": "metrics-kafka", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [], + "dimensionExclusions": [ + "timestamp", + "value" + ] + }, + "metricsSpec": [ + { + "name": "count", + "type": "count" + }, + { + "name": "value_sum", + "fieldName": "value", + "type": "doubleSum" + }, + { + "name": "value_min", + "fieldName": "value", + "type": "doubleMin" + }, + { + "name": "value_max", + "fieldName": "value", + "type": "doubleMax" + } + ], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": "NONE" + } + }, + "ioConfig": { + "topic": "metrics", + "inputFormat": { + "type": "json" + }, + "consumerProperties": { + "bootstrap.servers": "localhost:9092" + }, + "taskCount": 1, + "replicas": 1, + "taskDuration": "PT1H" + }, + "tuningConfig": { + "type": "kafka", + "maxRowsPerSegment": 5000000 + } + } + }