大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友
CSDN 2024-08-26 15:35:01 阅读 72
点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
Hadoop(已更完)HDFS(已更完)MapReduce(已更完)Hive(已更完)Flume(已更完)Sqoop(已更完)Zookeeper(已更完)HBase(已更完)Redis (已更完)Kafka(已更完)Spark(正在更新!)
章节内容
上节我们完成了如下的内容:
Spark 学习 WordCount 程序Scala & Java 的方式分别编写 WordCount 程序
计算圆周率
需求背景
我们要实现一个程序来实现圆周率的计算,将利用下面的公式:
编写代码
<code>package icu.wzk
import org.apache.spark.{ -- -->SparkConf, SparkContext}
import scala.math.random
object SparkPi {
def main(args: Array[String]): Unit = {
var conf = new SparkConf()
.setAppName("ScalaSparkPi")
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val slices = if (args.length > 0) {
args(0).toInt
} else {
0
}
val N = 100000000
val count = sc.makeRDD(1 to N, slices)
.map(idx => {
val (x, y) = (random, random)
if (x*x + y*y <= 1) {
1
} else {
0
}
}).reduce(_ + _)
println(s"Pi is ${ 4.0 * count / N}")
}
}
代码部分截图如下所示:
代码解释
object SparkPi { … }
这个对象定义了一个 Scala 应用程序的入口。Scala 的 object 关键字用于定义一个单例对象,这意味着 SparkPi 只能有一个实例。
def main(args: Array[String]): Unit = { … }
main 方法是 Scala 应用程序的入口点,类似于 Java 中的 main 方法。args 是传递给程序的命令行参数,类型为 Array[String]。Unit 表示该方法没有返回值。
var conf = new SparkConf().setAppName(“ScalaSparkPi”)
SparkConf() 用于配置 Spark 应用程序。setAppName(“ScalaSparkPi”) 设置应用程序的名称为 ScalaSparkPi。setMaster("local[]") 表示 Spark 应用程序将在本地运行,使用所有可用的 CPU 核心。local[] 是 Spark 中的特殊设置,表示本地模式下使用所有的 CPU 核心。
val sc = new SparkContext(conf)
SparkContext 是 Spark 应用程序的核心,负责与 Spark 集群进行交互。这里通过配置对象 conf 创建了一个新的 SparkContext 实例。
sc.setLogLevel(“WARN”)
设置日志的级别为 “WARN”。这意味着只会记录警告级别及以上的日志信息,减少不必要的日志输出。
val slices = if (args.length > 0) { … }
这段代码用来处理传递给程序的第一个参数,如果有参数传递过来,则将其转换为整数,作为分片数 slices。如果没有参数,则默认值为 0。
val N = 100000000
定义一个常量 N,表示将进行一亿次随机点的生成,以此来估算 \pi 值。
val count = sc.makeRDD(1 to N, slices)
sc.makeRDD(1 to N, slices) 创建一个包含从 1 到 N 的整数的 RDD(弹性分布式数据集),并将其划分为 slices 个分片进行并行计算。map(idx => { … }) 是对 RDD 中的每个元素进行映射操作。对于每个 idx,生成两个随机数 x 和 y,分别表示点的 x 和 y 坐标。if (xx + yy <= 1) 判断点 (x, y) 是否在单位圆内。如果在圆内,则返回 1,否则返回 0。
reduce(_ + _)
reduce(_ + _) 将所有的 1 和 0 相加,得到在单位圆内的点的总数。
println(s"Pi is ${4.0 * count / N}")
计算 \pi 的估计值:使用公式 \pi \approx 4 \times (\text{圆内点的数量} / \text{总点数})。输出计算结果。
打包上传
<code>mvn clean package
打包完成上传Jar包:
运行项目
<code>spark-submit --master local[*] --class icu.wzk.SparkPi spark-wordcount-1.0-SNAPSHOT.jar 15
运行等待结果
运行完毕的结果如下:
找共同好友
需求背景
目前有一组数据
<code>100, 200 300 400 500 600
200, 100 300 400
300, 100 200 400 500
400, 100 200 300
500, 100 300
600, 100
第一列表示用户,后边的数字表示该用户的好友,我们要对上面的这几列进行分析计算,得出共同的好友。
编写代码
方法一
核心思想利用笛卡尔积求两两之间的好友 然后去除多余的数据
<code>package icu.wzk
import org.apache.spark.rdd.RDD
import org.apache.spark.{ -- -->SparkConf, SparkContext}
object FindFriends {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("SparkFindFriends")
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val lines: RDD[String] = sc.textFile(args(0))
val friendsRDD: RDD[(String, Array[String])] = lines.map{
line =>
val fields: Array[String] = line.split(",")
val userId = fields(0).trim
val friends: Array[String] = fields(1).trim.split("\\s+")
(userId, friends)
}
friendsRDD
.cartesian(friendsRDD)
.filter({
case ((id1, _), (id2, _)) => id1 < id2
})
.map{
case ((id1, friends1), (id2, friends2)) => ((id1, id2), friends1.intersect(friends2).sorted.toBuffer)
}
.sortByKey()
.collect()
.foreach(println)
sc.stop()
}
}
方法二
消除笛卡尔积 核心思想是:将数据变形,找到两两的好友,再执行数据的合并
package icu.wzk
import org.apache.spark.rdd.RDD
import org.apache.spark.{ SparkConf, SparkContext}
object FindFriends2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("SparkFindFriends")
.setMaster("local[*]")
val sc = new SparkContext(conf)
sc.setLogLevel("WARN")
val lines: RDD[String] = sc.textFile(args(0))
val friendsRDD: RDD[(String, Array[String])] = lines.map{
line =>
val fields: Array[String] = line.split(",")
val userId = fields(0).trim
val friends: Array[String] = fields(1).trim.split("\\s+")
(userId, friends)
}
friendsRDD
.flatMapValues(friends => friends.combinations(2))
.map{
case (k, v) => (v.mkString(" & "), Set(k))
}
.reduceByKey(_ | _)
.sortByKey()
.collect()
.foreach(println)
sc.stop()
}
}
打包上传
运行项目
方法一
<code>spark-submit --master local[*] --class icu.wzk.FindFriends spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt
运行结果如下图:
方法二
<code>spark-submit --master local[*] --class icu.wzk.FindFriends2 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt
运行结果如下图所示:
声明
本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。