您好,欢迎来到华佗小知识。
搜索
您的当前位置:首页JUC——并发编程—第三部分

JUC——并发编程—第三部分

来源:华佗小知识

四大函数式接口(必须掌握)

函数式接口:只有一个方法的接口

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

//简化编程模型,在新版本的框架底层大量应用
//foreach(消费者的函数式接口)

Function接口(函数型接口)

/**
 * Function 函数型接口,有一个输入参数,有一个输出
 * 只要是函数型接口,可以用lambda表达式简化
 */
public class demo01 {
    public static void main(String[] args) {
//       Function<String,String> function=new Function<String,String>(){
//            @Override
//            public String apply(String o) {
//                return o;
//            }
//        };
       Function<String,String> function=(str)->{
           return str;
        };
        System.out.println(function.apply("asd"));
    }
}

Preddicate接口(断定型接口)

/**
 * 断定型接口: 有一个输入参数,返回值只能是布尔值!
 */
public class Demo02 {
    public static void main(String[] args) {

//        Predicate<String> predicate=new Predicate<String>() {
//            @Override
//            public boolean test(String o) {
//                if(o.isEmpty())
//                {
//                    return true;
//                }
//                return false;
//            }
//        };
        Predicate<String> predicate=(str)->{
            return str.isEmpty();
        };
        System.out.println(predicate.test("123"));
    }
}

Consumer消费型接口

/**
 * Consumer 消费型接口:只有输入,没有返回值
 */
public class demo03 {
    public static void main(String[] args) {
//        Consumer<String> consumer = new Consumer<String>(){
//            @Override
//            public void accept(String s) {
//                System.out.println(s);
//            }
//        };
        Consumer<String> consumer=(s)->{
            System.out.println(s);
        };
        consumer.accept("yhy");
    }
}

Supplier供给型接口

/**
 * Suppiler 没有参数,只有返回值
 */
public class demo04 {
    public static void main(String[] args) {
//        Supplier supplier=new Supplier<String>() {
//            @Override
//            public String get() {
//                return "114514";
//            }
//        };

        Supplier supplier=()->{
            return "114514";
        };
        System.out.println(supplier.get());
    }
}

Stream流式计算

集合,MySQL本质都是存储东西。计算交给流来操作。

在这个接口有如下方法

等等这些方法都是函数式接口。 

public class Test {
    public static void main(String[] args) {
        User u1=new User(1,"a",21);
        User u2=new User(2,"b",22);
        User u3=new User(3,"c",23);
        User u4=new User(4,"d",24);
        User u5=new User(5,"e",25);
        User u6=new User(6,"g",26);
        //集合存储
        List<User> list= Arrays.asList(u1,u2,u3,u4,u5,u6);
        //计算交给Stream流
        //Lambda表达式、链式编程、函数式接口、Stream流式计算
        list.stream()
                .filter(u->{return u.getId()%2==0;})
                .filter(u->{return u.getAge()>23;})
                .map(u->{return u.getName().toUpperCase();})
                .sorted((uu1,uu2)->{return uu2.compareTo(uu1);})
                .limit(1)
                .forEach(System.out::println);
    }
}

Forkjoin(分支合并)

在jdk1.7,并行执行任务!提高效率。大数据量!

大数据:Map Reduce(把大任务拆分为小任务)

Forkjoin特点:工作窃取

里面维护的都是双端队列。

两个线程执行任务,一个提前完成了就会去执行另一个的任务。


/**
 * 求和计算任务
 * 3000 6000(forkjoin) 9000(Stream并行流)
 * //如何使用Forkjoin
 * //1.forkjoinpool
 * //2.计算任务forkjointaskPool.execute(ForkJoinTask<?>task)
 * //3.计算类继承RecursiveTask<Long>
 */
public class ForkJoin extends RecursiveTask<Long> {
    private Long start;
    private Long end;

    private Long temp=10000L;

    public ForkJoin(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        if((end-start)<temp) {
            Long sum = 0L;
            for(int i=0;i<10_0000_0000;i++)
            {
                sum+=i;
            }
            return sum;
        }else{
            //分支合并计算
            Long middle=(start+end)/2;//中间值
            ForkJoin task1=new ForkJoin(start,middle);
            task1.fork();//拆分任务,将任务压入线程队列。
            ForkJoin task2=new ForkJoin(middle+1,end);
            task2.fork();//拆分任务,将任务压入线程队列。
            return task1.join()+task2.join();
        }
    }
}

 这个玩意本质上就是的二分+递归,很简单的算法。

每二分一次线程数+1;各个线程执行不同段的计算任务。

/**
 *  3000 6000(forkjoin) 9000(Stream并行流)
 */
public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        test1(); //sum-243309312时间:9346
//        test2(); //8088??????  好像也没快多少
        test3(); //700 究极快,最大限度利用CPU
    }
    public static void test1(){
        long start =System.currentTimeMillis();
        Long sum=0L;
        for(Long i=1L;i<=10_0000_0000;i++){
            sum+=i;
        }
        long end=System.currentTimeMillis();
        System.out.println("sum"+sum+"时间:"+(end-start));
    }

    //使用forkjoin
    public static void test2() throws ExecutionException, InterruptedException {
        long start =System.currentTimeMillis();
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> forkJoin = new ForkJoin(0L, 10_0000_0000L);
        ForkJoinTask<Long> submit=forkJoinPool.submit(forkJoin);//提交任务,有结果
        Long sum=submit.get();
    //forkJoinPool.execute(forkJoin);//执行,没有结果
        long end=System.currentTimeMillis();
        System.out.println("sum"+sum+"时间:"+(end-start));
    }
    //使用stream并行流
    public static void test3(){
        long start =System.currentTimeMillis();
        //(]
        LongStream.rangeClosed(0L,10_0000_0000L).parallel().reduce(0,Long::sum);
        long end=System.currentTimeMillis();
        System.out.println("sum"+"时间:"+(end-start));
    }
}

异步回调(Future接口)

Future接口设计的初衷:对将来的事件进行建模。

前后端异步通信有ajax,java中也有异步通信。

 

/**
 * 异步调用: CompltableFuture
 * //异步调用
 * //成功回调
 * //失败回调
 */
public class demo01 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
//        //没有返回值的异步回调
//        CompletableFuture<Void> completedFuture = CompletableFuture.runAsync(()->{
//            try {
//                TimeUnit.SECONDS.sleep(2);
//            } catch (InterruptedException e) {
//                throw new RuntimeException(e);
//            }
//            System.out.println(Thread.currentThread().getName()+"runAsync=>void");
//        });
//        System.out.println("11111");
//        completedFuture.get(); //阻塞获取执行结果
//        System.out.println("22222");

        //有返回值的异步回调
        //ajax,成功和失败的回调
        //返回的是错误信息
        CompletableFuture<Integer> completedFuture =CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+"  runAsync=>void");
            int t=10/0;
            return 1024;
        });
        System.out.println(completedFuture.whenComplete((t, u) -> {
            System.out.println("t=>"+t); //正常的返回结果
            System.out.println("u=>"+u); //抛出的错误信息
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());//获取报错信息
            return 233; //获取到错误的返回结果
        }).get());
    }
}

综上所述,这句输出语句会执行是因为whenComplete(...)方法在正常或异常完成时都会执行,而exceptionally(...)方法会捕获异常并处理它,然后返回备用结果。

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- huatuo0.cn 版权所有 湘ICP备2023017654号-2

违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务