PgNotificationPoller

来自 PostgreSQL 维基
跳转到导航跳转到搜索

参见 PgNotificationHelper 和下面类的 JavaDoc,以更详细地解释此 Java/JDBC 代码的功能以及您可能想要使用它的原因。

下面的代码可供任何人在其自己的应用程序中使用。建议,但不要求保留 @author 注释。请注意,它没有像 PgNotificationHelper 那样经过充分测试,并且还没有完整的测试套件,因此建议谨慎使用。

PgNotificationPoller.java

import java.awt.EventQueue;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import org.postgresql.PGNotification;

/**
 * PgNotificationPoller uses PgNotificationHelper to monitor the database
 * for events of interest.
 *
 * PgNotificationPoller requires a dedicated JDBC connection, which it
 * creates using a properties map and manages its self. It will attempt to
 * reconnect if this connection is lost, and will do its best to handle
 * problems appropriately. Any exceptions from the database will logged
 * via java.util.logging and will be acted on if possible, but not re-thrown.
 *
 * If you want to use your own JDBC connection, use PgNotificationHelper
 * directly and be prepared to deal with SQLExceptions if and when they arise.
 *
 * Note that this class will only function correctly with a post-8.1.x JDBC
 * driver because of changes to notification polling.
 *
 * @author Craig Ringer <[email protected]>
 */
//
// TODO / ideas list:
//
// - Support changing of connection props without notification loss, by maintaining
//   old conn until new one successfully established (?).
//
// - Support advisory lock management too
//
// - Let someone grab the connection and do some work on it, holding it exclusively
//   until they hand it back to us. 
//
public class PgNotificationPoller {

    public static final String PG_DRIVER_CLASS = "org.postgresql.Driver";
    public static final String P_STATUS = "status";

    /**
     * Status describes what the poller is currently doing.
     *
     * The stopped state indicates that it's not currently listening for notifications,
     * and the listening state indicates that is <i>is</i> listening for notifications.
     */
    public enum Status {
        STOPPED,
        LISTENING;
    }

    /**
     * If the PostgreSQL JDBC driver could not be loaded, this exception is thrown.
     */
    public static class DriverNotFoundException extends RuntimeException {
        public DriverNotFoundException(String message, Throwable e) {
            super(message, e);
        }
    }

    /**
     * Listener interface to be implemented by classes that are interested
     * in hearing about PostgreSQL `NOTIFY' events, and in the status of
     * the polling task.
     *
     * Exceptions thrown by listeners will be caught, logged, and ignored.
     * It is still highly recommended that you avoid letting exceptions propagate
     * out of your listeners in the first place.
     */
    public static interface PgNotificationListener {
        
        /**
         * Reports that one or more NOTIFY events have been received by the poller.
         *
         * Self-notifications will be delivered, so if you wish to exclude them
         * or identify them, test `receivingBackendPid' against
         * `PGNotification.getPid()' for each notification to see if the sending
         * and receiving backend pid are the same.
         * 
         * @param poller Poller that received the NOTIFY
         * @param receivingBackendPid pid of backend that recieved the notification. Test against notices.getPid() to eliminate self notifications.
         * @param notices Read-only list of notification event(s)
         */
        void notified( PgNotificationPoller poller, long receivingBackendPid, List<PGNotification> notices );

        /**
         * Reports a change in poller status, such as a disconnect or reconnect.
         * You may need to query the database to catch up on changes here,
         * as a disconnect or reconnect implies that notifications may have
         * been lost.
         *
         * @param poller Poller that disconnected/reconnected
         * @param oldStatus Status transitioned from
         * @param newStatus Status transitioned to
         * @param targetStatus Status seeking
         */
        void pollerStatusChanged( PgNotificationPoller poller, Status oldStatus, Status newStatus, Status targetStatus );

        /**
         * A LISTEN has been executed on the database, and NOTIFY events for that
         * name will now be received.
         *
         * @param poller Poller that added the listen
         * @param names Read-only collection of name(s) newly LISTENed
         */
        void listenersAdded( PgNotificationPoller poller, Collection<String> names );

        /**
         * An UNLISTEN has been executed on the database, and NOTIFY events for that
         * name will no longer be received.
         *
         * '*' will never be passed as a name. Instead, individual
         * listenRemoved(...) calls will be made.
         * 
         * @param poller Poller that added the listen
         * @param names Read-only collection of name(s) newly UNLISTENed
         */
        void listenersRemoved( PgNotificationPoller poller, Collection<String> names );
    }

    /**
     * Adapter for PgNotificationListener in case you're only interested in a subset
     * of the events. Other events are ignored.
     *
     * {@inheritDoc}
     */
    public abstract static class PgNotificationAdapter implements PgNotificationListener {

        /** {@inheritDoc} */
        @Override
        public void notified( PgNotificationPoller poller, long receivingBackendPid, List<PGNotification> notices ) {
        }
        
        /** {@inheritDoc} */
        @Override
        public void pollerStatusChanged(PgNotificationPoller poller, Status oldStatus, Status newStatus, Status targetStatus) {
        }

        /** {@inheritDoc} */
        @Override
        public void listenersAdded(PgNotificationPoller poller, Collection<String> names) {
        }

