• Views
  • Iteration Report
  • My Iteration Report
  •  
OMERO.server
  • Login
  • Help/Guide
  • About Trac
  • Preferences
  • Wiki
  • Timeline
  • Roadmap
  • Browse Source
  • View Tickets
  • Search

Context Navigation

  • ← Previous Changeset
  • Next Changeset →

Changeset 3266

Show
Ignore:
Timestamp:
01/12/09 08:49:32 (6 months ago)
Author:
jmoore
Message:

OmeroClustering : Moving away from jgroups to straight multicast/jdbc approach

The ReplicatedHashMap implementation used from the org.jgroups.blocks
package had issues with our usage of the map. Since we are already
guaranteed a share storage (the db) for all blitz instances within
a single cluster, we are now using Ice's multicast support (from 3.3)
as well as simpleJdbcTemplate to manage the same information that was
previously in jgroups.
(cherry-pick from commit e50eed4204c2a1d360c2b261b5e6a7618573ba65)

Location:
trunk
Files:
3 added
9 modified

  • components/blitz/resources/ome/services/blitz-config.xml (modified) (1 diff)
  • components/blitz/resources/omero/Internal.ice (added)
  • components/blitz/src/ome/services/blitz/fire/Discovery.java (added)
  • components/blitz/src/ome/services/blitz/fire/Ring.java (modified) (15 diffs)
  • components/blitz/src/ome/services/blitz/util/BlitzConfiguration.java (modified) (4 diffs)
  • components/blitz/test/ome/services/blitz/test/ClusteredRingTest.java (added)
  • components/blitz/test/ome/services/blitz/test/mock/MockFixture.java (modified) (6 diffs)
  • components/blitz/test/ome/services/blitz/test/utests/RingTest.java (modified) (3 diffs)
  • components/server/resources/ome/services/datalayer.xml (modified) (1 diff)
  • components/server/resources/ome/services/hibernate.xml (modified) (3 diffs)
  • components/server/src/ome/tools/spring/JotmFactoryBean.java (modified) (2 diffs)
  • etc/internal.cfg (modified) (1 diff)

Legend:

