/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.microprofile.reactive.streams.operators.tck.spi;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreamsFactory;
import org.eclipse.microprofile.reactive.streams.operators.tck.spi.AbstractStageVerification;
import org.eclipse.microprofile.reactive.streams.operators.tck.spi.QuietRuntimeException;
import org.eclipse.microprofile.reactive.streams.operators.tck.spi.ReactiveStreamsSpiVerification;
import org.eclipse.microprofile.reactive.streams.operators.tck.spi.ScheduledPublisher;
import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.tck.SubscriberWhiteboxVerification;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Verification of the {@code flatMap} stage.
 *
 * <p>The {@code @Test} methods build small streams through the inherited {@code rs} factory,
 * run them on the engine returned by {@code getEngine()} and assert on the collected result.
 * The two nested classes are handed to the Reactive Streams TCK through
 * {@link #reactiveStreamsTckVerifiers()}.
 */
public class FlatMapStageVerification extends AbstractStageVerification {

    FlatMapStageVerification(ReactiveStreamsSpiVerification.VerificationDeps deps) {
        super(deps);
    }

    /**
     * Each of the elements 1, 2, 3 is mapped to a three-element sub-stream of itself; the
     * collected list must contain the sub-stream elements in upstream order.
     */
    @Test
    public void flatMapStageShouldMapElements() {
        Assert.assertEquals(
            await(rs.of(1, 2, 3)
                .flatMap(n -> rs.of(n, n, n))
                .toList()
                .run(getEngine())),
            Arrays.asList(1, 1, 1, 2, 2, 2, 3, 3, 3));
    }

    /**
     * An empty sub-stream followed by a sub-stream of 1, 2 must yield exactly 1, 2.
     */
    @Test
    public void flatMapStageShouldAllowEmptySubStreams() {
        Assert.assertEquals(
            await(rs.of(rs.empty(), rs.of(1, 2))
                .flatMap(Function.identity())
                .toList()
                .run(getEngine())),
            Arrays.asList(1, 2));
    }

    /**
     * The mapper function itself throws. The test first waits for the upstream
     * {@code onTerminate} callback to fire and then awaits the result, which is expected to
     * surface the {@link QuietRuntimeException} with message {@code "failed"}.
     */
    @Test(expectedExceptions = QuietRuntimeException.class, expectedExceptionsMessageRegExp = "failed")
    public void flatMapStageShouldHandleExceptions() {
        CompletableFuture<Void> cancelled = new CompletableFuture<>();
        // The element type of the result is irrelevant here: the stage is expected to fail.
        CompletionStage<?> result = infiniteStream()
            .onTerminate(() -> cancelled.complete(null))
            .flatMap(foo -> {
                throw new QuietRuntimeException("failed");
            })
            .toList()
            .run(getEngine());
        await(cancelled);
        await(result);
    }

    /**
     * A failed upstream must make the overall result fail with the same exception.
     */
    @Test(expectedExceptions = QuietRuntimeException.class, expectedExceptionsMessageRegExp = "failed")
    public void flatMapStageShouldPropagateUpstreamExceptions() {
        await(rs.failed(new QuietRuntimeException("failed"))
            .flatMap(rs::of)
            .toList()
            .run(getEngine()));
    }

    /**
     * Every sub-stream is a failed stream. The test first waits for the upstream
     * {@code onTerminate} callback to fire and then awaits the result, which is expected to
     * surface the {@link QuietRuntimeException} with message {@code "failed"}.
     */
    @Test(expectedExceptions = QuietRuntimeException.class, expectedExceptionsMessageRegExp = "failed")
    public void flatMapStageShouldPropagateSubstreamExceptions() {
        CompletableFuture<Void> cancelled = new CompletableFuture<>();
        // The element type of the result is irrelevant here: the stage is expected to fail.
        CompletionStage<?> result = infiniteStream()
            .onTerminate(() -> cancelled.complete(null))
            .flatMap(f -> rs.failed(new QuietRuntimeException("failed")))
            .toList()
            .run(getEngine());
        await(cancelled);
        await(result);
    }

    /**
     * Maps the ids 1..5 to {@link ScheduledPublisher} instances that share one counter and
     * expects the ids back, in order, within two seconds.
     *
     * <p>NOTE(review): the "one at a time" check itself is presumably performed inside
     * {@code ScheduledPublisher} via the shared counter; that class is not part of this file.
     *
     * @throws Exception if waiting for the result fails or times out
     */
    @Test
    public void flatMapStageShouldOnlySubscribeToOnePublisherAtATime() throws Exception {
        AtomicInteger activePublishers = new AtomicInteger();
        CompletionStage<List<Integer>> result = rs.of(1, 2, 3, 4, 5)
            .flatMap(id -> rs.fromPublisher(
                new ScheduledPublisher(id, activePublishers, this::getExecutorService)))
            .toList()
            .run(getEngine());
        Assert.assertEquals(
            result.toCompletableFuture().get(2, TimeUnit.SECONDS),
            Arrays.asList(1, 2, 3, 4, 5));
    }

    /**
     * Limits a flat-mapped infinite stream of infinite sub-streams to five elements and then
     * waits for the {@code onTerminate} callbacks of both the outer stream and a sub-stream.
     *
     * <p>The spelling of the method name is kept as is so the reported test name does not change.
     */
    @Test
    public void flatMapStageShouldPropgateCancelToSubstreams() {
        CompletableFuture<Void> outerCancelled = new CompletableFuture<>();
        CompletableFuture<Void> innerCancelled = new CompletableFuture<>();
        await(infiniteStream()
            .onTerminate(() -> outerCancelled.complete(null))
            .flatMap(i -> infiniteStream().onTerminate(() -> innerCancelled.complete(null)))
            .limit(5)
            .toList()
            .run(getEngine()));
        await(outerCancelled);
        await(innerCancelled);
    }

    /**
     * The same {@code flatMap} processor builder is plugged into two independent streams; both
     * runs must produce their own elements.
     */
    @Test
    public void flatMapStageBuilderShouldBeReusable() {
        ProcessorBuilder<PublisherBuilder<Integer>, Integer> flatMap =
            rs.<PublisherBuilder<Integer>>builder().flatMap(Function.identity());

        Assert.assertEquals(
            await(rs.of(rs.of(1, 2)).via(flatMap).toList().run(getEngine())),
            Arrays.asList(1, 2));
        Assert.assertEquals(
            await(rs.of(rs.of(3, 4)).via(flatMap).toList().run(getEngine())),
            Arrays.asList(3, 4));
    }

    /**
     * Returns the nested verifications contributed by this class.
     *
     * @return a list holding one {@link OuterProcessorVerification} and one
     *         {@link InnerSubscriberVerification}
     */
    @Override
    List<Object> reactiveStreamsTckVerifiers() {
        return Arrays.asList(new OuterProcessorVerification(), new InnerSubscriberVerification());
    }

    /**
     * Whitebox verification of the subscriber that the {@code flatMap} stage attaches to a
     * sub-stream publisher.
     */
    public class InnerSubscriberVerification
            extends AbstractStageVerification.StageSubscriberWhiteboxVerification<Integer> {

        /**
         * Runs a stream whose single element is a sub-stream backed by a capturing publisher and
         * returns the subscriber that the stage passes to that publisher.
         *
         * @param probe receives every signal that reaches the downstream end of the stage
         * @return the subscriber the {@code flatMap} stage subscribed to the sub-stream with
         */
        public Subscriber<Integer> createSubscriber(
                SubscriberWhiteboxVerification.WhiteboxSubscriberProbe<Integer> probe) {
            // Completed with whatever subscriber the stage hands to the sub-stream publisher.
            CompletableFuture<Subscriber<? super Integer>> subscriber = new CompletableFuture<>();

            FlatMapStageVerification.this.rs
                .of(FlatMapStageVerification.this.rs.<Integer>fromPublisher(subscriber::complete))
                .flatMap(Function.identity())
                .to(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription subscription) {
                        // One element is requested before the puppet is handed to the probe.
                        subscription.request(1);
                        probe.registerOnSubscribe(new SubscriberWhiteboxVerification.SubscriberPuppet() {

                            @Override
                            public void triggerRequest(long elements) {
                                subscription.request(elements);
                            }

                            @Override
                            public void signalCancel() {
                                subscription.cancel();
                            }
                        });
                    }

                    @Override
                    public void onNext(Integer item) {
                        probe.registerOnNext(item);
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        probe.registerOnError(throwable);
                    }

                    @Override
                    public void onComplete() {
                        probe.registerOnComplete();
                    }
                })
                .run(FlatMapStageVerification.this.getEngine());

            // The stage subscribes with a Subscriber<? super Integer>; the declared return type
            // needs Subscriber<Integer>, hence the single, locally suppressed unchecked cast.
            @SuppressWarnings("unchecked")
            Subscriber<Integer> inner =
                (Subscriber<Integer>) FlatMapStageVerification.this.await(subscriber);
            return inner;
        }

        /**
         * @param element index of the element to create
         * @return the index itself, boxed
         */
        public Integer createElement(int element) {
            return element;
        }
    }

    /**
     * Processor verification of the {@code flatMap} stage built as a processor, with each
     * element mapped to a single-element sub-stream of itself.
     */
    public class OuterProcessorVerification
            extends AbstractStageVerification.StageProcessorVerification<Integer> {

        /**
         * @param bufferSize not used by this implementation
         * @return a processor that flat-maps every element to a one-element stream of itself
         */
        public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
            return FlatMapStageVerification.this.rs.<Integer>builder()
                .flatMap(FlatMapStageVerification.this.rs::of)
                .buildRs(FlatMapStageVerification.this.getEngine());
        }

        /**
         * @return a publisher built from a failed stream (a {@link RuntimeException} with
         *         message {@code "failed"}) followed by the {@code flatMap} stage
         */
        @Override
        public Publisher<Integer> createFailedPublisher() {
            return FlatMapStageVerification.this.rs.<Integer>failed(new RuntimeException("failed"))
                .<Integer>flatMap(FlatMapStageVerification.this.rs::of)
                .buildRs(FlatMapStageVerification.this.getEngine());
        }

        /**
         * @param element index of the element to create
         * @return the index itself, boxed
         */
        public Integer createElement(int element) {
            return element;
        }
    }
}

