/*
 * Decompiled with CFR 0.152.
 */
package org.opensearch.performanceanalyzer.net;

import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.PerformanceAnalyzerApp;
import org.opensearch.performanceanalyzer.collectors.StatExceptionCode;
import org.opensearch.performanceanalyzer.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.grpc.FlowUnitMessage;
import org.opensearch.performanceanalyzer.grpc.InterNodeRpcServiceGrpc;
import org.opensearch.performanceanalyzer.grpc.MetricsRequest;
import org.opensearch.performanceanalyzer.grpc.MetricsResponse;
import org.opensearch.performanceanalyzer.grpc.PublishResponse;
import org.opensearch.performanceanalyzer.grpc.SubscribeMessage;
import org.opensearch.performanceanalyzer.grpc.SubscribeResponse;
import org.opensearch.performanceanalyzer.net.GRPCConnectionManager;
import org.opensearch.performanceanalyzer.rca.framework.metrics.RcaGraphMetrics;
import org.opensearch.performanceanalyzer.rca.framework.util.InstanceDetails;

public class NetClient {
    private static final Logger LOG = LogManager.getLogger(NetClient.class);
    private final GRPCConnectionManager connectionManager;
    private ConcurrentMap<InstanceDetails.Id, ConcurrentMap<String, AtomicReference<StreamObserver<FlowUnitMessage>>>> perHostAndNodeOpenDataStreamMap = new ConcurrentHashMap<InstanceDetails.Id, ConcurrentMap<String, AtomicReference<StreamObserver<FlowUnitMessage>>>>();

    public NetClient(GRPCConnectionManager connectionManager) {
        this.connectionManager = connectionManager;
    }

    public GRPCConnectionManager getConnectionManager() {
        return this.connectionManager;
    }

    protected ConcurrentMap<InstanceDetails.Id, ConcurrentMap<String, AtomicReference<StreamObserver<FlowUnitMessage>>>> getPerHostAndNodeOpenDataStreamMap() {
        return this.perHostAndNodeOpenDataStreamMap;
    }

    public void subscribe(InstanceDetails remoteHost, SubscribeMessage subscribeMessage, StreamObserver<SubscribeResponse> serverResponseStream) {
        LOG.debug("Trying to send intent message to {}", (Object)remoteHost);
        try {
            this.connectionManager.getClientStubForHost(remoteHost).subscribe(subscribeMessage, serverResponseStream);
            PerformanceAnalyzerApp.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(RcaGraphMetrics.NET_BYTES_OUT, subscribeMessage.getRequesterGraphNode(), subscribeMessage.getSerializedSize());
        }
        catch (StatusRuntimeException sre) {
            LOG.error("Encountered an error trying to subscribe. Status: {}", (Object)sre.getStatus(), (Object)sre);
            StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_ERROR);
        }
    }

    public void publish(InstanceDetails remoteHost, FlowUnitMessage flowUnitMessage, StreamObserver<PublishResponse> serverResponseStream) {
        LOG.debug("Publishing {} data to {}", (Object)flowUnitMessage.getGraphNode(), (Object)remoteHost);
        try {
            StreamObserver<FlowUnitMessage> stream = this.getDataStreamForHost(remoteHost, flowUnitMessage.getGraphNode(), serverResponseStream);
            stream.onNext((Object)flowUnitMessage);
            PerformanceAnalyzerApp.RCA_GRAPH_METRICS_AGGREGATOR.updateStat(RcaGraphMetrics.NET_BYTES_OUT, flowUnitMessage.getGraphNode(), flowUnitMessage.getSerializedSize());
        }
        catch (StatusRuntimeException sre) {
            LOG.error("rca: Encountered an error trying to publish a flow unit. Status: {}", (Object)sre.getStatus(), (Object)sre);
            StatsCollector.instance().logException(StatExceptionCode.RCA_NETWORK_ERROR);
        }
    }

    public void getMetrics(InstanceDetails remoteNodeIP, MetricsRequest request, StreamObserver<MetricsResponse> responseObserver) {
        InterNodeRpcServiceGrpc.InterNodeRpcServiceStub stub = this.connectionManager.getClientStubForHost(remoteNodeIP);
        stub.getMetrics(request, responseObserver);
    }

    public void stop() {
        LOG.debug("Shutting down client streaming connections..");
        this.closeAllDataStreams();
        this.connectionManager.shutdown();
    }

    public void flushStream(InstanceDetails.Id remoteHost) {
        LOG.debug("removing data streams for {} as we are no publishing to it.", (Object)remoteHost);
        this.perHostAndNodeOpenDataStreamMap.remove(remoteHost);
    }

    private void closeAllDataStreams() {
        for (Map.Entry entry : this.perHostAndNodeOpenDataStreamMap.entrySet()) {
            LOG.debug("Closing stream for host: {}", entry.getKey());
            for (Map.Entry perInstanceEntry : ((ConcurrentMap)entry.getValue()).entrySet()) {
                ((StreamObserver)((AtomicReference)perInstanceEntry.getValue()).get()).onCompleted();
            }
            this.perHostAndNodeOpenDataStreamMap.remove(entry.getKey());
        }
    }

    private StreamObserver<FlowUnitMessage> getDataStreamForHost(InstanceDetails remoteHost, String graphNode, StreamObserver<PublishResponse> serverResponseStream) {
        ConcurrentMap streamObserverAtomicReference = (ConcurrentMap)this.perHostAndNodeOpenDataStreamMap.get(remoteHost.getInstanceId());
        if (streamObserverAtomicReference != null && streamObserverAtomicReference.get(graphNode) != null) {
            return (StreamObserver)((AtomicReference)streamObserverAtomicReference.get(graphNode)).get();
        }
        return this.addOrUpdateDataStreamForHost(remoteHost, graphNode, serverResponseStream);
    }

    private synchronized StreamObserver<FlowUnitMessage> addOrUpdateDataStreamForHost(InstanceDetails remoteHost, final String graphNode, StreamObserver<PublishResponse> serverResponseStream) {
        InterNodeRpcServiceGrpc.InterNodeRpcServiceStub stub = this.connectionManager.getClientStubForHost(remoteHost);
        StreamObserver<FlowUnitMessage> dataStream = stub.publish(serverResponseStream);
        this.perHostAndNodeOpenDataStreamMap.computeIfAbsent(remoteHost.getInstanceId(), k -> new ConcurrentHashMap<String, AtomicReference<StreamObserver<FlowUnitMessage>>>(){
            {
                this.put(graphNode, new AtomicReference());
            }
        });
        ((ConcurrentMap)this.perHostAndNodeOpenDataStreamMap.get(remoteHost.getInstanceId())).computeIfAbsent(graphNode, k -> new AtomicReference()).set(dataStream);
        return dataStream;
    }
}

