package io.choerodon.event.consumer;

import io.choerodon.core.ChoerodonCoreAutoConfiguration;
import io.choerodon.core.convertor.ApplicationContextHelper;
import io.choerodon.core.oauth.CustomUserDetails;
import io.choerodon.event.consumer.annotation.EventListener;
import io.choerodon.event.consumer.domain.EventConsumer;
import io.choerodon.event.consumer.exception.CannotFindTypeReferenceException;
import io.choerodon.event.consumer.exception.RepeatBusinessTypeException;
import io.choerodon.event.consumer.factory.KafkaMessageConsumerFactory;
import io.choerodon.event.consumer.factory.MessageConsumerFactory;
import io.choerodon.event.consumer.factory.RabbitMessageConsumerFactory;
import io.choerodon.event.consumer.factory.RedisMessageConsumerFactory;
import io.choerodon.event.consumer.factory.RocketMessageConsumerFactory;
import io.choerodon.event.consumer.handler.DefaultMsgHandlerImpl;
import io.choerodon.event.consumer.handler.MsgHandler;
import io.choerodon.event.consumer.mapper.EventConsumerRecordMapper;
import io.choerodon.event.consumer.retry.RetryFactory;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.codehaus.jackson.map.ObjectMapper;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;
import org.reflections.Reflections;
import org.reflections.scanners.MethodAnnotationsScanner;
import org.reflections.scanners.Scanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.ClientHttpRequestExecution;
import org.springframework.http.client.ClientHttpRequestInterceptor;
import org.springframework.http.client.ClientHttpResponse;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.security.jwt.JwtHelper;
import org.springframework.security.jwt.crypto.sign.MacSigner;
import org.springframework.security.jwt.crypto.sign.Signer;
import org.springframework.util.StopWatch;
import org.springframework.web.client.RestTemplate;

@EnableScheduling
@EnableConfigurationProperties({EventConsumerProperties.class})
@Configuration
@EnableAsync
@ConditionalOnProperty(prefix = "choerodon.event.consumer", name = {"enabled"}, matchIfMissing = true)
@Import({ChoerodonCoreAutoConfiguration.class})
/* loaded from: input_file:io/choerodon/event/consumer/EventConsumerAutoConfiguration.class */
public class EventConsumerAutoConfiguration {
    public static final String EVENT_BEAN_PREFIX = "event_consumer_helper_";
    private static final String DEFAULT_KAFKA_CONSUME_GROUP = "default-group";
    private static final Logger LOGGER = LoggerFactory.getLogger(EventConsumerAutoConfiguration.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final Signer SIGNER = new MacSigner("hand");
    private static final CustomUserDetails DEFAULT_USER = new CustomUserDetails("default", "unknown", Collections.emptyList());

    @ConditionalOnClass({KafkaConsumer.class})
    @ConditionalOnProperty(prefix = "choerodon.event.consumer", name = {"queue-type"}, havingValue = "kafka", matchIfMissing = true)
    /* loaded from: input_file:io/choerodon/event/consumer/EventConsumerAutoConfiguration$Kafka.class */
    static class Kafka {

        @Value("${spring.application.name:default}")
        private String applicationName;

        Kafka() {
        }

        @Autowired
        @Bean({"kafkaPropertiesMap"})
        public Properties kafkaPropertiesMap(EventConsumerProperties eventConsumerProperties) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", eventConsumerProperties.getKafka().getBootstrapServers());
            properties.put("session.timeout.ms", Integer.valueOf(eventConsumerProperties.getKafka().getSessionTimeoutMs()));
            properties.put("enable.auto.commit", false);
            properties.put("auto.offset.reset", eventConsumerProperties.getKafka().getAutoOffsetReset());
            properties.put("max.poll.records", Integer.valueOf(eventConsumerProperties.getKafka().getMaxPollRecords()));
            properties.put("max.poll.interval.ms", Integer.valueOf(eventConsumerProperties.getKafka().getMaxPollIntervalMs()));
            properties.put("fetch.max.bytes", Integer.valueOf(eventConsumerProperties.getKafka().getFetchMaxBytes()));
            properties.put("fetch.max.wait.ms", Integer.valueOf(eventConsumerProperties.getKafka().getFetchMaxWaitMs()));
            properties.put("heartbeat.interval.ms", Integer.valueOf(eventConsumerProperties.getKafka().getHeartbeatIntervalMs()));
            properties.put("fetch.min.bytes", Integer.valueOf(eventConsumerProperties.getKafka().getFetchMaxBytes()));
            properties.put("key.deserializer", StringDeserializer.class);
            properties.put("value.deserializer", StringDeserializer.class);
            properties.put("partition.assignment.strategy", eventConsumerProperties.getKafka().getPartitionAssignmentStrategy());
            properties.put("send.buffer.bytes", Integer.valueOf(eventConsumerProperties.getKafka().getSendBufferBytes()));
            properties.put("receive.buffer.bytes", Integer.valueOf(eventConsumerProperties.getKafka().getReceiveBufferBytes()));
            properties.put("client.id", eventConsumerProperties.getKafka().getClientId());
            properties.put("reconnect.backoff.ms", Long.valueOf(eventConsumerProperties.getKafka().getReconnectBackoffMs()));
            properties.put("reconnect.backoff.max.ms", Long.valueOf(eventConsumerProperties.getKafka().getReconnectBackoffMaxMs()));
            properties.put("retry.backoff.ms", Long.valueOf(eventConsumerProperties.getKafka().getRetryBackoffMs()));
            properties.put("metrics.sample.window.ms", Long.valueOf(eventConsumerProperties.getKafka().getMetricsSampleWindowMs()));
            properties.put("metrics.num.samples", Integer.valueOf(eventConsumerProperties.getKafka().getMetricsNumSample()));
            properties.put("metrics.recording.level", eventConsumerProperties.getKafka().getMetricsRecordingLevel());
            properties.put("metric.reporters", eventConsumerProperties.getKafka().getMetricReporters());
            properties.put("security.protocol", eventConsumerProperties.getKafka().getSecurityProtocol());
            properties.put("connections.max.idle.ms", Long.valueOf(eventConsumerProperties.getKafka().getConnectionsMaxIdleMs()));
            properties.put("request.timeout.ms", Integer.valueOf(eventConsumerProperties.getKafka().getRequestTimeoutMs()));
            properties.put("check.crcs", Boolean.valueOf(eventConsumerProperties.getKafka().isCheckCrcs()));
            properties.put("interceptor.classes", eventConsumerProperties.getKafka().getInterceptorClasses());
            properties.put("exclude.internal.topics", Boolean.valueOf(eventConsumerProperties.getKafka().isExcludeInternalTopics()));
            properties.put("isolation.level", eventConsumerProperties.getKafka().getIsolationLevel());
            properties.put("group.id", this.applicationName);
            if (EventConsumerAutoConfiguration.DEFAULT_KAFKA_CONSUME_GROUP.equals(this.applicationName)) {
                EventConsumerAutoConfiguration.LOGGER.warn("Please set spring.application.name，otherwise 'default' is used as the default group");
            } else {
                EventConsumerAutoConfiguration.LOGGER.info("kafka consumer group name is {}", this.applicationName);
            }
            return properties;
        }

        @Bean
        public ExecutorService queueReceiveExecutor() {
            return Executors.newCachedThreadPool();
        }

        @Bean
        public MessageConsumerFactory kafkaMessageConsumeFactory(MsgHandler msgHandler, @Qualifier("kafkaPropertiesMap") Properties properties, EventConsumerProperties eventConsumerProperties) {
            return new KafkaMessageConsumerFactory(properties, msgHandler, eventConsumerProperties, queueReceiveExecutor());
        }
    }

