大数据-88 Spark 集群 案例学习 Spark Scala 案例 SuperWordCount 计算结果数据写入MySQL

CSDN 2024-09-18 11:05:03 阅读 72

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

Hadoop(已更完)HDFS(已更完)MapReduce(已更完)Hive(已更完)Flume(已更完)Sqoop(已更完)Zookeeper(已更完)HBase(已更完)Redis (已更完)Kafka(已更完)Spark(正在更新!)

章节内容

上节完成的内容如下:

Spark案例编写 Scala计算圆周率找共同的好友

在这里插入图片描述

Super Word Count

需求背景

给定一段文本将单词全部转换为小写去除标点符号去除停用词count值降序保存结果保存到MySQL额外要求:标点符合和停用词可以自定义

编写代码

先实现到MySQL保存前的内容,我们需要先编写测试一下我们的代码是否正确

<code>package icu.wzk

import org.apache.spark.rdd.RDD

import org.apache.spark.{ -- -->SparkConf, SparkContext}

object SuperWordCount1 {

private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")

private val punctuation = "[\\)\\.,:;'!\\?]"

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

.setAppName("ScalaSuperWordCount1")

.setMaster("local[*]")

val sc = new SparkContext(conf)

sc.setLogLevel("WARN")

val lines: RDD[String] = sc.textFile(args(0))

lines

.flatMap(_.split("\\s+"))

.map(_.toLowerCase)

.map(_.replaceAll(punctuation, ""))

.filter(word => !stopWords.contains(word) && word.trim.nonEmpty)

.map((_, 1))

.reduceByKey(_ + _)

.sortBy(_._2, false)

.collect()

.foreach(println)

sc.stop()

}

}

详细解释

object SuperWordCount1 { … }

SuperWordCount1 是一个 Scala 对象,定义了一个单例对象用于运行单词计数程序。

private val stopWords = “in on to from by a an the is are were was i we you your he his”.split(“\s+”)

这里定义了一个 stopWords 列表,包含了常见的停用词,这些词在统计单词频率时会被过滤掉。split(“\s+”) 方法将这些停用词用空白字符分割成数组,便于后续的查找和过滤。

private val punctuation = “[\)\.,:;'!\?]”

定义了一个正则表达式 punctuation,用于匹配常见的标点符号。这些标点符号在统计单词频率时会被去除。

def main(args: Array[String]): Unit = { … }

main 方法是程序的入口点,args 是命令行参数,其中 args(0) 通常表示输入文件的路径。

val conf = new SparkConf().setAppName(“ScalaSuperWordCount1”).setMaster(“local[*]”)

SparkConf() 用于配置 Spark 应用程序。setAppName(“ScalaSuperWordCount1”) 设置应用程序的名称。setMaster(“local[*]”) 指定应用程序以本地模式运行,使用所有可用的 CPU 核心。

val sc = new SparkContext(conf)

SparkContext 是 Spark 应用程序的核心,用于与 Spark 集群进行交互。

sc.setLogLevel(“WARN”)

设置日志级别为 “WARN”,减少日志输出,方便查看重要信息。

val lines: RDD[String] = sc.textFile(args(0))

sc.textFile(args(0)) 从指定的文本文件路径加载数据,创建一个 RDD[String],其中每一行文本都作为一个字符串元素。lines 是包含输入文本数据的 RDD。

flatMap(_.split(“\s+”))

flatMap 方法将每一行字符串按空白字符拆分成单词,并将其展开成单个单词的 RDD。

map(_.toLowerCase)

将每个单词转换为小写,以确保统计时不区分大小写。

map(_.replaceAll(punctuation, “”))

使用正则表达式 punctuation 去除单词中的标点符号,使得统计结果更加准确。

filter(word => !stopWords.contains(word) && word.trim.nonEmpty)

filter 方法过滤掉停用词和空白单词:!stopWords.contains(word) 确保当前单词不在停用词列表中。word.trim.nonEmpty 确保单词在去除前后空白字符后不是空字符串。

map((_, 1))

将每个单词映射为 (word, 1) 的键值对,表示每个单词出现一次。

reduceByKey(_ + _)

reduceByKey 方法根据键(单词)对值(计数)进行累加,统计每个单词的总出现次数。

sortBy(_._2, false)

将统计结果按值(单词出现的次数)从大到小排序。

collect().foreach(println)

collect() 方法将 RDD 中的数据收集到驱动程序中(即本地),然后使用 foreach(println) 输出每个单词及其出现的次数。由于 collect 会将数据从分布式环境中拉到本地,需要注意数据量大的情况下可能导致内存不足的问题。

sc.stop()

在计算完成后,调用 sc.stop() 方法停止 SparkContext,释放资源。

添加依赖

<dependency>

<groupId>mysql</groupId>

<artifactId>mysql-connector-java</artifactId>

<version>8.0.28</version>

</dependency>

同时我们需要在build的部分,也要加入对应的内容,让驱动可以加载进来:

<build>

<plugins>

<plugin>

<groupId>net.alchim31.maven</groupId>

<artifactId>scala-maven-plugin</artifactId>

<version>4.4.0</version>

<executions>

<execution>

<goals>

<goal>compile</goal>

<goal>testCompile</goal>

</goals>

</execution>

</executions>

</plugin>

<plugin>

<groupId>org.apache.maven.plugins</groupId>

<artifactId>maven-assembly-plugin</artifactId>

