package io.choerodon.event.consumer.handler;

import io.choerodon.event.consumer.CommonUtils;
import io.choerodon.event.consumer.DuplicateRemoveListener;
import io.choerodon.event.consumer.EventConsumerProperties;
import io.choerodon.event.consumer.domain.FailedMsg;
import io.choerodon.event.consumer.domain.MsgExecuteBean;
import io.choerodon.event.consumer.exception.SendEventStoreException;
import io.choerodon.event.consumer.retry.RetryFactory;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.web.client.RestTemplate;

/* loaded from: input_file:io/choerodon/event/consumer/handler/DefaultMsgHandlerImpl.class */
public class DefaultMsgHandlerImpl implements MsgHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(MsgHandler.class);
    private Set<String> beingHandlingUuid = new HashSet();
    private DataSourceTransactionManager transactionManager;
    private DuplicateRemoveListener listener;
    private Optional<RetryFactory> retryFactory;
    private RestTemplate restTemplate;

    @Value("${event.store.service.name:hap-event-store-service}")
    private String eventStoreService;

    public DefaultMsgHandlerImpl(DataSourceTransactionManager dataSourceTransactionManager, DuplicateRemoveListener duplicateRemoveListener, Optional<RetryFactory> optional, RestTemplate restTemplate) {
        this.transactionManager = dataSourceTransactionManager;
        this.listener = duplicateRemoveListener;
        this.retryFactory = optional;
        this.restTemplate = restTemplate;
    }

    @Override // io.choerodon.event.consumer.handler.MsgHandler
    public void executeAsync(MsgExecuteBean msgExecuteBean) {
        execute(msgExecuteBean);
    }

    @Override // io.choerodon.event.consumer.handler.MsgHandler
    public void execute(MsgExecuteBean msgExecuteBean) {
        if (filter(msgExecuteBean)) {
            return;
        }
        DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
        defaultTransactionDefinition.setPropagationBehavior(msgExecuteBean.eventListener.transactionDefinition());
        TransactionStatus transaction = this.transactionManager.getTransaction(defaultTransactionDefinition);
        try {
            msgExecuteBean.method.setAccessible(true);
            msgExecuteBean.method.invoke(msgExecuteBean.object, msgExecuteBean.getPayload());
            msgSucceededHandle(msgExecuteBean);
            this.transactionManager.commit(transaction);
        } catch (Exception e) {
            LOGGER.warn("message consume exception, msg : {}, cause {}", msgExecuteBean.getPayload(), e.toString());
            if (LOGGER.isDebugEnabled()) {
                e.printStackTrace();
            }
            this.transactionManager.rollback(transaction);
            msgExecuteBean.setExceptionMessage(CommonUtils.getErrorInfoFromException(e));
            msgFailedHandle(msgExecuteBean);
        }
    }

    private void msgSucceededHandle(MsgExecuteBean msgExecuteBean) {
        if (msgExecuteBean.consumerProperties.isEnableDuplicateRemove()) {
            this.beingHandlingUuid.remove(msgExecuteBean.getPayload().getUuid());
            this.listener.after(msgExecuteBean.getPayload().getUuid());
        }
        msgExecuteBean.setSuccess(true);
    }

    private void msgFailedHandle(MsgExecuteBean msgExecuteBean) {
        if (this.retryFactory.isPresent() && !msgExecuteBean.isRetry() && msgExecuteBean.eventListener.retryTimes() > 0) {
            msgExecuteBean.setRetry(true);
            this.retryFactory.get().addRetry(msgExecuteBean);
        } else if (!msgExecuteBean.isRetry() || msgExecuteBean.getHasRetryTimes().get() >= msgExecuteBean.eventListener.retryTimes()) {
            if (msgExecuteBean.consumerProperties.isEnableDuplicateRemove()) {
                this.beingHandlingUuid.remove(msgExecuteBean.getPayload().getUuid());
            }
            if (EventConsumerProperties.FAILED_STRATEGY_EVENT_STORE.equals(msgExecuteBean.consumerProperties.getFailedStrategy())) {
                msgFailedSendEventStoreCallback(msgExecuteBean);
            }
        }
    }

    private void msgFailedSendEventStoreCallback(MsgExecuteBean msgExecuteBean) {
        ResponseEntity postForEntity = this.restTemplate.postForEntity("http://" + this.eventStoreService + "/v1/messages/failed", new FailedMsg(msgExecuteBean.getPayload().getUuid(), msgExecuteBean.eventListener.topic(), msgExecuteBean.getPayloadJson(), msgExecuteBean.getExceptionMessage(), msgExecuteBean.getKafkaPartition(), msgExecuteBean.getKafkaOffset(), msgExecuteBean.getMessageTimestamp()), Void.class, new Object[0]);
        if (!postForEntity.getStatusCode().is2xxSuccessful()) {
            throw new SendEventStoreException("error has happened when send msg to event store, http status:" + postForEntity.getStatusCodeValue());
        }
    }

    @Override // io.choerodon.event.consumer.handler.MsgHandler
    public boolean filter(MsgExecuteBean msgExecuteBean) {
        if (msgExecuteBean.isRetry()) {
            return false;
        }
        String uuid = msgExecuteBean.getPayload().getUuid();
        synchronized (this) {
            if (!(msgExecuteBean.consumerProperties.isEnableDuplicateRemove() && (this.beingHandlingUuid.contains(uuid) || this.listener.hasBeanConsumed(uuid)))) {
                return false;
            }
            LOGGER.debug("skip message by filter, duplicate uuid found, uuid {}", uuid);
            this.beingHandlingUuid.add(uuid);
            return true;
        }
    }
}
