package xin.manong.stream.boost.receiver.ots;

import com.alicloud.openservices.tablestore.model.StreamRecord;
import com.alicloud.openservices.tablestore.tunnel.worker.IChannelProcessor;
import com.alicloud.openservices.tablestore.tunnel.worker.ProcessRecordsInput;
import java.util.Iterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import xin.manong.stream.sdk.receiver.ReceiveProcessor;

/* loaded from: input_file:xin/manong/stream/boost/receiver/ots/OTSChannelProcessor.class */
public class OTSChannelProcessor implements IChannelProcessor {
    private static final Logger logger = LoggerFactory.getLogger(OTSChannelProcessor.class);
    private ReceiveProcessor receiveProcessor;

    public OTSChannelProcessor(ReceiveProcessor receiveProcessor) {
        this.receiveProcessor = receiveProcessor;
    }

    public void process(ProcessRecordsInput processRecordsInput) {
        Iterator it = processRecordsInput.getRecords().iterator();
        while (it.hasNext()) {
            try {
                this.receiveProcessor.process((StreamRecord) it.next());
            } catch (Throwable th) {
                logger.error("process stream record failed for trace[{}] and token[{}]", processRecordsInput.getTraceId(), processRecordsInput.getNextToken());
                logger.error(th.getMessage(), th);
                throw new RuntimeException(th);
            }
        }
    }

    public void shutdown() {
        logger.info("channel processor has been shutdown");
    }
}
