Skip to content

Latest commit

 

History

History
309 lines (245 loc) · 10.2 KB

developing-plugin.md

File metadata and controls

309 lines (245 loc) · 10.2 KB

插件开发

插件体系介绍

seatunnel插件分为三部分,InputFilterOutput

Input

Input负责将外部数据源的数据转化为DStream[(String, String)]

Filter

Filtertransform操作,负责对Dataset[Row]的数据结构进行操作

Output

Outputaction操作,负责将Dataset[Row]输出到外部数据源或者打印到终端

准备工作

seatunnel支持Java/Scala作为插件开发语言,其中Input插件推荐使用Scala作为开发语言,其余类型插件Java和Scala皆可。

新建一个Java/Scala项目,或者可以直接拉取seatunnel-filter-example,然后在此项目上进行修改

一、 新建pom.xml

参考文件pom.xml

将seatunnel提供的接口加入项目的依赖中

<dependency>
    <groupId>io.github.interestinglab.seatunnel</groupId>
    <artifactId>seatunnel-apis_2.11</artifactId>
    <version>1.1.0</version>
</dependency>

二、 实现自己的方法

Input(实时流)

  • 新建一个类,并继承seatunnel-apis提供的父类BaseInput
    class ScalaHdfs extends BaseStreamingInput {
    
      var config: Config = ConfigFactory.empty()
    
      /**
        * Set Config.
        **/
      override def setConfig(config: Config): Unit = {
        this.config = config
      }
    
      /**
        * Get Config.
        **/
      override def getConfig(): Config = {
        this.config
      }
  • 重写父类定义的checkConfigpreparegetDstream方法
    override def checkConfig(): (Boolean, String) = {}
    override def prepare(spark: SparkSession): Unit = {}
    override def getDStream(ssc: StreamingContext): DStream[(String, String)] = {}
    
  • Input插件在调用时会先执行checkConfig方法核对调用插件时传入的参数是否正确,然后调用prepare方法配置参数的缺省值以及初始化类的成员变量,最后调用getStream方法将外部数据源转换为DStream[(String, String)]
  • Scala版本Input插件实现参照ScalaHdfs

Filter

  • 新建一个类,并继承seatunnel-apis提供的父类BaseFilter
    class ScalaSubstring extends BaseFilter {
    
      var config: Config = ConfigFactory.empty()
    
      /**
        * Set Config.
        **/
      override def setConfig(config: Config): Unit = {
        this.config = config
      }
    
      /**
        * Get Config.
        **/
      override def getConfig(): Config = {
        this.config
      }
    }
    public class JavaSubstring extends BaseFilter {
    
        private Config config;
    
        @Override
        public Config getConfig() {
            return config;
        }
    
        @Override
        public void setConfig(Config config) {
            this.config = config;
        }
    }
  • 重写父类定义的checkConfigprepareprocess方法
    override def checkConfig(): (Boolean, String) = {}
    override def prepare(spark: SparkSession): Unit = {}
    override def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row] = {}
    @Override
    public Tuple2<Object, String> checkConfig() {}
    @Override
    public void prepare(SparkSession spark, StreamingContext ssc) {}
    @Override
    public Dataset<Row> process(SparkSession spark, Dataset<Row> df) {}
    • Filter插件在调用时会先执行checkConfig方法核对调用插件时传入的参数是否正确,然后调用prepare方法配置参数的缺省值以及初始化类的成员变量,最后调用process方法对 Dataset[Row] 格式数据进行处理。
    • Java版本Filter插件的实现参照JavaSubstring,Scala版本Filter插件的实现参照ScalaSubstring

