package com.marklogic.ps.xqsync;

import com.marklogic.ps.FileFinder;
import com.marklogic.ps.Session;
import com.marklogic.ps.SimpleLogger;
import com.marklogic.ps.timing.TimedEvent;
import com.marklogic.xcc.AdhocQuery;
import com.marklogic.xcc.ContentbaseMetaData;
import com.marklogic.xcc.Request;
import com.marklogic.xcc.RequestOptions;
import com.marklogic.xcc.ResultSequence;
import com.marklogic.xcc.exceptions.StreamingResultException;
import com.marklogic.xcc.exceptions.XQueryException;
import com.marklogic.xcc.exceptions.XccException;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/* loaded from: input_file:com/marklogic/ps/xqsync/XQSyncManager.class */
public class XQSyncManager {
    protected static SimpleLogger logger;
    private static final String ERROR_CODE_MISSING_URI_LEXICON = "XDMP-URILXCNNOTFOUND";
    public static final String NAME = XQSyncManager.class.getName();
    private static final String START_VARIABLE_NAME = "start";
    private static final String START_POSITION_PREDICATE = "[position() ge $start]\n";
    private static final String START_POSITION_DEFINE_VARIABLE = "declare variable $start as xs:integer external;\n";
    private Session inputSession;
    private Configuration configuration;
    private long itemsQueued;
    private UriQueue uriQueue;
    private UriQueue lastUriQueue;
    private Monitor monitor;

    /* loaded from: input_file:com/marklogic/ps/xqsync/XQSyncManager$CallerBlocksPolicy.class */
    public class CallerBlocksPolicy implements RejectedExecutionHandler {
        private BlockingQueue<Runnable> queue;
        private boolean warning = false;

        public CallerBlocksPolicy() {
        }

