package cz.o2.proxima.beam.io;

import cz.o2.proxima.core.functional.Consumer;
import cz.o2.proxima.core.repository.RepositoryFactory;
import cz.o2.proxima.core.storage.StreamElement;
import cz.o2.proxima.direct.core.DirectDataOperator;
import cz.o2.proxima.direct.core.OnlineAttributeWriter;
import cz.o2.proxima.internal.com.google.common.annotations.VisibleForTesting;
import lombok.Generated;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/o2/proxima/beam/io/ProximaIO.class */
public class ProximaIO {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ProximaIO.class);

    /* loaded from: input_file:cz/o2/proxima/beam/io/ProximaIO$Write.class */
    public static class Write extends PTransform<PCollection<StreamElement>, PDone> {
        private final RepositoryFactory repositoryFactory;

        private Write(RepositoryFactory repositoryFactory) {
            this.repositoryFactory = repositoryFactory;
        }

        public PDone expand(PCollection<StreamElement> pCollection) {
            pCollection.apply("Write", ParDo.of(new WriteFn(this.repositoryFactory)));
            return PDone.in(pCollection.getPipeline());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/o2/proxima/beam/io/ProximaIO$WriteFn.class */
    public static class WriteFn extends DoFn<StreamElement, Void> {
        private final RepositoryFactory repositoryFactory;
        private transient DirectDataOperator direct;

        WriteFn(RepositoryFactory repositoryFactory) {
            this.repositoryFactory = repositoryFactory;
        }

        @VisibleForTesting
        DirectDataOperator getDirect() {
            return this.direct;
        }

        @DoFn.Setup
        public void setUp() {
            this.direct = this.repositoryFactory.apply().getOrCreateOperator(DirectDataOperator.class, new Consumer[0]);
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element StreamElement streamElement) {
            ((OnlineAttributeWriter) this.direct.getWriter(streamElement.getAttributeDescriptor()).orElseThrow(() -> {
                return new IllegalArgumentException(String.format("Missing writer for [%s].", streamElement.getAttributeDescriptor()));
            })).write(streamElement, (z, th) -> {
                if (th != null) {
                    ProximaIO.log.error(String.format("Unable to write element [%s].", streamElement), th);
                }
            });
        }

        @DoFn.Teardown
        public void tearDown() {
            if (this.direct != null) {
                this.direct.close();
            }
        }
    }

    private ProximaIO() {
    }

    public static Write write(RepositoryFactory repositoryFactory) {
        return new Write(repositoryFactory);
    }
}