Output

  • 新建一个类,并继承seatunnel-apis提供的父类BaseOutput
    class ScalaStdout extends BaseOutput {
    
    
      var config: Config = ConfigFactory.empty()
    
      /**
        * Set Config.
        **/
      override def setConfig(config: Config): Unit = {
        this.config = config
      }
    
      /**
        * Get Config.
        **/
      override def getConfig(): Config = {
        this.config
      }
    }
    public class JavaStdout extends BaseOutput {
    
        private Config config;
    
        @Override
        public Config getConfig() {
            return config;
        }
    
        @Override
        public void setConfig(Config config) {
            this.config = config;
        }
    }
  • 重写父类定义的checkConfigprepareprocess方法
    override def checkConfig(): (Boolean, String) = {}
    override def prepare(spark: SparkSession): Unit = {}
    override def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row] = {}
    @Override
    public Tuple2<Object, String> checkConfig() {}
    @Override
    public void prepare(SparkSession spark) {}
    @Override
    public Dataset<Row> process(SparkSession spark, Dataset<Row> ds) {}
    • Output插件调用结构与Filter插件相似。在调用时会先执行checkConfig方法核对调用插件时传入的参数是否正确,然后调用prepare方法配置参数的缺省值以及初始化类的成员变量,最后调用process方法将 Dataset[Row] 格式数据输出到外部数据源。
    • Java版本Output插件的实现参照JavaStdout,Scala版本Output插件的实现参照ScalaStdout

UDF

  • 新建一个类,并继承seatunnel-apis提供的父类BaseFilter

    class ScalaSubstring extends BaseFilter {
    
      var config: Config = ConfigFactory.empty()
    
      /**
        * Set Config.
        **/
      override def setConfig(config: Config): Unit = {
        this.config = config
      }
    
      /**
        * Get Config.
        **/
      override def getConfig(): Config = {
        this.config
      }
    }
  • 重写父类定义的checkConfigpreparegetUdfListprocess方法,这里只介绍getUdfList以及process两个方法

    override def getUdfList(): List[(String, UserDefinedFunction)] = {
      val func = udf((s: String, pos: Int, len: Int) => s.substring(pos, pos+len))
      List(("my_sub", func))
    }
    override def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row] = {
      val srcField = config.getString("source_field")
      val targetField = config.getString("target_field")
      val pos = config.getInt("pos")
      val len = config.getInt("len")
      val func = getUdfList().get(0)._2
      df.withColumn(targetField, func(col(srcField), lit(pos), lit(len)))
    }

    具体UDF插件开发完整案例参照ScalaSubstring

  • 新建META-INF/services

    seatunnel会利用Service loader机制将实现io.github.interestinglab.seatunnel.apis.BaseFilter的方法根据getUdfList返回的方法注册为UDF,如果接口实现类不在services中注明,将不会注册为UDF。

    案例中的META-INF

三、 打包使用

  1. 打包

    mvn package

  2. 将打包好的Jar包放到seatunnel plugins目录下

    cd seatunnel-1.1.0
    mkdir -p plugins/my_plugins/lib
    cd plugins/my_plugins/lib

    seatunnel需要将第三方Jar包放到,必须新建lib文件夹

    plugins/your_plugin_name/lib/your_jar_name

    其他文件放到

    plugins/your_plugin_name/files/your_file_name

  3. 在配置文件中使用插件

    以下是一个使用第三方插件的完整示例,并将其放至config/application.conf

    Fake插件生成测试数据,进行Split进行分割后,使用第三方插件ScalaSubstring进行字符串截取,最后使用第三方插件JavaStdout打印到终端。

    spark {
        spark.streaming.batchDuration = 5
        spark.app.name = "seatunnel-sample"
        spark.ui.port = 13000
        spark.executor.instances = 2
        spark.executor.cores = 1
        spark.executor.memory = "1g"
    }
    
    input {
        fakeStream {
            content = ["INFO : gary is 28 years old", "WARN : suwey is 16 years old"]
            rate = 5
        }
    }
    
    filter {
        split {
            fields = ["log_level", "message"]
            delimiter = ":"
        }
        sql = {
            table_name = "tmp"
            # 使用UDF
            sql = "select log_level, my_sub(message, 1, 3) from tmp"
        }
    }
    
    output {
        org.interestinglab.seatunnel.output.JavaStdout {
            limit = 2
        }
    }
    
  4. 启动seatunnel

    ./bin/start-seatunnel.sh --config config/application.conf --deploy-mode client --master local[2]
    
  5. 查看结果

    +---------+------------------+
    |log_level|UDF(message, 1, 3)|
    +---------+------------------+
    |INFO     |ary               |
    |INFO     |ary               |
    +---------+------------------+
    only showing top 2 rows