/*
 * Decompiled with CFR 0.152.
 */
package reactor.rx.action.aggregation;

import java.util.concurrent.TimeUnit;
import org.reactivestreams.Subscription;
import reactor.core.Dispatcher;
import reactor.core.processor.InsufficientCapacityException;
import reactor.fn.Consumer;
import reactor.fn.Pausable;
import reactor.fn.timer.Timer;
import reactor.rx.action.Action;
import reactor.rx.subscription.BatchSubscription;
import reactor.rx.subscription.PushSubscription;

/**
 * Base class for actions that count the signals they receive and group them into batches
 * of at most {@link #batchSize} elements, optionally bounded by a time span as well.
 *
 * <p>Subclasses customise behaviour by overriding {@link #firstCallback(Object)},
 * {@link #nextCallback(Object)} and {@link #flushCallback(Object)}; the {@code first},
 * {@code next} and {@code flush} flags passed at construction decide which of those hooks
 * {@link #doNext(Object)} invokes.
 *
 * @param <T> type of the incoming signals
 * @param <V> type of the outgoing signals
 */
public abstract class BatchAction<T, V> extends Action<T, V> {
    /** When {@code true}, {@link #nextCallback(Object)} is invoked for every incoming element. */
    protected final boolean next;
    /**
     * When {@code true}, {@link #flushCallback(Object)} is invoked each time a batch reaches
     * {@link #batchSize}. Timed flushes and the flush on completion do not consult this flag.
     */
    protected final boolean flush;
    /** When {@code true}, {@link #firstCallback(Object)} is invoked for the first element of each batch. */
    protected final boolean first;
    /** Number of elements after which the current batch is closed. */
    protected final int batchSize;
    /** Dispatcher returned by {@link #getDispatcher()} and used to run timed flushes. */
    protected final Dispatcher dispatcher;
    /** Maximum duration of a batch, or {@code -1} when batches are not time-bounded. */
    protected final long timespan;
    /** Unit of {@link #timespan}, or {@code null} when batches are not time-bounded. */
    protected final TimeUnit unit;
    /** Timer used to schedule timed flushes, or {@code null} when none is used. */
    protected final Timer timer;
    /** Invokes {@link #flushCallback(Object)} and then resets {@link #index} to zero. */
    protected final Consumer<T> flushConsumer = new FlushConsumer();
    /** Number of elements received in the current batch. */
    protected int index = 0;
    /** Handle on the timeout armed for the current batch, or {@code null} if none is tracked. */
    private Pausable timespanRegistration;

    /**
     * Creates a batch action bounded by size only.
     *
     * @param dispatcher dispatcher exposed through {@link #getDispatcher()}
     * @param batchSize  number of elements per batch
     * @param next       whether {@link #nextCallback(Object)} is invoked for every element
     * @param first      whether {@link #firstCallback(Object)} is invoked for the first element of a batch
     * @param flush      whether {@link #flushCallback(Object)} is invoked when a batch reaches {@code batchSize}
     */
    public BatchAction(Dispatcher dispatcher, int batchSize, boolean next, boolean first, boolean flush) {
        this(dispatcher, batchSize, next, first, flush, -1L, null, null);
    }

    /**
     * Creates a batch action bounded by size and, when {@code timespan} is positive, by time.
     *
     * <p>A non-positive {@code timespan} disables timing altogether: {@code unit} and
     * {@code timer} are then ignored. With a positive {@code timespan}, a {@code null}
     * {@code unit} defaults to {@link TimeUnit#SECONDS}; a {@code null} {@code timer} means
     * no timeout is ever armed.
     *
     * @param dispatcher dispatcher exposed through {@link #getDispatcher()} and used for timed flushes
     * @param batchSize  number of elements per batch
     * @param next       whether {@link #nextCallback(Object)} is invoked for every element
     * @param first      whether {@link #firstCallback(Object)} is invoked for the first element of a batch
     * @param flush      whether {@link #flushCallback(Object)} is invoked when a batch reaches {@code batchSize}
     * @param timespan   maximum duration of a batch; values {@code <= 0} disable timing
     * @param unit       unit of {@code timespan}; may be {@code null}
     * @param timer      timer on which timeouts are scheduled; may be {@code null}
     */
    public BatchAction(Dispatcher dispatcher, int batchSize, boolean next, boolean first, boolean flush,
                       long timespan, TimeUnit unit, Timer timer) {
        super(batchSize);
        this.dispatcher = dispatcher;
        if (timespan > 0L) {
            this.unit = unit != null ? unit : TimeUnit.SECONDS;
            this.timespan = timespan;
            this.timer = timer;
        } else {
            this.timespan = -1L;
            this.timer = null;
            this.unit = null;
        }
        this.first = first;
        this.flush = flush;
        this.next = next;
        this.batchSize = batchSize;
    }

