大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友

CSDN 2024-08-26 15:35:01 阅读 72

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)HDFS(已更完)MapReduce(已更完)Hive(已更完)Flume(已更完)Sqoop(已更完)Zookeeper(已更完)HBase(已更完)Redis (已更完)Kafka(已更完)Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

Spark 学习 WordCount 程序Scala & Java 的方式分别编写 WordCount 程序

在这里插入图片描述

计算圆周率

需求背景

我们要实现一个程序来实现圆周率的计算,将利用下面的公式:

在这里插入图片描述

编写代码

<code>package icu.wzk

import org.apache.spark.{ -- -->SparkConf, SparkContext}

import scala.math.random

object SparkPi {

def main(args: Array[String]): Unit = {

var conf = new SparkConf()

.setAppName("ScalaSparkPi")

.setMaster("local[*]")

val sc = new SparkContext(conf)

sc.setLogLevel("WARN")

val slices = if (args.length > 0) {

args(0).toInt

} else {

0

}

val N = 100000000

val count = sc.makeRDD(1 to N, slices)

.map(idx => {

val (x, y) = (random, random)

if (x*x + y*y <= 1) {

1

} else {

0

}

}).reduce(_ + _)

println(s"Pi is ${ 4.0 * count / N}")

}

}

代码部分截图如下所示:

在这里插入图片描述

代码解释

object SparkPi { … }

这个对象定义了一个 Scala 应用程序的入口。Scala 的 object 关键字用于定义一个单例对象,这意味着 SparkPi 只能有一个实例。

def main(args: Array[String]): Unit = { … }

main 方法是 Scala 应用程序的入口点,类似于 Java 中的 main 方法。args 是传递给程序的命令行参数,类型为 Array[String]。Unit 表示该方法没有返回值。

var conf = new SparkConf().setAppName(“ScalaSparkPi”)

SparkConf() 用于配置 Spark 应用程序。setAppName(“ScalaSparkPi”) 设置应用程序的名称为 ScalaSparkPi。setMaster("local[]") 表示 Spark 应用程序将在本地运行,使用所有可用的 CPU 核心。local[] 是 Spark 中的特殊设置,表示本地模式下使用所有的 CPU 核心。

val sc = new SparkContext(conf)

SparkContext 是 Spark 应用程序的核心,负责与 Spark 集群进行交互。这里通过配置对象 conf 创建了一个新的 SparkContext 实例。

sc.setLogLevel(“WARN”)

设置日志的级别为 “WARN”。这意味着只会记录警告级别及以上的日志信息,减少不必要的日志输出。

val slices = if (args.length > 0) { … }

这段代码用来处理传递给程序的第一个参数,如果有参数传递过来,则将其转换为整数,作为分片数 slices。如果没有参数,则默认值为 0。

val N = 100000000

定义一个常量 N,表示将进行一亿次随机点的生成,以此来估算 \pi 值。

val count = sc.makeRDD(1 to N, slices)

sc.makeRDD(1 to N, slices) 创建一个包含从 1 到 N 的整数的 RDD(弹性分布式数据集),并将其划分为 slices 个分片进行并行计算。map(idx => { … }) 是对 RDD 中的每个元素进行映射操作。对于每个 idx,生成两个随机数 x 和 y,分别表示点的 x 和 y 坐标。if (xx + yy <= 1) 判断点 (x, y) 是否在单位圆内。如果在圆内,则返回 1,否则返回 0。

reduce(_ + _)

reduce(_ + _) 将所有的 1 和 0 相加,得到在单位圆内的点的总数。

println(s"Pi is ${4.0 * count / N}")

计算 \pi 的估计值:使用公式 \pi \approx 4 \times (\text{圆内点的数量} / \text{总点数})。输出计算结果。

打包上传

<code>mvn clean package

打包完成上传Jar包:

在这里插入图片描述

运行项目

<code>spark-submit --master local[*] --class icu.wzk.SparkPi spark-wordcount-1.0-SNAPSHOT.jar 15

运行等待结果

在这里插入图片描述

运行完毕的结果如下:

在这里插入图片描述

找共同好友

需求背景

目前有一组数据

<code>100, 200 300 400 500 600

200, 100 300 400

300, 100 200 400 500

400, 100 200 300

500, 100 300

600, 100

第一列表示用户,后边的数字表示该用户的好友,我们要对上面的这几列进行分析计算,得出共同的好友。

在这里插入图片描述

编写代码

方法一

核心思想利用笛卡尔积求两两之间的好友 然后去除多余的数据

<code>package icu.wzk

import org.apache.spark.rdd.RDD

import org.apache.spark.{ -- -->SparkConf, SparkContext}

object FindFriends {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

.setAppName("SparkFindFriends")

.setMaster("local[*]")

val sc = new SparkContext(conf)

sc.setLogLevel("WARN")

val lines: RDD[String] = sc.textFile(args(0))

val friendsRDD: RDD[(String, Array[String])] = lines.map{

line =>

val fields: Array[String] = line.split(",")

val userId = fields(0).trim

val friends: Array[String] = fields(1).trim.split("\\s+")

(userId, friends)

}

friendsRDD

.cartesian(friendsRDD)

.filter({

case ((id1, _), (id2, _)) => id1 < id2

})

.map{

case ((id1, friends1), (id2, friends2)) => ((id1, id2), friends1.intersect(friends2).sorted.toBuffer)

}

.sortByKey()

.collect()

.foreach(println)

sc.stop()

}

}

方法二

消除笛卡尔积 核心思想是:将数据变形,找到两两的好友,再执行数据的合并

package icu.wzk

import org.apache.spark.rdd.RDD

import org.apache.spark.{ SparkConf, SparkContext}

object FindFriends2 {

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

.setAppName("SparkFindFriends")

.setMaster("local[*]")

val sc = new SparkContext(conf)

sc.setLogLevel("WARN")

val lines: RDD[String] = sc.textFile(args(0))

val friendsRDD: RDD[(String, Array[String])] = lines.map{

line =>

val fields: Array[String] = line.split(",")

val userId = fields(0).trim

val friends: Array[String] = fields(1).trim.split("\\s+")

(userId, friends)

}

friendsRDD

.flatMapValues(friends => friends.combinations(2))

.map{

case (k, v) => (v.mkString(" & "), Set(k))

}

.reduceByKey(_ | _)

.sortByKey()

.collect()

.foreach(println)

sc.stop()

}

}

打包上传

在这里插入图片描述

运行项目

方法一

<code>spark-submit --master local[*] --class icu.wzk.FindFriends spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt

运行结果如下图:

在这里插入图片描述

方法二

<code>spark-submit --master local[*] --class icu.wzk.FindFriends2 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt

运行结果如下图所示:

在这里插入图片描述



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。