package org.apache.nifi.remote.protocol;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.SiteToSiteAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PortAuthorizationResult;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.StandardRemoteGroupPort;
import org.apache.nifi.remote.cluster.NodeInformant;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.util.StandardDataPacket;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.class */
public abstract class AbstractFlowFileServerProtocol implements ServerProtocol {
    /** Root process group used by {@link #checkPortStatus} to look up ports; assigned by {@link #setRootProcessGroup}. */
    protected ProcessGroup rootGroup;
    /** Port resolved from the peer's port identifier; assigned by {@link #checkPortStatus}. */
    protected PublicPort port;
    /** Set to {@code true} at the end of {@link #handshake} once {@link #doHandshake} has returned. */
    protected boolean handshakeCompleted;
    /** Properties returned by {@link #doHandshake}; {@code null} until {@link #handshake} has run. */
    protected HandshakeProperties handshakeProperties;
    /** Five seconds in nanoseconds; {@link #transferFlowFiles} uses it as the batch time limit when no batch limits were configured. */
    protected static final long DEFAULT_BATCH_NANOS = TimeUnit.SECONDS.toNanos(5);
    /** Set to {@code true} by {@link #shutdown}; handshake and transfer methods refuse to run afterwards. */
    protected boolean shutdown = false;
    /** Codec returned by {@link #getPreNegotiatedCodec}; never assigned within this file, so presumably set by subclasses. */
    protected FlowFileCodec negotiatedFlowFileCodec = null;
    /** Logger named after the concrete runtime class. */
    protected final Logger logger = LoggerFactory.getLogger(getClass());

    /*
     * Decompiler rendering of the compiler-generated lookup tables that back switch statements over
     * the ResponseCode and HandshakeProperty enums. Each table is indexed by an enum constant's
     * ordinal() and holds the 1-based case label assigned to that constant; constants that are not
     * listed keep the array default of 0 and therefore match no case.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.nifi.remote.protocol.AbstractFlowFileServerProtocol$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        // HandshakeProperty ordinal -> case label (GZIP=1 ... PORT_IDENTIFIER=6); filled in the static initializer.
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$remote$protocol$HandshakeProperty;
        // ResponseCode ordinal -> case label (CONTINUE_TRANSACTION=1 ... BAD_CHECKSUM=5).
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$remote$protocol$ResponseCode = new int[ResponseCode.values().length];

        static {
            // Each assignment is wrapped individually so that a constant missing at runtime
            // (NoSuchFieldError) only leaves its own slot unmapped.
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$ResponseCode[ResponseCode.CONTINUE_TRANSACTION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$ResponseCode[ResponseCode.FINISH_TRANSACTION.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$ResponseCode[ResponseCode.CANCEL_TRANSACTION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$ResponseCode[ResponseCode.CONFIRM_TRANSACTION.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$ResponseCode[ResponseCode.BAD_CHECKSUM.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            $SwitchMap$org$apache$nifi$remote$protocol$HandshakeProperty = new int[HandshakeProperty.values().length];
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$HandshakeProperty[HandshakeProperty.GZIP.ordinal()] = 1;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$HandshakeProperty[HandshakeProperty.REQUEST_EXPIRATION_MILLIS.ordinal()] = 2;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$HandshakeProperty[HandshakeProperty.BATCH_COUNT.ordinal()] = 3;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$HandshakeProperty[HandshakeProperty.BATCH_SIZE.ordinal()] = 4;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$HandshakeProperty[HandshakeProperty.BATCH_DURATION.ordinal()] = 5;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$nifi$remote$protocol$HandshakeProperty[HandshakeProperty.PORT_IDENTIFIER.ordinal()] = 6;
            } catch (NoSuchFieldError e11) {
            }
        }
    }

    /**
     * Stores the process group that port lookups are performed against.
     *
     * @param processGroup the group to use; must report itself as a root group
     * @throws IllegalArgumentException if {@code processGroup} is not a root group
     */
    public void setRootProcessGroup(ProcessGroup processGroup) {
        final boolean isRoot = processGroup.isRootGroup();
        if (isRoot) {
            this.rootGroup = processGroup;
            return;
        }
        throw new IllegalArgumentException("Specified group was not a root group.");
    }

