MapReduce 实践题:Web 访问日志分析与异常检测

撕得失败的标签 2024-07-22 14:33:02 阅读 55

文章目录

作业描述MapReduce 实践题:Web 访问日志分析与异常检测题目背景数据集说明任务要求输入数据示例输出数据示例实现步骤

解题思路1. 数据预处理2. 访问统计3. 异常检测4. 主方法5. 结果输出

作业描述

MapReduce 实践题:Web 访问日志分析与异常检测

题目背景

你被要求设计和实现一个基于 MapReduce 的大规模 Web 访问日志分析与异常检测系统。该系统的目标是从每日数百万条访问日志中提取有用的信息,并检测出潜在的异常访问行为。访问日志文件格式如下:

<code>127.0.0.1 - - [10/Oct/2021:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 1043

192.168.0.1 - - [10/Oct/2021:13:56:12 -0700] "POST /login HTTP/1.1" 200 2326

...

数据集说明

IP 地址:例如,127.0.0.1时间戳:例如,[10/Oct/2021:13:55:36 -0700]请求方法:例如,"GET""POST"请求 URL:例如,"/index.html"HTTP 响应码:例如,200404500响应大小:例如,1043

任务要求

数据预处理

解析每条日志记录,提取以下字段:IP 地址、请求时间、请求方法、请求 URL、HTTP 响应码、响应大小。将解析后的数据格式化为结构化格式(例如,JSON)。 访问统计

统计每个 IP 地址在一天中的访问次数。统计每个请求 URL 在一天中的访问次数。 异常检测

检测异常高的访问频率:对于每个 IP 地址,计算访问次数的平均值和标准差,标记访问次数超过均值加三倍标准差的 IP 地址。检测潜在的恶意请求:检测 HTTP 响应码为 4xx 和 5xx 的请求,统计每个 IP 地址的异常请求次数,并标记异常请求次数占总请求次数比例超过 20% 的 IP 地址。 结果输出

输出访问统计结果:每个 IP 地址的访问次数,每个请求 URL 的访问次数。输出异常检测结果:异常高访问频率的 IP 地址及其访问次数,潜在的恶意请求 IP 地址及其异常请求次数和总请求次数的比例。

输入数据示例

127.0.0.1 - - [10/Oct/2021:13:55:36 -0700] "GET /index.html HTTP/1.1" 200 1043

192.168.0.1 - - [10/Oct/2021:13:56:12 -0700] "POST /login HTTP/1.1" 200 2326

...

输出数据示例

访问统计结果

IP访问次数:

127.0.0.1 150

192.168.0.1 200

URL访问次数:

/index.html 300

/login 400

异常检测结果

异常高访问频率 IP:

192.168.0.1 1200

潜在恶意请求 IP:

127.0.0.1 50 25.0%

实现步骤

数据预处理 Mapper

解析日志记录,提取必要字段并输出结构化数据。 访问统计 Mapper 和 Reducer

Mapper:统计每个 IP 地址和每个 URL 的访问次数。Reducer:汇总每个 IP 地址和每个 URL 的访问次数。 异常检测 Mapper 和 Reducer

Mapper:计算每个 IP 地址的访问次数,检测 HTTP 响应码为 4xx 和 5xx 的请求。Reducer:计算每个 IP 地址访问次数的均值和标准差,标记异常高访问频率的 IP 地址;统计每个 IP 地址的异常请求次数并计算异常请求比例,标记潜在的恶意请求 IP 地址。

解题思路

数据预处理 Mapper:解析日志记录,提取必要字段并输出结构化数据。访问统计 Mapper 和 Reducer:统计每个 IP 地址和每个 URL 的访问次数。异常检测 Mapper 和 Reducer:计算每个 IP 地址的访问次数,检测 HTTP 响应码为 4xx 和 5xx 的请求。主方法:设置三个 MapReduce 作业:数据预处理、访问统计和异常检测。

1. 数据预处理

PreprocessMapper

package org.example.mapreduce.t1;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**

* @author 撕得失败的标签

* @version 1.0

* @description: 数据预处理

* @date 2024/6/22 22:35

*/

public class PreprocessMapper extends Mapper<LongWritable, Text, Text, Text> {

/**

* @description:

* 1. 解析每条日志记录,提取以下字段:IP 地址、请求时间、请求方法、请求 URL、HTTP 响应码、响应大小。

* 2. 将解析后的数据格式化为结构化格式(例如,JSON)。

* @author 撕得失败的标签

* @date 2024/6/23 11:09

*/

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String line = value.toString();

String[] strings = line.split(" ");

if (strings.length == 10) {

// 提取匹配到的字段

String ipAddress = strings[0];

String timestamp = strings[3] + " " + strings[4];

String requestMethod = strings[5];

String requestUrl = strings[6];

String httpStatusCode = strings[8];

String responseSize = strings[9];

context.write(new Text(ipAddress), new Text(timestamp + "," + requestMethod + "," + requestUrl + "," + httpStatusCode + "," + responseSize));

}

}

}