    /* loaded from: input_file:io/choerodon/event/consumer/EventConsumerAutoConfiguration$OAuthAuthorizationInterceptor.class */
    class OAuthAuthorizationInterceptor implements ClientHttpRequestInterceptor {
        OAuthAuthorizationInterceptor() {
        }

        public ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bArr, ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
            try {
                String str = "Bearer " + JwtHelper.encode(EventConsumerAutoConfiguration.OBJECT_MAPPER.writeValueAsString(EventConsumerAutoConfiguration.DEFAULT_USER), EventConsumerAutoConfiguration.SIGNER).getEncoded();
                EventConsumerAutoConfiguration.LOGGER.info("token {}", str);
                httpRequest.getHeaders().add("Authorization", str);
            } catch (IOException e) {
                EventConsumerAutoConfiguration.LOGGER.warn("IOException happen when add RestTemplate JWT token, {}", e.getCause());
            }
            return clientHttpRequestExecution.execute(httpRequest, bArr);
        }
    }

    @ConditionalOnClass({ConnectionFactory.class})
    @ConditionalOnProperty(prefix = "choerodon.event.consumer", name = {"queue-type"}, havingValue = "rabbitmq")
    /* loaded from: input_file:io/choerodon/event/consumer/EventConsumerAutoConfiguration$Rabbitmq.class */
    static class Rabbitmq {
        Rabbitmq() {
        }

        @Bean
        public MessageConsumerFactory rabbitMessageConsumeFactory(ConnectionFactory connectionFactory, DataSourceTransactionManager dataSourceTransactionManager, DuplicateRemoveListener duplicateRemoveListener, Optional<RetryFactory> optional) {
            return new RabbitMessageConsumerFactory(dataSourceTransactionManager, connectionFactory, duplicateRemoveListener, optional);
        }
    }

    @ConditionalOnClass({RedisConnectionFactory.class})
    @ConditionalOnProperty(prefix = "choerodon.event.consumer", name = {"queue-type"}, havingValue = "redis")
    /* loaded from: input_file:io/choerodon/event/consumer/EventConsumerAutoConfiguration$Redis.class */
    static class Redis {
        Redis() {
        }

        @Bean
        public MessageConsumerFactory redisMessageConsumeFactory(DataSourceTransactionManager dataSourceTransactionManager, RedisConnectionFactory redisConnectionFactory, DuplicateRemoveListener duplicateRemoveListener, Optional<RetryFactory> optional) {
            return new RedisMessageConsumerFactory(dataSourceTransactionManager, redisConnectionFactory, duplicateRemoveListener, optional);
        }
    }

    @ConditionalOnProperty(prefix = "choerodon.event.consumer", name = {"retry.enabled"}, havingValue = "true")
    /* loaded from: input_file:io/choerodon/event/consumer/EventConsumerAutoConfiguration$Retry.class */
    static class Retry {
        Retry() {
        }

        @Bean
        public Scheduler retryScheduler() {
            Scheduler scheduler = null;
            try {
                scheduler = new StdSchedulerFactory().getScheduler();
                scheduler.start();
            } catch (SchedulerException e) {
                EventConsumerAutoConfiguration.LOGGER.warn("error happen when start scheduler, {}", e.getCause());
            }
            return scheduler;
        }

        @Bean
        public RetryFactory retryFactory() {
            return new RetryFactory(retryScheduler());
        }
    }

    @ConditionalOnClass({DefaultMQPushConsumer.class})
    @ConditionalOnProperty(prefix = "choerodon.event.consumer", name = {"queue-type"}, havingValue = "rocketmq")
    /* loaded from: input_file:io/choerodon/event/consumer/EventConsumerAutoConfiguration$Rocketmq.class */
    static class Rocketmq {
        Rocketmq() {
        }

        @Bean
        public MessageConsumerFactory rocketMessageConsumeFactory(DataSourceTransactionManager dataSourceTransactionManager, EventConsumerProperties eventConsumerProperties, DuplicateRemoveListener duplicateRemoveListener, Optional<RetryFactory> optional) {
            return new RocketMessageConsumerFactory(dataSourceTransactionManager, eventConsumerProperties, duplicateRemoveListener, optional);
        }
    }

    @ConditionalOnMissingBean
    @Bean
    @LoadBalanced
    public RestTemplate restTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.setInterceptors(Collections.singletonList(new OAuthAuthorizationInterceptor()));
        return restTemplate;
    }

    @Autowired
    @Bean
    public DuplicateRemoveListener duplicateRemoveListener(EventConsumerRecordMapper eventConsumerRecordMapper) {
        return new DefaultDuplicateRemoveListener(eventConsumerRecordMapper);
    }

    @Bean
    public MsgHandler msgHandler(DataSourceTransactionManager dataSourceTransactionManager, DuplicateRemoveListener duplicateRemoveListener, Optional<RetryFactory> optional, RestTemplate restTemplate) {
        return new DefaultMsgHandlerImpl(dataSourceTransactionManager, duplicateRemoveListener, optional, restTemplate);
    }

    @Autowired
    @Bean(name = {"runAndGetMethods"})
    public Boolean runAndGetTopics(MessageConsumerFactory messageConsumerFactory) throws RepeatBusinessTypeException, CannotFindTypeReferenceException {
        HashSet hashSet = new HashSet();
        StopWatch stopWatch = new StopWatch();
        stopWatch.start();
        Set<Method> methodsAnnotatedWith = new Reflections("", new Scanner[]{new MethodAnnotationsScanner()}).getMethodsAnnotatedWith(EventListener.class);
        ArrayList arrayList = new ArrayList(methodsAnnotatedWith.size());
        if (methodsAnnotatedWith.isEmpty()) {
            stopWatch.stop();
            return true;
        }
        for (Method method : methodsAnnotatedWith) {
            EventListener eventListener = (EventListener) AnnotationUtils.findAnnotation(method, EventListener.class);
            for (String str : eventListener.businessType()) {
                String str2 = eventListener.topic() + str;
                if (hashSet.contains(str2)) {
                    throw new RepeatBusinessTypeException(eventListener.topic(), str);
                }
                hashSet.add(str2);
            }
            arrayList.add(new EventConsumer(method, ApplicationContextHelper.getSpringFactory().getBean(method.getDeclaringClass()), eventListener, CommonUtils.getTypeReference(method)));
        }
        try {
            messageConsumerFactory.createConsumers(arrayList);
        } catch (Exception e) {
            LOGGER.warn("error happen when create consumer, {}", e.getCause());
        }
        stopWatch.stop();
        return true;
    }

    static {
        DEFAULT_USER.setLanguage("zh_CN");
        DEFAULT_USER.setTimeZone("CCT");
        DEFAULT_USER.setUserId(0L);
        DEFAULT_USER.setOrganizationId(1L);
    }
}