    /**
     * Reports whether {@link #handshake} has completed on this protocol instance.
     *
     * @return the current value of the {@code handshakeCompleted} flag
     */
    public boolean isHandshakeSuccessful() {
        final boolean completed = handshakeCompleted;
        return completed;
    }

    /**
     * Validates the properties a peer supplied during the handshake and copies the recognised
     * values into {@code handshakeProperties}.
     *
     * <p>Each property name is resolved to a {@link HandshakeProperty} first, and only a failure of
     * that lookup is reported as {@code UNKNOWN_PROPERTY_NAME}. The value is then applied in a
     * separate step so that exceptions raised while applying it (for example by
     * {@link #checkPortStatus}) reach the caller with their own response code. Every case ends with
     * {@code break}: a value is applied to exactly one property and never falls through to the next.
     *
     * @param handshakeProperties receives the validated values
     * @param peer the peer that sent the properties; passed to {@link #checkPortStatus}
     * @param map property names (expected to be {@link HandshakeProperty} constant names) mapped to raw values
     * @throws HandshakeException with {@code UNKNOWN_PROPERTY_NAME} for an unrecognised name,
     *         {@code ILLEGAL_PROPERTY_VALUE} for a value that is not a valid number,
     *         {@code MISSING_PROPERTY} when no GZIP property was supplied, or whatever
     *         {@link #checkPortStatus} reports for the port identifier
     */
    public void validateHandshakeRequest(HandshakeProperties handshakeProperties, Peer peer, Map<String, String> map) throws HandshakeException {
        Boolean useGzip = null;
        for (final Map.Entry<String, String> entry : map.entrySet()) {
            final String propertyName = entry.getKey();
            final String value = entry.getValue();

            // Resolve the name on its own so that only a failed lookup is reported as an unknown property.
            final HandshakeProperty property;
            try {
                property = HandshakeProperty.valueOf(propertyName);
            } catch (final IllegalArgumentException | NullPointerException e) {
                throw new HandshakeException(ResponseCode.UNKNOWN_PROPERTY_NAME, "Received unknown property: " + propertyName);
            }

            try {
                switch (property) {
                    case GZIP:
                        useGzip = Boolean.valueOf(Boolean.parseBoolean(value));
                        handshakeProperties.setUseGzip(useGzip);
                        break;
                    case REQUEST_EXPIRATION_MILLIS:
                        handshakeProperties.setExpirationMillis(Long.parseLong(value));
                        break;
                    case BATCH_COUNT:
                        handshakeProperties.setBatchCount(Integer.parseInt(value));
                        break;
                    case BATCH_SIZE:
                        handshakeProperties.setBatchBytes(Long.parseLong(value));
                        break;
                    case BATCH_DURATION:
                        // The peer sends milliseconds; the properties object stores nanoseconds.
                        handshakeProperties.setBatchDurationNanos(TimeUnit.MILLISECONDS.toNanos(Long.parseLong(value)));
                        break;
                    case PORT_IDENTIFIER:
                        checkPortStatus(peer, value);
                        break;
                    default:
                        // Any other known property requires no validation here.
                        break;
                }
            } catch (final NumberFormatException e) {
                throw new HandshakeException(ResponseCode.ILLEGAL_PROPERTY_VALUE, "Received invalid value for property '" + property + "'; invalid value: " + value);
            }
        }

        if (useGzip == null) {
            this.logger.debug("Responding with ResponseCode MISSING_PROPERTY because GZIP Property missing");
            throw new HandshakeException(ResponseCode.MISSING_PROPERTY, "Missing Property " + HandshakeProperty.GZIP.name());
        }
    }

