建站快車凡科網(wǎng)站維護(hù)的內(nèi)容有哪些
前言
? ? ? ? 上節(jié)我們學(xué)了 lambda 表達(dá)式,很快我就在 Flink 的學(xué)習(xí)中用到了,我學(xué)的是 Java 版本的 Flink,一開始會(huì)以為代碼會(huì)很復(fù)雜,但事實(shí)上 Flink 中很多地方都用到了 函數(shù)接口,這也讓我們?cè)诰帉?Flink 程序的時(shí)候可以使用 lambda 表達(dá)式非常地簡潔地實(shí)現(xiàn)匿名函數(shù)。
? ? ? ? 今天再來學(xué)習(xí)一個(gè)新的特性,Stream 流,光是看名字就覺得和大數(shù)據(jù)能扯上關(guān)系,我們的 Spark、Flink 當(dāng)中不就都是這種流的概念嘛。
1、什么是 Strem 流
? ? ? ? Stream 是 JDK1.8 中處理集合的關(guān)鍵抽象概念, Lambda 表達(dá)式 和 Stream 是JDK1.8 新增的函數(shù)式編程中最有亮點(diǎn)的特性了,它可以指定你希望對(duì)集合進(jìn)行操作,可以執(zhí)行非常復(fù)雜的查詢過濾和映射等操作。使用 Stream API 對(duì)集合數(shù)據(jù)進(jìn)行操作,就類似于使用 SQL 來執(zhí)行對(duì) Java 集合運(yùn)算和表達(dá)的高階抽象。
? ? ? ? Stream API 可以極大地提高 Java 程序員的生產(chǎn)力,讓程序員寫出更加高效、干凈、簡潔的代碼。那對(duì)我在大數(shù)據(jù)開發(fā)中更是如此。
? ? ? ? 這種風(fēng)格將要處理的元素集合看做一種流,流在管道中傳輸,并且可以在管道的節(jié)點(diǎn)上進(jìn)行處理,比如過濾、排序、聚合等。
2、Stream 創(chuàng)建方式
1、創(chuàng)建串行 Stream
Stream<User> userStream = list.stream();
2、創(chuàng)建并行 Stream
Stream<User> userStream = list.parallelStream();
3、關(guān)閉
在Java中,Stream只能被操作一次,一旦你對(duì)其進(jìn)行了一次操作(比如forEach, collect等),它就會(huì)被關(guān)閉,再次操作就會(huì)報(bào)錯(cuò):stream has already been operated upon or closed。
3、Stream 將 List 轉(zhuǎn)換為 Set
1、創(chuàng)建 List 集合
Stream 是通過集合創(chuàng)建出來的,所以我們先創(chuàng)建一個(gè)集合,而集合內(nèi)我們需要存放實(shí)體,所以先創(chuàng)建一個(gè)實(shí)體類 User:
public class User {public String name;public int age;public User(){}public User(String name, int age) {this.name = name;this.age = age;}@Overridepublic String toString() {return "User{" +"name='" + name + '\'' +", age=" + age +'}';}@Overridepublic boolean equals(Object o) {if (this == o) return true;if (o == null || getClass() != o.getClass()) return false;User user = (User) o;return age == user.age && Objects.equals(name, user.name);}@Overridepublic int hashCode() {return Objects.hash(name, age);}public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}
}
創(chuàng)建集合
List<User> list = new ArrayList<>();list.add(new User("燕雙鷹",28));list.add(new User("李大喜",20));list.add(new User("李元芳", 30));list.add(new User("李元芳", 30));
重寫 equals
注意:這里我們對(duì)實(shí)體類的 equals 和 hashcode 方法進(jìn)行了重寫,這在之前我是不會(huì)去重寫的。重寫和不重寫的區(qū)別就是:
????????重寫后,當(dāng)兩個(gè)實(shí)體對(duì)象的屬性相同時(shí),equals 方法返回 true,如果沒有重寫,則 equals 返回 false。
== 和 equals
????????== 用于比較基本數(shù)據(jù)類型的值是否相等或者對(duì)象的引用地址是否相同。
int a = 10;
int b = 10;
System.out.println(a==b); //trueString str1 = "hello";
String str2 = "hello";
System.out.println(str1==str2); //false
equals?用于比較兩個(gè)對(duì)象的內(nèi)容是否相等。在Object類中,默認(rèn)的“equals()”實(shí)現(xiàn)使用“==”操作符比較對(duì)象的引用。但是,許多類(如String、Integer等)重寫了“equals()”方法,以便根據(jù)類的特定屬性比較對(duì)象的內(nèi)容。
set 去重底層原理
set 去重底層依賴于 map 集合實(shí)現(xiàn)放重復(fù)的 key,map 集合底層基于 equals ,它先比較 key 的hashcode 是否相同,相同情況下再調(diào)用 equals 方法判斷是否真的相等。
所以一個(gè)實(shí)體類是否重寫 equals 方法區(qū)別很大。
User u1 = new User("s",1);User u2 = new User("s",1);System.out.println(u1.equals(u2));
上面的代碼,如果我們以 User 對(duì)象作為 key,如果我們的?User 沒有重寫 equals 方法,那么返回的就是 false,因?yàn)槟J(rèn)使用 == ,引用地址不同;如果重寫了 equals 方法,那么返回的就是 true,因?yàn)槭褂弥貙懞蟮?equals ,兩個(gè)對(duì)象屬性相同返回 true。
注意:對(duì)象的比較不會(huì)去比較 hashcode。?
HashMap<User, String> map = new HashMap<>();map.put(u1,"a");map.put(u2,"b");System.out.println(map.get(u1).equals(map.get(u2)));
上面的代碼,如果我們沒有重寫?hashcode 的情況下,那么返回的就是 true,因?yàn)?map 的底層是通過 hashcode 來比較兩個(gè) key 是否相同;如果重寫了 hashcode ,那么返回的就是 true。
2、List 轉(zhuǎn)為 Set
public static void main(String[] args) {List<User> list = new ArrayList<>();list.add(new User("燕雙鷹",28));list.add(new User("李大喜",20));// 下面是兩個(gè)屬性相同的兩個(gè)對(duì)象(我們已經(jīng)重寫了 equals 和 hashcode 方法)list.add(new User("李元芳", 30));list.add(new User("李元芳", 30));// todo 創(chuàng)建 Stream 的兩種方式// 1. 串行流 stream() 單線程Stream<User> stream = list.stream();Set<User> set = stream.collect(Collectors.toSet());set.forEach(user->{System.out.println(user.toString());});
}
運(yùn)行結(jié)果:
User{name='李元芳', age=30}
User{name='燕雙鷹', age=28}
User{name='李大喜', age=20}
可以看到,重寫 equals 和 hashcode 方法后,雖然相同屬性的兩個(gè)對(duì)象的內(nèi)存地址不同,但也被去除重復(fù)了。
4、Stream 將 List 轉(zhuǎn)為 Map
1、創(chuàng)建 List
注意:List 轉(zhuǎn)為 Map 的時(shí)候,由于 Map 集合不允許存在重復(fù)的 key,所以我們必須保證 list 集合中作為?key 字段的屬性值唯一。
List<User> list = new ArrayList<>();list.add(new User("燕雙鷹",28));list.add(new User("李大喜",20));list.add(new User("李元芳", 30));
?2、List 轉(zhuǎn)為 Map
Stream<User> stream = list.stream();// list 集合是沒有 key 的,所以不能直接轉(zhuǎn)為 map 集合,需要指定 key(指定對(duì)象的某個(gè)字段作為key)Map<String, User> collect = stream.collect(Collectors.toMap(new Function<User, String>() { // 第一個(gè)參數(shù) list中的類型,第二個(gè)參數(shù)是key類型: String@Overridepublic String apply(User user) {return user.getName();}}, new Function<User, User>() { // 第一個(gè)參數(shù) list中的類型,第二個(gè)參數(shù)是value類型: User@Overridepublic User apply(User user) {return user;}}));collect.forEach(new BiConsumer<String, User>() {@Overridepublic void accept(String key, User user) {System.out.println(key+","+user.toString());}});
使用 lambda 表達(dá)式簡化一下代碼:
// 用lambda表達(dá)式Map<String, User> collect = stream.collect(Collectors.toMap(User::getName, user -> user));collect.forEach((key,user)-> System.out.println(key+","+user.toString()));
運(yùn)行結(jié)果:
李元芳,User{name='李元芳', age=30}
李大喜,User{name='李大喜', age=20}
燕雙鷹,User{name='燕雙鷹', age=28}
5、Strem 通過 reduce 方法求和
1、簡單求和
這里我們通過 Stream.of() 方法來進(jìn)行數(shù)據(jù)的構(gòu)造(這讓我想到了最近 Flink)。
Stream<Integer> stream = Stream.of(10, 50, 30, 10);Optional<Integer> res = stream.reduce(new BinaryOperator<Integer>() {@Overridepublic Integer apply(Integer integer, Integer integer2) {return integer + integer2;}});
使用 lamda 表達(dá)式?
Optional<Integer> res = stream.reduce(Integer::sum);
關(guān)于結(jié)果的打印,我們后面講到 Optional 類的時(shí)候再詳細(xì)說,一般直接:
System.out.println(res.get());
2、對(duì)象屬性和
我們構(gòu)造一個(gè) List 集合,然后轉(zhuǎn)為?Stream 調(diào)用 reduce 方法進(jìn)行求和。
注意:reduce 方法的返回結(jié)果類型必須和 Stream 的類型一致(就像我們 Hadoop 中的 WordCount)。
List<User> list = new ArrayList<>();list.add(new User("燕雙鷹",28));list.add(new User("李大喜",20));list.add(new User("李元芳", 30));Stream<User> stream = list.stream();Optional<User> sum = stream.reduce(new BinaryOperator<User>() {@Overridepublic User apply(User user1, User user2) {return new User("sum",user1.getAge()+ user2.getAge());}});System.out.println(sum.get());
lambda 表達(dá)式簡化:
Stream<User> stream = list.stream();Optional<User> sum = stream.reduce(((user1, user2) -> new User("sum", user1.getAge() + user2.getAge())));System.out.println(sum.get());//78
6、Strem 查找集合最大值和最小值
1、創(chuàng)建集合
List<User> list = new ArrayList<>();
list.add(new User("燕雙鷹",28));
list.add(new User("李大喜",20));
list.add(new User("李元芳", 30));
2、查找最大 age 屬性對(duì)象
Optional<User> max = stream.max(new Comparator<User>() {@Overridepublic int compare(User o1, User o2) {return o1.getAge() - o2.getAge();}});System.out.println(max.get());
lambda表達(dá)式簡化:
Optional<User> max = stream.max((user1, user2) -> user1.getAge() - user2.getAge());System.out.println(max.get()); //30
3、查找最小 age 屬性對(duì)象
Optional<User> min = stream.min((user1, user2) -> user1.getAge() - user2.getAge());System.out.println(min.get()); //20
7、Stream 中 Match 用法
anyMatch 表示,任意一個(gè)元素滿足條件返回 true。
allMatch 表示,所有元素滿足條件才會(huì)返回 true。
noMatch 表示,所有條件都不滿足這個(gè)條件才會(huì)返回 true。
1、創(chuàng)建集合
List<User> list = new ArrayList<>();list.add(new User("燕雙鷹",28));list.add(new User("李大喜",20));list.add(new User("李元芳", 30));Stream<User> stream = list.stream();
2、anyMatch
判斷集合中是否存在 age 屬性大于 25 的對(duì)象。
boolean res = stream.anyMatch(new Predicate<User>() {@Overridepublic boolean test(User user) {return user.getAge() > 25;}});System.out.println(res);
?lambda 表達(dá)式:
boolean res = stream.anyMatch(user -> user.getAge() > 25);System.out.println(res); //true
3、allMatch
判斷是否所有對(duì)象的 age屬性都大于 30
boolean res = stream.allMatch(user -> user.getAge() > 30);System.out.println(res); //false
4、noMatch
判斷是否用戶都不滿足 name 為 “光頭強(qiáng) ”
boolean res = stream.noneMatch(user -> user.getName().equals("光頭強(qiáng)"));System.out.println(res); //true
8、Stream 過濾器
和我們 Flink 的 DataStream API 中的轉(zhuǎn)換算子 filter 很像,它們都是把 判斷條件結(jié)果為 true 的數(shù)據(jù)留下,false 則丟掉。
1、創(chuàng)建集合
List<User> list = new ArrayList<>();list.add(new User("燕雙鷹",28));list.add(new User("李大喜",20));list.add(new User("李元芳", 30));Stream<User> stream = list.stream();
2、過濾
Stream<User> filterStream = stream.filter(new Predicate<User>() {@Overridepublic boolean test(User user) { //為 true 則留下return user.getAge()>25;}});filterStream.forEach(new Consumer<User>() {@Overridepublic void accept(User user) {System.out.println(user);}});
運(yùn)行結(jié)果:?
User{name='燕雙鷹', age=28}
User{name='李元芳', age=30}
lambda表達(dá)式:
Stream<User> filterStream = stream.filter(user -> user.getAge() > 25);filterStream.forEach(System.out::println);
9、Stream Limit 和 Skip
同樣,Stream 需要通過集合來創(chuàng)建。
List<User> list = new ArrayList<>();
list.add(new User("燕雙鷹",28));
list.add(new User("李大喜",20));
list.add(new User("李元芳", 30));
list.add(new User("熊大",15));
list.add(new User("熊二",14));
list.add(new User("光頭強(qiáng)",20));
Stream<User> stream = list.stream();
1、取出前2條數(shù)據(jù)
// 在mysql中l(wèi)imit(start,end)需要傳兩個(gè)參數(shù),但在這里只允許傳入一個(gè)long類型的 maxSize// 取前2條數(shù)據(jù)stream.limit(2).forEach(System.out::println);
運(yùn)行結(jié)果:
User{name='燕雙鷹', age=28}
User{name='李大喜', age=20}
?
2、取出第 [3,6) 條數(shù)據(jù)
注意,這里的索引是從 0 開始的。
// 取 [3,6)條數(shù)據(jù) 想要分頁從先 skip 再 limit
stream.skip(2).limit(3).forEach(System.out::println);
運(yùn)行結(jié)果:
?
User{name='李元芳', age=30}
User{name='熊大', age=15}
User{name='熊二', age=14}
10、Stream 排序 sorted
下面用到的數(shù)據(jù)。
List<User> list = new ArrayList<>();
list.add(new User("燕雙鷹",28));
list.add(new User("李大喜",20));
list.add(new User("李元芳", 30));
list.add(new User("熊大",15));
list.add(new User("熊二",14));
list.add(new User("光頭強(qiáng)",20));
Stream<User> stream = list.stream();
1、直接排序
對(duì)于數(shù)值型的數(shù)據(jù)可以直接進(jìn)行排序
Stream<Integer> integerStream = Stream.of(1, 5, 8, 3, 7);
integerStream.sorted().forEach(System.out::println); //1 3 5 7 8
2、根據(jù)對(duì)象字段進(jìn)行升序
stream.sorted(new Comparator<User>() {@Overridepublic int compare(User o1, User o2) {return o1.getAge()-o2.getAge();}
}).forEach(System.out::println);
lambda 表達(dá)式:
stream.sorted((o1, o2) -> o1.getAge()-o2.getAge()).forEach(System.out::println);
運(yùn)行結(jié)果:?
User{name='熊二', age=14}
User{name='熊大', age=15}
User{name='李大喜', age=20}
User{name='光頭強(qiáng)', age=20}
User{name='燕雙鷹', age=28}
User{name='李元芳', age=30}
JDK1.8 提供的函數(shù)接口
都在包 java.util.function 包下。
并行流
案例1 - 500億次求和
1、使用單線程
Instant start = Instant.now();long sum = 0;for (long i = 0; i <= 50000000000L; i++) {sum+=i;}Instant end = Instant.now();System.out.println(sum);System.out.println("500億次求和花費(fèi)時(shí)間: "+ Duration.between(start,end).toMillis()+"ms"); // 單線程 11s左右 多線程 6s左右
2、使用并行流
Instant start = Instant.now();LongStream longStream = LongStream.rangeClosed(0,50000000000L);OptionalLong result = longStream.parallel().reduce(new LongBinaryOperator() {@Overridepublic long applyAsLong(long left, long right) {return left + right;}});Instant end = Instant.now();System.out.println(result.getAsLong());System.out.println("500億次求和花費(fèi)時(shí)間: "+ Duration.between(start,end).toMillis()+"ms"); // 單線程 11s左右 多線程 6s左右
可以發(fā)現(xiàn),多線程明顯要快很多。
總結(jié)
????????本次學(xué)習(xí)收獲非常大,函數(shù)接口的思想在 Flink 中隨處可見,的確,這樣一種能夠使得代碼簡潔高效的技術(shù)在大數(shù)據(jù)開發(fā)中是非常重要的。