package com.twitter.distributedlog.service.stream;

import com.twitter.distributedlog.AsyncLogWriter;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.LogRecord;
import com.twitter.distributedlog.acl.AccessControlManager;
import com.twitter.distributedlog.exceptions.AlreadyClosedException;
import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.exceptions.LockingException;
import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
import com.twitter.distributedlog.exceptions.RequestDeniedException;
import com.twitter.distributedlog.service.ResponseUtils;
import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
import com.twitter.distributedlog.thrift.service.ResponseHeader;
import com.twitter.distributedlog.thrift.service.StatusCode;
import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.util.Sequencer;
import com.twitter.util.ConstFuture;
import com.twitter.util.Future;
import com.twitter.util.Future$;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Try;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import scala.runtime.AbstractFunction1;

/* loaded from: input_file:com/twitter/distributedlog/service/stream/BulkWriteOp.class */
public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements WriteOpWithPayload {
    private final List<ByteBuffer> buffers;
    private final long payloadSize;
    private final Counter deniedBulkWriteCounter;
    private final Counter successRecordCounter;
    private final Counter failureRecordCounter;
    private final Counter redirectRecordCounter;
    private final OpStatsLogger latencyStat;
    private final Counter bytes;
    private final Counter bulkWriteBytes;
    private final AccessControlManager accessControlManager;

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isDefiniteFailure(Try<DLSN> r3) {
        boolean z = false;
        try {
            r3.get();
        } catch (Exception e) {
            if ((e instanceof OwnershipAcquireFailedException) || (e instanceof AlreadyClosedException) || (e instanceof LockingException)) {
                z = true;
            }
        }
        return z;
    }

    public BulkWriteOp(String str, List<ByteBuffer> list, StatsLogger statsLogger, StatsLogger statsLogger2, Long l, Feature feature, AccessControlManager accessControlManager) {
        super(str, requestStat(statsLogger, "bulkWrite"), l, feature);
        this.buffers = list;
        long j = 0;
        while (list.iterator().hasNext()) {
            j += r0.next().remaining();
        }
        this.payloadSize = j;
        StreamOpStats streamOpStats = new StreamOpStats(statsLogger, statsLogger2);
        this.deniedBulkWriteCounter = streamOpStats.requestDeniedCounter("bulkWrite");
        this.successRecordCounter = streamOpStats.recordsCounter("success");
        this.failureRecordCounter = streamOpStats.recordsCounter("failure");
        this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
        this.bulkWriteBytes = streamOpStats.scopedRequestCounter("bulkWrite", "bytes");
        this.latencyStat = streamOpStats.streamRequestLatencyStat(str, "bulkWrite");
        this.bytes = streamOpStats.streamRequestCounter(str, "bulkWrite", "bytes");
        this.accessControlManager = accessControlManager;
        final long payloadSize = getPayloadSize();
        result().addEventListener(new FutureEventListener<BulkWriteResponse>() { // from class: com.twitter.distributedlog.service.stream.BulkWriteOp.1
            public void onSuccess(BulkWriteResponse bulkWriteResponse) {
                if (bulkWriteResponse.getHeader().getCode() != StatusCode.SUCCESS) {
                    BulkWriteOp.this.latencyStat.registerFailedEvent(BulkWriteOp.this.stopwatch().elapsed(TimeUnit.MICROSECONDS));
                    return;
                }
                BulkWriteOp.this.latencyStat.registerSuccessfulEvent(BulkWriteOp.this.stopwatch().elapsed(TimeUnit.MICROSECONDS));
                BulkWriteOp.this.bytes.add(payloadSize);
                BulkWriteOp.this.bulkWriteBytes.add(payloadSize);
            }

            public void onFailure(Throwable th) {
                BulkWriteOp.this.latencyStat.registerFailedEvent(BulkWriteOp.this.stopwatch().elapsed(TimeUnit.MICROSECONDS));
            }
        });
    }

    @Override // com.twitter.distributedlog.service.stream.AbstractStreamOp, com.twitter.distributedlog.service.stream.StreamOp
    public void preExecute() throws DLException {
        if (this.accessControlManager.allowWrite(this.stream)) {
            super.preExecute();
        } else {
            this.deniedBulkWriteCounter.inc();
            throw new RequestDeniedException(this.stream, "bulkWrite");
        }
    }

    @Override // com.twitter.distributedlog.service.stream.WriteOpWithPayload
    public long getPayloadSize() {
        return this.payloadSize;
    }

    @Override // com.twitter.distributedlog.service.stream.AbstractStreamOp
    protected Future<BulkWriteResponse> executeOp(AsyncLogWriter asyncLogWriter, Sequencer sequencer, Object obj) {
        Future<List<Future<DLSN>>> writeBulk;
        synchronized (obj) {
            writeBulk = asyncLogWriter.writeBulk(asRecordList(this.buffers, sequencer));
        }
        return asTryList(writeBulk).flatMap(new AbstractFunction1<List<Try<DLSN>>, Future<BulkWriteResponse>>() { // from class: com.twitter.distributedlog.service.stream.BulkWriteOp.2
            public Future<BulkWriteResponse> apply(List<Try<DLSN>> list) {
                ArrayList arrayList = new ArrayList(list.size());
                BulkWriteResponse writeResponses = ResponseUtils.bulkWriteSuccess().setWriteResponses(arrayList);
                if (list.size() > 0) {
                    Try<DLSN> r0 = list.get(0);
                    if (BulkWriteOp.this.isDefiniteFailure(r0)) {
                        return new ConstFuture(r0);
                    }
                }
                Iterator<Try<DLSN>> it = list.iterator();
                while (it.hasNext()) {
                    try {
                        arrayList.add(ResponseUtils.writeSuccess().setDlsn(((DLSN) it.next().get()).serialize()));
                        BulkWriteOp.this.successRecordCounter.inc();
                    } catch (Exception e) {
                        WriteResponse write = ResponseUtils.write(ResponseUtils.exceptionToHeader(e));
                        arrayList.add(write);
                        if (StatusCode.FOUND == write.getHeader().getCode()) {
                            BulkWriteOp.this.redirectRecordCounter.inc();
                        } else {
                            BulkWriteOp.this.failureRecordCounter.inc();
                        }
                    }
                }
                return Future.value(writeResponses);
            }
        });
    }

    private List<LogRecord> asRecordList(List<ByteBuffer> list, Sequencer sequencer) {
        ArrayList arrayList = new ArrayList(list.size());
        for (ByteBuffer byteBuffer : list) {
            byte[] bArr = new byte[byteBuffer.remaining()];
            byteBuffer.get(bArr);
            arrayList.add(new LogRecord(sequencer.nextId(), bArr));
        }
        return arrayList;
    }

    private Future<List<Try<DLSN>>> asTryList(Future<List<Future<DLSN>>> future) {
        return future.flatMap(new AbstractFunction1<List<Future<DLSN>>, Future<List<Try<DLSN>>>>() { // from class: com.twitter.distributedlog.service.stream.BulkWriteOp.3
            public Future<List<Try<DLSN>>> apply(List<Future<DLSN>> list) {
                return Future$.MODULE$.collectToTry(list);
            }
        });
    }

    @Override // com.twitter.distributedlog.service.stream.AbstractStreamOp
    protected void fail(ResponseHeader responseHeader) {
        if (StatusCode.FOUND == responseHeader.getCode()) {
            this.redirectRecordCounter.add(this.buffers.size());
        } else {
            this.failureRecordCounter.add(this.buffers.size());
        }
        setResponse(ResponseUtils.bulkWrite(responseHeader));
    }

    @Override // com.twitter.distributedlog.service.stream.StreamOp
    public Future<ResponseHeader> responseHeader() {
        return result().map(new AbstractFunction1<BulkWriteResponse, ResponseHeader>() { // from class: com.twitter.distributedlog.service.stream.BulkWriteOp.4
            public ResponseHeader apply(BulkWriteResponse bulkWriteResponse) {
                return bulkWriteResponse.getHeader();
            }
        });
    }
}
