MapReduce Practice Problem: Web Access Log Analysis and Anomaly Detection
撕得失败的标签 · 2024-07-22 14:33:02
Contents
Assignment Description: Background · Dataset Description · Task Requirements · Sample Input · Sample Output · Implementation Steps
Solution Approach: 1. Data Preprocessing · 2. Access Statistics · 3. Anomaly Detection · 4. Main Method · 5. Results
Assignment Description
MapReduce Practice Problem: Web Access Log Analysis and Anomaly Detection
Background
You are asked to design and implement a MapReduce-based system for large-scale Web access log analysis and anomaly detection. The system should extract useful information from the millions of access log entries produced each day and detect potentially anomalous access behavior. The access log format is as follows:
127.0.0.1 - - [10/Oct/2021:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 1043
192.168.0.1 - - [10/Oct/2021:13:56:12 -0700] "POST /login HTTP/1.1" 200 2326
...
Dataset Description
- IP address: e.g., 127.0.0.1
- Timestamp: e.g., [10/Oct/2021:13:55:36 -0700]
- Request method: e.g., "GET" or "POST"
- Request URL: e.g., "/index.html"
- HTTP response code: e.g., 200, 404, or 500
- Response size: e.g., 1043
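These six fields can be pulled out of a raw line with a regular expression. The following is a minimal, self-contained sketch (the class name and pattern are my own, not part of the assignment; the mapper in the solution below uses a simpler split on spaces):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch only: parse one access-log line into its six fields with a regex.
public class LogLineParseDemo {
    // group 1 = IP, 2 = timestamp, 3 = method, 4 = URL, 5 = status code, 6 = size
    private static final Pattern LOG_PATTERN = Pattern.compile(
            "^(\\S+) \\S+ \\S+ \\[([^\\]]+)\\] \"(\\S+) (\\S+) \\S+\" (\\d{3}) (\\d+)$");

    public static void main(String[] args) {
        String line = "127.0.0.1 - - [10/Oct/2021:13:55:36 -0700] \"GET /index.html HTTP/1.1\" 200 1043";
        Matcher m = LOG_PATTERN.matcher(line);
        if (m.matches()) {
            System.out.println("ip=" + m.group(1) + ", time=" + m.group(2)
                    + ", method=" + m.group(3) + ", url=" + m.group(4)
                    + ", status=" + m.group(5) + ", size=" + m.group(6));
        }
    }
}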
Task Requirements
1. Data preprocessing: parse each log record and extract the IP address, request time, request method, request URL, HTTP response code, and response size; format the parsed data into a structured representation (e.g., JSON).
2. Access statistics: count the number of requests per IP address per day, and the number of requests per URL per day.
3. Anomaly detection: flag abnormally high access frequency by computing the mean and standard deviation of the per-IP access counts and marking every IP whose count exceeds the mean plus three standard deviations; flag potentially malicious requests by counting 4xx/5xx responses per IP and marking every IP whose anomalous requests make up more than 20% of its total requests (a small numeric sketch of both rules follows this list).
4. Result output: the access statistics (requests per IP and per URL) and the anomaly detection results (abnormally high-frequency IPs with their counts, and potentially malicious IPs with their anomalous request count and its share of total requests).
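To make the two flagging rules concrete, here is a small self-contained sketch with made-up numbers (the class name and the example values are mine, not from the assignment data):

// Sketch of the two anomaly rules on toy data.
public final class ThresholdDemo {
    public static void main(String[] args) {
        // Rule 1: abnormally high access frequency (per-IP counts are made-up example values)
        long[] counts = {150, 200, 180, 1200};
        double mean = 0;
        for (long c : counts) {
            mean += c;
        }
        mean /= counts.length;
        double variance = 0;
        for (long c : counts) {
            variance += Math.pow(c - mean, 2);
        }
        double std = Math.sqrt(variance / counts.length);
        System.out.println("flag IPs whose count exceeds " + (mean + 3 * std));

        // Rule 2: potentially malicious requests (4xx/5xx share of all requests above 20%)
        long totalRequests = 200;
        long anomalousRequests = 50;
        double ratio = (double) anomalousRequests / totalRequests;
        System.out.println("flagged as malicious: " + (ratio > 0.2) + " (" + ratio * 100 + "%)");
    }
}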
Sample Input
127.0.0.1 - - [10/Oct/2021:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 1043
192.168.0.1 - - [10/Oct/2021:13:56:12 -0700] "POST /login HTTP/1.1" 200 2326
...
Sample Output
Access statistics:
Visits per IP:
127.0.0.1 150
192.168.0.1 200
Visits per URL:
/index.html 300
/login 400
Anomaly detection results:
Abnormally high-frequency IPs:
192.168.0.1 1200
Potentially malicious request IPs:
127.0.0.1 50 25.0%
Implementation Steps
1. Preprocessing Mapper: parse each log record, extract the required fields, and emit structured data.
2. Access statistics Mapper and Reducer: the Mapper emits a count of 1 for every IP address and every URL it sees; the Reducer sums the counts per key.
3. Anomaly detection Mapper and Reducer: the Mapper counts requests per IP and marks requests whose HTTP response code is 4xx or 5xx; the Reducer computes the mean and standard deviation of the per-IP access counts to flag abnormally high-frequency IPs, and computes each IP's share of anomalous requests to flag potentially malicious IPs.
Solution Approach
Preprocessing Mapper: parse each log record, extract the required fields, and emit structured data. Access statistics Mapper and Reducer: count requests per IP address and per URL. Anomaly detection Mapper and Reducer: count requests per IP address and detect requests whose HTTP response code is 4xx or 5xx. Main method: set up and chain three MapReduce jobs: data preprocessing, access statistics, and anomaly detection.
1. Data Preprocessing
PreprocessMapper
package org.example.mapreduce.t1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * @author 撕得失败的标签
 * @version 1.0
 * @description: Data preprocessing
 * @date 2024/6/22 22:35
 */
public class PreprocessMapper extends Mapper<LongWritable, Text, Text, Text> {
    /**
     * @description:
     * 1. Parse each log record and extract the IP address, request time, request method, request URL, HTTP response code, and response size.
     * 2. Format the parsed data into a structured representation (e.g., JSON).
     * @author 撕得失败的标签
     * @date 2024/6/23 11:09
     */
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] strings = line.split(" ");
if (strings.length == 10) {
            // Extract the parsed fields
String ipAddress = strings[0];
String timestamp = strings[3] + " " + strings[4];
String requestMethod = strings[5];
String requestUrl = strings[6];
String httpStatusCode = strings[8];
String responseSize = strings[9];
context.write(new Text(ipAddress), new Text(timestamp + "," + requestMethod + "," + requestUrl + "," + httpStatusCode + "," + responseSize));
}
}
}
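The task statement suggests a structured format such as JSON, while the mapper above emits a comma-separated value. A minimal sketch of what a JSON value could look like (the helper class and field names are my own, and escaping of quotes inside values is ignored for brevity):

// Sketch only (not part of the original solution): emit the parsed fields as a JSON string.
public final class LogJson {
    // Builds a JSON object from already-parsed access-log fields.
    static String toJson(String ip, String time, String method, String url,
                         String status, String size) {
        return String.format(
                "{\"ip\":\"%s\",\"time\":\"%s\",\"method\":\"%s\",\"url\":\"%s\",\"status\":%s,\"size\":%s}",
                ip, time, method, url, status, size);
    }

    public static void main(String[] args) {
        // Prints: {"ip":"127.0.0.1","time":"10/Oct/2021:13:55:36 -0700","method":"GET","url":"/index.html","status":200,"size":1043}
        System.out.println(toJson("127.0.0.1", "10/Oct/2021:13:55:36 -0700",
                "GET", "/index.html", "200", "1043"));
    }
}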
2. Access Statistics
AccessStatistics
package org.example.mapreduce.t1;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
/**
 * @author 撕得失败的标签
 * @version 1.0
 * @description: Access statistics
 * @date 2024/6/22 22:55
 */
public class AccessStatistics {
    /**
     * @description:
     * 1. Count the number of requests per IP address per day.
     * 2. Count the number of requests per URL per day.
     * @author 撕得失败的标签
     * @date 2024/6/23 11:08
     */
public static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws InterruptedException, IOException {
String line = value.toString();
String[] strings = line.split(" ");
            // Count requests for a single day, using 20/Jun/2024 as an example;
            // the length check skips malformed lines
            if (strings.length == 10 && strings[3].contains("20/Jun/2024")) {
// IP
context.write(new Text(strings[0]), new LongWritable(1));
// URL
context.write(new Text(strings[6]), new LongWritable(1));
}
}
}
public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value : values) {
sum += value.get();
}
context.write(key, new LongWritable(sum));
}
}
}
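Because the Reduce class only sums counts per key, it can also be reused as a combiner to shrink the map output before the shuffle. A possible extra line for the driver (a sketch; accessStatisticsJob is the Job object configured in the Main class in section 4):

// Optional map-side pre-aggregation; safe because the reducer is a pure per-key sum
accessStatisticsJob.setCombinerClass(AccessStatistics.Reduce.class);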
3. Anomaly Detection
AnomalyDetection
package org.example.mapreduce.t1;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
import java.util.HashMap;
/**
 * @author 撕得失败的标签
 * @version 1.0
 * @description: Anomaly detection
 * @date 2024/6/23 11:08
 */
public class AnomalyDetection {
    /**
     * @description:
     * 1. Detect abnormally high access frequency: compute the mean and standard deviation of the per-IP
     *    access counts and flag IPs whose count exceeds the mean plus three standard deviations.
     * 2. Detect potentially malicious requests: count requests with 4xx/5xx response codes per IP
     *    and flag IPs whose anomalous requests exceed 20% of their total requests.
     * @author 撕得失败的标签
     * @date 2024/6/23 11:08
     */
public static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strings = value.toString().split(" ");
            if (strings.length != 10) {
                return; // skip malformed lines
            }
            String ip = strings[0];
            // Every request counts once towards the IP's total
            context.write(new Text(ip), new LongWritable(1));
            String httpStatusCode = strings[8];
            if (httpStatusCode.startsWith("4") || httpStatusCode.startsWith("5")) {
                // Prefix 4xx/5xx requests with "+" so anomalous counts form a separate key space
                String anomaly = "+" + ip;
                context.write(new Text(anomaly), new LongWritable(1));
}
}
}
public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {
private final HashMap<String, Long> ipToCount = new HashMap<String, Long>();
private final HashMap<String, Long> ipToAnomalyCount = new HashMap<String, Long>();
@Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            String ip = key.toString();
            if (ip.startsWith("+")) {
                // Keys prefixed with "+" carry the 4xx/5xx (anomalous) request count for an IP
                ipToAnomalyCount.put(ip.substring(1), sum);
            } else {
                ipToCount.put(ip, sum);
            }
        }
@Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            // Anomaly detection over the aggregated per-IP counts (assumes a single reducer)
            long sum = 0;
            for (String k : ipToCount.keySet()) {
                sum += ipToCount.get(k);
            }
            double avg = (double) sum / ipToCount.size();
            double variance = 0;
            for (String k : ipToCount.keySet()) {
                variance += Math.pow(ipToCount.get(k) - avg, 2);
            }
            double std = Math.sqrt(variance / ipToCount.size());
            // Abnormally high-frequency IPs: count > mean + 3 * standard deviation
            for (String k : ipToCount.keySet()) {
                if (ipToCount.get(k) > avg + 3 * std) {
                    context.write(new Text(k), new LongWritable(ipToCount.get(k)));
                }
            }
            // Potentially malicious request IPs: 4xx/5xx share of total requests above 20%
            for (String k : ipToAnomalyCount.keySet()) {
                double anomaly = (double) ipToAnomalyCount.get(k) / ipToCount.get(k);
                if (anomaly > 0.2) {
                    context.write(new Text(k + "\t" + String.format("%.1f", anomaly * 100) + "%"), new LongWritable(ipToAnomalyCount.get(k)));
                }
            }
        }
}
}
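One caveat: the global mean, standard deviation, and ratios are computed in cleanup() from in-memory maps, so every key has to reach the same reducer. Hadoop defaults to a single reduce task, but it is safer to state that explicitly in the driver; a sketch (anomalyDetectionJob is the Job object configured in the Main class in section 4):

// The cleanup()-based statistics only see all per-IP counts if there is exactly one reducer
anomalyDetectionJob.setNumReduceTasks(1);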
4. Main Method
Main
package org.example.mapreduce.t1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.LinkedList;
import java.util.List;
/**
* @author 撕得失败的标签
* @version 1.0
 * @description: Main method
* @date 2024/6/22 22:34
*/
public class Main {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Create the shared job configuration
        Configuration conf = new Configuration();
        // fs.defaultFS is the current name of the deprecated fs.default.name key
        conf.set("fs.defaultFS", "hdfs://hadoop102:9000");
        // 1. Data preprocessing: PreprocessMapper
Job preprocessJob = Job.getInstance(conf, "preprocess job");
preprocessJob.setJarByClass(Main.class);
preprocessJob.setMapperClass(PreprocessMapper.class);
preprocessJob.setOutputKeyClass(Text.class);
preprocessJob.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(preprocessJob, new Path("/m1"));
FileSystem fs = FileSystem.get(conf);
Path outPath = new Path("/t1/preprocess");
if(fs.exists(outPath)) {
fs.delete(outPath, true);
}
FileOutputFormat.setOutputPath(preprocessJob, outPath);
preprocessJob.waitForCompletion(true);
        // 2. Access statistics: AccessStatistics
Job accessStatisticsJob = Job.getInstance(conf, "access statistics job");
accessStatisticsJob.setJarByClass(Main.class);
accessStatisticsJob.setMapperClass(AccessStatistics.Map.class);
accessStatisticsJob.setReducerClass(AccessStatistics.Reduce.class);
accessStatisticsJob.setOutputKeyClass(Text.class);
accessStatisticsJob.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(accessStatisticsJob, new Path("/m1"));
FileSystem fs1 = FileSystem.get(conf);
Path outPath1 = new Path("/t1/statistics");
if(fs1.exists(outPath1)) {
fs1.delete(outPath1, true);
}
FileOutputFormat.setOutputPath(accessStatisticsJob, outPath1);
accessStatisticsJob.waitForCompletion(true);
        // 3. Anomaly detection: AnomalyDetection
Job anomalyDetectionJob = Job.getInstance(conf, "anomaly detection job");
anomalyDetectionJob.setJarByClass(Main.class);
anomalyDetectionJob.setMapperClass(AnomalyDetection.Map.class);
anomalyDetectionJob.setReducerClass(AnomalyDetection.Reduce.class);
anomalyDetectionJob.setOutputKeyClass(Text.class);
anomalyDetectionJob.setOutputValueClass(LongWritable.class);
FileInputFormat.addInputPath(anomalyDetectionJob, new Path("/m1"));
FileSystem fs2 = FileSystem.get(conf);
Path outPath2 = new Path("/t1/anomaly");
if(fs2.exists(outPath2)) {
fs2.delete(outPath2, true);
}
FileOutputFormat.setOutputPath(anomalyDetectionJob, outPath2);
anomalyDetectionJob.waitForCompletion(true);
        // 4. Print the results
        // Access statistics:
FileSystem fs3 = FileSystem.get(conf);
Path outPath3 = new Path("/t1/statistics/part-r-00000");
BufferedReader br = new BufferedReader(new InputStreamReader(fs3.open(outPath3)));
List<String> ip = new LinkedList<String>();
List<String> url = new LinkedList<String>();
String line;
while ((line = br.readLine()) != null) {
if (line.startsWith("/")) {
url.add(line);
} else {
ip.add(line);
}
}
        // Visits per IP:
        System.out.println("\nVisits per IP:");
for (String s : ip) {
System.out.println(s);
}
        // Visits per URL:
        System.out.println("\nVisits per URL:");
for (String s : url) {
System.out.println(s);
}
        // Anomaly detection results:
FileSystem fs4 = FileSystem.get(conf);
Path outPath4 = new Path("/t1/anomaly/part-r-00000");
BufferedReader br1 = new BufferedReader(new InputStreamReader(fs4.open(outPath4)));
List<String> potential = new LinkedList<String>();
List<String> anomaly = new LinkedList<String>();
String line1;
while ((line1 = br1.readLine()) != null) {
String[] strings = line1.split("\t");
if (strings.length == 2) {
anomaly.add(line1);
} else {
potential.add(line1);
}
}
        // Abnormally high-frequency IPs:
        System.out.println("\nAbnormally high-frequency IPs:");
        if (anomaly.size() == 0) {
            System.out.println("None");
} else {
for (String s : anomaly) {
System.out.println(s);
}
}
        // Potentially malicious request IPs:
        System.out.println("\nPotentially malicious request IPs:");
        if (potential.size() == 0) {
            System.out.println("None");
} else {
for (String s : potential) {
String[] strings = s.split("\t");
System.out.println(strings[0] + "\t" + strings[2] + "\t" + strings[1]);
}
}
}
}
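With the three classes packaged into a jar, the whole chain can be submitted in the usual way, for example: hadoop jar weblog-analysis.jar org.example.mapreduce.t1.Main (the jar name here is an assumption of this write-up; the input directory /m1 and the output directories under /t1 are hard-coded in Main above).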
5. Results
Visits per IP:
10.0.0.1     334003
10.0.0.2     334350
10.0.0.3     333056
10.0.0.4     333947
10.0.0.5     333263
127.0.0.1    332347
127.0.0.2    333025
127.0.0.3    332450
127.0.0.4    333005
127.0.0.5    333428
192.168.0.1  334054
192.168.0.2  332883
192.168.0.3  333681
192.168.0.4  333133
192.168.0.5  333375

Visits per URL:
/cart        713975
/checkout    713453
/contact     715382
/home.html   712570
/index.html  715544
/login       714255
/products    714821

Abnormally high-frequency IPs:
None

Potentially malicious request IPs:
192.168.0.2  222498  66.8%
192.168.0.1  222165  66.5%
127.0.0.5    221778  66.5%
192.168.0.4  222096  66.7%
127.0.0.4    222156  66.7%
192.168.0.3  222227  66.6%
192.168.0.5  222070  66.6%
10.0.0.4     222243  66.6%
10.0.0.3     221966  66.6%
10.0.0.5     222347  66.7%
10.0.0.2     222664  66.6%
10.0.0.1     222493  66.6%
127.0.0.3    221464  66.6%
127.0.0.2    222197  66.7%
127.0.0.1    221702  66.7%
Process finished with exit code 0
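A note on the empty first list: in this synthetic data set every IP ends up with roughly 333,000 requests, so the per-IP counts barely vary and none exceeds the mean plus three standard deviations, while the 4xx/5xx share is about two thirds for every IP, which is why all fifteen addresses appear in the potentially-malicious list.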