/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools;

import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import net.sourceforge.argparse4j.ArgumentParsers;
import net.sourceforge.argparse4j.impl.Arguments;
import net.sourceforge.argparse4j.inf.ArgumentAction;
import net.sourceforge.argparse4j.inf.ArgumentGroup;
import net.sourceforge.argparse4j.inf.ArgumentParser;
import net.sourceforge.argparse4j.inf.ArgumentParserException;
import net.sourceforge.argparse4j.inf.Namespace;
import net.sourceforge.argparse4j.inf.Subparser;
import net.sourceforge.argparse4j.inf.Subparsers;
import org.apache.kafka.clients.admin.AbortTransactionSpec;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DescribeProducersOptions;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.DescribeTransactionsResult;
import org.apache.kafka.clients.admin.ListTopicsOptions;
import org.apache.kafka.clients.admin.ListTransactionsOptions;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.clients.admin.TransactionDescription;
import org.apache.kafka.clients.admin.TransactionListing;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.TransactionalIdNotFoundException;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.tools.PrintVersionAndExitAction;
import org.apache.kafka.tools.ToolsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class TransactionsCommand {
    private static final Logger log = LoggerFactory.getLogger(TransactionsCommand.class);
    protected final Time time;

    protected TransactionsCommand(Time time) {
        this.time = time;
    }

    abstract String name();

    abstract void addSubparser(Subparsers var1);

    abstract void execute(Admin var1, Namespace var2, PrintStream var3) throws Exception;

    private static void printErrorAndExit(String message, Throwable t) {
        log.debug(message, t);
        String exitMessage = message + ": " + t.getMessage() + ". Enable debug logging for additional detail.";
        TransactionsCommand.printErrorAndExit(exitMessage);
    }

    private static void printErrorAndExit(String message) {
        System.err.println(message);
        Exit.exit((int)1, (String)message);
    }

    private static Admin buildAdminClient(Namespace ns) {
        Properties properties;
        String configFile = ns.getString("command_config");
        if (configFile == null) {
            properties = new Properties();
        } else {
            try {
                properties = Utils.loadProps((String)configFile);
            }
            catch (IOException e) {
                TransactionsCommand.printErrorAndExit("Failed to load admin client properties", e);
                return null;
            }
        }
        String bootstrapServers = ns.getString("bootstrap_server");
        properties.put("bootstrap.servers", bootstrapServers);
        return Admin.create((Properties)properties);
    }

    static ArgumentParser buildBaseParser() {
        ArgumentParser parser = ArgumentParsers.newArgumentParser((String)"kafka-transactions.sh");
        parser.description("This tool is used to analyze the transactional state of producers in the cluster. It can be used to detect and recover from hanging transactions.");
        parser.addArgument(new String[]{"-v", "--version"}).action((ArgumentAction)new PrintVersionAndExitAction()).help("show the version of this Kafka distribution and exit");
        parser.addArgument(new String[]{"--command-config"}).help("property file containing configs to be passed to admin client").action((ArgumentAction)Arguments.store()).type(String.class).metavar(new String[]{"FILE"}).required(false);
        parser.addArgument(new String[]{"--bootstrap-server"}).help("hostname and port for the broker to connect to, in the form `host:port`  (multiple comma-separated entries can be given)").action((ArgumentAction)Arguments.store()).type(String.class).metavar(new String[]{"host:port"}).required(true);
        return parser;
    }

    static void execute(String[] args, Function<Namespace, Admin> adminSupplier, PrintStream out, Time time) throws Exception {
        Namespace ns;
        List<ForceTerminateTransactionsCommand> commands = List.of(new ListTransactionsCommand(time), new DescribeTransactionsCommand(time), new DescribeProducersCommand(time), new AbortTransactionCommand(time), new FindHangingTransactionsCommand(time), new ForceTerminateTransactionsCommand(time));
        ArgumentParser parser = TransactionsCommand.buildBaseParser();
        Subparsers subparsers = parser.addSubparsers().dest("command").title("commands").metavar("COMMAND");
        commands.forEach(command -> command.addSubparser(subparsers));
        try {
            ns = parser.parseArgs(args);
        }
        catch (ArgumentParserException e) {
            parser.handleError(e);
            Exit.exit((int)1);
            return;
        }
        Admin admin = adminSupplier.apply(ns);
        String commandName = ns.getString("command");
        Optional<TransactionsCommand> commandOpt = commands.stream().filter(cmd -> cmd.name().equals(commandName)).findFirst();
        if (commandOpt.isEmpty()) {
            TransactionsCommand.printErrorAndExit("Unexpected command " + commandName);
        }
        TransactionsCommand command2 = commandOpt.get();
        command2.execute(admin, ns, out);
        Exit.exit((int)0);
    }

    public static void main(String[] args) throws Exception {
        TransactionsCommand.execute(args, TransactionsCommand::buildAdminClient, System.out, Time.SYSTEM);
    }

    static class ListTransactionsCommand
    extends TransactionsCommand {
        static final List<String> HEADERS = List.of("TransactionalId", "Coordinator", "ProducerId", "TransactionState");

        ListTransactionsCommand(Time time) {
            super(time);
        }

        @Override
        public String name() {
            return "list";
        }

        @Override
        public void addSubparser(Subparsers subparsers) {
            Subparser subparser = subparsers.addParser(this.name()).help("list transactions");
            subparser.addArgument(new String[]{"--duration-filter"}).help("Duration (in millis) to filter by: if < 0, all transactions will be returned; otherwise, only transactions running longer than this duration will be returned").action((ArgumentAction)Arguments.store()).type(Long.class).required(false);
            subparser.addArgument(new String[]{"--transactional-id-pattern"}).help("Transactional id regular expression pattern to filter by").action((ArgumentAction)Arguments.store()).type(String.class).required(false);
        }

        @Override
        public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
            Map result;
            ListTransactionsOptions options = new ListTransactionsOptions();
            Optional.ofNullable(ns.getLong("duration_filter")).ifPresent(arg_0 -> ((ListTransactionsOptions)options).filterOnDuration(arg_0));
            Optional.ofNullable(ns.getString("transactional_id_pattern")).ifPresent(arg_0 -> ((ListTransactionsOptions)options).filterOnTransactionalIdPattern(arg_0));
            try {
                result = (Map)admin.listTransactions(options).allByBrokerId().get();
            }
            catch (ExecutionException e) {
                TransactionsCommand.printErrorAndExit("Failed to list transactions", e.getCause());
                return;
            }
            ArrayList<List<String>> rows = new ArrayList<List<String>>();
            for (Map.Entry brokerListingsEntry : result.entrySet()) {
                String coordinatorIdString = ((Integer)brokerListingsEntry.getKey()).toString();
                Collection listings = (Collection)brokerListingsEntry.getValue();
                for (TransactionListing listing : listings) {
                    rows.add(List.of(listing.transactionalId(), coordinatorIdString, String.valueOf(listing.producerId()), listing.state().toString()));
                }
            }
            ToolsUtils.prettyPrintTable(HEADERS, rows, out);
        }
    }

    static class DescribeTransactionsCommand
    extends TransactionsCommand {
        static final List<String> HEADERS = List.of("CoordinatorId", "TransactionalId", "ProducerId", "ProducerEpoch", "TransactionState", "TransactionTimeoutMs", "CurrentTransactionStartTimeMs", "TransactionDurationMs", "TopicPartitions");

        DescribeTransactionsCommand(Time time) {
            super(time);
        }

        @Override
        public String name() {
            return "describe";
        }

        @Override
        public void addSubparser(Subparsers subparsers) {
            Subparser subparser = subparsers.addParser(this.name()).description("Describe the state of an active transactional-id.").help("describe the state of an active transactional-id");
            subparser.addArgument(new String[]{"--transactional-id"}).help("transactional id").action((ArgumentAction)Arguments.store()).type(String.class).required(true);
        }

        @Override
        public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
            String transactionDurationMsColumnValue;
            String transactionStartTimeMsColumnValue;
            TransactionDescription result;
            String transactionalId = ns.getString("transactional_id");
            try {
                result = (TransactionDescription)admin.describeTransactions(Set.of(transactionalId)).description(transactionalId).get();
            }
            catch (ExecutionException e) {
                TransactionsCommand.printErrorAndExit("Failed to describe transaction state of transactional-id `" + transactionalId + "`", e.getCause());
                return;
            }
            if (result.transactionStartTimeMs().isPresent()) {
                long transactionStartTimeMs = result.transactionStartTimeMs().getAsLong();
                transactionStartTimeMsColumnValue = String.valueOf(transactionStartTimeMs);
                transactionDurationMsColumnValue = String.valueOf(this.time.milliseconds() - transactionStartTimeMs);
            } else {
                transactionStartTimeMsColumnValue = "None";
                transactionDurationMsColumnValue = "None";
            }
            List<String> row = List.of(String.valueOf(result.coordinatorId()), transactionalId, String.valueOf(result.producerId()), String.valueOf(result.producerEpoch()), result.state().toString(), String.valueOf(result.transactionTimeoutMs()), transactionStartTimeMsColumnValue, transactionDurationMsColumnValue, result.topicPartitions().stream().map(TopicPartition::toString).collect(Collectors.joining(",")));
            ToolsUtils.prettyPrintTable(HEADERS, List.of(row), out);
        }
    }

    static class DescribeProducersCommand
    extends TransactionsCommand {
        static final List<String> HEADERS = List.of("ProducerId", "ProducerEpoch", "LatestCoordinatorEpoch", "LastSequence", "LastTimestamp", "CurrentTransactionStartOffset");

        DescribeProducersCommand(Time time) {
            super(time);
        }

        @Override
        public String name() {
            return "describe-producers";
        }

        @Override
        public void addSubparser(Subparsers subparsers) {
            Subparser subparser = subparsers.addParser(this.name()).help("describe the states of active producers for a topic partition");
            subparser.addArgument(new String[]{"--broker-id"}).help("optional broker id to describe the producer state on a specific replica").action((ArgumentAction)Arguments.store()).type(Integer.class).required(false);
            subparser.addArgument(new String[]{"--topic"}).help("topic name").action((ArgumentAction)Arguments.store()).type(String.class).required(true);
            subparser.addArgument(new String[]{"--partition"}).help("partition number").action((ArgumentAction)Arguments.store()).type(Integer.class).required(true);
        }

        @Override
        public void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
            DescribeProducersResult.PartitionProducerState result;
            DescribeProducersOptions options = new DescribeProducersOptions();
            Optional.ofNullable(ns.getInt("broker_id")).ifPresent(arg_0 -> ((DescribeProducersOptions)options).brokerId(arg_0));
            String topicName = ns.getString("topic");
            Integer partitionId = ns.getInt("partition");
            TopicPartition topicPartition = new TopicPartition(topicName, partitionId.intValue());
            try {
                result = (DescribeProducersResult.PartitionProducerState)admin.describeProducers(Set.of(topicPartition), options).partitionResult(topicPartition).get();
            }
            catch (ExecutionException e) {
                String brokerClause = options.brokerId().isPresent() ? "broker " + options.brokerId().getAsInt() : "leader";
                TransactionsCommand.printErrorAndExit("Failed to describe producers for partition " + String.valueOf(topicPartition) + " on " + brokerClause, e.getCause());
                return;
            }
            List<List<String>> rows = result.activeProducers().stream().map(producerState -> {
                String currentTransactionStartOffsetColumnValue = producerState.currentTransactionStartOffset().isPresent() ? String.valueOf(producerState.currentTransactionStartOffset().getAsLong()) : "None";
                return List.of(String.valueOf(producerState.producerId()), String.valueOf(producerState.producerEpoch()), String.valueOf(producerState.coordinatorEpoch().orElse(-1)), String.valueOf(producerState.lastSequence()), String.valueOf(producerState.lastTimestamp()), currentTransactionStartOffsetColumnValue);
            }).collect(Collectors.toList());
            ToolsUtils.prettyPrintTable(HEADERS, rows, out);
        }
    }

    static class AbortTransactionCommand
    extends TransactionsCommand {
        AbortTransactionCommand(Time time) {
            super(time);
        }

        @Override
        String name() {
            return "abort";
        }

        @Override
        void addSubparser(Subparsers subparsers) {
            Subparser subparser = subparsers.addParser(this.name()).help("abort a hanging transaction (requires administrative privileges)");
            subparser.addArgument(new String[]{"--topic"}).help("topic name").action((ArgumentAction)Arguments.store()).type(String.class).required(true);
            subparser.addArgument(new String[]{"--partition"}).help("partition number").action((ArgumentAction)Arguments.store()).type(Integer.class).required(true);
            ArgumentGroup newBrokerArgumentGroup = subparser.addArgumentGroup("Brokers on versions 3.0 and above").description("For newer brokers, only the start offset of the transaction to be aborted is required");
            newBrokerArgumentGroup.addArgument(new String[]{"--start-offset"}).help("start offset of the transaction to abort").action((ArgumentAction)Arguments.store()).type(Long.class);
            ArgumentGroup olderBrokerArgumentGroup = subparser.addArgumentGroup("Brokers on versions older than 3.0").description("For older brokers, you must provide all of these arguments");
            olderBrokerArgumentGroup.addArgument(new String[]{"--producer-id"}).help("producer id").action((ArgumentAction)Arguments.store()).type(Long.class);
            olderBrokerArgumentGroup.addArgument(new String[]{"--producer-epoch"}).help("producer epoch").action((ArgumentAction)Arguments.store()).type(Short.class);
            olderBrokerArgumentGroup.addArgument(new String[]{"--coordinator-epoch"}).help("coordinator epoch").action((ArgumentAction)Arguments.store()).type(Integer.class);
        }

        private AbortTransactionSpec buildAbortSpec(Admin admin, TopicPartition topicPartition, long startOffset) throws Exception {
            DescribeProducersResult.PartitionProducerState result;
            try {
                result = (DescribeProducersResult.PartitionProducerState)admin.describeProducers(Set.of(topicPartition)).partitionResult(topicPartition).get();
            }
            catch (ExecutionException e) {
                TransactionsCommand.printErrorAndExit("Failed to validate producer state for partition " + String.valueOf(topicPartition), e.getCause());
                return null;
            }
            Optional<ProducerState> foundProducerState = result.activeProducers().stream().filter(producerState -> {
                OptionalLong txnStartOffsetOpt = producerState.currentTransactionStartOffset();
                return txnStartOffsetOpt.isPresent() && txnStartOffsetOpt.getAsLong() == startOffset;
            }).findFirst();
            if (foundProducerState.isEmpty()) {
                TransactionsCommand.printErrorAndExit("Could not find any open transactions starting at offset " + startOffset + " on partition " + String.valueOf(topicPartition));
                return null;
            }
            ProducerState producerState2 = foundProducerState.get();
            return new AbortTransactionSpec(topicPartition, producerState2.producerId(), (short)producerState2.producerEpoch(), producerState2.coordinatorEpoch().orElse(0));
        }

        private void abortTransaction(Admin admin, AbortTransactionSpec abortSpec) throws Exception {
            try {
                admin.abortTransaction(abortSpec).all().get();
            }
            catch (ExecutionException e) {
                TransactionsCommand.printErrorAndExit("Failed to abort transaction " + String.valueOf(abortSpec), e.getCause());
            }
        }

        @Override
        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
            AbortTransactionSpec abortSpec;
            String topicName = ns.getString("topic");
            Integer partitionId = ns.getInt("partition");
            TopicPartition topicPartition = new TopicPartition(topicName, partitionId.intValue());
            Long startOffset = ns.getLong("start_offset");
            Long producerId = ns.getLong("producer_id");
            if (startOffset == null && producerId == null) {
                TransactionsCommand.printErrorAndExit("The transaction to abort must be identified either with --start-offset (for brokers on 3.0 or above) or with --producer-id, --producer-epoch, and --coordinator-epoch (for older brokers)");
                return;
            }
            if (startOffset == null) {
                Short producerEpoch = ns.getShort("producer_epoch");
                if (producerEpoch == null) {
                    TransactionsCommand.printErrorAndExit("Missing required argument --producer-epoch");
                    return;
                }
                Integer coordinatorEpoch = ns.getInt("coordinator_epoch");
                if (coordinatorEpoch == null) {
                    TransactionsCommand.printErrorAndExit("Missing required argument --coordinator-epoch");
                    return;
                }
                if (coordinatorEpoch < 0) {
                    coordinatorEpoch = 0;
                }
                abortSpec = new AbortTransactionSpec(topicPartition, producerId.longValue(), producerEpoch.shortValue(), coordinatorEpoch.intValue());
            } else {
                abortSpec = this.buildAbortSpec(admin, topicPartition, startOffset);
            }
            this.abortTransaction(admin, abortSpec);
        }
    }

    static class FindHangingTransactionsCommand
    extends TransactionsCommand {
        private static final int MAX_BATCH_SIZE = 500;
        static final List<String> HEADERS = List.of("Topic", "Partition", "ProducerId", "ProducerEpoch", "CoordinatorEpoch", "StartOffset", "LastTimestamp", "Duration(min)");

        FindHangingTransactionsCommand(Time time) {
            super(time);
        }

        @Override
        String name() {
            return "find-hanging";
        }

        @Override
        void addSubparser(Subparsers subparsers) {
            Subparser subparser = subparsers.addParser(this.name()).help("find hanging transactions");
            subparser.addArgument(new String[]{"--broker-id"}).help("broker id to search for hanging transactions").action((ArgumentAction)Arguments.store()).type(Integer.class).required(false);
            subparser.addArgument(new String[]{"--max-transaction-timeout"}).help("maximum transaction timeout in minutes to limit the scope of the search (15 minutes by default)").action((ArgumentAction)Arguments.store()).type(Integer.class).setDefault((Object)15).required(false);
            subparser.addArgument(new String[]{"--topic"}).help("topic name to limit search to (required if --partition is specified)").action((ArgumentAction)Arguments.store()).type(String.class).required(false);
            subparser.addArgument(new String[]{"--partition"}).help("partition number").action((ArgumentAction)Arguments.store()).type(Integer.class).required(false);
        }

        @Override
        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
            List<TopicPartition> topicPartitions;
            Optional<Integer> brokerId = Optional.ofNullable(ns.getInt("broker_id"));
            Optional<String> topic = Optional.ofNullable(ns.getString("topic"));
            if (topic.isEmpty() && brokerId.isEmpty()) {
                TransactionsCommand.printErrorAndExit("The `find-hanging` command requires either --topic or --broker-id to limit the scope of the search");
                return;
            }
            Optional<Integer> partition = Optional.ofNullable(ns.getInt("partition"));
            if (partition.isPresent() && topic.isEmpty()) {
                TransactionsCommand.printErrorAndExit("The --partition argument requires --topic to be provided");
                return;
            }
            long maxTransactionTimeoutMs = TimeUnit.MINUTES.toMillis(ns.getInt("max_transaction_timeout").intValue());
            List<OpenTransaction> candidates = this.collectCandidateOpenTransactions(admin, brokerId, maxTransactionTimeoutMs, topicPartitions = this.collectTopicPartitionsToSearch(admin, topic, partition, brokerId));
            if (candidates.isEmpty()) {
                this.printHangingTransactions(List.of(), out);
            } else {
                Map<Long, List<OpenTransaction>> openTransactionsByProducerId = this.groupByProducerId(candidates);
                Map<Long, String> transactionalIds = this.lookupTransactionalIds(admin, openTransactionsByProducerId.keySet());
                Map<String, TransactionDescription> descriptions = this.describeTransactions(admin, transactionalIds.values());
                List<OpenTransaction> hangingTransactions = this.filterHangingTransactions(openTransactionsByProducerId, transactionalIds, descriptions);
                this.printHangingTransactions(hangingTransactions, out);
            }
        }

        private List<TopicPartition> collectTopicPartitionsToSearch(Admin admin, Optional<String> topic, Optional<Integer> partition, Optional<Integer> brokerId) throws Exception {
            List<String> topics;
            if (topic.isPresent()) {
                if (partition.isPresent()) {
                    return List.of(new TopicPartition(topic.get(), partition.get().intValue()));
                }
                topics = List.of(topic.get());
            } else {
                topics = this.listTopics(admin);
            }
            return this.findTopicPartitions(admin, brokerId, topics);
        }

        private List<OpenTransaction> filterHangingTransactions(Map<Long, List<OpenTransaction>> openTransactionsByProducerId, Map<Long, String> transactionalIds, Map<String, TransactionDescription> descriptions) {
            ArrayList<OpenTransaction> hangingTransactions = new ArrayList<OpenTransaction>();
            openTransactionsByProducerId.forEach((producerId, openTransactions) -> {
                String transactionalId = (String)transactionalIds.get(producerId);
                if (transactionalId == null) {
                    hangingTransactions.addAll((Collection<OpenTransaction>)openTransactions);
                } else {
                    TransactionDescription description = (TransactionDescription)descriptions.get(transactionalId);
                    if (description == null) {
                        hangingTransactions.addAll((Collection<OpenTransaction>)openTransactions);
                    } else {
                        for (OpenTransaction openTransaction : openTransactions) {
                            if (description.topicPartitions().contains(openTransaction.topicPartition)) continue;
                            hangingTransactions.add(openTransaction);
                        }
                    }
                }
            });
            return hangingTransactions;
        }

        private void printHangingTransactions(List<OpenTransaction> hangingTransactions, PrintStream out) {
            long currentTimeMs = this.time.milliseconds();
            ArrayList<List<String>> rows = new ArrayList<List<String>>(hangingTransactions.size());
            for (OpenTransaction transaction : hangingTransactions) {
                long transactionDurationMinutes = TimeUnit.MILLISECONDS.toMinutes(currentTimeMs - transaction.producerState.lastTimestamp());
                rows.add(List.of(transaction.topicPartition.topic(), String.valueOf(transaction.topicPartition.partition()), String.valueOf(transaction.producerState.producerId()), String.valueOf(transaction.producerState.producerEpoch()), String.valueOf(transaction.producerState.coordinatorEpoch().orElse(-1)), String.valueOf(transaction.producerState.currentTransactionStartOffset().orElse(-1L)), String.valueOf(transaction.producerState.lastTimestamp()), String.valueOf(transactionDurationMinutes)));
            }
            ToolsUtils.prettyPrintTable(HEADERS, rows, out);
        }

        private Map<String, TransactionDescription> describeTransactions(Admin admin, Collection<String> transactionalIds) throws Exception {
            try {
                DescribeTransactionsResult result = admin.describeTransactions(new HashSet<String>(transactionalIds));
                HashMap<String, TransactionDescription> descriptions = new HashMap<String, TransactionDescription>();
                for (String transactionalId : transactionalIds) {
                    try {
                        TransactionDescription description = (TransactionDescription)result.description(transactionalId).get();
                        descriptions.put(transactionalId, description);
                    }
                    catch (ExecutionException e) {
                        if (e.getCause() instanceof TransactionalIdNotFoundException) {
                            descriptions.put(transactionalId, null);
                            continue;
                        }
                        throw e;
                    }
                }
                return descriptions;
            }
            catch (ExecutionException e) {
                TransactionsCommand.printErrorAndExit("Failed to describe " + transactionalIds.size() + " transactions", e.getCause());
                return Map.of();
            }
        }

        private Map<Long, List<OpenTransaction>> groupByProducerId(List<OpenTransaction> openTransactions) {
            HashMap<Long, List<OpenTransaction>> res = new HashMap<Long, List<OpenTransaction>>();
            for (OpenTransaction transaction : openTransactions) {
                List states = res.computeIfAbsent(transaction.producerState.producerId(), __ -> new ArrayList());
                states.add(transaction);
            }
            return res;
        }

        private List<String> listTopics(Admin admin) throws Exception {
            try {
                ListTopicsOptions listOptions = new ListTopicsOptions().listInternal(true);
                return new ArrayList<String>((Collection)admin.listTopics(listOptions).names().get());
            }
            catch (ExecutionException e) {
                TransactionsCommand.printErrorAndExit("Failed to list topics", e.getCause());
                return List.of();
            }
        }

        private List<TopicPartition> findTopicPartitions(Admin admin, Optional<Integer> brokerId, List<String> topics) throws Exception {
            ArrayList<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
            this.consumeInBatches(topics, 500, batch -> this.findTopicPartitions(admin, brokerId, (List<String>)batch, (List<TopicPartition>)topicPartitions));
            return topicPartitions;
        }

        private void findTopicPartitions(Admin admin, Optional<Integer> brokerId, List<String> topics, List<TopicPartition> topicPartitions) throws Exception {
            try {
                Map topicDescriptions = (Map)admin.describeTopics(topics).allTopicNames().get();
                topicDescriptions.forEach((topic, description) -> description.partitions().forEach(partitionInfo -> {
                    if (brokerId.isEmpty() || this.hasReplica((Integer)brokerId.get(), (TopicPartitionInfo)partitionInfo)) {
                        topicPartitions.add(new TopicPartition(topic, partitionInfo.partition()));
                    }
                }));
            }
            catch (ExecutionException e) {
                TransactionsCommand.printErrorAndExit("Failed to describe " + topics.size() + " topics", e.getCause());
            }
        }

        private boolean hasReplica(int brokerId, TopicPartitionInfo partitionInfo) {
            return partitionInfo.replicas().stream().anyMatch(node -> node.id() == brokerId);
        }

        private List<OpenTransaction> collectCandidateOpenTransactions(Admin admin, Optional<Integer> brokerId, long maxTransactionTimeoutMs, List<TopicPartition> topicPartitions) throws Exception {
            ArrayList<OpenTransaction> candidateTransactions = new ArrayList<OpenTransaction>();
            this.consumeInBatches(topicPartitions, 500, batch -> this.collectCandidateOpenTransactions(admin, brokerId, maxTransactionTimeoutMs, (List<TopicPartition>)batch, (List<OpenTransaction>)candidateTransactions));
            return candidateTransactions;
        }

        private void collectCandidateOpenTransactions(Admin admin, Optional<Integer> brokerId, long maxTransactionTimeoutMs, List<TopicPartition> topicPartitions, List<OpenTransaction> candidateTransactions) throws Exception {
            try {
                DescribeProducersOptions describeOptions = new DescribeProducersOptions();
                brokerId.ifPresent(arg_0 -> ((DescribeProducersOptions)describeOptions).brokerId(arg_0));
                Map producersByPartition = (Map)admin.describeProducers(topicPartitions, describeOptions).all().get();
                long currentTimeMs = this.time.milliseconds();
                producersByPartition.forEach((topicPartition, producersStates) -> producersStates.activeProducers().forEach(activeProducer -> {
                    long transactionDurationMs;
                    if (activeProducer.currentTransactionStartOffset().isPresent() && (transactionDurationMs = currentTimeMs - activeProducer.lastTimestamp()) > maxTransactionTimeoutMs) {
                        candidateTransactions.add(new OpenTransaction((TopicPartition)topicPartition, (ProducerState)activeProducer));
                    }
                }));
            }
            catch (ExecutionException e) {
                TransactionsCommand.printErrorAndExit("Failed to describe producers for " + topicPartitions.size() + " partitions on broker " + String.valueOf(brokerId), e.getCause());
            }
        }

        private Map<Long, String> lookupTransactionalIds(Admin admin, Set<Long> producerIds) throws Exception {
            try {
                ListTransactionsOptions listTransactionsOptions = new ListTransactionsOptions().filterProducerIds(producerIds);
                Collection transactionListings = (Collection)admin.listTransactions(listTransactionsOptions).all().get();
                HashMap<Long, String> transactionalIdMap = new HashMap<Long, String>();
                transactionListings.forEach(listing -> {
                    if (!producerIds.contains(listing.producerId())) {
                        log.debug("Received transaction listing {} which has a producerId which was not requested", listing);
                    } else {
                        transactionalIdMap.put(listing.producerId(), listing.transactionalId());
                    }
                });
                return transactionalIdMap;
            }
            catch (ExecutionException e) {
                TransactionsCommand.printErrorAndExit("Failed to list transactions for " + producerIds.size() + " producers", e.getCause());
                return Map.of();
            }
        }

        private <T> void consumeInBatches(List<T> list, int batchSize, ThrowableConsumer<List<T>> consumer) throws Exception {
            int batchStartIndex = 0;
            int limitIndex = list.size();
            while (batchStartIndex < limitIndex) {
                int batchEndIndex = Math.min(limitIndex, batchStartIndex + batchSize);
                consumer.accept(list.subList(batchStartIndex, batchEndIndex));
                batchStartIndex = batchEndIndex;
            }
        }

        private record OpenTransaction(TopicPartition topicPartition, ProducerState producerState) {
        }

        @FunctionalInterface
        private static interface ThrowableConsumer<T> {
            public void accept(T var1) throws Exception;
        }
    }

    static class ForceTerminateTransactionsCommand
    extends TransactionsCommand {
        ForceTerminateTransactionsCommand(Time time) {
            super(time);
        }

        @Override
        String name() {
            return "forceTerminateTransaction";
        }

        @Override
        void addSubparser(Subparsers subparsers) {
            Subparser subparser = subparsers.addParser(this.name()).description("Force abort an ongoing transaction on transactionalId").help("Force abort an ongoing transaction on transactionalId (requires administrative privileges)");
            subparser.addArgument(new String[]{"--transactionalId"}).help("transactional id").action((ArgumentAction)Arguments.store()).type(String.class).required(true);
        }

        @Override
        void execute(Admin admin, Namespace ns, PrintStream out) throws Exception {
            String transactionalId = ns.getString("transactionalId");
            try {
                admin.forceTerminateTransaction(transactionalId).result().get();
            }
            catch (ExecutionException e) {
                TransactionsCommand.printErrorAndExit("Failed to force terminate transactionalId `" + transactionalId + "`", e.getCause());
            }
        }
    }
}

