/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.timeseries.transport;

import java.net.ConnectException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.Message;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.ExceptionsHelper;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.OpenSearchTimeoutException;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.NetworkExceptionHelper;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.common.io.stream.NotSerializableExceptionWrapper;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.node.NodeClosedException;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AnalysisType;
import org.opensearch.timeseries.NodeStateManager;
import org.opensearch.timeseries.cluster.HashRing;
import org.opensearch.timeseries.common.exception.ClientException;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.InternalFailure;
import org.opensearch.timeseries.common.exception.NotSerializedExceptionName;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.feature.CompositeRetriever;
import org.opensearch.timeseries.feature.FeatureManager;
import org.opensearch.timeseries.indices.IndexManagement;
import org.opensearch.timeseries.ml.SingleStreamModelIdMapper;
import org.opensearch.timeseries.model.Config;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.model.IndexableResult;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.model.TaskType;
import org.opensearch.timeseries.model.TimeSeriesTask;
import org.opensearch.timeseries.stats.StatNames;
import org.opensearch.timeseries.stats.Stats;
import org.opensearch.timeseries.task.TaskCacheManager;
import org.opensearch.timeseries.task.TaskManager;
import org.opensearch.timeseries.transport.EntityResultRequest;
import org.opensearch.timeseries.transport.ResultRequest;
import org.opensearch.timeseries.transport.ResultResponse;
import org.opensearch.timeseries.transport.SingleStreamResultRequest;
import org.opensearch.timeseries.util.DataUtil;
import org.opensearch.timeseries.util.ExceptionUtil;
import org.opensearch.timeseries.util.SecurityClientUtil;
import org.opensearch.timeseries.util.TimeUtil;
import org.opensearch.transport.ActionNotFoundTransportException;
import org.opensearch.transport.ConnectTransportException;
import org.opensearch.transport.NodeNotConnectedException;
import org.opensearch.transport.ReceiveTimeoutTransportException;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestOptions;
import org.opensearch.transport.TransportResponseHandler;
import org.opensearch.transport.TransportService;

public abstract class ResultProcessor<TransportResultRequestType extends ResultRequest, IndexableResultType extends IndexableResult, ResultResponseType extends ResultResponse<IndexableResultType>, TaskCacheManagerType extends TaskCacheManager, TaskTypeEnum extends TaskType, TaskClass extends TimeSeriesTask, IndexType extends Enum<IndexType>, IndexManagementType extends IndexManagement<IndexType>, TaskManagerType extends TaskManager<TaskCacheManagerType, TaskTypeEnum, TaskClass, IndexType, IndexManagementType>> {
    private static final Logger LOG = LogManager.getLogger(ResultProcessor.class);
    static final String WAIT_FOR_THRESHOLD_ERR_MSG = "Exception in waiting for threshold result";
    static final String NO_ACK_ERR = "no acknowledgements from model hosting nodes.";
    public static final String TROUBLE_QUERYING_ERR_MSG = "Having trouble querying data: ";
    public static final String NULL_RESPONSE = "Received null response from";
    public static final String INDEX_READ_BLOCKED = "Cannot read user index due to read block.";
    public static final String READ_WRITE_BLOCKED = "Cannot read/write due to global block.";
    public static final String NODE_UNRESPONSIVE_ERR_MSG = "Model node is unresponsive.  Mute node";
    protected final TransportRequestOptions option;
    private String entityResultAction;
    protected Class<ResultResponseType> transportResultResponseClazz;
    private StatNames hcRequestCountStat;
    private String threadPoolName;
    private int maxEntitiesPerInterval;
    private int pageSize;
    protected final ThreadPool threadPool;
    protected final HashRing hashRing;
    protected final NodeStateManager nodeStateManager;
    protected final TransportService transportService;
    private final Stats timeSeriesStats;
    private final TaskManagerType realTimeTaskManager;
    private NamedXContentRegistry xContentRegistry;
    protected final Client client;
    private final SecurityClientUtil clientUtil;
    private Settings settings;
    private final IndexNameExpressionResolver indexNameExpressionResolver;
    private final ClusterService clusterService;
    protected final FeatureManager featureManager;
    protected final AnalysisType analysisType;
    protected final String singleStreamActionName;
    protected boolean runOnce;

