package io.fluo.core.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.data.Mutation;

/* loaded from: input_file:io/fluo/core/impl/SharedBatchWriter.class */
/**
 * Funnels mutations from many caller threads into a single {@link BatchWriter}.
 *
 * <p>Callers enqueue {@link MutationBatch} objects on a bounded queue (capacity 1000). One
 * daemon thread, started by the constructor, takes everything currently queued, adds it to the
 * underlying writer, issues a single {@code flush()} for the whole group and then releases the
 * latch of every batch in that group so that blocked callers can proceed.
 *
 * <p>If adding or flushing fails, the failure is recorded on every batch of the affected group
 * and the latches are still released; the blocking methods then rethrow the failure wrapped in a
 * {@link RuntimeException} instead of waiting forever.
 *
 * <p>NOTE(review): nothing here rejects writes submitted after {@link #close()}; once the flush
 * thread has stopped, a blocking write would never be released. Confirm callers never do that.
 */
public class SharedBatchWriter {
    private final BatchWriter bw;
    private final ArrayBlockingQueue<MutationBatch> mQueue = new ArrayBlockingQueue<>(1000);

    /** Sentinel batch (compared by identity) that tells the flush thread to stop. */
    private final MutationBatch end = new MutationBatch(new ArrayList<Mutation>());

    /** Background loop that drains the queue, writes and flushes, then wakes up the waiters. */
    private class FlushTask implements Runnable {
        private FlushTask() {
        }

        @Override
        public void run() {
            boolean keepRunning = true;
            while (keepRunning) {
                // Declared outside the try block so the latches can be released even when
                // writing or flushing throws.
                List<MutationBatch> batches = new ArrayList<>();
                Exception failure = null;
                try {
                    // Block for the first batch, then grab whatever else is already queued so
                    // that a single flush covers the whole group.
                    batches.add(mQueue.take());
                    mQueue.drainTo(batches);
                    for (MutationBatch batch : batches) {
                        if (batch != end) {
                            bw.addMutations(batch.mutations);
                        }
                    }
                    bw.flush();
                } catch (Exception e) {
                    // Asynchronous writes have no caller to report to, so the failure is still
                    // printed here in addition to being handed to the blocked callers below.
                    failure = e;
                    e.printStackTrace();
                }

                // Always release the waiters. Previously a failed flush skipped this step, which
                // left callers blocked forever and, if the sentinel was part of the group, kept
                // this loop alive with nobody able to stop it.
                for (MutationBatch batch : batches) {
                    if (batch == end) {
                        keepRunning = false;
                    }
                    // Written before countDown(), so the thread returning from await() sees it.
                    batch.failure = failure;
                    batch.cdl.countDown();
                }
            }
        }
    }

    /** A group of mutations submitted together, plus the latch its submitter may wait on. */
    public static class MutationBatch {
        private final List<Mutation> mutations;
        private final CountDownLatch cdl = new CountDownLatch(1);

        /** Set by the flush thread before the latch is released; {@code null} on success. */
        private Exception failure;

        public MutationBatch(Mutation mutation) {
            this.mutations = Collections.singletonList(mutation);
        }

        public MutationBatch(List<Mutation> list) {
            this.mutations = list;
        }
    }

    /**
     * Creates the shared writer and starts its daemon flush thread.
     *
     * @param batchWriter the underlying writer that receives all mutations
     */
    public SharedBatchWriter(BatchWriter batchWriter) {
        this.bw = batchWriter;
        Thread thread = new Thread(new FlushTask());
        thread.setDaemon(true);
        thread.start();
    }

    /**
     * Queues a single mutation and blocks until the flush thread has processed it.
     *
     * @param mutation the mutation to write
     * @throws RuntimeException if the calling thread is interrupted or the write/flush failed
     */
    public void writeMutation(Mutation mutation) {
        submitAndWait(new MutationBatch(mutation));
    }

    /**
     * Queues a list of mutations and blocks until the flush thread has processed them. An empty
     * list returns immediately without touching the queue.
     *
     * @param list the mutations to write
     * @throws RuntimeException if the calling thread is interrupted or the write/flush failed
     */
    public void writeMutations(List<Mutation> list) {
        if (list.isEmpty()) {
            return;
        }
        submitAndWait(new MutationBatch(list));
    }

    /**
     * Queues the stop sentinel and blocks until the flush thread has processed it, which also
     * ends the flush thread's loop. The underlying {@link BatchWriter} is not closed here.
     *
     * @throws RuntimeException if the calling thread is interrupted or the final flush failed
     */
    public void close() {
        submitAndWait(end);
    }

    /**
     * Queues a single mutation without waiting for it to be flushed. Blocks only while the
     * queue is full.
     *
     * @param mutation the mutation to write
     * @throws RuntimeException if the calling thread is interrupted while queueing
     */
    public void writeMutationAsync(Mutation mutation) {
        enqueue(new MutationBatch(mutation));
    }

    /** Enqueues the batch, waits for its latch and rethrows any failure recorded on it. */
    private void submitAndWait(MutationBatch batch) {
        enqueue(batch);
        try {
            batch.cdl.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (batch.failure != null) {
            throw new RuntimeException(batch.failure);
        }
    }

    /** Puts the batch on the queue, converting an interrupt into a {@link RuntimeException}. */
    private void enqueue(MutationBatch batch) {
        try {
            mQueue.put(batch);
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