2. 访问统计

AccessStatistics

package org.example.mapreduce.t1;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.LongWritable;

import java.io.IOException;

/**

* @author 撕得失败的标签

* @version 1.0

* @description: 访问统计

* @date 2024/6/22 22:55

*/

public class AccessStatistics {

/**

* @description:

* 1. 统计每个 IP 地址在一天中的访问次数。

* 2. 统计每个请求 URL 在一天中的访问次数。

* @author 撕得失败的标签

* @date 2024/6/23 11:08

*/

public static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {

@Override

protected void map(LongWritable key, Text value, Context context) throws InterruptedException, IOException {

String line = value.toString();

String[] strings = line.split(" ");

// 统计一天的,以 20/Jun/2024 为例

if (strings[3].contains("20/Jun/2024")) {

// IP

context.write(new Text(strings[0]), new LongWritable(1));

// URL

context.write(new Text(strings[6]), new LongWritable(1));

}

}

}

public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum = 0;

for (LongWritable value : values) {

sum += value.get();

}

context.write(key, new LongWritable(sum));

}

}

}

3. 异常检测

AnomalyDetection

package org.example.mapreduce.t1;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.LongWritable;

import java.io.IOException;

import java.util.HashMap;

/**

* @author 撕得失败的标签

* @version 1.0

* @description: 异常检测

* @date 2024/6/23 11:08

*/

public class AnomalyDetection {

/**

* @description:

* 1. 检测异常高的访问频率:对于每个 IP 地址,计算访问次数的平均值和标准差,标记访问次数超过均值加三倍标准差的 IP 地址。

* 2. 检测潜在的恶意请求:检测 HTTP 响应码为 4xx 和 5xx 的请求,

* 统计每个 IP 地址的异常请求次数,

* 并标记异常请求次数占总请求次数比例超过 20% 的 IP 地址。

* @author 撕得失败的标签

* @date 2024/6/23 11:08

*/

public static class Map extends Mapper<LongWritable, Text, Text, LongWritable> {

@Override

protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

String[] strings = value.toString().split(" ");

String ip = strings[0];

context.write(new Text(ip), new LongWritable(1));

String httpStatusCode = strings[8];

if (httpStatusCode.startsWith("4") || httpStatusCode.startsWith("5")) {

String anomaly = "+" + ip;

context.write(new Text(anomaly), new LongWritable(1));

}

}

}

public static class Reduce extends Reducer<Text, LongWritable, Text, LongWritable> {

private final HashMap<String, Long> ipToCount = new HashMap<String, Long>();

private final HashMap<String, Long> ipToAnomalyCount = new HashMap<String, Long>();

@Override

protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {

long sum = 0;

for (LongWritable value : values) {

sum += value.get();

}

// context.write(key, new LongWritable(sum));

String ip = key.toString();

if (ip.startsWith("+")) {

ip = ip.substring(1);

ipToAnomalyCount.put(ip, sum);

}

ipToCount.put(ip, sum);

}

@Override

protected void cleanup(Context context) throws IOException, InterruptedException {

// 实现异常检测的逻辑

long sum = 0;

for (String k : ipToCount.keySet()) {

sum += ipToCount.get(k);

}

double avg = (double) (sum / ipToCount.size());

double std = 0;

for (String k : ipToCount.keySet()) {

std += Math.pow(ipToCount.get(k) - avg, 2);

}

// 异常高访问频率 IP

for (String k : ipToCount.keySet()) {

if (ipToCount.get(k) > avg + 3 * std) {

context.write(new Text(k), new LongWritable(ipToCount.get(k)));

}

}

// 潜在恶意请求 IP

for (String k : ipToAnomalyCount.keySet()) {

double anomaly = (double) ipToAnomalyCount.get(k) / ipToCount.get(k);

if (anomaly > 0.2) {

context.write(new Text(k + "\t" + String.format("%.1f", anomaly * 100) + "%"), new LongWritable(ipToAnomalyCount.get(k)));

}

}

}

}

}

4. 主方法

Main

package org.example.mapreduce.t1;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

import java.io.BufferedReader;

import java.io.InputStreamReader;

import java.util.LinkedList;

import java.util.List;

/**

* @author 撕得失败的标签

* @version 1.0

* @description: 主方法

* @date 2024/6/22 22:34

*/