        /** {@inheritDoc} */
        @Override
        public void listenersRemoved(PgNotificationPoller poller, Collection<String> names) {
        }

    }

    /**
     * A single object may listen for exceptions thrown by the
     * JDBC connction used by the poller's worker thread. It must implement
     * this interface.
     */
    public interface PgNotificationExceptionListener {

        /**
         * If any exceptions are thrown while working with the JDBC connection,
         * this callback will report them. This may be useful for handling bad
         * usernames/passwords, consistently unreachable servers, etc.
         *
         * It's mainly here so that you can log issues.
         *
         * The poller / worker will act on the exception as best as it can, generally
         * by re-trying its work or if the connection looks dead trying to create a
         * new one. You cannot directly control that, but may supply new credentials
         * or stop() / start() the poller service if needed. It is safe to do so from
         * this callback.
         *
         * The JDBC connection on which the exception was thrown is provided on
         * the off chance it may be required. It will not be used by the poller
         * before this method returns, as this method's execution blocks the
         * poller worker thread, so it's technically safe to execute work on it.
         * Nonetheless, you should really avoid doing anything with it if at
         * all possible.
         *
         * @warning Do not permit exceptions to be thrown by your implementation of this method.
         *
         * @warning This method is called synchronously on the poller's worker thread.
         *          It should schedule any time consuming work to be done later and return
         *          promptly. <b>Delayed work <i>must</i> not use the passed connection</b>.
         *
         * @param poller poller whose worker caught the exception
         * @param conn Connection in use when the exception was thrown
         * @param message Message from poller worker indicating what it was doing at the time
         * @param exc SQLException thrown
         */
        void sqlExceptionEncountered( PgNotificationPoller poller, Connection conn, String message, SQLException exc );

        /**
         * A listener let an exception propagate up to the worker thread's event
         * dispatch methods. The exception will be ignored, but this callback is
         * invoked with it first to make it possible to track down such issues.
         *
         * @param poller Poller whose worker had a listener throw the exception
         * @param message Message from the poller about the problem
         * @param listener The guilty party - the object that threw
         * @param exc Exception thrown by the listener
         */
        void listenerExceptionEncountered( PgNotificationPoller poller, String message, PgNotificationListener listener, Throwable exc );
        
    }


    private final ConnectionPoller poller = new ConnectionPoller();
    private int pollDelayMs = 1000;
    // TODO backoff
    private int reconnectDelayMs = 30000;
    private int validityTestDelayMs = 10000;
    private int validityTestTimeoutSeconds = 30;
    

    /**
     * Create a new poller without any connection parameters.
     *
     * The poller will begin in the `STOPPED' state and will not attempt to
     * start polling until connnection details are passed via
     * setConnectionProperties.
     *
     * @throws DriverNotFoundException if jdbc driver not loadable
     */
    public PgNotificationPoller() {
        testDriverPresent();
        poller.start();
    }

    /**
     * Create a new poller with the supplied JDBC URL and connecton properties map.
     *
     * The poller will begin in the STOPPED state, but will immediately begin
     * trying to transition to LISTENING by bringing up a JDBC connection. If
     * connection fails, it will be retried periodically.
     *
     * @param url JDBC URL to connect with
     * @param map JDBC properties map to use for credentials and conn params
     * @throws DriverNotFoundException if jdbc driver not loadable
     */
    public PgNotificationPoller(String url, Properties map) {
        this();
        setConnectionProperties(url, map);
    }

    /**
     * Create a new poller with the supplied JDBC URL and connecton properties map.
     *
     * The poller will begin in the STOPPED state, but will immediately begin
     * trying to transition to LISTENING by bringing up a JDBC connection. If
     * connection fails, it will be retried periodically.
     *
     * @param url JDBC URL to connect with
     * @param username PostgreSQL username
     * @param password PostgreSQL password
     * @throws DriverNotFoundException if jdbc driver not loadable
     */
    public PgNotificationPoller(String url, String username, String password) {
        this();
        setConnectionProperties(url, username, password);
    }

    /**
     * Request a change of connection details, using the new parameters passed.
     * 
     * If the new parameters differ from those the current connection (if any)
     * is using, the current connection (if any) will be dropped and the
     * poller will begin trying to establish a new connection if the target
     * status is LISTENING.
     *
     * This call does not change the target status, so the poller will remain
     * stopped if stop() has been called or it has never been started.
     *
     * @warning A call to this method may cause notifications to be missed during the connection change-over.
     *
     * @param url JDBC URL to connect with
     * @param map JDBC properties map to use for credentials and conn params
     */
    public void setConnectionProperties(String url, Properties map) {
        if (map == null)
            throw new NullPointerException("Connection properties map may not be null");
        if (url == null) {
            throw new NullPointerException("URL may not be null");
        }
        poller.setNewConnectionProperties(url, map);
        start();
    }

    /**
     * As setConnectionProperties(String url, Properties map) but with username
     * and password supplied directly instead of via the properties map.
     *
     * @warning A call to this method may cause notifications to be missed during the connection change-over.
     * 
     * @param url JDBC URL to connect with
     * @param username PostgreSQL username
     * @param password PostgreSQL password
     */
    public void setConnectionProperties(String url, String username, String password) {
        Properties props = new Properties();
        if (username != null)
            props.setProperty("user", username);
        if (password != null)
            props.setProperty("password", password);
        setConnectionProperties(url, props);
    }