    /**
     * Resolves the port a peer asked for and verifies that it can be used for this transaction.
     *
     * <p>The identifier is looked up among the root group's input ports first and its output ports
     * second. The resolved port is stored in {@link #port} before the authorization, validity and
     * running-state checks are made. For protocol versions above 1 the port's connections are also
     * checked, and the handshake is refused if any destination queue is full.
     *
     * @param peer the peer whose user DN is checked for authorization
     * @param str the port identifier sent by the peer
     * @throws HandshakeException with {@code UNKNOWN_PORT}, {@code UNAUTHORIZED},
     *         {@code PORT_NOT_IN_VALID_STATE} or {@code PORTS_DESTINATION_FULL} depending on which check fails
     */
    protected void checkPortStatus(Peer peer, String str) throws HandshakeException {
        Port receivedPort = this.rootGroup.findInputPort(str);
        if (receivedPort == null) {
            receivedPort = this.rootGroup.findOutputPort(str);
        }
        if (receivedPort == null) {
            this.logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", str);
            throw new HandshakeException(ResponseCode.UNKNOWN_PORT, "Received unknown port identifier: " + str);
        }
        if (!(receivedPort instanceof PublicPort)) {
            this.logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", str);
            throw new HandshakeException(ResponseCode.UNKNOWN_PORT, "Received port identifier " + str + ", but this Port is not remotely accessible");
        }

        this.port = (PublicPort) receivedPort;
        final PortAuthorizationResult authorization = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
        if (!authorization.isAuthorized()) {
            // The placeholder is required; without it the explanation argument is silently dropped.
            this.logger.debug("Responding with ResponseCode UNAUTHORIZED: {}", authorization.getExplanation());
            throw new HandshakeException(ResponseCode.UNAUTHORIZED, authorization.getExplanation());
        }
        if (!receivedPort.isValid()) {
            this.logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
            throw new HandshakeException(ResponseCode.PORT_NOT_IN_VALID_STATE, "Port is not valid");
        }
        if (!receivedPort.isRunning()) {
            this.logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
            throw new HandshakeException(ResponseCode.PORT_NOT_IN_VALID_STATE, "Port not running");
        }

        // Only protocol versions above 1 get the destination-full check.
        if (getVersionNegotiator().getVersion() > 1) {
            for (final Connection connection : this.port.getConnections()) {
                if (connection.getFlowFileQueue().isFull()) {
                    this.logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", this.port);
                    throw new HandshakeException(ResponseCode.PORTS_DESTINATION_FULL, "Received port identifier " + str + ", but its destination is full");
                }
            }
        }
    }

    /**
     * Returns the port resolved during the handshake.
     *
     * @return the value of the {@code port} field, which is {@code null} until {@link #checkPortStatus} has assigned it
     */
    public PublicPort getPort() {
        final PublicPort resolvedPort = port;
        return resolvedPort;
    }

    /**
     * Returns the codec held in {@code negotiatedFlowFileCodec}.
     *
     * @return the stored codec; the field is initialised to {@code null}
     */
    public FlowFileCodec getPreNegotiatedCodec() {
        final FlowFileCodec codec = negotiatedFlowFileCodec;
        return codec;
    }

    /**
     * Runs the handshake with the given peer exactly once and records the negotiated properties.
     *
     * @param peer the peer to handshake with
     * @throws IllegalStateException if the handshake already completed or the protocol has been shut down
     * @throws IOException if {@link #doHandshake} fails with an I/O error
     * @throws HandshakeException if {@link #doHandshake} rejects the handshake
     */
    public final void handshake(Peer peer) throws IOException, HandshakeException {
        if (handshakeCompleted) {
            throw new IllegalStateException("Handshake has already been completed");
        }
        if (shutdown) {
            throw new IllegalStateException("Protocol is shutdown");
        }

        logger.debug("{} Handshaking with {}", this, peer);
        final HandshakeProperties negotiated = doHandshake(peer);
        handshakeProperties = negotiated;
        logger.debug("{} Finished handshake with {}", this, peer);

        // Only flagged once doHandshake has returned normally.
        handshakeCompleted = true;
    }

    /**
     * Performs the transport-specific handshake on behalf of {@link #handshake}.
     *
     * @param peer the peer to handshake with
     * @return the properties that {@link #handshake} stores in {@code handshakeProperties}
     * @throws IOException declared for implementations that fail while communicating with the peer
     * @throws HandshakeException declared for implementations that reject the handshake
     */
    protected abstract HandshakeProperties doHandshake(Peer peer) throws IOException, HandshakeException;

