Spark核心概念之间的关系:
Spark Driver用于提交用户应用程序,实际可以看作Spark的客户端;
Spark Driver的初始化始终围绕着SparkContext的初始化。SparkContext可以算得上是所有Spark应用程序的发动机引擎;
SparkContext初始化完毕,才能向Spark集群提交任务执行。
Spark context 处理原理如下图:
从代码角度理解Driver APP program部分功能:
用户使用SparkContext提供的API(常用的有textFile、sequenceFile、runJob、stop等)编写Driver app程序,也可以理解为客户端应用程序,用于将任务程序转换为RDD和DAG,并与Cluster Manager进行通信与调度。
SparkContext的配置参数则由SparkConf负责,主要是通过ConcurrentHashMap来维护各种Spark的配置属性
public final class JavaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}
//创建spark context属性,这里只设置了部分属性,其他属性参考http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkConf
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[2]");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);//创建sparkcontext
JavaRDD<String> lines = ctx.textFile(args[0], 1); //读取文件生成RDD
//下列一些列操作被构建成DAG,红色部分是各个RDD
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String s) {//transaction操作, 使用space 切分字符串
return Arrays.asList(SPACE.split(s));
}
});
JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);//transaction操作,生成string ,count的MAP
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {//transaction操作,加法
return i1 + i2;
}
});
List<Tuple2<String, Integer>> output = counts.collect()
for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1() + ": " + tuple._2());;//action操作
}
//DAG被提交给DAGScheduler解析成Stage,然后把一个个TaskSet提交给底层调度器TaskScheduler处理。
//Executor向SparkContext申请Task,TaskScheduler将Task发放给Executor运行并提供应用程序代码。
ctx.stop();
}
没有评论