JAVA基础之七-Collection和它的并行和流处理

cnblogs 2024-09-30 16:39:01 阅读 78

Collection 翻下词典,有许多含义:

收集;聚集;(常指同类的)收藏品;募捐,募集;作品集;聚积;取走;一群人;拿走;(常为季节性推出的)系列时装(或家用品);一批物品

选择“集合”作为翻译名,我觉得可行,除非我们现在重新创造一个汉语词组。

对于CRUD和非CRUD,集合都是一个无比重要的东西,因为计算机的本质是对信息的处理。

信息一般不是单个,是一堆,一堆堆,一块块,一个个....

网上关于集合的资料无比多,所以本文主要是做一个简要的介绍,并添加一些注意事项和个人感悟。

一、简介

不过Collection的子孙过于多,用现有词汇命名这些子孙并不容易,有待创建新的词汇。

常用知名子孙有:

List -- 列表,javaDoc的释义是:有序集合。

--ArrayList 动态大小列表 ,这是crud中最常用的类型 。不保证顺序

--LinkedList 双链列表,可以固定成员顺序。本身实现了Deque的接口,可用于辅助实现FiLo的算法

Set - 无重复集合,允许有一个null成员

---TreeSet 有序集合

-- HastSet 哈希集合 ,主要是操作的性能好一些

-- LinkedHashSet 双向链哈希集合,保持了插入顺序,又具有对应的性能

Queue -队列

--Deque 双端操作队列。它有一个著名的实现 LinkedList

Buffer --缓冲

不过这个主要是阿帕奇的实现org.apache.commons.collections.Buffer,算不得java的基础类型

如果是初级程序员,或者以CRUD为主的,那么只要学些掌握ArrayList就差不多了,因为现在的大部分的ORM或者JDBC的上级实现都适用ArrayList来存储数据集。

二、集合的基本方法

仅仅介绍Collection的接口方法,为了便于理解,以LinkedList为例子。

这些方法都极其简单,也没有什么特别好解释的,直接上例子吧!

package study.base.types.collection.list;

import java.util.*;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.stream.Collectors;

/**

* 演示Collection接口的基本操作和LinkedList的一些典型操作

* @author lto

*/

