Skip to content

Commit

Permalink
Add Kafka Ingestion Spec Submission (datainfrahq#189)
Browse files Browse the repository at this point in the history
- Implemented functionality to submit Kafka ingestion specs
- Fixed integration tests
- Updated RBAC rules to include Secret verbs for authenticated API usage
  • Loading branch information
jmcwilliams-te authored and ahmed.g committed Dec 10, 2024
1 parent d6ddc80 commit ff83530
Show file tree
Hide file tree
Showing 6 changed files with 213 additions and 9 deletions.
7 changes: 7 additions & 0 deletions chart/templates/rbac_manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,13 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- secrets
verbs:
- get
- list
{{- end }}
{{- end }}

Expand Down
47 changes: 42 additions & 5 deletions controllers/ingestion/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,16 @@ func (r *DruidIngestionReconciler) CreateOrUpdate(
// check if task id does not exist in status
if di.Status.TaskId == "" && di.Status.CurrentIngestionSpec == "" {
// if does not exist create task

postHttp := internalhttp.NewHTTPClient(
&http.Client{},
&auth,
)

respCreateTask, err := postHttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false), []byte(di.Spec.Ingestion.Spec))
respCreateTask, err := postHttp.Do(
http.MethodPost,
getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false),
[]byte(di.Spec.Ingestion.Spec),
)

if err != nil {
return controllerutil.OperationResultNone, err
Expand Down Expand Up @@ -203,7 +206,11 @@ func (r *DruidIngestionReconciler) CreateOrUpdate(
&auth,
)

respUpdateSpec, err := postHttp.Do(http.MethodPost, getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false), []byte(di.Spec.Ingestion.Spec))
respUpdateSpec, err := postHttp.Do(
http.MethodPost,
getPath(di.Spec.Ingestion.Type, svcName, http.MethodPost, "", false),
[]byte(di.Spec.Ingestion.Spec),
)
if err != nil {
return controllerutil.OperationResultNone, err
}
Expand Down Expand Up @@ -290,6 +297,13 @@ func getPath(
}
case v1alpha1.HadoopIndexHadoop:
case v1alpha1.Kafka:
if httpMethod == http.MethodGet {
return makeSupervisorGetTaskPath(svcName, taskId)
} else if httpMethod == http.MethodPost && !shutDownTask {
return makeSupervisorCreateUpdateTaskPath(svcName)
} else if shutDownTask {
return makeSupervisorShutDownTaskPath(svcName, taskId)
}
case v1alpha1.Kinesis:
case v1alpha1.QueryControllerSQL:
default:
Expand All @@ -312,16 +326,39 @@ func makeRouterGetTaskPath(svcName, taskId string) string {
return svcName + "/druid/indexer/v1/task/" + taskId
}

func makeSupervisorCreateUpdateTaskPath(svcName string) string {
return svcName + "/druid/indexer/v1/supervisor"
}

func makeSupervisorShutDownTaskPath(svcName, taskId string) string {
return svcName + "/druid/indexer/v1/supervisor/" + taskId + "/shutdown"
}

func makeSupervisorGetTaskPath(svcName, taskId string) string {
return svcName + "/druid/indexer/v1/supervisor/" + taskId
}

type taskHolder struct {
Task string `json:"task"`
Task string `json:"task"` // tasks
ID string `json:"id"` // supervisor
}

func getTaskIdFromResponse(resp string) (string, error) {
var task taskHolder
if err := json.Unmarshal([]byte(resp), &task); err != nil {
return "", err
}
return task.Task, nil

// check both fields and return the appropriate value
// tasks use different field names than supervisors
if task.Task != "" {
return task.Task, nil
}
if task.ID != "" {
return task.ID, nil
}

return "", errors.New("task id not found")
}

func (r *DruidIngestionReconciler) getRouterSvcUrl(namespace, druidClusterName string) (string, error) {
Expand Down
74 changes: 74 additions & 0 deletions e2e/configs/kafka-ingestion.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
apiVersion: druid.apache.org/v1alpha1
kind: DruidIngestion
metadata:
labels:
app.kubernetes.io/name: druidingestion
app.kubernetes.io/instance: druidingestion-sample
name: kafka-1
spec:
suspend: false
druidCluster: tiny-cluster
ingestion:
type: kafka
spec: |-
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "metrics-kafka",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [],
"dimensionExclusions": [
"timestamp",
"value"
]
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
"ioConfig": {
"topic": "metrics",
"inputFormat": {
"type": "json"
},
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 5000000
}
}
}
3 changes: 2 additions & 1 deletion e2e/configs/minio-tenant-override.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
tenant:
pools:
- servers: 1
- name: "minio"
servers: 1
volumesPerServer: 1
certificate:
requestAutoCert: false
Expand Down
17 changes: 14 additions & 3 deletions e2e/e2e.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ set -x
# Get Kind
go install sigs.k8s.io/[email protected]
# minio statefulset name
MINIO_STS_NAME=myminio-ss-0
MINIO_STS_NAME=myminio-minio
# druid namespace
NAMESPACE=druid
# fmt code
Expand All @@ -23,8 +23,6 @@ make docker-build-local-test
make docker-push-local-test
# try to install the CRD with make
make install
# delete the crd
make uninstall
# install druid-operator
make helm-install-druid-operator
# install minio-operator and tenant
Expand Down Expand Up @@ -64,6 +62,19 @@ sleep 30 # wait for the manager to submit the ingestion task
taskId=`kubectl get druidingestion -n druid wikipedia-ingestion --template={{.status.taskId}}`
make deploy-testingestionjob TASK_ID=$taskId

# Running a test Kafka DruidIngestion resource and wait for the task to be submitted
kubectl apply -f e2e/configs/kafka-ingestion.yaml -n ${NAMESPACE}
sleep 30 # wait for the manager to submit the ingestion task

# Verify the supervisor task has been created
supervisorTaskId=`kubectl get druidingestion -n druid kafka-1 --template={{.status.taskId}}`
if [ -z "$supervisorTaskId" ]; then
echo "Failed to get supervisor task ID"
exit 1
else
echo "Supervisor task ID: $supervisorTaskId"
fi

# Delete old druid
kubectl delete -f e2e/configs/druid-cr.yaml -n ${NAMESPACE}
for d in $(kubectl get pods -n ${NAMESPACE} -l app=druid -l druid_cr=tiny-cluster -o name)
Expand Down
74 changes: 74 additions & 0 deletions examples/kafka-ingestion.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
apiVersion: druid.apache.org/v1alpha1
kind: DruidIngestion
metadata:
labels:
app.kubernetes.io/name: druidingestion
app.kubernetes.io/instance: druidingestion-sample
name: kafka-1
spec:
suspend: false
druidCluster: tiny-cluster
ingestion:
type: kafka
spec: |-
{
"type": "kafka",
"spec": {
"dataSchema": {
"dataSource": "metrics-kafka",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [],
"dimensionExclusions": [
"timestamp",
"value"
]
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
"ioConfig": {
"topic": "metrics",
"inputFormat": {
"type": "json"
},
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kafka",
"maxRowsPerSegment": 5000000
}
}
}

0 comments on commit ff83530

Please sign in to comment.