    /**
     * PgNotificationPoller must periodically check the JDBC connection's receive
     * buffer to see if it has received any notifications. Report how frequently
     * the poller is currently checking for notifications.
     *
     * @see #setPollDelayMs
     *
     * @return delay between polls for notifications, in milliseconds
     */
    public int getPollDelayMs() {
        return pollDelayMs;
    }

    /**
     * PgNotificationPoller must periodically check the JDBC connection's receive
     * buffer to see if it has received any notifications. While this operation
     * doesn't create any client/server communication it's still not entirely free,
     * so the frequency with which notifications are polled for should be adjusted
     * to the app's needs and the rate at which notifications are expected to arrive.
     *
     * @see #getPollDelayMs
     * 
     * @param pollDelayMs delay between polls for notifications, in milliseconds
     */
    public void setPollDelayMs(int pollDelayMs) {
        if (pollDelayMs <= 0) {
            throw new IllegalArgumentException("Poll delay must be > 0");
        }
        this.pollDelayMs = pollDelayMs;
    }

    /**
     * How long the poller will wait between reconnect attempts if conn lost.
     *
     * @see #setReconnectDelayMs
     *
     * @return Delay in ms between connect attempts
     */
    public int getReconnectDelayMs() {
        return reconnectDelayMs;
    }

    /**
     * If the poller's connection drops, it will delay a configurable number
     * of milliseconds before each try to re-establish it after the first
     * attempt.
     *
     * @see #getReconnectDelayMs
     *
     * @param delayMs Delay in ms between connect attempts
     */
    public void setReconnectDelayMs(int delayMs) {
        if (delayMs <= 0) {
            throw new IllegalArgumentException("Reconnect delay must be > 0");
        }
        this.reconnectDelayMs = delayMs;
    }

    /**
     * The poller perioidcally checks whether the connection is alive. Report
     * how long it waits between checks.
     *
     * @see #setValidityTestDelayMs
     *
     * @return Delay in ms between connection validity checks
     */
    public int getValidityTestDelayMs() {
        return validityTestDelayMs;
    }

    /**
     * Merely polling for notifications is not sufficient to detect if the
     * connection has broken; an explicit check that produces client/server
     * chat is required.
     *
     * If set to zero, no validity checking is done.
     *
     * The default is 10 seconds.
     *
     * @see #getValidityTestDelayMs
     * 
     * @param freqMs Delay in ms between connection validity checks
     */
    public void setValidityTestDelayMs(int delayMs) {
        if (delayMs < 0) {
            throw new IllegalArgumentException("Validity test delay must be >= 0");
        }
        this.validityTestDelayMs = delayMs;
    }

    /**
     * How long to wait before giving up on a validity test.
     *
     * @see #setValidityTestTimeoutSeconds
     *
     * @return how long to wait for connection validity test
     */
    public int getValidityTestTimeoutSeconds() {
        return validityTestTimeoutSeconds;
    }

    /**
     * When testing for connection validity a timeout helps avoid the poller
     * blocking forever on dead tcp/ip connections. This reports how long it
     * waits before giving up on the connection, dropping it, and starting
     * to establish a new one.
     * 
     * @see #getValidityTestTimeoutSeconds
     *
     * @param timeout
     */
    public void setValidityTestTimeoutSeconds(int timeout) {
        if (timeout < 0) {
            throw new IllegalArgumentException("Timeout must be >= 0 seconds");
        }
        this.validityTestTimeoutSeconds = timeout;
    }

    /**
     * Ask the poller to poll for notifications as soon as it is next able to.
     * If it's idle and waiting, it will poll immediately, though this call will
     * not block until the poll is completed.
     *
     * Calling pollNow() on a poller in the STOPPED state (whether due to a
     * stop() call, never having been started, or due to a pending reconnect
     * after a connection break) has no effect, and false is returned.
     * 
     * @return true if the pollNow request was acted on, false if it could not be
     */
    public boolean pollNow() {
        return poller.pollNow();
    }

    /**
     * Report the current state of the poller.
     *
     * The poller may be trying to (re)connect, in which case its state will
     * be STOPPED even though it's trying to transition to the target state
     * LISTENING. Similarly, if it's just been asked to stop, it might be
     * LISTENING even though it's in the process of switching to STOPPED.
     *
     * getStatus() tells you whether, at this <i>particular instant</i>, the
     * poller is receiving notifications.
     *
     * @return poller status
     */
    public Status getStatus() {
        return poller.status;
    }

    /**
     * Report the status the poller is trying to be in, which may or may not
     * be the same as its currents status.
     *
     * If getTargetStatus() consistently reports listening, but getStatus()
     * consistently reports stopped, it's likely that there's an issue establishing
     * a connection. Check getLastException() for hints on what might be wrong.
     *
     * See getStatus() for details.
     *
     * @see #stop
     * @see #start
     * @see #getStatus
     * 
     * @return target status of poller
     */
    public Status getTargetStatus(){
        return poller.pub_targetStatus;
    }

