/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.performanceanalyzer.rca.store.rca.hotshard;

import com.google.common.collect.MinMaxPriorityQueue;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.Record;
import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics;
import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics;
import org.opensearch.performanceanalyzer.commons.stats.measurements.MeasurementSet;
import org.opensearch.performanceanalyzer.grpc.FlowUnitMessage;
import org.opensearch.performanceanalyzer.grpc.HotShardSummaryMessage;
import org.opensearch.performanceanalyzer.rca.configs.HotShardRcaConfig;
import org.opensearch.performanceanalyzer.rca.framework.api.Metric;
import org.opensearch.performanceanalyzer.rca.framework.api.Rca;
import org.opensearch.performanceanalyzer.rca.framework.api.Resources;
import org.opensearch.performanceanalyzer.rca.framework.api.aggregators.SummarizedWindow;
import org.opensearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotShardSummary;
import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf;
import org.opensearch.performanceanalyzer.rca.framework.metrics.RcaVerticesMetrics;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import org.opensearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import org.opensearch.performanceanalyzer.rca.store.rca.hotshard.IndexShardKey;
import org.opensearch.performanceanalyzer.rca.store.rca.hotshard.NamedSummarizedWindow;
import org.opensearch.performanceanalyzer.rca.store.rca.hotshard.SummarizedWindowCPUComparator;

/**
 * RCA vertex that accumulates per-shard CPU utilization samples and, once every
 * {@code rcaPeriod} invocations of {@link #operate()}, reports the shards whose average CPU
 * utilization is above a threshold.
 *
 * <p>Samples are keyed by {@link IndexShardKey} and folded into one {@link SummarizedWindow} per
 * shard. On a reporting tick the accumulated windows are filtered by the threshold, bounded to at
 * most {@code topKConsumers} entries through a {@link MinMaxPriorityQueue}, converted into
 * {@link HotShardSummary} objects and attached to a {@link HotNodeSummary}. The accumulated state
 * is cleared on every reporting tick.
 *
 * <p>The threshold and the consumer bound start from built-in defaults and are replaced by
 * {@link #readRcaConf(RcaConf)}.
 */
public class HotShardRca extends Rca<ResourceFlowUnit<HotNodeSummary>> {
    private static final Logger LOG = LogManager.getLogger(HotShardRca.class);

    /**
     * Value handed to every {@link HotShardSummary} as its last constructor argument.
     * NOTE(review): presumably the length of the summarized window in seconds - confirm against
     * HotShardSummary.
     */
    private static final int SLIDING_WINDOW_IN_SECONDS = 60;

    /** CPU utilization threshold used until {@link #readRcaConf(RcaConf)} supplies one. */
    private static final double DEFAULT_CPU_UTILIZATION_THRESHOLD = 0.015;

    /** Upper bound on reported shards used until {@link #readRcaConf(RcaConf)} supplies one. */
    private static final int DEFAULT_TOP_K_CONSUMERS = 50;

    private double cpuUtilizationThreshold;
    private int topKConsumers;
    private final Metric cpuUtilization;
    private final int rcaPeriod;
    /** Number of {@link #operate()} calls since the last reporting tick. */
    private int counter;
    /** Time source for sample and flow unit timestamps; protected so subclasses can replace it. */
    protected Clock clock;
    /** Per-shard accumulation of samples collected since the last reporting tick. */
    private final Map<IndexShardKey, SummarizedWindow> shardResourceSummarizationMap;

    /**
     * Creates the RCA with default threshold and consumer bound.
     *
     * @param evaluationIntervalSeconds forwarded unchanged to the {@link Rca} super constructor
     * @param rcaPeriod number of {@link #operate()} invocations between two reports
     * @param cpuUtilization metric whose flow units supply the per-shard CPU utilization samples
     * @param <M> concrete metric type
     */
    public <M extends Metric> HotShardRca(long evaluationIntervalSeconds, int rcaPeriod, M cpuUtilization) {
        super(evaluationIntervalSeconds);
        this.cpuUtilization = cpuUtilization;
        this.rcaPeriod = rcaPeriod;
        this.counter = 0;
        this.clock = Clock.systemUTC();
        this.shardResourceSummarizationMap = new HashMap<>();
        this.cpuUtilizationThreshold = DEFAULT_CPU_UTILIZATION_THRESHOLD;
        this.topKConsumers = DEFAULT_TOP_K_CONSUMERS;
    }