    public ResultProcessor(Setting<TimeValue> requestTimeoutSetting, String entityResultAction, StatNames hcRequestCountStat, Settings settings, ClusterService clusterService, ThreadPool threadPool, String threadPoolName, HashRing hashRing, NodeStateManager nodeStateManager, TransportService transportService, Stats timeSeriesStats, TaskManagerType realTimeTaskManager, NamedXContentRegistry xContentRegistry, Client client, SecurityClientUtil clientUtil, IndexNameExpressionResolver indexNameExpressionResolver, Class<ResultResponseType> transportResultResponseClazz, FeatureManager featureManager, Setting<Integer> maxEntitiesPerIntervalSetting, Setting<Integer> pageSizeSetting, AnalysisType context, boolean runOnce, String singleStreamActionName) {
        this.option = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG).withTimeout((TimeValue)requestTimeoutSetting.get(settings)).build();
        this.maxEntitiesPerInterval = (Integer)maxEntitiesPerIntervalSetting.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(maxEntitiesPerIntervalSetting, it -> {
            this.maxEntitiesPerInterval = it;
        });
        this.pageSize = (Integer)pageSizeSetting.get(settings);
        clusterService.getClusterSettings().addSettingsUpdateConsumer(pageSizeSetting, it -> {
            this.pageSize = it;
        });
        this.entityResultAction = entityResultAction;
        this.hcRequestCountStat = hcRequestCountStat;
        this.threadPool = threadPool;
        this.hashRing = hashRing;
        this.nodeStateManager = nodeStateManager;
        this.transportService = transportService;
        this.timeSeriesStats = timeSeriesStats;
        this.realTimeTaskManager = realTimeTaskManager;
        this.xContentRegistry = xContentRegistry;
        this.client = client;
        this.clientUtil = clientUtil;
        this.settings = settings;
        this.indexNameExpressionResolver = indexNameExpressionResolver;
        this.clusterService = clusterService;
        this.transportResultResponseClazz = transportResultResponseClazz;
        this.featureManager = featureManager;
        this.analysisType = context;
        this.threadPoolName = threadPoolName;
        this.runOnce = runOnce;
        this.singleStreamActionName = singleStreamActionName;
    }

    public ActionListener<Optional<? extends Config>> onGetConfig(ActionListener<ResultResponseType> listener, String configID, TransportResultRequestType request, Optional<Set<String>> hcDetectors) {
        return ActionListener.wrap(configOptional -> {
            if (!configOptional.isPresent()) {
                listener.onFailure((Exception)new EndRunException(configID, "config is not available.", true));
                return;
            }
            Config config = (Config)configOptional.get();
            if (config.isHighCardinality() && hcDetectors.isPresent()) {
                ((Set)hcDetectors.get()).add(configID);
                this.timeSeriesStats.getStat(this.hcRequestCountStat.getName()).increment();
            }
            if (request.getStart() <= 0L) {
                long duration = config.getIntervalInMilliseconds();
                long executionStartTime = request.getEnd() - duration;
                request.setStart(executionStartTime);
            }
            long delayMillis = Optional.ofNullable((IntervalTimeConfiguration)config.getWindowDelay()).map(t -> t.toDuration().toMillis()).orElse(0L);
            long dataStartTime = request.getStart() - delayMillis;
            long dataEndTime = request.getEnd() - delayMillis;
            if (this.runOnce) {
                ((TaskManager)this.realTimeTaskManager).createRunOnceTaskAndCleanupStaleTasks(configID, config, this.transportService, ActionListener.wrap(r -> {
                    if (r == null) {
                        LOG.error("Unexpected empty new task for " + configID);
                        listener.onFailure((Exception)new OpenSearchStatusException("Failed to bootstrap run once task for " + configID, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
                        return;
                    }
                    this.executeAnalysis(listener, configID, (ResultRequest)((Object)request), config, dataStartTime, dataEndTime, r.getTaskId());
                }, e -> {
                    LOG.error("Failed to init run once task for " + configID, (Throwable)e);
                    listener.onFailure((Exception)new OpenSearchStatusException("Failed to bootstrap run once task for " + configID, RestStatus.INTERNAL_SERVER_ERROR, new Object[0]));
                }));
            } else {
                ((TaskManager)this.realTimeTaskManager).initRealtimeTaskCacheAndCleanupStaleCache(configID, config, this.transportService, (ActionListener<Boolean>)ActionListener.runAfter(this.initRealtimeTaskListener(configID), () -> this.executeAnalysis(listener, configID, (ResultRequest)((Object)request), config, dataStartTime, dataEndTime, null)));
            }
        }, exception -> ResultProcessor.handleExecuteException(exception, (ActionListener<? extends ActionResponse>)listener, configID));
    }

    private ActionListener<Boolean> initRealtimeTaskListener(String configId) {
        return ActionListener.wrap(r -> {
            if (r.booleanValue()) {
                LOG.debug("Realtime task initied for config {}", (Object)configId);
            }
        }, e -> LOG.error("Failed to init realtime task for " + configId, (Throwable)e));
    }

    private void executeAnalysis(ActionListener<ResultResponseType> listener, String configID, ResultRequest request, Config config, long dataStartTime, long dataEndTime, String taskId) {
        if (config.isHighCardinality()) {
            Optional<Exception> previousException = this.nodeStateManager.fetchExceptionAndClear(configID);
            if (previousException.isPresent()) {
                EndRunException endRunException;
                Exception exception = previousException.get();
                LOG.error((Message)new ParameterizedMessage("Previous exception of [{}]", (Object)configID), (Throwable)exception);
                if (exception instanceof EndRunException && (endRunException = (EndRunException)exception).isEndNow()) {
                    listener.onFailure(exception);
                    return;
                }
            }
            long nextDetectionStartTime = request.getEnd() + config.getIntervalInMilliseconds();
            CompositeRetriever compositeRetriever = new CompositeRetriever(dataStartTime, dataEndTime, config, this.xContentRegistry, this.client, this.clientUtil, nextDetectionStartTime, this.settings, this.maxEntitiesPerInterval, this.pageSize, this.indexNameExpressionResolver, this.clusterService, this.analysisType);
            CompositeRetriever.PageIterator pageIterator = null;
            try {
                pageIterator = compositeRetriever.iterator();
            }
            catch (Exception e) {
                listener.onFailure((Exception)new EndRunException(config.getId(), "Invalid search query.", e, false));
                return;
            }
            PageListener getEntityFeatureslistener = new PageListener(pageIterator, config, dataStartTime, dataEndTime, taskId);
            if (pageIterator.hasNext()) {
                pageIterator.next(getEntityFeatureslistener);
            } else if (config.getImputationOption() != null) {
                this.imputeHC(dataStartTime, dataEndTime, configID, taskId);
            }
            if (previousException.isPresent()) {
                listener.onFailure(previousException.get());
            } else {
                listener.onResponse(this.createResultResponse(new ArrayList<FeatureData>(), null, null, config.getIntervalInMinutes(), true, taskId));
            }
            return;
        }
        String rcfModelID = SingleStreamModelIdMapper.getRcfModelId(configID, 0);
        Optional<DiscoveryNode> asRCFNode = this.hashRing.getOwningNodeWithSameLocalVersionForRealtime(rcfModelID);
        if (asRCFNode.isEmpty()) {
            listener.onFailure((Exception)new InternalFailure(configID, "RCF model node is not available."));
            return;
        }
        DiscoveryNode rcfNode = asRCFNode.get();
        if (!this.shouldStart(listener, configID, config, rcfNode.getId(), rcfModelID)) {
            return;
        }
        this.featureManager.getCurrentFeatures(config, dataStartTime, dataEndTime, this.analysisType, this.onFeatureResponseForSingleStreamConfig(config, listener, rcfModelID, rcfNode, dataStartTime, dataEndTime, taskId));
    }

    protected void handleQueryFailure(Exception exception, ActionListener<ResultResponseType> listener, String adID) {
        Exception convertedQueryFailureException = this.convertedQueryFailureException(exception, adID);
        if (convertedQueryFailureException instanceof EndRunException) {
            listener.onFailure(convertedQueryFailureException);
        } else {
            ResultProcessor.handleExecuteException(convertedQueryFailureException, listener, adID);
        }
    }

    private Exception convertedQueryFailureException(Exception exception, String adID) {
        if (ExceptionUtil.isIndexNotAvailable(exception)) {
            return new EndRunException(adID, TROUBLE_QUERYING_ERR_MSG + exception.getMessage(), false).countedInStats(false);
        }
        if (exception instanceof SearchPhaseExecutionException && this.invalidQuery((SearchPhaseExecutionException)exception)) {
            return new EndRunException(adID, "Invalid search query. " + ((SearchPhaseExecutionException)exception).getDetailedMessage(), exception, false).countedInStats(false);
        }
        return exception;
    }

    protected void findException(Throwable cause, String configID, AtomicReference<Exception> failure, String nodeId) {
        if (cause == null) {
            LOG.error((Message)new ParameterizedMessage("Null input exception", new Object[0]));
            return;
        }
        if (cause instanceof Error) {
            LOG.error((Message)new ParameterizedMessage("Error during prediction for {}: ", (Object)configID), cause);
            return;
        }
        Exception causeException = (Exception)cause;
        if (causeException instanceof TimeSeriesException) {
            failure.set(causeException);
        } else if (causeException instanceof NotSerializableExceptionWrapper) {
            Optional<TimeSeriesException> actualException = NotSerializedExceptionName.convertWrappedTimeSeriesException((NotSerializableExceptionWrapper)causeException, configID);
            if (actualException.isPresent()) {
                TimeSeriesException tsException = actualException.get();
                failure.set(tsException);
                if (tsException instanceof ResourceNotFoundException) {
                    this.nodeStateManager.addPressure(nodeId, configID);
                }
            } else {
                failure.set(new EndRunException(configID, "We might have bugs.", causeException, false));
            }
        } else if (causeException instanceof OpenSearchTimeoutException) {
            failure.set(new InternalFailure(configID, causeException));
        } else if (causeException instanceof IllegalArgumentException) {
            failure.set(new InternalFailure(configID, causeException));
        } else {
            failure.set(new EndRunException(configID, "We might have bugs.", causeException, false));
        }
    }

    private boolean invalidQuery(SearchPhaseExecutionException ex) {
        for (ShardSearchFailure failure : ex.shardFailures()) {
            if (RestStatus.BAD_REQUEST == failure.status() && failure.getCause() instanceof IllegalArgumentException) continue;
            return false;
        }
        return true;
    }

    protected void handlePredictionFailure(Exception e, String adID, String nodeID, AtomicReference<Exception> failure) {
        LOG.error((Message)new ParameterizedMessage("Received an error from node {} while doing model inference for {}", (Object)nodeID, (Object)adID), (Throwable)e);
        if (e == null) {
            return;
        }
        Throwable cause = ExceptionsHelper.unwrapCause((Throwable)e);
        if (this.hasConnectionIssue(cause)) {
            this.handleConnectionException(nodeID, adID);
        } else {
            this.findException(cause, adID, failure, nodeID);
        }
    }

    private boolean hasConnectionIssue(Throwable e) {
        return e instanceof ConnectTransportException || e instanceof NodeClosedException || e instanceof ReceiveTimeoutTransportException || e instanceof NodeNotConnectedException || e instanceof ConnectException || NetworkExceptionHelper.isCloseConnectionException((Throwable)e) || e instanceof ActionNotFoundTransportException;
    }

    private void handleConnectionException(String node, String detectorId) {
        DiscoveryNodes nodes = this.clusterService.state().nodes();
        if (!nodes.nodeExists(node)) {
            this.hashRing.buildCirclesForRealtime();
            return;
        }
        this.nodeStateManager.addPressure(node, detectorId);
    }

    private boolean checkGlobalBlock(ClusterState state) {
        return state.blocks().globalBlockedException(ClusterBlockLevel.READ) != null || state.blocks().globalBlockedException(ClusterBlockLevel.WRITE) != null;
    }

    private boolean checkIndicesBlocked(ClusterState state, ClusterBlockLevel level, String ... indices) {
        String[] concreteIndices = this.indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), indices);
        return state.blocks().indicesBlockedException(level, concreteIndices) != null;
    }

    private boolean shouldStart(ActionListener<ResultResponseType> listener, String adID, Config detector, String rcfNodeId, String rcfModelID) {
        ClusterState state = this.clusterService.state();
        if (this.checkGlobalBlock(state)) {
            listener.onFailure((Exception)new InternalFailure(adID, READ_WRITE_BLOCKED));
            return false;
        }
        if (this.nodeStateManager.isMuted(rcfNodeId, adID)) {
            listener.onFailure((Exception)new InternalFailure(adID, String.format(Locale.ROOT, "Model node is unresponsive.  Mute node %s for rcf model %s", rcfNodeId, rcfModelID)));
            return false;
        }
        if (this.checkIndicesBlocked(state, ClusterBlockLevel.READ, detector.getIndices().toArray(new String[0]))) {
            listener.onFailure((Exception)new InternalFailure(adID, INDEX_READ_BLOCKED));
            return false;
        }
        return true;
    }

    public static void handleExecuteException(Exception ex, ActionListener<? extends ActionResponse> listener, String id) {
        if (ex instanceof ClientException) {
            listener.onFailure(ex);
        } else if (ex instanceof TimeSeriesException) {
            listener.onFailure((Exception)new InternalFailure((TimeSeriesException)ex));
        } else {
            Throwable cause = ExceptionsHelper.unwrapCause((Throwable)ex);
            listener.onFailure((Exception)new InternalFailure(id, cause));
        }
    }

    protected ActionListener<Optional<double[]>> onFeatureResponseForSingleStreamConfig(Config config, ActionListener<ResultResponseType> listener, String rcfModelId, DiscoveryNode rcfNode, long dataStartTime, long dataEndTime, String taskId) {
        String configId = config.getId();
        return ActionListener.wrap(featureOptional -> {
            Optional<Exception> previousException = this.nodeStateManager.fetchExceptionAndClear(configId);
            if (previousException.isPresent()) {
                EndRunException endRunException;
                Exception exception = previousException.get();
                LOG.error((Message)new ParameterizedMessage("Previous exception of [{}]", (Object)configId), (Throwable)exception);
                if (exception instanceof EndRunException && (endRunException = (EndRunException)exception).isEndNow()) {
                    listener.onFailure(exception);
                    return;
                }
            }
            if ((featureOptional.isEmpty() || DataUtil.areAnyElementsNaN((double[])featureOptional.get())) && config.getImputationOption() == null) {
                listener.onResponse(this.createResultResponse(new ArrayList<FeatureData>(), String.format(Locale.ROOT, "No data in current window between %d and %d for %s", dataStartTime, dataEndTime, configId), null, null, false, taskId));
                return;
            }
            AtomicReference<Exception> failure = new AtomicReference<Exception>();
            double[] point = null;
            if (featureOptional.isPresent()) {
                point = (double[])featureOptional.get();
            } else {
                int featureSize = config.getEnabledFeatureIds().size();
                point = new double[featureSize];
                Arrays.fill(point, Double.NaN);
            }
            if (DataUtil.areAnyElementsNaN(point)) {
                LOG.info("Sending a single stream request to node {} to impute/process data from timestamp {} to {} for config {}", (Object)rcfNode.getId(), (Object)dataStartTime, (Object)dataEndTime, (Object)configId);
            } else {
                LOG.info("Sending a single stream request to node {} to process data from timestamp {} to {} for config {}", (Object)rcfNode.getId(), (Object)dataStartTime, (Object)dataEndTime, (Object)configId);
            }
            this.transportService.sendRequest(rcfNode, this.singleStreamActionName, (TransportRequest)new SingleStreamResultRequest(configId, rcfModelId, dataStartTime, dataEndTime, point, taskId), this.option, (TransportResponseHandler)new ActionListenerResponseHandler((ActionListener)new ErrorResponseListener(rcfNode.getId(), configId, failure, new AtomicInteger()), AcknowledgedResponse::new, "same"));
            if (previousException.isPresent()) {
                listener.onFailure(previousException.get());
            } else {
                listener.onResponse(this.createResultResponse(new ArrayList<FeatureData>(), null, null, config.getIntervalInMinutes(), true, taskId));
            }
        }, exception -> this.handleQueryFailure((Exception)exception, listener, configId));
    }

    protected abstract ResultResponseType createResultResponse(List<FeatureData> var1, String var2, Long var3, Long var4, Boolean var5, String var6);

    protected abstract void imputeHC(long var1, long var3, String var5, String var6);

    public class ErrorResponseListener
    implements ActionListener<AcknowledgedResponse> {
        private String nodeId;
        private final String configId;
        private AtomicReference<Exception> failure;
        private AtomicInteger receivedPages;

        public ErrorResponseListener(String nodeId, String configId, AtomicReference<Exception> failure, AtomicInteger receivedPage) {
            this.nodeId = nodeId;
            this.configId = configId;
            this.failure = failure;
            this.receivedPages = receivedPage;
        }

        public void onResponse(AcknowledgedResponse response) {
            try {
                this.receivedPages.incrementAndGet();
                if (!response.isAcknowledged()) {
                    LOG.error("Cannot send entities' features to {} for {}", (Object)this.nodeId, (Object)this.configId);
                    ResultProcessor.this.nodeStateManager.addPressure(this.nodeId, this.configId);
                } else {
                    ResultProcessor.this.nodeStateManager.resetBackpressureCounter(this.nodeId, this.configId);
                }
            }
            catch (Exception ex) {
                LOG.error("Unexpected exception: {} for {}", (Object)ex, (Object)this.configId);
                this.handleException(ex);
            }
        }

        public void onFailure(Exception e) {
            try {
                this.receivedPages.incrementAndGet();
                LOG.error((Message)new ParameterizedMessage("Cannot send entities' features to {} for {}", (Object)this.nodeId, (Object)this.configId), (Throwable)e);
                this.handleException(e);
            }
            catch (Exception ex) {
                LOG.error("Unexpected exception: {} for {}", (Object)ex, (Object)this.configId);
                this.handleException(ex);
            }
        }

        private void handleException(Exception e) {
            ResultProcessor.this.handlePredictionFailure(e, this.configId, this.nodeId, this.failure);
            if (this.failure.get() != null) {
                ResultProcessor.this.nodeStateManager.setException(this.configId, this.failure.get());
            }
        }
    }

    class PageListener
    implements ActionListener<CompositeRetriever.Page> {
        private CompositeRetriever.PageIterator pageIterator;
        private String configId;
        private Config config;
        private long dataStartTime;
        private long dataEndTime;
        private String taskId;
        private AtomicInteger receivedPages;
        private AtomicInteger sentOutPages;
        private AtomicInteger pagesInFlight;

        PageListener(CompositeRetriever.PageIterator pageIterator, Config config, long dataStartTime, long dataEndTime, String taskId) {
            this.pageIterator = pageIterator;
            this.configId = config.getId();
            this.config = config;
            this.dataStartTime = dataStartTime;
            this.dataEndTime = dataEndTime;
            this.taskId = taskId;
            this.receivedPages = new AtomicInteger();
            this.sentOutPages = new AtomicInteger();
            this.pagesInFlight = new AtomicInteger();
        }

        public void onResponse(CompositeRetriever.Page entityFeatures) {
            this.pagesInFlight.incrementAndGet();
            if (this.pageIterator.hasNext()) {
                this.pageIterator.next(this);
            } else if (this.config.getImputationOption() != null) {
                this.scheduleImputeHCTask();
            }
            if (entityFeatures != null && !entityFeatures.isEmpty()) {
                LOG.info("Sending an HC request to process data from timestamp {} to {} for config {}", (Object)this.dataStartTime, (Object)this.dataEndTime, (Object)this.configId);
                ResultProcessor.this.threadPool.executor(ResultProcessor.this.threadPoolName).execute(() -> {
                    try {
                        Set<Map.Entry<DiscoveryNode, Map<Entity, double[]>>> node2Entities = entityFeatures.getResults().entrySet().stream().filter(e -> ResultProcessor.this.hashRing.getOwningNodeWithSameLocalVersionForRealtime(((Entity)e.getKey()).toString()).isPresent()).collect(Collectors.groupingBy(e -> ResultProcessor.this.hashRing.getOwningNodeWithSameLocalVersionForRealtime(((Entity)e.getKey()).toString()).get(), Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))).entrySet();
                        Iterator<Map.Entry<DiscoveryNode, Map<Entity, double[]>>> iterator = node2Entities.iterator();
                        while (iterator.hasNext()) {
                            Map.Entry<DiscoveryNode, Map<Entity, double[]>> entry = iterator.next();
                            DiscoveryNode modelNode = entry.getKey();
                            if (modelNode == null) {
                                iterator.remove();
                                continue;
                            }
                            String modelNodeId = modelNode.getId();
                            if (!ResultProcessor.this.nodeStateManager.isMuted(modelNodeId, this.configId)) continue;
                            LOG.info(String.format(Locale.ROOT, "Model node is unresponsive.  Mute node %s for config %s", modelNodeId, this.configId));
                            iterator.remove();
                        }
                        AtomicReference failure = new AtomicReference();
                        node2Entities.stream().forEach(nodeEntity -> {
                            this.sentOutPages.incrementAndGet();
                            DiscoveryNode node = (DiscoveryNode)nodeEntity.getKey();
                            ResultProcessor.this.transportService.sendRequest(node, ResultProcessor.this.entityResultAction, (TransportRequest)new EntityResultRequest(this.configId, (Map)nodeEntity.getValue(), this.dataStartTime, this.dataEndTime, ResultProcessor.this.analysisType, this.taskId), ResultProcessor.this.option, (TransportResponseHandler)new ActionListenerResponseHandler((ActionListener)new ErrorResponseListener(node.getId(), this.configId, failure, this.receivedPages), AcknowledgedResponse::new, "same"));
                        });
                    }
                    catch (Exception e2) {
                        LOG.error("Unexpected exception", (Throwable)e2);
                        this.handleException(e2);
                    }
                    finally {
                        this.pagesInFlight.decrementAndGet();
                    }
                });
            } else {
                this.pagesInFlight.decrementAndGet();
            }
        }

        public void onFailure(Exception e) {
            LOG.error("Unexpetected exception", (Throwable)e);
            this.handleException(e);
        }

        private void handleException(Exception e) {
            Exception convertedException = ResultProcessor.this.convertedQueryFailureException(e, this.configId);
            if (!(convertedException instanceof TimeSeriesException)) {
                Throwable cause = ExceptionsHelper.unwrapCause((Throwable)convertedException);
                convertedException = new InternalFailure(this.configId, cause);
            }
            ResultProcessor.this.nodeStateManager.setException(this.configId, convertedException);
        }

        private void scheduleImputeHCTask() {
            final AtomicReference<Scheduler.Cancellable> cancellable = new AtomicReference<Scheduler.Cancellable>();
            final AtomicBoolean sent = new AtomicBoolean();
            Runnable checkerTask = new Runnable(){
                private final long timeoutMillis;
                {
                    this.timeoutMillis = TimeUtil.calculateTimeoutMillis(PageListener.this.config, PageListener.this.dataEndTime);
                }

                @Override
                public void run() {
                    if (PageListener.this.pagesInFlight.get() == 0 && PageListener.this.sentOutPages.get() == PageListener.this.receivedPages.get()) {
                        if (!sent.get()) {
                            sent.set(true);
                            ResultProcessor.this.imputeHC(PageListener.this.dataStartTime, PageListener.this.dataEndTime, PageListener.this.configId, PageListener.this.taskId);
                        }
                        if (cancellable.get() != null) {
                            ((Scheduler.Cancellable)cancellable.get()).cancel();
                        }
                    } else if (Instant.now().toEpochMilli() >= this.timeoutMillis) {
                        LOG.warn("Scheduled impute HC task is cancelled due to timeout, current epoch {}, timeout epoch {}, dataEndTime {}, sent out {}, receive {}", (Object)Instant.now().toEpochMilli(), (Object)this.timeoutMillis, (Object)PageListener.this.dataEndTime, (Object)PageListener.this.sentOutPages.get(), (Object)PageListener.this.receivedPages.get());
                        if (cancellable != null) {
                            ((Scheduler.Cancellable)cancellable.get()).cancel();
                        }
                    }
                }
            };
            cancellable.set(ResultProcessor.this.threadPool.scheduleWithFixedDelay(checkerTask, TimeValue.timeValueSeconds((long)2L), ResultProcessor.this.threadPoolName));
        }
    }
}