Unmodified
Added
Removed
  • trunk/components/blitz/resources/ome/services/blitz-config.xml

    r3239 r3266  
    3030    <constructor-arg ref="securitySystem"/> 
    3131    <constructor-arg ref="executor"/> 
     32    <constructor-arg ref="ring"/> 
    3233  </bean> 
    3334 
    34   <bean id="ring" factory-bean="configuration" factory-method="getRing"/> 
     35  <bean id="ring" class="ome.services.blitz.fire.Ring"> 
     36      <constructor-arg ref="simpleJdbcTemplate"/> 
     37  </bean> 
     38 
    3539  <bean id="Ice.Communicator" factory-bean="configuration" factory-method="getCommunicator"/> 
    3640  <bean id="BlitzAdapter" factory-bean="configuration" factory-method="getBlitzAdapter"/> 
  • trunk/components/blitz/src/ome/services/blitz/fire/Ring.java

    r3264 r3266  
    88package ome.services.blitz.fire; 
    99 
     10import java.sql.ResultSet; 
     11import java.sql.SQLException; 
    1012import java.util.ArrayList; 
    11 import java.util.Arrays; 
    1213import java.util.HashSet; 
    1314import java.util.List; 
    14 import java.util.Map; 
    1515import java.util.Set; 
    1616import java.util.UUID; 
    17 import java.util.Vector; 
    1817 
    1918import ome.services.blitz.util.BlitzConfiguration; 
    2019import ome.services.messages.CreateSessionMessage; 
    2120import ome.services.messages.DestroySessionMessage; 
     21import ome.services.sessions.SessionManager; 
     22import omero.internal.ClusterPrx; 
     23import omero.internal.ClusterPrxHelper; 
     24import omero.internal.DiscoverCallbackPrx; 
     25import omero.internal._ClusterDisp; 
    2226 
    2327import org.apache.commons.logging.Log; 
    2428import org.apache.commons.logging.LogFactory; 
    25 import org.jgroups.Address; 
    26 import org.jgroups.ChannelFactory; 
    27 import org.jgroups.JChannelFactory; 
    28 import org.jgroups.View; 
    2929import org.jgroups.blocks.ReplicatedHashMap; 
    3030import org.springframework.context.ApplicationEvent; 
     31import org.springframework.context.ApplicationEventPublisher; 
     32import org.springframework.context.ApplicationEventPublisherAware; 
    3133import org.springframework.context.ApplicationListener; 
    3234import org.springframework.context.event.ContextClosedEvent; 
     35import org.springframework.dao.DataIntegrityViolationException; 
     36import org.springframework.jdbc.BadSqlGrammarException; 
     37import org.springframework.jdbc.core.simple.ParameterizedRowMapper; 
     38import org.springframework.jdbc.core.simple.SimpleJdbcTemplate; 
    3339 
    3440import Glacier2.CannotCreateSessionException; 
    … …  
    3642import Glacier2.SessionManagerPrxHelper; 
    3743import Glacier2.SessionPrx; 
     44import Ice.Current; 
    3845 
    3946/** 
    4047 * Distributed ring of {@link BlitzConfiguration} objects which manages lookups 
    4148 * of sessions and other resources from all the blitzes which take part in the 
    42  * ring. Membership in the {@link Ring} is based on a single token -- 
     49 * cluster. Membership in the {@link Ring} is based on a single token -- 
    4350 * "omero.instance" -- retrieved from the current context, or if missing, a 
    4451 * calculated value which will prevent this instance from taking part in 
    … …  
    4956 *@since Beta4 
    5057 */ 
    51 public class Ring implements ReplicatedHashMap.Notification<String, String>, 
    52         ApplicationListener { 
     58public class Ring extends _ClusterDisp implements ApplicationListener, ApplicationEventPublisherAware { 
    5359 
    5460    private final static Log log = LogFactory.getLog(Ring.class); 
    … …  
    6066    private final static String SESSIONS = "session-"; 
    6167 
    62     private final String uuid = UUID.randomUUID().toString(); 
    63  
     68    /** 
     69     * UUID for this cluster node. Used to uniquely identify the session manager 
     70     * in this blitz instance. 
     71     */ 
     72    public final String uuid = UUID.randomUUID().toString(); 
     73 
     74    private final SimpleJdbcTemplate jdbc; 
     75 
     76    private/*final */ApplicationEventPublisher publisher; 
     77     
    6478    private/* final */Ice.Communicator communicator; 
    6579 
     80    /** 
     81     * Multicast-based adapter solely for cluster communication. 
     82     */ 
     83    private/* final */Ice.ObjectAdapter clusterAdapter; 
     84 
     85    /** 
     86     * Standard blitz adapter which is used for the callback. 
     87     */ 
     88    private/* final */Ice.ObjectAdapter adapter; 
     89 
     90    /** 
     91     * {@link Ice.ObjectPrx} for this {@link Ring} instance as added to the 
     92     * {@link #adapter} in the {@link #init(Ice.ObjectAdapter, String)} method. 
     93     */ 
     94    private/* final */Ice.ObjectPrx ownProxy; 
     95 
     96    /** 
     97     * Multicast (datagram) proxy to all other cluster nodes. 
     98     */ 
     99    private/* final */ClusterPrx cluster; 
     100     
     101    /** 
     102     * Direct proxy value to the {@link SessionManager} in this blitz instance. 
     103     */ 
    66104    private/* final */String directProxy; 
    67105 
    68     private final ReplicatedHashMap<String, String> map; 
    69  
    70     private final String groupname; 
    71  
    72     public static String determineGroupName() { 
    73         String defaultValue = "Runtime" + Runtime.getRuntime().hashCode(); 
    74         String environValue = System.getenv("OMERO_INSTANCE"); 
    75         if (environValue == null || environValue.length() == 0) { 
    76             environValue = defaultValue; 
    77         } 
    78         return environValue; 
    79     } 
    80  
    81     public Ring(String props) { 
    82         this(determineGroupName(), props); 
    83     } 
    84  
    85     public Ring(String groupname, String props) { 
    86         this.groupname = groupname; 
     106    public Ring(SimpleJdbcTemplate jdbc) { 
     107        this.jdbc = jdbc; 
    87108        try { 
    88             ChannelFactory factory = new JChannelFactory(); 
    89  
    90             map = new ReplicatedHashMap<String, String>(groupname, factory, 
    91                     props, 10000); 
    92             map.setBlockingUpdates(true); 
    93             map.addNotifier(this); 
    94             map.start(1000); 
     109            jdbc.update("create table session_ring " 
     110                    + "(key varchar unique, value varchar)"); 
     111            log.info("Created OMERO.cluster table"); 
     112        } catch (BadSqlGrammarException bsge) { 
     113            // Here we assume that this means that the table 
     114            // already exists. 
     115            log.info("session_ring already exists"); 
     116        } 
     117    } 
     118 
     119    /** 
     120     * Passed to the {@link Discovery} instance for signalling completion from 
     121     * its background {@link Thread}  
     122     */ 
     123    public void setApplicationEventPublisher(ApplicationEventPublisher arg0) { 
     124        this.publisher = arg0; 
     125    } 
     126     
     127    // Configuration and cluster usage 
     128    // ========================================================================= 
     129 
     130    /** 
     131     * Typically called from within {@link BlitzConfiguration} after the 
     132     * communicator and adapter have been properly setup. 
     133     */ 
     134    public void init(Ice.ObjectAdapter adapter, String directProxy) { 
     135        this.adapter = adapter; 
     136        this.communicator = adapter.getCommunicator(); 
     137        this.directProxy = directProxy; 
     138 
     139        // The cluster we belong to. 
     140        Ice.ObjectPrx prx = this.communicator.propertyToProxy("ClusterProxy"); 
     141        prx = prx.ice_datagram(); 
     142        cluster = ClusterPrxHelper.uncheckedCast(prx); 
     143         
     144        // Before we add our self we check the validity of the cluster. 
     145        checkClusterAndAddSelf(); 
     146        put(MANAGERS + uuid, directProxy); 
     147        putIfAbsent(CONFIG + "redirect", uuid); 
     148        log.info("Current redirect: " + getRedirect()); 
     149    } 
     150 
     151    /** 
     152     * Method called during initialization to get all the active uuids within 
     153     * the cluster, and remove any dead nodes. After this, we add this instance 
     154     * to the cluster. 
     155     */ 
     156    protected void checkClusterAndAddSelf() { 
     157 
     158        // Now create the call back and start it 
     159        // The instance registers and removes itself. 
     160        Discovery discovery = new Discovery(adapter, this, this.publisher); 
     161        try { 
     162            cluster.discover(discovery.getProxy()); 
     163            new Thread(discovery).start(); 
     164        } catch (Ice.NoEndpointException nee) { 
     165            log.warn("No cluster endpoints found", nee); 
     166        } 
     167 
     168        // Now our checking is done, add ourselves. 
     169        this.clusterAdapter = this.communicator.createObjectAdapter("Cluster"); 
     170        ownProxy = this.clusterAdapter.add(this, this.communicator 
     171                .stringToIdentity("Cluster")); 
     172        this.clusterAdapter.activate(); 
     173    } 
     174 
     175    /** 
     176     * Returns the uuid for this manager instance. Other instances will use this 
     177     * value to keep the session_ring table in sync. 
     178     */ 
     179    public void discover(DiscoverCallbackPrx cb, Current __current) { 
     180        try { 
     181            cb.clusterNodeUuid(uuid); 
     182            log.info("Sent cluster node uuid: " + uuid); 
    95183        } catch (Exception e) { 
    96             throw new RuntimeException("Could not start ring: ", e); 
    97         } 
    98         log.info("Initialized ring in group " + groupname); 
    99  
    100     } 
    101  
    102     public Ring() { 
    103         this("session_ring.xml"); 
    104     } 
    105  
    106     /** 
    107      *  
    108      * @param bc 
    109      */ 
    110     public void init(Ice.Communicator communicator, String directProxy) { 
    111         this.communicator = communicator; 
    112         this.directProxy = directProxy; 
    113         map.put(MANAGERS + uuid, directProxy); 
    114         map.putIfAbsent(CONFIG + "redirect", directProxy); 
    115         log.info("Current redirect: " + getRedirect()); 
    116     } 
    117  
     184            log.warn("Exception while sending cluster node uuid: "+uuid, e); 
     185        } 
     186    } 
     187 
     188    /** 
     189     * Called when any node goes down. First we try to remove any redirect for that 
     190     * instance. Then we try to install ourselves. 
     191     */ 
     192    public void down(String downUuid, Current __current) { 
     193        removeIfEquals(CONFIG+"redirect", downUuid); 
     194        if (putIfAbsent(CONFIG+"redirect", this.uuid)) { 
     195            log.info("Installed self as new redirect: "+uuid); 
     196        } 
     197    } 
     198     
    118199    public void destroy() { 
    119200        try { 
    120             log.info("Shutting down ring in group " + groupname); 
    121             map.remove(MANAGERS + uuid); 
    122             map.stop(); 
     201            this.clusterAdapter.deactivate(); 
     202            cluster.down(this.uuid); 
     203            remove(MANAGERS + uuid); 
     204            removeIfEquals(CONFIG+"redirect", uuid); 
     205            log.info("Disconnected from OMERO.cluster"); 
    123206        } catch (Exception e) { 
    124207            log.error("Error stopping ring " + this, e); 
    … …  
    126209    } 
    127210 
    128     // Our usage 
     211    // Local usage 
    129212    // ========================================================================= 
    130213 
    131214    public boolean checkPassword(String userId) { 
    132         return map.get(SESSIONS + userId) != null; 
     215        boolean rv = get(SESSIONS + userId) != null; 
     216        log.info(String.format("Checking password: %s [%s]", userId, rv)); 
     217        return rv; 
    133218    } 
    134219 
    … …  
    140225     */ 
    141226    public String getRedirect() { 
    142         String redirect = map.get(CONFIG + "redirect"); 
     227        String redirect = get(CONFIG + "redirect"); 
    143228        return redirect; 
    144229    } 
    … …  
    150235     * value is set. In either case, the previous value is returned. 
    151236     */ 
    152     public String putRedirect(String uuidOrProxy) { 
    153         String oldValue; 
     237    public void putRedirect(String uuidOrProxy) { 
    154238        if (uuidOrProxy == null || uuidOrProxy.length() == 0) { 
    155             oldValue = map.remove(CONFIG + "redirect"); 
     239            remove(CONFIG + "redirect"); 
    156240        } else { 
    157             oldValue = map.put(CONFIG + "redirect", uuidOrProxy); 
    158         } 
    159         return oldValue; 
     241            put(CONFIG + "redirect", uuidOrProxy); 
     242        } 
    160243    } 
    161244 
    … …  
    177260                proxyString = null; 
    178261            } else { 
    179                 if (!map.containsKey(MANAGERS + redirect)) { 
     262                if (!containsKey(MANAGERS + redirect)) { 
    180263                    log.warn("No proxy found for manager: " + redirect); 
    181264                } else { 
    182                     proxyString = map.get(MANAGERS + redirect); 
     265                    proxyString = get(MANAGERS + redirect); 
    183266                    log.info("Resolved redirect to: " + proxyString); 
    184267                } 
    … …  
    190273 
    191274            // Check if the session is in ring 
    192             proxyString = map.get(SESSIONS + userId); 
     275            proxyString = get(SESSIONS + userId); 
    193276            if (proxyString != null && !proxyString.equals(directProxy)) { 
    194277                log.info(String.format("Returning remote session on %s", 
    … …  
    198281            // or needs to be load balanced 
    199282            else { 
    200                 double IMPOSSIBLE = 309340.0; 
     283                double IMPOSSIBLE = 314159.0; 
    201284                if (Math.random() > IMPOSSIBLE) { 
    202285                    Set<String> values = filter(MANAGERS); 
    … …  
    209292                            List v = new ArrayList(values); 
    210293                            proxyString = (String) v.get(idx); 
    211                             proxyString = map.get(MANAGERS + proxyString); 
     294                            proxyString = get(MANAGERS + proxyString); 
    212295                            log.info(String.format("Load balancing to %s", 
    213296                                    proxyString)); 
    … …  
    238321    } 
    239322 
     323    public Set<String> knownManagers() { 
     324        Set<String> managers = filter(MANAGERS); 
     325        Set<String> rv = new HashSet<String>(); 
     326        for (String manager : managers) { 
     327            manager = manager.replaceFirst(MANAGERS, ""); 
     328            rv.add(manager); 
     329        } 
     330        return rv; 
     331    } 
     332     
     333    public void assertNodes(Set<String> nodeUuids) { 
     334        Set<String> managers = knownManagers(); 
     335        for (String manager : managers) { 
     336            System.out.println(manager+"?="+nodeUuids); 
     337            if (!nodeUuids.contains(manager)) { 
     338                purgeNode(manager); 
     339            } 
     340        } 
     341    } 
     342 
     343    protected void purgeNode(String manager) { 
     344        log.info("Purging node: " + manager); 
     345        int count = jdbc.update("delete from session_ring where key like '" 
     346                + SESSIONS + "' and value = ?", manager); 
     347        log.info("Removed " + count + " sessions with value " + manager); 
     348        count = jdbc.update("delete from session_ring where key = ?", MANAGERS 
     349                + manager); 
     350        log.info("Removed " + MANAGERS + manager); 
     351        count = jdbc.update( 
     352                "delete from session_ring where key = ? and value = ?", CONFIG 
     353                        + "redirect", manager); 
     354        if (count != 0) { 
     355            log.info("Removed redirect to " + manager); 
     356        } 
     357        putRedirect(uuid); 
     358    } 
     359 
    240360    // Events 
    241361    // ========================================================================= 
    … …  
    243363    public void onApplicationEvent(ApplicationEvent arg0) { 
    244364        if (arg0 instanceof CreateSessionMessage) { 
    245             String uuid = ((CreateSessionMessage) arg0).getSessionId(); 
    246             map.put(SESSIONS + uuid, directProxy); 
     365            String session = ((CreateSessionMessage) arg0).getSessionId(); 
     366            put(SESSIONS + session, uuid); // Use our uuid rather than proxy 
    247367        } else if (arg0 instanceof DestroySessionMessage) { 
    248             String uuid = ((DestroySessionMessage) arg0).getSessionId(); 
    249             map.remove(SESSIONS + uuid); 
     368            String session = ((DestroySessionMessage) arg0).getSessionId(); 
     369            remove(SESSIONS + session); 
    250370        } else if (arg0 instanceof ContextClosedEvent) { 
    251371            // This happens 3 times for each nested context. Perhaps we 
    … …  
    254374    } 
    255375 
    256     // Notification interface 
     376    // Map interface 
    257377    // ========================================================================= 
    258  
    259     public void contentsCleared() { 
    260         log.info("cleared"); 
    261     } 
    262  
    263     public void contentsSet(Map<String, String> arg0) { 
    264         log.info("set contents:" + arg0); 
    265     } 
    266  
    267     public void entryRemoved(String arg0) { 
    268         log.info("remove:" + arg0); 
    269     } 
    270  
    271     public void entrySet(String arg0, String arg1) { 
    272         log.info("set:" + arg0 + "=" + arg1); 
    273     } 
    274  
    275     public void viewChange(View arg0, Vector<Address> arg1, Vector<Address> arg2) { 
    276         log.info("view change:" + arg0); 
    277     } 
    278  
    279     // Main 
    280     // ========================================================================= 
    281  
    282     public static void main(String[] args) throws Exception { 
    283  
    284         Ring ring = new Ring(); 
     378    // If the implementation of this class needs to be changed, these should be 
     379    // the only methods which need to be replaced. i.e. subclass implementors 
     380    // may want to start here. 
     381 
     382    public boolean containsKey(String key) { 
     383        int count = jdbc.queryForInt("select count(key) " 
     384                + "from session_ring " + "where key = ?", key); 
     385        return count > 0; 
     386    } 
     387 
     388    public String get(String key) { 
     389        String value = (String) jdbc.queryForObject("select value " 
     390                + "from session_ring " + "where key = ?", String.class, key); 
     391        return value; 
     392    } 
     393 
     394    public void put(String key, String value) { 
     395        boolean wasPut = putIfAbsent(key, value); 
     396        if (!wasPut) { 
     397            int count = jdbc.update( 
     398                    "update session_ring set value = ? where key = ?", value, 
     399                    key); 
     400            if (count == 0) { 
     401                log.info("Key not found for update: " + key); 
     402            } else { 
     403                log.info(String.format("Updated key %s with value %s", key, 
     404                        value)); 
     405            } 
     406        } 
     407    } 
     408 
     409    public boolean putIfAbsent(String key, String value) { 
    285410        try { 
    286  
    287             if (args == null || args.length == 0) { 
    288                 StringBuilder sb = new StringBuilder(); 
    289                 sb.append("java Ring [sessions | config | redirect [value]"); 
    290                 sb.append("\n"); 
    291                 sb.append("  print fqn        -    print all values at fqn\n"); 
    292                 sb.append("  redirect         -    print current redirect\n"); 
    293                 sb 
    294                         .append("  redirect [value] -    set current redirect. Empty removes\n"); 
    295                 sb.append(" \n"); 
    296                 System.out.println(sb.toString()); 
    297             } 
    298  
    299             if (args[0].equals("print")) { 
    300                 if (args.length > 1) { 
    301                     for (int i = 1; i < args.length; i++) { 
    302                         ring.printTree(args[i]); 
     411            int count = jdbc.update( 
     412                    "insert into session_ring (key, value) values (?, ?)", key, 
     413                    value); 
     414            if (count > 0) { 
     415                log.info(String.format("Put new key: %s with value: %s ", key, 
     416                        value)); 
     417                return true; 
     418            } 
     419        } catch (DataIntegrityViolationException dive) { 
     420            // The key already exists in the table. This is therefore 
     421            // a no-op 
     422        } 
     423        return false; 
     424    } 
     425 
     426    /** 
     427     * Removes a key from the session_ring table if present. 
     428     *  
     429     * @param key 
     430     */ 
     431    public boolean remove(String key) { 
     432        int count = jdbc.update("delete from session_ring where key = ?", key); 
     433        if (count == 0) { 
     434            log.info("Key not found to remove: " + key); 
     435            return false; 
     436        } else { 
     437            log.info("Removed key: " + key); 
     438            return true; 
     439        } 
     440    } 
     441 
     442    /** 
     443     * Used to remove a key/value pair iff the value equals the value give here. 
     444     */ 
     445    public boolean removeIfEquals(String key, String value) { 
     446        int count = jdbc 
     447                .update("delete from session_ring where key = ? and value = ?", key, value); 
     448        if (count == 0) { 
     449            log.info("Key and value do not match: " + key + "=" + value); 
     450            return false; 
     451        } else { 
     452            log.info("Removed key: " + key + " since value matched: " + value); 
     453            return true; 
     454        } 
     455    } 
     456 
     457    public List<String> keySet() { 
     458        return jdbc.query("select key from session_ring", 
     459                new ParameterizedRowMapper<String>() { 
     460                    public String mapRow(ResultSet arg0, int arg1) 
     461                            throws SQLException { 
     462                        return arg0.getString(1); 
    303463                    } 
    304                 } else { 
    305                     ring.printAll(); 
    306                 } 
    307             } else if (args[0].equals("redirect")) { 
    308                 if (args.length > 1) { 
    309                     String value = args[1]; 
    310                     ring.putRedirect(value); 
    311                 } else { 
    312                     ring.printRedirect(); 
    313                 } 
    314             } else if (args[0].equals("raw")) { 
    315                 System.out.println(ring.map); 
    316             } 
    317  
    318         } finally { 
    319             ring.destroy(); 
    320         } 
    321     } 
    322  
    323     public void printAll() { 
    324         for (String fqn : Arrays.asList(SESSIONS, MANAGERS, CONFIG)) { 
    325             printTree(fqn); 
    326         } 
    327     } 
    328  
    329     public void printTree(String prefix) { 
    330         System.out.println("===== " + prefix + " ====="); 
    331         Set<String> keys = filter(prefix); 
    332         if (keys != null && keys.size() > 0) { 
    333             for (String key : keys) { 
    334                 System.out.println(String.format("%s\t%s", key, map.get(key))); 
    335             } 
    336         } else { 
    337             System.out.println("(empty)"); 
    338         } 
    339     } 
    340  
    341     public void printRedirect() { 
    342         Object uuidOrProxy = map.get(CONFIG + "redirect"); 
    343         if (uuidOrProxy != null && uuidOrProxy.toString().length() > 0) { 
    344             System.out.println(uuidOrProxy.toString()); 
    345         } 
     464                }); 
    346465    } 
    347466 
    … …  
    350469 
    351470    private Set<String> filter(String prefix) { 
    352         Set<String> values = new HashSet<String>(map.keySet()); 
     471        Set<String> values = new HashSet<String>(keySet()); 
    353472        Set<String> remove = new HashSet<String>(); 
    354473        for (String value : values) { 
  • trunk/components/blitz/src/ome/services/blitz/util/BlitzConfiguration.java

    r3264 r3266  
    7070     * should be careful to check for nulls. 
    7171     */ 
    72     public BlitzConfiguration( 
     72    public BlitzConfiguration(Ring ring, 
    7373            ome.services.sessions.SessionManager sessionManager, 
    7474            SecuritySystem securitySystem, Executor executor) 
    7575            throws RuntimeException { 
    76         this(createId(), sessionManager, securitySystem, executor); 
     76        this(createId(), ring, sessionManager, securitySystem, executor); 
    7777    } 
    7878 
    … …  
    8989     * @throws RuntimeException 
    9090     */ 
    91     public BlitzConfiguration(Ice.InitializationData id, 
     91    public BlitzConfiguration(Ice.InitializationData id, Ring ring, 
    9292            ome.services.sessions.SessionManager sessionManager, 
    9393            SecuritySystem securitySystem, Executor executor) 
    … …  
    9797 
    9898        this.id = id; 
    99         this.blitzRing = new Ring(); 
     99        this.blitzRing = ring; 
    100100        this.communicator = createCommunicator(); 
    101101 
    … …  
    112112            managerDirectProxy = blitzAdapter.createDirectProxy(managerId()); 
    113113 
    114             // Must inject configuration before starting the adapter 
    115             blitzRing.init(communicator, communicator 
     114            blitzAdapter.activate(); 
     115 
     116            // When using adapter methods from within the ring, it is necessary 
     117            // to start the adapter first. 
     118            blitzRing.init(blitzAdapter, communicator 
    116119                    .proxyToString(getDirectProxy())); 
    117             blitzAdapter.activate(); 
     120 
    118121        } catch (RuntimeException e) { 
    119122            destroy(); 
  • trunk/components/blitz/test/ome/services/blitz/test/mock/MockFixture.java

    r3239 r3266  
    2727import omero.model.PermissionsI; 
    2828import omero.util.ObjectFactoryRegistrar; 
    29  
    3029import org.jmock.Mock; 
    3130import org.jmock.MockObjectTestCase; 
    3231import org.springframework.aop.target.HotSwappableTargetSource; 
     32import org.springframework.jdbc.core.simple.SimpleJdbcTemplate; 
    3333 
    3434import Ice.InitializationData; 
    … …  
    4444    public final SessionManager mgr; 
    4545    public final SecuritySystem ss; 
     46    public final SimpleJdbcTemplate jdbc; 
    4647 
    4748    public static OmeroContext basicContext() { 
    … …  
    4950                "classpath:ome/services/blitz-servantDefinitions.xml", 
    5051                "classpath:ome/services/throttling/throttling.xml", 
    51                 "classpath:ome/services/messaging.xml" }); 
     52                "classpath:ome/services/messaging.xml", 
     53                "classpath:ome/services/datalayer.xml", 
     54                "classpath:ome/config.xml"}); 
    5255    } 
    5356 
    … …  
    7174        this.ss = (SecuritySystem) ctx.getBean("securitySystem"); 
    7275        this.mgr = (SessionManager) ctx.getBean("sessionManager"); 
    73  
     76        this.jdbc = (SimpleJdbcTemplate) ctx.getBean("simpleJdbcTemplate"); 
     77         
    7478        // -------------------------------------------- 
    7579 
    … …  
    9397        // Basic configuration 
    9498        id.properties.setProperty("BlitzAdapter.Endpoints","default -h 127.0.0.1"); 
    95  
    96         blitz = new BlitzConfiguration(id, mgr, ss, ex); 
     99        // Cluster configuration from etc/internal.cfg 
     100        id.properties.setProperty("Cluster.Endpoints","udp -h 224.0.0.5 -p 10000"); 
     101        id.properties.setProperty("ClusterProxy","Cluster:udp -h 224.0.0.5 -p 10000"); 
     102         
     103         
     104        Ring ring = new Ring(jdbc); 
     105        ring.setApplicationEventPublisher(ctx); 
     106         
     107        blitz = new BlitzConfiguration(id, ring, mgr, ss, ex); 
    97108        this.sm = (SessionManagerI) blitz.getBlitzManager(); 
    98109        // The following is a bit of spring magic so that we can configure 
    … …  
    105116 
    106117    public void tearDown() { 
    107         this.blitz.getCommunicator().destroy(); 
    108         this.ctx.closeAll(); 
     118        this.blitz.destroy(); 
     119        // this.ctx.closeAll(); 
    109120    } 
    110121 
  • trunk/components/blitz/test/ome/services/blitz/test/utests/RingTest.java

    r3264 r3266  
    77package ome.services.blitz.test.utests; 
    88 
     9import javax.sql.DataSource; 
     10 
    911import ome.services.blitz.fire.Ring; 
     12import ome.system.OmeroContext; 
    1013 
    11 import org.jgroups.blocks.ReplicatedTree; 
    1214import org.jmock.Mock; 
    1315import org.jmock.MockObjectTestCase; 
    14 import org.testng.annotations.AfterClass; 
    15 import org.testng.annotations.BeforeClass; 
     16import org.springframework.jdbc.core.simple.SimpleJdbcTemplate; 
    1617import org.testng.annotations.BeforeTest; 
    1718import org.testng.annotations.Test; 
    … …  
    2324public class RingTest extends MockObjectTestCase { 
    2425 
    25     ReplicatedTree tree1, tree2; 
     26    OmeroContext ctx; 
     27    SimpleJdbcTemplate jdbc; 
     28    Ice.ObjectAdapter oa; 
    2629    Ice.Communicator ic; 
    27     Mock mockIc;  
    28      
    29     @BeforeClass 
    30     public void setup() throws Exception { 
    31         tree1 = new ReplicatedTree("test", "session_ring.xml", 1000); 
    32         tree2= new ReplicatedTree("test", "session_ring.xml", 1000); 
    33         tree1.start(); 
    34         tree2.start(); 
    35     } 
    36      
     30    Mock mockIc, mockOa; 
     31 
    3732    @BeforeTest 
    3833    public void setupMethod() throws Exception { 
    3934        mockIc = mock(Ice.Communicator.class); 
    4035        ic = (Ice.Communicator) mockIc.proxy(); 
     36        mockOa = mock(Ice.ObjectAdapter.class); 
     37        oa = (Ice.ObjectAdapter) mockOa.proxy(); 
     38        ctx = new OmeroContext(new String[] { 
     39                "classpath:ome/config.xml", 
     40                "classpath:ome/services/datalayer.xml" }); 
     41        DataSource dataSource = (DataSource) ctx.getBean("dataSource"); 
     42        jdbc = new SimpleJdbcTemplate(dataSource); 
     43 
    4144    } 
    42      
    43     @AfterClass 
    44     public void tearDown() { 
    45         tree1.stop(); 
    46         tree2.stop(); 
    47     } 
    48      
    49     //@Test 
    50     public void testMain() throws Exception { 
    51         Ring.main(new String[]{}); 
    52     } 
    53      
     45 
    5446    @Test 
    5547    public void testFirstTakesOver() throws Exception { 
    56         Ring one = new Ring("takeover", "session_ring.xml"); 
    57         one.init(ic, "one"); 
     48        Ring one = new Ring(jdbc); 
     49        one.setApplicationEventPublisher(ctx); 
     50        one.init(oa, "one"); 
    5851        assertEquals("one", one.getRedirect()); 
    59         Ring two = new Ring("takeover", "session_ring.xml"); 
    60         two.init(ic, "two"); 
     52        Ring two = new Ring(jdbc); 
     53        two.setApplicationEventPublisher(ctx); 
     54        two.init(oa, "two"); 
    6155        assertEquals("one", two.getRedirect()); 
    6256    } 
    … …  
    6660        fail(); 
    6761    } 
    68      
     62 
    6963    @Test 
    7064    public void testHandlesMissingServers() throws Exception { 
    7165        fail(); 
    7266    } 
    73      
     67 
    7468    @Test 
    7569    public void testRemovesUnreachable() throws Exception { 
    7670        fail(); 
    7771    } 
    78      
     72 
    7973    @Test 
    8074    public void testReaddsSelfIfTemporarilyUnreachable() throws Exception { 
    8175        fail(); 
    8276    } 
    83      
     77 
    8478    @Test 
    85     public void testLotsOfCalls() throws Exception { 
    86         int i = 0; 
    87         int j = 0; 
    88         long start = System.currentTimeMillis(); 
    89         for (int k = 0; k < 100; k++) { 
    90             i++; j++; 
    91             tree1.put("/1", i+"", i+""); 
    92             tree2.put("/2", j+"", j+""); 
    93         } 
    94  
    95  
    96         assertEquals(100, tree2.getKeys("/1").size()); 
    97         assertEquals(100, tree1.getKeys("/2").size()); 
    98      
    99     } 
    100      
    101     @Test 
    102     public void testPrintSessions() throws Exception { 
    103         Ring ring = new Ring("test", "session_ring.xml"); 
    104         ring.printTree("/SESSIONS"); 
    105         ring.destroy(); 
     79    public void testAllSessionRemovedIfDiscoveryFails() throws Exception { 
     80        fail(); 
    10681    } 
    10782 
     83    @Test 
     84    public void testAllSessionsReassertedIfSessionComesBackOnline() 
     85            throws Exception { 
     86        fail(); 
     87    } 
    10888} 
  • trunk/components/server/resources/ome/services/datalayer.xml

    r3265 r3266  
    6969    </property> 
    7070  </bean> 
    71  
     71   
     72  <bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate"> 
     73    <constructor-arg ref="transactionManager"/> 
     74  </bean> 
     75   
     76  <bean id="simpleJdbcTemplate" class="org.springframework.jdbc.core.simple.SimpleJdbcTemplate"> 
     77    <description> 
     78    Scope: private 
     79     
     80    This template uses session.connection() for data access. See javadocs for 
     81    more info. 
     82    </description> 
     83    <constructor-arg ref="dataSource"/> 
     84  </bean> 
     85   
    7286  <bean id="unsafeJdbcTemplate" class="org.springframework.jdbc.core.simple.SimpleJdbcTemplate"> 
    7387    <description> 
  • trunk/components/server/resources/ome/services/hibernate.xml

    r3261 r3266  
    7575  </bean> 
    7676   
    77   <bean id="transactionTemplate" class="org.springframework.transaction.support.TransactionTemplate"> 
    78     <constructor-arg ref="transactionManager"/> 
    79   </bean> 
    80    
    8177  <bean id="proxyHandler" class="ome.tools.hibernate.ProxyCleanupFilter$Interceptor"> 
    8278    <description> 
    … …  
    111107    <property name="allowCreate" value="false"/> 
    112108  </bean> 
    113  
    114   <bean id="simpleJdbcTemplate" class="org.springframework.jdbc.core.simple.SimpleJdbcTemplate"> 
    115     <description> 
    116     Scope: private 
    117      
    118     This template uses session.connection() for data access. See javadocs for 
    119     more info. 
    120     </description> 
    121     <constructor-arg ref="dataSource"/> 
    122   </bean> 
    123109   
    124110  <bean id="sessionFactory" class="org.springframework.orm.hibernate3.annotation.AnnotationSessionFactoryBean"> 
    … …  
    141127        <bean id="transactionManager.transactionManager" class="org.springframework.beans.factory.config.PropertyPathFactoryBean"/> 
    142128    </property> 
     129    <!-- http://coffeedrivenjava.blogspot.com/2006/08/distributed-xa-transactions-w-hibernate.html --> 
     130    <property name="useTransactionAwareDataSource" value="true"/> 
    143131  </bean> 
    144132 
  • trunk/components/server/src/ome/tools/spring/JotmFactoryBean.java

    r3254 r3266  
    1010import javax.naming.Context; 
    1111import javax.naming.InitialContext; 
     12import javax.naming.NameAlreadyBoundException; 
    1213import javax.naming.NamingException; 
     14 
     15import org.apache.commons.logging.Log; 
     16import org.apache.commons.logging.LogFactory; 
    1317 
    1418/** 
    … …  
    2024        org.springframework.transaction.jta.JotmFactoryBean { 
    2125 
     26    private final static Log log = LogFactory.getLog(JotmFactoryBean.class); 
     27     
    2228    public JotmFactoryBean() throws NamingException { 
    2329        super(); 
    24         Context context = new InitialContext(); 
    25         context.bind("java:comp/UserTransaction", getObject()); 
     30        try { 
     31            Context context = new InitialContext(); 
     32            context.bind("java:comp/UserTransaction", getObject()); 
     33            log.info("Bound UserTransaction in JNDI"); 
     34        } catch (NameAlreadyBoundException nabe) { 
     35            log.info("UserTransaction already bound in JNDI"); 
     36        } 
    2637    } 
    2738 
  • trunk/etc/internal.cfg

    r2377 r3266  
    66IceGridAdmin.Username=root 
    77IceGridAdmin.Password=ome 
     8 
     9# 
     10# Used to communicate between instances 
     11# See: omero/Internal.ice 
     12# 
     13Cluster.Endpoints=udp -h 239.0.0.5 -p 22222 
     14ClusterProxy=Cluster:udp -h 239.0.0.5 -p 22222 

Download in other formats:

  • Unified Diff
  • Zip Archive

Trac Powered

Powered by Trac 0.11
By Edgewall Software.

Visit the Trac open source project at
http://trac.edgewall.org/