package org.apache.crunch.lib.join;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Iterator;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.FilterFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.ReadableData;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroMode;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.AvroTypeFamily;
import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.WritableType;
import org.apache.crunch.types.writable.WritableTypeFamily;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.bloom.BloomFilter;
import org.apache.hadoop.util.bloom.Key;

/* loaded from: input_file:org/apache/crunch/lib/join/BloomFilterJoinStrategy.class */
/**
 * Join strategy that first builds a {@link BloomFilter} over the keys of the left-hand table, uses
 * it to drop right-hand records whose keys are (probably) absent on the left, and then hands the
 * reduced inputs to a delegate {@link JoinStrategy}.
 *
 * <p>Only {@link JoinType#INNER_JOIN} and {@link JoinType#LEFT_OUTER_JOIN} are supported, since
 * filtering the right side never removes a right record that could have matched a left key.
 */
public class BloomFilterJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {

    /**
     * Hadoop hash-type id passed to every {@link BloomFilter} created here. The per-task filters built
     * from the left side and the merged filter rebuilt on the right side must use the same id so that
     * {@code or()} and {@code membershipTest()} are consistent.
     * NOTE(review): presumably {@code org.apache.hadoop.util.hash.Hash.MURMUR_HASH} — confirm.
     */
    private static final int BLOOM_HASH_TYPE = 1;

    private final int vectorSize;
    private final int nbHash;
    private final JoinStrategy<K, U, V> delegateJoinStrategy;

    /**
     * Serializes a value to bytes using its Avro binary encoding, so it can be added to or tested
     * against a {@link BloomFilter}.
     */
    public static class AvroToBytesFn<T> extends MapFn<T, byte[]> {
        private final AvroType<T> ptype;
        private final DatumWriter<Object> datumWriter;
        // Reused across calls; binaryEncoder(...) re-targets it at each fresh output stream.
        private BinaryEncoder encoder;

        AvroToBytesFn(AvroType<T> avroType, Configuration configuration) {
            this.ptype = avroType;
            // NOTE(review): mo87getWriter is a decompiler-renamed method, presumably AvroMode#getWriter(Schema).
            this.datumWriter = AvroMode.fromType(avroType).withFactoryFromConfiguration(configuration)
                    .mo87getWriter(avroType.getSchema());
        }

        /**
         * @param input value to serialize
         * @return Avro binary encoding of {@code input} after the type's output mapping
         * @throws CrunchRuntimeException if encoding fails
         */
        @Override
        public byte[] map(T input) {
            Object datum = this.ptype.getOutputMapFn().map(input);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            this.encoder = EncoderFactory.get().binaryEncoder(out, this.encoder);
            try {
                this.datumWriter.write(datum, this.encoder);
                this.encoder.flush();
                return out.toByteArray();
            } catch (IOException e) {
                throw new CrunchRuntimeException(e);
            }
        }
    }

    /**
     * Builds a Bloom filter over every key it processes and emits the accumulated filter from
     * {@link #cleanup(Emitter)}.
     */
    private static class CreateBloomFilterFn<K> extends DoFn<K, BloomFilter> {
        private final int vectorSize;
        private final int nbHash;
        private final PType<K> ptype;
        private transient BloomFilter bloomFilter;
        private transient MapFn<K, byte[]> keyToBytesFn;

        CreateBloomFilterFn(int vectorSize, int nbHash, PType<K> ptype) {
            this.vectorSize = vectorSize;
            this.nbHash = nbHash;
            this.ptype = ptype;
        }

        @Override
        public void initialize() {
            super.initialize();
            this.bloomFilter = new BloomFilter(this.vectorSize, this.nbHash, BLOOM_HASH_TYPE);
            // The key type must be initialized before its output MapFn is used for serialization.
            this.ptype.initialize(getConfiguration());
            this.keyToBytesFn = BloomFilterJoinStrategy.getKeyToBytesMapFn(this.ptype, getConfiguration());
        }

        @Override
        public void process(K key, Emitter<BloomFilter> emitter) {
            // Nothing is emitted per key; the filter is only emitted once in cleanup().
            this.bloomFilter.add(new Key(this.keyToBytesFn.map(key)));
        }

        @Override
        public void cleanup(Emitter<BloomFilter> emitter) {
            emitter.emit(this.bloomFilter);
        }
    }

