spark 简单实践1

2018年11月29日

Spark核心概念之间的关系:

Spark Driver用于提交用户应用程序,实际可以看作Spark的客户端;

Spark Driver的初始化始终围绕着SparkContext的初始化。SparkContext可以算得上是所有Spark应用程序的发动机引擎;

SparkContext初始化完毕,才能向Spark集群提交任务执行。

 

Spark context 处理原理如下图:

 

从代码角度理解Driver APP program部分功能:

用户使用SparkContext提供的API(常用的有textFile、sequenceFile、runJob、stop等)编写Driver app程序,也可以理解为客户端应用程序,用于将任务程序转换为RDDDAG,并与Cluster Manager进行通信与调度。

SparkContext的配置参数则由SparkConf负责,主要是通过ConcurrentHashMap来维护各种Spark的配置属性

public final class JavaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");
 
  public static void main(String[] args) throws Exception {
 
    if (args.length < 1) {
      System.err.println("Usage: JavaWordCount <file>");
      System.exit(1);
    }
    
    //创建spark context属性,这里只设置了部分属性,其他属性参考http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkConf
    SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount").setMaster("local[2]");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);//创建sparkcontext
    JavaRDD<String> lines = ctx.textFile(args[0], 1); //读取文件生成RDD
 
    //下列一些列操作被构建成DAG,红色部分是各个RDD
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String s) {//transaction操作, 使用space 切分字符串
        return Arrays.asList(SPACE.split(s));
      }
    });
 
    JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);//transaction操作,生成string ,count的MAP
      }
    });
 
    JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {//transaction操作,加法
        return i1 + i2;
      }
    });
 
    List<Tuple2<String, Integer>> output = counts.collect()
    for (Tuple2<?,?> tuple : output) {
      System.out.println(tuple._1() + ": " + tuple._2());;//action操作
    }

    //DAG被提交给DAGScheduler解析成Stage,然后把一个个TaskSet提交给底层调度器TaskScheduler处理。
    //Executor向SparkContext申请Task,TaskScheduler将Task发放给Executor运行并提供应用程序代码。
    ctx.stop();
  }

 

 

没有评论

发表评论

邮箱地址不会被公开。 必填项已用*标注