Spark Environment Setup 05

This post mainly covers Spark SQL operations: how to import data from MySQL.

#Copy the MySQL JDBC driver JAR into Spark's jars directory beforehand
#Create a DataFrame
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3307").option("dbtable", "hive.TBLS").option("user", "hive").option("password", "hive").load()

#schema
jdbcDF.schema

#count
jdbcDF.count()

#show
jdbcDF.show()
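
The same read can also be written by passing the connection settings through a java.util.Properties object instead of individual option() calls. A minimal sketch, assuming the same URL and hive/hive credentials as above:

#Read via spark.read.jdbc with a Properties object
val props = new java.util.Properties()
props.setProperty("user", "hive")
props.setProperty("password", "hive")
val jdbcDF2 = spark.read.jdbc("jdbc:mysql://localhost:3307", "hive.TBLS", props)
jdbcDF2.show()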

Spark Environment Setup 04

This post mainly covers Spark SQL operations.

1. Working with data through the DataFrame API

#Load JSON data
val df = spark.read.json("/usr/hadoop/person.json")
#Load CSV data
#val df = spark.read.csv("/usr/hadoop/person.csv")

#Show the first 20 rows
df.show()

#Inspect the schema
df.printSchema()

#Select a single column
df.select("NAME").show()

#Filter rows by condition
df.filter($"BALANCE_COST" < 10 && $"BALANCE_COST" > 1).show()

#Group and count
df.groupBy("SEX_CODE").count().show()
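
select and filter also accept column expressions and SQL-style condition strings, not just plain column names. A small sketch using the same columns as above:

#Select a column together with an arithmetic expression on another column
df.select($"NAME", $"BALANCE_COST" + 1).show()

#The same filter written as a SQL-style string expression
df.filter("BALANCE_COST < 10 AND BALANCE_COST > 1").show()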

2. Working with data through SQL

#Create a temporary view
df.createOrReplaceTempView("person")

#Query the data
spark.sql("SELECT * FROM person").show()

#Count the rows
spark.sql("SELECT * FROM person").count()

#Select with conditions
spark.sql("SELECT * FROM person WHERE BALANCE_COST<10 and BALANCE_COST>1 order by BALANCE_COST").show()

3. Converting to a Dataset

#Convert to a Dataset (the last field matches the BALANCE_COST column used above)
case class PERSONC(PATIENT_NO : String,NAME : String,SEX_CODE : String,BIRTHDATE : String,BALANCE_COST : String)
val personDS = spark.read.json("/usr/hadoop/person.json").as[PERSONC]
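
A Dataset can also be built directly from local Scala objects with toDS(), using the implicits spark-shell already imports. A minimal sketch with a made-up record:

#Build a Dataset from a Seq of case class instances (the values here are placeholders)
val oneDS = Seq(PERSONC("0000000001", "TEST", "M", "1980-01-01", "1.0")).toDS()
oneDS.show()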

4. Mixing SQL with map/reduce

personDS.select("BALANCE_COST").map(row=>if(row(0)==null) 0.0 else (row(0)+"").toDouble).reduce((a,b)=>if(a>b) a else b)

spark.sql("select BALANCE_COST from person").map(row=>if(row(0)==null) 0.0 else (row(0)+"").toDouble).reduce((a,b)=>if(a>b) a else b)

5. Mapping data onto class objects

#The Person case class is not shown in the original; a definition matching the columns used here is assumed
case class Person(PATIENT_NO : String, NAME : String, SEX_CODE : String, BIRTHDATE : String, BALANCE_COST : Double)
val personRDD = spark.sparkContext.textFile("/usr/hadoop/person.txt")
val persons = personRDD.map(_.split(",")).map(attributes => Person(attributes(0), attributes(1), attributes(2), attributes(3), attributes(4).toDouble))
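
With an RDD of case class objects, Spark can infer the schema by reflection and turn the RDD into a DataFrame via toDF(); a brief sketch continuing from persons above:

#Reflection-based schema inference over the RDD of Person objects
val personsDF = persons.toDF()
personsDF.printSchema()
personsDF.select("NAME").show()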

6. Custom schema

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

#Load the data
val personRDD = spark.sparkContext.textFile("/usr/hadoop/person.txt")
#Convert to org.apache.spark.sql.Row
val rowRDD = personRDD.map(_.split(",")).map(attributes => Row(attributes(0), attributes(1), attributes(2), attributes(3), attributes(4).replace("\"","").toDouble))
#Define a new schema
val personSchema = StructType(List(
  StructField("PatientNum",StringType,nullable = true),
  StructField("Name",StringType,nullable = true),
  StructField("SexCode",StringType,nullable = true),
  StructField("BirthDate",StringType,nullable = true),
  StructField("BalanceCode",DoubleType,nullable = true)))
#Create a new DataFrame
val personDF = spark.createDataFrame(rowRDD, personSchema)
#Use the DataFrame
personDF.select("PatientNum").show()
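
The DataFrame built with the custom schema works with SQL just like the JSON-based one; a short sketch (the view name person_txt is simply a name chosen here):

#Register the custom-schema DataFrame as a view and query it
personDF.createOrReplaceTempView("person_txt")
spark.sql("SELECT Name, BalanceCode FROM person_txt WHERE BalanceCode > 1").show()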

Spark Environment Setup 03

The previous posts covered integrating Spark with Hadoop; this one covers integrating Spark with HBase.

1. Get the HBase classpath

#The netty and jetty JARs need to be removed from this classpath, otherwise there will be JAR conflicts
HBASE_PATH=`/home/hadoop/Deploy/hbase-1.1.2/bin/hbase classpath`

2. Start the Spark shell with the HBase classpath

bin/spark-shell --driver-class-path $HBASE_PATH

3. Run some simple operations

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "inpatient_hb")

val admin = new HBaseAdmin(conf)
admin.isTableAvailable("inpatient_hb")
res1: Boolean = true

val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.count()
2017-01-03 20:46:29,854 INFO  [main] scheduler.DAGScheduler (Logging.scala:logInfo(58)) - Job 0 finished: count at <console>:36, took 23.170739 s
res2: Long = 115077
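
hBaseRDD is an RDD of (ImmutableBytesWritable, Result) pairs, so getting at actual cell values means reading them out of each Result. A rough sketch; the column family "info" and qualifier "NAME" are placeholders for whatever the inpatient_hb table really uses:

import org.apache.hadoop.hbase.util.Bytes

#Row key plus one cell value per row ("info"/"NAME" are placeholder names)
hBaseRDD.map { case (key, result) =>
  val rowKey = Bytes.toString(key.get())
  val nameBytes = result.getValue(Bytes.toBytes("info"), Bytes.toBytes("NAME"))
  val name = if (nameBytes == null) "" else Bytes.toString(nameBytes)
  (rowKey, name)
}.take(5)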

Spark Environment Setup 02

1. Start Spark

sbin/start-all.sh

You can check the Spark cluster status at http://hiup:8080/.

2. Start the shell

bin/spark-shell

3. Test with the SparkPi example

$ ./run-example SparkPi 10
Pi is roughly 3.140408

$ ./run-example SparkPi 100
Pi is roughly 3.1412528

$ ./run-example SparkPi 1000
Pi is roughly 3.14159016

4. Basic operations

#Load data from HDFS
scala> var textFile=sc.textFile("/usr/hadoop/inpatient.txt")

#First line
scala> textFile.first()
res1: String = "(contents of the first line)"

#First line, split on commas, first column (inpatient number)
textFile.first().split(",")(0)
res2: String = "0000718165"

#First line, split on commas, sixth column (cost)
textFile.first().split(",")(5)
res3: String = "100.01"

#Number of lines
scala> textFile.count()
res4: Long = 115411

#Number of lines containing "ICU"
textFile.filter(line=>line.contains("ICU")).count()
res5: Long = 912

#Get the length of each line
var lineLengths = textFile.map(s=>s.length)

#Get the total length
var totalLength = lineLengths.reduce((a,b)=>a+b)
totalLength: Int = 32859905

#Get the maximum cost
textFile.map(line=>if(line.split(",").size==30) line.split(",")(23).replace("\"","") else "0").reduce((a,b)=>if(a.toDouble>b.toDouble) a else b)
res6: String = 300

#Define a class
@SerialVersionUID(100L)
class PATIENT(var PATIENT_NO : String,var NAME : String,var SEX_CODE : String,var BIRTHDATE : String,var BALANCE_COST : String) extends Serializable 

#Create an object
var p=new PATIENT("PATIENT_NO","NAME","SEX_CODE","BIRTHDATE","BALANCE_COST")

#Define a map function
def mapFunc(line:String) : PATIENT = {
  val cols = line.split(",")
  new PATIENT(cols(0),cols(1),cols(2),cols(3),cols(4))
}

#Maximum cost
textFile.filter(line=>line.split(",").size==30).map(mapFunc).reduce((a,b)=>if(a.BALANCE_COST.replace("\"","").toDouble>b.BALANCE_COST.replace("\"","").toDouble) a else b).BALANCE_COST

#Maximum cost for male patients
textFile.filter(line=>line.split(",").size==30).map(mapFunc).filter(p=>p.SEX_CODE=="\"M\"").reduce((a,b)=>if(a.BALANCE_COST.replace("\"","").toDouble>b.BALANCE_COST.replace("\"","").toDouble) a else b).BALANCE_COST

#Maximum cost for female patients
textFile.filter(line=>line.split(",").size==30).map(mapFunc).filter(p=>p.SEX_CODE=="\"F\"").reduce((a,b)=>if(a.BALANCE_COST.replace("\"","").toDouble>b.BALANCE_COST.replace("\"","").toDouble) a else b).BALANCE_COST

#Exit the shell
scala> :quit

Spark Environment Setup 01

1. Download scala-2.11.1 and extract it to /usr/scala/scala-2.11.1

2. Download spark-2.0.0-bin-hadoop2.4 and extract it to /home/hadoop/Deploy/spark-2.0.0
(* If you plan to follow the later posts, hadoop-2.5.2, hbase-1.1.2, hive-1.2.1 and spark-2.0.0 are recommended)

3. Copy spark-env.sh.template to spark-env.sh and add the following lines

export JAVA_HOME=/usr/java/jdk1.7.0_79
export SCALA_HOME=/usr/scala/scala-2.11.1/
export SPARK_MASTER_IP=hiup01
export SPARK_WORKER_MEMORY=1g
export HADOOP_CONF_DIR=/home/hadoop/Deploy/hadoop-2.5.2/etc/hadoop

4. Copy slaves.template to slaves and add the following lines

hiup01
hiup02
hiup03

5. Copy scala-2.11.1 and spark-2.0.0 to hiup02 and hiup03

6. The environment setup is complete.