    /**
     * Register an object's interest in hearing about notifications received,
     * poller state changes, and names added to or removed from the list being
     * monitored.
     *
     * @see #removeNotificationListener
     *
     * @param listener New object to receive notification events
     */
    public void addNotificationListener(PgNotificationListener listener) {
        // poller.listeners is thread-safe
        poller.listeners.add(listener);
    }

    /**
     * Remove an object from the list of objects interested in hearing about
     * notification-related events.
     *
     * If the passed object is not in the current listener list, no action
     * is taken.
     *
     * @see #addNotificationListener
     * 
     * @param listener Object to stop notifying.
     */
    public void removeNotificationListener(PgNotificationListener listener) {
        // poller.listeners is thread-safe
        poller.listeners.remove(listener);
    }

    /**
     * Clear the list of object to be informed when notifications arrive, state
     * changes, etc.
     *
     * @see #removeNotificationListener
     * @see #addNotificationListener
     */
    public void clearNotificationListeners() {
        // poller.listeners is thread-safe
        poller.listeners.clear();
    }

    /**
     * Add the passed name, which must be a legal PostgreSQL identifier, to the
     * set of notifications that should be LISTENed to.
     *
     * This call won't block waiting for the addition to take effect. Objects
     * listening for notification events will receive a listenersAdded call when
     * the change has been applied by the worker thread. Until then, notifications
     * on this name may not be received.
     *
     * If the target status is STOPPED, this call will not transition it to
     * LISTENING, so it will have no effect until start() is called.
     *
     * @see #unlisten
     * @see #unlistenAll
     * @param name PostgreSQL identifier to LISTEN for notifications on
     */
    public void listen(String name) {
        poller.addListenedName(name);
    }

    /**
     * Remove the passed name from the list of notifications that should be
     * LISTENed to.
     *
     * This call won't block waiting for the removal to take effect. Objects
     * listening for notification events will receive a listenersRemoved call when
     * the change has been applied by the worker thread. It is possible that
     * notifications for this name may be received in the mean time.
     *
     * If the target status is STOPPED, this call will not transition it to
     * LISTENING, so it will have no effect until start() is called.
     *
     * If the name is not currently on the list of names to be listened to,
     * this call has no effect.
     *
     * @see #listen
     * @see #listenAll
     * @param name PostgreSQL identifier to stop listening for notifications on
     */
    public void unlisten(String name) {
        poller.removeListenedName(name);
    }

    /**
     * Remove all names name from the list of notifications that should be
     * LISTENed to, leaving the poller listening for no notifications at all.
     *
     * This call won't block waiting for the removal to take effect. Objects
     * listening for notification events will receive a listenersRemoved call when
     * the change has been applied by the worker thread. It is possible that
     * notifications may be received in the mean time.
     *
     * @see #listen
     * @see #unlisten
     */
    public void unlistenAll() {
        poller.clearListenedNames();
    }

    /**
     * Stop listening for notifications and close down the JDBC connection
     * used for listening.
     *
     * If the target status is already STOPPED, this call has no effect.
     *
     * This call won't block waiting for the change to take effect. Objects
     * listening for notification events will receive a listenersRemoved call
     * and a statusChanged call once the shutdown takes effect. It is possible that
     * notifications may be received in the mean time.
     *
     * @see #getTargetStatus
     */
    public void stop() {
        poller.setTargetStatus(Status.STOPPED);
    }

    /**
     * Begin listening for notification events if not already doing so. If
     * necessary, a new JDBC connection will be created to listen on, using
     * credentials supplied in the ctor or in setCredentials.
     *
     * If the target status is already LISTENING, this call has no effect.
     *
     * @see #getTargetStatus
     */
    public void start() {
        poller.setTargetStatus(Status.LISTENING);
    }

    /**
     * @see PgNotificationExceptionListener
     * @see #setPgNotificationExceptionListener
     * @return current exception listener
     */
    public PgNotificationExceptionListener getPgNotificationExceptionListener() {
        return poller.exceptionListener;
    }

    /**
     * Register an exception listener which will be informed of all
     * SQLExceptions encountered during the operation of the worker
     * thread.
     *
     * It's recommended that you set the listener once during
     * poller setup and leave it.
     *
     * @see PgNotificationExceptionListener
     *
     * @param newExceptionListener
     */
    public void setPgNotificationExceptionListener(PgNotificationExceptionListener newExceptionListener) {
        poller.exceptionListener = newExceptionListener;
    }

    /**
     * Test to see if the JDBC driver is present and loadable, and thow
     * DriverNotFoundException if not.
     *
     * @throws DriverNotFoundException if jdbc driver not loadable
     */
    private void testDriverPresent() {
        try {
            Class.forName(PG_DRIVER_CLASS);
        } catch (ClassNotFoundException e) {
            throw new DriverNotFoundException("Couldn't load JDBC driver " + PG_DRIVER_CLASS, e);
        }
    }

    private class ConnectionPoller extends Thread implements PgNotificationHelper.PgNotificationListener {

        // Simple threading rules:
        // If a var is prefixed by `pub' it must only ever be accessed while the ConnectionPoller
        // monitor is held. No database operations may be performed while the ConnectionPoller monitor is held.
        // Variables not prefixed by `pub' must never be updated by direct external triggers such as
        // outside method calls.

