package org.dei.perla.core.channel;

import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;
import org.dei.perla.core.utils.Check;

/* loaded from: input_file:org/dei/perla/core/channel/AbstractChannel.class */
public abstract class AbstractChannel implements Channel {
    private final Logger log;
    private final String id;
    private final BlockingQueue<FutureIOTask> pending = new LinkedBlockingQueue();
    private volatile IOHandler asyncHandler = null;
    private final Thread dispatcher = new Thread(this::dispatch);
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    /* loaded from: input_file:org/dei/perla/core/channel/AbstractChannel$FutureIOTask.class */
    private class FutureIOTask implements IOTask {
        private static final int NEW = 0;
        private static final int SCHEDULED = 1;
        private static final int FINISHED = 2;
        private static final int CANCELLED = 3;
        private final AtomicInteger state = new AtomicInteger(NEW);
        private final IORequest request;
        private final IOHandler handler;

        public FutureIOTask(IORequest iORequest, IOHandler iOHandler) {
            this.request = iORequest;
            this.handler = iOHandler;
        }

        public void run() throws InterruptedException {
            if (this.state.compareAndSet(NEW, SCHEDULED)) {
                try {
                    complete(AbstractChannel.this.handleRequest(this.request));
                } catch (InterruptedException e) {
                    error(new ChannelException("IOTask interrupted while processing I/O request", e));
                    AbstractChannel.this.close();
                } catch (ChannelException e2) {
                    AbstractChannel.this.log.error("An error occurred while processing an I/O Request", e2);
                    error(e2);
                } catch (Exception e3) {
                    AbstractChannel.this.log.error("Unexpected error while processing an I/O Request", e3);
                    error(new ChannelException("Unexpected error while processing an I/O Request", e3));
                }
            }
        }

        private void complete(Payload payload) {
            if (this.state.compareAndSet(SCHEDULED, FINISHED)) {
                this.handler.complete(this.request, Optional.ofNullable(payload));
            }
        }

        private void error(Throwable th) {
            if (this.state.compareAndSet(SCHEDULED, FINISHED)) {
                this.handler.error(this.request, th);
            }
        }

        @Override // org.dei.perla.core.channel.IOTask
        public boolean isDone() {
            return this.state.get() > SCHEDULED;
        }

        @Override // org.dei.perla.core.channel.IOTask
        public void cancel() {
            if (this.state.compareAndSet(NEW, CANCELLED)) {
                this.handler.error(this.request, new IOTaskCancelledException());
            }
        }

        @Override // org.dei.perla.core.channel.IOTask
        public IORequest getRequest() {
            return this.request;
        }

        @Override // org.dei.perla.core.channel.IOTask
        public boolean isCancelled() {
            return this.state.get() == CANCELLED;
        }
    }

    public AbstractChannel(String str) {
        this.id = str;
        this.log = Logger.getLogger(getClass().getCanonicalName() + "_" + str);
        this.dispatcher.start();
    }

    @Override // org.dei.perla.core.channel.Channel
    public String getId() {
        return this.id;
    }

    @Override // org.dei.perla.core.channel.Channel
    public void setAsyncIOHandler(IOHandler iOHandler) throws IllegalStateException {
        if (this.asyncHandler != null) {
            throw new IllegalStateException("An IOHandler has already been set for this Channel");
        }
        this.asyncHandler = (IOHandler) Check.notNull(iOHandler, "handler");
    }

    @Override // org.dei.perla.core.channel.Channel
    public IOTask submit(IORequest iORequest, IOHandler iOHandler) throws ChannelException {
        if (this.stopped.get()) {
            throw new ChannelException("Cannot process IORequest: Channel is not running");
        }
        FutureIOTask futureIOTask = new FutureIOTask(iORequest, iOHandler);
        synchronized (futureIOTask) {
            this.pending.add(futureIOTask);
        }
        return futureIOTask;
    }

    private void dispatch() {
        while (!this.stopped.get() && !Thread.currentThread().isInterrupted()) {
            try {
                FutureIOTask take = this.pending.take();
                synchronized (take) {
                    if (!take.isCancelled()) {
                        take.run();
                    }
                }
            } catch (InterruptedException e) {
            } catch (Exception e2) {
                this.log.error("Unexpected Channel error", e2);
            }
        }
        while (true) {
            FutureIOTask poll = this.pending.poll();
            if (poll == null) {
                return;
            } else {
                poll.cancel();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void notifyAsyncData(Payload payload) {
        if (this.asyncHandler == null || this.stopped.get()) {
            return;
        }
        this.asyncHandler.complete(null, Optional.ofNullable(payload));
    }

    protected void notifyAsyncError(Throwable th) {
        if (this.asyncHandler == null || this.stopped.get()) {
            return;
        }
        this.log.error("Asynchronous reception error", th);
        this.asyncHandler.error(null, th);
    }

    protected int pendingRequestCount() {
        return this.pending.size();
    }

    @Override // org.dei.perla.core.channel.Channel
    public boolean isClosed() {
        return this.stopped.get();
    }

    @Override // org.dei.perla.core.channel.Channel
    public void close() {
        if (this.stopped.compareAndSet(false, true)) {
            this.dispatcher.interrupt();
        }
    }

    public abstract Payload handleRequest(IORequest iORequest) throws ChannelException, InterruptedException;
}