    /**
     * Folds every record of one flow unit into the per-shard summarization map.
     *
     * <p>Records without an index name or shard id are skipped. Any exception raised while
     * handling a single record is counted, logged and does not stop processing of the remaining
     * records.
     *
     * @param metricFlowUnit flow unit whose data is known to be non-null
     * @param metricType metric type the samples are recorded under
     */
    private void consumeFlowUnit(MetricFlowUnit metricFlowUnit, AllMetrics.OSMetrics metricType) {
        for (Record record : metricFlowUnit.getData()) {
            try {
                String indexName = record.getValue(AllMetrics.CommonDimension.INDEX_NAME.toString(), String.class);
                Integer shardId = record.getValue(AllMetrics.CommonDimension.SHARD_ID.toString(), Integer.class);
                if (indexName == null || shardId == null) {
                    continue;
                }
                IndexShardKey indexShardKey = IndexShardKey.buildIndexShardKey(record);
                // Unboxed before the map is touched: a missing "sum" value fails here and lands
                // in the catch block without leaving an empty window behind.
                double usage = record.getValue("sum", Double.class);
                SummarizedWindow usageWindow = this.shardResourceSummarizationMap.get(indexShardKey);
                if (usageWindow == null) {
                    usageWindow = new SummarizedWindow();
                    this.shardResourceSummarizationMap.put(indexShardKey, usageWindow);
                }
                usageWindow.next(metricType, usage, this.clock.millis());
            } catch (Exception e) {
                ServiceMetrics.RCA_VERTICES_METRICS_AGGREGATOR.updateStat(RcaVerticesMetrics.HOT_SHARD_RCA_ERROR, 1);
                LOG.error("Failed to parse metric in FlowUnit: {} from {}", record, metricType.toString(), e);
            }
        }
    }

    /**
     * Consumes every flow unit of {@code metric} that carries data.
     *
     * @param metric metric to read flow units from
     * @param metricType metric type the samples are recorded under
     */
    private void consumeMetric(Metric metric, AllMetrics.OSMetrics metricType) {
        for (MetricFlowUnit metricFlowUnit : metric.getFlowUnits()) {
            if (metricFlowUnit.getData() == null) {
                continue;
            }
            this.consumeFlowUnit(metricFlowUnit, metricType);
        }
    }

    /**
     * Empties {@code queue} into {@code consumersToSend}, creating one summary per shard.
     *
     * <p>A shard that already has a summary in {@code consumersToSend} is not added again; its
     * existing summary is re-tagged with {@code DOUBLE_CRITERIA} instead.
     *
     * @param queue candidates to drain; empty when this method returns
     * @param consumersToSend summaries collected so far, updated in place
     * @param instanceDetails supplies the instance id recorded in each new summary
     */
    private void drainQueue(MinMaxPriorityQueue<NamedSummarizedWindow> queue, Map<IndexShardKey, HotShardSummary> consumersToSend, InstanceDetails instanceDetails) {
        while (!queue.isEmpty()) {
            NamedSummarizedWindow candidate = queue.remove();
            if (consumersToSend.containsKey(candidate.indexShardKey)) {
                HotShardSummary existing = consumersToSend.get(candidate.indexShardKey);
                existing.setCriteria(HotShardSummaryMessage.CriteriaEnum.DOUBLE_CRITERIA);
                continue;
            }
            HotShardSummary summary = new HotShardSummary(
                    candidate.indexShardKey.getIndexName(),
                    String.valueOf(candidate.indexShardKey.getShardId()),
                    instanceDetails.getInstanceId().toString(),
                    SLIDING_WINDOW_IN_SECONDS);
            double avgCpuUtilization = candidate.summarizedWindow.readAvgCpuUtilization(TimeUnit.SECONDS);
            summary.setCpuUtilization(avgCpuUtilization);
            summary.setCriteria(HotShardSummaryMessage.CriteriaEnum.CPU_UTILIZATION_CRITERIA);
            consumersToSend.put(candidate.indexShardKey, summary);
        }
    }

    /**
     * Adds the shard to {@code queue} when its average metric value is strictly above
     * {@code threshold}. Despite the name, nothing is returned; the queue is the output.
     *
     * @param queue bounded queue of candidates, updated in place
     * @param indexShardKey shard the window belongs to
     * @param summarizedWindow accumulated samples for the shard
     * @param metricType metric whose average is compared
     * @param threshold exclusive lower bound for a shard to qualify
     */
    private void isTopConsumer(MinMaxPriorityQueue<NamedSummarizedWindow> queue, IndexShardKey indexShardKey, SummarizedWindow summarizedWindow, AllMetrics.OSMetrics metricType, double threshold) {
        double metricValue = summarizedWindow.readAvgMetricValue(TimeUnit.SECONDS, metricType);
        if (metricValue > threshold) {
            queue.add(new NamedSummarizedWindow(summarizedWindow, indexShardKey));
            LOG.debug("Top consumer Identified, Shard : {} , metricValue = {} , metricThreshold = {}, metricType = {}", indexShardKey.toString(), metricValue, threshold, metricType.toString());
        }
    }

