package io.debezium.relational.history;

import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.AdminClient;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.Config;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.DescribeTopicsResult;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.NewTopic;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.admin.TopicDescription;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.KafkaConsumer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.consumer.OffsetResetStrategy;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.KafkaProducer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.ProducerConfig;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.ProducerRecord;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.clients.producer.RecordMetadata;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.Node;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.TopicPartition;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.config.ConfigDef;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.config.ConfigResource;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.errors.UnsupportedVersionException;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.serialization.StringDeserializer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.common.serialization.StringSerializer;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException;
import io.debezium.DebeziumException;
import io.debezium.annotation.NotThreadSafe;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.document.DocumentReader;
import io.debezium.relational.HistorizedRelationalDatabaseConnectorConfig;
import io.debezium.util.Collect;
import io.debezium.util.Threads;
import java.io.IOException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:io/debezium/relational/history/KafkaDatabaseHistory.class */
public class KafkaDatabaseHistory extends AbstractDatabaseHistory {
    private static final String CLEANUP_POLICY_NAME = "cleanup.policy";
    private static final String CLEANUP_POLICY_VALUE = "delete";
    private static final String RETENTION_MS_NAME = "retention.ms";
    private static final long RETENTION_MS_MAX = Long.MAX_VALUE;
    private static final String RETENTION_BYTES_NAME = "retention.bytes";
    private static final int UNLIMITED_VALUE = -1;
    private static final short PARTITION_COUNT = 1;
    private static final String DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME = "default.replication.factor";
    private static final short DEFAULT_TOPIC_REPLICATION_FACTOR = 1;
    private static final String CONSUMER_PREFIX = "database.history.consumer.";
    private static final String PRODUCER_PREFIX = "database.history.producer.";
    private final DocumentReader reader = DocumentReader.defaultReader();
    private String topicName;
    private Configuration consumerConfig;
    private Configuration producerConfig;
    private volatile KafkaProducer<String, String> producer;
    private int maxRecoveryAttempts;
    private Duration pollInterval;
    private ExecutorService checkTopicSettingsExecutor;
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDatabaseHistory.class);
    private static final long RETENTION_MS_MIN = Duration.of(1825, ChronoUnit.DAYS).toMillis();
    public static final Field TOPIC = Field.create("database.history.kafka.topic").withDisplayName("Database history topic name").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("The name of the topic for the database schema history").withValidation(forKafka(Field::isRequired));
    public static final Field BOOTSTRAP_SERVERS = Field.create("database.history.kafka.bootstrap.servers").withDisplayName("Kafka broker addresses").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("A list of host/port pairs that the connector will use for establishing the initial connection to the Kafka cluster for retrieving database schema history previously stored by the connector. This should point to the same Kafka cluster used by the Kafka Connect process.").withValidation(forKafka(Field::isRequired));
    public static final Field RECOVERY_POLL_INTERVAL_MS = Field.create("database.history.kafka.recovery.poll.interval.ms").withDisplayName("Poll interval during database history recovery (ms)").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The number of milliseconds to wait while polling for persisted data during recovery.").withDefault(100).withValidation(Field::isNonNegativeInteger);
    public static final Field RECOVERY_POLL_ATTEMPTS = Field.create("database.history.kafka.recovery.attempts").withDisplayName("Max attempts to recovery database history").withType(ConfigDef.Type.INT).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.LOW).withDescription("The number of attempts in a row that no data are returned from Kafka before recover completes. The maximum amount of time to wait after receiving no data is (recovery.attempts) x (recovery.poll.interval.ms).").withDefault(100).withValidation(Field::isInteger);
    public static final Field INTERNAL_CONNECTOR_CLASS = Field.create("database.history.connector.class").withDisplayName("Debezium connector class").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.LONG).withImportance(ConfigDef.Importance.HIGH).withDescription("The class of the Debezium database connector").withNoValidation();
    public static final Field INTERNAL_CONNECTOR_ID = Field.create("database.history.connector.id").withDisplayName("Debezium connector identifier").withType(ConfigDef.Type.STRING).withWidth(ConfigDef.Width.SHORT).withImportance(ConfigDef.Importance.HIGH).withDescription("The unique identifier of the Debezium connector").withNoValidation();
    public static Field.Set ALL_FIELDS = Field.setOf(TOPIC, BOOTSTRAP_SERVERS, DatabaseHistory.NAME, RECOVERY_POLL_INTERVAL_MS, RECOVERY_POLL_ATTEMPTS, INTERNAL_CONNECTOR_CLASS, INTERNAL_CONNECTOR_ID);
    private static final Duration KAFKA_QUERY_TIMEOUT = Duration.ofSeconds(3);
    private static final Integer PARTITION = 0;

    @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
    public void configure(Configuration configuration, HistoryRecordComparator historyRecordComparator, DatabaseHistoryListener databaseHistoryListener, boolean z) {
        super.configure(configuration, historyRecordComparator, databaseHistoryListener, z);
        Field.Set set = ALL_FIELDS;
        Logger logger = LOGGER;
        Objects.requireNonNull(logger);
        if (!configuration.validateAndRecord(set, logger::error)) {
            throw new ConnectException("Error configuring an instance of " + getClass().getSimpleName() + "; check the logs for details");
        }
        this.topicName = configuration.getString(TOPIC);
        this.pollInterval = Duration.ofMillis(configuration.getInteger(RECOVERY_POLL_INTERVAL_MS));
        this.maxRecoveryAttempts = configuration.getInteger(RECOVERY_POLL_ATTEMPTS);
        String string = configuration.getString(BOOTSTRAP_SERVERS);
        String string2 = configuration.getString(DatabaseHistory.NAME, UUID.randomUUID().toString());
        this.consumerConfig = configuration.subset(CONSUMER_PREFIX, true).edit().withDefault("bootstrap.servers", string).withDefault("client.id", string2).withDefault("group.id", string2).withDefault(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1).withDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false).withDefault("session.timeout.ms", 10000).withDefault(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, OffsetResetStrategy.EARLIEST.toString().toLowerCase()).withDefault(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class).withDefault(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class).build();
        this.producerConfig = configuration.subset(PRODUCER_PREFIX, true).edit().withDefault("bootstrap.servers", string).withDefault("client.id", string2).withDefault(ProducerConfig.ACKS_CONFIG, 1).withDefault("retries", 1).withDefault("batch.size", 32768).withDefault(ProducerConfig.LINGER_MS_CONFIG, 0).withDefault(ProducerConfig.BUFFER_MEMORY_CONFIG, 1048576).withDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class).withDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class).withDefault(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000).build();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("KafkaDatabaseHistory Consumer config: {}", this.consumerConfig.withMaskedPasswords());
            LOGGER.info("KafkaDatabaseHistory Producer config: {}", this.producerConfig.withMaskedPasswords());
        }
        try {
            String string3 = configuration.getString(INTERNAL_CONNECTOR_CLASS);
            if (string3 != null) {
                this.checkTopicSettingsExecutor = Threads.newSingleThreadExecutor(Class.forName(string3), configuration.getString(INTERNAL_CONNECTOR_ID), "db-history-config-check", true);
            }
        } catch (ClassNotFoundException e) {
            throw new DebeziumException(e);
        }
    }

    @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
    public synchronized void start() {
        super.start();
        if (this.producer == null) {
            this.producer = new KafkaProducer<>(this.producerConfig.asProperties());
        }
    }

    @Override // io.debezium.relational.history.AbstractDatabaseHistory
    protected void storeRecord(HistoryRecord historyRecord) throws DatabaseHistoryException {
        if (this.producer == null) {
            throw new IllegalStateException("No producer is available. Ensure that 'start()' is called before storing database history records.");
        }
        LOGGER.trace("Storing record into database history: {}", historyRecord);
        try {
            Future<RecordMetadata> send = this.producer.send(new ProducerRecord<>(this.topicName, PARTITION, null, historyRecord.toString()));
            this.producer.flush();
            RecordMetadata recordMetadata = send.get();
            if (recordMetadata != null) {
                LOGGER.debug("Stored record in topic '{}' partition {} at offset {} ", new Object[]{recordMetadata.topic(), Integer.valueOf(recordMetadata.partition()), Long.valueOf(recordMetadata.offset())});
            }
        } catch (InterruptedException e) {
            LOGGER.trace("Interrupted before record was written into database history: {}", historyRecord);
            Thread.currentThread().interrupt();
            throw new DatabaseHistoryException(e);
        } catch (ExecutionException e2) {
            throw new DatabaseHistoryException(e2);
        }
    }

    @Override // io.debezium.relational.history.AbstractDatabaseHistory
    protected void recoverRecords(Consumer<HistoryRecord> consumer) {
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(this.consumerConfig.asProperties());
        try {
            LOGGER.debug("Subscribing to database history topic '{}'", this.topicName);
            kafkaConsumer.subscribe(Collect.arrayListOf(this.topicName, new String[0]));
            long j = -1;
            Long l = null;
            int i = 0;
            while (i <= this.maxRecoveryAttempts) {
                l = getEndOffsetOfDbHistoryTopic(l, kafkaConsumer);
                LOGGER.debug("End offset of database history topic is {}", l);
                int i2 = 0;
                Iterator<ConsumerRecord<String, String>> it = kafkaConsumer.poll(this.pollInterval.toMillis()).iterator();
                while (it.hasNext()) {
                    ConsumerRecord<String, String> next = it.next();
                    try {
                        if (j < next.offset()) {
                            if (next.value() == null) {
                                LOGGER.warn("Skipping null database history record. This is often not an issue, but if it happens repeatedly please check the '{}' topic.", this.topicName);
                            } else {
                                HistoryRecord historyRecord = new HistoryRecord(this.reader.read(next.value()));
                                LOGGER.trace("Recovering database history: {}", historyRecord);
                                if (historyRecord == null || !historyRecord.isValid()) {
                                    LOGGER.warn("Skipping invalid database history record '{}'. This is often not an issue, but if it happens repeatedly please check the '{}' topic.", historyRecord, this.topicName);
                                } else {
                                    consumer.accept(historyRecord);
                                    LOGGER.trace("Recovered database history: {}", historyRecord);
                                }
                            }
                            j = next.offset();
                            i2++;
                        }
                    } catch (IOException e) {
                        LOGGER.error("Error while deserializing history record '{}'", next, e);
                    } catch (Exception e2) {
                        LOGGER.error("Unexpected exception while processing record '{}'", next, e2);
                        throw e2;
                    }
                }
                if (i2 == 0) {
                    LOGGER.debug("No new records found in the database history; will retry");
                    i++;
                } else {
                    LOGGER.debug("Processed {} records from database history", Integer.valueOf(i2));
                }
                if (j >= l.longValue() - 1) {
                    kafkaConsumer.close();
                    return;
                }
            }
            throw new IllegalStateException("The database history couldn't be recovered. Consider to increase the value for " + RECOVERY_POLL_INTERVAL_MS.name());
        } catch (Throwable th) {
            try {
                kafkaConsumer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    private Long getEndOffsetOfDbHistoryTopic(Long l, KafkaConsumer<String, String> kafkaConsumer) {
        Long value = kafkaConsumer.endOffsets(Collections.singleton(new TopicPartition(this.topicName, PARTITION.intValue()))).entrySet().iterator().next().getValue();
        if (l == null || l.equals(value)) {
            return value;
        }
        throw new IllegalStateException("Detected changed end offset of database history topic (previous: " + l + ", current: " + value + "). Make sure that the same history topic isn't shared by multiple connector instances.");
    }

    @Override // io.debezium.relational.history.DatabaseHistory
    public boolean storageExists() {
        KafkaConsumer kafkaConsumer = new KafkaConsumer(this.consumerConfig.asProperties());
        try {
            boolean containsKey = kafkaConsumer.listTopics().containsKey(this.topicName);
            kafkaConsumer.close();
            return containsKey;
        } catch (Throwable th) {
            try {
                kafkaConsumer.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Override // io.debezium.relational.history.DatabaseHistory
    public boolean exists() {
        boolean z = false;
        if (storageExists()) {
            KafkaConsumer kafkaConsumer = new KafkaConsumer(this.consumerConfig.asProperties());
            try {
                checkTopicSettings(this.topicName);
                Set singleton = Collections.singleton(new TopicPartition(this.topicName, PARTITION.intValue()));
                z = kafkaConsumer.endOffsets(singleton).entrySet().iterator().next().getValue().longValue() > kafkaConsumer.beginningOffsets(singleton).entrySet().iterator().next().getValue().longValue();
                kafkaConsumer.close();
            } catch (Throwable th) {
                try {
                    kafkaConsumer.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
                throw th;
            }
        }
        return z;
    }

    private void checkTopicSettings(String str) {
        if (this.checkTopicSettingsExecutor == null || this.checkTopicSettingsExecutor.isShutdown()) {
            return;
        }
        this.checkTopicSettingsExecutor.execute(() -> {
            AdminClient create;
            Map<ConfigResource, Config> map;
            String str2 = this.producerConfig.getString("client.id") + "-topic-check";
            Properties asProperties = this.producerConfig.asProperties();
            asProperties.put("client.id", str2);
            try {
                create = AdminClient.create(asProperties);
                try {
                    map = create.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, str))).all().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
                } finally {
                }
            } catch (Throwable th) {
                LOGGER.info("Attempted to validate database history topic but failed", th);
            }
            if (map.size() != 1) {
                LOGGER.info("Expected one topic '{}' to match the query but got {}", str, Integer.valueOf(map.values().size()));
                if (create != null) {
                    create.close();
                    return;
                }
                return;
            }
            Config next = map.values().iterator().next();
            if (next == null) {
                LOGGER.info("Could not get config for topic '{}'", next);
                if (create != null) {
                    create.close();
                    return;
                }
                return;
            }
            String value = next.get("cleanup.policy").value();
            if (!"delete".equals(value)) {
                LOGGER.warn("Database history topic '{}' option '{}' should be '{}' but is '{}'", new Object[]{str, "cleanup.policy", "delete", value});
                if (create != null) {
                    create.close();
                    return;
                }
                return;
            }
            String value2 = next.get("retention.bytes").value();
            if (value2 != null && Long.parseLong(value2) != -1) {
                LOGGER.warn("Database history topic '{}' option '{}' should be '{}' but is '{}'", new Object[]{str, "retention.bytes", -1, value2});
                if (create != null) {
                    create.close();
                    return;
                }
                return;
            }
            String value3 = next.get("retention.ms").value();
            if (value3 != null && Long.parseLong(value3) != -1 && Long.parseLong(value3) < RETENTION_MS_MIN) {
                LOGGER.warn("Database history topic '{}' option '{}' should be '{}' or greater than '{}' (5 years) but is '{}'", new Object[]{str, "retention.ms", -1, Long.valueOf(RETENTION_MS_MIN), value3});
                if (create != null) {
                    create.close();
                    return;
                }
                return;
            }
            DescribeTopicsResult describeTopics = create.describeTopics(Collections.singleton(str));
            if (describeTopics.values().size() != 1) {
                LOGGER.info("Expected one topic '{}' to match the query but got {}", str, Integer.valueOf(describeTopics.values().size()));
                if (create != null) {
                    create.close();
                    return;
                }
                return;
            }
            TopicDescription topicDescription = describeTopics.values().values().iterator().next().get();
            if (topicDescription == null) {
                LOGGER.info("Could not get description for topic '{}'", str);
                if (create != null) {
                    create.close();
                    return;
                }
                return;
            }
            int size = topicDescription.partitions().size();
            if (size != 1) {
                LOGGER.warn("Database history topic '{}' should have one partiton but has '{}'", str, Integer.valueOf(size));
                if (create != null) {
                    create.close();
                    return;
                }
                return;
            }
            LOGGER.info("Database history topic '{}' has correct settings", str);
            if (create != null) {
                create.close();
            }
            stopCheckTopicSettingsExecutor();
        });
    }

    @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
    public synchronized void stop() {
        stopCheckTopicSettingsExecutor();
        try {
            if (this.producer != null) {
                try {
                    this.producer.flush();
                    this.producer.close(Duration.ofSeconds(30L));
                } catch (Throwable th) {
                    this.producer.close(Duration.ofSeconds(30L));
                    throw th;
                }
            }
        } finally {
            this.producer = null;
            super.stop();
        }
    }

    private void stopCheckTopicSettingsExecutor() {
        if (this.checkTopicSettingsExecutor != null) {
            this.checkTopicSettingsExecutor.shutdown();
        }
    }

    public String toString() {
        return this.topicName != null ? "Kafka topic " + this.topicName + ":" + PARTITION + " using brokers at " + this.producerConfig.getString(BOOTSTRAP_SERVERS) : "Kafka topic";
    }

    protected static String consumerConfigPropertyName(String str) {
        return CONSUMER_PREFIX + str;
    }

    @Override // io.debezium.relational.history.AbstractDatabaseHistory, io.debezium.relational.history.DatabaseHistory
    public void initializeStorage() {
        super.initializeStorage();
        try {
            AdminClient create = AdminClient.create(this.producerConfig.asProperties());
            try {
                NewTopic newTopic = new NewTopic(this.topicName, 1, getDefaultTopicReplicationFactor(create));
                newTopic.configs(Collect.hashMapOf("cleanup.policy", "delete", "retention.ms", Long.toString(RETENTION_MS_MAX), "retention.bytes", Long.toString(-1L)));
                create.createTopics(Collections.singleton(newTopic));
                LOGGER.info("Database history topic '{}' created", newTopic);
                if (create != null) {
                    create.close();
                }
            } finally {
            }
        } catch (Exception e) {
            throw new ConnectException("Creation of database history topic failed, please create the topic manually", e);
        }
    }

    private short getDefaultTopicReplicationFactor(AdminClient adminClient) throws Exception {
        try {
            String value = getKafkaBrokerConfig(adminClient).get(DEFAULT_TOPIC_REPLICATION_FACTOR_PROP_NAME).value();
            if (value != null) {
                return Short.parseShort(value);
            }
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof UnsupportedVersionException)) {
                throw e;
            }
        }
        LOGGER.warn("Unable to obtain the default replication factor from the brokers at {}. Setting value to {} instead.", this.producerConfig.getString(BOOTSTRAP_SERVERS), (short) 1);
        return (short) 1;
    }

    private Config getKafkaBrokerConfig(AdminClient adminClient) throws Exception {
        Collection<Node> collection = adminClient.describeCluster().nodes().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        if (collection.isEmpty()) {
            throw new ConnectException("No brokers available to obtain default settings");
        }
        Map<ConfigResource, Config> map = adminClient.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.BROKER, collection.iterator().next().idString()))).all().get(KAFKA_QUERY_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        if (map.isEmpty()) {
            throw new ConnectException("No configs have been received");
        }
        return map.values().iterator().next();
    }

    private static Field.Validator forKafka(Field.Validator validator) {
        return (configuration, field, validationOutput) -> {
            if (KafkaDatabaseHistory.class.getName().equals(configuration.getString(HistorizedRelationalDatabaseConnectorConfig.DATABASE_HISTORY))) {
                return validator.validate(configuration, field, validationOutput);
            }
            return 0;
        };
    }
}