        // `status' must only ever be changed by method private and internal to ConnectionPoller and running
        // on the ConnectionPoller thread, or things will break horribly.
        private Status status = Status.STOPPED;
        // targetStatus must only ever be set by outside callers via setTargetStatus(...).
        private Status pub_targetStatus = Status.STOPPED;

        private final Set<PgNotificationListener> listeners = Collections.synchronizedSet( new HashSet<PgNotificationListener>() );
        private final PgNotificationHelper helper = new PgNotificationHelper();
        private JDBCStatementTimeout connValidTimer;
        private Connection conn;
        private Properties connProperties, pub_newConnProperties;
        private String url, pub_newUrl;
        // Timestamps recording last time of poll and last time of reconnect
        // so that we can honour our delay parameters. Note that these get
        // updated even if the attempted operation fails.
        private long lastPollTimeMs, lastReconnectTimeMs, lastValidityTestMs;

        // Object to be told about any SQLExceptions we hit.
        private PgNotificationExceptionListener exceptionListener;

        private final Set<String> pub_namesToListenTo = new HashSet<String>();
        // *ONLY* test or set this while holding the monitor for namesToListenTo
        private boolean pub_namesToListenToChanged = false;

        public ConnectionPoller() {
            super("PgNotificationPoller$ConnectionPoller");
            setDaemon(true);
            helper.addNotificationListener(this);
            lastPollTimeMs = 0;
            lastReconnectTimeMs = 0;
            JDBCStatementTimeoutThread timeoutThread = new JDBCStatementTimeoutThread();
            connValidTimer = timeoutThread;
            timeoutThread.start();
        }
    
        @Override
        public void run() {
            // Vars to hold copies of state
            boolean needsReconnect, runStart_namesToListenToChanged;
            final Set<String> runStart_namesToListenTo = new HashSet<String>();
            String runStart_url;
            Properties runStart_properties;
            Status runStart_targetStatus;
            // Lifetime loop
            while (true) {
                checkConnWaitTimerStatus();
                // Lock and examine externally-writable state to decide what to do.
                // Some state may need to be copied to permit updates to it
                // to run concurrently with the database work done in this thread.
                synchronized(this) {
                    if (pub_newUrl == null || pub_newConnProperties == null) {
                        // Should never be the case if we're actually run()
                        throw new IllegalStateException("newUrl and/or newConnProperties must never be null if run() called");
                    }
                    // See if there is any work to do, and if there is not, sleep
                    // until there is.
                    needsReconnect = false;
                    runStart_namesToListenToChanged = pub_namesToListenToChanged;
                    // WARNING: runStart_namesToListenTo may not be copied, unless pub_namesToListenToChanged .
                    // otherwise it will be empty and should not be relied on.
                    runStart_namesToListenTo.clear();
                    runStart_url = pub_newUrl;
                    runStart_properties = pub_newConnProperties;
                    runStart_targetStatus = pub_targetStatus;
                    try {
                        if (status == Status.STOPPED && pub_targetStatus == Status.STOPPED) {
                            // We're stopped. Sleep until woken by state change.
                            this.wait();
                            // Re-check to see what to do now
                            continue;
                        } else if (status == Status.STOPPED && pub_targetStatus == Status.LISTENING) {
                            // Needs reconnect, but we're rate limited. Make sure we
                            // sleep to honour the rate limit if needed.
                            needsReconnect = true;
                        } else if (status == Status.LISTENING && pub_targetStatus == Status.STOPPED) {
                            // We've been asked to drop the connection. No sleep needed, continue
                            // execution immediately.
                        } else if (status == Status.LISTENING && pub_targetStatus == Status.LISTENING) {
                            if (  !pub_newUrl.equals(url) || !pub_newConnProperties.equals(connProperties)) {
                                // Have connection properties changed? If so, we'll need to reconnect.
                                needsReconnect = true;
                            } else if (pub_namesToListenToChanged) {
                                // Names to listen to have changed. We'll apply the changes
                                // immediately without waiting the poll delay.
                                //
                                // First, copy names to listen to to reflect current state
                                runStart_namesToListenTo.addAll(pub_namesToListenTo);
                                // Now that we've recorded our intentions, we can mark that
                                // we've seen the changes so far. If something goes
                                // wrong acting on them, we'll relisten() or reconnect,
                                // so the helper will get updated one way or the other.
                                pub_namesToListenToChanged = false;
                            } else {
                                // We're just polling. Delay if necessary before executing poll.
                                long delayMs = (lastPollTimeMs + pollDelayMs) - System.currentTimeMillis();
                                if (delayMs > 0) {
                                    this.wait(delayMs);
                                    // Make sure nothing has changed while we waited,
                                    // such as a pollNow() call or the like.
                                    continue;
                                }
                            }
                        } else {
                            // Unreachable without coding error
                            throw new IllegalStateException("Unreachable code reached, programmer error - new state(s) added?");
                        }
                        if (needsReconnect) {
                            long delayMs = (lastReconnectTimeMs + reconnectDelayMs) - System.currentTimeMillis();
                            if (delayMs > 0) {
                                // Make sure nothing has changed while we waited
                                this.wait(delayMs);
                                continue;
                            }
                        }
                    } catch (InterruptedException e) {
                        // We were woken by something. Re-check state.
                        continue;
                    }

                } // end of synchronized(this) block

                // (From here on, we no longer hold our monitor and may *NOT*
                // safely access the pub_ variables).
                // We're now acting on our earlier decisions. If the public
                // state changes we won't know, and it's too late to do anything
                // about it. It'll get caught next go-around.

                if (needsReconnect) {
                    // By clearing the connection we'll ensure that
                    // status is Status.STOPPED if it wasn't already.
                    clearConnection();
                }
                // Now see if we need to drop the connection or to connect
                // (in the latter case, possibly due to just having dropped
                // the connection above).
                if (status != runStart_targetStatus) {
                    switch (runStart_targetStatus) {
                        case LISTENING:
                            if (status == Status.STOPPED) {
                                connect(runStart_url, runStart_properties);
                            }
                            break;
                        case STOPPED:
                            if (status == Status.LISTENING) {
                                clearConnection();
                            }
                            break;
                    }
                }
                // Now, if we're in the target status (possibly due to work
                // just done above), perform the real work.
                if (status == runStart_targetStatus) {
                    switch (status) {
                        case LISTENING:
                            if ( validityTestDelayMs > 0 && (System.currentTimeMillis() - lastValidityTestMs > validityTestDelayMs) ) {
                                // Time to make sure the connection is still here
                                if (!testConnection()) {
                                    // Connection is broken and has been cleared, return to go and do not collect $200
                                    continue;
                                }
                            }
                            if (runStart_namesToListenToChanged) {
                                updateListeners(runStart_namesToListenTo);
                            }
                            // Now, if we didn't break something by updating listeners, poll()
                            if (status == Status.LISTENING)
                                poll();
                            break;
                        case STOPPED:
                            /* do nothing */
                            break;
                    }
                }
            }
        }

