/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.performanceanalyzer.rca.store.rca.hot_node;

import com.google.common.annotations.VisibleForTesting;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jooq.Record;
import org.jooq.Result;
import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics;
import org.opensearch.performanceanalyzer.commons.stats.ServiceMetrics;
import org.opensearch.performanceanalyzer.commons.stats.measurements.MeasurementSet;
import org.opensearch.performanceanalyzer.grpc.FlowUnitMessage;
import org.opensearch.performanceanalyzer.rca.framework.api.Metric;
import org.opensearch.performanceanalyzer.rca.framework.api.Rca;
import org.opensearch.performanceanalyzer.rca.framework.api.Resources;
import org.opensearch.performanceanalyzer.rca.framework.api.contexts.ResourceContext;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.MetricFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.flow_units.ResourceFlowUnit;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Thread_Blocked_Time;
import org.opensearch.performanceanalyzer.rca.framework.api.metrics.Thread_Waited_Time;
import org.opensearch.performanceanalyzer.rca.framework.api.summaries.HotNodeSummary;
import org.opensearch.performanceanalyzer.rca.framework.metrics.RcaRuntimeMetrics;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;
import org.opensearch.performanceanalyzer.rca.scheduler.FlowUnitOperationArgWrapper;
import org.opensearch.performanceanalyzer.rca.store.rca.hot_node.ThreadAnalysis;
import org.opensearch.performanceanalyzer.rca.store.rca.hot_node.ThreadMetric;
import org.opensearch.performanceanalyzer.rca.store.rca.hot_node.ThreadMetricsSlidingWindow;

