
Commit

Feat: Airflow uses kubernetes_executor as default executor, git sync dags and add tutorials (#109)

Signed-off-by: xingcan-ltc <[email protected]>
xingcan-hu authored May 31, 2024
1 parent 83a893e commit 48351d1
Showing 12 changed files with 176 additions and 102 deletions.
44 changes: 11 additions & 33 deletions catalog/airflow/apps/airflow.app/README.md
@@ -12,56 +12,34 @@ Airflow is deployed via the KDP web interface.

##### 2.2.1 Create a DAG File

The following is a simple DAG file; copy the content below into a file named demo.py.

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

with DAG(dag_id="demo", start_date=datetime(2024, 5, 20), schedule="0 0 * * *") as dag:

    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    hello >> airflow()
```

##### 2.2.2 Copy demo.py into the Airflow Scheduler and Worker Pods with kubectl

```shell
kubectl cp demo.py airflow-scheduler-7cf5ddb9c5-fql6x:/opt/airflow/dags -n kdp-data
kubectl cp demo.py airflow-worker-584b97f6cb-8gxq8:/opt/airflow/dags -n kdp-data
```

Note: replace `kdp-data` with your namespace, `airflow-scheduler-7cf5ddb9c5-fql6x` with your scheduler pod name, and `airflow-worker-584b97f6cb-8gxq8` with your worker pod name.

##### 2.2.3 Access the Airflow Web UI from a Browser

You can access the Airflow web UI through the configured ingress address, or via kubectl port-forward; the port-forward command is shown below.
You can access the Airflow web UI through the configured ingress (http://airflow-web-kdp-data.kdp-e2e.io/home), or via kubectl port-forward; the port-forward command is shown below.

```shell
kubectl port-forward svc/airflow-webserver -n kdp-data 8080:8080
```

The default login username/password is `admin/admin`.

#### 2.2.4 View the DAG and Task Execution
#### 2.2.4 Configure DAG Files

DAG files are stored in a Git repository; the default installation is configured to sync DAGs from that repository, and you can change your DAGs by modifying the files there. You can also fork the repository, modify the DAG files, commit them, and then update the DAG repo, branch, and other settings in the KDP installation configuration.

#### 2.2.5 Run the DAG

DAGs are paused by default and must be started manually. Activate the DAG named `hello_airflow` by clicking the toggle next to its name; it runs once a day and automatically backfills yesterday's run after activation.
You can also trigger it manually by clicking the `Trigger DAG` button to the right of the `hello_airflow` DAG.

The scheduler is currently configured to scan for DAGs every 1 minute; the demo DAG appears on the web page, and clicking the `Trigger DAG` button on the right triggers a DAG run.
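
The automatic backfill mentioned above follows from the DAG definition itself. As a rough sketch (not the actual `hello_airflow` code from the example-datasets repository), a daily DAG with Airflow's default `catchup_by_default` setting schedules the missed interval for the previous day as soon as it is unpaused:

```python
# Sketch only: illustrates why unpausing a daily DAG backfills the previous run.
# The dag_id and task are placeholders, not the real hello_airflow definition.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="catchup_demo",
    start_date=datetime(2024, 5, 20),
    schedule="0 0 * * *",  # once a day at midnight
    catchup=True,          # Airflow's default: schedule missed intervals once unpaused
) as dag:
    BashOperator(task_id="hello", bash_command="echo hello")
```

Setting `catchup=False` in the DAG definition would skip the backfill and run only the most recent interval.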

### 3. FAQ

#### 3.1. DAG Execution Failure

Causes and troubleshooting:

1. Check whether demo.py exists in the `/opt/airflow/dags` directory of both the scheduler and worker pods;
1. Check whether the DAG code sync succeeded by inspecting the logs: `kubectl logs -l component=scheduler,release=airflow -c git-sync -n kdp-data`
2. Review the log output of the scheduler and worker pods.

### 4. Appendix
6 changes: 2 additions & 4 deletions catalog/airflow/apps/airflow.app/app.yaml
@@ -10,8 +10,6 @@ metadata:
spec:
type: airflow
properties:
chartVersion: v1.1.0-1.14.0
images:
airflow:
tag: 2.9.1
scheduler:
replicas: 1

63 changes: 19 additions & 44 deletions catalog/airflow/apps/airflow.app/i18n/en/README.md
@@ -1,74 +1,49 @@
### 1. Application Description

Airflow™ is a platform created by the community to programmatically author, schedule and monitor workflows.
Airflow™ is a platform for programmatically authoring, scheduling, and monitoring workflows.

### 2. Quick Start

#### 2.1 Deploy
#### 2.1 Deployment

Airflow is deployed by using KDP web.
Airflow is deployed via the KDP web interface.

#### 2.2. Practice
#### 2.2 Practical Usage

##### 2.2.1 Create DAG file
##### 2.2.1 Creating DAG Files

Here is a simple DAG file; copy the following content into the demo.py file.
##### 2.2.3 Accessing Airflow Web via Browser

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

with DAG(dag_id="demo", start_date=datetime(2024, 5, 20), schedule="0 0 * * *") as dag:

    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    hello >> airflow()
```

##### 2.2.2 Copy the DAG File to the Airflow Scheduler and Worker Pods
You can access the Airflow web interface through the configured ingress (http://airflow-web-kdp-data.kdp-e2e.io/home) or by using kubectl port-forward, as shown in the following command:

```shell
kubectl cp demo.py airflow-scheduler-7cf5ddb9c5-fql6x:/opt/airflow/dags -n kdp-data
kubectl cp demo.py airflow-worker-584b97f6cb-8gxq8:/opt/airflow/dags -n kdp-data
kubectl port-forward svc/airflow-webserver -n kdp-data 8080:8080
```

Note: When using this example, replace `kdp-data` with your namespace, `airflow-scheduler-7cf5ddb9c5-fql6x` with your scheduler pod name, and `airflow-worker-584b97f6cb-8gxq8` with your worker pod name.

##### 2.2.3 Visit the Airflow Web UI
The default login username/password is admin/admin.

Visit the Airflow web UI via the ingress address, or use kubectl port-forward, as follows:

```shell
kubectl port-forward svc/airflow-webserver -n kdp-data 8080:8080
```
#### 2.2.4 Configuring DAG Files

Default login user/password is `admin/admin`
DAG files are stored in a Git repository; the default installation is configured to sync DAGs from that repository, and you can change your DAGs by modifying the files there. Alternatively, fork the repository, modify the DAG files, commit them, and then update the DAG repo, branch, and other settings in the KDP installation configuration.
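
For illustration, a file like the following (a hypothetical example, not necessarily present in the repository), committed under the configured branch and `subPath`, would be picked up by git-sync and appear in the Airflow web UI after the scheduler's next scan:

```python
# dags/git_sync_demo.py -- hypothetical file committed to the DAG repository.
# Once git-sync pulls the configured branch, the DAG appears in the web UI.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id="git_sync_demo", start_date=datetime(2024, 5, 20), schedule="@daily") as dag:
    BashOperator(task_id="hello", bash_command="echo hello from git-sync")
```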

#### 2.2.4 View DAG and task execution
#### 2.2.5 Running DAGs

The scheduler is currently configured to scan for DAGs every 1 minute; the demo DAG appears on the web page, and clicking the `Trigger DAG` button on the right triggers a DAG run.
DAGs are set to a paused state by default and need to be started manually. Activate the DAG named `hello_airflow` by clicking the switch next to its name. This DAG runs once a day and will automatically backfill yesterday's run after activation. You can also trigger it manually by clicking the `Trigger DAG` button on the right side of the `hello_airflow` DAG.

### 3. FAQ

#### 3.1. DAG execution failed
#### 3.1. DAG Execution Failure

Reasons and results:
Causes and Troubleshooting:

1. Check whether the demo.py file exists in the `/opt/airflow/dags` directory of the scheduler and worker pod;
2. View the output information of scheduler and worker pod logs.
- Check if the DAG code synchronization is successful by checking the logs: `kubectl logs -l component=scheduler,release=airflow -c git-sync -n kdp-data`
- Review the log output information for the scheduler and worker pods.

### 4. Appendix

#### 4.1. Concept
#### 4.1. Concept Introduction

**DAG**

A DAG (Directed Acyclic Graph) is the core concept of Airflow, collecting Tasks together, organized with dependencies and relationships to say how they should run.

Directed Acyclic Graph, which is the basic unit for describing workflows in Airflow.
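
As a generic illustration of how Tasks inside a DAG are organized with dependencies (a sketch, not code from this repository), Airflow's bitshift operators declare the run order:

```python
# Generic sketch: extract must finish before transform and load can start.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(dag_id="dependency_sketch", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    extract = EmptyOperator(task_id="extract")
    transform = EmptyOperator(task_id="transform")
    load = EmptyOperator(task_id="load")

    extract >> [transform, load]  # extract runs first, then transform and load
```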
59 changes: 40 additions & 19 deletions catalog/airflow/x-definitions/app-airflow.cue
@@ -1,9 +1,4 @@
import (
"strconv"
"strings"
)

import ("encoding/json")
import "strings"

airflow: {
annotations: {}
@@ -65,6 +60,16 @@ template:
values: {
defaultAirflowRepository: _imageRegistry + "apache/airflow"
defaultAirflowTag: parameter.images.airflow.tag
executor: "KubernetesExecutor"
dags: {
gitSync: {
enabled: true
repo: parameter.dags.gitSync.repo
branch: parameter.dags.gitSync.branch
rev: parameter.dags.gitSync.rev
subPath: parameter.dags.gitSync.subPath
}
}
config: {
core: {
"default_timezone": "Asia/Shanghai"
@@ -198,10 +203,6 @@ template:
enabled: false
}
}
redis: {
enabled: true
terminationGracePeriodSeconds: 30
}
ingress: {
web: {
enabled: true
@@ -210,18 +211,17 @@
name: "airflow-web-" + context.namespace + "." + context["ingress.root_domain"]
},
]
ingressClassName: "kong"
}
}
images: {
statsd: {
repository: _imageRegistry + "prometheus/statsd-exporter"
tag: "v0.26.1"
}
redis: {
repository: _imageRegistry + "redis"
tag: "7-bookworm"
gitSync: {
repository: _imageRegistry + "git-sync/git-sync"
tag: "v4.2.3"
}

}
}
}
@@ -246,8 +246,29 @@ template:
mysqlSecret: string
}
}
// +ui:title=Webserver

// +ui:title=DAG Configuration
// +ui:order=2
dags: {
// +ui:description=Git sync configuration (required)
gitSync: {
// +ui:description=Git repository URL. Backup mirror repository: https://gitee.com/linktime-cloud/example-datasets.git
// +ui:order=1
repo: *"https://github.com/linktimecloud/example-datasets.git" | string
// +ui:description=Git repository branch
// +ui:order=2
branch: *"airflow" | string
// +ui:description=Git commit ID
// +ui:order=3
rev: *"HEAD" | string
// +ui:description=DAG code directory (use an empty string for the repository root)
// +ui:order=4
subPath: *"dags" | string
}
}

// +ui:title=Webserver
// +ui:order=3
webserver: {
// +ui:description=Resource specification
// +ui:order=1
@@ -288,7 +309,7 @@ template:
replicas: *1 | int
}
// +ui:title=Scheduler
// +ui:order=3
// +ui:order=4
scheduler: {
// +ui:description=Resource specification
// +ui:order=1
@@ -329,7 +350,7 @@ template:
replicas: *1 | int
}
// +ui:title=Workers
// +ui:order=4
// +ui:order=5
workers: {
// +ui:description=Resource specification
// +ui:order=1
@@ -380,7 +401,7 @@ template:
images: {
airflow: {
// +ui:options={"disabled":true}
tag: *"2.9.1" | string
tag: *"v1.0.0-2.9.1" | string
}
}
}
@@ -0,0 +1,51 @@

# Batch Job Scheduling for Hive SQL with Apache Airflow

# 1. Introduction
Apache Airflow is an open-source platform for orchestrating and automating batch jobs, allowing for the easy creation, scheduling, monitoring, and management of workflows. Airflow supports Hive SQL, enabling the effortless execution of Hive SQL tasks.

Apache Airflow utilizes Directed Acyclic Graphs (DAGs) to represent workflows, which consist of task nodes and dependencies. Task nodes can be Python operations, Shell operations, SQL operations, and more. Airflow supports various executors, including the Local executor, Celery executor, Kubernetes executor, and others.

This article introduces how to write Hive SQL tasks using `pyhive` and execute them with the Apache Airflow Kubernetes executor.

# 2. Writing a Hive SQL DAG

The specific code implementation can be accessed on [Github](https://github.com/linktimecloud/example-datasets/blob/airflow/dags/hive-sql-example.py) or [Gitee](https://gitee.com/linktime-cloud/example-datasets/blob/airflow/dags/hive-sql-example.py).

This code is a DAG (Directed Acyclic Graph) written using the Apache Airflow framework, designed for automating data processing tasks. It primarily performs two tasks: creating a Hive table and inserting data, followed by identifying the top-scoring students in each subject.
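
The full implementation is in the repository linked above. The following is only a minimal sketch of the same approach, using `pyhive` inside TaskFlow tasks; the Hive Server2 host and port, table name, and sample data here are placeholders for illustration rather than the repository's actual values:

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task

# Placeholder connection settings -- adjust to your Hive Server2 service.
HIVE_HOST = "hive-server2"
HIVE_PORT = 10000

with DAG(
    dag_id="hive_sql_example_sketch",
    start_date=datetime(2024, 5, 20),
    schedule="0 0 * * *",
    catchup=False,
) as dag:

    @task()
    def create_and_load():
        # Import inside the task so only the worker pods need pyhive installed.
        from pyhive import hive

        conn = hive.Connection(host=HIVE_HOST, port=HIVE_PORT, database="default")
        cursor = conn.cursor()
        cursor.execute(
            "CREATE TABLE IF NOT EXISTS student_scores (name STRING, subject STRING, score INT)"
        )
        cursor.execute(
            "INSERT INTO student_scores VALUES "
            "('Alice', 'math', 95), ('Bob', 'math', 88), ('Alice', 'english', 79)"
        )
        conn.close()

    @task()
    def top_score_per_subject():
        from pyhive import hive

        conn = hive.Connection(host=HIVE_HOST, port=HIVE_PORT, database="default")
        cursor = conn.cursor()
        cursor.execute(
            "SELECT subject, name, score FROM ("
            "  SELECT subject, name, score,"
            "         RANK() OVER (PARTITION BY subject ORDER BY score DESC) AS r"
            "  FROM student_scores) t WHERE r = 1"
        )
        for row in cursor.fetchall():
            print(row)
        conn.close()

    create_and_load() >> top_score_per_subject()
```

With the Kubernetes executor, each of these tasks runs in its own worker pod, which is why the `pyhive` import lives inside the task functions.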

# 3. Running the DAG
## 3.1 Component Dependencies
The following components need to be installed in KDP:
- mysql
- airflow
- zookeeper
- hdfs
- hive (hive metastore, hive server)
- hue, httpfs-gateway (optional)

## 3.2 Scheduling Jobs
After installing Airflow with default parameters in KDP, log in to the Airflow Web interface using the username `admin` and password `admin`.

Start the DAG named `hive-sql-example`.

![Airflow Web Interface](./images/airflow01.png)

Upon successful execution, the results can be viewed through the Hue Web interface. Alternatively, you can refer to the `hive-server2` Quick Start guide to connect to Hive Server2 using beeline and view the results.

![Hue Web Interface](./images/airflow02.png)


Binary file added docs/en/user-tutorials/images/airflow01.png
Binary file added docs/en/user-tutorials/images/airflow02.png
4 changes: 3 additions & 1 deletion docs/en/user-tutorials/tutorials.md
@@ -19,4 +19,6 @@ Users can refer to the following scenario tutorials to practice how to do data i
* [How to integrate with Kafka on KDP](./integration-kafka-with-int-ext-comps.md)
* [Data Import from External HDFS to KDP HDFS](./import-from-hdfs-to-hdfs.md)
* [Exploring data using Airbyte/ClickHouse/Superset](./exploring-data-using-airbyte-clickhouse-superset.md)
* More...
* [Batch Job Scheduling for Hive SQL with Apache Airflow](./batch-job-scheduling-for-hive-sql-with-apache-airflow.md)
* More...