public class TestLinkedList {

private LinkedList<MoneyJar> list;

private String[] givers = new String[]{"爸爸","妈妈","哥哥","姐姐","爷爷","奶奶"};

private Random random = new Random();

private Map<String,Long> realGivers;

public TestLinkedList(int size) {

this.list = new LinkedList<>();

this.realGivers = new HashMap<>();

//插入100个MoneyJar,金额和日期都是随机的,giver是随机

for (int i = 0; i < size; i++) {

String giver = givers[random.nextInt(givers.length)];

int amount = random.nextInt(100);

this.list.add(new MoneyJar(giver, amount, new Date()));

}

//按照giver分组统计个数,并赋值给realGivers

this.list.stream().collect(Collectors.groupingBy(MoneyJar::giver,

Collectors.counting())).forEach((k,v)->{

realGivers.put(k,(long)v);});

//打印realGivers

this.realGivers.forEach((k,v)->{System.out.println(String.format("%s共有%d个", k, v));});

}

public void count(){

long start = System.currentTimeMillis();

final long[] total = {0};

this.list.spliterator().forEachRemaining(mj-> total[0] += mj.amount());

System.out.println(String.format("总共%d元",total[0]));

System.out.println("耗费时间:"+(System.currentTimeMillis()-start));

}

public void sortByAmount(){

this.list.sort((o1, o2) -> o1.amount().compareTo(o2.amount()));

}

/**

* 统计每个giver给的钱,并打印结果

*/

public void sumByGiver(){

System.out.println("--------------------****************-----------------------------");

//根据giver分组统计每个giver给的钱,并返回一个ListMap

long start = System.currentTimeMillis();

Map<String, Integer> result= this.list.stream().collect(Collectors.groupingBy(MoneyJar::giver,

Collectors.summingInt(MoneyJar::amount)));

//打印统计结果

result.forEach((k,v)->{System.out.println(String.format("%s给的钱是%d", k, v));});

System.out.println("耗费时间:"+(System.currentTimeMillis()-start));

//采用for循环的方式,分组统计

System.out.println("采用for循环的方式,分组统计-----------------------------");

long start1 = System.currentTimeMillis();

Map<String,List<Integer>> result1= new HashMap<>();

//初始化result1,把realGivers的每个元素作为key,初始值为0

this.realGivers.forEach((k,v)->{

result1.put(k,new ArrayList<>());

});

//遍历list,计算每个giver给的钱

for (MoneyJar moneyJar : list) {

result1.get(moneyJar.giver()).add(moneyJar.amount());

}

//根据result1的成员个数,创建对应的线程,然后在线程中计算每个giver给的钱,并计算总和

int numThreads=result1.size();

CountDownLatch latch = new CountDownLatch(numThreads);

ExecutorService executor = Executors.newFixedThreadPool(numThreads);

result1.forEach((k,v)->{

Runnable worker = () -> {

try {

long sum=0;

for (int i : v) {

sum+=i;

}

System.out.println(String.format("%s给的钱是%d", k, sum));

} finally {

latch.countDown(); // 计数减一

}

};

// 使用executor提交任务,而不是直接启动Thread

executor.submit(worker);

});

try {

// 等待所有线程完成

latch.await();

System.out.println("All threads have finished.");

} catch (InterruptedException e) {

e.printStackTrace();

}

// 关闭executor,释放资源

executor.shutdown();

System.out.println("耗费时间:"+(System.currentTimeMillis()-start1));

}

public void splitToSum(){

//把钱罐的钱分为n份,分别统计,然后再合并总的金额,并统计耗费时间

System.out.println("-- 采用并行流的方法");

long start = System.currentTimeMillis();

Long total=list.parallelStream().mapToLong(MoneyJar::amount).sum();

System.out.println("耗费时间:"+(System.currentTimeMillis()-start));

System.out.println("总金额是"+total.toString());

//采用传统的for循环方式累积

System.out.println("-- 采用传统的for循环的方法");

start = System.currentTimeMillis();

long sum=0;

for (MoneyJar moneyJar : list) {

sum+=moneyJar.amount();

}

System.out.println("总金额是"+sum);

System.out.println("耗费时间:"+(System.currentTimeMillis()-start));

}

/**

* 把小于等于指定的金额的钱都清理掉

* @param amount

*/

public void purgeSmallMoney(int amount){

this.list.removeIf(moneyJar -> moneyJar.amount()<=amount);

}

record MoneyJar(String giver,Integer amount,

Date putDay){

}

public static void main(String[] args) {

//当10万个的时候,并行的速度反而是for的3倍左右。

TestLinkedList test = new TestLinkedList(200);

test.splitToSum();

test.sortByAmount();

System.out.println("-- 排序后 -----");

for (MoneyJar moneyJar : test.list) {

System.out.println(moneyJar);

}

//测试100万的情况

TestLinkedList test100 = new TestLinkedList(1000000);

test100.splitToSum();

//测试2000万的情况

TestLinkedList test1000 = new TestLinkedList(20000000);

test1000.splitToSum();

//以上三个例子,哈无例外,都是简单的循环胜出。那么parametrizedStream的效率就值得怀疑了。

//是否因为没有正确设置并行度,还是计算机的环境存在问题

test1000.sumByGiver();

test1000.count();

}

}

以上例子并没有测试每一个接口方法,是因为有些太简单不值得浪费篇幅。

三、并行处理和流处理

在J8之前,如果把一个集合,以ArrayList为例子,进行并行处理,那么必须自己来动手,过程可能是这样的:

1.分隔集合为n个子集

2.创建n个线程,用于分别处理n个子集

3.如果需要合并处理,还需要特定注意线程的等待和合并

写起来还是相对比较麻烦的。当然,现在借助于ai,没有那么复杂。但和J8之后提供的特性相比,自然还是麻烦一些。

至于流,更不用说了,J8之前并没有这个概念。

在JDK17中,可以看到Collection接口和并发以及流有关的方法:

default Stream<E> parallelStream() {

return StreamSupport.stream(spliterator(), true);

}

@Override

default Spliterator<E> spliterator() {

return Spliterators.spliterator(this, 0);

}

default Stream<E> stream() {

return StreamSupport.stream(spliterator(), false);

}

三个都是默认方法,可以直接使用

parallelStream可以提供并行流处理。

--

根据已知的一些报告和我几次不是很严谨的测试,Stream和for相比并没有什么优势。

由此可以得出一个不是很严谨的结论:

在相当大的业务场景(crud为主的信息系统)中,甚至可以说,在大部分的业务场景中,Stream其实居于下风。

stream的作用仅仅是为了节约工程师的精力和体力

只有数据集巨大,且cpu充足的情况下,例如千万级别左右,并行流才会有一些可见的优势。但是,又有多少面向