    /**
     * Consumes the latest CPU utilization flow units and, on every {@code rcaPeriod}-th call,
     * emits a flow unit describing the hot shards seen since the previous report.
     *
     * @return a flow unit carrying a {@link HotNodeSummary} on a reporting tick (state
     *     {@code HEALTHY} when the summary has no nested summaries, {@code UNHEALTHY} otherwise);
     *     a flow unit built from the current timestamp alone on every other call
     */
    @Override
    public ResourceFlowUnit<HotNodeSummary> operate() {
        ++this.counter;
        this.consumeMetric(this.cpuUtilization, AllMetrics.OSMetrics.CPU_UTILIZATION);
        if (this.counter != this.rcaPeriod) {
            LOG.debug("Empty FlowUnit returned for Hot Shard RCA");
            return new ResourceFlowUnit<>(this.clock.millis());
        }

        // Bounded queue: per the Guava contract, once maximumSize is exceeded the greatest
        // element according to the comparator is evicted on insertion.
        MinMaxPriorityQueue<NamedSummarizedWindow> cpuUtilTopConsumers =
                MinMaxPriorityQueue.orderedBy(new SummarizedWindowCPUComparator())
                        .maximumSize(this.topKConsumers)
                        .create();
        for (Map.Entry<IndexShardKey, SummarizedWindow> entry : this.shardResourceSummarizationMap.entrySet()) {
            this.isTopConsumer(cpuUtilTopConsumers, entry.getKey(), entry.getValue(), AllMetrics.OSMetrics.CPU_UTILIZATION, this.cpuUtilizationThreshold);
        }
        // Start the next period from scratch; the queue holds its own references to the windows.
        this.shardResourceSummarizationMap.clear();

        InstanceDetails instanceDetails = this.getInstanceDetails();
        Map<IndexShardKey, HotShardSummary> consumersToSend = new HashMap<>();
        this.drainQueue(cpuUtilTopConsumers, consumersToSend, instanceDetails);

        HotNodeSummary nodeSummary = new HotNodeSummary(instanceDetails.getInstanceId(), instanceDetails.getInstanceIp());
        nodeSummary.setHotShardSummaryList(new ArrayList<>(consumersToSend.values()));
        ResourceContext context = new ResourceContext(
                nodeSummary.getNestedSummaryList().isEmpty() ? Resources.State.HEALTHY : Resources.State.UNHEALTHY);
        this.counter = 0;
        // Passed as the last ResourceFlowUnit constructor argument.
        // NOTE(review): presumably controls whether the summary is persisted - confirm.
        boolean isDataNode = !instanceDetails.getIsClusterManager();
        return new ResourceFlowUnit<>(this.clock.millis(), context, nodeSummary, isDataNode);
    }

    /**
     * Replaces the CPU utilization threshold and the maximum number of reported shards with the
     * values from the hot shard section of {@code conf}.
     *
     * @param conf RCA configuration to read from
     */
    @Override
    public void readRcaConf(RcaConf conf) {
        HotShardRcaConfig configObj = conf.getHotShardRcaConfig();
        this.cpuUtilizationThreshold = configObj.getCpuUtilizationThreshold();
        this.topKConsumers = configObj.getMaximumConsumersToSend();
    }

    /**
     * Rebuilds this vertex's flow units from the messages read off the wire for the node in
     * {@code args} and installs them via {@code setFlowUnits}.
     *
     * @param args wrapper providing the wire hopper and the node to read for
     */
    @Override
    public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
        List<FlowUnitMessage> flowUnitMessages = args.getWireHopper().readFromWire(args.getNode());
        List<ResourceFlowUnit<HotNodeSummary>> flowUnitList = new ArrayList<>();
        LOG.debug("rca: Executing fromWire: {}", this.getClass().getSimpleName());
        for (FlowUnitMessage flowUnitMessage : flowUnitMessages) {
            flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage));
        }
        this.setFlowUnits(flowUnitList);
    }
}

