/*
 * Decompiled with CFR 0.152.
 */
package org.apache.commons.jcs3.auxiliary.lateral.socket.tcp;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.commons.jcs3.auxiliary.lateral.LateralElementDescriptor;
import org.apache.commons.jcs3.auxiliary.lateral.behavior.ILateralCacheListener;
import org.apache.commons.jcs3.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
import org.apache.commons.jcs3.engine.CacheInfo;
import org.apache.commons.jcs3.engine.behavior.ICacheElement;
import org.apache.commons.jcs3.engine.behavior.ICompositeCacheManager;
import org.apache.commons.jcs3.engine.behavior.IElementSerializer;
import org.apache.commons.jcs3.engine.behavior.IShutdownObserver;
import org.apache.commons.jcs3.engine.control.CompositeCache;
import org.apache.commons.jcs3.log.Log;
import org.apache.commons.jcs3.log.LogManager;
import org.apache.commons.jcs3.utils.serialization.StandardSerializer;

/**
 * Receives commands sent by other TCP lateral caches and applies them to the local caches.
 * <p>
 * {@link #init()} binds a non-blocking {@link ServerSocketChannel} and starts a daemon thread that
 * multiplexes the server channel and every accepted client channel over one {@link Selector}.
 * Whenever a client channel becomes readable, one {@link LateralElementDescriptor} is deserialized
 * from it and dispatched to the matching {@code handle*} method.
 * <p>
 * Listeners obtained through {@code getInstance} are cached per listener port.
 *
 * @param <K> cache key type
 * @param <V> cache value type
 */
