Importing Data from MySQL into HBase with Spark

1) Create the HBase table on the server
hbase shell
create 'B58005',{NAME => 'info', VERSIONS => 2}
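
To confirm that the table and its info column family were created, you can optionally check with the standard shell commands:

list
describe 'B58005'
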
2) Code implementation
Create a Maven project in IDEA and add the dependencies to pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cancer</groupId>
    <artifactId>wise-farmland</artifactId>
    <version>1.1</version>

    <properties>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.0</spark.version>
        <hadoop.version>2.7.5</hadoop.version>
        <hbase.version>1.3.1</hbase.version>

        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-common</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.40</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <finalName>wise-farmland</finalName>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>compile-scala</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>test-compile-scala</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Create the main class:
package com.cancer.wf

import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import scala.util.Random

object B58005MysqlImport {

  case class B58005(MDY58005: String, YY58005: String, MM58005: String,
                    DD58005: String, OO58005: String, QQ58005: String,
                    MD58005: String, JJ58005: String, W58005M: String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder().appName("B58005MysqlImport")
      .master("local[4]")
      .getOrCreate()

    val url = "jdbc:mysql://localhost:3306/wise_farmland?useUnicode=true&characterEncoding=UTF-8"
    val password = "123456"
    val table = "b58005"

    // Read the MySQL table through the JDBC data source
    val df = spark.read.format("jdbc")
      .option("url", url)
      .option("dbtable", table)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", password)
      .load()

    // Select the columns to import, filter by the year passed as the first argument,
    // and collect the result to the driver
    val data = df.select("MDY58005_", "YY58005_", "MM58005_", "DD58005_",
        "OO58005_", "QQ58005_", "MD58005_", "JJ58005_", "W58005M")
      .filter("YY58005_ = " + args(0))
      .collect()

    val b58005 = data.map(row => B58005(
      row.get(0).toString, row.get(1).toString, row.get(2).toString,
      row.get(3).toString, row.get(4).toString, row.get(5).toString,
      row.get(6).toString, row.get(7).toString, row.get(8).toString))

    // Write every record to HBase (one connection per record; see the note after the code)
    b58005.foreach(persistHBase)

    spark.stop()
  }

  def persistHBase(b58005: B58005): Unit = {
    val tableName = "B58005"

    val conf = HBaseConfiguration.create()
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf(tableName))

    val cfFamilyBytes = Bytes.toBytes("info")

    // Row key: "cf" followed by a random 6-digit number (collisions are possible)
    val rand = 100000 + new Random().nextInt((999999 - 100000) + 1)
    val row = Bytes.toBytes("cf" + rand)

    val p = new Put(row)
    p.addColumn(cfFamilyBytes, Bytes.toBytes("MDY58005_"), Bytes.toBytes(b58005.MDY58005))
    p.addColumn(cfFamilyBytes, Bytes.toBytes("YY58005_"), Bytes.toBytes(b58005.YY58005))
    p.addColumn(cfFamilyBytes, Bytes.toBytes("MM58005_"), Bytes.toBytes(b58005.MM58005))
    p.addColumn(cfFamilyBytes, Bytes.toBytes("DD58005_"), Bytes.toBytes(b58005.DD58005))
    p.addColumn(cfFamilyBytes, Bytes.toBytes("OO58005_"), Bytes.toBytes(b58005.OO58005))
    p.addColumn(cfFamilyBytes, Bytes.toBytes("QQ58005_"), Bytes.toBytes(b58005.QQ58005))
    p.addColumn(cfFamilyBytes, Bytes.toBytes("MD58005_"), Bytes.toBytes(b58005.MD58005))
    p.addColumn(cfFamilyBytes, Bytes.toBytes("JJ58005_"), Bytes.toBytes(b58005.JJ58005))
    p.addColumn(cfFamilyBytes, Bytes.toBytes("W58005M"), Bytes.toBytes(b58005.W58005M))

    table.put(p)
    table.close()
    connection.close()
  }

}
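
Note that persistHBase opens and closes an HBase connection for every single record, and the whole result set is first collected to the driver. For anything beyond a small table, writing each DataFrame partition in one batch avoids both costs. The following is only a rough sketch of that alternative, not part of the original program; it assumes the same imports as above, the same 'B58005' table and 'info' family, and a DataFrame whose columns were selected in the order listed in cols:

import org.apache.spark.sql.DataFrame
import scala.collection.JavaConverters._

// Sketch: write each DataFrame partition as one batched put instead of collecting to the driver
def persistHBasePartitions(df: DataFrame): Unit = {
  df.rdd.foreachPartition { rows =>
    // One HBase connection per partition, reused for all rows in it
    val conf = HBaseConfiguration.create()
    val connection = ConnectionFactory.createConnection(conf)
    val table = connection.getTable(TableName.valueOf("B58005"))
    val cf = Bytes.toBytes("info")
    val cols = Seq("MDY58005_", "YY58005_", "MM58005_", "DD58005_",
                   "OO58005_", "QQ58005_", "MD58005_", "JJ58005_", "W58005M")
    val puts = rows.map { row =>
      // Same "cf" + random-number row key scheme as persistHBase
      val p = new Put(Bytes.toBytes("cf" + (100000 + new Random().nextInt(900000))))
      cols.zipWithIndex.foreach { case (c, i) =>
        p.addColumn(cf, Bytes.toBytes(c), Bytes.toBytes(row.get(i).toString))
      }
      p
    }.toList
    if (puts.nonEmpty) table.put(puts.asJava) // batched put for the whole partition
    table.close()
    connection.close()
  }
}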

    
 

Package the program on the client:

cd D:\workspace\wise-farmland
mvn clean package

Upload the generated jar target/wise-farmland-jar-with-dependencies.jar to the server.

3) Run the Spark job on the server to import the data from MySQL into HBase

$SPARK_HOME/bin/spark-submit --class com.cancer.wf.B58005MysqlImport --master local[4] /usr/local/app/wise-farmland-jar-with-dependencies.jar '2017'
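
The hard-coded .master("local[4]") in the code takes precedence over the --master flag, so it would have to be removed (or made configurable) before submitting to a cluster. With that change, a YARN cluster-mode submission could look like this (the resource settings are illustrative only):

$SPARK_HOME/bin/spark-submit --class com.cancer.wf.B58005MysqlImport --master yarn --deploy-mode cluster --executor-memory 2g --num-executors 2 /usr/local/app/wise-farmland-jar-with-dependencies.jar '2017'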

4) Open the HBase shell and inspect the table data

hbase shell

View the first ten rows:

scan 'B58005',{LIMIT=>10}

View the row with RowKey cf294873:

get 'B58005','cf294873'

View the data for the column MD58005_:

scan 'B58005', FILTER=>"ColumnPrefixFilter('MD58005_')"
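
The scan can also be restricted to a single column of the info family together with a row limit, for example:

scan 'B58005', {COLUMNS => ['info:MD58005_'], LIMIT => 10}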

The data can also be viewed with SQuirreL SQL Client. First create the mapping table in Phoenix:

/opt/phoenix/bin/sqlline.py localhost:2181
create table IF NOT EXISTS b58005 (ID varchar not null primary key,"info"."MDY58005_" varchar,"info"."YY58005_" varchar,"info"."MM58005_" varchar,"info"."DD58005_" varchar,"info"."OO58005_" varchar,"info"."QQ58005_" varchar,"info"."MD58005_" varchar,"info"."JJ58005_" varchar,"info"."W58005M" varchar);
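
Once the mapping table exists, the data can be queried with plain SQL from sqlline (or later from SQuirreL); for example, an illustrative query:

select "YY58005_", "MM58005_", "MD58005_" from b58005 limit 10;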

Then open SQuirreL SQL Client, create a connection to Phoenix, and run SQL statements to view the data.

