/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.pair;

import java.util.HashMap;
import java.util.Map;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.Environment;
import reactor.fn.BiFunction;
import reactor.fn.tuple.Tuple;
import reactor.fn.tuple.Tuple2;
import reactor.rx.action.Action;
import reactor.rx.stream.MapStream;
import reactor.rx.subscription.PushSubscription;

public class ScanByKeyAction<K, V>
extends Action<Tuple2<K, V>, Tuple2<K, V>> {
    protected final BiFunction<? super V, ? super V, V> fn;
    protected final Publisher<? extends MapStream.Signal<K, V>> mapListener;
    protected final Map<K, V> store;

    public ScanByKeyAction(BiFunction<? super V, ? super V, V> fn, MapStream<K, V> mapStream) {
        this(fn, mapStream, mapStream);
    }

    public ScanByKeyAction(BiFunction<? super V, ? super V, V> fn, Map<K, V> store, Publisher<? extends MapStream.Signal<K, V>> mapListener) {
        this.fn = fn;
        HashMap hashMap = this.store = store == null ? new HashMap() : store;
        if (mapListener == null) {
            MapStream mapStream;
            block5: {
                mapStream = null;
                if (MapStream.class.isAssignableFrom(this.store.getClass())) {
                    try {
                        mapStream = (MapStream)this.store;
                    }
                    catch (ClassCastException cce) {
                        if (!Environment.alive()) break block5;
                        Environment.get().routeError((Throwable)cce);
                    }
                }
            }
            this.mapListener = mapStream;
        } else {
            this.mapListener = mapListener;
        }
    }

    @Override
    public void subscribe(final Subscriber<? super Tuple2<K, V>> subscriber) {
        if (this.mapListener != null) {
            this.mapListener.subscribe(new Subscriber<MapStream.Signal<K, V>>(){
                Subscription s;
                PushSubscription<Tuple2<K, V>> child;

                public void onSubscribe(final Subscription s) {
                    this.s = s;
                    this.child = new PushSubscription<Tuple2<K, V>>(ScanByKeyAction.this, subscriber){

                        @Override
                        protected void onRequest(long elements) {
                            s.request(elements);
                            if (ScanByKeyAction.this.upstreamSubscription == null) {
                                this.updatePendingRequests(elements);
                            } else {
                                ScanByKeyAction.this.requestUpstream(-1L, this.isComplete(), elements);
                            }
                        }

                        @Override
                        public void cancel() {
                            super.cancel();
                            s.cancel();
                        }
                    };
                    ScanByKeyAction.this.addSubscription(this.child);
                    subscriber.onSubscribe(this.child);
                }

                public void onNext(MapStream.Signal<K, V> kvSignal) {
                    if (this.child != null && kvSignal.op() == MapStream.Operation.put) {
                        ScanByKeyAction.this.doNext(this.child, kvSignal.pair());
                    }
                }

                public void onError(Throwable t) {
                    if (this.s != null) {
                        this.s.cancel();
                    }
                    if (this.child != null) {
                        this.child.onError(t);
                    }
                }

                public void onComplete() {
                    if (this.s != null) {
                        this.s.cancel();
                    }
                    if (this.child != null) {
                        this.child.onComplete();
                    }
                }
            });
        } else {
            super.subscribe(subscriber);
        }
    }

    @Override
    protected void doNext(Tuple2<K, V> ev) {
        V previous = this.store.get(ev.t1);
        Object acc = previous == null ? ev.t2 : this.fn.apply(previous, ev.t2);
        this.store.put(ev.t1, acc);
        if (this.mapListener == null && this.downstreamSubscription != null) {
            this.doNext(this.downstreamSubscription, Tuple.of((Object)ev.t1, (Object)acc));
        }
    }

    protected void doNext(PushSubscription<Tuple2<K, V>> subscriber, Tuple2<K, V> ev) {
        try {
            subscriber.onNext(ev);
        }
        catch (Throwable t) {
            subscriber.onError(t);
        }
    }
}

