/*
 * Decompiled with CFR 0.152.
 */
package groovyx.gpars.dataflow.operator.component;

import groovyx.gpars.dataflow.DataflowChannelListener;
import groovyx.gpars.dataflow.DataflowReadChannel;
import groovyx.gpars.dataflow.operator.DataflowEventAdapter;
import groovyx.gpars.dataflow.operator.DataflowProcessor;
import groovyx.gpars.dataflow.operator.component.OperatorStateMonitor;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class GracefulShutdownListener
extends DataflowEventAdapter {
    private boolean collectingMessages = false;
    private final AtomicInteger activeForks = new AtomicInteger(0);
    private final OperatorStateMonitor monitor;
    private DataflowProcessor processor = null;
    private volatile boolean shutdownFlag = false;
    private final AtomicLong messagesInChannels = new AtomicLong(0L);

    public GracefulShutdownListener(OperatorStateMonitor monitor) {
        this.monitor = monitor;
        monitor.registerProcessorListener(this);
    }

    @Override
    public void registered(DataflowProcessor processor) {
        this.processor = processor;
        processor.registerChannelListenersToAllInputs(new DataflowChannelListener<Object>(){

            @Override
            public void onMessage(Object message) {
                GracefulShutdownListener.this.messagesInChannels.incrementAndGet();
            }
        });
    }

    @Override
    public Object messageArrived(DataflowProcessor processor, DataflowReadChannel<Object> channel, int index, Object message) {
        this.fireEvent();
        this.collectingMessages = true;
        this.messagesInChannels.decrementAndGet();
        return message;
    }

    @Override
    public Object controlMessageArrived(DataflowProcessor processor, DataflowReadChannel<Object> channel, int index, Object message) {
        this.fireEvent();
        this.collectingMessages = false;
        this.messagesInChannels.decrementAndGet();
        return message;
    }

    @Override
    public List<Object> beforeRun(DataflowProcessor processor, List<Object> messages) {
        this.fireEvent();
        this.collectingMessages = false;
        this.activeForks.incrementAndGet();
        return messages;
    }

    @Override
    public void afterRun(DataflowProcessor processor, List<Object> messages) {
        this.fireEvent();
        this.activeForks.decrementAndGet();
    }

    private void fireEvent() {
        if (this.shutdownFlag) {
            this.monitor.stateChanged();
        }
    }

    final void initiateShutdown() {
        this.shutdownFlag = true;
    }

    public final boolean isIdle() {
        return !this.collectingMessages && this.activeForks.get() == 0;
    }

    public final boolean isIdleAndNoIncomingMessages() {
        if (this.processor == null) {
            throw new IllegalStateException("The GracefulShutdownListener has not been registered with a dataflow processor yet.");
        }
        return this.isIdle() && this.messagesInChannels.get() <= 0L;
    }

    final void terminateProcessor() {
        this.processor.terminate();
    }
}