public class ThreadMetricsRca
extends Rca<ResourceFlowUnit<HotNodeSummary>> {
    private static final Logger LOG = LogManager.getLogger(ThreadMetricsRca.class);
    public static final double HIGH_BLOCKED_TIME_THRESHOLD_IN_SECONDS = 5.0;
    private final int rcaPeriod;
    private final Thread_Blocked_Time threadBlockedTime;
    private final Thread_Waited_Time threadWaitedTime;
    private final Clock clock;
    @VisibleForTesting
    final List<ThreadAnalysis> threadAnalyses;

    public ThreadMetricsRca(Thread_Blocked_Time threadBlockedTime, Thread_Waited_Time threadWaitedTime, int rcaPeriodInSeconds) {
        super(rcaPeriodInSeconds);
        this.rcaPeriod = rcaPeriodInSeconds;
        this.threadBlockedTime = threadBlockedTime;
        this.threadWaitedTime = threadWaitedTime;
        this.threadAnalyses = new ArrayList<ThreadAnalysis>();
        this.initThreadAnalyses();
        this.clock = Clock.systemUTC();
    }

    private void initThreadAnalyses() {
        this.threadAnalyses.add(new ThreadAnalysis(s -> s.contains("transport"), RcaRuntimeMetrics.BLOCKED_TRANSPORT_THREAD_COUNT, RcaRuntimeMetrics.WAITED_TRANSPORT_THREAD_COUNT, RcaRuntimeMetrics.MAX_TRANSPORT_THREAD_BLOCKED_TIME, RcaRuntimeMetrics.MAX_TRANSPORT_THREAD_WAITED_TIME));
    }

    @Override
    public void generateFlowUnitListFromWire(FlowUnitOperationArgWrapper args) {
        List<FlowUnitMessage> flowUnitMessages = args.getWireHopper().readFromWire(args.getNode());
        ArrayList flowUnitList = new ArrayList();
        LOG.debug("rca: Executing fromWire: {}", (Object)this.getClass().getSimpleName());
        for (FlowUnitMessage flowUnitMessage : flowUnitMessages) {
            flowUnitList.add(ResourceFlowUnit.buildFlowUnitFromWrapper(flowUnitMessage));
        }
        this.setFlowUnits(flowUnitList);
    }

    @Override
    public ResourceFlowUnit<HotNodeSummary> operate() {
        try {
            InstanceDetails instanceDetails = this.getInstanceDetails();
            long currentTimeMillis = this.clock.millis();
            LOG.debug("ThreadMetricsRca run at {}", (Object)currentTimeMillis);
            this.collateThreadMetricData(currentTimeMillis);
            this.publishStats();
            return new ResourceFlowUnit<HotNodeSummary>(this.clock.millis(), new ResourceContext(Resources.State.HEALTHY), new HotNodeSummary(instanceDetails.getInstanceId(), instanceDetails.getInstanceIp()), false);
        }
        catch (Exception e) {
            LOG.error("ThreadMetricsRca.operate() Failed", (Throwable)e);
            return new ResourceFlowUnit<HotNodeSummary>(this.clock.millis());
        }
    }

    private void publishStats() {
        this.threadAnalyses.forEach(analysis -> {
            ServiceMetrics.RCA_RUNTIME_METRICS_AGGREGATOR.updateStat((MeasurementSet)analysis.getBlockedThreadCountMetric(), (Number)analysis.getBlockedTimeWindow().getCountExceedingThreshold(5.0));
            ServiceMetrics.RCA_RUNTIME_METRICS_AGGREGATOR.updateStat((MeasurementSet)analysis.getMaxBlockedTimeMetric(), (Number)analysis.getBlockedTimeWindow().getMaxSum());
            ServiceMetrics.RCA_RUNTIME_METRICS_AGGREGATOR.updateStat((MeasurementSet)analysis.getWaitedThreadCountMetric(), (Number)analysis.getWaitedTimeWindow().getCountExceedingThreshold(5.0));
            ServiceMetrics.RCA_RUNTIME_METRICS_AGGREGATOR.updateStat((MeasurementSet)analysis.getMaxWaitedTimeMetric(), (Number)analysis.getWaitedTimeWindow().getMaxSum());
        });
    }

    private void collateThreadMetricData(long currentTimeMillis) {
        this.collateThreadMetricData(currentTimeMillis, this.threadBlockedTime, ThreadAnalysis::getBlockedTimeWindow);
        this.collateThreadMetricData(currentTimeMillis, this.threadWaitedTime, ThreadAnalysis::getWaitedTimeWindow);
    }

    private void collateThreadMetricData(long currentTimeMillis, Metric metric, Function<ThreadAnalysis, ThreadMetricsSlidingWindow> slidingWindowFunction) {
        ArrayList<ThreadMetric> threadList = new ArrayList<ThreadMetric>();
        block2: for (MetricFlowUnit flowUnit : metric.getFlowUnits()) {
            Result<Record> result;
            if (flowUnit.isEmpty() || (result = flowUnit.getData()) == null) continue;
            for (Record record : result) {
                Object nameObj = record.get(AllMetrics.CommonDimension.THREAD_NAME.toString());
                Object operationObj = record.get(AllMetrics.CommonDimension.OPERATION.toString());
                Object valObj = record.get("avg");
                if (nameObj == null || operationObj == null || valObj == null) continue;
                try {
                    String operation = operationObj.toString();
                    String threadName = nameObj.toString();
                    double val = Double.parseDouble(valObj.toString());
                    if (!(val > 0.0)) continue;
                    ThreadMetric tm = new ThreadMetric(threadName, val, currentTimeMillis, operation);
                    threadList.add(tm);
                }
                catch (Exception e) {
                    LOG.error("ThreadMetricsRca.operate() Failed to parse data for record " + record.formatJSON(), (Throwable)e);
                    continue block2;
                }
            }
        }
        this.threadAnalyses.forEach(analysis -> {
            ThreadMetricsSlidingWindow slidingWindow = (ThreadMetricsSlidingWindow)slidingWindowFunction.apply((ThreadAnalysis)analysis);
            slidingWindow.next(currentTimeMillis, threadList.stream().filter(tm -> analysis.getTypeFilter().test(tm.getOperation())).collect(Collectors.toList()));
        });
    }
}

