package org.neo4j.kernel.ha.comm;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.queue.BlockingReadHandler;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.neo4j.kernel.IdType;
import org.neo4j.kernel.ha.HaCommunicationException;
import org.neo4j.kernel.ha.IdAllocation;
import org.neo4j.kernel.ha.LockResult;
import org.neo4j.kernel.ha.Master;
import org.neo4j.kernel.ha.Response;
import org.neo4j.kernel.ha.SlaveContext;
import org.neo4j.kernel.ha.TransactionStream;
import org.neo4j.kernel.ha.comm.DataWriter;
import org.neo4j.kernel.ha.zookeeper.Machine;
import org.neo4j.kernel.impl.util.StringLogger;

/* loaded from: input_file:org/neo4j/kernel/ha/comm/MasterClient.class */
public class MasterClient implements Master, ChannelPipelineFactory {
    private final Deque<Channel> unusedChannels;
    private final Map<Thread, Channel> channels;
    private final ClientBootstrap bootstrap;
    private final String hostNameOrIp;
    private final int port;
    private final TransactionApplier txApplier;
    private final StringLogger msgLog;

    public MasterClient(String str, int i, String str2) {
        this.unusedChannels = new LinkedList();
        this.channels = new HashMap();
        this.txApplier = new TransactionApplier();
        this.hostNameOrIp = str;
        this.port = i;
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        this.bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(newCachedThreadPool, newCachedThreadPool));
        this.bootstrap.setPipelineFactory(this);
        this.msgLog = StringLogger.getLogger(str2 + "/messages.log");
        this.msgLog.logMessage("Client connected to " + str + ":" + i, true);
    }

    public MasterClient(Machine machine, String str) {
        this((String) machine.getServer().first(), ((Integer) machine.getServer().other()).intValue(), str);
    }

    private <T> Response<T> sendRequest(RequestType requestType, SlaveContext slaveContext, DataWriter dataWriter) {
        try {
            ChannelBuffer dynamicBuffer = ChannelBuffers.dynamicBuffer();
            dynamicBuffer.writeByte(requestType.ordinal());
            if (requestType.includesSlaveContext) {
                CommunicationUtils.writeSlaveContext(slaveContext, dynamicBuffer);
            }
            dataWriter.write(dynamicBuffer);
            Channel channel = getChannel();
            channel.write(dynamicBuffer);
            ChannelBuffer channelBuffer = (ChannelBuffer) channel.getPipeline().get("blockingHandler").read(20L, TimeUnit.SECONDS);
            if (channelBuffer == null) {
                throw new HaCommunicationException("Channel has been closed");
            }
            return Response.wrapResponseObjectOnly(requestType.readResponse(channelBuffer));
        } catch (IOException e) {
            throw new HaCommunicationException(e);
        } catch (InterruptedException e2) {
            throw new HaCommunicationException(e2);
        } catch (Exception e3) {
            throw new HaCommunicationException(e3);
        }
    }

    private Channel getChannel() throws Exception {
        Channel channel;
        Channel poll;
        Thread currentThread = Thread.currentThread();
        synchronized (this.channels) {
            Channel channel2 = this.channels.get(currentThread);
            if (channel2 == null) {
                while (channel2 == null && (poll = this.unusedChannels.poll()) != null) {
                    if (poll.isConnected()) {
                        this.msgLog.logMessage("Found unused (and still connected) channel");
                        channel2 = poll;
                    } else {
                        this.msgLog.logMessage("Found unused stale channel, discarding it");
                    }
                }
                if (channel2 == null) {
                    int i = 0;
                    while (true) {
                        if (i >= 5) {
                            break;
                        }
                        ChannelFuture connect = this.bootstrap.connect(new InetSocketAddress(this.hostNameOrIp, this.port));
                        connect.awaitUninterruptibly();
                        if (connect.isSuccess()) {
                            channel2 = connect.getChannel();
                            this.msgLog.logMessage("Opened a new channel to " + this.hostNameOrIp + ":" + this.port, true);
                            break;
                        }
                        this.msgLog.logMessage("Retrying connect to " + this.hostNameOrIp + ":" + this.port, true);
                        try {
                            Thread.sleep(500L);
                        } catch (InterruptedException e) {
                            Thread.interrupted();
                        }
                        i++;
                    }
                }
                if (channel2 == null) {
                    throw new IOException("Not able to connect to master");
                }
                this.channels.put(currentThread, channel2);
            }
            channel = channel2;
        }
        return channel;
    }

    private void releaseChannel() {
        synchronized (this.channels) {
            Channel remove = this.channels.remove(Thread.currentThread());
            if (remove != null) {
                if (this.unusedChannels.size() < 5) {
                    this.unusedChannels.push(remove);
                } else {
                    remove.close();
                }
            }
        }
    }

    @Override // org.neo4j.kernel.ha.Master
    public IdAllocation allocateIds(IdType idType) {
        return (IdAllocation) sendRequest(RequestType.allocateIds(idType), null, null).response();
    }

    @Override // org.neo4j.kernel.ha.Master
    public Response<Integer> createRelationshipType(SlaveContext slaveContext, String str) {
        return sendRequest(RequestType.CREATE_RELATIONSHIP_TYPE, slaveContext, new DataWriter.WriteString(str));
    }

    @Override // org.neo4j.kernel.ha.Master
    public Response<LockResult> acquireNodeWriteLock(SlaveContext slaveContext, long... jArr) {
        return sendRequest(RequestType.ACQUIRE_NODE_WRITE_LOCK, slaveContext, new DataWriter.WriteIdArray(jArr));
    }

    @Override // org.neo4j.kernel.ha.Master
    public Response<LockResult> acquireNodeReadLock(SlaveContext slaveContext, long... jArr) {
        return sendRequest(RequestType.ACQUIRE_NODE_READ_LOCK, slaveContext, new DataWriter.WriteIdArray(jArr));
    }

    @Override // org.neo4j.kernel.ha.Master
    public Response<LockResult> acquireRelationshipWriteLock(SlaveContext slaveContext, long... jArr) {
        return sendRequest(RequestType.ACQUIRE_RELATIONSHIP_WRITE_LOCK, slaveContext, new DataWriter.WriteIdArray(jArr));
    }

    @Override // org.neo4j.kernel.ha.Master
    public Response<LockResult> acquireRelationshipReadLock(SlaveContext slaveContext, long... jArr) {
        return sendRequest(RequestType.ACQUIRE_RELATIONSHIP_READ_LOCK, slaveContext, new DataWriter.WriteIdArray(jArr));
    }

    @Override // org.neo4j.kernel.ha.Master
    public Response<Long> commitSingleResourceTransaction(SlaveContext slaveContext, String str, TransactionStream transactionStream) {
        throw new UnsupportedOperationException("Not implemented: commitSingleResourceTransaction()");
    }

    @Override // org.neo4j.kernel.ha.Master
    public Response<Void> finishTransaction(SlaveContext slaveContext) {
        return sendRequest(RequestType.FINISH, slaveContext, null);
    }

    public void rollbackOngoingTransactions(SlaveContext slaveContext) {
        throw new UnsupportedOperationException("Should never be called from the client side");
    }

    @Override // org.neo4j.kernel.ha.Master
    public Response<Void> pullUpdates(SlaveContext slaveContext) {
        return sendRequest(RequestType.PULL_UPDATES, slaveContext, null);
    }

    @Override // org.neo4j.kernel.ha.Master
    public int getMasterIdForCommittedTx(long j) {
        return ((Integer) sendRequest(RequestType.GET_MASTER_ID_FOR_TX, null, new DataWriter.WriteLong(j)).response()).intValue();
    }

    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = Channels.pipeline();
        pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
        pipeline.addLast("responseDecoder", new ResponseDecoder());
        pipeline.addLast("applyTransactions", this.txApplier);
        pipeline.addLast("blockingHandler", new BlockingReadHandler());
        return pipeline;
    }

    public void shutdown() {
        this.msgLog.logMessage("MasterClient shutdown", true);
        synchronized (this.channels) {
            Iterator<Channel> it = this.unusedChannels.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            Iterator<Channel> it2 = this.channels.values().iterator();
            while (it2.hasNext()) {
                it2.next().close();
            }
        }
    }
}
