package org.kitesdk.data.spi.filesystem;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Flushable;
import org.kitesdk.data.PartitionStrategy;
import org.kitesdk.data.Syncable;
import org.kitesdk.data.URIBuilder;
import org.kitesdk.data.ValidationException;
import org.kitesdk.data.spi.AbstractDatasetWriter;
import org.kitesdk.data.spi.EntityAccessor;
import org.kitesdk.data.spi.PartitionListener;
import org.kitesdk.data.spi.ReaderWriterState;
import org.kitesdk.data.spi.StorageKey;
import org.kitesdk.shaded.com.google.common.base.Objects;
import org.kitesdk.shaded.com.google.common.base.Preconditions;
import org.kitesdk.shaded.com.google.common.cache.CacheBuilder;
import org.kitesdk.shaded.com.google.common.cache.CacheLoader;
import org.kitesdk.shaded.com.google.common.cache.LoadingCache;
import org.kitesdk.shaded.com.google.common.cache.RemovalListener;
import org.kitesdk.shaded.com.google.common.cache.RemovalNotification;
import org.kitesdk.shaded.com.google.common.util.concurrent.UncheckedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kitesdk/data/spi/filesystem/PartitionedDatasetWriter.class */
class PartitionedDatasetWriter<E> extends AbstractDatasetWriter<E> {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionedDatasetWriter.class);
    private static final int DEFAULT_WRITER_CACHE_SIZE = 10;
    private FileSystemView<E> view;
    private final int maxWriters;
    private final PartitionStrategy partitionStrategy;
    private LoadingCache<StorageKey, DatasetWriter<E>> cachedWriters;
    private final StorageKey reusedKey;
    private final EntityAccessor<E> accessor;
    private final Map<String, Object> provided;
    private ReaderWriterState state;

    /* loaded from: input_file:org/kitesdk/data/spi/filesystem/PartitionedDatasetWriter$DatasetWriterCacheLoader.class */
    private static class DatasetWriterCacheLoader<E> extends CacheLoader<StorageKey, DatasetWriter<E>> {
        private final FileSystemView<E> view;
        private final PathConversion convert;

        public DatasetWriterCacheLoader(FileSystemView<E> fileSystemView) {
            this.view = fileSystemView;
            this.convert = new PathConversion(fileSystemView.getDataset().getDescriptor().getSchema());
        }

        @Override // org.kitesdk.shaded.com.google.common.cache.CacheLoader
        public DatasetWriter<E> load(StorageKey storageKey) throws Exception {
            Preconditions.checkState(this.view.getDataset() instanceof FileSystemDataset, "FileSystemWriters cannot create writer for " + this.view.getDataset());
            FileSystemDataset fileSystemDataset = (FileSystemDataset) this.view.getDataset();
            Path fromKey = this.convert.fromKey(storageKey);
            FileSystemWriter newWriter = FileSystemWriter.newWriter(fileSystemDataset.getFileSystem(), new Path(fileSystemDataset.getDirectory(), fromKey), fileSystemDataset.getDescriptor());
            PartitionListener partitionListener = fileSystemDataset.getPartitionListener();
            if (partitionListener != null) {
                partitionListener.partitionAdded(fileSystemDataset.getNamespace(), fileSystemDataset.getName(), fromKey.toString());
            }
            newWriter.initialize();
            return newWriter;
        }
    }

    /* loaded from: input_file:org/kitesdk/data/spi/filesystem/PartitionedDatasetWriter$DatasetWriterCloser.class */
    private static class DatasetWriterCloser<E> implements RemovalListener<StorageKey, DatasetWriter<E>> {
        private DatasetWriterCloser() {
        }

        @Override // org.kitesdk.shaded.com.google.common.cache.RemovalListener
        public void onRemoval(RemovalNotification<StorageKey, DatasetWriter<E>> removalNotification) {
            DatasetWriter<E> value = removalNotification.getValue();
            PartitionedDatasetWriter.LOG.debug("Closing writer:{} for partition:{}", value, removalNotification.getKey());
            value.close();
        }
    }

    public PartitionedDatasetWriter(FileSystemView<E> fileSystemView) {
        DatasetDescriptor descriptor = fileSystemView.getDataset().getDescriptor();
        Preconditions.checkArgument(descriptor.isPartitioned(), "Dataset " + fileSystemView.getDataset() + " is not partitioned");
        this.view = fileSystemView;
        this.partitionStrategy = descriptor.getPartitionStrategy();
        int i = 10;
        if (descriptor.hasProperty(FileSystemProperties.WRITER_CACHE_SIZE_PROP)) {
            try {
                i = Integer.parseInt(descriptor.getProperty(FileSystemProperties.WRITER_CACHE_SIZE_PROP));
            } catch (NumberFormatException e) {
                LOG.warn("Not an integer: kite.writer.cache-size=" + descriptor.getProperty(FileSystemProperties.WRITER_CACHE_SIZE_PROP));
            }
        } else if (this.partitionStrategy.getCardinality() != -1) {
            i = Math.min(10, this.partitionStrategy.getCardinality());
        }
        this.maxWriters = i;
        this.state = ReaderWriterState.NEW;
        this.reusedKey = new StorageKey(this.partitionStrategy);
        this.accessor = fileSystemView.getAccessor();
        this.provided = fileSystemView.getProvidedValues();
    }

    @Override // org.kitesdk.data.spi.InitializeAccessor
    public void initialize() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.NEW), "Unable to open a writer from state:%s", this.state);
        DatasetDescriptor descriptor = this.view.getDataset().getDescriptor();
        ValidationException.check(FileSystemWriter.isSupportedFormat(descriptor), "Not a supported format: %s", descriptor.getFormat());
        LOG.debug("Opening partitioned dataset writer w/strategy:{}", this.partitionStrategy);
        this.cachedWriters = CacheBuilder.newBuilder().maximumSize(this.maxWriters).removalListener(new DatasetWriterCloser()).build(new DatasetWriterCacheLoader(this.view));
        this.state = ReaderWriterState.OPEN;
    }

    @Override // org.kitesdk.data.DatasetWriter
    public void write(E e) {
        Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to write to a writer in state:%s", this.state);
        this.accessor.keyFor(e, this.provided, this.reusedKey);
        DatasetWriter<E> ifPresent = this.cachedWriters.getIfPresent(this.reusedKey);
        if (ifPresent == null) {
            Preconditions.checkArgument(this.view.includes(e), "View %s does not include entity %s", this.view, e);
            try {
                ifPresent = this.cachedWriters.getUnchecked(StorageKey.copy(this.reusedKey));
            } catch (UncheckedExecutionException e2) {
                throw new IllegalArgumentException("Problem creating view for entity: " + e, e2.getCause());
            }
        }
        ifPresent.write(e);
    }

    @Override // org.kitesdk.data.DatasetWriter, org.kitesdk.data.Flushable, java.io.Flushable
    @SuppressWarnings(value = {"BC_VACUOUS_INSTANCEOF", "SIO_SUPERFLUOUS_INSTANCEOF"}, justification = "Flushable will be removed from DatasetWriter in 1.0.0")
    public void flush() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to flush a writer in state:%s", this.state);
        LOG.debug("Flushing all cached writers for view:{}", this.view);
        for (DatasetWriter<E> datasetWriter : this.cachedWriters.asMap().values()) {
            LOG.debug("Flushing partition writer:{}", datasetWriter);
            if (datasetWriter instanceof Flushable) {
                datasetWriter.flush();
            }
        }
    }

    @Override // org.kitesdk.data.DatasetWriter, org.kitesdk.data.Syncable
    @SuppressWarnings(value = {"BC_VACUOUS_INSTANCEOF", "SIO_SUPERFLUOUS_INSTANCEOF"}, justification = "Syncable will be removed from DatasetWriter in 1.0.0")
    public void sync() {
        Preconditions.checkState(this.state.equals(ReaderWriterState.OPEN), "Attempt to sync a writer in state:%s", this.state);
        LOG.debug("Syncing all cached writers for view:{}", this.view);
        for (DatasetWriter<E> datasetWriter : this.cachedWriters.asMap().values()) {
            LOG.debug("Syncing partition writer:{}", datasetWriter);
            if (datasetWriter instanceof Syncable) {
                datasetWriter.sync();
            }
        }
    }

    @Override // org.kitesdk.data.DatasetWriter, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.state.equals(ReaderWriterState.OPEN)) {
            LOG.debug("Closing all cached writers for view:{}", this.view);
            for (DatasetWriter<E> datasetWriter : this.cachedWriters.asMap().values()) {
                LOG.debug("Closing partition writer:{}", datasetWriter);
                datasetWriter.close();
            }
            this.state = ReaderWriterState.CLOSED;
        }
    }

    @Override // org.kitesdk.data.DatasetWriter
    public boolean isOpen() {
        return this.state.equals(ReaderWriterState.OPEN);
    }

    public String toString() {
        return Objects.toStringHelper(this).add("partitionStrategy", this.partitionStrategy).add("maxWriters", this.maxWriters).add(URIBuilder.VIEW_SCHEME, this.view).add("cachedWriters", this.cachedWriters).toString();
    }
}
