
Commit 88e191b

Feature/599 in process launcher (#649)
* #599: InProcessLauncher
* Removed comments
* Test fix
* Test fix
* Exclude hadoop
* NoBackendInProcessLauncher
* NoBackendInProcessLauncher Attempt 2
* Attempt 3
* Directly include hortonworks hadoop
* Remove builder.getEffectiveConfig to avoid checking for SPARK_HOME env variable
* Remove launcher confs
* Add comments
* Debug github build
* Revert
* Delete unnecessary file
* Fix tests
* Fix test
* Update readme

Co-authored-by: jozefbakus <[email protected]>
1 parent e06ed35 commit 88e191b

File tree

11 files changed: +125, -37 lines changed

README.md

Lines changed: 7 additions & 2 deletions
@@ -160,8 +160,6 @@ spark.submitApi=yarn
 
 #Submit api = YARN
 sparkYarnSink.submitTimeout=160000
-sparkYarnSink.hadoopConfDir=/opt/hadoop
-sparkYarnSink.sparkHome=/opt/spark
 sparkYarnSink.master=yarn
 sparkYarnSink.filesToDeploy=
 sparkYarnSink.additionalConfs.spark.ui.port=
@@ -200,6 +198,13 @@ db.skip.liquibase=false
 spring.liquibase.change-log=classpath:/db_scripts/liquibase/db.changelog.yml
 ```
 
+## Tomcat configuration
+The Hadoop configuration directory needs to be exposed as the environment variable `HADOOP_CONF_DIR`, and it also has to be added to the web application's classpath.
+
+- The environment variable can be set in `<tomcat-root>/bin/setenv.sh`, e.g. `HADOOP_CONF_DIR=/opt/hadoop`.
+- To add the Hadoop configuration directory to the application classpath,
+append it to the key `shared.loader` in the file `<tomcat-base>/conf/catalina.properties`, e.g. `shared.loader="/opt/hadoop"`.
+
 ## Embedded Tomcat
 
 For development purposes, hyperdrive-trigger can be executed as an application with an embedded tomcat. Please check out branch **feature/embedded-tomcat-2** to use it.
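Note: these Tomcat steps replace the removed `sparkYarnSink.hadoopConfDir` and `sparkYarnSink.sparkHome` settings. The `InProcessLauncher` introduced by this commit submits the application from inside the web application's JVM instead of forking a `spark-submit` process, so there is no child process to hand `HADOOP_CONF_DIR` or `SPARK_HOME` to; the YARN client now has to find the Hadoop configuration on the application's own classpath, which is exactly what the two steps above arrange.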

pom.xml

Lines changed: 62 additions & 1 deletion
@@ -61,12 +61,20 @@
     </developer>
   </developers>
 
+  <repositories>
+    <repository>
+      <id>hortonworks</id>
+      <url>https://repo.hortonworks.com/content/repositories/releases/</url>
+    </repository>
+  </repositories>
+
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
 
     <scala.compat.version>2.12</scala.compat.version>
     <spark.version>2.4.4</spark.version>
+    <hadoop.version>2.7.3.2.6.1.0-129</hadoop.version>
     <slick.version>3.3.3</slick.version>
     <tminglei.version>0.19.6</tminglei.version>
     <slick-hikaricp.version>3.3.1</slick-hikaricp.version>
@@ -182,11 +190,64 @@
       <artifactId>kafka_${scala.compat.version}</artifactId>
       <version>${kafka.version}</version>
     </dependency>
+
+    <!-- Dependencies for Spark InProcessLauncher -->
     <dependency>
       <groupId>org.apache.spark</groupId>
-      <artifactId>spark-launcher_${scala.compat.version}</artifactId>
+      <artifactId>spark-yarn_${scala.compat.version}</artifactId>
       <version>${spark.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.validation</groupId>
+          <artifactId>validation-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-api</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-common</artifactId>
+      <version>${hadoop.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-client</artifactId>
+      <version>${hadoop.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.google.code.gson</groupId>
+          <artifactId>gson</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <!-- End Dependencies for Spark InProcessLauncher -->
+
     <dependency>
       <groupId>com.typesafe.play</groupId>
       <artifactId>play-json_${scala.compat.version}</artifactId>
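Two things in this change follow from the switch to the in-process launcher: `spark-yarn` replaces `spark-launcher` because submitting in-process requires Spark's YARN client backend on the web application's classpath, and the Hadoop artifacts are pinned to `2.7.3.2.6.1.0-129`, which appears to be a Hortonworks (HDP 2.6.1) build of Hadoop 2.7.3 — hence the added Hortonworks repository, since such builds are not published to Maven Central. The exclusions (Spark's transitive Hadoop, `slf4j-log4j12`, `validation-api`, `gson`) keep duplicate or conflicting classes off the classpath; `mvn dependency:tree -Dincludes=org.apache.hadoop` is a quick way to verify that only the pinned Hadoop version remains.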

src/main/resources/application.properties

Lines changed: 0 additions & 2 deletions
@@ -96,8 +96,6 @@ kafkaSource.properties.sasl.jaas.config=
 spark.submitApi=yarn
 sparkYarnSink.hadoopResourceManagerUrlBase=http://localhost:8088
 sparkYarnSink.userUsedToKillJob=
-sparkYarnSink.hadoopConfDir=/opt/hadoop
-sparkYarnSink.sparkHome=/opt/spark
 sparkYarnSink.master=yarn
 sparkYarnSink.submitTimeout=160000
 sparkYarnSink.filesToDeploy=
src/main/scala/org/apache/spark/launcher/NoBackendConnectionInProcessLauncher.scala

Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
+
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.launcher
+
+import org.slf4j.LoggerFactory
+
+class NoBackendConnectionInProcessLauncher extends InProcessLauncher {
+
+  private val logger = LoggerFactory.getLogger(this.getClass)
+  override def startApplication(listeners: SparkAppHandle.Listener*): SparkAppHandle = {
+    import scala.collection.JavaConverters._
+    if (builder.isClientMode(Map[String, String]().asJava)) {
+      logger.warn("It's not recommended to run client-mode applications using InProcessLauncher.")
+    }
+    val main = findSparkSubmit()
+    val server = LauncherServer.getOrCreateServer()
+
+    val handle = new InProcessAppHandle(server)
+    listeners.foreach(handle.addListener)
+
+    // Remove launcher config to make sure that no backend connection is created.
+    // See org.apache.spark.launcher.LauncherBackend:connect
+    builder.conf.remove(LauncherProtocol.CONF_LAUNCHER_PORT)
+    builder.conf.remove(LauncherProtocol.CONF_LAUNCHER_SECRET)
+
+    // Set waitAppCompletion to false to ensure fire-and-forget mode.
+    // See org.apache.spark.deploy.yarn.Client:run
+    setConf("spark.yarn.submit.waitAppCompletion", "false")
+
+    val sparkArgs = builder.buildSparkSubmitArgs().asScala.toArray
+    val appName = CommandBuilderUtils.firstNonEmpty(builder.appName, builder.mainClass, "<unknown>")
+    handle.start(appName, main, sparkArgs)
+    handle
+  }
+}
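The class lives in the `org.apache.spark.launcher` package deliberately: `builder`, `findSparkSubmit()`, `LauncherServer`, `InProcessAppHandle` and `LauncherProtocol` are package-private in Spark's launcher module and would be unreachable from the project's own `za.co.absa` packages. A minimal usage sketch follows; the jar path, main class, app name and tag are hypothetical placeholders, not part of the commit:

```scala
import org.apache.spark.launcher.{NoBackendConnectionInProcessLauncher, SparkAppHandle}

object LaunchSketch {
  def main(args: Array[String]): Unit = {
    val handle: SparkAppHandle = new NoBackendConnectionInProcessLauncher()
      .setMaster("yarn")
      .setDeployMode("cluster")
      .setAppResource("/path/to/job.jar")    // hypothetical application jar
      .setMainClass("com.example.JobMain")   // hypothetical main class
      .setAppName("sketch-job")
      .setConf("spark.yarn.tags", "job-42")  // hypothetical tag for finding the app in YARN later
      .startApplication(new SparkAppHandle.Listener {
        // Only submission-side transitions arrive here: the launcher-server
        // port/secret were removed, so the driver never connects back.
        override def stateChanged(h: SparkAppHandle): Unit =
          println(s"state=${h.getState}, appId=${h.getAppId}")
        override def infoChanged(h: SparkAppHandle): Unit = ()
      })
    // With spark.yarn.submit.waitAppCompletion=false, the submission returns
    // as soon as YARN accepts the application: fire-and-forget.
  }
}
```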

src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/SparkConfig.scala

Lines changed: 0 additions & 2 deletions
@@ -50,9 +50,7 @@ class SparkConfig (
 
 class SparkYarnSinkConfig (
   val submitTimeout: Int,
-  val hadoopConfDir: String,
   val master: String,
-  val sparkHome: String,
   @Name("filesToDeploy")
   filesToDeployInternal: String,
   @Name("additionalConfs")

src/main/scala/za/co/absa/hyperdrive/trigger/configuration/application/SparkConfigNestedClassesValidator.scala

Lines changed: 1 addition & 5 deletions
@@ -50,12 +50,8 @@ class SparkConfigNestedClassesValidator extends ConstraintValidator[SparkConfigN
     validateConstraints(Seq(
       Constraint(sparkConfig.yarn.submitTimeout > 0,
         "sparkYarnSink.submitTimeout", "must be > 0"),
-      Constraint(notBlankValidator.isValid(sparkConfig.yarn.hadoopConfDir, context),
-        "sparkYarnSink.hadoopConfDir", notBlankMessage),
       Constraint(notBlankValidator.isValid(sparkConfig.yarn.master, context),
-        "sparkYarnSink.master", notBlankMessage),
-      Constraint(notBlankValidator.isValid(sparkConfig.yarn.sparkHome, context),
-        "sparkYarnSink.sparkHome", notBlankMessage)
+        "sparkYarnSink.master", notBlankMessage)
     ))
   }
 }

src/main/scala/za/co/absa/hyperdrive/trigger/scheduler/executors/spark/SparkYarnClusterServiceImpl.scala

Lines changed: 3 additions & 10 deletions
@@ -16,8 +16,7 @@
 
 package za.co.absa.hyperdrive.trigger.scheduler.executors.spark
 
-import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}
-import org.slf4j.LoggerFactory
+import org.apache.spark.launcher.{InProcessLauncher, NoBackendConnectionInProcessLauncher, SparkAppHandle, SparkLauncher}
 import org.springframework.stereotype.Service
 import za.co.absa.hyperdrive.trigger.configuration.application.SparkConfig
 import za.co.absa.hyperdrive.trigger.models.enums.JobStatuses.{Lost, SubmissionTimeout, Submitting}
@@ -66,23 +65,17 @@ class SparkYarnClusterServiceImpl @Inject()(
   }
 
   private def getSparkLauncher(id: String, jobName: String, jobParameters: SparkInstanceParameters)
-                              (implicit sparkConfig: SparkConfig): SparkLauncher = {
-    import scala.collection.JavaConverters._
+                              (implicit sparkConfig: SparkConfig): InProcessLauncher = {
     val config = sparkConfig.yarn
-    val sparkLauncher = new SparkLauncher(Map(
-      "HADOOP_CONF_DIR" -> config.hadoopConfDir,
-      "SPARK_PRINT_LAUNCH_COMMAND" -> "1"
-    ).asJava)
+    val sparkLauncher = new NoBackendConnectionInProcessLauncher()
       .setMaster(config.master)
       .setDeployMode("cluster")
       .setMainClass(jobParameters.mainClass)
       .setAppResource(jobParameters.jobJar)
-      .setSparkHome(config.sparkHome)
       .setAppName(jobName)
       .setConf("spark.yarn.tags", id)
       .addAppArgs(jobParameters.appArguments.toSeq:_*)
       .addSparkArg("--verbose")
-      .redirectToLog(LoggerFactory.getLogger(s"SparkExecutor.executorJobId=$id").getName)
     config.filesToDeploy.foreach(file => sparkLauncher.addFile(file))
     config.additionalConfs.foreach(conf => sparkLauncher.setConf(conf._1, conf._2))
     jobParameters.additionalJars.foreach(sparkLauncher.addJar)
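All of the dropped pieces assume a forked `spark-submit` child process: the environment map (`HADOOP_CONF_DIR`, `SPARK_PRINT_LAUNCH_COMMAND`) and `setSparkHome` configure that process, and `redirectToLog` redirects its output. With the in-process launcher nothing is forked, so none of them apply; the Hadoop configuration comes from the web application's classpath instead (see the README change above). And since the no-backend handle receives no state updates from the driver, job status is presumably tracked through YARN itself, via the `spark.yarn.tags` entry set to the job id together with the retained `sparkYarnSink.hadoopResourceManagerUrlBase` setting.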

src/test/resources/application.properties

Lines changed: 0 additions & 2 deletions
@@ -41,8 +41,6 @@ kafka-source.properties.security.protocol=PLAINTEXT
 
 spark.submitApi=yarn
 sparkYarnSink.hadoopResourceManagerUrlBase=http://localhost:8088
-sparkYarnSink.hadoopConfDir=/opt/hadoop
-sparkYarnSink.sparkHome=/opt/spark
 sparkYarnSink.master=yarn
 sparkYarnSink.submitTimeout=160000
 

src/test/scala/za/co/absa/hyperdrive/trigger/ApplicationStartPostgresTest.scala

Lines changed: 0 additions & 2 deletions
@@ -78,8 +78,6 @@ class ApplicationStartPostgresTest extends FlatSpec with Matchers with SpringInt
     kafkaConfig.properties.getProperty("security.protocol") shouldBe "PLAINTEXT"
     sparkConfig.submitApi shouldBe "yarn"
     sparkConfig.hadoopResourceManagerUrlBase shouldBe "http://localhost:8088"
-    sparkConfig.yarn.hadoopConfDir shouldBe "/opt/hadoop"
-    sparkConfig.yarn.sparkHome shouldBe "/opt/spark"
     sparkConfig.yarn.master shouldBe "yarn"
     sparkConfig.yarn.submitTimeout shouldBe 160000
     sparkConfig.yarn.filesToDeploy shouldBe Seq()

src/test/scala/za/co/absa/hyperdrive/trigger/configuration/application/DefaultTestSparkConfig.scala

Lines changed: 1 addition & 3 deletions
@@ -21,9 +21,7 @@ import java.util.Properties
 case class DefaultTestSparkConfig (
   submitApi: String = "yarn",
   submitTimeout: Int = 1000,
-  hadoopConfDir: String = "",
   master: String = "yarn",
-  sparkHome: String = "",
   hadoopResourceManagerUrlBase: String = "",
   filesToDeploy: Seq[String] = Seq(),
   additionalConfs: Map[String, String] = Map(),
@@ -32,7 +30,7 @@
   clusterId: String = "j-2AXXXXXXGAPLF",
 ) {
   def yarn: SparkConfig =
-    new SparkConfig(submitApi, new SparkYarnSinkConfig(submitTimeout, hadoopConfDir, master, sparkHome,
+    new SparkConfig(submitApi, new SparkYarnSinkConfig(submitTimeout, master,
       filesToDeploy.mkString(","), toProperties(additionalConfs)), null, hadoopResourceManagerUrlBase,
       userUsedToKillJob, sparkSubmitThreadPoolSize
     )
