/*
 * Decompiled with CFR 0.152.
 */
package reactor.core.reactivestreams;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.reactivestreams.SubscriberBarrier;
import reactor.core.reactivestreams.SubscriberWithContext;
import reactor.core.support.Assert;
import reactor.core.support.Exceptions;
import reactor.core.support.SpecificationExceptions;
import reactor.fn.BiConsumer;
import reactor.fn.Consumer;
import reactor.fn.Function;

public final class PublisherFactory {
    public static <T> Publisher<T> create(BiConsumer<Long, SubscriberWithContext<T, Void>> requestConsumer) {
        return PublisherFactory.create(requestConsumer, null, null);
    }

    public static <T, C> Publisher<T> create(BiConsumer<Long, SubscriberWithContext<T, C>> requestConsumer, Function<Subscriber<? super T>, C> contextFactory) {
        return PublisherFactory.create(requestConsumer, contextFactory, null);
    }

    public static <T, C> Publisher<T> create(BiConsumer<Long, SubscriberWithContext<T, C>> requestConsumer, Function<Subscriber<? super T>, C> contextFactory, Consumer<C> shutdownConsumer) {
        return new ReactorPublisher<T, C>(requestConsumer, contextFactory, shutdownConsumer);
    }

    public static <T> Publisher<T> forEach(Consumer<SubscriberWithContext<T, Void>> requestConsumer) {
        return PublisherFactory.forEach(requestConsumer, null, null);
    }

    public static <T, C> Publisher<T> forEach(Consumer<SubscriberWithContext<T, C>> requestConsumer, Function<Subscriber<? super T>, C> contextFactory) {
        return PublisherFactory.forEach(requestConsumer, contextFactory, null);
    }

    public static <T, C> Publisher<T> forEach(Consumer<SubscriberWithContext<T, C>> requestConsumer, Function<Subscriber<? super T>, C> contextFactory, Consumer<C> shutdownConsumer) {
        Assert.notNull(requestConsumer, "A data producer must be provided");
        return new ForEachPublisher<T, C>(requestConsumer, contextFactory, shutdownConsumer);
    }

    public static <I, O> Publisher<O> barrier(Publisher<I> source, BiConsumer<I, Subscriber<? super O>> dataConsumer) {
        return PublisherFactory.barrier(source, dataConsumer, null, null);
    }

    public static <I, O> Publisher<O> barrier(Publisher<I> source, BiConsumer<I, Subscriber<? super O>> dataConsumer, BiConsumer<Throwable, Subscriber<? super O>> errorConsumer) {
        return PublisherFactory.barrier(source, dataConsumer, errorConsumer, null);
    }

    public static <I, O> Publisher<O> barrier(Publisher<I> source, final BiConsumer<I, Subscriber<? super O>> dataConsumer, final BiConsumer<Throwable, Subscriber<? super O>> errorConsumer, final Consumer<Subscriber<? super O>> completeConsumer) {
        return PublisherFactory.intercept(source, new Function<Subscriber<? super O>, SubscriberBarrier<I, O>>(){

            @Override
            public SubscriberBarrier<I, O> apply(Subscriber<? super O> subscriber) {
                return new ConsumerSubscriberBarrier(subscriber, dataConsumer, errorConsumer, completeConsumer);
            }
        });
    }

    public static <I, O> Publisher<O> intercept(Publisher<? extends I> source, Function<Subscriber<? super O>, SubscriberBarrier<I, O>> barrierProvider) {
        Assert.notNull(source, "A data source must be provided");
        Assert.notNull(barrierProvider, "A barrier interceptor must be provided");
        return new ProxyPublisher<I, O>(source, barrierProvider);
    }

    public static class PrematureCompleteException
    extends RuntimeException {
        public static final PrematureCompleteException INSTANCE = new PrematureCompleteException();

        private PrematureCompleteException() {
        }

        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }

    private static final class ConsumerSubscriberBarrier<I, O>
    extends SubscriberBarrier<I, O> {
        private final BiConsumer<I, Subscriber<? super O>> dataConsumer;
        private final BiConsumer<Throwable, Subscriber<? super O>> errorConsumer;
        private final Consumer<Subscriber<? super O>> completeConsumer;

        public ConsumerSubscriberBarrier(Subscriber<? super O> subscriber, BiConsumer<I, Subscriber<? super O>> dataConsumer, BiConsumer<Throwable, Subscriber<? super O>> errorConsumer, Consumer<Subscriber<? super O>> completeConsumer) {
            super(subscriber);
            this.dataConsumer = dataConsumer;
            this.errorConsumer = errorConsumer;
            this.completeConsumer = completeConsumer;
        }

        @Override
        protected void doNext(I o) {
            if (this.dataConsumer != null) {
                this.dataConsumer.accept(o, this.subscriber);
            } else {
                super.doNext(o);
            }
        }

        @Override
        protected void doError(Throwable throwable) {
            if (this.errorConsumer != null) {
                this.errorConsumer.accept(throwable, this.subscriber);
            } else {
                super.doError(throwable);
            }
        }

        @Override
        protected void doComplete() {
            if (this.completeConsumer != null) {
                this.completeConsumer.accept(this.subscriber);
            } else {
                super.doComplete();
            }
        }

        public String toString() {
            return "ConsumerSubscriberBarrier{subscriber=" + this.subscriber + ", dataConsumer=" + this.dataConsumer + ", errorConsumer=" + this.errorConsumer + ", completeConsumer=" + this.completeConsumer + '}';
        }
    }

    private static final class ProxyPublisher<I, O>
    implements Publisher<O> {
        private final Publisher<? extends I> source;
        private final Function<Subscriber<? super O>, SubscriberBarrier<I, O>> barrierProvider;

        public ProxyPublisher(Publisher<? extends I> source, Function<Subscriber<? super O>, SubscriberBarrier<I, O>> barrierProvider) {
            this.source = source;
            this.barrierProvider = barrierProvider;
        }

        public void subscribe(Subscriber<? super O> s) {
            this.source.subscribe((Subscriber)this.barrierProvider.apply(s));
        }

        public String toString() {
            return "ProxyPublisher{source=" + this.source + '}';
        }
    }

    private static final class ForEachBiConsumer<T, C>
    implements BiConsumer<Long, SubscriberWithContext<T, C>> {
        private final Consumer<SubscriberWithContext<T, C>> requestConsumer;
        private volatile long pending = 0L;
        private static final AtomicLongFieldUpdater<ForEachBiConsumer> PENDING_UPDATER = AtomicLongFieldUpdater.newUpdater(ForEachBiConsumer.class, "pending");

        public ForEachBiConsumer(Consumer<SubscriberWithContext<T, C>> requestConsumer) {
            this.requestConsumer = requestConsumer;
        }

        @Override
        public void accept(Long n, SubscriberWithContext<T, C> sub) {
            long afterAdd;
            if (this.pending == Long.MAX_VALUE) {
                return;
            }
            long demand = n;
            if (!PENDING_UPDATER.compareAndSet(this, 0L, demand) && (afterAdd = PENDING_UPDATER.addAndGet(this, demand)) != demand) {
                if (afterAdd < 0L) {
                    if (!PENDING_UPDATER.compareAndSet(this, afterAdd, Long.MAX_VALUE)) {
                        return;
                    }
                } else {
                    return;
                }
            }
            do {
                long requestCursor = 0L;
                while (!(requestCursor++ >= demand && demand != Long.MAX_VALUE || sub.isCancelled())) {
                    this.requestConsumer.accept(sub);
                }
            } while ((demand = PENDING_UPDATER.addAndGet(this, -demand)) > 0L && !sub.isCancelled());
        }
    }

