package org.apache.nifi.remote.protocol.socket;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PortAuthorizationResult;
import org.apache.nifi.remote.RemoteResourceFactory;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.StandardRemoteGroupPort;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.DataPacket;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.ServerProtocol;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol.class */
public class SocketFlowFileServerProtocol implements ServerProtocol {
    public static final String RESOURCE_NAME = "SocketFlowFileProtocol";
    private ProcessGroup rootGroup;
    private String commsIdentifier;
    private boolean handshakeCompleted;
    private Boolean useGzip;
    private long requestExpirationMillis;
    private RootGroupPort port;
    private static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5);
    private boolean shutdown = false;
    private FlowFileCodec negotiatedFlowFileCodec = null;
    private String transitUriPrefix = null;
    private int requestedBatchCount = 0;
    private long requestedBatchBytes = 0;
    private long requestedBatchNanos = 0;
    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(new int[]{5, 4, 3, 2, 1});
    private final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class);

    /* renamed from: org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/nifi/remote/protocol/socket/SocketFlowFileServerProtocol$2.class */
    static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$remote$protocol$socket$HandshakeProperty;
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$remote$protocol$socket$ResponseCode = new int[ResponseCode.values().length];

        static {
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$socket$ResponseCode[ResponseCode.CONTINUE_TRANSACTION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$socket$ResponseCode[ResponseCode.FINISH_TRANSACTION.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$socket$ResponseCode[ResponseCode.CANCEL_TRANSACTION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$socket$ResponseCode[ResponseCode.CONFIRM_TRANSACTION.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$socket$ResponseCode[ResponseCode.BAD_CHECKSUM.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            $SwitchMap$org$apache$nifi$remote$protocol$socket$HandshakeProperty = new int[HandshakeProperty.values().length];
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$socket$HandshakeProperty[HandshakeProperty.GZIP.ordinal()] = 1;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$socket$HandshakeProperty[HandshakeProperty.REQUEST_EXPIRATION_MILLIS.ordinal()] = 2;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$socket$HandshakeProperty[HandshakeProperty.BATCH_COUNT.ordinal()] = 3;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$socket$HandshakeProperty[HandshakeProperty.BATCH_SIZE.ordinal()] = 4;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$socket$HandshakeProperty[HandshakeProperty.BATCH_DURATION.ordinal()] = 5;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$socket$HandshakeProperty[HandshakeProperty.PORT_IDENTIFIER.ordinal()] = 6;
            } catch (NoSuchFieldError e11) {
            }
        }
    }

    public void setRootProcessGroup(ProcessGroup processGroup) {
        if (!processGroup.isRootGroup()) {
            throw new IllegalArgumentException();
        }
        this.rootGroup = processGroup;
    }

    public void handshake(Peer peer) throws IOException, HandshakeException {
        if (this.handshakeCompleted) {
            throw new IllegalStateException("Handshake has already been completed");
        }
        if (this.shutdown) {
            throw new IllegalStateException("Protocol is shutdown");
        }
        this.logger.debug("{} Handshaking with {}", this, peer);
        CommunicationsSession communicationsSession = peer.getCommunicationsSession();
        DataInputStream dataInputStream = new DataInputStream(communicationsSession.getInput().getInputStream());
        DataOutputStream dataOutputStream = new DataOutputStream(communicationsSession.getOutput().getOutputStream());
        this.commsIdentifier = dataInputStream.readUTF();
        if (this.versionNegotiator.getVersion() >= 3) {
            this.transitUriPrefix = dataInputStream.readUTF();
            if (!this.transitUriPrefix.endsWith("/")) {
                this.transitUriPrefix += "/";
            }
        }
        HashMap hashMap = new HashMap();
        int readInt = dataInputStream.readInt();
        for (int i = 0; i < readInt; i++) {
            hashMap.put(dataInputStream.readUTF(), dataInputStream.readUTF());
        }
        boolean z = false;
        for (Map.Entry entry : hashMap.entrySet()) {
            String str = (String) entry.getKey();
            String str2 = (String) entry.getValue();
            try {
                HandshakeProperty valueOf = HandshakeProperty.valueOf(str);
                try {
                    switch (AnonymousClass2.$SwitchMap$org$apache$nifi$remote$protocol$socket$HandshakeProperty[valueOf.ordinal()]) {
                        case StandardRemoteGroupPort.GZIP_COMPRESSION_LEVEL /* 1 */:
                            this.useGzip = Boolean.valueOf(Boolean.parseBoolean(str2));
                            break;
                        case 2:
                            this.requestExpirationMillis = Long.parseLong(str2);
                            break;
                        case 3:
                            this.requestedBatchCount = Integer.parseInt(str2);
                            if (this.requestedBatchCount < 0) {
                                throw new HandshakeException("Cannot request Batch Count less than 1; requested value: " + str2);
                            }
                            break;
                        case 4:
                            this.requestedBatchBytes = Long.parseLong(str2);
                            if (this.requestedBatchBytes < 0) {
                                throw new HandshakeException("Cannot request Batch Size less than 1; requested value: " + str2);
                            }
                            break;
                        case 5:
                            this.requestedBatchNanos = TimeUnit.MILLISECONDS.toNanos(Long.parseLong(str2));
                            if (this.requestedBatchNanos < 0) {
                                throw new HandshakeException("Cannot request Batch Duration less than 1; requested value: " + str2);
                            }
                            break;
                        case 6:
                            Port inputPort = this.rootGroup.getInputPort(str2);
                            if (inputPort == null) {
                                inputPort = this.rootGroup.getOutputPort(str2);
                            }
                            if (inputPort == null) {
                                this.logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", str2);
                                ResponseCode.UNKNOWN_PORT.writeResponse(dataOutputStream);
                                throw new HandshakeException("Received unknown port identifier: " + str2);
                            }
                            if (!(inputPort instanceof RootGroupPort)) {
                                this.logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", str2);
                                ResponseCode.UNKNOWN_PORT.writeResponse(dataOutputStream);
                                throw new HandshakeException("Received port identifier " + str2 + ", but this Port is not a RootGroupPort");
                            }
                            this.port = (RootGroupPort) inputPort;
                            PortAuthorizationResult checkUserAuthorization = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
                            if (checkUserAuthorization.isAuthorized()) {
                                if (inputPort.isValid()) {
                                    if (inputPort.isRunning()) {
                                        if (getVersionNegotiator().getVersion() > 1) {
                                            Iterator it = this.port.getConnections().iterator();
                                            while (true) {
                                                if (it.hasNext()) {
                                                    if (((Connection) it.next()).getFlowFileQueue().isFull()) {
                                                        this.logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", inputPort);
                                                        ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dataOutputStream);
                                                        z = true;
                                                    }
                                                }
                                            }
                                            break;
                                        } else {
                                            break;
                                        }
                                    } else {
                                        this.logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", inputPort);
                                        ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dataOutputStream, "Port not running");
                                        z = true;
                                        break;
                                    }
                                } else {
                                    this.logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", inputPort);
                                    ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dataOutputStream, "Port is not valid");
                                    z = true;
                                    break;
                                }
                            } else {
                                this.logger.debug("Responding with ResponseCode UNAUTHORIZED: ", checkUserAuthorization.getExplanation());
                                ResponseCode.UNAUTHORIZED.writeResponse(dataOutputStream, checkUserAuthorization.getExplanation());
                                z = true;
                                break;
                            }
                    }
                } catch (NumberFormatException e) {
                    throw new HandshakeException("Received invalid value for property '" + valueOf + "'; invalid value: " + str2);
                }
            } catch (Exception e2) {
                ResponseCode.UNKNOWN_PROPERTY_NAME.writeResponse(dataOutputStream, "Unknown Property Name: " + str);
                throw new HandshakeException("Received unknown property: " + str);
            }
        }
        if (this.useGzip == null) {
            this.logger.debug("Responding with ResponseCode MISSING_PROPERTY because GZIP Property missing");
            ResponseCode.MISSING_PROPERTY.writeResponse(dataOutputStream, HandshakeProperty.GZIP.name());
            throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name());
        }
        if (!z) {
            ResponseCode.PROPERTIES_OK.writeResponse(dataOutputStream);
        }
        this.logger.debug("{} Finished handshake with {}", this, peer);
        this.handshakeCompleted = true;
    }

    public boolean isHandshakeSuccessful() {
        return this.handshakeCompleted;
    }

    public RootGroupPort getPort() {
        return this.port;
    }

    public FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException {
        if (!this.handshakeCompleted) {
            throw new IllegalStateException("Handshake has not been completed");
        }
        if (this.shutdown) {
            throw new IllegalStateException("Protocol is shutdown");
        }
        this.logger.debug("{} Negotiating Codec with {} using {}", new Object[]{this, peer, peer.getCommunicationsSession()});
        CommunicationsSession communicationsSession = peer.getCommunicationsSession();
        DataInputStream dataInputStream = new DataInputStream(communicationsSession.getInput().getInputStream());
        DataOutputStream dataOutputStream = new DataOutputStream(communicationsSession.getOutput().getOutputStream());
        if (this.port == null) {
            RemoteResourceFactory.rejectCodecNegotiation(dataInputStream, dataOutputStream, "Cannot transfer FlowFiles because no port was specified");
        }
        try {
            this.negotiatedFlowFileCodec = RemoteResourceFactory.receiveCodecNegotiation(dataInputStream, dataOutputStream);
            this.logger.debug("{} Negotiated Codec {} with {}", new Object[]{this, this.negotiatedFlowFileCodec, peer});
            return this.negotiatedFlowFileCodec;
        } catch (HandshakeException e) {
            throw new ProtocolException(e.toString());
        }
    }

    public FlowFileCodec getPreNegotiatedCodec() {
        return this.negotiatedFlowFileCodec;
    }

    public int transferFlowFiles(Peer peer, ProcessContext processContext, ProcessSession processSession, final FlowFileCodec flowFileCodec) throws IOException, ProtocolException {
        if (!this.handshakeCompleted) {
            throw new IllegalStateException("Handshake has not been completed");
        }
        if (this.shutdown) {
            throw new IllegalStateException("Protocol is shutdown");
        }
        this.logger.debug("{} Sending FlowFiles to {}", this, peer);
        CommunicationsSession communicationsSession = peer.getCommunicationsSession();
        DataInputStream dataInputStream = new DataInputStream(communicationsSession.getInput().getInputStream());
        CompressionOutputStream dataOutputStream = new DataOutputStream(communicationsSession.getOutput().getOutputStream());
        String userDn = communicationsSession.getUserDn();
        if (userDn == null) {
            userDn = "none";
        }
        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            this.logger.debug("{} No data to send to {}", this, peer);
            ResponseCode.NO_MORE_DATA.writeResponse(dataOutputStream);
            return 0;
        }
        this.logger.debug("{} Data is available to send to {}", this, peer);
        ResponseCode.MORE_DATA.writeResponse(dataOutputStream);
        StopWatch stopWatch = new StopWatch(true);
        long j = 0;
        HashSet hashSet = new HashSet();
        CRC32 crc32 = new CRC32();
        boolean z = true;
        long nanoTime = System.nanoTime();
        String str = "";
        while (z) {
            CompressionOutputStream compressionOutputStream = this.useGzip.booleanValue() ? new CompressionOutputStream(dataOutputStream) : dataOutputStream;
            this.logger.debug("{} Sending {} to {}", new Object[]{this, flowFile, peer});
            final CheckedOutputStream checkedOutputStream = new CheckedOutputStream(compressionOutputStream, crc32);
            StopWatch stopWatch2 = new StopWatch(true);
            final FlowFile flowFile2 = flowFile;
            processSession.read(flowFile, new InputStreamCallback() { // from class: org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol.1
                public void process(InputStream inputStream) throws IOException {
                    flowFileCodec.encode(new StandardDataPacket(flowFile2.getAttributes(), inputStream, flowFile2.getSize()), checkedOutputStream);
                }
            });
            long elapsed = stopWatch2.getElapsed(TimeUnit.MILLISECONDS);
            if (this.useGzip.booleanValue()) {
                checkedOutputStream.close();
            }
            hashSet.add(flowFile);
            j += flowFile.getSize();
            processSession.getProvenanceReporter().send(flowFile, this.transitUriPrefix == null ? peer.getUrl() : this.transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key()), "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, elapsed, false);
            processSession.remove(flowFile);
            long nanoTime2 = System.nanoTime() - nanoTime;
            boolean z2 = true;
            if (nanoTime2 >= this.requestedBatchNanos && this.requestedBatchNanos > 0) {
                z2 = false;
            }
            if (j >= this.requestedBatchBytes && this.requestedBatchBytes > 0) {
                z2 = false;
            }
            if (hashSet.size() >= this.requestedBatchCount && this.requestedBatchCount > 0) {
                z2 = false;
            }
            if (this.requestedBatchNanos == 0 && this.requestedBatchBytes == 0 && this.requestedBatchCount == 0) {
                z2 = nanoTime2 < DEFAULT_BATCH_NANOS;
            }
            flowFile = z2 ? processSession.get() : null;
            z = flowFile != null;
            if (z) {
                this.logger.debug("{} Sending ContinueTransaction indicator to {}", this, peer);
                ResponseCode.CONTINUE_TRANSACTION.writeResponse(dataOutputStream);
            } else {
                this.logger.debug("{} Sending FinishTransaction indicator to {}", this, peer);
                ResponseCode.FINISH_TRANSACTION.writeResponse(dataOutputStream);
                str = String.valueOf(checkedOutputStream.getChecksum().getValue());
            }
        }
        Response read = Response.read(dataInputStream);
        if (read.getCode() != ResponseCode.CONFIRM_TRANSACTION) {
            throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + read);
        }
        this.logger.debug("{} Received {}  from {}", new Object[]{this, read, peer});
        String message = read.getMessage();
        if (this.versionNegotiator.getVersion() > 3 && !message.equals(str)) {
            ResponseCode.BAD_CHECKSUM.writeResponse(dataOutputStream);
            processSession.rollback();
            throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + str + " while peer calculated CRC32 Checksum as " + message + "; canceling transaction and rolling back session");
        }
        ResponseCode.CONFIRM_TRANSACTION.writeResponse(dataOutputStream, "");
        String obj = hashSet.size() < 20 ? hashSet.toString() : hashSet.size() + " FlowFiles";
        try {
            Response read2 = Response.read(dataInputStream);
            this.logger.debug("{} received {} from {}", new Object[]{this, read2, peer});
            if (read2.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
                peer.penalize(this.port.getIdentifier(), this.port.getYieldPeriod(TimeUnit.MILLISECONDS));
            } else if (read2.getCode() != ResponseCode.TRANSACTION_FINISHED) {
                throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + read2);
            }
            processSession.commit();
            stopWatch.stop();
            this.logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{this, obj, FormatUtils.formatDataSize(j), peer, Long.valueOf(stopWatch.getDuration(TimeUnit.MILLISECONDS)), stopWatch.calculateDataRate(j)});
            return hashSet.size();
        } catch (IOException e) {
            this.logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator. It is unknown whether or not the peer successfully received/processed the data. Therefore, {} will be rolled back, possibly resulting in data duplication of {}", new Object[]{this, peer, processSession, obj});
            processSession.rollback();
            throw e;
        }
    }

    public int receiveFlowFiles(Peer peer, ProcessContext processContext, ProcessSession processSession, FlowFileCodec flowFileCodec) throws IOException, ProtocolException {
        if (!this.handshakeCompleted) {
            throw new IllegalStateException("Handshake has not been completed");
        }
        if (this.shutdown) {
            throw new IllegalStateException("Protocol is shutdown");
        }
        this.logger.debug("{} receiving FlowFiles from {}", this, peer);
        CommunicationsSession communicationsSession = peer.getCommunicationsSession();
        CompressionInputStream dataInputStream = new DataInputStream(communicationsSession.getInput().getInputStream());
        DataOutputStream dataOutputStream = new DataOutputStream(communicationsSession.getOutput().getOutputStream());
        String userDn = communicationsSession.getUserDn();
        if (userDn == null) {
            userDn = "none";
        }
        StopWatch stopWatch = new StopWatch(true);
        CRC32 crc32 = new CRC32();
        HashSet hashSet = new HashSet();
        long j = 0;
        boolean z = true;
        String str = "";
        while (z) {
            long nanoTime = System.nanoTime();
            CheckedInputStream checkedInputStream = new CheckedInputStream(this.useGzip.booleanValue() ? new CompressionInputStream(dataInputStream) : dataInputStream, crc32);
            DataPacket decode = flowFileCodec.decode(checkedInputStream);
            FlowFile putAllAttributes = processSession.putAllAttributes(processSession.importFrom(decode.getData(), processSession.create()), decode.getAttributes());
            long convert = TimeUnit.MILLISECONDS.convert(System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS);
            String str2 = (String) decode.getAttributes().get(CoreAttributes.UUID.key());
            FlowFile putAttribute = processSession.putAttribute(putAllAttributes, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
            processSession.getProvenanceReporter().receive(putAttribute, this.transitUriPrefix == null ? peer.getUrl() : this.transitUriPrefix + str2, str2 == null ? null : "urn:nifi:" + str2, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, convert);
            processSession.transfer(putAttribute, Relationship.ANONYMOUS);
            hashSet.add(putAttribute);
            j += putAttribute.getSize();
            Response read = Response.read(dataInputStream);
            switch (AnonymousClass2.$SwitchMap$org$apache$nifi$remote$protocol$socket$ResponseCode[read.getCode().ordinal()]) {
                case StandardRemoteGroupPort.GZIP_COMPRESSION_LEVEL /* 1 */:
                    this.logger.debug("{} Received ContinueTransaction indicator from {}", this, peer);
                    break;
                case 2:
                    this.logger.debug("{} Received FinishTransaction indicator from {}", this, peer);
                    z = false;
                    str = String.valueOf(checkedInputStream.getChecksum().getValue());
                    break;
                case 3:
                    this.logger.info("{} Received CancelTransaction indicator from {} with explanation {}", new Object[]{this, peer, read.getMessage()});
                    processSession.rollback();
                    return 0;
                default:
                    throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + read);
            }
        }
        this.logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
        ResponseCode.CONFIRM_TRANSACTION.writeResponse(dataOutputStream, str);
        Response read2 = Response.read(dataInputStream);
        this.logger.debug("{} Received {} from {}", new Object[]{this, read2, peer});
        switch (AnonymousClass2.$SwitchMap$org$apache$nifi$remote$protocol$socket$ResponseCode[read2.getCode().ordinal()]) {
            case 4:
                processSession.commit();
                if (processContext.getAvailableRelationships().isEmpty()) {
                    this.logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
                    ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dataOutputStream);
                } else {
                    this.logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
                    ResponseCode.TRANSACTION_FINISHED.writeResponse(dataOutputStream);
                }
                stopWatch.stop();
                this.logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{this, hashSet.size() < 20 ? hashSet.toString() : hashSet.size() + " FlowFiles", FormatUtils.formatDataSize(j), peer, Long.valueOf(stopWatch.getDuration(TimeUnit.MILLISECONDS)), stopWatch.calculateDataRate(j)});
                return hashSet.size();
            case 5:
                processSession.rollback();
                throw new IOException(this + " Received a BadChecksum response from peer " + peer);
            default:
                throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + read2 + "; expected 'Confirm Transaction' Response Code");
        }
    }

    public RequestType getRequestType(Peer peer) throws IOException {
        if (!this.handshakeCompleted) {
            throw new IllegalStateException("Handshake has not been completed");
        }
        if (this.shutdown) {
            throw new IllegalStateException("Protocol is shutdown");
        }
        this.logger.debug("{} Reading Request Type from {} using {}", new Object[]{this, peer, peer.getCommunicationsSession()});
        RequestType readRequestType = RequestType.readRequestType(new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream()));
        this.logger.debug("{} Got Request Type {} from {}", new Object[]{this, readRequestType, peer});
        return readRequestType;
    }

    public VersionNegotiator getVersionNegotiator() {
        return this.versionNegotiator;
    }

    public void shutdown(Peer peer) {
        this.logger.debug("{} Shutting down with {}", this, peer);
        this.shutdown = true;
    }

    public boolean isShutdown() {
        return this.shutdown;
    }

    public void sendPeerList(Peer peer) throws IOException {
        if (!this.handshakeCompleted) {
            throw new IllegalStateException("Handshake has not been completed");
        }
        if (this.shutdown) {
            throw new IllegalStateException("Protocol is shutdown");
        }
        this.logger.debug("{} Sending Peer List to {}", this, peer);
        DataOutputStream dataOutputStream = new DataOutputStream(peer.getCommunicationsSession().getOutput().getOutputStream());
        NiFiProperties niFiProperties = NiFiProperties.getInstance();
        dataOutputStream.writeInt(1);
        dataOutputStream.writeUTF(InetAddress.getLocalHost().getHostName());
        dataOutputStream.writeInt(niFiProperties.getRemoteInputPort().intValue());
        dataOutputStream.writeBoolean(niFiProperties.isSiteToSiteSecure().booleanValue());
        dataOutputStream.writeInt(0);
        dataOutputStream.flush();
    }

    public String getResourceName() {
        return "SocketFlowFileProtocol";
    }

    public void setNodeInformant(NodeInformant nodeInformant) {
    }

    public long getRequestExpiration() {
        return this.requestExpirationMillis;
    }

    public String toString() {
        return "SocketFlowFileServerProtocol[CommsID=" + this.commsIdentifier + "]";
    }
}