    /**
     * Keeps only those pairs whose key passes a membership test against the union of all Bloom
     * filters read from {@code bloomData}.
     */
    private static class FilterKeysWithBloomFilterFn<K, V> extends FilterFn<Pair<K, V>> {
        private final int vectorSize;
        private final int nbHash;
        private final PType<K> keyType;
        private final ReadableData<BloomFilter> bloomData;
        private transient BloomFilter bloomFilter;
        private transient MapFn<K, byte[]> keyToBytesFn;

        FilterKeysWithBloomFilterFn(ReadableData<BloomFilter> bloomData, int vectorSize, int nbHash, PType<K> keyType) {
            this.bloomData = bloomData;
            this.vectorSize = vectorSize;
            this.nbHash = nbHash;
            this.keyType = keyType;
        }

        @Override
        public void configure(Configuration configuration) {
            super.configure(configuration);
            this.bloomData.configure(configuration);
        }

        @Override
        public void initialize() {
            super.initialize();
            this.keyType.initialize(getConfiguration());
            this.keyToBytesFn = BloomFilterJoinStrategy.getKeyToBytesMapFn(this.keyType, getConfiguration());
            try {
                // OR together every filter produced on the left side into one filter.
                // This relies on all of them sharing vectorSize, nbHash and hash type.
                this.bloomFilter = new BloomFilter(this.vectorSize, this.nbHash, BLOOM_HASH_TYPE);
                for (BloomFilter partial : this.bloomData.read(getContext())) {
                    this.bloomFilter.or(partial);
                }
            } catch (IOException e) {
                throw new CrunchRuntimeException("Error reading bloom filters for bloom filter join: ", e);
            }
        }

        @Override
        public boolean accept(Pair<K, V> pair) {
            return this.bloomFilter.membershipTest(new Key(this.keyToBytesFn.map(pair.first())));
        }
    }

    /**
     * Serializes a value to bytes via its {@link Writable#write} representation, so it can be added
     * to or tested against a {@link BloomFilter}.
     */
    public static class WritableToBytesFn<T> extends MapFn<T, byte[]> {
        private final WritableType<T, ?> ptype;
        private final DataOutputBuffer dataOutputBuffer = new DataOutputBuffer();

        // The Configuration argument is unused; it is accepted for symmetry with AvroToBytesFn.
        WritableToBytesFn(WritableType<T, ?> writableType, Configuration configuration) {
            this.ptype = writableType;
        }

        /**
         * @param input value to serialize
         * @return a fresh copy of the bytes written by the mapped {@link Writable}
         * @throws CrunchRuntimeException if writing fails
         */
        @Override
        public byte[] map(T input) {
            this.dataOutputBuffer.reset();
            try {
                ((Writable) this.ptype.getOutputMapFn().map(input)).write(this.dataOutputBuffer);
            } catch (IOException e) {
                throw new CrunchRuntimeException(e);
            }
            // getData() returns the (larger, reused) backing array, so copy only the valid prefix.
            int length = this.dataOutputBuffer.getLength();
            byte[] bytes = new byte[length];
            System.arraycopy(this.dataOutputBuffer.getData(), 0, bytes, 0, length);
            return bytes;
        }
    }

    /**
     * Creates a strategy with a 5% target false-positive rate and a {@link DefaultJoinStrategy}
     * delegate.
     *
     * @param numElements expected number of distinct keys on the left side; must be positive
     */
    public BloomFilterJoinStrategy(int numElements) {
        this(numElements, 0.05f);
    }

    /**
     * Creates a strategy with a {@link DefaultJoinStrategy} delegate.
     *
     * @param numElements expected number of distinct keys on the left side; must be positive
     * @param falsePositiveRate target false-positive rate, strictly between 0 and 1
     */
    public BloomFilterJoinStrategy(int numElements, float falsePositiveRate) {
        this(numElements, falsePositiveRate, new DefaultJoinStrategy<K, U, V>());
    }