    private static final class SubscriberProxy<T, C>
    extends SubscriberWithContext<T, C>
    implements Subscription {
        private final BiConsumer<Long, SubscriberWithContext<T, C>> requestConsumer;
        private final Consumer<C> shutdownConsumer;

        public SubscriberProxy(Subscriber<? super T> subscriber, C context, BiConsumer<Long, SubscriberWithContext<T, C>> requestConsumer, Consumer<C> shutdownConsumer) {
            super(context, subscriber);
            this.requestConsumer = requestConsumer;
            this.shutdownConsumer = shutdownConsumer;
        }

        public void request(long n) {
            if (this.isCancelled()) {
                return;
            }
            if (n <= 0L) {
                this.onError(SpecificationExceptions.spec_3_09_exception(n));
                return;
            }
            try {
                this.requestConsumer.accept(n, this);
            }
            catch (Throwable t) {
                this.onError(t);
            }
        }

        public void cancel() {
            if (TERMINAL_UPDATER.compareAndSet(this, 0, 1)) {
                this.doShutdown();
            }
        }

        @Override
        public void onError(Throwable t) {
            if (TERMINAL_UPDATER.compareAndSet(this, 0, 1)) {
                this.doShutdown();
                this.subscriber.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (TERMINAL_UPDATER.compareAndSet(this, 0, 1)) {
                this.doShutdown();
                try {
                    this.subscriber.onComplete();
                }
                catch (Throwable t) {
                    this.subscriber.onError(t);
                }
            }
        }

        private void doShutdown() {
            if (this.shutdownConsumer == null) {
                return;
            }
            try {
                this.shutdownConsumer.accept(this.context);
            }
            catch (Throwable t) {
                this.subscriber.onError(t);
            }
        }

        @Override
        public void onSubscribe(Subscription s) {
            throw new UnsupportedOperationException(" the delegate subscriber is already subscribed");
        }

        public String toString() {
            return this.context != null ? this.context.toString() : "SubscriberProxy{requestConsumer=" + this.requestConsumer + ", shutdownConsumer=" + this.shutdownConsumer + '}';
        }
    }

    private static final class ForEachPublisher<T, C>
    extends ReactorPublisher<T, C> {
        final Consumer<SubscriberWithContext<T, C>> forEachConsumer;

        public ForEachPublisher(Consumer<SubscriberWithContext<T, C>> forEachConsumer, Function<Subscriber<? super T>, C> contextFactory, Consumer<C> shutdownConsumer) {
            super(null, contextFactory, shutdownConsumer);
            this.forEachConsumer = forEachConsumer;
        }

        @Override
        protected Subscription createSubscription(Subscriber<? super T> subscriber, C context) {
            return new SubscriberProxy<T, C>(subscriber, context, new ForEachBiConsumer<T, C>(this.forEachConsumer), this.shutdownConsumer);
        }
    }

    private static class ReactorPublisher<T, C>
    implements Publisher<T> {
        protected final Function<Subscriber<? super T>, C> contextFactory;
        protected final BiConsumer<Long, SubscriberWithContext<T, C>> requestConsumer;
        protected final Consumer<C> shutdownConsumer;

        protected ReactorPublisher(BiConsumer<Long, SubscriberWithContext<T, C>> requestConsumer, Function<Subscriber<? super T>, C> contextFactory, Consumer<C> shutdownConsumer) {
            this.requestConsumer = requestConsumer;
            this.contextFactory = contextFactory;
            this.shutdownConsumer = shutdownConsumer;
        }

        public final void subscribe(Subscriber<? super T> subscriber) {
            try {
                C context = this.contextFactory != null ? (C)this.contextFactory.apply(subscriber) : null;
                subscriber.onSubscribe(this.createSubscription(subscriber, context));
            }
            catch (PrematureCompleteException context) {
            }
            catch (Throwable throwable) {
                Exceptions.throwIfFatal(throwable);
                subscriber.onError(throwable);
            }
        }

        protected Subscription createSubscription(Subscriber<? super T> subscriber, C context) {
            return new SubscriberProxy<T, C>(subscriber, context, this.requestConsumer, this.shutdownConsumer);
        }
    }
}