        //------------------ THE FOLLOWING MAY *ONLY* BE CALLED FROM THE ConnectionPoller thread ------------------//

        private void reportSQLException(String msg, SQLException e) {
            final PgNotificationExceptionListener el = exceptionListener;
            if (el != null) {
                el.sqlExceptionEncountered(PgNotificationPoller.this, conn, msg, e);
            }
        }

        private void reportListenerException(String msg, PgNotificationListener l, Throwable e) {
            final PgNotificationExceptionListener el = exceptionListener;
            if (el != null) {
                el.listenerExceptionEncountered(PgNotificationPoller.this, msg, l, e);
            }
        }

        // Returns true if ok, false if problem
        // not synchronized, must not access pub members
        private boolean poll() {
            assert(status == Status.LISTENING);
            assert(Thread.currentThread().equals(this));
            try {
                lastPollTimeMs = System.currentTimeMillis();
                helper.poll();
                return true;
            } catch (SQLException e) {
                reportSQLException("Polling for notifications failed, will try to reconnect", e);
                return false;
            }
        }

        // Returns true if ok, false if problem
        // not synchronized, must not access pub members
        private boolean testConnection() {
            assert(status == Status.LISTENING);
            assert(Thread.currentThread().equals(this));
            lastValidityTestMs = System.currentTimeMillis();
            if (isConnValid(validityTestTimeoutSeconds)) {
                return true;
            }
            clearConnection();
            return false;
        }

        private void checkConnWaitTimerStatus() {
            assert(Thread.currentThread().equals(this));
            // Ensure that no statement is being waited on. If there is one,
            // something has gone horribly wrong, since this should only be
            // called from run() when we're clean and about to go around again.
            assert( ! connValidTimer.isWaiting() );
        }


        /**
         * Test if the connection is valid and return true if so.
         *
         * Should use Connection.isValid() ... if Pg actually implemented it.
         *
         * @param timeoutSeconds delay before giving up on connection
         * @return true if test statement succeeded, false if failed or timeout
         */
        private boolean isConnValid(int timeoutSeconds) {
            assert(Thread.currentThread().equals(this));
            if (conn == null)
                return false;
            try {
                final Statement stmt = conn.createStatement();
                connValidTimer.statementStarting(stmt, timeoutSeconds);
                try {
                    stmt.execute("SELECT 1;");
                } finally {
                    SQLException exc = connValidTimer.statementDone(stmt);
                    stmt.close();
                    if (exc != null) {
                        reportSQLException("Error cancelling timed-out connection validity test statement", exc);
                    }
                }
                return true;
            } catch (SQLException e) {
                reportSQLException("Connection validity test statement failed", e);
                return false;
            }
        }

        // always succeeds in clearing connection
        // not synchronized, must not access pub members
        private void clearConnection() {
            assert(Thread.currentThread().equals(this));
            try {
                helper.setConnection(null);
            } catch (SQLException e) {
                reportSQLException("Problem while unlistening from probably broken connection", e);
            }
            try {
                if (conn != null)
                    conn.close();
            } catch (SQLException e) {
                reportSQLException("Problem while closing probably broken connection", e);
            }
            conn = null;
            setStatus(Status.STOPPED);
        }

