Changeset 3268
- Timestamp:
- 01/12/09 08:49:33 (6 months ago)
- Location:
- trunk
- Files:
-
- 4 modified
-
components/blitz/src/ome/services/blitz/fire/Ring.java (modified) (11 diffs)
-
components/blitz/test/ome/services/blitz/test/ClusteredRingTest.java (modified) (1 diff)
-
etc/grid/templates.xml (modified) (1 diff)
-
etc/internal.cfg (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
trunk/components/blitz/src/ome/services/blitz/fire/Ring.java
r3266 r3268 34 34 import org.springframework.context.event.ContextClosedEvent; 35 35 import org.springframework.dao.DataIntegrityViolationException; 36 import org.springframework.dao.EmptyResultDataAccessException; 36 37 import org.springframework.jdbc.BadSqlGrammarException; 37 38 import org.springframework.jdbc.core.simple.ParameterizedRowMapper; … … 56 57 *@since Beta4 57 58 */ 58 public class Ring extends _ClusterDisp implements ApplicationListener, ApplicationEventPublisherAware { 59 public class Ring extends _ClusterDisp implements ApplicationListener, 60 ApplicationEventPublisherAware { 59 61 60 62 private final static Log log = LogFactory.getLog(Ring.class); … … 74 76 private final SimpleJdbcTemplate jdbc; 75 77 76 private/* final */ApplicationEventPublisher publisher;77 78 private/* final */ApplicationEventPublisher publisher; 79 78 80 private/* final */Ice.Communicator communicator; 79 81 … … 98 100 */ 99 101 private/* final */ClusterPrx cluster; 100 102 101 103 /** 102 104 * Direct proxy value to the {@link SessionManager} in this blitz instance. … … 119 121 /** 120 122 * Passed to the {@link Discovery} instance for signalling completion from 121 * its background {@link Thread} 123 * its background {@link Thread} 122 124 */ 123 125 public void setApplicationEventPublisher(ApplicationEventPublisher arg0) { 124 126 this.publisher = arg0; 125 127 } 126 128 127 129 // Configuration and cluster usage 128 130 // ========================================================================= … … 139 141 // The cluster we belong to. 140 142 Ice.ObjectPrx prx = this.communicator.propertyToProxy("ClusterProxy"); 143 if (prx == null) { 144 throw new RuntimeException("Could not obtain ClusterProxy. " 145 + "Is multicast property properly set?"); 146 } 141 147 prx = prx.ice_datagram(); 148 if (prx == null) { 149 throw new RuntimeException("Could no get datagram proxy. " 150 + "Please check your multicast configuration."); 151 } 142 152 cluster = ClusterPrxHelper.uncheckedCast(prx); 143 153 144 154 // Before we add our self we check the validity of the cluster. 145 155 checkClusterAndAddSelf(); … … 182 192 log.info("Sent cluster node uuid: " + uuid); 183 193 } catch (Exception e) { 184 log.warn("Exception while sending cluster node uuid: " +uuid, e);185 } 186 } 187 188 /** 189 * Called when any node goes down. First we try to remove any redirect for that190 * instance. Then we try to install ourselves.194 log.warn("Exception while sending cluster node uuid: " + uuid, e); 195 } 196 } 197 198 /** 199 * Called when any node goes down. First we try to remove any redirect for 200 * that instance. Then we try to install ourselves. 191 201 */ 192 202 public void down(String downUuid, Current __current) { 193 removeIfEquals(CONFIG +"redirect", downUuid);194 if (putIfAbsent(CONFIG +"redirect", this.uuid)) {195 log.info("Installed self as new redirect: " +uuid);196 } 197 } 198 203 removeIfEquals(CONFIG + "redirect", downUuid); 204 if (putIfAbsent(CONFIG + "redirect", this.uuid)) { 205 log.info("Installed self as new redirect: " + uuid); 206 } 207 } 208 199 209 public void destroy() { 200 210 try { 201 211 this.clusterAdapter.deactivate(); 202 cluster.down(this.uuid);203 212 remove(MANAGERS + uuid); 204 removeIfEquals(CONFIG+"redirect", uuid); 213 int count = jdbc.update("delete from session_ring where value = ?", uuid); 214 log.info("Removed "+count+" entries for "+uuid); 205 215 log.info("Disconnected from OMERO.cluster"); 206 216 } catch (Exception e) { 207 217 log.error("Error stopping ring " + this, e); 218 } finally { 219 cluster.down(this.uuid); 208 220 } 209 221 } … … 330 342 return rv; 331 343 } 332 344 333 345 public void assertNodes(Set<String> nodeUuids) { 334 346 Set<String> managers = knownManagers(); 335 347 for (String manager : managers) { 336 System.out.println(manager+"?="+nodeUuids);337 348 if (!nodeUuids.contains(manager)) { 338 purgeNode(manager); 349 // Also verify this is not ourself, since 350 // possibly we haven't finished registration 351 // yet 352 if (!uuid.equals(manager)) { 353 purgeNode(manager); 354 } 339 355 } 340 356 } … … 387 403 388 404 public String get(String key) { 389 String value = (String) jdbc.queryForObject("select value " 390 + "from session_ring " + "where key = ?", String.class, key); 391 return value; 405 try { 406 String value = (String) jdbc 407 .queryForObject("select value " + "from session_ring " 408 + "where key = ?", String.class, key); 409 return value; 410 } catch (EmptyResultDataAccessException erdae) { 411 return null; 412 } 392 413 } 393 414 … … 401 422 log.info("Key not found for update: " + key); 402 423 } else { 403 log.info(String.format("Updated key %s with value %s", key, 404 value)); 424 log.info(String.format("Updated key %s to %s", key, value)); 405 425 } 406 426 } … … 444 464 */ 445 465 public boolean removeIfEquals(String key, String value) { 446 int count = jdbc 447 .update("delete from session_ring where key = ? and value = ?", key, value); 466 int count = jdbc.update( 467 "delete from session_ring where key = ? and value = ?", key, 468 value); 448 469 if (count == 0) { 449 470 log.info("Key and value do not match: " + key + "=" + value); -
trunk/components/blitz/test/ome/services/blitz/test/ClusteredRingTest.java
r3266 r3268 147 147 fail(); 148 148 } 149 150 @Test 151 public void testIfRedirectIsDeletedAnotherHostTakesOver() throws Exception { 152 fails(); 153 } 149 154 } -
trunk/etc/grid/templates.xml
r3239 r3268 26 26 <property name="Ice.MessageSizeMax" value="65536"/> <!-- 64 MB --> 27 27 <property name="Ice.Default.CollocationOptimized" value="0"/> 28 <!-- used to communicate between instances. See omero/Internal.ice --> 29 <property name="Cluster.Endpoints" value="udp -h 239.0.0.5 -p 22222"/> 30 <property name="ClusterProxy" value="Cluster:udp -h 239.0.0.5 -p 22222"/> 28 31 </properties> 29 32 </server> -
trunk/etc/internal.cfg
r3266 r3268 6 6 IceGridAdmin.Username=root 7 7 IceGridAdmin.Password=ome 8 9 #10 # Used to communicate between instances11 # See: omero/Internal.ice12 #13 Cluster.Endpoints=udp -h 239.0.0.5 -p 2222214 ClusterProxy=Cluster:udp -h 239.0.0.5 -p 22222
