1. What is a Spark RDD?
   a. A Spark RDD is immutable.
   b. A Spark RDD is distributable.
   c. A Spark RDD lives in memory.
   d. A Spark RDD is strongly typed.
   (A small sketch illustrating these properties follows at the end of these notes.)

2. Enabling Spark monitoring (three ways; precedence: c > b > a)
   a. Edit $SPARK_HOME/conf/spark-defaults.conf:
      spark.eventLog.enabled true
      spark.eventLog.dir hdfs://hadoop.master:9000/user/centos/spark/event_log
   b. Pass --conf options to ./spark-shell (or ./spark-submit):
      ./spark-shell --master spark://spark.master:7077 \
        --conf spark.eventLog.enabled=true \
        --conf spark.eventLog.dir=hdfs://hadoop.master:9000/user/centos/spark/event_log
   c. Set the properties in code: new SparkConf().set(key, value)
      (see the SparkConf sketch at the end of these notes)

3. Setting console log output in the Spark shell interactive mode (or in an IDE such as IDEA):
      import org.apache.log4j.{Level, Logger}
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
      Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

4. Basic Spark usage
      // Sample transactions as "accountNo,amount" strings.
      val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400",
        "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30",
        "SB10010,7000", "CR10001,7000", "SB10002,-10")
      // Parse each record into an (accountNo, amount) pair.
      val acTransRDD = sc.parallelize(acTransList).map(_.split(",")).map(x => (x(0), x(1).toDouble))
      acTransRDD.collect
      // Good records: positive amount and an account number starting with "SB".
      val goodTransRecords = acTransRDD.filter(_._2 > 0).filter(_._1.startsWith("SB"))
      goodTransRecords.collect
      // Sum, maximum, and minimum of the good amounts.
      val sumAmount = goodTransRecords.map(_._2).reduce(_ + _)
      val maxAmount = goodTransRecords.map(_._2).reduce((a, b) => if (a > b) a else b)
      val minAmount = goodTransRecords.map(_._2).reduce((a, b) => if (a < b) a else b)
      // High-value records: amount above 1000.
      val highValueTransRecords = goodTransRecords.filter(_._2 > 1000)
      highValueTransRecords.collect
      // Bad records: non-positive amounts, or account numbers not starting with "SB".
      val badAmountLambda = (trans: (String, Double)) => trans._2 <= 0
      val badAmountRecords = acTransRDD.filter(badAmountLambda)
      badAmountRecords.collect
      val badAcNoLambda = (trans: (String, Double)) => !trans._1.startsWith("SB")
      val badAccountRecords = acTransRDD.filter(badAcNoLambda)
      badAccountRecords.collect
      val badTransRecords = badAmountRecords.union(badAccountRecords)
      badTransRecords.collect
      // Flatten each "accountNo,amount" string into its individual fields.
      val combineAllElements = sc.parallelize(acTransList).flatMap(trans => trans.split(","))
      combineAllElements.collect
      val allGoodAccountNos = combineAllElements.filter(_.startsWith("SB"))
      allGoodAccountNos.distinct.collect
      // Total amount per account, sorted by account number.
      val accSummary = acTransRDD.reduceByKey(_ + _).sortByKey()
      accSummary.collect

5. Spark transformations and actions
   a. Spark transformations
      filter(fn)
      map(fn)
      flatMap(fn)
      union(other)
      join(other, [numTasks]): joins two RDDs by key
      reduceByKey(fn, [numTasks]): groups records by key and combines the values with fn
      sortByKey([ascending], [numTasks]): sorts the RDD by key
   b. Spark actions
      collect()
      reduce(fn): aggregates all elements with fn
      foreach(fn): applies fn to each element
      first(): returns the first element of the RDD
      take(n): returns the first n elements of the RDD
      countByKey(): returns the count of elements per key
      count(): returns the number of elements in the RDD
   (See the join/countByKey sketch at the end of these notes.)

6. Creating an RDD from a file
   Local filesystem:
      val textFile = sc.textFile("README.md")
   HDFS:
      val textFile = sc.textFile("hdfs://")
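
Sketch for section 1. The following minimal example (not from the original notes) illustrates immutability and strong typing: a transformation never mutates an existing RDD, it returns a new one, and the element type is tracked.

      val nums = sc.parallelize(1 to 5)   // RDD[Int]: strongly typed
      val doubled = nums.map(_ * 2)       // returns a new RDD; nums is unchanged
      nums.collect                        // Array(1, 2, 3, 4, 5)
      doubled.collect                     // Array(2, 4, 6, 8, 10)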
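
Sketch for section 2c. This wires the event-log keys from 2a and the master URL from 2b into a SparkConf; the application name is an illustrative assumption. In the interactive shell sc already exists, so this form applies to an application run with spark-submit.

      import org.apache.spark.{SparkConf, SparkContext}

      val conf = new SparkConf()
        .setAppName("EventLogDemo")   // hypothetical app name, for illustration
        .setMaster("spark://spark.master:7077")
        .set("spark.eventLog.enabled", "true")
        .set("spark.eventLog.dir", "hdfs://hadoop.master:9000/user/centos/spark/event_log")
      val sc = new SparkContext(conf)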
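
Sketch for section 5, reusing acTransRDD from section 4. The acNames lookup RDD is a hypothetical addition for illustration; it is not part of the original notes.

      // acNames is a hypothetical (accountNo, ownerName) lookup table.
      val acNames = sc.parallelize(Seq(("SB10001", "Alice"), ("SB10002", "Bob")))
      val joined = acTransRDD.join(acNames)   // (accountNo, (amount, ownerName))
      joined.collect
      val txCounts = acTransRDD.countByKey()  // Map(accountNo -> number of records)
      acTransRDD.take(3)                      // first three (accountNo, amount) pairs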
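
Sketch for section 6, combining it with the transformations from section 5: a minimal word count, assuming a README.md exists in the directory where the shell was started.

      val textFile = sc.textFile("README.md")
      // Split each line into words and drop empty strings.
      val words = textFile.flatMap(_.split("\\s+")).filter(_.nonEmpty)
      // Pair each word with 1, then sum the 1s per word.
      val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
      wordCounts.take(5)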