package org.apache.nifi.web.api;

import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
import com.wordnik.swagger.annotations.ApiResponse;
import com.wordnik.swagger.annotations.ApiResponses;
import com.wordnik.swagger.annotations.Authorization;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.authorization.AccessDeniedException;
import org.apache.nifi.authorization.AuthorizationRequest;
import org.apache.nifi.authorization.AuthorizationResult;
import org.apache.nifi.authorization.Authorizer;
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.UserContextKeys;
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.authorization.user.NiFiUserUtils;
import org.apache.nifi.remote.HttpRemoteSiteListener;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerDescription;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.client.http.TransportProtocolVersionNegotiator;
import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.NotAuthorizedException;
import org.apache.nifi.remote.exception.RequestExpiredException;
import org.apache.nifi.remote.io.http.HttpServerCommunicationsSession;
import org.apache.nifi.remote.protocol.HandshakeProperty;
import org.apache.nifi.remote.protocol.ResponseCode;
import org.apache.nifi.remote.protocol.http.HttpFlowFileServerProtocol;
import org.apache.nifi.remote.protocol.http.StandardHttpFlowFileServerProtocol;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.web.api.ApplicationResource;
import org.apache.nifi.web.api.entity.TransactionResultEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path("/data-transfer")
@Api(value = "/data-transfer", description = "Supports data transfers with this NiFi using HTTP based site to site")
/* loaded from: input_file:WEB-INF/classes/org/apache/nifi/web/api/DataTransferResource.class */
public class DataTransferResource extends ApplicationResource {
    private static final Logger logger = LoggerFactory.getLogger(DataTransferResource.class);
    public static final String CHECK_SUM = "checksum";
    public static final String RESPONSE_CODE = "responseCode";
    private static final String PORT_TYPE_INPUT = "input-ports";
    private static final String PORT_TYPE_OUTPUT = "output-ports";
    private Authorizer authorizer;
    private final ApplicationResource.ResponseCreator responseCreator = new ApplicationResource.ResponseCreator();
    private final VersionNegotiator transportProtocolVersionNegotiator = new TransportProtocolVersionNegotiator(new int[]{1});
    private final HttpRemoteSiteListener transactionManager = HttpRemoteSiteListener.getInstance();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/classes/org/apache/nifi/web/api/DataTransferResource$ValidateRequestResult.class */
    public class ValidateRequestResult {
        private Integer transportProtocolVersion;
        private Response errResponse;

        private ValidateRequestResult() {
        }
    }

    protected void authorizeDataTransfer(ResourceType resourceType, String str) {
        HashMap hashMap;
        NiFiUser niFiUser = NiFiUserUtils.getNiFiUser();
        if (!ResourceType.InputPort.equals(resourceType) && !ResourceType.OutputPort.equals(resourceType)) {
            throw new IllegalArgumentException("The resource must be an Input or Output Port.");
        }
        if (niFiUser.getClientAddress() == null || niFiUser.getClientAddress().trim().isEmpty()) {
            hashMap = null;
        } else {
            hashMap = new HashMap();
            hashMap.put(UserContextKeys.CLIENT_ADDRESS.name(), niFiUser.getClientAddress());
        }
        AuthorizationResult authorize = this.authorizer.authorize(new AuthorizationRequest.Builder().resource(ResourceFactory.getDataTransferResource(ResourceFactory.getComponentResource(resourceType, str, str))).identity(niFiUser.getIdentity()).anonymous(Boolean.valueOf(niFiUser.isAnonymous())).accessAttempt(true).action(RequestAction.WRITE).userContext(hashMap).build());
        if (AuthorizationResult.Result.Approved.equals(authorize.getResult())) {
        } else {
            throw new AccessDeniedException(StringUtils.isNotBlank(authorize.getExplanation()) ? authorize.getExplanation() : "Access is denied");
        }
    }