public class LateralTCPListener<K, V>
implements ILateralCacheListener<K, V>,
IShutdownObserver {
    private static final Log log = LogManager.getLog(LateralTCPListener.class);

    /** Upper bound in milliseconds for one {@code select()} call before the terminated flag is re-checked. */
    private static final int acceptTimeOut = 1000;

    /** Cache manager used to resolve the target cache of each incoming command. */
    private transient ICompositeCacheManager cacheManager;

    /** Listener instances keyed by the string form of their listener port. */
    private static final ConcurrentHashMap<String, ILateralCacheListener<?, ?>> instances = new ConcurrentHashMap<>();

    /** Configuration supplying the listener host/port and the remove-filter switch. */
    private ITCPLateralCacheAttributes tcpLateralCacheAttributes;

    /** Thread running {@link #runListener(ServerSocketChannel)}; {@code null} until {@link #init()} succeeds. */
    private Thread listenerThread;

    /** Serializer used to read descriptors from, and write responses to, client connections. */
    private IElementSerializer serializer;

    // Plain (non-atomic) counters; they are only used for the periodic statistics log lines.
    private int putCnt;
    private int removeCnt;
    private int getCnt;

    /** Id used to recognise descriptors that originate from this listener itself. */
    private long listenerId = CacheInfo.listenerId;

    /** Set once {@link #shutdown()} has been invoked; reset by {@link #init()}. */
    private final AtomicBoolean shutdown = new AtomicBoolean();

    /** Set once {@link #dispose()} has been invoked; makes the listener loop exit. Reset by {@link #init()}. */
    private final AtomicBoolean terminated = new AtomicBoolean();

    /**
     * Returns the listener registered for the configured listener port, creating and starting one
     * that uses a {@link StandardSerializer} if none exists yet.
     *
     * @param ilca     lateral cache configuration (supplies the listener port)
     * @param cacheMgr cache manager handed to a newly created listener
     * @return the cached or newly created listener
     * @deprecated use {@link #getInstance(ITCPLateralCacheAttributes, ICompositeCacheManager, IElementSerializer)}
     */
    @Deprecated
    @SuppressWarnings("unchecked") // the instance map is heterogeneous; callers choose K and V
    public static <K, V> LateralTCPListener<K, V> getInstance(ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheMgr) {
        return (LateralTCPListener<K, V>) instances.computeIfAbsent(
                String.valueOf(ilca.getTcpListenerPort()),
                // The serializer is created lazily, i.e. only when no listener exists for the port.
                k -> LateralTCPListener.<K, V>createListener(ilca, cacheMgr, new StandardSerializer()));
    }

    /**
     * Returns the listener registered for the configured listener port, creating and starting one
     * with the given serializer if none exists yet.
     *
     * @param ilca       lateral cache configuration (supplies the listener port)
     * @param cacheMgr   cache manager handed to a newly created listener
     * @param serializer serializer used by a newly created listener
     * @return the cached or newly created listener
     */
    @SuppressWarnings("unchecked") // the instance map is heterogeneous; callers choose K and V
    public static <K, V> LateralTCPListener<K, V> getInstance(ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheMgr, IElementSerializer serializer) {
        return (LateralTCPListener<K, V>) instances.computeIfAbsent(
                String.valueOf(ilca.getTcpListenerPort()),
                k -> LateralTCPListener.<K, V>createListener(ilca, cacheMgr, serializer));
    }

    /** Builds a listener, wires its cache manager and starts it. */
    private static <K, V> LateralTCPListener<K, V> createListener(ITCPLateralCacheAttributes ilca, ICompositeCacheManager cacheMgr, IElementSerializer serializer) {
        LateralTCPListener<K, V> newIns = new LateralTCPListener<>(ilca, serializer);
        // Set the cache manager BEFORE init(): init() starts the listener thread, and Thread.start()
        // is what guarantees that thread sees the manager when the first command arrives.
        newIns.setCacheManager(cacheMgr);
        newIns.init();
        log.info("Created new listener {0}", ilca::getTcpListenerPort);
        return newIns;
    }

    /**
     * Creates a listener that uses a {@link StandardSerializer}. The listener is not started.
     *
     * @param ilca lateral cache configuration
     * @deprecated use {@link #LateralTCPListener(ITCPLateralCacheAttributes, IElementSerializer)}
     */
    @Deprecated
    protected LateralTCPListener(ITCPLateralCacheAttributes ilca) {
        this(ilca, new StandardSerializer());
    }

    /**
     * Creates a listener. The listener is not started; call {@link #init()} for that.
     *
     * @param ilca       lateral cache configuration
     * @param serializer serializer used for descriptors and responses
     */
    protected LateralTCPListener(ITCPLateralCacheAttributes ilca, IElementSerializer serializer) {
        this.setTcpLateralCacheAttributes(ilca);
        this.serializer = serializer;
    }

    /**
     * Resets the terminated/shutdown flags, binds a non-blocking server channel to the configured
     * host and port (all local addresses when no host is configured) and starts the daemon
     * listener thread.
     *
     * @throws IllegalStateException if the server channel cannot be opened, bound or configured
     */
    @Override
    public synchronized void init() {
        try {
            int port = this.getTcpLateralCacheAttributes().getTcpListenerPort();
            String host = this.getTcpLateralCacheAttributes().getTcpListenerHost();
            this.terminated.set(false);
            this.shutdown.set(false);

            final ServerSocketChannel serverSocket = ServerSocketChannel.open();
            boolean started = false;
            try {
                InetSocketAddress endPoint;
                if (host != null && !host.isEmpty()) {
                    log.info("Listening on {0}:{1}", host, port);
                    endPoint = new InetSocketAddress(host, port);
                } else {
                    log.info("Listening on port {0}", port);
                    endPoint = new InetSocketAddress(port);
                }
                serverSocket.bind(endPoint);
                serverSocket.configureBlocking(false);

                this.listenerThread = new Thread(() -> this.runListener(serverSocket), "JCS-LateralTCPListener-" + host + ":" + port);
                this.listenerThread.setDaemon(true);
                this.listenerThread.start();
                // From here on the listener thread owns the channel and closes it on exit.
                started = true;
            } finally {
                if (!started) {
                    // Do not leak the channel (and possibly the bound port) when start-up fails.
                    closeChannel(serverSocket);
                }
            }
        }
        catch (IOException ex) {
            throw new IllegalStateException(ex);
        }
    }

    /**
     * Stores the id that identifies this listener.
     *
     * @param id the new listener id
     */
    @Override
    public void setListenerId(long id) throws IOException {
        this.listenerId = id;
        log.debug("set listenerId = {0}", id);
    }

    /**
     * @return the id that identifies this listener
     */
    @Override
    public long getListenerId() throws IOException {
        return this.listenerId;
    }

    /**
     * Counts the put, logs the running total on every 100th call (at info level) and applies the
     * element to the local cache named by the element.
     *
     * @param element element to store locally
     */
    @Override
    public void handlePut(ICacheElement<K, V> element) throws IOException {
        ++this.putCnt;
        if (log.isInfoEnabled() && this.getPutCnt() % 100 == 0) {
            log.info("Put Count (port {0}) = {1}", () -> this.getTcpLateralCacheAttributes().getTcpListenerPort(), this::getPutCnt);
        }
        log.debug("handlePut> cacheName={0}, key={1}", element::getCacheName, element::getKey);
        this.getCache(element.getCacheName()).localUpdate(element);
    }

    /**
     * Counts the remove, logs the running total on every 100th call (at info level) and removes
     * the key from the named local cache.
     *
     * @param cacheName name of the cache to remove from
     * @param key       key to remove
     */
    @Override
    public void handleRemove(String cacheName, K key) throws IOException {
        ++this.removeCnt;
        if (log.isInfoEnabled() && this.getRemoveCnt() % 100 == 0) {
            log.info("Remove Count = {0}", this::getRemoveCnt);
        }
        log.debug("handleRemove> cacheName={0}, key={1}", cacheName, key);
        this.getCache(cacheName).localRemove(key);
    }

    /**
     * Removes all entries from the named local cache.
     *
     * @param cacheName name of the cache to clear
     */
    @Override
    public void handleRemoveAll(String cacheName) throws IOException {
        log.debug("handleRemoveAll> cacheName={0}", cacheName);
        this.getCache(cacheName).localRemoveAll();
    }

    /**
     * Counts the get, logs the running total on every 100th call (at info level) and looks the
     * key up in the named local cache.
     *
     * @param cacheName name of the cache to read from
     * @param key       key to look up
     * @return the result of the local lookup
     */
    public ICacheElement<K, V> handleGet(String cacheName, K key) throws IOException {
        ++this.getCnt;
        if (log.isInfoEnabled() && this.getGetCnt() % 100 == 0) {
            log.info("Get Count (port {0}) = {1}", () -> this.getTcpLateralCacheAttributes().getTcpListenerPort(), this::getGetCnt);
        }
        log.debug("handleGet> cacheName={0}, key={1}", cacheName, key);
        return this.getCache(cacheName).localGet(key);
    }

    /**
     * Looks up all entries of the named local cache that match the pattern. Shares the get
     * counter with {@link #handleGet(String, Object)}.
     *
     * @param cacheName name of the cache to read from
     * @param pattern   key pattern handed to the cache
     * @return the result of the local pattern lookup
     */
    public Map<K, ICacheElement<K, V>> handleGetMatching(String cacheName, String pattern) throws IOException {
        ++this.getCnt;
        if (log.isInfoEnabled() && this.getGetCnt() % 100 == 0) {
            log.info("GetMatching Count (port {0}) = {1}", () -> this.getTcpLateralCacheAttributes().getTcpListenerPort(), this::getGetCnt);
        }
        log.debug("handleGetMatching> cacheName={0}, pattern={1}", cacheName, pattern);
        return this.getCache(cacheName).localGetMatching(pattern);
    }

    /**
     * Returns the key set of the named cache, as produced by {@code getKeySet(true)}.
     *
     * @param cacheName name of the cache to read from
     * @return the keys reported by the cache
     */
    public Set<K> handleGetKeySet(String cacheName) throws IOException {
        return this.getCache(cacheName).getKeySet(true);
    }

    /**
     * Logs the dispose request and disposes this listener.
     * <p>
     * NOTE(review): the log text states that the message is ignored, yet {@link #dispose()} is
     * invoked. The behaviour is kept as found; confirm which of the two is intended.
     *
     * @param cacheName name of the cache the request refers to (only logged)
     */
    @Override
    public void handleDispose(String cacheName) throws IOException {
        log.info("handleDispose > cacheName={0} | Ignoring message. Do not dispose from remote.", cacheName);
        this.dispose();
    }

    /**
     * Marks the listener as terminated and interrupts the listener thread so that the selector
     * loop exits. Only the first call after {@link #init()} has an effect.
     */
    @Override
    public synchronized void dispose() {
        if (this.terminated.compareAndSet(false, true)) {
            // No wait() on this monitor is visible in this class; the notify is kept for compatibility.
            this.notify();
            // The thread does not exist when dispose() is called before init() has succeeded.
            if (this.listenerThread != null) {
                this.listenerThread.interrupt();
            }
        }
    }

    /**
     * Resolves a cache by name through the cache manager.
     *
     * @param name cache name
     * @return the cache returned by the cache manager
     */
    protected CompositeCache<K, V> getCache(String name) {
        return this.getCacheManager().getCache(name);
    }

    /**
     * @return number of puts handled so far
     */
    public int getPutCnt() {
        return this.putCnt;
    }

    /**
     * @return number of get and get-matching requests handled so far
     */
    public int getGetCnt() {
        return this.getCnt;
    }

    /**
     * @return number of removes handled so far
     */
    public int getRemoveCnt() {
        return this.removeCnt;
    }

    /**
     * @param cacheMgr cache manager used to resolve caches for incoming commands
     */
    @Override
    public void setCacheManager(ICompositeCacheManager cacheMgr) {
        this.cacheManager = cacheMgr;
    }

    /**
     * @return the cache manager used to resolve caches
     */
    @Override
    public ICompositeCacheManager getCacheManager() {
        return this.cacheManager;
    }

    /**
     * @param tcpLateralCacheAttributes configuration to use
     */
    public void setTcpLateralCacheAttributes(ITCPLateralCacheAttributes tcpLateralCacheAttributes) {
        this.tcpLateralCacheAttributes = tcpLateralCacheAttributes;
    }

    /**
     * @return the configuration in use
     */
    public ITCPLateralCacheAttributes getTcpLateralCacheAttributes() {
        return this.tcpLateralCacheAttributes;
    }

    /**
     * Body of the listener thread: registers the server channel for accept events and serves
     * selector events until the listener is terminated or an I/O error occurs. All channels
     * registered with the selector, and the server channel itself, are closed on the way out.
     *
     * @param serverSocket bound server channel to accept connections on
     */
    private void runListener(ServerSocketChannel serverSocket) {
        try (Selector selector = Selector.open()) {
            try {
                serverSocket.register(selector, SelectionKey.OP_ACCEPT);
                log.debug("Waiting for clients to connect");
                while (!this.terminated.get()) {
                    // Bounded wait so that the terminated flag is re-checked regularly.
                    if (selector.select(acceptTimeOut) == 0) {
                        continue;
                    }
                    this.processSelectedKeys(selector);
                }
                log.debug("Thread terminated, exiting gracefully");
            }
            finally {
                // Runs while the selector is still open, on graceful exit and on failure alike,
                // so client connections are not left open when the loop dies.
                closeRegisteredChannels(selector);
            }
        }
        catch (IOException e) {
            log.error("Exception caught in TCP listener", e);
        }
        finally {
            try {
                serverSocket.close();
            }
            catch (IOException e) {
                log.error("Exception closing TCP listener", e);
            }
        }
    }

    /** Serves every key selected by the last select call, stopping early once terminated. */
    private void processSelectedKeys(Selector selector) throws IOException {
        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
        while (keys.hasNext() && !this.terminated.get()) {
            SelectionKey key = keys.next();
            // Selected keys must be removed explicitly or they are reported again.
            keys.remove();
            if (!key.isValid()) {
                continue;
            }
            if (key.isAcceptable()) {
                this.acceptClient(key, selector);
            }
            if (key.isReadable()) {
                this.handleClient(key);
            }
        }
    }

    /** Accepts a pending connection and registers it, non-blocking, for read events. */
    private void acceptClient(SelectionKey key, Selector selector) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = server.accept();
        if (client == null) {
            // Non-blocking accept: no connection is pending after all.
            return;
        }
        boolean registered = false;
        try {
            log.info("Connected to client at {0}", client.getRemoteAddress());
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
            registered = true;
        }
        finally {
            if (!registered) {
                // An unregistered channel is unknown to the selector and would never be closed.
                closeChannel(client);
            }
        }
    }

    /** Closes the channel behind every key registered with the selector, logging failures. */
    private static void closeRegisteredChannels(Selector selector) {
        for (SelectionKey key : selector.keys()) {
            closeChannel(key.channel());
        }
    }

    /** Closes a channel, logging (not propagating) a failure to do so. */
    private static void closeChannel(java.nio.channels.Channel channel) {
        try {
            channel.close();
        }
        catch (IOException e) {
            log.warn("Problem closing channel", e);
        }
    }

    /**
     * Reads one descriptor from a readable client channel and processes it. Descriptors that
     * carry this listener's own id are skipped; a non-null result is written back to the client.
     * The connection is closed when an I/O error occurs.
     *
     * @param key selection key of the readable client channel
     */
    private void handleClient(SelectionKey key) {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        try {
            @SuppressWarnings("unchecked") // the wire format carries no generic type information
            LateralElementDescriptor<K, V> led = (LateralElementDescriptor<K, V>) this.serializer.deSerializeFrom(socketChannel, null);
            if (led == null) {
                log.debug("LateralElementDescriptor is null");
                return;
            }
            if (led.getRequesterId() == this.getListenerId()) {
                log.debug("from self");
                return;
            }
            log.debug("receiving LateralElementDescriptor from another led = {0}", led);
            Object response = this.handleElement(led);
            if (response != null) {
                this.serializer.serializeTo(response, socketChannel);
            }
        }
        catch (IOException e) {
            log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
            try {
                socketChannel.close();
            }
            catch (IOException e1) {
                // Report the failure of close() itself, not the exception that led here.
                log.error("Error while closing connection", e1);
            }
        }
        catch (ClassNotFoundException e) {
            log.error("Deserialization failed reading from socket", e);
        }
    }

    /**
     * Dispatches a descriptor to the handler that matches its command.
     *
     * @param led descriptor received from another lateral cache
     * @return the lookup result for the get-type commands, {@code null} for all other commands
     * @throws IOException if the invoked handler fails
     */
    private Object handleElement(LateralElementDescriptor<K, V> led) throws IOException {
        String cacheName = led.getPayload().getCacheName();
        K key = led.getPayload().getKey();
        // The get-type commands return unrelated types (element, map, key set), hence Object.
        Object obj = null;
        switch (led.getCommand()) {
            case UPDATE: {
                this.handlePut(led.getPayload());
                break;
            }
            case REMOVE: {
                // When a value hash code was sent and filtering is enabled, the remove is skipped
                // if the locally cached value has the same hash code.
                if (led.getValHashCode() != -1 && this.getTcpLateralCacheAttributes().isFilterRemoveByHashCode()) {
                    ICacheElement<K, V> test = this.getCache(cacheName).localGet(key);
                    if (test != null) {
                        if (test.getVal().hashCode() == led.getValHashCode()) {
                            log.debug("Filtering detected identical hashCode [{0}], not issuing a remove for led {1}", led.getValHashCode(), led);
                            return null;
                        }
                        log.debug("Different hash codes, in cache [{0}] sent [{1}]", test.getVal()::hashCode, led::getValHashCode);
                    }
                }
                this.handleRemove(cacheName, key);
                break;
            }
            case REMOVEALL: {
                this.handleRemoveAll(cacheName);
                break;
            }
            case GET: {
                obj = this.handleGet(cacheName, key);
                break;
            }
            case GET_MATCHING: {
                obj = this.handleGetMatching(cacheName, (String) key);
                break;
            }
            case GET_KEYSET: {
                obj = this.handleGetKeySet(cacheName);
                break;
            }
            default: {
                // Any other command needs no action and produces no response.
                break;
            }
        }
        return obj;
    }

    /**
     * Disposes the listener on the first call after {@link #init()}; later calls only log.
     */
    @Override
    public void shutdown() {
        if (this.shutdown.compareAndSet(false, true)) {
            log.info("Shutting down TCP Lateral receiver.");
            this.dispose();
        } else {
            log.debug("Shutdown already called.");
        }
    }

    /**
     * Blocking, stream-based handler for a single client connection.
     *
     * @deprecated nothing in this class uses it; connections are served by the selector loop
     */
    @Deprecated
    public class ConnectionHandler
    implements Runnable {
        private final Socket socket;

        /**
         * @param socket connected client socket to serve
         */
        public ConnectionHandler(Socket socket) {
            this.socket = socket;
        }

        /**
         * Reads descriptors from the socket until reading fails, processing each one and writing
         * back any non-null result. The input stream is closed when the loop ends.
         */
        @Override
        public void run() {
            try (InputStream is = this.socket.getInputStream()) {
                // The loop only ends through an exception, e.g. when the peer closes the stream.
                while (true) {
                    @SuppressWarnings("unchecked") // the wire format carries no generic type information
                    LateralElementDescriptor<K, V> led = (LateralElementDescriptor<K, V>) LateralTCPListener.this.serializer.deSerializeFrom(is, null);
                    if (led == null) {
                        log.debug("LateralElementDescriptor is null");
                        continue;
                    }
                    if (led.getRequesterId() == LateralTCPListener.this.getListenerId()) {
                        log.debug("from self");
                        continue;
                    }
                    log.debug("receiving LateralElementDescriptor from another led = {0}", led);
                    Object response = LateralTCPListener.this.handleElement(led);
                    if (response == null) {
                        continue;
                    }
                    OutputStream os = this.socket.getOutputStream();
                    LateralTCPListener.this.serializer.serializeTo(response, os);
                    os.flush();
                }
            }
            catch (IOException e) {
                log.info("Caught {0}, closing connection.", e.getClass().getSimpleName(), e);
            }
            catch (ClassNotFoundException e) {
                log.error("Deserialization failed reading from socket", e);
            }
        }
    }

    /**
     * Thread that runs the listener loop on the channel associated with a server socket.
     *
     * @deprecated {@link LateralTCPListener#init()} starts the listener thread itself
     */
    @Deprecated
    public class ListenerThread
    extends Thread {
        private final ServerSocket serverSocket;

        /**
         * @param serverSocket server socket whose channel is served
         */
        public ListenerThread(ServerSocket serverSocket) {
            this.serverSocket = serverSocket;
        }

        /**
         * Runs the listener loop on the socket's channel, or logs an error and returns when the
         * socket has no channel.
         */
        @Override
        public void run() {
            // getChannel() returns null for sockets that were not created through a channel.
            ServerSocketChannel channel = this.serverSocket.getChannel();
            if (channel == null) {
                log.error("Server socket has no associated channel, cannot run the TCP listener");
                return;
            }
            LateralTCPListener.this.runListener(channel);
        }
    }
}

