/*
 * Decompiled with CFR 0.152.
 */
package org.apache.sling.distribution.queue.impl.jobhandling;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.queue.DistributionQueueType;
import org.apache.sling.distribution.queue.impl.CachingDistributionQueue;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.impl.jobhandling.DistributionAgentJobConsumer;
import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueue;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.QueueConfiguration;
import org.apache.sling.event.jobs.consumer.JobConsumer;
import org.jetbrains.annotations.NotNull;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.cm.Configuration;
import org.osgi.service.cm.ConfigurationAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link DistributionQueueProvider} that hands out {@link JobHandlingDistributionQueue}
 * instances (wrapped in a {@link CachingDistributionQueue}) built on a {@link JobManager},
 * and that can register a {@link JobConsumer} service to process the queued items.
 *
 * <p>Every queue is addressed by a job topic derived from the provider's prefix and the
 * queue name. The {@code jobConsumer} and {@code processingQueueNames} fields are plain
 * mutable fields with no synchronization in this class.
 */
public class JobHandlingDistributionQueueProvider
implements DistributionQueueProvider {
    public static final String TYPE = "jobs";

    /** Common root of every job topic built by this provider. */
    private static final String TOPIC_ROOT = "org/apache/sling/distribution/queue/";

    private final Logger log = LoggerFactory.getLogger(this.getClass());
    private final String prefix;
    private final JobManager jobManager;
    private final BundleContext context;
    /** Optional; when {@code null} no queue configuration is ever created. */
    private final ConfigurationAdmin configAdmin;
    /** Registration of the job consumer, or {@code null} while processing is disabled. */
    private ServiceRegistration<JobConsumer> jobConsumer = null;
    /** Names of the queues being processed, or {@code null} meaning "no restriction". */
    private Set<String> processingQueueNames = null;

    /**
     * Creates a provider without a {@link ConfigurationAdmin}.
     *
     * @param prefix     topic segment identifying this provider's queues
     * @param jobManager job manager passed to the created queues
     * @param context    bundle context used to register the job consumer
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public JobHandlingDistributionQueueProvider(String prefix, JobManager jobManager, BundleContext context) {
        this(prefix, jobManager, context, null);
    }

    /**
     * Creates a provider.
     *
     * @param prefix      topic segment identifying this provider's queues
     * @param jobManager  job manager passed to the created queues
     * @param context     bundle context used to register the job consumer
     * @param configAdmin configuration admin consulted by
     *                    {@link #getQueue(String, DistributionQueueType)}; may be {@code null}
     * @throws IllegalArgumentException if {@code prefix}, {@code jobManager} or
     *                                  {@code context} is {@code null}
     */
    public JobHandlingDistributionQueueProvider(String prefix, JobManager jobManager, BundleContext context, ConfigurationAdmin configAdmin) {
        // Validate before touching any field so a rejected call leaves nothing half-initialised.
        if (prefix == null || jobManager == null || context == null) {
            throw new IllegalArgumentException("all arguments are required");
        }
        this.prefix = prefix;
        this.jobManager = jobManager;
        this.context = context;
        this.configAdmin = configAdmin;
    }

    /**
     * Returns an {@link DistributionQueueType#ORDERED} queue for the given name, using the
     * topic {@code org/apache/sling/distribution/queue/<prefix>/<queueName>}.
     *
     * @param queueName name of the queue
     * @return a new caching wrapper around a new job handling queue
     */
    @Override
    @NotNull
    public DistributionQueue getQueue(@NotNull String queueName) {
        String topic = TOPIC_ROOT + this.prefix + "/" + queueName;
        return this.newCachedQueue(queueName, topic, this.isProcessingEnabledFor(queueName), DistributionQueueType.ORDERED);
    }

    /**
     * Returns a queue of the given type, using the topic
     * {@code org/apache/sling/distribution/queue/<type>/<prefix>/<queueName>} where
     * {@code <type>} is the lower-cased enum name.
     *
     * <p>When a {@link ConfigurationAdmin} was supplied, this may also create a
     * {@link QueueConfiguration} factory configuration for the queue, see
     * {@link #ensureQueueConfiguration(String, String, DistributionQueueType)}.
     *
     * @param queueName name of the queue
     * @param type      type of the queue
     * @return a new caching wrapper around a new job handling queue
     * @throws RuntimeException if accessing or updating the configuration fails with an
     *                          {@link IOException}
     */
    @Override
    public DistributionQueue getQueue(@NotNull String queueName, @NotNull DistributionQueueType type) {
        // Locale.ROOT keeps the topic segment stable regardless of the JVM default locale.
        String topic = TOPIC_ROOT + type.name().toLowerCase(Locale.ROOT) + '/' + this.prefix + "/" + queueName;
        boolean isActive = this.isProcessingEnabledFor(queueName);
        this.ensureQueueConfiguration(queueName, topic, type);
        return this.newCachedQueue(queueName, topic, isActive, type);
    }

    /**
     * Registers a {@link DistributionAgentJobConsumer} for this provider's topics.
     *
     * <p>With a {@code null} {@code queueNames} array the consumer is registered for
     * {@code <root>/<prefix>/*} and every queue is considered active; otherwise one topic
     * {@code <root>/<prefix>/<queueName>} is registered per given name and only those
     * queues are considered active.
     *
     * <p>NOTE(review): topics built by {@link #getQueue(String, DistributionQueueType)}
     * contain an extra type segment before the prefix and therefore differ from the topics
     * registered here; confirm this is intended.
     *
     * @param queueProcessor processor handed to the job consumer
     * @param queueNames     names of the queues to process, or {@code null} for all
     * @throws DistributionException if a job consumer is already registered
     */
    @Override
    public void enableQueueProcessing(@NotNull DistributionQueueProcessor queueProcessor, String ... queueNames) throws DistributionException {
        if (this.jobConsumer != null) {
            throw new DistributionException("job already registered");
        }
        String mainTopic = TOPIC_ROOT + this.prefix;
        List<String> topicList = new ArrayList<String>();
        if (queueNames == null) {
            topicList.add(mainTopic + "/*");
            this.processingQueueNames = null;
        } else {
            for (String queueName : queueNames) {
                topicList.add(mainTopic + '/' + queueName);
            }
            this.processingQueueNames = new HashSet<String>(Arrays.asList(queueNames));
        }
        Hashtable<String, String[]> jobProps = new Hashtable<String, String[]>();
        jobProps.put("job.topics", topicList.toArray(new String[0]));
        this.log.debug("registering job consumer for prefix {}", this.prefix);
        this.log.info("qp: {}, jp: {}", queueProcessor, jobProps);
        this.jobConsumer = this.context.registerService(JobConsumer.class, new DistributionAgentJobConsumer(queueProcessor), jobProps);
        this.log.debug("job consumer for prefix {} registered", this.prefix);
    }

    /**
     * Unregisters the job consumer, if one is registered, and clears the set of processed
     * queue names. Calling this while processing is already disabled only resets that set.
     */
    @Override
    public void disableQueueProcessing() {
        if (this.jobConsumer != null) {
            // Announce the attempt first so the two messages appear in the order things happen.
            this.log.info("unregistering job consumer for agent {}", this.prefix);
            this.jobConsumer.unregister();
            this.log.info("job consumer for agent {} unregistered", this.prefix);
            this.jobConsumer = null;
        }
        this.processingQueueNames = null;
    }

    /** Tells whether a job consumer is registered and covers the given queue name. */
    private boolean isProcessingEnabledFor(String queueName) {
        if (this.jobConsumer == null) {
            return false;
        }
        return this.processingQueueNames == null || this.processingQueueNames.contains(queueName);
    }

    /** Builds a job handling queue for the topic and wraps it in a caching queue. */
    private DistributionQueue newCachedQueue(String queueName, String topic, boolean isActive, DistributionQueueType type) {
        DistributionQueue queue = new JobHandlingDistributionQueue(queueName, topic, this.jobManager, isActive, type);
        return new CachingDistributionQueue(topic, queue);
    }

    /**
     * Creates a {@link QueueConfiguration} factory configuration bound to {@code topic}
     * when a configuration admin is available and neither the job manager nor the
     * configuration admin reports anything for {@code queueName}.
     *
     * <p>NOTE(review): the OSGi Configuration Admin specification describes
     * {@code getConfiguration(pid)} as returning an existing <em>or new</em> configuration,
     * so the {@code == null} guard may never hold against a compliant implementation;
     * confirm whether an existence lookup was intended. Behaviour is kept as-is here.
     *
     * @throws RuntimeException wrapping any {@link IOException} from the configuration admin
     */
    private void ensureQueueConfiguration(String queueName, String topic, DistributionQueueType type) {
        if (this.configAdmin == null) {
            return;
        }
        try {
            if (this.jobManager.getQueue(queueName) != null || this.configAdmin.getConfiguration(queueName) != null) {
                return;
            }
            Configuration config = this.configAdmin.createFactoryConfiguration(QueueConfiguration.class.getName(), null);
            Dictionary<String, Object> props = new Hashtable<String, Object>();
            props.put("queue.name", queueName);
            // Parallel distribution queues map to unordered job queues, everything else to ordered.
            props.put("queue.type", type == DistributionQueueType.PARALLEL ? QueueConfiguration.Type.UNORDERED.name() : QueueConfiguration.Type.ORDERED.name());
            props.put("queue.topics", new String[]{topic});
            props.put("queue.retries", -1);
            props.put("queue.retrydelay", 2000L);
            props.put("queue.keepJobs", true);
            props.put("queue.priority", "MAX");
            props.put("queue.maxparallel", 15);
            config.update(props);
        }
        catch (IOException e) {
            throw new RuntimeException("could not create config for queue " + queueName, e);
        }
    }
}

