1. Start Spark
sbin/start-all.sh
You can check the cluster's status in the Web UI at http://hiup:8080/.
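If the daemons started cleanly (assuming a standalone deployment with the master on this host), jps should also list a Master and at least one Worker process; the PIDs below are only illustrative:

$ jps
2765 Master
2893 Worker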
2. Start the shell
bin/spark-shell
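Launched this way, the shell runs with a local master by default. To attach it to the standalone cluster started above (assuming the master listens on hiup's default port 7077), pass the master URL explicitly:

bin/spark-shell --master spark://hiup:7077

The shell then shows up as a running application on the http://hiup:8080/ page.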
3. Test
$ bin/run-example SparkPi 10
Pi is roughly 3.140408
$ bin/run-example SparkPi 100
Pi is roughly 3.1412528
$ bin/run-example SparkPi 1000
Pi is roughly 3.14159016
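SparkPi estimates π by Monte Carlo sampling: it scatters random points over a square and counts how many land inside the inscribed circle, so the estimate sharpens as the argument (the number of slices, i.e. parallel tasks, and with it the sample count) grows. A minimal sketch of the same idea, runnable in spark-shell (the sample count per slice is an arbitrary choice here):

scala> val slices = 10
scala> val n = 100000 * slices
scala> val count = sc.parallelize(1 to n, slices).map { _ =>
         val x = math.random * 2 - 1       // random point in [-1, 1] x [-1, 1]
         val y = math.random * 2 - 1
         if (x * x + y * y < 1) 1 else 0   // 1 if inside the unit circle
       }.reduce(_ + _)
scala> println("Pi is roughly " + 4.0 * count / n)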
4. Basic operations
# Load data from HDFS
scala> var textFile = sc.textFile("/usr/hadoop/inpatient.txt")

# First line
scala> textFile.first()
res1: String = "(first line of the file)"

# First field of the first line, split on commas (admission number)
scala> textFile.first().split(",")(0)
res2: String = "0000718165"

# Field at index 5 of the first line (a cost field)
scala> textFile.first().split(",")(5)
res3: String = "100.01"

# Number of lines
scala> textFile.count()
res4: Long = 115411

# Number of lines containing "ICU"
scala> textFile.filter(line => line.contains("ICU")).count()
res5: Long = 912

# Length of each line
scala> var lineLengths = textFile.map(s => s.length)

# Total length of all lines
scala> var totalLength = lineLengths.reduce((a, b) => a + b)
totalLength: Int = 32859905

# Maximum cost (field index 23 on well-formed 30-field rows)
scala> textFile.map(line => if (line.split(",").size == 30) line.split(",")(23).replace("\"", "") else "0").reduce((a, b) => if (a.toDouble > b.toDouble) a else b)
res6: String = 300

# Define a serializable class for one record
scala> @SerialVersionUID(100L) class PATIENT(var PATIENT_NO: String, var NAME: String, var SEX_CODE: String, var BIRTHDATE: String, var BALANCE_COST: String) extends Serializable

# Create an instance
scala> var p = new PATIENT("PATIENT_NO", "NAME", "SEX_CODE", "BIRTHDATE", "BALANCE_COST")

# Define a map function that parses a line into a PATIENT
scala> def mapFunc(line: String): PATIENT = {
         var cols = line.split(",")
         new PATIENT(cols(0), cols(1), cols(2), cols(3), cols(4))
       }

# Maximum cost
scala> textFile.filter(line => line.split(",").size == 30).map(mapFunc).reduce((a, b) => if (a.BALANCE_COST.replace("\"", "").toDouble > b.BALANCE_COST.replace("\"", "").toDouble) a else b).BALANCE_COST

# Maximum cost among male patients
scala> textFile.filter(line => line.split(",").size == 30).map(mapFunc).filter(p => p.SEX_CODE == "\"M\"").reduce((a, b) => if (a.BALANCE_COST.replace("\"", "").toDouble > b.BALANCE_COST.replace("\"", "").toDouble) a else b).BALANCE_COST

# Maximum cost among female patients
scala> textFile.filter(line => line.split(",").size == 30).map(mapFunc).filter(p => p.SEX_CODE == "\"F\"").reduce((a, b) => if (a.BALANCE_COST.replace("\"", "").toDouble > b.BALANCE_COST.replace("\"", "").toDouble) a else b).BALANCE_COST

# Exit the shell
scala> :quit
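The three per-group reduces above each rescan the whole file. As a sketch of a single-pass alternative (assuming, as in the earlier examples, that SEX_CODE is field 2 and the cost is field 23 on 30-field rows; the column layout is not confirmed beyond the samples shown), the maximum cost per sex code can be computed with one reduceByKey:

scala> val maxCostBySex = textFile
         .map(_.split(","))
         .filter(_.size == 30)
         .map(cols => (cols(2).replace("\"", ""), cols(23).replace("\"", "").toDouble))  // (sex code, cost)
         .reduceByKey((a, b) => math.max(a, b))                                          // per-key maximum
scala> maxCostBySex.collect().foreach { case (sex, cost) => println(sex + ": " + cost) }

Because reduceByKey combines values within each partition before shuffling, every group's maximum comes out of a single job over the data.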