一、前言
前面已经编写了Flink第一个代码程序,并且也运行了,但是你会发现只是在IDEA上运行的,这种只适合开发,真正工作中我们更多的是要将写好的程序部署到集群上去跑,所以接下来说说如何将作业提交到集群上。
二、前提工作
我们这里以上一篇的StreamWorldCount来进行说明,仔细点的同学会发现有些变量都是在代码中写死的,这在实际的生产中是不允许的,因为一旦参数变了,就要重新改代码,然后打包,非常的麻烦,接下来我们对代码进行优化
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
object StreamWorldCount {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val paramTool = ParameterTool.fromArgs(args)
val hostname = paramTool.get("hostname")
val port = paramTool.getInt("port")
val resultDataStream = env.socketTextStream(hostname, port)
.flatMap(_.split(" "))
.map((_, 1))
.keyBy(0)
.sum(1)
resultDataStream.print()
env.execute("StreamWorldCount")
}
}
三、打包
上面我们已经将代码进行了调整,接下来非常重要的一步就是打包了,因为我们要将程序运行到flink集群上要打成jar包的形式,打jar包的方式也有几种,我这里说一种就是通过配置maven插件的方式进行打包,在pom.xml添加如下配置
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>scala-test-compile</id>
<phase>process-test-resources</phase>
<goals>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
配置完成以后,就可以点击右边MavenProjects中的package进行打包,打好jar包后接下来就正式的开始提交到集群上运行了
四、提交作业
作业的提交有两种方式,一中是通过flink的页面进行配置(底层还是页面转换成命令来执行),还有一种是通过命令来执行程序
第一种通过flink页面来进行提交作业,步骤如图所示:
如果是在yarn集群上,使用-m提交到yarn集群
$ bin/flink run -m yarn-cluster -c com.cn.StreamWorldCount /opt/flink-study-1.0-SNAPSHOT-jar-with-dependencies.jar --hostname localhost --port 9999
五、总结
以上就是今天要讲的内容,本文仅仅简单介绍了如何提交作业到集群上,而flink命令中有大量的参数可以提供给我们使用