    @Path("{portType}/{portId}/transactions")
    @ApiOperation(value = "Create a transaction to the specified output port or input port", response = TransactionResultEntity.class, authorizations = {@Authorization(value = "Write - /data-transfer/{component-type}/{uuid}", type = "")})
    @ApiResponses({@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(code = 401, message = "Client could not be authenticated."), @ApiResponse(code = 403, message = "Client is not authorized to make this request."), @ApiResponse(code = 404, message = "The specified resource could not be found."), @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."), @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful")})
    @POST
    @Produces({"application/json"})
    public Response createPortTransaction(@PathParam("portType") @ApiParam(value = "The port type.", required = true, allowableValues = "input-ports, output-ports") String str, @PathParam("portId") String str2, @Context HttpServletRequest httpServletRequest, @Context ServletContext servletContext, @Context UriInfo uriInfo, InputStream inputStream) {
        if (!PORT_TYPE_INPUT.equals(str) && !PORT_TYPE_OUTPUT.equals(str)) {
            return this.responseCreator.wrongPortTypeResponse(str, str2);
        }
        authorizeDataTransfer(PORT_TYPE_INPUT.equals(str) ? ResourceType.InputPort : ResourceType.OutputPort, str2);
        ValidateRequestResult validateResult = validateResult(httpServletRequest, str2);
        if (validateResult.errResponse != null) {
            return validateResult.errResponse;
        }
        logger.debug("createPortTransaction request: clientId={}, portType={}, portId={}", str, str2);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        String createTransaction = this.transactionManager.createTransaction();
        Peer constructPeer = constructPeer(httpServletRequest, inputStream, byteArrayOutputStream, str2, createTransaction);
        int intValue = validateResult.transportProtocolVersion.intValue();
        try {
            initiateServerProtocol(httpServletRequest, constructPeer, Integer.valueOf(intValue));
            TransactionResultEntity transactionResultEntity = new TransactionResultEntity();
            transactionResultEntity.setResponseCode(ResponseCode.PROPERTIES_OK.getCode());
            transactionResultEntity.setMessage("Handshake properties are valid, and port is running. A transaction is created:" + createTransaction);
            return this.responseCreator.locationResponse(uriInfo, str, str2, createTransaction, transactionResultEntity, Integer.valueOf(intValue), this.transactionManager);
        } catch (Exception e) {
            this.transactionManager.cancelTransaction(createTransaction);
            return this.responseCreator.unexpectedErrorResponse(str2, e);
        } catch (HandshakeException e2) {
            this.transactionManager.cancelTransaction(createTransaction);
            return this.responseCreator.handshakeExceptionResponse(e2);
        }
    }

    @Path("input-ports/{portId}/transactions/{transactionId}/flow-files")
    @Consumes({"application/octet-stream"})
    @ApiOperation(value = "Transfer flow files to the input port", response = String.class, authorizations = {@Authorization(value = "Write - /data-transfer/input-ports/{uuid}", type = "")})
    @ApiResponses({@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(code = 401, message = "Client could not be authenticated."), @ApiResponse(code = 403, message = "Client is not authorized to make this request."), @ApiResponse(code = 404, message = "The specified resource could not be found."), @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."), @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful")})
    @POST
    @Produces({"text/plain"})
    public Response receiveFlowFiles(@PathParam("portId") @ApiParam(value = "The input port id.", required = true) String str, @PathParam("transactionId") String str2, @Context HttpServletRequest httpServletRequest, @Context ServletContext servletContext, InputStream inputStream) {
        authorizeDataTransfer(ResourceType.InputPort, str);
        ValidateRequestResult validateResult = validateResult(httpServletRequest, str, str2);
        if (validateResult.errResponse != null) {
            return validateResult.errResponse;
        }
        logger.debug("receiveFlowFiles request: portId={}", str);
        Peer constructPeer = constructPeer(httpServletRequest, inputStream, new ByteArrayOutputStream(), str, str2);
        int intValue = validateResult.transportProtocolVersion.intValue();
        try {
            HttpFlowFileServerProtocol initiateServerProtocol = initiateServerProtocol(httpServletRequest, constructPeer, Integer.valueOf(intValue));
            int receiveFlowFiles = initiateServerProtocol.getPort().receiveFlowFiles(constructPeer, initiateServerProtocol);
            logger.debug("finished receiving flow files, numOfFlowFiles={}", Integer.valueOf(receiveFlowFiles));
            if (receiveFlowFiles < 1) {
                return Response.status(Response.Status.BAD_REQUEST).entity("Client should send request when there is data to send. There was no flow file sent.").build();
            }
            return this.responseCreator.acceptedResponse(this.transactionManager, constructPeer.getCommunicationsSession().getChecksum(), Integer.valueOf(intValue));
        } catch (BadRequestException | RequestExpiredException e) {
            return this.responseCreator.badRequestResponse(e);
        } catch (NotAuthorizedException e2) {
            return this.responseCreator.unauthorizedResponse(e2);
        } catch (Exception e3) {
            return this.responseCreator.unexpectedErrorResponse(str, e3);
        } catch (HandshakeException e4) {
            return this.responseCreator.handshakeExceptionResponse(e4);
        }
    }

    private HttpFlowFileServerProtocol initiateServerProtocol(HttpServletRequest httpServletRequest, Peer peer, Integer num) throws IOException {
        StandardVersionNegotiator standardVersionNegotiator = new StandardVersionNegotiator(new int[]{new TransportProtocolVersionNegotiator(new int[]{num.intValue()}).getTransactionProtocolVersion()});
        peer.getCommunicationsSession().setDataTransferUrl(httpServletRequest.getRequestURL().toString());
        HttpFlowFileServerProtocol httpFlowFileServerProtocol = getHttpFlowFileServerProtocol(standardVersionNegotiator);
        HttpRemoteSiteListener.getInstance().setupServerProtocol(httpFlowFileServerProtocol);
        httpFlowFileServerProtocol.handshake(peer);
        return httpFlowFileServerProtocol;
    }

    HttpFlowFileServerProtocol getHttpFlowFileServerProtocol(VersionNegotiator versionNegotiator) {
        return new StandardHttpFlowFileServerProtocol(versionNegotiator);
    }

    private Peer constructPeer(HttpServletRequest httpServletRequest, InputStream inputStream, OutputStream outputStream, String str, String str2) {
        String remoteHost = httpServletRequest.getRemoteHost();
        int remotePort = httpServletRequest.getRemotePort();
        PeerDescription peerDescription = new PeerDescription(remoteHost, remotePort, httpServletRequest.isSecure());
        NiFiUser niFiUser = NiFiUserUtils.getNiFiUser();
        HttpServerCommunicationsSession httpServerCommunicationsSession = new HttpServerCommunicationsSession(inputStream, outputStream, str2, niFiUser == null ? null : niFiUser.getIdentity());
        boolean z = false;
        String header = httpServletRequest.getHeader("x-nifi-site-to-site-use-compression");
        if (!StringUtils.isEmpty(header) && Boolean.valueOf(header).booleanValue()) {
            z = true;
        }
        String header2 = httpServletRequest.getHeader("x-nifi-site-to-site-request-expiration");
        String header3 = httpServletRequest.getHeader("x-nifi-site-to-site-batch-count");
        String header4 = httpServletRequest.getHeader("x-nifi-site-to-site-batch-size");
        String header5 = httpServletRequest.getHeader("x-nifi-site-to-site-batch-duration");
        httpServerCommunicationsSession.putHandshakeParam(HandshakeProperty.PORT_IDENTIFIER, str);
        httpServerCommunicationsSession.putHandshakeParam(HandshakeProperty.GZIP, String.valueOf(z));
        if (!StringUtils.isEmpty(header2)) {
            httpServerCommunicationsSession.putHandshakeParam(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, header2);
        }
        if (!StringUtils.isEmpty(header3)) {
            httpServerCommunicationsSession.putHandshakeParam(HandshakeProperty.BATCH_COUNT, header3);
        }
        if (!StringUtils.isEmpty(header4)) {
            httpServerCommunicationsSession.putHandshakeParam(HandshakeProperty.BATCH_SIZE, header4);
        }
        if (!StringUtils.isEmpty(header5)) {
            httpServerCommunicationsSession.putHandshakeParam(HandshakeProperty.BATCH_DURATION, header5);
        }
        if (peerDescription.isSecure()) {
            NiFiUser niFiUser2 = NiFiUserUtils.getNiFiUser();
            logger.debug("initiating peer, nifiUser={}", niFiUser2);
            httpServerCommunicationsSession.setUserDn(niFiUser2.getIdentity());
        }
        return new Peer(peerDescription, httpServerCommunicationsSession, "nifi://" + remoteHost + ":" + remotePort, "nifi://localhost:" + httpServletRequest.getLocalPort());
    }

    @Path("output-ports/{portId}/transactions/{transactionId}")
    @Consumes({"application/octet-stream"})
    @DELETE
    @ApiOperation(value = "Commit or cancel the specified transaction", response = TransactionResultEntity.class, authorizations = {@Authorization(value = "Write - /data-transfer/output-ports/{uuid}", type = "")})
    @ApiResponses({@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(code = 401, message = "Client could not be authenticated."), @ApiResponse(code = 403, message = "Client is not authorized to make this request."), @ApiResponse(code = 404, message = "The specified resource could not be found."), @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."), @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful")})
    @Produces({"application/json"})
    public Response commitOutputPortTransaction(@ApiParam(value = "The response code. Available values are CONFIRM_TRANSACTION(12) or CANCEL_TRANSACTION(15).", required = true) @QueryParam("responseCode") Integer num, @ApiParam(value = "A checksum calculated at client side using CRC32 to check flow file content integrity. It must match with the value calculated at server side.", required = true) @QueryParam("checksum") @DefaultValue("") String str, @PathParam("portId") @ApiParam(value = "The output port id.", required = true) String str2, @PathParam("transactionId") @ApiParam(value = "The transaction id.", required = true) String str3, @Context HttpServletRequest httpServletRequest, @Context ServletContext servletContext, InputStream inputStream) {
        authorizeDataTransfer(ResourceType.OutputPort, str2);
        ValidateRequestResult validateResult = validateResult(httpServletRequest, str2, str3);
        if (validateResult.errResponse != null) {
            return validateResult.errResponse;
        }
        logger.debug("commitOutputPortTransaction request: portId={}, transactionId={}", str2, str3);
        int intValue = validateResult.transportProtocolVersion.intValue();
        Peer constructPeer = constructPeer(httpServletRequest, inputStream, new ByteArrayOutputStream(), str2, str3);
        TransactionResultEntity transactionResultEntity = new TransactionResultEntity();
        try {
            HttpFlowFileServerProtocol initiateServerProtocol = initiateServerProtocol(httpServletRequest, constructPeer, Integer.valueOf(intValue));
            String str4 = null;
            if (num == null) {
                str4 = "responseCode is required.";
            } else if (ResponseCode.CONFIRM_TRANSACTION.getCode() != num.intValue() && ResponseCode.CANCEL_TRANSACTION.getCode() != num.intValue()) {
                str4 = "responseCode " + num + " is invalid. ";
            }
            if (str4 != null) {
                transactionResultEntity.setMessage(str4);
                transactionResultEntity.setResponseCode(ResponseCode.ABORT.getCode());
                return Response.status(Response.Status.BAD_REQUEST).entity(transactionResultEntity).build();
            }
            if (ResponseCode.CANCEL_TRANSACTION.getCode() == num.intValue()) {
                return cancelTransaction(str3, transactionResultEntity);
            }
            int commitTransferTransaction = initiateServerProtocol.commitTransferTransaction(constructPeer, str);
            transactionResultEntity.setResponseCode(ResponseCode.CONFIRM_TRANSACTION.getCode());
            transactionResultEntity.setFlowFileSent(commitTransferTransaction);
            return clusterContext(noCache(setCommonHeaders(Response.ok(transactionResultEntity), Integer.valueOf(intValue), this.transactionManager))).build();
        } catch (HandshakeException e) {
            return this.responseCreator.handshakeExceptionResponse(e);
        } catch (Exception e2) {
            HttpServerCommunicationsSession communicationsSession = constructPeer.getCommunicationsSession();
            logger.error("Failed to process the request", e2);
            if (!ResponseCode.BAD_CHECKSUM.equals(communicationsSession.getResponseCode())) {
                return this.responseCreator.unexpectedErrorResponse(str2, str3, e2);
            }
            transactionResultEntity.setResponseCode(communicationsSession.getResponseCode().getCode());
            transactionResultEntity.setMessage(e2.getMessage());
            return clusterContext(noCache(Response.status(Response.Status.BAD_REQUEST).entity(transactionResultEntity))).build();
        }
    }

    @Path("input-ports/{portId}/transactions/{transactionId}")
    @Consumes({"application/octet-stream"})
    @DELETE
    @ApiOperation(value = "Commit or cancel the specified transaction", response = TransactionResultEntity.class, authorizations = {@Authorization(value = "Write - /data-transfer/input-ports/{uuid}", type = "")})
    @ApiResponses({@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(code = 401, message = "Client could not be authenticated."), @ApiResponse(code = 403, message = "Client is not authorized to make this request."), @ApiResponse(code = 404, message = "The specified resource could not be found."), @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."), @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful")})
    @Produces({"application/json"})
    public Response commitInputPortTransaction(@ApiParam(value = "The response code. Available values are BAD_CHECKSUM(19), CONFIRM_TRANSACTION(12) or CANCEL_TRANSACTION(15).", required = true) @QueryParam("responseCode") Integer num, @PathParam("portId") @ApiParam(value = "The input port id.", required = true) String str, @PathParam("transactionId") @ApiParam(value = "The transaction id.", required = true) String str2, @Context HttpServletRequest httpServletRequest, @Context ServletContext servletContext, InputStream inputStream) {
        authorizeDataTransfer(ResourceType.InputPort, str);
        ValidateRequestResult validateResult = validateResult(httpServletRequest, str, str2);
        if (validateResult.errResponse != null) {
            return validateResult.errResponse;
        }
        logger.debug("commitInputPortTransaction request: portId={}, transactionId={}, responseCode={}", new Object[]{str, str2, num});
        int intValue = validateResult.transportProtocolVersion.intValue();
        Peer constructPeer = constructPeer(httpServletRequest, inputStream, new ByteArrayOutputStream(), str, str2);
        TransactionResultEntity transactionResultEntity = new TransactionResultEntity();
        try {
            HttpFlowFileServerProtocol initiateServerProtocol = initiateServerProtocol(httpServletRequest, constructPeer, Integer.valueOf(intValue));
            HttpServerCommunicationsSession communicationsSession = constructPeer.getCommunicationsSession();
            String str3 = null;
            if (num == null) {
                str3 = "responseCode is required.";
            } else if (ResponseCode.BAD_CHECKSUM.getCode() != num.intValue() && ResponseCode.CONFIRM_TRANSACTION.getCode() != num.intValue() && ResponseCode.CANCEL_TRANSACTION.getCode() != num.intValue()) {
                str3 = "responseCode " + num + " is invalid. ";
            }
            if (str3 != null) {
                transactionResultEntity.setMessage(str3);
                transactionResultEntity.setResponseCode(ResponseCode.ABORT.getCode());
                return Response.status(Response.Status.BAD_REQUEST).entity(transactionResultEntity).build();
            }
            if (ResponseCode.CANCEL_TRANSACTION.getCode() == num.intValue()) {
                return cancelTransaction(str2, transactionResultEntity);
            }
            communicationsSession.setResponseCode(ResponseCode.fromCode(num.intValue()));
            try {
                int commitReceiveTransaction = initiateServerProtocol.commitReceiveTransaction(constructPeer);
                transactionResultEntity.setResponseCode(communicationsSession.getResponseCode().getCode());
                transactionResultEntity.setFlowFileSent(commitReceiveTransaction);
                return clusterContext(noCache(setCommonHeaders(Response.ok(transactionResultEntity), Integer.valueOf(intValue), this.transactionManager))).build();
            } catch (IOException e) {
                if (ResponseCode.BAD_CHECKSUM.getCode() != num.intValue() || !e.getMessage().contains("Received a BadChecksum response")) {
                    return this.responseCreator.unexpectedErrorResponse(str, str2, e);
                }
                transactionResultEntity.setResponseCode(ResponseCode.CANCEL_TRANSACTION.getCode());
                return clusterContext(noCache(Response.ok(transactionResultEntity))).build();
            }
        } catch (Exception e2) {
            return this.responseCreator.unexpectedErrorResponse(str, str2, e2);
        } catch (HandshakeException e3) {
            return this.responseCreator.handshakeExceptionResponse(e3);
        }
    }

    private Response cancelTransaction(String str, TransactionResultEntity transactionResultEntity) {
        this.transactionManager.cancelTransaction(str);
        transactionResultEntity.setMessage("Transaction has been canceled.");
        transactionResultEntity.setResponseCode(ResponseCode.CANCEL_TRANSACTION.getCode());
        return Response.ok(transactionResultEntity).build();
    }

    @GET
    @Path("output-ports/{portId}/transactions/{transactionId}/flow-files")
    @Consumes({"*/*"})
    @ApiOperation(value = "Transfer flow files from the output port", response = StreamingOutput.class, authorizations = {@Authorization(value = "Write - /data-transfer/output-ports/{uuid}", type = "")})
    @ApiResponses({@ApiResponse(code = 200, message = "There is no flow file to return."), @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(code = 401, message = "Client could not be authenticated."), @ApiResponse(code = 403, message = "Client is not authorized to make this request."), @ApiResponse(code = 404, message = "The specified resource could not be found."), @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."), @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful")})
    @Produces({"application/octet-stream"})
    public Response transferFlowFiles(@PathParam("portId") @ApiParam(value = "The output port id.", required = true) String str, @PathParam("transactionId") String str2, @Context HttpServletRequest httpServletRequest, @Context HttpServletResponse httpServletResponse, @Context ServletContext servletContext, InputStream inputStream) {
        authorizeDataTransfer(ResourceType.OutputPort, str);
        ValidateRequestResult validateResult = validateResult(httpServletRequest, str, str2);
        if (validateResult.errResponse != null) {
            return validateResult.errResponse;
        }
        logger.debug("transferFlowFiles request: portId={}", str);
        final Peer constructPeer = constructPeer(httpServletRequest, inputStream, new ByteArrayOutputStream(), str, str2);
        int intValue = validateResult.transportProtocolVersion.intValue();
        try {
            final HttpFlowFileServerProtocol initiateServerProtocol = initiateServerProtocol(httpServletRequest, constructPeer, Integer.valueOf(intValue));
            return this.responseCreator.acceptedResponse(this.transactionManager, new StreamingOutput() { // from class: org.apache.nifi.web.api.DataTransferResource.1
                public void write(OutputStream outputStream) throws IOException, WebApplicationException {
                    constructPeer.getCommunicationsSession().getOutput().setOutputStream(outputStream);
                    try {
                        int transferFlowFiles = initiateServerProtocol.getPort().transferFlowFiles(constructPeer, initiateServerProtocol);
                        DataTransferResource.logger.debug("finished transferring flow files, numOfFlowFiles={}", Integer.valueOf(transferFlowFiles));
                        if (transferFlowFiles < 1) {
                            throw new WebApplicationException(Response.Status.OK);
                        }
                    } catch (NotAuthorizedException | BadRequestException | RequestExpiredException e) {
                        throw new IOException("Failed to process the request.", e);
                    }
                }
            }, Integer.valueOf(intValue));
        } catch (HandshakeException e) {
            return this.responseCreator.handshakeExceptionResponse(e);
        } catch (Exception e2) {
            return this.responseCreator.unexpectedErrorResponse(str, e2);
        }
    }

    @Path("input-ports/{portId}/transactions/{transactionId}")
    @Consumes({"*/*"})
    @ApiOperation(value = "Extend transaction TTL", response = TransactionResultEntity.class, authorizations = {@Authorization(value = "Write - /data-transfer/input-ports/{uuid}", type = "")})
    @ApiResponses({@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(code = 401, message = "Client could not be authenticated."), @ApiResponse(code = 403, message = "Client is not authorized to make this request."), @ApiResponse(code = 404, message = "The specified resource could not be found."), @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")})
    @Produces({"application/json"})
    @PUT
    public Response extendInputPortTransactionTTL(@PathParam("portId") String str, @PathParam("transactionId") String str2, @Context HttpServletRequest httpServletRequest, @Context HttpServletResponse httpServletResponse, @Context ServletContext servletContext, @Context UriInfo uriInfo, InputStream inputStream) {
        authorizeDataTransfer(ResourceType.InputPort, str);
        return extendPortTransactionTTL(PORT_TYPE_INPUT, str, str2, httpServletRequest, httpServletResponse, servletContext, uriInfo, inputStream);
    }

    @Path("output-ports/{portId}/transactions/{transactionId}")
    @Consumes({"*/*"})
    @ApiOperation(value = "Extend transaction TTL", response = TransactionResultEntity.class, authorizations = {@Authorization(value = "Write - /data-transfer/output-ports/{uuid}", type = "")})
    @ApiResponses({@ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), @ApiResponse(code = 401, message = "Client could not be authenticated."), @ApiResponse(code = 403, message = "Client is not authorized to make this request."), @ApiResponse(code = 404, message = "The specified resource could not be found."), @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful."), @ApiResponse(code = 503, message = "NiFi instance is not ready for serving request, or temporarily overloaded. Retrying the same request later may be successful")})
    @Produces({"application/json"})
    @PUT
    public Response extendOutputPortTransactionTTL(@PathParam("portId") String str, @PathParam("transactionId") String str2, @Context HttpServletRequest httpServletRequest, @Context HttpServletResponse httpServletResponse, @Context ServletContext servletContext, @Context UriInfo uriInfo, InputStream inputStream) {
        authorizeDataTransfer(ResourceType.OutputPort, str);
        return extendPortTransactionTTL(PORT_TYPE_OUTPUT, str, str2, httpServletRequest, httpServletResponse, servletContext, uriInfo, inputStream);
    }

    public Response extendPortTransactionTTL(String str, String str2, String str3, HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse, ServletContext servletContext, UriInfo uriInfo, InputStream inputStream) {
        ValidateRequestResult validateResult = validateResult(httpServletRequest, str2, str3);
        if (validateResult.errResponse != null) {
            return validateResult.errResponse;
        }
        if (!PORT_TYPE_INPUT.equals(str) && !PORT_TYPE_OUTPUT.equals(str)) {
            return this.responseCreator.wrongPortTypeResponse(str, str2);
        }
        logger.debug("extendOutputPortTransactionTTL request: portType={}, portId={}, transactionId={}", new Object[]{str, str2, str3});
        int intValue = validateResult.transportProtocolVersion.intValue();
        try {
            initiateServerProtocol(httpServletRequest, constructPeer(httpServletRequest, inputStream, new ByteArrayOutputStream(), str2, str3), Integer.valueOf(intValue));
            this.transactionManager.extendTransaction(str3);
            TransactionResultEntity transactionResultEntity = new TransactionResultEntity();
            transactionResultEntity.setResponseCode(ResponseCode.CONTINUE_TRANSACTION.getCode());
            transactionResultEntity.setMessage("Extended TTL.");
            return clusterContext(noCache(setCommonHeaders(Response.ok(transactionResultEntity), Integer.valueOf(intValue), this.transactionManager))).build();
        } catch (Exception e) {
            return this.responseCreator.unexpectedErrorResponse(str2, str3, e);
        } catch (HandshakeException e2) {
            return this.responseCreator.handshakeExceptionResponse(e2);
        }
    }

    private ValidateRequestResult validateResult(HttpServletRequest httpServletRequest, String str) {
        return validateResult(httpServletRequest, str, null);
    }

    private ValidateRequestResult validateResult(HttpServletRequest httpServletRequest, String str, String str2) {
        ValidateRequestResult validateRequestResult = new ValidateRequestResult();
        if (!this.properties.isSiteToSiteHttpEnabled().booleanValue()) {
            validateRequestResult.errResponse = this.responseCreator.httpSiteToSiteIsNotEnabledResponse();
            return validateRequestResult;
        }
        try {
            validateRequestResult.transportProtocolVersion = negotiateTransportProtocolVersion(httpServletRequest, this.transportProtocolVersionNegotiator);
            if (StringUtils.isEmpty(str2) || this.transactionManager.isTransactionActive(str2)) {
                return validateRequestResult;
            }
            validateRequestResult.errResponse = this.responseCreator.transactionNotFoundResponse(str, str2);
            return validateRequestResult;
        } catch (BadRequestException e) {
            validateRequestResult.errResponse = this.responseCreator.badRequestResponse(e);
            return validateRequestResult;
        }
    }

    public void setAuthorizer(Authorizer authorizer) {
        this.authorizer = authorizer;
    }
}