        @Override // java.util.concurrent.RejectedExecutionHandler
        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
            if (null == this.queue) {
                this.queue = threadPoolExecutor.getQueue();
            }
            try {
                if (!this.warning) {
                    XQSyncManager.logger.fine("queue is full: size = " + this.queue.size() + " (will only appear once!)");
                    this.warning = true;
                }
                this.queue.put(runnable);
            } catch (InterruptedException e) {
                Thread.interrupted();
                throw new RejectedExecutionException(e);
            }
        }
    }

    public XQSyncManager(Configuration configuration) {
        this.configuration = configuration;
        logger = this.configuration.getLogger();
    }

    public void run() {
        TaskFactory taskFactory = null;
        try {
            try {
                int threadCount = this.configuration.getThreadCount();
                logger.finer("threads = " + threadCount);
                this.inputSession = this.configuration.newInputSession();
                int queueSize = this.configuration.getQueueSize();
                logger.info("starting pool of " + threadCount + " threads, queue size = " + queueSize);
                ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(threadCount, threadCount, 16L, TimeUnit.SECONDS, new ArrayBlockingQueue(queueSize), new CallerBlocksPolicy());
                ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(threadPoolExecutor);
                this.monitor = new Monitor(this.configuration, threadPoolExecutor, executorCompletionService, this.configuration.isFatalErrors());
                this.monitor.setPriority(6);
                this.monitor.start();
                TaskFactory taskFactory2 = new TaskFactory(this.configuration, this.monitor);
                newUriQueue(executorCompletionService, threadPoolExecutor, taskFactory2, this.monitor);
                Session newOutputSession = this.configuration.newOutputSession();
                if (null != newOutputSession) {
                    ContentbaseMetaData contentbaseMetaData = newOutputSession.getContentbaseMetaData();
                    logger.info("output version info: client " + contentbaseMetaData.getDriverVersionString() + ", server " + contentbaseMetaData.getServerVersionString());
                }
                if (null != this.inputSession) {
                    ContentbaseMetaData contentbaseMetaData2 = this.inputSession.getContentbaseMetaData();
                    logger.info("input version info: client " + contentbaseMetaData2.getDriverVersionString() + ", server " + contentbaseMetaData2.getServerVersionString());
                    this.itemsQueued = queueFromInputConnection();
                } else if (null != this.configuration.getInputPackagePath()) {
                    this.itemsQueued = queueFromInputPackage(this.configuration.getInputPackagePath());
                } else {
                    this.itemsQueued = queueFromInputPath(this.configuration.getInputPath());
                }
                while (this.monitor.getTaskCount() != this.itemsQueued) {
                    Thread.sleep(125L);
                    Thread.yield();
                    if (this.monitor.getTaskCount() > this.itemsQueued) {
                        throw new FatalException("task count mismatch: " + this.itemsQueued + " < " + this.monitor.getTaskCount());
                    }
                }
                this.monitor.setFinalTaskCount(this.itemsQueued);
                logger.info("final queue count " + this.itemsQueued);
                this.uriQueue.shutdown();
                logger.fine("queue is shutdown with queue size " + this.uriQueue.getQueueSize());
                do {
                    Thread.sleep(125L);
                    Thread.yield();
                } while (this.uriQueue.getQueueSize() > 0);
                logger.info("pool ready to shutdown, queue size " + this.uriQueue.getQueueSize());
                threadPoolExecutor.shutdown();
                logger.info("waiting for monitor to exit");
                do {
                    logger.finest("waiting for monitor " + this.monitor + " " + (null != this.monitor) + " " + this.monitor.isAlive());
                    try {
                        Thread.yield();
                        this.monitor.join();
                    } catch (InterruptedException e) {
                        Thread.interrupted();
                        logger.logException("interrupted", e);
                    }
                    logger.finest("waiting for monitor " + this.monitor + " " + (null != this.monitor) + " " + this.monitor.isAlive());
                    if (null == this.monitor) {
                        break;
                    }
                } while (this.monitor.isAlive());
                if (null != taskFactory2) {
                    logger.info("closing factory");
                    taskFactory2.close();
                }
                if (null != this.configuration) {
                    this.configuration.close();
                }
            } catch (Throwable th) {
                logger.logException("fatal error", th);
                if (null != this.uriQueue) {
                    this.uriQueue.halt();
                }
                if (null != this.monitor) {
                    logger.info("halting monitor");
                    this.monitor.halt(th);
                }
                if (0 != 0) {
                    logger.info("closing factory");
                    taskFactory.close();
                }
                if (null != this.configuration) {
                    this.configuration.close();
                }
            }
            logger.fine("exiting");
        } catch (Throwable th2) {
            if (0 != 0) {
                logger.info("closing factory");
                taskFactory.close();
            }
            if (null != this.configuration) {
                this.configuration.close();
            }
            throw th2;
        }
    }

    private void newUriQueue(CompletionService<TimedEvent[]> completionService, ThreadPoolExecutor threadPoolExecutor, TaskFactory taskFactory, Monitor monitor) {
        this.uriQueue = new UriQueue(this.configuration, completionService, threadPoolExecutor, taskFactory, monitor, new LinkedBlockingQueue());
        this.uriQueue.start();
        while (!this.uriQueue.isActive()) {
            Thread.yield();
        }
    }

    private long queueFromInputPackage(String str) throws IOException, SyncException {
        logger.fine(str);
        File file = new File(str);
        if (!file.exists()) {
            throw new IOException("missing expected input package path: " + str);
        }
        if (!file.canRead()) {
            throw new IOException("cannot read from input package path: " + str);
        }
        if (file.isFile()) {
            return queueFromInputPackageFile(file);
        }
        if (!file.isDirectory()) {
            throw new IOException("unexpected file type: " + file.getCanonicalPath());
        }
        long j = 0;
        final String packageFileExtension = Configuration.getPackageFileExtension();
        File[] listFiles = file.listFiles(new FileFilter() { // from class: com.marklogic.ps.xqsync.XQSyncManager.1
            @Override // java.io.FileFilter
            public boolean accept(File file2) {
                return file2.isDirectory() || (file2.isFile() && file2.getName().endsWith(packageFileExtension));
            }
        });
        Arrays.sort(listFiles);
        for (File file2 : listFiles) {
            j += queueFromInputPackage(file2.getCanonicalPath());
        }
        return j;
    }

    private long queueFromInputPackageFile(File file) throws IOException, SyncException {
        logger.fine("listing package " + file);
        while (null != this.lastUriQueue && 0 != this.lastUriQueue.getQueueSize()) {
            try {
                Thread.sleep(125L);
            } catch (InterruptedException e) {
                Thread.interrupted();
                logger.warning("interrupted, will continue");
            }
        }
        InputPackage inputPackage = new InputPackage(file.getCanonicalPath(), this.configuration);
        inputPackage.addReference();
        logger.fine("listing package " + file + " (" + inputPackage.size() + ")");
        if (null != this.uriQueue) {
            this.uriQueue.shutdown();
        }
        this.lastUriQueue = this.uriQueue;
        newUriQueue(this.uriQueue, new PackageTaskFactory(this.configuration, this.monitor, inputPackage));
        logger.fine("uriQueue = " + this.uriQueue + ", last = " + this.lastUriQueue);
        Iterator<String> it = inputPackage.list().iterator();
        long j = 0;
        while (true) {
            long j2 = j;
            if (!it.hasNext()) {
                this.uriQueue.shutdown();
                inputPackage.closeReference();
                logger.info("queued " + j2 + " from " + file);
                return j2;
            }
            String next = it.next();
            logger.finest("queuing " + j2 + ": " + next);
            inputPackage.addReference();
            this.uriQueue.add(next);
            j = j2 + 1;
        }
    }

    private void newUriQueue(UriQueue uriQueue, TaskFactory taskFactory) {
        newUriQueue(uriQueue.getCompletionService(), uriQueue.getPool(), taskFactory, uriQueue.getMonitor());
    }

    private long queueFromInputConnection() throws XccException, SyncException {
        try {
            return queueFromInputConnection(true);
        } catch (XQueryException e) {
            if (ERROR_CODE_MISSING_URI_LEXICON.equals(e.getCode())) {
                logger.warning("Enable the document uri lexicon on " + this.inputSession.getContentBaseName() + " to speed up synchronization.");
                return queueFromInputConnection(false);
            }
            logger.logException("error queuing from input connection", e);
            throw e;
        }
    }

    private long queueFromInputConnection(boolean z) throws XccException, SyncException {
        String[] inputCollectionUris = this.configuration.getInputCollectionUris();
        String[] inputDirectoryUris = this.configuration.getInputDirectoryUris();
        String[] inputDocumentUris = this.configuration.getInputDocumentUris();
        String[] inputQuery = this.configuration.getInputQuery();
        if (null != inputDocumentUris) {
            if (null != inputCollectionUris || null != inputDirectoryUris || null != inputQuery) {
                logger.warning("conflicting properties: only using INPUT_DOCUMENT_URIS");
            }
        } else if (null != inputCollectionUris) {
            if (null != inputDirectoryUris || null != inputQuery) {
                logger.warning("conflicting properties: only using INPUT_COLLECTION_URI");
            }
        } else if (null != inputDirectoryUris && null != inputQuery) {
            logger.warning("conflicting properties: only using INPUT_DIRECTORY_URI");
        }
        Long startPosition = this.configuration.getStartPosition();
        if (null != startPosition) {
            logger.info("using INPUT_START_POSITION=" + startPosition.longValue());
        }
        long j = 0;
        if (null != inputDocumentUris) {
            for (int i = 0; i < inputDocumentUris.length; i++) {
                if (null == startPosition || i >= startPosition.longValue()) {
                    this.uriQueue.add(inputDocumentUris[i]);
                    j++;
                }
            }
            return j;
        }
        RequestOptions defaultRequestOptions = this.inputSession.getDefaultRequestOptions();
        logger.fine("buffer size = " + defaultRequestOptions.getResultBufferSize() + ", caching = " + defaultRequestOptions.getCacheResult());
        defaultRequestOptions.setCacheResult(this.configuration.isInputQueryCachable());
        defaultRequestOptions.setResultBufferSize(this.configuration.inputQueryBufferSize());
        logger.info("buffer size = " + defaultRequestOptions.getResultBufferSize() + ", caching = " + defaultRequestOptions.getCacheResult());
        int i2 = 1;
        if (null != inputCollectionUris && inputCollectionUris.length > 1) {
            i2 = inputCollectionUris.length;
        } else if (null != inputDirectoryUris && inputDirectoryUris.length > 1) {
            i2 = inputDirectoryUris.length;
        } else if (null != inputQuery && inputQuery.length > 1) {
            i2 = inputQuery.length;
        }
        for (int i3 = 0; i3 < i2; i3++) {
            Request request = getRequest(null == inputCollectionUris ? null : inputCollectionUris[i3], null == inputDirectoryUris ? null : inputDirectoryUris[i3], null == inputQuery ? null : inputQuery[i3], startPosition, z);
            request.setOptions(defaultRequestOptions);
            ResultSequence submitRequest = this.inputSession.submitRequest(request);
            while (submitRequest.hasNext()) {
                try {
                    String asString = submitRequest.next().asString();
                    if (0 == j) {
                        logger.info("queuing first task: " + asString);
                    }
                    logger.finest("queuing " + j + ": " + asString);
                    this.uriQueue.add(asString);
                    j++;
                } catch (StreamingResultException e) {
                    logger.info("count = " + j);
                    logger.warning("Listing input URIs probably timed out: try setting INPUT_QUERY_CACHABLE or INPUT_QUERY_BUFFER_BYTES");
                    throw e;
                }
            }
            submitRequest.close();
        }
        return j;
    }

    private Request getRequest(String str, String str2, String str3, Long l, boolean z) throws XccException {
        Request urisRequest;
        Session newOutputSession;
        boolean z2 = l != null && l.longValue() > 1;
        if (str != null) {
            urisRequest = getCollectionRequest(str, z2, z);
            if (this.configuration.isDeleteOutputCollection() && (newOutputSession = this.configuration.newOutputSession()) != null) {
                logger.info("deleting collection " + str + " on output connection");
                newOutputSession.deleteCollection(str);
                newOutputSession.close();
            }
        } else if (str2 != null) {
            urisRequest = getDirectoryRequest(str2, z2, z);
        } else if (str3 != null) {
            logger.info("listing query: " + str3);
            if (z2) {
                logger.warning("ignoring start value in user-supplied query");
                z2 = false;
            }
            urisRequest = this.inputSession.newAdhocQuery(str3);
        } else {
            urisRequest = getUrisRequest(z2, z);
        }
        if (z2) {
            urisRequest.setNewIntegerVariable(START_VARIABLE_NAME, l.longValue());
        }
        return urisRequest;
    }

    private Request getUrisRequest(boolean z, boolean z2) {
        String str;
        String str2 = Session.XQUERY_VERSION_1_0_ML + (z ? START_POSITION_DEFINE_VARIABLE : "");
        if (z2) {
            logger.info("listing all documents (with uri lexicon)");
            str = str2 + "cts:uris('', 'document')" + (z ? START_POSITION_PREDICATE : "");
        } else {
            logger.info("listing all documents (no uri lexicon)");
            str = str2 + "for $i in doc()" + (z ? START_POSITION_PREDICATE : "") + " return string(xdmp:node-uri($i))";
        }
        logger.fine(str);
        return this.inputSession.newAdhocQuery(str);
    }

    private Request getCollectionRequest(String str, boolean z, boolean z2) {
        String str2;
        logger.info("listing collection " + str);
        String str3 = "xquery version \"1.0-ml\";\ndeclare variable $uri as xs:string external;\n" + (z ? START_POSITION_DEFINE_VARIABLE : "");
        if (z2) {
            str2 = str3 + "cts:uris('', 'document', cts:collection-query($uri))\n" + (z ? START_POSITION_PREDICATE : "");
        } else {
            str2 = str3 + "for $i in collection($uri)\n" + (z ? START_POSITION_PREDICATE : "") + "return string(xdmp:node-uri($i))\n";
        }
        AdhocQuery newAdhocQuery = this.inputSession.newAdhocQuery(str2);
        newAdhocQuery.setNewStringVariable("uri", str);
        return newAdhocQuery;
    }

    private Request getDirectoryRequest(String str, boolean z, boolean z2) {
        String str2;
        logger.info("listing directory " + str);
        String str3 = "xquery version \"1.0-ml\";\ndeclare variable $uri as xs:string external;\n" + (z ? START_POSITION_DEFINE_VARIABLE : "");
        if (z2) {
            str2 = str3 + "cts:uris('', 'document', cts:directory-query($uri, 'infinity'))\n" + (z ? START_POSITION_PREDICATE : "");
        } else {
            str2 = str3 + "for $i in xdmp:directory($uri, 'infinity')\n" + (z ? START_POSITION_PREDICATE : "") + "return string(xdmp:node-uri($i))\n";
        }
        logger.fine(str2);
        AdhocQuery newAdhocQuery = this.inputSession.newAdhocQuery(str2);
        String str4 = str;
        if (!str4.endsWith("/")) {
            str4 = str4 + "/";
        }
        newAdhocQuery.setNewStringVariable("uri", str4);
        return newAdhocQuery;
    }

    private long queueFromInputPath(String str) throws SyncException, IOException {
        logger.info("listing from " + str + ", excluding " + XQSyncDocument.METADATA_REGEX);
        FileFinder fileFinder = new FileFinder(str, (String) null, XQSyncDocument.METADATA_REGEX);
        fileFinder.find();
        Iterator<File> it = fileFinder.list().iterator();
        long j = 0;
        while (it.hasNext()) {
            j++;
            String canonicalPath = it.next().getCanonicalPath();
            logger.finer("queuing " + j + ": " + canonicalPath);
            this.uriQueue.add(canonicalPath);
        }
        return j;
    }

    public long getItemsQueued() {
        return this.itemsQueued;
    }
}
