Before creating the project, make sure both Scala and Java are installed locally. Spark is written in Scala, and Scala, like Java, is compiled to bytecode that runs on the JVM. My local Scala version is 2.11.0 and my Hadoop version is 2.7.6.
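A quick way to confirm the versions from inside the project later on (once the Maven dependencies from Step 1 have pulled scala-library onto the classpath) is a small throwaway class like the sketch below. The class name EnvCheck is only illustrative and is not part of the project:

public class EnvCheck {
    public static void main(String[] args) {
        // JVM version the job will run on.
        System.out.println("Java: " + System.getProperty("java.version"));
        // Version of the Scala library found on the classpath (should be a 2.11.x release here).
        System.out.println("Scala: " + scala.util.Properties.versionNumberString());
    }
}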
第一步:打开idea,然后创建一个maven项目
在pom里面加入如下依赖:
<properties>
    <spark.version>2.3.1</spark.version>
    <scala.version>2.11</scala.version>
    <geotools.version>20-SNAPSHOT</geotools.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.7.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_${scala.version}</artifactId>
        <version>1.6.3</version>
    </dependency>
    <dependency>
        <groupId>com.esri.geometry</groupId>
        <artifactId>esri-geometry-api</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <version>2.15.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.6.0</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.19</version>
            <configuration>
                <skip>true</skip>
            </configuration>
        </plugin>
    </plugins>
</build>
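To confirm that these dependencies resolve and that Spark can start locally, a throwaway main class like the sketch below can be run straight from IDEA. It uses the spark-sql artifact declared above; the class name DependencyCheck is only illustrative:

import org.apache.spark.sql.SparkSession;

public class DependencyCheck {
    public static void main(String[] args) {
        // Start a local SparkSession; if the dependencies above resolved correctly,
        // this prints the Spark version (2.3.1 with this pom).
        SparkSession spark = SparkSession.builder()
                .appName("DependencyCheck")
                .master("local[*]")
                .getOrCreate();
        System.out.println("Spark version: " + spark.version());
        spark.stop();
    }
}

This only verifies that the classpath is consistent; the actual word-count job follows in Step 3.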
Step 2: In the IDE, go to File --> Project Structure, click the Scala SDK entry shown in the screenshot, browse to the local Scala installation directory, and import the Scala library.
Step 3: Write the Java version of the class:
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Created by zhanglu on 2018/9/7.
 */
public class WordCount {
    public static void main(String[] args) {
        // Step 1: create a SparkConf object and set the Spark configuration.
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("WordCount");
        sparkConf.setMaster("local");

        // Step 2: create a JavaSparkContext, which initializes Spark's core components.
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

        // Step 3: create an RDD from the input source (HDFS or a local file). The input data is
        // spread across the RDD's partitions, forming the initial distributed dataset.
        // textFile() builds the RDD from the input; each line of the file becomes one RDD element.
        JavaRDD<String> javaRDD = javaSparkContext.textFile("E://个人/word_count.txt");

        // Step 4: apply transformations (the actual computation) to the initial RDD.
        // Split each line into individual words. This is usually done by passing a function
        // to operators such as map or flatMap.
        JavaRDD<String> words = javaRDD.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        // Map each word to a (word, 1) tuple.
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Reduce the tuples by key, summing the counts for each word.
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });

        // flatMap, mapToPair and reduceByKey above are all transformations; the program still
        // needs an action before Spark will actually execute the job.
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                System.out.println(stringIntegerTuple2._1 + " appeared " + stringIntegerTuple2._2 + " times ");
            }
        });

        javaSparkContext.close();
    }
}
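Since the maven-compiler-plugin above targets Java 8, the same job can also be written with lambda expressions instead of anonymous inner classes. The sketch below is just an equivalent, more compact form of the class above; the class name WordCountLambda is illustrative:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class WordCountLambda {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("WordCountLambda").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Same input file as in the class above.
        JavaRDD<String> lines = sc.textFile("E://个人/word_count.txt");

        // Split each line into words, pair each word with 1, then sum the counts per word.
        JavaPairRDD<String, Integer> wordCounts = lines
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey(Integer::sum);

        // foreach is the action that triggers the job.
        wordCounts.foreach(t -> System.out.println(t._1 + " appeared " + t._2 + " times"));

        sc.close();
    }
}

The logic is identical: the chain of flatMap, mapToPair and reduceByKey is still lazy, and foreach remains the action that actually runs the computation.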