<version>3.3.0</version>

<configuration>

<archive>

<manifest>

<mainClass>cn.lagou.sparkcore.WordCount</mainClass>

</manifest>

</archive>

<descriptorRefs>

<descriptorRef>jar-with-dependencies</descriptorRef>

</descriptorRefs>

</configuration>

<executions>

<execution>

<phase>package</phase>

<goals>

<goal>single</goal>

</goals>

</execution>

</executions>

</plugin>

</plugins>

</build>

创建库表

我们新建一个数据库,也要新建一个数据表

CREATE TABLE `wordcount` (

`word` varchar(255) DEFAULT NULL,

`count` int(11) DEFAULT NULL

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

写入SQL-未优化

我们在 foreach 中保存了数据,此时需要创建大量的MySQL连接,效率是比较低的。

package icu.wzk

import com.mysql.cj.xdevapi.PreparableStatement

import org.apache.spark.rdd.RDD

import org.apache.spark.{ SparkConf, SparkContext}

import java.sql.{ Connection, DriverManager, PreparedStatement}

object SuperWordCount2 {

private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")

private val punctuation = "[\\)\\.,:;'!\\?]"

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

.setAppName("ScalaSuperWordCount2")

.setMaster("local[*]")

val sc = new SparkContext(conf)

sc.setLogLevel("WARN")

val lines: RDD[String] = sc.textFile(args(0))

val words: RDD[String] = lines

.flatMap(_.split("\\s+"))

.map(_.trim.toLowerCase())

val clearWords: RDD[String] = words

.filter(!stopWords.contains(_))

.map(_.replaceAll(punctuation, ""))

val result: RDD[(String, Int)] = clearWords

.map((_, 1))

.reduceByKey(_ + _)

.sortBy(_._2, false)

result.foreach(println)

// 输出到 MySQL

val username = "hive"

val password = "hive@wzk.icu"

val url = "jdbc:mysql://h122.wzk.icu:3306/spark-test?useUnicode=true&characterEncoding=utf-8&useSSL=false"

var conn: Connection = null

var stmt: PreparedStatement = null

var sql = "insert into wordcount values(?, ?)"

result.foreach{

case (word, count) => try {

conn = DriverManager.getConnection(url, username, password)

stmt = conn.prepareStatement(sql)

stmt.setString(1, word)

stmt.setInt(2, count)

} catch {

case e: Exception => e.printStackTrace()

} finally {

if (stmt != null) {

stmt.close()

}

if (conn != null) {

conn.close()

}

}

}

sc.stop()

}

}

写入SQL-优化版

优化后使用 foreachPartition 保存数据,一个分区创建一个链接:cache RDD

注意:

SparkSQL 有方便的读写MySQL的方法,给参数直接调用即可但掌握这个方法很重要,因为SparkSQL不是支持所有类型的数据库

package icu.wzk

import org.apache.spark.rdd.RDD

import org.apache.spark.{ SparkConf, SparkContext}

import java.sql.{ Connection, DriverManager, PreparedStatement}

object SuperWordCount3 {

private val stopWords = "in on to from by a an the is are were was i we you your he his".split("\\s+")

private val punctuation = "[\\)\\.,:;'!\\?]"

private val username = "hive"

private val password = "hive@wzk.icu"

private val url = "jdbc:mysql://h122.wzk.icu:3306/spark-test?useUnicode=true&characterEncoding=utf-8&useSSL=false"

def main(args: Array[String]): Unit = {

val conf = new SparkConf()

.setAppName("ScalaSuperWordCount2")

.setMaster("local[*]")

val sc = new SparkContext(conf)

sc.setLogLevel("WARN")

val lines: RDD[String] = sc.textFile(args(0))

val words: RDD[String] = lines

.flatMap(_.split("\\s+"))

.map(_.trim.toLowerCase())

val clearWords: RDD[String] = words

.filter(!stopWords.contains(_))

.map(_.replaceAll(punctuation, ""))

val result: RDD[(String, Int)] = clearWords

.map((_, 1))

.reduceByKey(_ + _)

.sortBy(_._2, false)

result.foreach(println)

result.foreachPartition(saveAsMySQL)

sc.stop()

}

def saveAsMySQL(iter: Iterator[(String, Int)]): Unit = {

var conn: Connection = null

var stmt: PreparedStatement = null

var sql = "insert into wordcount values(?, ?)"

try {

conn = DriverManager.getConnection(url, username, password)

stmt = conn.prepareStatement(sql)

iter.foreach{

case (word, count) =>

stmt.setString(1, word)

stmt.setInt(2, count)

}

} catch {

case e: Exception => e.printStackTrace()

} finally {

if (stmt != null) {

stmt.close()

}

if (conn != null) {

conn.close()

}

}

}

}

打包上传

mvn clean package

打包并上传到项目:

在这里插入图片描述

运行项目

不写入SQL版

<code>spark-submit --master local[*] --class icu.wzk.SuperWordCount1 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java

运行结果如下图:

在这里插入图片描述

写入SQL-未优化版

<code>spark-submit --master local[*] --class icu.wzk.SuperWordCount2 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java

写入SQL-优化版

spark-submit --master local[*] --class icu.wzk.SuperWordCount3 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/goodtbl.java

运行结果如下图:

在这里插入图片描述

查看数据

查看数据库,内容如下:

在这里插入图片描述



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。