    /**
     * Sends FlowFiles pulled from the session to the peer as one transaction.
     *
     * <p>If the session has no FlowFile, {@code NO_MORE_DATA} is written and 0 is returned.
     * Otherwise {@code MORE_DATA} is written and FlowFiles are encoded one at a time, optionally
     * wrapped in a {@link CompressionOutputStream}, while a running CRC32 is kept over all bytes
     * written. After each FlowFile the batch limits from the handshake (duration, bytes, count) are
     * checked; when none is configured the batch is limited to {@link #DEFAULT_BATCH_NANOS}. The
     * transaction is then completed through {@link #commitTransferTransaction}.
     *
     * @param peer the peer to send to
     * @param processContext context passed on to the transaction
     * @param processSession session that FlowFiles are pulled from and removed from
     * @param flowFileCodec codec used to encode each FlowFile
     * @return the value returned by {@link #commitTransferTransaction}, or 0 if there was nothing to send
     * @throws IllegalStateException if the handshake has not completed or the protocol is shut down
     * @throws IOException if communication with the peer fails
     * @throws ProtocolException if the peer violates the protocol
     */
    public int transferFlowFiles(Peer peer, ProcessContext processContext, ProcessSession processSession, final FlowFileCodec flowFileCodec) throws IOException, ProtocolException {
        if (!this.handshakeCompleted) {
            throw new IllegalStateException("Handshake has not been completed");
        }
        if (this.shutdown) {
            throw new IllegalStateException("Protocol is shutdown");
        }

        this.logger.debug("{} Sending FlowFiles to {}", this, peer);
        final CommunicationsSession commsSession = peer.getCommunicationsSession();
        String remoteDn = commsSession.getUserDn();
        if (remoteDn == null) {
            remoteDn = "none";
        }

        FlowFile flowFile = processSession.get();
        if (flowFile == null) {
            this.logger.debug("{} No data to send to {}", this, peer);
            writeTransactionResponse(true, ResponseCode.NO_MORE_DATA, commsSession);
            return 0;
        }

        this.logger.debug("{} Data is available to send to {}", this, peer);
        writeTransactionResponse(true, ResponseCode.MORE_DATA, commsSession);

        final StopWatch transactionWatch = new StopWatch(true);
        long bytesSent = 0L;
        final Set<FlowFile> flowFilesSent = new HashSet<>();
        final CRC32 crc = new CRC32();
        final long startNanos = System.nanoTime();
        String calculatedCrc = "";
        final DataOutputStream rawOut = new DataOutputStream(commsSession.getOutput().getOutputStream());

        while (flowFile != null) {
            final boolean useGzip = this.handshakeProperties.isUseGzip();
            this.logger.debug("{} Sending {} to {}", this, flowFile, peer);

            // A fresh compression wrapper is used per FlowFile; the CRC accumulates across all of them.
            final CheckedOutputStream checkedOut = new CheckedOutputStream(useGzip ? new CompressionOutputStream(rawOut) : rawOut, crc);

            final StopWatch transferWatch = new StopWatch(true);
            final FlowFile toSend = flowFile;
            processSession.read(toSend, new InputStreamCallback() {
                @Override
                public void process(InputStream inputStream) throws IOException {
                    flowFileCodec.encode(new StandardDataPacket(toSend.getAttributes(), inputStream, toSend.getSize()), checkedOut);
                }
            });
            final long transferMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS);

            // The stream is only closed when compression is in use, as in the original control flow.
            if (useGzip) {
                checkedOut.close();
            }

            flowFilesSent.add(toSend);
            bytesSent += toSend.getSize();
            processSession.getProvenanceReporter().send(toSend, createTransitUri(peer, toSend.getAttribute(CoreAttributes.UUID.key())), "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis, false);
            processSession.remove(toSend);

            // Decide whether another FlowFile may be added to this batch.
            final long sendingNanos = System.nanoTime() - startNanos;
            boolean poll = true;
            final double batchDurationNanos = this.handshakeProperties.getBatchDurationNanos();
            if (sendingNanos >= batchDurationNanos && batchDurationNanos > 0.0d) {
                poll = false;
            }
            final double batchBytes = this.handshakeProperties.getBatchBytes();
            if (bytesSent >= batchBytes && batchBytes > 0.0d) {
                poll = false;
            }
            final double batchCount = this.handshakeProperties.getBatchCount();
            if (flowFilesSent.size() >= batchCount && batchCount > 0.0d) {
                poll = false;
            }
            if (batchDurationNanos == 0.0d && batchBytes == 0.0d && batchCount == 0.0d) {
                // No limit was configured, so fall back to the default batch duration.
                poll = sendingNanos < DEFAULT_BATCH_NANOS;
            }

            flowFile = poll ? processSession.get() : null;
            if (flowFile != null) {
                this.logger.debug("{} Sending ContinueTransaction indicator to {}", this, peer);
                writeTransactionResponse(true, ResponseCode.CONTINUE_TRANSACTION, commsSession);
            } else {
                this.logger.debug("{} Sending FinishTransaction indicator to {}", this, peer);
                writeTransactionResponse(true, ResponseCode.FINISH_TRANSACTION, commsSession);
                calculatedCrc = String.valueOf(checkedOut.getChecksum().getValue());
            }
        }

        return commitTransferTransaction(peer, new FlowFileTransaction(processSession, processContext, transactionWatch, bytesSent, flowFilesSent, calculatedCrc));
    }

