1) Create the HBase table on the server
hbase shell
create 'B58005',{NAME => 'info', VERSIONS => 2}
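To double-check that the table and its info column family were created (optional; standard HBase shell command):
describe 'B58005'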
2) Code implementation
Create a Maven project in IDEA and add the jar dependencies to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cancer</groupId>
<artifactId>wise-farmland</artifactId>
<version>1.1</version>
<properties>
<scala.version>2.11.8</scala.version>
<spark.version>2.3.0</spark.version>
<hadoop.version>2.7.5</hadoop.version>
<hbase.version>1.3.1</hbase.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.40</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>wise-farmland</finalName>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<id>compile-scala</id>
<phase>compile</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
<execution>
<id>test-compile-scala</id>
<phase>test-compile</phase>
<goals>
<goal>add-source</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Create the main class:
package com.cancer.wf

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.HBaseConfiguration
import scala.util.Random

object B58005MysqlImport {

  case class B58005(MDY58005: String, YY58005: String, MM58005: String, DD58005: String,
                    OO58005: String, QQ58005: String, MD58005: String, JJ58005: String, W58005M: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder().appName("B58005MysqlImport")
      .master("local[4]")
      .getOrCreate()

    val url = "jdbc:mysql://localhost:3306/wise_farmland?useUnicode=true&characterEncoding=UTF-8"
    val password = "123456"
    val table = "b58005"

    // Read the b58005 table from MySQL over JDBC
    val df = spark.read.format("jdbc")
      .option("url", url)
      .option("dbtable", table)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", password)
      .load()

    // Keep only the rows for the year passed on the command line (args(0))
    val data = df.select("MDY58005_", "YY58005_", "MM58005_", "DD58005_",
        "OO58005_", "QQ58005_", "MD58005_", "JJ58005_", "W58005M")
      .filter("YY58005_ = " + args(0)).collect()

    val b58005 = data.map(row =>
      B58005(row.get(0).toString, row.get(1).toString, row.get(2).toString,
        row.get(3).toString, row.get(4).toString, row.get(5).toString,
        row.get(6).toString, row.get(7).toString, row.get(8).toString)
    )

    // Write every record into the HBase table
    b58005.foreach(f => persistHBase(f))

    spark.stop()
  }

  def persistHBase(b58005: B58005): Unit = {
    val tableName = "B58005"
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)

    val cfFamilyBytes = Bytes.toBytes("info")
    val colBytesMDY58005_ = Bytes.toBytes("MDY58005_")
    val colBytesYY58005_ = Bytes.toBytes("YY58005_")
    val colBytesMM58005_ = Bytes.toBytes("MM58005_")
    val colBytesDD58005_ = Bytes.toBytes("DD58005_")
    val colBytesOO58005_ = Bytes.toBytes("OO58005_")
    val colBytesQQ58005_ = Bytes.toBytes("QQ58005_")
    val colBytesMD58005_ = Bytes.toBytes("MD58005_")
    val colBytesJJ58005_ = Bytes.toBytes("JJ58005_")
    val colBytesW58005M = Bytes.toBytes("W58005M")

    val table = new HTable(conf, tableName)

    // Row key: "cf" followed by a random six-digit number, e.g. cf294873
    val rand = 100000 + new Random().nextInt((999999 - 100000) + 1)
    val row = Bytes.toBytes("cf" + rand)

    val p = new Put(row)
    p.addColumn(cfFamilyBytes, colBytesMDY58005_, Bytes.toBytes(b58005.MDY58005))
    p.addColumn(cfFamilyBytes, colBytesYY58005_, Bytes.toBytes(b58005.YY58005))
    p.addColumn(cfFamilyBytes, colBytesMM58005_, Bytes.toBytes(b58005.MM58005))
    p.addColumn(cfFamilyBytes, colBytesDD58005_, Bytes.toBytes(b58005.DD58005))
    p.addColumn(cfFamilyBytes, colBytesOO58005_, Bytes.toBytes(b58005.OO58005))
    p.addColumn(cfFamilyBytes, colBytesQQ58005_, Bytes.toBytes(b58005.QQ58005))
    p.addColumn(cfFamilyBytes, colBytesMD58005_, Bytes.toBytes(b58005.MD58005))
    p.addColumn(cfFamilyBytes, colBytesJJ58005_, Bytes.toBytes(b58005.JJ58005))
    p.addColumn(cfFamilyBytes, colBytesW58005M, Bytes.toBytes(b58005.W58005M))

    table.put(p)
    table.close()
  }
}
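For reference, persistHBase above opens and closes a separate HTable for every record. Below is a minimal sketch of a batched alternative, assuming the HBase 1.x Connection API (ConnectionFactory / Table) provided by the hbase-client dependency; the helper name persistHBaseBatch is hypothetical and not part of the original program:

import java.util.{ArrayList => JArrayList}
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import scala.util.Random

// Hypothetical batched writer: one connection and one put() call for the whole data set.
def persistHBaseBatch(rows: Seq[B58005]): Unit = {
  val conf = HBaseConfiguration.create()
  val connection = ConnectionFactory.createConnection(conf)
  val table = connection.getTable(TableName.valueOf("B58005"))
  val cf = Bytes.toBytes("info")
  try {
    val puts = new JArrayList[Put]()
    rows.foreach { r =>
      // Same random "cf<number>" row-key scheme as persistHBase above
      val p = new Put(Bytes.toBytes("cf" + (100000 + Random.nextInt(900000))))
      p.addColumn(cf, Bytes.toBytes("MDY58005_"), Bytes.toBytes(r.MDY58005))
      p.addColumn(cf, Bytes.toBytes("YY58005_"), Bytes.toBytes(r.YY58005))
      // ... the remaining columns are added exactly as in persistHBase above
      puts.add(p)
    }
    table.put(puts) // a single batched call instead of one HTable per record
  } finally {
    table.close()
    connection.close()
  }
}

With this variant, the per-record foreach in main could be replaced by a single call such as persistHBaseBatch(b58005).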
Package the program on the client:
cd D:\workspace\wise-farmland
mvn clean package
Upload the generated jar, target/wise-farmland-jar-with-dependencies.jar, to the server.
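For example, with scp (the user name and host below are placeholders; /usr/local/app matches the path used in the next step):
scp target/wise-farmland-jar-with-dependencies.jar user@server:/usr/local/app/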
3) Run the Spark job on the server to import data from MySQL into HBase
$SPARK_HOME/bin/spark-submit --class com.cancer.wf.B58005MysqlImport --master local[4] /usr/local/app/wise-farmland-jar-with-dependencies.jar '2017'
4) Open the HBase shell to view the table data
hbase shell
View the first ten rows:
scan 'B58005',{LIMIT=>10}
View the row whose RowKey is cf294873:
get 'B58005','cf294873'
View the data for the column MD58005_ (using a column prefix filter):
scan 'B58005', FILTER=>"ColumnPrefixFilter('MD58005_')"
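To count how many rows were imported (optional; the count command can be slow on large tables):
count 'B58005'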
The data can also be viewed with SQuirreL SQL Client. First create a Phoenix table that maps onto the existing HBase table (the unquoted name b58005 is upper-cased by Phoenix, so it resolves to the B58005 table and its info column family):
/opt/phoenix/bin/sqlline.py localhost:2181
create table IF NOT EXISTS b58005 (
  ID varchar not null primary key,
  "info"."MDY58005_" varchar,
  "info"."YY58005_" varchar,
  "info"."MM58005_" varchar,
  "info"."DD58005_" varchar,
  "info"."OO58005_" varchar,
  "info"."QQ58005_" varchar,
  "info"."MD58005_" varchar,
  "info"."JJ58005_" varchar,
  "info"."W58005M" varchar
);
Open SQuirreL SQL Client, create a connection, and enter SQL statements to view the data:
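For example, a simple check against the Phoenix table created above (the LIMIT value is arbitrary):
select * from b58005 limit 10;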