/*
 * Decompiled with CFR 0.152.
 */
package org.apache.solr.cloud;

import com.google.common.util.concurrent.AtomicLongMap;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.invoke.MethodHandles;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import javax.management.JMException;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.common.cloud.SolrZkClient;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.ObjectReleaseTracker;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.util.TimeOut;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.jmx.ManagedUtil;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.SessionTracker;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkTestServer {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    public static File SOLRHOME;
    public static final int TIMEOUT = 45000;
    public static final int TICK_TIME = 1000;
    protected final ZKServerMain zkServer = new ZKServerMain();
    private volatile Path zkDir;
    private volatile int clientPort;
    private volatile Thread zooThread;
    private volatile int theTickTime = 1000;
    private volatile int maxSessionTimeout = 90000;
    private volatile int minSessionTimeout = 3000;
    protected volatile SolrZkClient rootClient;
    protected volatile SolrZkClient chRootClient;
    private volatile ZKDatabase zkDb;

    public ZkTestServer(Path zkDir) throws Exception {
        this(zkDir, 0);
    }

    public ZkTestServer(Path zkDir, int port) throws KeeperException, InterruptedException {
        String limiterAction;
        this.zkDir = zkDir;
        this.clientPort = port;
        String reportAction = System.getProperty("tests.zk.violationReportAction");
        if (reportAction != null) {
            log.info("Overriding violation report action to: {}", (Object)reportAction);
            this.setViolationReportAction(LimitViolationAction.valueOf(reportAction));
        }
        if ((limiterAction = System.getProperty("tests.zk.limiterAction")) != null) {
            log.info("Overriding limiter action to: {}", (Object)limiterAction);
            this.getLimiter().setAction(LimitViolationAction.valueOf(limiterAction));
        }
        ObjectReleaseTracker.track((Object)this);
    }

    private void init(boolean solrFormat) throws Exception {
        try {
            this.rootClient = new SolrZkClient(this.getZkHost(), 45000, 30000);
        }
        catch (Exception e) {
            log.error("error making rootClient, trying one more time", (Throwable)e);
            this.rootClient = new SolrZkClient(this.getZkHost(), 45000, 30000);
        }
        if (solrFormat) {
            this.tryCleanSolrZkNode();
            this.makeSolrZkNode();
        }
        this.chRootClient = new SolrZkClient(this.getZkAddress(), 45000, 30000);
    }

    public String getZkHost() {
        String hostName = System.getProperty("hostName", "127.0.0.1");
        return hostName + ":" + this.zkServer.getLocalPort();
    }

    public String getZkAddress() {
        return this.getZkAddress("/solr");
    }

    public String getZkAddress(String chroot) {
        if (!chroot.startsWith("/")) {
            chroot = "/" + chroot;
        }
        return this.getZkHost() + chroot;
    }

    public void ensurePathExists(String path) throws IOException {
        try (SolrZkClient client = new SolrZkClient(this.getZkHost(), 10000);){
            client.makePath(path, null, CreateMode.PERSISTENT, null, false, true, 0);
        }
        catch (InterruptedException | KeeperException e) {
            e.printStackTrace();
            throw new IOException("Error checking path " + path, SolrZkClient.checkInterrupted((Throwable)e));
        }
    }

    public int getPort() {
        return this.zkServer.getLocalPort();
    }

    public void expire(final long sessionId) {
        this.zkServer.zooKeeperServer.expire(new SessionTracker.Session(){

            public long getSessionId() {
                return sessionId;
            }

            public int getTimeout() {
                return 4000;
            }

            public boolean isClosing() {
                return false;
            }
        });
    }

    public ZKDatabase getZKDatabase() {
        return this.zkServer.zooKeeperServer.getZKDatabase();
    }

    public void setZKDatabase(ZKDatabase zkDb) {
        this.zkDb = zkDb;
        this.zkServer.zooKeeperServer.setZKDatabase(zkDb);
    }

    public void run() throws InterruptedException, IOException {
        this.run(true);
    }

    public void run(boolean solrFormat) throws InterruptedException, IOException {
        log.info("STARTING ZK TEST SERVER");
        try {
            if (this.zooThread != null) {
                throw new IllegalStateException("ZK TEST SERVER IS ALREADY RUNNING");
            }
            this.zooThread = new Thread("ZkTestServer Run Thread"){

                @Override
                public void run() {
                    ServerConfig config = new ServerConfig(){
                        {
                            this.setClientPort(ZkTestServer.this.clientPort);
                            this.dataDir = ZkTestServer.this.zkDir.toFile();
                            this.dataLogDir = ZkTestServer.this.zkDir.toFile();
                            this.tickTime = ZkTestServer.this.theTickTime;
                            this.maxSessionTimeout = ZkTestServer.this.maxSessionTimeout;
                            this.minSessionTimeout = ZkTestServer.this.minSessionTimeout;
                        }

                        public void setClientPort(int clientPort) {
                            if (this.clientPortAddress != null) {
                                try {
                                    this.clientPortAddress = new InetSocketAddress(InetAddress.getByName(this.clientPortAddress.getHostName()), clientPort);
                                }
                                catch (UnknownHostException e) {
                                    throw new RuntimeException(e);
                                }
                            } else {
                                this.clientPortAddress = new InetSocketAddress(clientPort);
                            }
                            log.info("client port: {}", (Object)this.clientPortAddress);
                        }
                    };
                    try {
                        ZkTestServer.this.zkServer.runFromConfig(config);
                    }
                    catch (Throwable t) {
                        log.error("zkServer error", t);
                    }
                }
            };
            ObjectReleaseTracker.track((Object)this.zooThread);
            this.zooThread.start();
            int cnt = 0;
            int port = -1;
            try {
                port = this.getPort();
            }
            catch (IllegalStateException illegalStateException) {
                // empty catch block
            }
            while (port < 1) {
                Thread.sleep(100L);
                try {
                    port = this.getPort();
                }
                catch (IllegalStateException illegalStateException) {
                    // empty catch block
                }
                if (cnt == 500) {
                    throw new RuntimeException("Could not get the port for ZooKeeper server");
                }
                ++cnt;
            }
            log.info("start zk server on port: {}", (Object)port);
            ZkTestServer.waitForServerUp(this.getZkHost(), 30000L);
            this.init(solrFormat);
        }
        catch (Exception e) {
            log.error("Error trying to run ZK Test Server", (Throwable)e);
            throw new RuntimeException(e);
        }
    }

    /*
     * Loose catch block
     */
    public void shutdown() throws IOException, InterruptedException {
        log.info("Shutting down ZkTestServer.");
        try {
            IOUtils.closeQuietly((Closeable)this.rootClient);
            IOUtils.closeQuietly((Closeable)this.chRootClient);
        }
        finally {
            block16: {
                try {
                    this.zkServer.shutdown();
                }
                catch (Exception e) {
                    log.error("Exception shutting down ZooKeeper Test Server", (Throwable)e);
                }
                if (this.zkDb != null) {
                    this.zkDb.close();
                }
                while (true) {
                    try {
                        this.zooThread.join();
                        ObjectReleaseTracker.release((Object)this.zooThread);
                        this.zooThread = null;
                        break block16;
                    }
                    catch (InterruptedException e) {
                        continue;
                    }
                    break;
                }
                catch (NullPointerException e) {}
            }
        }
        ObjectReleaseTracker.release((Object)this);
    }

    public static boolean waitForServerDown(String hp, long timeoutMs) {
        log.info("waitForServerDown: {}", (Object)hp);
        TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
        while (true) {
            try {
                HostPort hpobj = ZkTestServer.parseHostPortList(hp).get(0);
                ZkTestServer.send4LetterWord(hpobj.host, hpobj.port, "stat");
            }
            catch (IOException e) {
                return true;
            }
            if (timeout.hasTimedOut()) {
                throw new RuntimeException("Time out waiting for ZooKeeper shutdown!");
            }
            try {
                Thread.sleep(250L);
            }
            catch (InterruptedException interruptedException) {
            }
        }
    }

    public static boolean waitForServerUp(String hp, long timeoutMs) {
        log.info("waitForServerUp: {}", (Object)hp);
        TimeOut timeout = new TimeOut(timeoutMs, TimeUnit.MILLISECONDS, TimeSource.NANO_TIME);
        while (true) {
            try {
                HostPort hpobj = ZkTestServer.parseHostPortList(hp).get(0);
                ZkTestServer.send4LetterWord(hpobj.host, hpobj.port, "stat");
                return true;
            }
            catch (IOException e) {
                e.printStackTrace();
                if (timeout.hasTimedOut()) {
                    throw new RuntimeException("Time out waiting for ZooKeeper to startup!");
                }
                try {
                    Thread.sleep(250L);
                }
                catch (InterruptedException interruptedException) {
                }
                continue;
            }
            break;
        }
    }

    public static String send4LetterWord(String host, int port, String cmd) throws IOException {
        log.info("connecting to {} {}", (Object)host, (Object)port);
        try (BufferedReader reader = null;){
            String string;
            try (Socket sock = new Socket(host, port);){
                String line;
                OutputStream outstream = sock.getOutputStream();
                outstream.write(cmd.getBytes(StandardCharsets.US_ASCII));
                outstream.flush();
                sock.shutdownOutput();
                reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), StandardCharsets.US_ASCII));
                StringBuilder sb = new StringBuilder();
                while ((line = reader.readLine()) != null) {
                    sb.append(line).append("\n");
                }
                string = sb.toString();
            }
            return string;
        }
    }

    public static List<HostPort> parseHostPortList(String hplist) {
        log.info("parse host and port list: {}", (Object)hplist);
        ArrayList<HostPort> alist = new ArrayList<HostPort>();
        for (String hp : hplist.split(",")) {
            int port;
            int idx = hp.lastIndexOf(58);
            String host = hp.substring(0, idx);
            try {
                port = Integer.parseInt(hp.substring(idx + 1));
            }
            catch (RuntimeException e) {
                throw new RuntimeException("Problem parsing " + hp + e.toString());
            }
            alist.add(new HostPort(host, port));
        }
        return alist;
    }

    public int getTheTickTime() {
        return this.theTickTime;
    }

    public void setTheTickTime(int theTickTime) {
        this.theTickTime = theTickTime;
    }

    public Path getZkDir() {
        return this.zkDir;
    }

    public void setViolationReportAction(LimitViolationAction violationReportAction) {
        this.zkServer.setViolationReportAction(violationReportAction);
    }

    public ZKServerMain.WatchLimiter getLimiter() {
        return this.zkServer.getLimiter();
    }

    public int getMaxSessionTimeout() {
        return this.maxSessionTimeout;
    }

    public int getMinSessionTimeout() {
        return this.minSessionTimeout;
    }

    public void setMaxSessionTimeout(int maxSessionTimeout) {
        this.maxSessionTimeout = maxSessionTimeout;
    }

    public void setMinSessionTimeout(int minSessionTimeout) {
        this.minSessionTimeout = minSessionTimeout;
    }

    void buildZooKeeper(String config, String schema) throws Exception {
        this.buildZooKeeper(SOLRHOME, config, schema);
    }

    public static void putConfig(String confName, SolrZkClient zkClient, File solrhome, String name) throws Exception {
        ZkTestServer.putConfig(confName, zkClient, null, solrhome, name, name);
    }

    public static void putConfig(String confName, SolrZkClient zkClient, File solrhome, String srcName, String destName) throws Exception {
        ZkTestServer.putConfig(confName, zkClient, null, solrhome, srcName, destName);
    }

    public static void putConfig(String confName, SolrZkClient zkClient, String zkChroot, File solrhome, String srcName, String destName) throws Exception {
        File file = new File(solrhome, "collection1" + File.separator + "conf" + File.separator + srcName);
        if (!file.exists()) {
            if (log.isInfoEnabled()) {
                log.info("skipping {} because it doesn't exist", (Object)file.getAbsolutePath());
            }
            return;
        }
        String destPath = "/configs/" + confName + "/" + destName;
        if (zkChroot != null) {
            destPath = zkChroot + destPath;
        }
        if (log.isInfoEnabled()) {
            log.info("put {} to {}", (Object)file.getAbsolutePath(), (Object)destPath);
        }
        zkClient.makePath(destPath, file, false, true);
    }

    public void buildZooKeeper(File solrhome, String config, String schema) throws Exception {
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("configName", "conf1");
        ZkNodeProps zkProps = new ZkNodeProps(props);
        ArrayList<Op> ops = new ArrayList<Op>(2);
        String path = "/collections";
        ops.add(Op.create((String)path, null, (List)this.chRootClient.getZkACLProvider().getACLsToAdd(path), (CreateMode)CreateMode.PERSISTENT));
        path = "/collections/collection1";
        ops.add(Op.create((String)path, (byte[])Utils.toJSON((Object)zkProps), (List)this.chRootClient.getZkACLProvider().getACLsToAdd(path), (CreateMode)CreateMode.PERSISTENT));
        path = "/collections/collection1/shards";
        ops.add(Op.create((String)path, null, (List)this.chRootClient.getZkACLProvider().getACLsToAdd(path), (CreateMode)CreateMode.PERSISTENT));
        path = "/collections/control_collection";
        ops.add(Op.create((String)path, (byte[])Utils.toJSON((Object)zkProps), (List)this.chRootClient.getZkACLProvider().getACLsToAdd(path), (CreateMode)CreateMode.PERSISTENT));
        path = "/collections/control_collection/shards";
        ops.add(Op.create((String)path, null, (List)this.chRootClient.getZkACLProvider().getACLsToAdd(path), (CreateMode)CreateMode.PERSISTENT));
        path = "/configs";
        ops.add(Op.create((String)path, null, (List)this.chRootClient.getZkACLProvider().getACLsToAdd(path), (CreateMode)CreateMode.PERSISTENT));
        path = "/configs/conf1";
        ops.add(Op.create((String)path, null, (List)this.chRootClient.getZkACLProvider().getACLsToAdd(path), (CreateMode)CreateMode.PERSISTENT));
        this.chRootClient.multi(ops, true);
        String defaultClusterProps = "{\"legacyCloud\":\"true\"}";
        this.chRootClient.makePath("/clusterprops.json", defaultClusterProps.getBytes(StandardCharsets.UTF_8), CreateMode.PERSISTENT, true);
        ZkTestServer.putConfig("conf1", this.chRootClient, solrhome, config, "solrconfig.xml");
        ZkTestServer.putConfig("conf1", this.chRootClient, solrhome, schema, "schema.xml");
        ZkTestServer.putConfig("conf1", this.chRootClient, solrhome, "solrconfig.snippet.randomindexconfig.xml");
        ZkTestServer.putConfig("conf1", this.chRootClient, solrhome, "stopwords.txt");
        ZkTestServer.putConfig("conf1", this.chRootClient, solrhome, "protwords.txt");
        ZkTestServer.putConfig("conf1", this.chRootClient, solrhome, "currency.xml");
        ZkTestServer.putConfig("conf1", this.chRootClient, solrhome, "enumsConfig.xml");
        ZkTestServer.putConfig("conf1", this.chRootClient, solrhome, "open-exchange-rates.json");
        ZkTestServer.putConfig("conf1", this.chRootClient, solrhome, "mapping-ISOLatin1Accent.txt");
        ZkTestServer.putConfig("conf1", this.chRootClient, solrhome, "old_synonyms.txt");
        ZkTestServer.putConfig("conf1", this.chRootClient, solrhome, "synonyms.txt");
    }

    public void makeSolrZkNode() throws Exception {
        this.rootClient.makePath("/solr", false, true);
    }

    public void tryCleanSolrZkNode() throws Exception {
        this.tryCleanPath("/solr");
    }

    void tryCleanPath(String path) throws Exception {
        if (this.rootClient.exists(path, true).booleanValue()) {
            this.rootClient.clean(path);
        }
    }

    protected void printLayout() throws Exception {
        this.rootClient.printLayoutToStdOut();
    }

    public SolrZkClient getZkClient() {
        return this.chRootClient;
    }

    static {
        try {
            SOLRHOME = new File(SolrTestCaseJ4.TEST_HOME());
        }
        catch (RuntimeException e) {
            log.warn("TEST_HOME() does not exist - solrj test?");
        }
    }

    class ZKServerMain {
        private volatile ServerCnxnFactory cnxnFactory;
        private volatile ZooKeeperServer zooKeeperServer;
        private volatile LimitViolationAction violationReportAction = LimitViolationAction.REPORT;
        private volatile WatchLimiter limiter = new WatchLimiter(1L, LimitViolationAction.IGNORE);

        ZKServerMain() {
        }

        protected void initializeAndRun(String[] args) throws QuorumPeerConfig.ConfigException, IOException {
            try {
                ManagedUtil.registerLog4jMBeans();
            }
            catch (JMException e) {
                log.warn("Unable to register log4j JMX control", (Throwable)e);
            }
            ServerConfig config = new ServerConfig();
            if (args.length == 1) {
                config.parse(args[0]);
            } else {
                config.parse(args);
            }
            this.runFromConfig(config);
        }

        public void runFromConfig(ServerConfig config) throws IOException {
            ObjectReleaseTracker.track((Object)this);
            log.info("Starting server");
            try {
                String limitViolations;
                System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
                FileTxnSnapLog ftxn = new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir());
                this.zooKeeperServer = new ZooKeeperServer(ftxn, config.getTickTime(), config.getMinSessionTimeout(), config.getMaxSessionTimeout(), (ZKDatabase)new TestZKDatabase(ftxn, this.limiter));
                this.cnxnFactory = new TestServerCnxnFactory(this.limiter);
                this.cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
                this.cnxnFactory.startup(this.zooKeeperServer);
                this.cnxnFactory.join();
                if (this.violationReportAction != LimitViolationAction.IGNORE && !(limitViolations = this.limiter.reportLimitViolations()).isEmpty()) {
                    log.warn("Watch limit violations: {}", (Object)limitViolations);
                    if (this.violationReportAction == LimitViolationAction.FAIL) {
                        throw new AssertionError((Object)"Parallel watch limits violated");
                    }
                }
            }
            catch (InterruptedException e) {
                log.warn("Server interrupted", (Throwable)e);
            }
        }

        protected void shutdown() throws IOException {
            ZKDatabase zkDb = this.zooKeeperServer.getZKDatabase();
            try {
                if (this.cnxnFactory != null) {
                    while (true) {
                        this.cnxnFactory.shutdown();
                        try {
                            this.cnxnFactory.join();
                        }
                        catch (InterruptedException interruptedException) {
                            continue;
                        }
                        break;
                    }
                }
                if (zkDb != null) {
                    zkDb.close();
                }
                if (this.cnxnFactory != null && this.cnxnFactory.getLocalPort() != 0) {
                    ZkTestServer.waitForServerDown(ZkTestServer.this.getZkHost(), 30000L);
                }
            }
            finally {
                ObjectReleaseTracker.release((Object)this);
            }
        }

        public int getLocalPort() {
            int port;
            if (this.cnxnFactory == null) {
                throw new IllegalStateException("A port has not yet been selected");
            }
            try {
                port = this.cnxnFactory.getLocalPort();
            }
            catch (NullPointerException e) {
                throw new IllegalStateException("A port has not yet been selected");
            }
            if (port == 0) {
                throw new IllegalStateException("A port has not yet been selected");
            }
            return port;
        }

        public void setViolationReportAction(LimitViolationAction violationReportAction) {
            this.violationReportAction = violationReportAction;
        }

        public WatchLimiter getLimiter() {
            return this.limiter;
        }

        public class WatchLimiter {
            WatchLimit statLimit;
            WatchLimit dataLimit;
            WatchLimit childrenLimit;

            private WatchLimiter(long limit, LimitViolationAction action) {
                this.statLimit = new WatchLimit(limit, "create/delete", action);
                this.dataLimit = new WatchLimit(limit, "data", action);
                this.childrenLimit = new WatchLimit(limit, "children", action);
            }

            public void setAction(LimitViolationAction action) {
                this.statLimit.setAction(action);
                this.dataLimit.setAction(action);
                this.childrenLimit.setAction(action);
            }

            public void setLimit(long limit) {
                this.statLimit.setLimit(limit);
                this.dataLimit.setLimit(limit);
                this.childrenLimit.setLimit(limit);
            }

            public String reportLimitViolations() {
                return this.statLimit.reportLimitViolations() + this.dataLimit.reportLimitViolations() + this.childrenLimit.reportLimitViolations();
            }

            private void updateForFire(WatchedEvent event) {
                switch (event.getType()) {
                    case None: {
                        break;
                    }
                    case NodeCreated: 
                    case NodeDeleted: {
                        this.statLimit.updateForFire(event);
                        break;
                    }
                    case NodeDataChanged: {
                        this.dataLimit.updateForFire(event);
                        break;
                    }
                    case NodeChildrenChanged: {
                        this.childrenLimit.updateForFire(event);
                        break;
                    }
                    case ChildWatchRemoved: {
                        break;
                    }
                }
            }
        }

        private class TestZKDatabase
        extends ZKDatabase {
            private final WatchLimiter limiter;

            public TestZKDatabase(FileTxnSnapLog snapLog, WatchLimiter limiter) {
                super(snapLog);
                this.limiter = limiter;
            }

            public Stat statNode(String path, ServerCnxn serverCnxn) throws KeeperException.NoNodeException {
                this.limiter.statLimit.updateForWatch(path, (Watcher)serverCnxn);
                return super.statNode(path, serverCnxn);
            }

            public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
                this.limiter.dataLimit.updateForWatch(path, watcher);
                return super.getData(path, stat, watcher);
            }

            public List<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
                this.limiter.childrenLimit.updateForWatch(path, watcher);
                return super.getChildren(path, stat, watcher);
            }
        }

        private class TestServerCnxnFactory
        extends NIOServerCnxnFactory {
            private final WatchLimiter limiter;

            public TestServerCnxnFactory(WatchLimiter limiter) throws IOException {
                this.limiter = limiter;
            }
        }

        private class TestServerCnxn
        extends NIOServerCnxn {
            private final WatchLimiter limiter;

            public TestServerCnxn(ZooKeeperServer zk, SocketChannel sock, SelectionKey sk, NIOServerCnxnFactory factory, WatchLimiter limiter) throws IOException {
                super(zk, sock, sk, factory, null);
                this.limiter = limiter;
            }

            public synchronized void process(WatchedEvent event) {
                this.limiter.updateForFire(event);
                super.process(event);
            }
        }

        private class WatchLimit {
            private long limit;
            private final String desc;
            private volatile LimitViolationAction action;
            private AtomicLongMap<String> counters = AtomicLongMap.create();
            private ConcurrentHashMap<String, Long> maxCounters = new ConcurrentHashMap();

            WatchLimit(long limit, String desc, LimitViolationAction action) {
                this.limit = limit;
                this.desc = desc;
                this.action = action;
            }

            public void setAction(LimitViolationAction action) {
                this.action = action;
            }

            public void setLimit(long limit) {
                this.limit = limit;
            }

            public void updateForWatch(String key, Watcher watcher) {
                if (watcher != null) {
                    log.debug("Watch added: {}: {}", (Object)this.desc, (Object)key);
                    long count = this.counters.incrementAndGet((Object)key);
                    Long lastCount = this.maxCounters.get(key);
                    if (lastCount == null || count > lastCount) {
                        this.maxCounters.put(key, count);
                    }
                    if (count > this.limit && this.action != LimitViolationAction.IGNORE) {
                        String msg = "Number of watches created in parallel for data: " + key + ", type: " + this.desc + " exceeds limit (" + count + " > " + this.limit + ")";
                        log.warn("{}", (Object)msg);
                        if (this.action == LimitViolationAction.FAIL) {
                            throw new AssertionError((Object)msg);
                        }
                    }
                }
            }

            public void updateForFire(WatchedEvent event) {
                if (log.isDebugEnabled()) {
                    log.debug("Watch fired: {}: {}", (Object)this.desc, (Object)event.getPath());
                }
                this.counters.decrementAndGet((Object)event.getPath());
            }

            private String reportLimitViolations() {
                String[] maxKeys = ((ConcurrentHashMap.CollectionView)((Object)this.maxCounters.keySet())).toArray(new String[this.maxCounters.size()]);
                Arrays.sort(maxKeys, new Comparator<String>(){
                    private final Comparator<Long> valComp = Comparator.naturalOrder().reversed();

                    @Override
                    public int compare(String o1, String o2) {
                        return this.valComp.compare((Long)WatchLimit.this.maxCounters.get(o1), (Long)WatchLimit.this.maxCounters.get(o2));
                    }
                });
                StringBuilder sb = new StringBuilder();
                boolean first = true;
                for (String key : maxKeys) {
                    long value = this.maxCounters.get(key);
                    if (value <= this.limit) continue;
                    if (first) {
                        sb.append("\nMaximum concurrent ").append(this.desc).append(" watches above limit:\n\n");
                        first = false;
                    }
                    sb.append("\t").append(this.maxCounters.get(key)).append('\t').append(key).append('\n');
                }
                return sb.toString();
            }
        }
    }

    public static enum LimitViolationAction {
        IGNORE,
        REPORT,
        FAIL;

    }

    public static class HostPort {
        String host;
        int port;

        HostPort(String host, int port) {
            assert (!host.contains(":")) : host;
            this.host = host;
            this.port = port;
        }
    }
}