    /**
     * Builds the transit URI for a FlowFile by delegating to the peer.
     *
     * @param peer the peer that creates the URI
     * @param str the identifier handed to {@code Peer.createTransitUri}
     * @return the URI produced by the peer
     */
    protected String createTransitUri(Peer peer, String str) {
        final String transitUri = peer.createTransitUri(str);
        return transitUri;
    }

    /**
     * Completes a send transaction after all FlowFiles have been written to the peer.
     *
     * <p>The peer must answer with {@code CONFIRM_TRANSACTION}. For protocol versions above 3 the
     * message of that response is compared with the locally calculated CRC; on a mismatch
     * {@code BAD_CHECKSUM} is sent and the session is rolled back. Otherwise the confirmation is
     * echoed and the peer's final response is awaited before the session is committed.
     *
     * @param peer the peer the data was sent to
     * @param flowFileTransaction the transaction holding the session, sent FlowFiles, byte count, stopwatch and CRC
     * @return the number of FlowFiles sent
     * @throws IOException if the checksums differ or reading the final response fails (the session is rolled back in both cases)
     */
    public int commitTransferTransaction(Peer peer, FlowFileTransaction flowFileTransaction) throws IOException {
        final ProcessSession session = flowFileTransaction.getSession();
        final Set<FlowFile> sent = flowFileTransaction.getFlowFilesSent();
        final CommunicationsSession comms = peer.getCommunicationsSession();

        final Response confirmation = readTransactionResponse(true, comms);
        if (confirmation.getCode() != ResponseCode.CONFIRM_TRANSACTION) {
            throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + confirmation);
        }
        this.logger.debug("{} Received {}  from {}", this, confirmation, peer);

