/*
 * Decompiled with CFR 0.152.
 */
package net.rubyeye.xmemcached.impl;

import com.google.code.yanf4j.config.Configuration;
import com.google.code.yanf4j.core.Controller;
import com.google.code.yanf4j.core.ControllerStateListener;
import com.google.code.yanf4j.core.EventType;
import com.google.code.yanf4j.core.Session;
import com.google.code.yanf4j.core.WriteMessage;
import com.google.code.yanf4j.nio.NioSession;
import com.google.code.yanf4j.nio.NioSessionConfig;
import com.google.code.yanf4j.nio.impl.SocketChannelController;
import com.google.code.yanf4j.util.ConcurrentHashSet;
import com.google.code.yanf4j.util.SystemUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import net.rubyeye.xmemcached.CommandFactory;
import net.rubyeye.xmemcached.FlowControl;
import net.rubyeye.xmemcached.MemcachedOptimizer;
import net.rubyeye.xmemcached.MemcachedSessionLocator;
import net.rubyeye.xmemcached.buffer.BufferAllocator;
import net.rubyeye.xmemcached.command.Command;
import net.rubyeye.xmemcached.exception.MemcachedException;
import net.rubyeye.xmemcached.impl.ConnectFuture;
import net.rubyeye.xmemcached.impl.FlowControlLinkedTransferQueue;
import net.rubyeye.xmemcached.impl.MemcachedSessionComparator;
import net.rubyeye.xmemcached.impl.MemcachedTCPSession;
import net.rubyeye.xmemcached.impl.Optimizer;
import net.rubyeye.xmemcached.impl.OptimizerMBean;
import net.rubyeye.xmemcached.impl.ReconnectRequest;
import net.rubyeye.xmemcached.networking.Connector;
import net.rubyeye.xmemcached.networking.MemcachedSession;
import net.rubyeye.xmemcached.utils.InetSocketAddressWrapper;
import net.rubyeye.xmemcached.utils.Protocol;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class MemcachedConnector
extends SocketChannelController
implements Connector {
    /** Reconnect requests waiting for their delay to expire; consumed by the session monitor thread. */
    private final DelayQueue<ReconnectRequest> waitingQueue = new DelayQueue<ReconnectRequest>();
    /** Allocator handed to the optimizer and to every session built by this connector. */
    private BufferAllocator bufferAllocator;
    /** Addresses whose reconnect requests were removed; the session monitor skips requests for them. */
    private final Set<InetSocketAddress> removedAddrSet = new ConcurrentHashSet<InetSocketAddress>();
    /** Command optimizer shared by all sessions (name is misspelled but referenced throughout the class). */
    private final MemcachedOptimizer optimiezer;
    /** Base back-off between reconnect attempts; multiplied by the request's try count (presumably milliseconds). */
    private long healSessionInterval = 2000L;
    /** Maximum number of main sessions kept per remote address. */
    private int connectionPoolSize;
    /** Protocol reported by the command factory at construction time. */
    protected Protocol protocol;
    /** While true (and the controller is started) the session monitor keeps processing reconnect requests. */
    private boolean enableHealSession = true;
    /** Factory passed to every session built by this connector. */
    private final CommandFactory commandFactory;
    /** When true, closed main sessions are kept registered and standby sessions may serve requests. */
    private boolean failureMode;
    /** Standby sessions, keyed by the address of the main node they back up. */
    private final ConcurrentHashMap<InetSocketAddress, List<Session>> standbySessionMap = new ConcurrentHashMap<InetSocketAddress, List<Session>>();
    /** Flow control shared by the write queues of all sessions. */
    private final FlowControl flowControl;
    /** Set once by {@link #shuttingDown()}; lets sessions be removed even in failure mode. */
    private volatile boolean shuttingDown = false;
    /** Locator that maps keys to sessions; refreshed by {@link #updateSessions()}. */
    protected MemcachedSessionLocator sessionLocator;
    /** Main sessions, keyed by remote address. */
    protected final ConcurrentHashMap<InetSocketAddress, Queue<Session>> sessionMap = new ConcurrentHashMap<InetSocketAddress, Queue<Session>>();
    /** Ordering applied to the session list before it is handed to the locator. */
    private static final MemcachedSessionComparator sessionComparator = new MemcachedSessionComparator();
    /** Source of randomness for picking a standby session. */
    private final Random random = new Random();
    /** Background thread that performs reconnects; started when the controller becomes ready. */
    private final SessionMonitor sessionMonitor = new SessionMonitor();

    /**
     * Marks this connector as shutting down. Once set, main sessions are
     * removed on close even when failure mode is enabled.
     */
    public void shuttingDown() {
        shuttingDown = true;
    }

    /**
     * Replaces the locator used to map keys to sessions.
     *
     * @param sessionLocator the new locator; it is not refreshed here, so the
     *                       next {@link #updateSessions()} call populates it
     */
    @Override
    public void setSessionLocator(MemcachedSessionLocator sessionLocator) {
        this.sessionLocator = sessionLocator;
    }

    /**
     * Enables or disables automatic reconnection and wakes the session
     * monitor so that it re-evaluates its loop condition.
     *
     * @param enableHealSession whether the session monitor should keep running
     */
    @Override
    public void setEnableHealSession(boolean enableHealSession) {
        this.enableHealSession = enableHealSession;
        final SessionMonitor monitor = this.sessionMonitor;
        if (monitor == null || !monitor.isAlive()) {
            return;
        }
        // The monitor may be blocked on the reconnect queue; interrupt it so the new flag is seen.
        monitor.interrupt();
    }

    /**
     * @return the live queue of pending reconnect requests (not a copy)
     */
    @Override
    public Queue<ReconnectRequest> getReconnectRequestQueue() {
        return waitingQueue;
    }

    /**
     * Collects every main session of every address into a fresh set.
     *
     * @return a new, independent {@link HashSet} with all main sessions
     */
    @Override
    public Set<Session> getSessionSet() {
        final HashSet<Session> merged = new HashSet<Session>();
        for (Queue<Session> pool : this.sessionMap.values()) {
            merged.addAll(pool);
        }
        return merged;
    }

    /**
     * Sets the base back-off used when rescheduling failed reconnect attempts.
     *
     * @param healConnectionInterval the new base interval
     */
    @Override
    public final void setHealSessionInterval(long healConnectionInterval) {
        healSessionInterval = healConnectionInterval;
    }

    /**
     * @return the base back-off used when rescheduling failed reconnect attempts
     */
    @Override
    public long getHealSessionInterval() {
        return healSessionInterval;
    }

    /**
     * Forwards the "optimize get" switch to the optimizer through its
     * management interface.
     *
     * @param optimiezeGet value passed to {@code OptimizerMBean.setOptimizeGet}
     */
    @Override
    public void setOptimizeGet(boolean optimiezeGet) {
        final OptimizerMBean optimizerBean = (OptimizerMBean) this.optimiezer;
        optimizerBean.setOptimizeGet(optimiezeGet);
    }

    /**
     * Forwards the "merge buffer" switch to the optimizer through its
     * management interface.
     *
     * @param optimizeMergeBuffer value passed to {@code OptimizerMBean.setOptimizeMergeBuffer}
     */
    @Override
    public void setOptimizeMergeBuffer(boolean optimizeMergeBuffer) {
        final OptimizerMBean optimizerBean = (OptimizerMBean) this.optimiezer;
        optimizerBean.setOptimizeMergeBuffer(optimizeMergeBuffer);
    }

    /**
     * @return the protocol obtained from the command factory at construction time
     */
    public Protocol getProtocol() {
        return protocol;
    }

    /**
     * Registers a newly connected session. A session whose address wrapper
     * names a main node is stored as a standby for that node; any other
     * session is stored as a main session and the locator is refreshed.
     *
     * @param session the session to register; must be a {@link MemcachedSession}
     */
    public synchronized void addSession(Session session) {
        final InetSocketAddressWrapper wrapper = ((MemcachedSession) session).getInetSocketAddressWrapper();
        // Record the remote address string only the first time this wrapper is used.
        if (wrapper.getRemoteAddressStr() == null) {
            wrapper.setRemoteAddressStr(String.valueOf(session.getRemoteSocketAddress()));
        }
        final InetSocketAddress mainNodeAddress = wrapper.getMainNodeAddress();
        if (mainNodeAddress == null) {
            this.addMainSession(session);
            this.updateSessions();
            return;
        }
        this.addStandbySession(session, mainNodeAddress);
    }

    /**
     * Adds a main session to the pool of its remote address, creating the
     * pool on first use. In failure mode at most one already-closed session
     * is dropped from the pool first. Afterwards the oldest sessions are
     * closed (with reconnect disabled) until the pool fits
     * {@code connectionPoolSize}.
     *
     * @param session the main session to add
     */
    private void addMainSession(Session session) {
        final InetSocketAddress remoteAddr = session.getRemoteSocketAddress();
        log.info("Add a session: " + SystemUtils.getRawAddress(remoteAddr) + ":" + remoteAddr.getPort());

        Queue<Session> pool = this.sessionMap.get(remoteAddr);
        if (pool == null) {
            pool = new ConcurrentLinkedQueue<Session>();
            final Queue<Session> existing = this.sessionMap.putIfAbsent(remoteAddr, pool);
            if (existing != null) {
                // Another caller registered a pool first; use theirs.
                pool = existing;
            }
        }

        if (this.failureMode) {
            for (Iterator<Session> it = pool.iterator(); it.hasNext();) {
                if (it.next().isClosed()) {
                    it.remove();
                    break;
                }
            }
        }

        pool.offer(session);
        while (pool.size() > this.connectionPoolSize) {
            final Session evicted = pool.poll();
            // Evicted sessions must not trigger a reconnect when they close.
            ((MemcachedSession) evicted).setAllowReconnect(false);
            evicted.close();
        }
    }

    /**
     * Adds a standby session to the list kept for the given main node,
     * creating that list on first use.
     *
     * @param session         the standby session
     * @param mainNodeAddress address of the main node the session backs up
     */
    private void addStandbySession(Session session, InetSocketAddress mainNodeAddress) {
        final InetSocketAddress remoteAddr = session.getRemoteSocketAddress();
        log.info("Add a standby session: " + SystemUtils.getRawAddress(remoteAddr) + ":" + remoteAddr.getPort() + " for " + SystemUtils.getRawAddress(mainNodeAddress) + ":" + mainNodeAddress.getPort());

        List<Session> standbys = this.standbySessionMap.get(mainNodeAddress);
        if (standbys == null) {
            standbys = new CopyOnWriteArrayList<Session>();
            final List<Session> existing = this.standbySessionMap.putIfAbsent(mainNodeAddress, standbys);
            if (existing != null) {
                // Another caller registered a list first; use theirs.
                standbys = existing;
            }
        }
        standbys.add(session);
    }

    /**
     * Returns a copy of the main sessions registered for an address.
     *
     * @param inetSocketAddress the remote address to look up
     * @return a new list with the sessions, or {@code null} when the address
     *         has no session pool
     */
    public List<Session> getSessionListBySocketAddress(InetSocketAddress inetSocketAddress) {
        final Queue<Session> pool = this.sessionMap.get(inetSocketAddress);
        return pool == null ? null : new ArrayList<Session>(pool);
    }

    /**
     * Marks an address as removed and drops every queued reconnect request
     * that targets it.
     *
     * @param inetSocketAddress the address that must no longer be reconnected
     */
    @Override
    public void removeReconnectRequest(InetSocketAddress inetSocketAddress) {
        this.removedAddrSet.add(inetSocketAddress);
        for (Iterator<ReconnectRequest> it = this.waitingQueue.iterator(); it.hasNext();) {
            final ReconnectRequest pending = it.next();
            if (pending.getInetSocketAddressWrapper().getInetSocketAddress().equals(inetSocketAddress)) {
                it.remove();
                log.warn("Remove invalid reconnect task for " + pending.getInetSocketAddressWrapper().getInetSocketAddress());
            }
        }
    }

    /**
     * Rebuilds the list of all main sessions, sorts it with the shared
     * session comparator and hands it to the session locator.
     */
    @Override
    public final void updateSessions() {
        final List<Session> allSessions = new ArrayList<Session>(20);
        for (Queue<Session> pool : this.sessionMap.values()) {
            allSessions.addAll(pool);
        }
        Collections.sort(allSessions, sessionComparator);
        this.sessionLocator.updateSessions(allSessions);
    }

    /**
     * Unregisters a session. A session whose address wrapper names a main
     * node is removed from that node's standby list; any other session is
     * removed from the main session pool.
     *
     * @param session the session to remove; must be a {@link MemcachedSession}
     */
    @Override
    public synchronized void removeSession(Session session) {
        // Cast to the interface, as addSession does: only getInetSocketAddressWrapper() is
        // needed, so any MemcachedSession implementation can be removed, not just the TCP one.
        MemcachedSession memcachedSession = (MemcachedSession) session;
        InetSocketAddressWrapper addrWrapper = memcachedSession.getInetSocketAddressWrapper();
        InetSocketAddress mainNodeAddr = addrWrapper.getMainNodeAddress();
        if (mainNodeAddr != null) {
            this.removeStandbySession(session, mainNodeAddr);
        } else {
            this.removeMainSession(session);
        }
    }

    /**
     * Removes a main session from its address pool and refreshes the locator.
     * In failure mode the session is kept when it still allows reconnecting,
     * the connector is started and no shutdown is in progress.
     *
     * @param session the main session to remove
     */
    private void removeMainSession(Session session) {
        final InetSocketAddress remoteAddr = session.getRemoteSocketAddress();
        final boolean keepForFailureMode = this.failureMode
                && ((MemcachedSession) session).isAllowReconnect()
                && !this.shuttingDown
                && this.isStarted();
        if (keepForFailureMode) {
            log.warn("Client in failure mode,we don't remove session " + SystemUtils.getRawAddress(remoteAddr) + ":" + remoteAddr.getPort());
            return;
        }
        log.info("Remove a session: " + SystemUtils.getRawAddress(remoteAddr) + ":" + remoteAddr.getPort());

        final Queue<Session> pool = this.sessionMap.get(session.getRemoteSocketAddress());
        if (pool == null) {
            return;
        }
        pool.remove(session);
        if (pool.size() == 0) {
            // Last session for this address is gone; drop the empty pool.
            this.sessionMap.remove(session.getRemoteSocketAddress());
        }
        this.updateSessions();
    }

    /**
     * Removes a standby session from the list of its main node and drops the
     * list once it is empty.
     *
     * @param session      the standby session to remove
     * @param mainNodeAddr address of the main node the session backs up
     */
    private void removeStandbySession(Session session, InetSocketAddress mainNodeAddr) {
        final List<Session> standbys = this.standbySessionMap.get(mainNodeAddr);
        if (standbys == null) {
            return;
        }
        standbys.remove(session);
        if (standbys.size() == 0) {
            this.standbySessionMap.remove(mainNodeAddr);
        }
    }

    /**
     * Start hook: sets the local socket address to {@code localhost} with
     * port {@code 0}.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    protected void doStart() throws IOException {
        final InetSocketAddress localAddress = new InetSocketAddress("localhost", 0);
        this.setLocalSocketAddress(localAddress);
    }

    /**
     * Completes a non-blocking connect for the given key. On success a
     * session is created and registered and the attached
     * {@link ConnectFuture} is completed with {@code TRUE}; on failure the
     * key is cancelled, its channel closed and the future failed.
     *
     * @param key the selection key that became connectable; its attachment is
     *            expected to be the {@link ConnectFuture} of the attempt
     * @throws IOException if finishing the connection or creating the session
     *                     fails; the original exception is attached as cause
     */
    @Override
    public void onConnect(SelectionKey key) throws IOException {
        // The connect event is being handled now; stop selecting for it.
        key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT);
        ConnectFuture future = (ConnectFuture) key.attachment();
        if (future == null || future.isCancelled()) {
            // Nobody is waiting for this connection any more.
            this.cancelKey(key);
            return;
        }
        try {
            if (!((SocketChannel) key.channel()).finishConnect()) {
                this.cancelKey(key);
                future.failure(new IOException(connectTargetPrefix(future) + " fail"));
            } else {
                key.attach(null);
                this.addSession(this.createSession((SocketChannel) key.channel(), future.getInetSocketAddressWrapper()));
                future.setResult(Boolean.TRUE);
            }
        }
        catch (Exception e) {
            future.failure(e);
            this.cancelKey(key);
            IOException failure = new IOException(connectTargetPrefix(future) + " fail," + e.getMessage());
            // Keep the root cause and its stack trace (initCause works on every Java version).
            failure.initCause(e);
            throw failure;
        }
    }

    /** Builds the "Connect to host:port" prefix shared by the connect failure messages. */
    private static String connectTargetPrefix(ConnectFuture future) {
        InetSocketAddress target = future.getInetSocketAddressWrapper().getInetSocketAddress();
        return "Connect to " + SystemUtils.getRawAddress(target) + ":" + target.getPort();
    }

    /**
     * Closes the channel behind a selection key (when it has one) and always
     * cancels the key, even if closing fails.
     *
     * @param key the key to cancel
     * @throws IOException if closing the channel fails
     */
    private void cancelKey(SelectionKey key) throws IOException {
        try {
            if (key.channel() == null) {
                return; // nothing to close; the finally block still cancels the key
            }
            key.channel().close();
        }
        finally {
            key.cancel();
        }
    }

    /**
     * Builds a session for a connected channel, attaches its address wrapper,
     * registers it for read events, starts it and fires the
     * {@code CONNECTED} event.
     *
     * @param socketChannel the connected channel
     * @param wrapper       address wrapper describing the remote node
     * @return the started session
     */
    protected MemcachedTCPSession createSession(SocketChannel socketChannel, InetSocketAddressWrapper wrapper) {
        final MemcachedTCPSession created = (MemcachedTCPSession) this.buildSession(socketChannel);
        created.setInetSocketAddressWrapper(wrapper);
        this.selectorManager.registerSession(created, EventType.ENABLE_READ);
        created.start();
        created.onEvent(EventType.CONNECTED, null);
        return created;
    }

    /**
     * Queues a reconnect request for the session monitor.
     *
     * @param request the request to enqueue
     */
    @Override
    public void addToWatingQueue(ReconnectRequest request) {
        waitingQueue.add(request);
    }

    /**
     * Starts connecting to a memcached node. If the connection completes
     * immediately the session is created and registered right away;
     * otherwise the channel is registered for {@code OP_CONNECT} and
     * {@link #onConnect(SelectionKey)} finishes the work later.
     *
     * <p>The address is also taken out of the removed-address set, so
     * reconnect requests for it are processed again.
     *
     * @param addressWrapper wrapper holding the address to connect to
     * @return a future completed once the connection attempt finishes
     * @throws NullPointerException if {@code addressWrapper} is {@code null}
     * @throws IOException          if the channel cannot be opened, configured
     *                              or connected
     */
    @Override
    public Future<Boolean> connect(InetSocketAddressWrapper addressWrapper) throws IOException {
        if (addressWrapper == null) {
            throw new NullPointerException("Null Address");
        }
        this.removedAddrSet.remove(addressWrapper.getInetSocketAddress());
        SocketChannel socketChannel = null;
        boolean handedOff = false;
        try {
            socketChannel = SocketChannel.open();
            this.configureSocketChannel(socketChannel);
            ConnectFuture future = new ConnectFuture(addressWrapper);
            if (!socketChannel.connect(addressWrapper.getInetSocketAddress())) {
                this.selectorManager.registerChannel(socketChannel, SelectionKey.OP_CONNECT, future);
            } else {
                this.addSession(this.createSession(socketChannel, addressWrapper));
                future.setResult(true);
            }
            handedOff = true;
            return future;
        }
        finally {
            // Close on ANY failure, not just IOException: SocketChannel.connect also throws
            // unchecked exceptions (e.g. UnresolvedAddressException), which used to leak the channel.
            if (!handedOff && socketChannel != null) {
                try {
                    socketChannel.close();
                }
                catch (IOException ignored) {
                    // Best-effort cleanup; the exception that caused the failure must not be masked.
                }
            }
        }
    }

    /**
     * Intentionally empty: this connector performs no work in this hook.
     *
     * @param selector unused
     * @throws IOException never thrown by this implementation
     */
    @Override
    public void closeChannel(Selector selector) throws IOException {
    }

    /**
     * Writes a command to the session located for the command's key. When
     * that session is closed, a standby session is tried instead (see
     * {@code findStandbySession}).
     *
     * @param msg the command to send
     * @return the session the command was written to
     * @throws MemcachedException if no session is available, the chosen
     *                            session is closed, or its authentication failed
     */
    @Override
    public Session send(Command msg) throws MemcachedException {
        MemcachedSession target = (MemcachedSession) this.findSessionByKey(msg.getKey());
        if (target == null) {
            throw new MemcachedException("There is no available connection at this moment");
        }
        if (target.isClosed()) {
            target = this.findStandbySession(target);
        }
        if (target.isClosed()) {
            final String endpoint = SystemUtils.getRawAddress(target.getRemoteSocketAddress()) + ":" + target.getRemoteSocketAddress().getPort();
            throw new MemcachedException("Session(" + endpoint + ") has been closed");
        }
        if (target.isAuthFailed()) {
            throw new MemcachedException("Auth failed to connection " + target.getRemoteSocketAddress());
        }
        target.write(msg);
        return target;
    }

    /**
     * Picks a random standby session for the node of the given session.
     *
     * @param session the (closed) main session that needs a replacement
     * @return a randomly chosen standby session, or {@code session} itself
     *         when failure mode is off or no standby is registered
     */
    private MemcachedSession findStandbySession(MemcachedSession session) {
        if (!this.failureMode) {
            return session;
        }
        List<Session> sessionList = this.getStandbySessionListByMainNodeAddr(session.getRemoteSocketAddress());
        if (sessionList == null) {
            return session;
        }
        // Choose from a snapshot: standby sessions are removed by other threads, so the list can
        // shrink between isEmpty()/size() and get(), which would throw from nextInt(0) or get(i).
        Object[] snapshot = sessionList.toArray();
        if (snapshot.length == 0) {
            return session;
        }
        // Cast to the interface; callers only need MemcachedSession.
        return (MemcachedSession) snapshot[this.random.nextInt(snapshot.length)];
    }

    /**
     * @param addr address of a main node
     * @return the live list of standby sessions for that node, or
     *         {@code null} when none is registered
     */
    @Override
    public List<Session> getStandbySessionListByMainNodeAddr(InetSocketAddress addr) {
        final List<Session> standbys = this.standbySessionMap.get(addr);
        return standbys;
    }

    /**
     * Asks the session locator for the session responsible for a key.
     *
     * @param key the memcached key
     * @return whatever the locator returns for the key ({@code send} treats
     *         {@code null} as "no connection available")
     */
    public final Session findSessionByKey(String key) {
        final Session located = this.sessionLocator.getSessionByKey(key);
        return located;
    }

    /**
     * @param addr remote address of a node
     * @return the live queue of main sessions for that address, or
     *         {@code null} when the address has none
     */
    @Override
    public final Queue<Session> getSessionByAddress(InetSocketAddress addr) {
        final Queue<Session> pool = this.sessionMap.get(addr);
        return pool;
    }

    /**
     * Creates a connector and wires up its collaborators.
     *
     * @param configuration              controller configuration; also supplies the selector pool size
     * @param locator                    locator used to map keys to sessions
     * @param allocator                  buffer allocator for the optimizer and for new sessions
     * @param commandFactory             factory passed to sessions; also determines the protocol
     * @param poolSize                   maximum number of main sessions kept per address
     * @param maxQueuedNoReplyOperations limit handed to the {@link FlowControl} instance
     */
    public MemcachedConnector(Configuration configuration, MemcachedSessionLocator locator, BufferAllocator allocator, CommandFactory commandFactory, int poolSize, int maxQueuedNoReplyOperations) {
        // NOTE(review): the second superclass argument is passed as null; its meaning is not visible here.
        super(configuration, null);
        this.sessionLocator = locator;
        this.protocol = commandFactory.getProtocol();
        // The listener starts the session monitor once the controller reports ready.
        this.addStateListener(new InnerControllerStateListener());
        // No sessions exist yet, so this hands the locator an empty list.
        this.updateSessions();
        this.bufferAllocator = allocator;
        // The optimizer needs the protocol assigned above.
        this.optimiezer = new Optimizer(this.protocol);
        this.optimiezer.setBufferAllocator(this.bufferAllocator);
        this.connectionPoolSize = poolSize;
        // Inherited flag; presumably enables SO_LINGER on new sockets — confirm in the superclass.
        this.soLingerOn = true;
        this.commandFactory = commandFactory;
        this.flowControl = new FlowControl(maxQueuedNoReplyOperations);
        this.setSelectorPoolSize(configuration.getSelectorPoolSize());
    }

    /**
     * Sets the maximum number of main sessions kept per address. Existing
     * pools are not trimmed here; the limit is applied when a main session
     * is next added.
     *
     * @param poolSize the new per-address limit
     */
    @Override
    public final void setConnectionPoolSize(int poolSize) {
        this.connectionPoolSize = poolSize;
    }

    /**
     * Forwards the merge factor to the optimizer through its management
     * interface.
     *
     * @param mergeFactor value passed to {@code OptimizerMBean.setMergeFactor}
     */
    @Override
    public void setMergeFactor(int mergeFactor) {
        final OptimizerMBean optimizerBean = (OptimizerMBean) this.optimiezer;
        optimizerBean.setMergeFactor(mergeFactor);
    }

    /**
     * @return the flow control instance shared by all session write queues
     */
    @Override
    public FlowControl getNoReplyOpsFlowControl() {
        return flowControl;
    }

    /**
     * Builds a memcached TCP session for a channel, giving it a fresh write
     * queue, the shared optimizer and command factory, and the current
     * buffer allocator.
     *
     * @param sc the socket channel the session wraps
     * @return the new (not yet started) session
     */
    @Override
    protected NioSession buildSession(SocketChannel sc) {
        final NioSessionConfig config = this.buildSessionConfig(sc, this.buildQueue());
        final MemcachedTCPSession built = new MemcachedTCPSession(
                config,
                this.configuration.getSessionReadBufferSize(),
                this.optimiezer,
                this.getReadThreadCount(),
                this.commandFactory);
        built.setBufferAllocator(this.bufferAllocator);
        return built;
    }

    /**
     * @return a new write queue bound to this connector's flow control
     */
    @Override
    protected Queue<WriteMessage> buildQueue() {
        return new FlowControlLinkedTransferQueue(flowControl);
    }

    /**
     * @return the buffer allocator currently used for new sessions
     */
    public BufferAllocator getBufferAllocator() {
        return bufferAllocator;
    }

    /**
     * Calls {@code quit()} on every session in the inherited session set,
     * then waits for the set to drain: at most five waits of one second
     * each. The wait releases this object's monitor, so other synchronized
     * methods can run meanwhile.
     *
     * <p>If the calling thread is interrupted, its interrupt status is
     * restored and the method returns without waiting further.
     */
    @Override
    public synchronized void quitAllSessions() {
        for (Session session : this.sessionSet) {
            ((MemcachedSession) session).quit();
        }
        int sleepCount = 0;
        while (sleepCount++ < 5 && this.sessionSet.size() > 0) {
            try {
                this.wait(1000L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                // With the interrupt flag set, every further wait() would throw immediately,
                // so the remaining iterations would only spin; stop waiting instead.
                break;
            }
        }
    }

    /**
     * Switches failure mode on or off. In failure mode closed main sessions
     * stay registered and standby sessions may serve requests instead.
     *
     * @param failureMode {@code true} to enable failure mode
     */
    @Override
    public void setFailureMode(boolean failureMode) {
        this.failureMode = failureMode;
    }

    /**
     * Replaces the buffer allocator for future sessions and pushes it to
     * every main session that currently exists.
     *
     * @param allocator the new allocator
     */
    @Override
    public void setBufferAllocator(BufferAllocator allocator) {
        this.bufferAllocator = allocator;
        final Iterator<Session> it = this.getSessionSet().iterator();
        while (it.hasNext()) {
            final MemcachedSession current = (MemcachedSession) it.next();
            current.setBufferAllocator(allocator);
        }
    }

    /**
     * @return a read-only view of the addresses that currently have a main
     *         session pool; it is backed by the live map, not a copy
     */
    public Collection<InetSocketAddress> getServerAddresses() {
        final Set<InetSocketAddress> addresses = this.sessionMap.keySet();
        return Collections.unmodifiableCollection(addresses);
    }

    class InnerControllerStateListener
    implements ControllerStateListener {
        InnerControllerStateListener() {
        }

        public void onAllSessionClosed(Controller controller) {
        }

        public void onException(Controller controller, Throwable t) {
            log.error("Exception occured in controller", t);
        }

        public void onReady(Controller controller) {
            MemcachedConnector.this.sessionMonitor.setDaemon(true);
            MemcachedConnector.this.sessionMonitor.start();
        }

        public void onStarted(Controller controller) {
        }

        public void onStopped(Controller controller) {
            if (MemcachedConnector.this.sessionMonitor.isAlive()) {
                MemcachedConnector.this.sessionMonitor.interrupt();
            }
        }
    }

    class SessionMonitor
    extends Thread {
        public SessionMonitor() {
            this.setName("Heal-Session-Thread");
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run() {
            while (MemcachedConnector.this.isStarted() && MemcachedConnector.this.enableHealSession) {
                ReconnectRequest request = null;
                try {
                    request = (ReconnectRequest)MemcachedConnector.this.waitingQueue.take();
                    InetSocketAddress address = request.getInetSocketAddressWrapper().getInetSocketAddress();
                    if (!MemcachedConnector.this.removedAddrSet.contains(address)) {
                        boolean connected = false;
                        Future<Boolean> future = MemcachedConnector.this.connect(request.getInetSocketAddressWrapper());
                        request.setTries(request.getTries() + 1);
                        try {
                            log.info("Trying to connect to " + address.getAddress().getHostAddress() + ":" + address.getPort() + " for " + request.getTries() + " times");
                            if (!future.get(60000L, TimeUnit.MILLISECONDS).booleanValue()) {
                                connected = false;
                                continue;
                            }
                            connected = true;
                            continue;
                        }
                        catch (TimeoutException e) {
                            future.cancel(true);
                            continue;
                        }
                        catch (ExecutionException e) {
                            future.cancel(true);
                            continue;
                        }
                        finally {
                            if (!connected) {
                                this.rescheduleConnectRequest(request);
                            }
                            continue;
                        }
                    }
                    log.info("Remove invalid reconnect task for " + address);
                }
                catch (InterruptedException address) {
                }
                catch (Exception e) {
                    log.error("SessionMonitor connect error", (Throwable)e);
                    this.rescheduleConnectRequest(request);
                }
            }
        }

        private void rescheduleConnectRequest(ReconnectRequest request) {
            if (request == null) {
                return;
            }
            InetSocketAddress address = request.getInetSocketAddressWrapper().getInetSocketAddress();
            request.updateNextReconnectTimeStamp(MemcachedConnector.this.healSessionInterval * (long)request.getTries());
            log.error("Reconnected to " + address + " fail");
            MemcachedConnector.this.waitingQueue.offer(request);
        }
    }
}

