您好,欢迎来到步遥情感网。
搜索
您的当前位置:首页Flink基础学习(Scala): 提交作业到集群上

Flink基础学习(Scala): 提交作业到集群上

来源:步遥情感网

一、前言

前面已经编写了Flink第一个代码程序,并且也运行了,但是你会发现只是在IDEA上运行的,这种只适合开发,真正工作中我们更多的是要将写好的程序部署到集群上去跑,所以接下来说说如何将作业提交到集群上。

二、前提工作

我们这里以上一篇的StreamWorldCount来进行说明,仔细点的同学会发现有些变量都是在代码中写死的,这在实际的生产中是不允许的,因为一旦参数变了,就要重新改代码,然后打包,非常的麻烦,接下来我们对代码进行优化

import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._


object StreamWorldCount {
  def main(args: Array[String]): Unit = {
    // 创建一个流处理的执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    // 【新增】利用参数工具,参数从外部传变量中获取
    val paramTool = ParameterTool.fromArgs(args)
    val hostname = paramTool.get("hostname")
    val port = paramTool.getInt("port")

    // 获取数据并进行转换
    val resultDataStream = env.socketTextStream(hostname, port)
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .sum(1)

    // 打印结果
    resultDataStream.print()

    // 启动执行
    env.execute("StreamWorldCount")
  }
}

三、打包

上面我们已经将代码进行了调整,接下来非常重要的一步就是打包了,因为我们要将程序运行到flink集群上要打成jar包的形式,打jar包的方式也有几种,我这里说一种就是通过配置maven插件的方式进行打包,在pom.xml添加如下配置

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.2</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <id>scala-compile-first</id>
                    <phase>process-resources</phase>
                    <goals>
                        <goal>add-source</goal>
                        <goal>compile</goal>
                    </goals>
                </execution>
                <execution>
                    <id>scala-test-compile</id>
                    <phase>process-test-resources</phase>
                    <goals>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

配置完成以后,就可以点击右边MavenProjects中的package进行打包,打好jar包后接下来就正式的开始提交到集群上运行了

四、提交作业

作业的提交有两种方式,一中是通过flink的页面进行配置(底层还是页面转换成命令来执行),还有一种是通过命令来执行程序
第一种通过flink页面来进行提交作业,步骤如图所示:

如果是在yarn集群上,使用-m提交到yarn集群

$ bin/flink run -m yarn-cluster -c com.cn.StreamWorldCount /opt/flink-study-1.0-SNAPSHOT-jar-with-dependencies.jar --hostname localhost --port 9999

五、总结

以上就是今天要讲的内容,本文仅仅简单介绍了如何提交作业到集群上,而flink命令中有大量的参数可以提供给我们使用

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- obuygou.com 版权所有 赣ICP备2024042798号-5

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务