package io.choerodon.event.consumer.factory;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.choerodon.core.event.EventPayload;
import io.choerodon.event.consumer.CommonUtils;
import io.choerodon.event.consumer.EventConsumerProperties;
import io.choerodon.event.consumer.annotation.EventListener;
import io.choerodon.event.consumer.domain.EventConsumer;
import io.choerodon.event.consumer.domain.MsgExecuteBean;
import io.choerodon.event.consumer.handler.MsgHandler;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StringUtils;

/* loaded from: input_file:io/choerodon/event/consumer/factory/KafkaMessageConsumerFactory.class */
public class KafkaMessageConsumerFactory implements MessageConsumerFactory {
    private static final Logger LOGGER = LoggerFactory.getLogger(MessageConsumerFactory.class);
    private ObjectMapper mapper = new ObjectMapper();
    private Properties kafkaProperties;
    private ExecutorService executorService;
    private MsgHandler msgHandler;
    private EventConsumerProperties consumerProperties;

    public KafkaMessageConsumerFactory(Properties properties, MsgHandler msgHandler, EventConsumerProperties eventConsumerProperties, ExecutorService executorService) {
        this.kafkaProperties = properties;
        this.msgHandler = msgHandler;
        this.consumerProperties = eventConsumerProperties;
        this.executorService = executorService;
    }

    @Override // io.choerodon.event.consumer.factory.MessageConsumerFactory
    public void createConsumer(Method method, Object obj, EventListener eventListener, TypeReference typeReference) {
    }

    @Override // io.choerodon.event.consumer.factory.MessageConsumerFactory
    public void createConsumers(List<EventConsumer> list) {
        ArrayList arrayList = new ArrayList();
        Iterator<EventConsumer> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().eventListener.topic());
        }
        this.executorService.execute(() -> {
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(this.kafkaProperties);
            Throwable th = null;
            try {
                kafkaConsumer.subscribe(arrayList);
                while (true) {
                    ConsumerRecords poll = kafkaConsumer.poll(100L);
                    for (TopicPartition topicPartition : poll.partitions()) {
                        receiveMsg(poll.records(topicPartition), kafkaConsumer, list, topicPartition);
                    }
                }
            } catch (Throwable th2) {
                if (kafkaConsumer != null) {
                    if (0 != 0) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                throw th2;
            }
        });
    }

    private void receiveMsg(List<ConsumerRecord<String, String>> list, KafkaConsumer<String, String> kafkaConsumer, List<EventConsumer> list2, TopicPartition topicPartition) {
        try {
            list.parallelStream().forEach(consumerRecord -> {
                String businessType = CommonUtils.getBusinessType((String) consumerRecord.value());
                if (StringUtils.isEmpty(businessType)) {
                    LOGGER.warn("businessType is null, skip this payload {}", consumerRecord.value());
                }
                EventConsumer eventConsumer = getEventConsumer(list2, consumerRecord.topic(), businessType);
                if (eventConsumer != null) {
                    MsgExecuteBean msgExecuteBean = new MsgExecuteBean(eventConsumer.eventListener, eventConsumer.method, eventConsumer.object, this.consumerProperties);
                    try {
                        msgExecuteBean.setPayloadJson((String) consumerRecord.value());
                        msgExecuteBean.setPayload((EventPayload) this.mapper.readValue((String) consumerRecord.value(), eventConsumer.payLoadType));
                        msgExecuteBean.setMessageTimestamp(Long.valueOf(consumerRecord.timestamp()));
                        msgExecuteBean.setKafkaPartition(Integer.valueOf(consumerRecord.partition()));
                        msgExecuteBean.setKafkaOffset(Long.valueOf(consumerRecord.offset()));
                        this.msgHandler.execute(msgExecuteBean);
                    } catch (IOException e) {
                        LOGGER.warn("message deserialize error, {}", e.toString());
                    }
                }
            });
            kafkaConsumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(list.get(list.size() - 1).offset() + 1)));
        } catch (Exception e) {
            long offset = list.get(0).offset();
            LOGGER.warn("consume message failed, seek partition {} offset to {} error {}", new Object[]{topicPartition, Long.valueOf(offset), e.toString()});
            kafkaConsumer.seek(topicPartition, offset);
        }
    }

    private EventConsumer getEventConsumer(List<EventConsumer> list, String str, String str2) {
        for (EventConsumer eventConsumer : list) {
            List asList = Arrays.asList(eventConsumer.eventListener.businessType());
            if (eventConsumer.eventListener.topic().equals(str) && asList.contains(str2)) {
                return eventConsumer;
            }
        }
        return null;
    }
}
