/*
 * Decompiled with CFR 0.152.
 */
package groovyx.gpars.dataflow.operator;

import groovy.lang.Closure;
import groovyx.gpars.dataflow.DataflowReadChannel;
import groovyx.gpars.dataflow.DataflowVariable;
import groovyx.gpars.dataflow.operator.DataflowOperator;
import groovyx.gpars.dataflow.operator.DataflowProcessorActor;
import groovyx.gpars.dataflow.operator.StopGently;
import groovyx.gpars.group.PGroup;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Actor that gathers one value per input channel and, once a value has arrived for
 * every input, invokes the operator's code with those values ordered by input index.
 *
 * <p>Values are collected in {@link #values}, keyed by the index of the input they came
 * from. After each invocation the collection is reset and the inputs are queried again,
 * unless the actor has been stopped in the meantime.
 */
class DataflowOperatorActor
extends DataflowProcessorActor {
    /** Orders collected values by the index of the input they were read from. */
    private static final Comparator<Map.Entry<Integer, Object>> BY_INPUT_INDEX =
            new Comparator<Map.Entry<Integer, Object>>() {
                @Override
                public int compare(Map.Entry<Integer, Object> o1, Map.Entry<Integer, Object> o2) {
                    return o1.getKey().compareTo(o2.getKey());
                }
            };

    /** Values received so far for the current round, keyed by input index. */
    private Map<Integer, Object> values = new HashMap<Integer, Object>(10);

    /**
     * Creates the actor; all arguments are handed to the superclass unchanged.
     *
     * @param owningOperator the operator this actor works for
     * @param group          passed through to the superclass
     * @param outputs        passed through to the superclass
     * @param inputs         the input channels; each is expected to be a {@code DataflowReadChannel}
     * @param code           the closure invoked with the collected input values
     */
    DataflowOperatorActor(DataflowOperator owningOperator, PGroup group, List outputs, List inputs, Closure code) {
        super(owningOperator, group, outputs, inputs, code);
    }

    /**
     * Runs the superclass start-up logic and then issues the initial asynchronous read
     * request on every input.
     */
    @Override
    final void afterStart() {
        super.afterStart();
        this.queryInputs(true);
    }

    /**
     * Requests the next value from every input.
     *
     * <p>On the initial run, and for every input that is not a {@link DataflowVariable},
     * the value is requested asynchronously with the input's index as the attachment.
     * On later runs a {@link DataflowVariable} input is read synchronously and stored
     * straight into {@link #values}.
     *
     * @param initialRun {@code true} when called from {@link #afterStart()}
     * @throws IllegalStateException if the thread is interrupted while reading a
     *                               {@link DataflowVariable}; the interrupt flag is restored first
     */
    private void queryInputs(boolean initialRun) {
        for (int i = 0; i < this.inputs.size(); ++i) {
            DataflowReadChannel input = (DataflowReadChannel)this.inputs.get(i);
            if (initialRun || !(input instanceof DataflowVariable)) {
                input.getValAsync(i, this);
                continue;
            }
            try {
                this.values.put(i, input.getVal());
            }
            catch (InterruptedException e) {
                // Restore the interrupt flag so that code further up the stack can still
                // observe the interruption after it has been wrapped below.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("couldn't read the value of a DataflowVariable inside an operator.", e);
            }
        }
    }

    /**
     * Handles one incoming message.
     *
     * <p>A {@link StopGently} message only sets the {@code stoppingGently} flag. Any other
     * message is expected to be a {@link Map} holding the received value under
     * {@code "result"} and the index of the originating input under {@code "attachment"}.
     * Control messages are passed to {@code fireMessageArrived} and {@code checkPoison}
     * and are dropped if they are still control messages afterwards. Regular values are
     * stored; once a value is present for every input the operator's code is run.
     *
     * @param message the message to process
     * @throws IllegalStateException if more values have been collected than there are inputs
     */
    @Override
    public final void onMessage(Object message) {
        if (message instanceof StopGently) {
            this.stoppingGently = true;
            return;
        }
        Map msg = (Map)message;
        Object result = msg.get("result");
        Integer inputIndex = (Integer)msg.get("attachment");
        if (DataflowOperatorActor.isControlMessage(result)) {
            result = this.fireMessageArrived(result, inputIndex, true);
            this.checkPoison(result);
            if (DataflowOperatorActor.isControlMessage(result)) {
                return;
            }
        }
        Object verifiedValue = this.fireMessageArrived(result, inputIndex, false);
        this.values.put(inputIndex, verifiedValue);

        int received = this.values.size();
        int expected = this.inputs.size();
        if (received > expected) {
            throw new IllegalStateException("The DataflowOperatorActor is in an inconsistent state. values.size()=" + this.values.size() + ", inputs.size()=" + this.inputs.size());
        }
        if (received != expected) {
            // Still waiting for at least one input.
            return;
        }

        List<Object> verifiedValues = this.owningProcessor.fireBeforeRun(this.valuesInInputOrder());
        this.startTask(verifiedValues);
        // Start the next round with an empty collection.
        this.values = new HashMap<Integer, Object>(this.values.size());
        if (this.stoppingGently) {
            this.stop();
        }
        if (!this.hasBeenStopped()) {
            this.queryInputs(false);
        }
    }

    /** Returns the collected values as a new list sorted by their input index. */
    private List<Object> valuesInInputOrder() {
        List<Map.Entry<Integer, Object>> arrivedMessages =
                new ArrayList<Map.Entry<Integer, Object>>(this.values.entrySet());
        Collections.sort(arrivedMessages, BY_INPUT_INDEX);
        List<Object> arrivedValues = new ArrayList<Object>(arrivedMessages.size());
        for (Map.Entry<Integer, Object> entry : arrivedMessages) {
            arrivedValues.add(entry.getValue());
        }
        return arrivedValues;
    }

    /**
     * Invokes the operator's code with the given values as its arguments.
     *
     * <p>Anything thrown by the code is passed to {@code reportException} instead of
     * propagating, and {@code fireAfterRun} is called on the owning processor in every case.
     *
     * @param results the values to pass to the code, one argument per element
     */
    void startTask(List<Object> results) {
        try {
            this.code.call(results.toArray(new Object[results.size()]));
        }
        catch (Throwable e) {
            this.reportException(e);
        }
        finally {
            this.owningProcessor.fireAfterRun(results);
        }
    }
}