企业基别的信息系统,会在应用级别这样疯狂地处理千万级别的数据,难道不怕jvm爆了吗?

用数据库的集合运算功能不是更好更简单吗?

四、工具类

4.1官定工具- Collections

这是集合最重要的工具类。

全路径:java.util.Collections

需要特别申明的是,Collections不仅仅会处理Colletion的子子孙孙,也会处理Map,所以不能被它的名称骗了。

由于存在JAVADOC,且这个Colllections的成员巨多,所以不逐一列出,避免浪费篇幅。

Collections方法大体包含三类:

1.运算

例如排序(sort)、翻转(reverse)、打乱(shuffle)、交换元素(swap)、填充元素(fill),经典聚集(min,max),集合运算等等

其中和经典集合运算有关的:

frequency -频率

disjoint-判断是否有交集

总之,结合Collecion自身的实现和Collections工具,要实现两个集合的并集、交集、差集、是否包含等等都是可以的,只不过有点麻烦。

2.构造特定类型的对象

a.不可修改集合(含map)

b.线程同步集合(含map)

c.锁定类型集合

d.空集合(无元素集合)

e.单元素集合(Singleton

前四个都容易理解,最后一个Singleton有点迷惑,就是为了返回只有一个成员的集合?

3.其它杂项

诸如复制、替换等等。

不过没有提供深度复制的方法。

4.2阿帕奇集合工具(CollectionUtils

相比java自带的集合工具,阿帕奇的工具主要集中在以下几个用途:

1.集合运算

这个比java官方的强大多了,所以还是用这个把。看看都有什么:

union(并集),intersection(交集),disjunction(!交集,或者独立并集),substract(移除子集),containAny(是否有交集)

isSubCollection(是否子集),isEqualCollection(是否相等),retainAll(交集),以及其它。

注意:retainAll和intersection都可以用于获取交集,但是二者还是有明显区别的,后者(intersection)会给出不重复的结果,而前者(retainAll)会给出重复的结果

以下是关于这些本人重视的集合运算方法的示例:

public void testApacheCollectionUtils(){

List<Integer> me = Arrays.asList(90, 80, 70,90,92,88);

List<Integer> mother = Arrays.asList(90, 80, 70,90,92,88);

List<Integer> auntScore = Arrays.asList(90, 80, 70,90,92,88);

List<Integer> fatherScore = Arrays.asList(99, 81, 71,90,98,88);

List<Integer> 赵云 = Arrays.asList(90,80);

List<Integer> 崔颢 = Arrays.asList(77);

List<Integer> myNewScore = (List<Integer>) CollectionUtils.union(me, 赵云);

System.out.println("我和赵云的合并∪="+myNewScore);

List<Integer> myIntersectionScore = (List<Integer>) CollectionUtils.intersection(me, fatherScore);

System.out.println("我和爸爸交集="+myIntersectionScore);

//差集

List<Integer> myDifferenceScore = (List<Integer>) CollectionUtils.subtract(me, fatherScore);

System.out.println("我和爸爸的差集="+myDifferenceScore);

//非公共部分

List<Integer> myDisJointScore = (List<Integer>) CollectionUtils.disjunction(me, fatherScore);

System.out.println("我和爸爸的非公共部分="+myDisJointScore);

//我和爸爸是否有交集

if(!CollectionUtils.containsAny(me, fatherScore)) {

System.out.println("我和爸爸没有交集");

} else {

System.out.println("我和爸爸有交集");

}

//我和崔颢是否有交集

if(!CollectionUtils.containsAny(me, 崔颢)) {

System.out.println("我和崔颢没有交集");

} else {

System.out.println("我和崔颢有交集");

}

//我和赵云的交集

List<Integer> myIntersectionScore2 = (List<Integer>) CollectionUtils.retainAll(me, 赵云);

System.out.println("我和赵云的交集(retainAll)="+myIntersectionScore2);

System.out.println("我和赵云的交集(inter)="+ CollectionUtils.intersection(me, 赵云));

//和崔颢Score的交集

List<Integer> myIntersectionScore3 = (List<Integer>) CollectionUtils.retainAll(崔颢, 赵云);

System.out.println("和崔颢的交集="+myIntersectionScore3);

//赵云是否是me的子集

if(CollectionUtils.isSubCollection(赵云, me)) {

System.out.println("赵云是me的子集");

} else {

System.out.println("赵云不是me的子集");

}

//崔颢Score是否是me的子集

if(CollectionUtils.isSubCollection(崔颢, me)) {

System.out.println("崔颢是me的子集");

} else {

System.out.println("崔颢不是me的子集");

}

//妈妈和阿姨是否一致

if(CollectionUtils.isEqualCollection(mother, auntScore)) {

System.out.println("妈妈和阿姨一致");

} else {

System.out.println("妈妈和阿姨不一致");

}

}

输出结果:

我和赵云的合并∪=[80, 70, 88, 90, 90, 92]

我和爸爸交集=[88, 90]

我和爸爸的差集=[80, 70, 90, 92]

我和爸爸的非公共部分=[80, 81, 98, 99, 70, 71, 90, 92]

我和爸爸有交集

我和崔颢没有交集

我和赵云的交集(retainAll)=[90, 80, 90]

我和赵云的交集(inter)=[80, 90]

和崔颢的交集=[]

赵云是me的子集

崔颢不是me的子集

妈妈和阿姨一致

2.元素处理

find,filter,exists,countMatches、select、collect、get、

3.构造特定类型集合

  • synchronizedCollection
  • unmodifiableCollection
  • predicatedCollection
  • typedCollection

需要注意的是,这里的几个方法,个人倾向于少用,尽量用java标准的Collections。

4.杂项

isEmpty,isNotEmpty,cardinality...

4.3其它杂项工具

现在工具有点泛滥了。这是因为复制工具代码已经很简单,再加上实在有一些个性化的需要,所以越做越多。

Spring有,JSON有,mybatis有...

这些已经泛滥的就不提了,它们主要用于一些极其个性化的,或者自认为更有效率更安全(存疑)。

4.4 小结

为安全起见,我个人都是尽量用官方的Collections和阿帕奇的CollectionUtils。

从工程角度出发,尽量少依赖也是一个大体正确的选择。

其它的不是万不得已不要用。当然各个组织也完全可以自行创建工具。

只不过,这两个工具集已包含绝大部分集合有关的操作,再结合Stream和Colllection自有的功能,应该很够用了。

五、CRUD和集合

编写crud的时候,我们可能会常常使用以下几种基于jdbc的方式创建集合:

1.使用基于jdbc的orm,例如典型的mybatis

2.基于sping的jdbcTemplate

实际是对原生jdbc的封装

3.基于原生jdbc

现在已经很少人用jpa来访问处理数据。

在绝大部分CRUD项目中,一般都用mytabis之类的Orm

所以,这里主要讨论mybatis(或者类似的框架工具即可)。

当返回集合的时候,mytais支持返回List(ArrayList),Set ,对这两个类型的支持是很友好的。

以下是方法(org.apache.ibatis.jdbc.SqlRunner#getResults,selectAll)的部分

public List<Map<String, Object>> selectAll(String sql, Object... args) throws SQLException {

try (PreparedStatement ps = connection.prepareStatement(sql)) {

setParameters(ps, args);

try (ResultSet rs = ps.executeQuery()) {

return getResults(rs);

}

}

}

private List<Map<String, Object>> getResults(ResultSet rs) throws SQLException {

List<Map<String, Object>> list = new ArrayList<>();

.....

while (rs.next()) {

Map<String, Object> row = new HashMap<>();

for (int i = 0, n = columns.size(); i < n; i++) {

String name = columns.get(i);

TypeHandler<?> handler = typeHandlers.get(i);

row.put(name.toUpperCase(Locale.ENGLISH), handler.getResult(rs, name));

}

list.add(row);

}

return list;

}

可以看出,在mybatis的底层是用ArrayList来承接原生数据集的结果的。用ArrayList是因为一个性能较好,另外一个是因为集合的数量不可测的缘故。

在不考虑极端性能的要求下,用mybatis还是不错的,因为它提供了主要的类型转换和spring的集成。

很少有人考虑使用LinkedList等其它集合来承接数据即可。

由于List实现了Collection接口,所以可以使用mybatis在获得List之后,再做流处理。

六、适用场景和挑战

集合的子孙巨多,有不同的业务场景对应,以最常见的来说:

ArrayList -- crud,随机访问性能高。但crud很少随机访问某个,一般都丢到前端处理了。

如前,Colletions提供了大量构建特定用途的集合的方法,可以让动态列表用于线程安全等场景。

LinkedList -- 双向链表用途很广,一般不CRUD的时候,常常会考虑用它,它的优缺点:

频繁进行插入和删除更高效;可以用作用作栈(Stack)和队列(Queue);保持元素插入顺序的场景实现双向遍历

缺点:随机访问慢

Set -大量的非crud的,需要保持元素唯一的情况

Queue -队列,主要用于需要堆栈操作的情况

再结合线程同步、不可修改、指定类型等等,可以细分为更多的子场景。

由于子孙太多,如果个人对每个类型的优缺点不是太明白,那么至少要知道大类的适用场景,然后再查看javaDoc/ai即可。

6.1 挑战-线程安全

如果,java工具Colleections已经提供了适用于大部分业务场景的并发集合对象,以便在线程操作情况下,能够保证安全。

以非常典型的java.util.Collections.synchronizedList(List<T>)为例子,下面是相关代码:

public static <T> List<T> synchronizedList(List<T> list) {

return (list instanceof RandomAccess ?

new SynchronizedRandomAccessList<>(list) :

new SynchronizedList<>(list));

}

static class SynchronizedList<E>

extends SynchronizedCollection<E>

implements List<E> {

@java.io.Serial

private static final long serialVersionUID = -7754090372962971524L;

@SuppressWarnings("serial") // Conditionally serializable

final List<E> list;

SynchronizedList(List<E> list) {

super(list);

this.list = list;

}

SynchronizedList(List<E> list, Object mutex) {

super(list, mutex);

this.list = list;

}

public boolean equals(Object o) {

if (this == o)

return true;

synchronized (mutex) {return list.equals(o);}

}

public int hashCode() {

synchronized (mutex) {return list.hashCode();}

}

public E get(int index) {

synchronized (mutex) {return list.get(index);}

}

public E set(int index, E element) {

synchronized (mutex) {return list.set(index, element);}

}

public void add(int index, E element) {

synchronized (mutex) {list.add(index, element);}

}

public E remove(int index) {

synchronized (mutex) {return list.remove(index);}

}

public int indexOf(Object o) {

synchronized (mutex) {return list.indexOf(o);}

}

public int lastIndexOf(Object o) {

synchronized (mutex) {return list.lastIndexOf(o);}

}

public boolean addAll(int index, Collection<? extends E> c) {

synchronized (mutex) {return list.addAll(index, c);}

}

public ListIterator<E> listIterator() {

return list.listIterator(); // Must be manually synched by user

}

public ListIterator<E> listIterator(int index) {

return list.listIterator(index); // Must be manually synched by user

}

public List<E> subList(int fromIndex, int toIndex) {

synchronized (mutex) {

return new SynchronizedList<>(list.subList(fromIndex, toIndex),

mutex);

}

}

@Override

public void replaceAll(UnaryOperator<E> operator) {

synchronized (mutex) {list.replaceAll(operator);}

}

@Override

public void sort(Comparator<? super E> c) {

synchronized (mutex) {list.sort(c);}

}

@java.io.Serial

private Object readResolve() {

return (list instanceof RandomAccess

? new SynchronizedRandomAccessList<>(list)

: this);

}

}

从代码可以看出,这个SynchronizedList对大部分的集合操作都使用关键字synchronized,包括基本的get,add,indexOf...

但是需要注意,并不是所有的操作都是上同步锁,例如获得迭代器(iterator())就不会。具体哪些不会,需要工程师自己去阅读代码。

实现单个jvm内的线程安全问题不大,工程师主要的调整来自于性能要求,需要谨慎地分辨这些上锁的代价是否过于大,大到不如直接使用串行的

方式进行处理。

通常而言,如果锁内操作很短,而锁外的操作相对长的多,那么还是值得那样进行操作的。

七、小结

1.集合的子孙比较多,建议先认识一遍,这样有助于开发,不要浪费自己的时间

2.应付一般的CRUD,依靠JAVA和阿帕奇的已经基本够了用了。

如果实在不够可以自己额外编写工具集,不推荐采用三方的工具集(存在安全和更新问题)当然类似阿帕奇这样的可以例外。

如果是开发产品,更不推荐采用非知名的小组织/个人的工具包。

3.需要注意线程安全情况下的用法,这个有赖于个人实践之后的体验,虽然JAVADOC有一些说明,但是不够。

4.使用ai辅助编写代码的时候,应该有适当的辨别能力,避免每个集合都是stream()之后再操作

最简单的,例如 list.filter(),没有必要list.stream().filter,除非filter后还挂着其它操作。sort()也是类似。

不能太机械。



声明

本文内容仅代表作者观点,或转载于其他网站,本站不以此文作为商业用途
如有涉及侵权,请联系本站进行删除
转载本站原创文章,请注明来源及作者。