    /**
     * @param numElements expected number of distinct keys on the left side; must be positive
     * @param falsePositiveRate target false-positive rate, strictly between 0 and 1
     * @param delegateJoinStrategy strategy that performs the actual join on the filtered inputs
     * @throws IllegalArgumentException if {@code numElements} or {@code falsePositiveRate} is out of range
     */
    public BloomFilterJoinStrategy(int numElements, float falsePositiveRate, JoinStrategy<K, U, V> delegateJoinStrategy) {
        if (numElements <= 0) {
            throw new IllegalArgumentException("numElements must be positive, got " + numElements);
        }
        // Written as a negated conjunction so NaN is rejected too.
        if (!(falsePositiveRate > 0.0f && falsePositiveRate < 1.0f)) {
            throw new IllegalArgumentException("falsePositiveRate must be in (0, 1), got " + falsePositiveRate);
        }
        this.vectorSize = getOptimalVectorSize(numElements, falsePositiveRate);
        this.nbHash = getOptimalNumHash(numElements, this.vectorSize);
        this.delegateJoinStrategy = delegateJoinStrategy;
    }

    /**
     * Computes the Bloom filter bit count m = -n ln(p) / (ln 2)^2. The result is clamped to at least 1
     * because the filter rejects a zero size.
     */
    private static int getOptimalVectorSize(int numElements, float falsePositiveRate) {
        double ln2 = Math.log(2.0d);
        double size = -numElements * Math.log(falsePositiveRate) / (ln2 * ln2);
        return Math.max(1, (int) size);
    }

    /**
     * Computes the hash-function count k = round(m ln 2 / n). The result is clamped to at least 1,
     * since high false-positive rates would otherwise round it to 0.
     */
    private static int getOptimalNumHash(int numElements, float vectorSize) {
        return Math.max(1, (int) Math.round(vectorSize * Math.log(2.0d) / numElements));
    }

    /**
     * Filters {@code right} by a Bloom filter of {@code left}'s keys, then delegates the join.
     *
     * @throws IllegalStateException if {@code joinType} is neither INNER_JOIN nor LEFT_OUTER_JOIN
     */
    @Override
    public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
        if (joinType != JoinType.INNER_JOIN && joinType != JoinType.LEFT_OUTER_JOIN) {
            throw new IllegalStateException("JoinType " + joinType + " is not supported for BloomFilter joins");
        }

        ReadableData<BloomFilter> bloomData = left.keys()
                .parallelDo("Create bloom filters",
                        new CreateBloomFilterFn<K>(this.vectorSize, this.nbHash, left.getKeyType()),
                        getBloomFilterType(left.getTypeFamily()))
                .asReadable(true);

        FilterKeysWithBloomFilterFn<K, V> filterFn =
                new FilterKeysWithBloomFilterFn<K, V>(bloomData, this.vectorSize, this.nbHash, left.getKeyType());

        // Declare the bloom filter data as an input so it is materialized before the filter runs.
        ParallelDoOptions.Builder optionsBuilder = ParallelDoOptions.builder();
        optionsBuilder.sourceTargets(bloomData.getSourceTargets());

        PTable<K, V> filteredRight = right.parallelDo("Filter right side with BloomFilters",
                filterFn, right.getPTableType(), optionsBuilder.build());

        return this.delegateJoinStrategy.join(left, filteredRight, joinType);
    }

    /**
     * Returns a function that serializes values of {@code ptype} to bytes for Bloom filter hashing.
     * The nested key-filtering functions call it.
     *
     * @throws IllegalStateException if {@code ptype} is neither an Avro nor a Writable type
     */
    public static <K> MapFn<K, byte[]> getKeyToBytesMapFn(PType<K> ptype, Configuration configuration) {
        if (ptype instanceof AvroType) {
            return new AvroToBytesFn<K>((AvroType<K>) ptype, configuration);
        }
        if (ptype instanceof WritableType) {
            return new WritableToBytesFn<K>((WritableType<K, ?>) ptype, configuration);
        }
        throw new IllegalStateException("Unrecognized PType: " + ptype);
    }

    /**
     * Returns the PType used to ship {@link BloomFilter}s within the given type family.
     *
     * @throws IllegalStateException if the family is neither Avro nor Writable
     */
    private static PType<BloomFilter> getBloomFilterType(PTypeFamily typeFamily) {
        if (typeFamily.equals(AvroTypeFamily.getInstance())) {
            return Avros.writables(BloomFilter.class);
        }
        if (typeFamily.equals(WritableTypeFamily.getInstance())) {
            return Writables.writables(BloomFilter.class);
        }
        throw new IllegalStateException("Unrecognized PTypeFamily: " + typeFamily);
    }
}