    /**
     * Wraps the given subscription in a {@link BatchSubscription} configured with this
     * action's {@link #batchSize}.
     */
    @Override
    protected PushSubscription<T> createTrackingSubscription(Subscription subscription) {
        return new BatchSubscription<T>(subscription, this, this.batchSize);
    }

    /** Always returns {@code false}, whatever the dispatcher and producer capacity. */
    @Override
    public boolean isReactivePull(Dispatcher dispatcher, long producerCapacity) {
        return false;
    }

    /**
     * Hook invoked for every element when {@link #next} is set. Does nothing by default.
     *
     * @param event the element just received
     */
    protected void nextCallback(T event) {
    }

    /**
     * Hook invoked when the current batch is flushed. Does nothing by default.
     *
     * @param event the element that completed the batch, or {@code null} when the flush is
     *              triggered by a timeout or by completion
     */
    protected void flushCallback(T event) {
    }

    /**
     * Hook invoked for the first element of each batch when {@link #first} is set.
     * Does nothing by default.
     *
     * @param event the first element of the batch
     */
    protected void firstCallback(T event) {
    }

    /**
     * Counts the element into the current batch and fires the configured hooks.
     *
     * <p>On the first element of a batch a timeout is armed (if a timer is configured) and
     * {@link #firstCallback(Object)} may run. {@link #nextCallback(Object)} may run for every
     * element. When the count reaches a multiple of {@link #batchSize} the timeout is
     * cancelled, the counter is reset and {@link #flushCallback(Object)} may run.
     */
    @Override
    protected void doNext(T value) {
        if (++this.index == 1) {
            if (this.timer != null) {
                // One-shot timeout for this batch: when it fires, the flush is handed to the
                // dispatcher rather than run on the timer's own thread.
                this.timespanRegistration = this.timer.submit(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) {
                        try {
                            if (BatchAction.this.isPublishing()) {
                                BatchAction.this.dispatcher.tryDispatch(null, BatchAction.this.flushConsumer, null);
                            }
                        } catch (InsufficientCapacityException ignored) {
                            // Best effort: the dispatcher had no room for the flush task, so this
                            // timed flush is skipped. The batch stays open until it fills up or
                            // the stream completes.
                        }
                    }
                }, this.timespan, this.unit);
            }
            if (this.first) {
                this.firstCallback(value);
            }
        }
        if (this.next) {
            this.nextCallback(value);
        }
        if (this.index % this.batchSize == 0) {
            // The batch closed by size, so its timeout is no longer needed.
            cancelTimespanRegistration();
            this.index = 0;
            if (this.flush) {
                this.flushConsumer.accept(value);
            }
        }
    }

    /**
     * Flushes whatever is pending (with a {@code null} event) and then completes.
     *
     * <p>Any timeout still armed for the current batch is cancelled first, instead of being
     * left scheduled with a task that references this action.
     */
    @Override
    protected void doComplete() {
        cancelTimespanRegistration();
        this.flushConsumer.accept(null);
        super.doComplete();
    }

    /**
     * Appends the timing configuration (when a timer is set) and the fill level of the
     * current batch, as a count and as a percentage, to the parent's description.
     */
    @Override
    public String toString() {
        String timing = this.timer != null ? "timed - " + this.timespan + " " + this.unit : "";
        int fillPercent = (int) ((float) this.index / (float) this.batchSize * 100.0f);
        return super.toString() + "{" + timing + " batchSize=" + this.index + "/" + this.batchSize
                + " [" + fillPercent + "%]}";
    }

    /** Returns the dispatcher supplied at construction. */
    @Override
    public final Dispatcher getDispatcher() {
        return this.dispatcher;
    }

    /** Cancels and forgets the timeout tracked for the current batch, if there is one. */
    private void cancelTimespanRegistration() {
        if (this.timespanRegistration != null) {
            this.timespanRegistration.cancel();
            this.timespanRegistration = null;
        }
    }

    /**
     * Flush task shared by the size-based, timed and completion flush paths: runs
     * {@link BatchAction#flushCallback(Object)} and then resets the batch counter.
     */
    private final class FlushConsumer implements Consumer<T> {
        private FlushConsumer() {
        }

        @Override
        public void accept(T n) {
            BatchAction.this.flushCallback(n);
            BatchAction.this.index = 0;
        }
    }
}