        // not synchronized, must not access pub members
        private void connect(String newUrl, Properties newProperties) {
            assert(Thread.currentThread().equals(this));
            if (conn != null)
                clearConnection();
            try {
                assert(status == Status.STOPPED);
                lastReconnectTimeMs = System.currentTimeMillis();
                conn = DriverManager.getConnection(newUrl, newProperties);
                conn.setAutoCommit(true);
                conn.setReadOnly(true);
                conn.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
                // Will relisten as part of connect
                helper.setConnection(conn);
                url = newUrl;
                connProperties = newProperties;
                setStatus(Status.LISTENING);
            } catch (SQLException e) {
                reportSQLException("Could not connect to listening channel", e);
                // Do nothing. Our status hasn't changed, and we'll try
                // to reconnect again in a bit.
            }
        }

        // not synchronized, must not access pub members
        private void setStatus(final Status newStatus) {
            assert(Thread.currentThread().equals(this));
            final Status oldStatus = status;
            this.status = newStatus;
            notifyStatusListeners(oldStatus, newStatus);
        }

        private void notifyStatusListeners(final Status oldStatus, final Status newStatus) {
            assert(Thread.currentThread().equals(this));
            final Status targetStatus = pub_targetStatus;
            if (oldStatus != newStatus) {
                EventQueue.invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        for (PgNotificationListener l : listeners) {
                            try {
                                l.pollerStatusChanged(PgNotificationPoller.this, oldStatus, newStatus, targetStatus);
                            } catch (Throwable t) {
                                reportListenerException("PgNotificationListener.pollerStatusChanged(...)", l, t);
                            }
                        }
                    }
                });
            }
        }



        @Override
        public void listenersAdded(PgNotificationHelper helper, final Collection<String> names) {
            assert(Thread.currentThread().equals(this));
            EventQueue.invokeLater(new Runnable() {
                @Override
                public void run() {
                    for (PgNotificationListener l : listeners) {
                        try {
                            l.listenersAdded(PgNotificationPoller.this, names);
                        } catch (Throwable t) {
                            reportListenerException("PgNotificationListener.listenersAdded(...)", l, t);
                        }
                    }
                }
            });
        }

        @Override
        public void listenersRemoved(PgNotificationHelper helper, final Collection<String> names) {
            assert(Thread.currentThread().equals(this));
            EventQueue.invokeLater(new Runnable() {
                @Override
                public void run() {
                    for (PgNotificationListener l : listeners) {
                        try {
                            l.listenersRemoved(PgNotificationPoller.this, names);
                        } catch (Throwable t) {
                            reportListenerException("PgNotificationListener.listenersRemoved(...)", l, t);
                        }
                    }
                }
            });
        }

        // returns true if all ok, false if conn problem
        // not synchronized, must not access pub members
        private boolean updateListeners(Set<String> namesToListenTo) {
            assert(status == Status.LISTENING);
            assert(Thread.currentThread().equals(this));
            // Copy and clear records of what to do next
            Set<String> toAdd = null, toRemove = null;
            boolean clearAll = false;
            if (namesToListenTo.isEmpty()) {
                clearAll = true;
            } else {
                toAdd = new HashSet<String>(namesToListenTo);
                toAdd.removeAll(helper.getListenedNames());
                toRemove = new HashSet<String>(helper.getListenedNames());
                toRemove.removeAll(namesToListenTo);
            }
            // then update the helper, now that we've released the namesToListenTo lock
            // and can thus perform database work without blocking anybody trying
            // to change the list of names to listen to.
            try {
                if (clearAll) {
                    helper.unlistenAll();
                } else {
                    for (String l : toRemove) {
                        helper.unlisten(l);
                    }
                    for (String l : toAdd) {
                        helper.listen(l);
                    }
                }
            } catch (SQLException e) {
                // Something went wrong while doing our update work.
                reportSQLException("Failed to update listens on helper, trying to relisten", e);
                // Reset the helper to a clean state, supplying the list of names
                // we expect it to have since we don't know what state it landed up in.
                try {
                    // TODO: Be smarter with notifications here
                    // TODO: Notify people here
                    helper.relisten(namesToListenTo);
                } catch (SQLException e2) {
                    reportSQLException("Failed to relisten(), re-setting connection. Notify events may be lost.", e2);
                    clearConnection();
                    return false;
                }
            }
            return true;
        }



        //------------------ THE FOLLOWING MAY *NOT* BE CALLED FROM THE ConnectionPoller thread ------------------//
        
        /**
         * Tell the poller that its desired status has changed.
         * @param newStatus New status to seek
         */
        public synchronized void setTargetStatus(Status newStatus) {
            assert(!Thread.currentThread().equals(this));
            this.pub_targetStatus = newStatus;
            notifyAll();
        }

        /**
         * Inform the poller that the desired connection properties have
         * changed. If the new and old properties actually differ, this will
         * trigger a reconnect.
         *
         * @param map New properties map, which is copied for storage
         */
        public synchronized void setNewConnectionProperties(String url, Properties map) {
            assert(!Thread.currentThread().equals(this));
            this.pub_newUrl = url;
            this.pub_newConnProperties = copyPropertiesIncludingDefaults(map);
        }

        public synchronized void addListenedName(String name) {
            assert(!Thread.currentThread().equals(this));
            pub_namesToListenTo.add(name);
            pub_namesToListenToChanged = true;
        }

        public synchronized void removeListenedName(String name) {
            assert(!Thread.currentThread().equals(this));
            pub_namesToListenTo.remove(name);
            pub_namesToListenToChanged = true;
        }

        public synchronized void clearListenedNames() {
            assert(!Thread.currentThread().equals(this));
            pub_namesToListenTo.clear();
            pub_namesToListenToChanged = true;
        }

        public synchronized boolean pollNow() {
            lastPollTimeMs = 0;
            notifyAll();
            return status == Status.LISTENING;
        }

        //------------------ The following may be called from any thread ------------------//
        // They must not modify any locals at all and must not even access them unless
        // they are guaranteed thread-safe, like `listeners'.

        @Override
        public void notified(PgNotificationHelper helper, final long receivingBackendPid, final List<PGNotification> notices) {
            EventQueue.invokeLater(new Runnable() {
                @Override
                public void run() {
                    for (PgNotificationListener l : listeners) {
                        try {
                            l.notified(PgNotificationPoller.this, receivingBackendPid, notices);
                        } catch (Throwable t) {
                            reportListenerException("PgNotificationListener.notified(...)", l, t);
                        }
                    }
                }
            });
        }


    }

    
    /**
     * Copy the properties map `input' into a new properties map.
     * Any default properties maps are merged into the new map first, so that
     * the returned map is a single properties map containing all the properties
     * of the whole defaults inheritance tree.
     *
     * The result from calling getProperty(...) on the new map will be the same
     * as on the old map.
     *
     * This operation is really, really not fast.
     *
     * @param input
     * @return
     */
    private static Properties copyPropertiesIncludingDefaults(Properties input) {
        Properties newProps = new Properties();
        for (Enumeration e = input.propertyNames(); e.hasMoreElements(); ) {
           final String key = (String) e.nextElement();
           newProps.setProperty(key, input.getProperty(key));
        }
        return newProps;
    }

}