        final String peerCrc = confirmation.getMessage();
        final boolean verifyChecksum = getVersionNegotiator().getVersion() > 3;
        if (verifyChecksum) {
            final String localCrc = flowFileTransaction.getCalculatedCRC();
            final boolean checksumsMatch = peerCrc.equals(localCrc);
            if (!checksumsMatch) {
                writeTransactionResponse(true, ResponseCode.BAD_CHECKSUM, comms);
                session.rollback();
                throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + localCrc + " while peer calculated CRC32 Checksum as " + peerCrc + "; canceling transaction and rolling back session");
            }
        }

        writeTransactionResponse(true, ResponseCode.CONFIRM_TRANSACTION, comms, "");

        // Small batches are listed in full; larger ones are summarised by count.
        final String description;
        if (sent.size() < 20) {
            description = sent.toString();
        } else {
            description = sent.size() + " FlowFiles";
        }

        try {
            final Response completion = readTransactionResponse(true, comms);
            this.logger.debug("{} received {} from {}", this, completion, peer);

            final ResponseCode completionCode = completion.getCode();
            if (completionCode == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
                peer.penalize(this.port.getIdentifier(), this.port.getYieldPeriod(TimeUnit.MILLISECONDS));
            } else if (completionCode != ResponseCode.TRANSACTION_FINISHED) {
                throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + completion);
            }

            session.commit();

            final StopWatch stopWatch = flowFileTransaction.getStopWatch();
            final long bytesSent = flowFileTransaction.getBytesSent();
            stopWatch.stop();
            final String dataSize = FormatUtils.formatDataSize(bytesSent);
            final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
            this.logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", this, description, dataSize, peer, millis, stopWatch.calculateDataRate(bytesSent));
            return sent.size();
        } catch (IOException e) {
            this.logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator. It is unknown whether or not the peer successfully received/processed the data. Therefore, {} will be rolled back, possibly resulting in data duplication of {}", this, peer, session, description);
            session.rollback();
            throw e;
        }
    }

    /**
     * Reads one {@link Response} from the session's input stream.
     *
     * @param z not used by this implementation
     * @param communicationsSession session whose input stream is read
     * @return the response parsed by {@code Response.read}
     * @throws IOException if reading from the stream fails
     */
    protected Response readTransactionResponse(boolean z, CommunicationsSession communicationsSession) throws IOException {
        final InputStream rawIn = communicationsSession.getInput().getInputStream();
        final DataInputStream in = new DataInputStream(rawIn);
        return Response.read(in);
    }

    /**
     * Writes a response code without an explanation by delegating to the four-argument overload
     * with a {@code null} message.
     *
     * @param z forwarded unchanged to the four-argument overload
     * @param responseCode the code to write
     * @param communicationsSession session to write to
     * @throws IOException if writing fails
     */
    protected final void writeTransactionResponse(boolean z, ResponseCode responseCode, CommunicationsSession communicationsSession) throws IOException {
        final String noExplanation = null;
        writeTransactionResponse(z, responseCode, communicationsSession, noExplanation);
    }

    /**
     * Writes a response code, with an optional explanation, to the session's output stream.
     *
     * @param z not used by this implementation
     * @param responseCode the code to write
     * @param communicationsSession session whose output stream is written to
     * @param str explanation to send with the code, or {@code null} to send the code alone
     * @throws IOException if writing fails
     */
    protected void writeTransactionResponse(boolean z, ResponseCode responseCode, CommunicationsSession communicationsSession, String str) throws IOException {
        final DataOutputStream out = new DataOutputStream(communicationsSession.getOutput().getOutputStream());
        if (str != null) {
            responseCode.writeResponse(out, str);
            return;
        }
        responseCode.writeResponse(out);
    }

    /**
     * Receives FlowFiles from the peer as one transaction.
     *
     * <p>Data packets are decoded one at a time, optionally through a {@link CompressionInputStream},
     * while a running CRC32 is kept over all bytes read. Each packet is imported into a new FlowFile,
     * given a fresh UUID plus site-to-site host/address attributes, reported to provenance and
     * transferred. After each packet the peer's response decides what happens next: continue,
     * finish, or cancel (which rolls the session back and returns 0). The loop also ends when the
     * codec returns {@code null}. The calculated CRC is then sent with {@code CONFIRM_TRANSACTION}
     * and the transaction is completed through {@link #commitReceiveTransaction}.
     *
     * @param peer the peer to receive from
     * @param processContext context passed on to the transaction
     * @param processSession session that received FlowFiles are created in
     * @param flowFileCodec codec used to decode each data packet
     * @return the value returned by {@link #commitReceiveTransaction}, or 0 if the peer cancelled
     * @throws IllegalStateException if the handshake has not completed or the protocol is shut down
     * @throws IOException if communication with the peer fails
     * @throws ProtocolException if the peer sends an unexpected response code
     */
    public int receiveFlowFiles(Peer peer, ProcessContext processContext, ProcessSession processSession, FlowFileCodec flowFileCodec) throws IOException, ProtocolException {
        if (!this.handshakeCompleted) {
            throw new IllegalStateException("Handshake has not been completed");
        }
        if (this.shutdown) {
            throw new IllegalStateException("Protocol is shutdown");
        }

        this.logger.debug("{} receiving FlowFiles from {}", this, peer);
        final CommunicationsSession commsSession = peer.getCommunicationsSession();
        final DataInputStream rawIn = new DataInputStream(commsSession.getInput().getInputStream());
        String remoteDn = commsSession.getUserDn();
        if (remoteDn == null) {
            remoteDn = "none";
        }

        final StopWatch stopWatch = new StopWatch(true);
        final CRC32 crc = new CRC32();
        final Set<FlowFile> flowFilesReceived = new HashSet<>();
        long bytesReceived = 0L;
        boolean continueTransaction = true;

        while (continueTransaction) {
            final long startNanos = System.nanoTime();
            final boolean useGzip = this.handshakeProperties.isUseGzip();
            final InputStream flowFileIn = useGzip ? new CompressionInputStream(rawIn) : rawIn;
            final CheckedInputStream checkedIn = new CheckedInputStream(flowFileIn, crc);

            final DataPacket dataPacket = flowFileCodec.decode(checkedIn);
            if (dataPacket == null) {
                // A null packet marks the end of the transaction, so stop reading.
                this.logger.debug("{} Received null dataPacket indicating the end of transaction from {}", this, peer);
                break;
            }

            FlowFile flowFile = processSession.create();
            flowFile = processSession.importFrom(dataPacket.getData(), flowFile);
            flowFile = processSession.putAllAttributes(flowFile, dataPacket.getAttributes());

            if (useGzip) {
                checkedIn.close();
            }

            final long transferMillis = TimeUnit.MILLISECONDS.convert(System.nanoTime() - startNanos, TimeUnit.NANOSECONDS);
            final String sourceFlowFileId = dataPacket.getAttributes().get(CoreAttributes.UUID.key());
            final String host = StringUtils.isEmpty(peer.getHost()) ? "unknown" : peer.getHost();
            final String remotePort = peer.getPort() <= 0 ? "unknown" : String.valueOf(peer.getPort());

            // The received FlowFile gets its own UUID; the sender's UUID is only kept for provenance.
            final Map<String, String> attributes = new HashMap<>(4);
            attributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
            attributes.put(SiteToSiteAttributes.S2S_HOST.key(), host);
            attributes.put(SiteToSiteAttributes.S2S_ADDRESS.key(), host + ":" + remotePort);
            flowFile = processSession.putAllAttributes(flowFile, attributes);

            processSession.getProvenanceReporter().receive(flowFile, createTransitUri(peer, sourceFlowFileId), sourceFlowFileId == null ? null : "urn:nifi:" + sourceFlowFileId, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis);
            processSession.transfer(flowFile, Relationship.ANONYMOUS);
            flowFilesReceived.add(flowFile);
            bytesReceived += flowFile.getSize();

            final Response response = readTransactionResponse(false, commsSession);
            switch (response.getCode()) {
                case CONTINUE_TRANSACTION:
                    this.logger.debug("{} Received ContinueTransaction indicator from {}", this, peer);
                    break;
                case FINISH_TRANSACTION:
                    this.logger.debug("{} Received FinishTransaction indicator from {}", this, peer);
                    continueTransaction = false;
                    break;
                case CANCEL_TRANSACTION:
                    this.logger.info("{} Received CancelTransaction indicator from {} with explanation {}", this, peer, response.getMessage());
                    processSession.rollback();
                    return 0;
                default:
                    throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received" + response);
            }
        }

        this.logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
        final String calculatedCrc = String.valueOf(crc.getValue());
        writeTransactionResponse(false, ResponseCode.CONFIRM_TRANSACTION, commsSession, calculatedCrc);
        return commitReceiveTransaction(peer, new FlowFileTransaction(processSession, processContext, stopWatch, bytesReceived, flowFilesReceived, calculatedCrc));
    }

    /**
     * Completes a receive transaction once the peer has answered the checksum confirmation.
     *
     * <p>On {@code CONFIRM_TRANSACTION} the session is committed and the peer is told whether the
     * transaction finished normally or finished with no available relationships on the context
     * (reported as destination full). On {@code BAD_CHECKSUM} the session is rolled back.
     *
     * @param peer the peer the data was received from
     * @param flowFileTransaction the transaction holding the session, context, FlowFiles, byte count and stopwatch
     * @return the number of FlowFiles received
     * @throws IOException if the peer reported a bad checksum or communication fails
     */
    public int commitReceiveTransaction(Peer peer, FlowFileTransaction flowFileTransaction) throws IOException {
        final CommunicationsSession comms = peer.getCommunicationsSession();
        final ProcessSession session = flowFileTransaction.getSession();

        final Response response = readTransactionResponse(false, comms);
        this.logger.debug("{} Received {} from {}", this, response, peer);

        switch (response.getCode()) {
            case CONFIRM_TRANSACTION: {
                session.commit();

                final boolean destinationFull = flowFileTransaction.getContext().getAvailableRelationships().isEmpty();
                if (destinationFull) {
                    this.logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
                    writeTransactionResponse(false, ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL, comms);
                } else {
                    this.logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
                    writeTransactionResponse(false, ResponseCode.TRANSACTION_FINISHED, comms);
                }

                final Set<FlowFile> received = flowFileTransaction.getFlowFilesSent();
                final long byteCount = flowFileTransaction.getBytesSent();
                final StopWatch stopWatch = flowFileTransaction.getStopWatch();
                stopWatch.stop();

                // Small batches are listed in full; larger ones are summarised by count.
                final String description;
                if (received.size() < 20) {
                    description = received.toString();
                } else {
                    description = received.size() + " FlowFiles";
                }
                final String dataSize = FormatUtils.formatDataSize(byteCount);
                final long millis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
                this.logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", this, description, dataSize, peer, millis, stopWatch.calculateDataRate(byteCount));
                return received.size();
            }
            case BAD_CHECKSUM:
                session.rollback();
                throw new IOException(this + " Received a BadChecksum response from peer " + peer);
            default:
                throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + response + "; expected 'Confirm Transaction' Response Code");
        }
    }

    /**
     * Marks this protocol instance as shut down; handshake and transfer methods throw
     * {@link IllegalStateException} afterwards.
     *
     * @param peer the peer named in the debug log entry
     */
    public void shutdown(Peer peer) {
        logger.debug("{} Shutting down with {}", this, peer);
        shutdown = true;
    }

    /**
     * Reports whether {@link #shutdown(Peer)} has been called.
     *
     * @return the current value of the {@code shutdown} flag
     */
    public boolean isShutdown() {
        final boolean stopped = shutdown;
        return stopped;
    }

    /**
     * Accepts a node informant and ignores it; this implementation has an empty body.
     *
     * @param nodeInformant not used
     */
    public void setNodeInformant(NodeInformant nodeInformant) {
    }

    /**
     * Returns the request expiration stored in the handshake properties.
     *
     * @return the value of {@code getExpirationMillis()} on the handshake properties
     * @throws NullPointerException if called while {@code handshakeProperties} is still {@code null}
     */
    public long getRequestExpiration() {
        final HandshakeProperties negotiated = handshakeProperties;
        return negotiated.getExpirationMillis();
    }

    /**
     * Describes this instance as {@code SimpleClassName[CommsID=<id>]}, where the id is the
     * communications identifier from the handshake properties or {@code null} if none are set.
     *
     * @return the description string
     */
    public String toString() {
        final String commsId;
        if (handshakeProperties == null) {
            commsId = null;
        } else {
            commsId = handshakeProperties.getCommsIdentifier();
        }
        return getClass().getSimpleName() + "[CommsID=" + commsId + "]";
    }
}