public class Main {

public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

// 创建配置信息

Configuration conf = new Configuration();

conf.set("fs.default.name","hdfs://hadoop102:9000");

// 1. 数据预处理 PreprocessMapper

Job preprocessJob = Job.getInstance(conf, "preprocess job");

preprocessJob.setJarByClass(Main.class);

preprocessJob.setMapperClass(PreprocessMapper.class);

preprocessJob.setOutputKeyClass(Text.class);

preprocessJob.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(preprocessJob, new Path("/m1"));

FileSystem fs = FileSystem.get(conf);

Path outPath = new Path("/t1/preprocess");

if(fs.exists(outPath)) {

fs.delete(outPath, true);

}

FileOutputFormat.setOutputPath(preprocessJob, outPath);

preprocessJob.waitForCompletion(true);

// 2. 访问统计 AccessStatistics

Job accessStatisticsJob = Job.getInstance(conf, "access statistics job");

accessStatisticsJob.setJarByClass(Main.class);

accessStatisticsJob.setMapperClass(AccessStatistics.Map.class);

accessStatisticsJob.setReducerClass(AccessStatistics.Reduce.class);

accessStatisticsJob.setOutputKeyClass(Text.class);

accessStatisticsJob.setOutputValueClass(LongWritable.class);

FileInputFormat.addInputPath(accessStatisticsJob, new Path("/m1"));

FileSystem fs1 = FileSystem.get(conf);

Path outPath1 = new Path("/t1/statistics");

if(fs1.exists(outPath1)) {

fs1.delete(outPath1, true);

}

FileOutputFormat.setOutputPath(accessStatisticsJob, outPath1);

accessStatisticsJob.waitForCompletion(true);

// 3. 异常检测 AnomalyDetection

Job anomalyDetectionJob = Job.getInstance(conf, "anomaly detection job");

anomalyDetectionJob.setJarByClass(Main.class);

anomalyDetectionJob.setMapperClass(AnomalyDetection.Map.class);

anomalyDetectionJob.setReducerClass(AnomalyDetection.Reduce.class);

anomalyDetectionJob.setOutputKeyClass(Text.class);

anomalyDetectionJob.setOutputValueClass(LongWritable.class);

FileInputFormat.addInputPath(anomalyDetectionJob, new Path("/m1"));

FileSystem fs2 = FileSystem.get(conf);

Path outPath2 = new Path("/t1/anomaly");

if(fs2.exists(outPath2)) {

fs2.delete(outPath2, true);

}

FileOutputFormat.setOutputPath(anomalyDetectionJob, outPath2);

anomalyDetectionJob.waitForCompletion(true);

// 4. 输出结果 Output

// 访问统计结果:

FileSystem fs3 = FileSystem.get(conf);

Path outPath3 = new Path("/t1/statistics/part-r-00000");

BufferedReader br = new BufferedReader(new InputStreamReader(fs3.open(outPath3)));

List<String> ip = new LinkedList<String>();

List<String> url = new LinkedList<String>();

String line;

while ((line = br.readLine()) != null) {

if (line.startsWith("/")) {

url.add(line);

} else {

ip.add(line);

}

}

// IP访问次数:

System.out.println("\nIP访问次数:");

for (String s : ip) {

System.out.println(s);

}

// URL访问次数:

System.out.println("\nURL访问次数:");

for (String s : url) {

System.out.println(s);

}

// 异常检测结果:

FileSystem fs4 = FileSystem.get(conf);

Path outPath4 = new Path("/t1/anomaly/part-r-00000");

BufferedReader br1 = new BufferedReader(new InputStreamReader(fs4.open(outPath4)));

List<String> potential = new LinkedList<String>();

List<String> anomaly = new LinkedList<String>();

String line1;

while ((line1 = br1.readLine()) != null) {

String[] strings = line1.split("\t");

if (strings.length == 2) {

anomaly.add(line1);

} else {

potential.add(line1);

}

}

// 异常高访问频率 IP:

System.out.println("\n异常高访问频率 IP:");

if (anomaly.size() == 0) {

System.out.println("无");

} else {

for (String s : anomaly) {

System.out.println(s);

}

}

// 潜在恶意请求 IP:

System.out.println("\n潜在异常高访问频率 IP:");

if (potential.size() == 0) {

System.out.println("无");

} else {

for (String s : potential) {

String[] strings = s.split("\t");

System.out.println(strings[0] + "\t" + strings[2] + "\t" + strings[1]);

}

}

}

}

5. 结果输出

IP访问次数:

10.0.0.1334003

10.0.0.2334350

10.0.0.3333056

10.0.0.4333947

10.0.0.5333263

127.0.0.1332347

127.0.0.2333025

127.0.0.3332450

127.0.0.4333005

127.0.0.5333428

192.168.0.1334054

192.168.0.2332883

192.168.0.3333681

192.168.0.4333133

192.168.0.5333375

URL访问次数:

/cart713975

/checkout713453

/contact715382

/home.html712570

/index.html715544

/login714255

/products714821

异常高访问频率 IP:

潜在异常高访问频率 IP:

192.168.0.222249866.8%

192.168.0.122216566.5%

127.0.0.522177866.5%

192.168.0.422209666.7%

127.0.0.422215666.7%

192.168.0.322222766.6%

192.168.0.522207066.6%

10.0.0.422224366.6%

10.0.0.322196666.6%

10.0.0.522234766.7%

10.0.0.222266466.6%

10.0.0.122249366.6%

127.0.0.322146466.6%

127.0.0.222219766.7%

127.0.0.122170266.7%

Process finished with exit code 0



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。