JDBCStatementTimeout.java

import java.sql.SQLException;
import java.sql.Statement;

/**
 * A service capable of registering a statement for cancellation after
 * a timeout and of later cancelling that timer if informed that the
 * statement has finished.
 *
 * @author Craig Ringer <[email protected]>
 */
public interface JDBCStatementTimeout {

    /**
     * Start waiting on the statement `stmt'.
     * 
     * @param stmt Wait for this statement to finish
     * @param timeoutSeconds for this long
     */
    void statementStarting(final Statement stmt, final int timeoutSeconds );

    /**
     * Report that the statement `stmt' has finished or failed. It may have
     * failed due to our own cancellation request. 
     *
     * @param stmt Statement to remove from waiting list
     * @return null unless task was cancelled and cancellation threw, in which case that exception
     */
    SQLException statementDone(Statement stmt);

    /**
     * @return true if and only if waiting for one or more statements
     */
    boolean isWaiting();

    
}

JDBCStatementTimeoutThread.java

import java.sql.SQLException;
import java.sql.Statement;

/**
 * A fairly stupid statement timeout class, capable of tracking
 * only one statement at a time.
 *
 * @author Craig Ringer <[email protected]>
 */
public class JDBCStatementTimeoutThread extends Thread implements JDBCStatementTimeout {
    
    private Statement waitingFor;
    private SQLException lastException;
    long timeRemainingMs;
    long endMillis;

    public JDBCStatementTimeoutThread() {
        setName(JDBCStatementTimeoutThread.class.getName());
        setDaemon(true);
    }

    @Override
    public synchronized void run() {
        while (true) {
            try {
                wait();
            } catch (InterruptedException e) {
                // re-check if there's something to do and reloop if not
            }
            while (waitingFor != null && timeRemainingMs >= 0) {
                timeRemainingMs = endMillis - System.currentTimeMillis();
                try {
                    wait(timeRemainingMs);
                } catch (InterruptedException e) {
                    // recheck and continue
                }
            }
            lastException = null;
            // done waiting or interrupted. Did we time out, or did the task finish?
            if (waitingFor != null) {
                // whoops, timeout!
                try {
                    try {
                        waitingFor.cancel();
                    } finally {
                        waitingFor = null;
                    }
                } catch (SQLException e) {
                    // Not running on the main worker thread so we can't
                    // safely call reportSQLException(...). Note it down
                    // so the main worker thread can check and report
                    // it later.
                    lastException = e;
                }
            }
        }
    }

    @Override
    public synchronized void statementStarting(final Statement stmt, final int timeoutSeconds ) {
        if (waitingFor != null)
            throw new IllegalStateException("Already waiting for completion of statement " + waitingFor);
        waitingFor = stmt;
        timeRemainingMs = timeoutSeconds * 1000;
        endMillis = System.currentTimeMillis() + timeRemainingMs;
        notifyAll();
    }

    @Override
    public synchronized SQLException statementDone(Statement stmt) {
        if (waitingFor != null && waitingFor != stmt) {
            throw new IllegalStateException("Statement reported as done, " + stmt + ", was not statement waited for " + waitingFor);
        }
        SQLException lastExc = lastException;
        waitingFor = null;
        lastException = null;
        notifyAll();
        return lastExc;
    }

    @Override
    public boolean isWaiting() {
        return waitingFor != null;
    }

}

版权

此代码最初由 Craig Ringer <[email protected]> 编写,所有大量的错误投诉和诅咒都应该指向他。