Changeset 3268
- Timestamp:
- 01/12/09 08:49:33 (20 months ago)
- Author:
- jmoore
- Message:
-
OmeroClustering : Various fixes for the jdbc version
- Properties must be passed via the descriptor not internal.cfg
- Needed to catch various exceptions and better handle cleanup
(cherry picked from commit 0df94cf5fd89fbc63f011fb3a25bf3f34df1b9be)
- Location:
- trunk
- Files:
-
Legend:
- Unmodified
- Added
- Removed
-
|
r3266
|
r3268
|
|
| 34 | 34 | import org.springframework.context.event.ContextClosedEvent; |
| 35 | 35 | import org.springframework.dao.DataIntegrityViolationException; |
| | 36 | import org.springframework.dao.EmptyResultDataAccessException; |
| 36 | 37 | import org.springframework.jdbc.BadSqlGrammarException; |
| 37 | 38 | import org.springframework.jdbc.core.simple.ParameterizedRowMapper; |
| … |
… |
|
| 56 | 57 | *@since Beta4 |
| 57 | 58 | */ |
| 58 | | public class Ring extends _ClusterDisp implements ApplicationListener, ApplicationEventPublisherAware { |
| | 59 | public class Ring extends _ClusterDisp implements ApplicationListener, |
| | 60 | ApplicationEventPublisherAware { |
| 59 | 61 | |
| 60 | 62 | private final static Log log = LogFactory.getLog(Ring.class); |
| … |
… |
|
| 74 | 76 | private final SimpleJdbcTemplate jdbc; |
| 75 | 77 | |
| 76 | | private/*final */ApplicationEventPublisher publisher; |
| 77 | | |
| | 78 | private/* final */ApplicationEventPublisher publisher; |
| | 79 | |
| 78 | 80 | private/* final */Ice.Communicator communicator; |
| 79 | 81 | |
| … |
… |
|
| 98 | 100 | */ |
| 99 | 101 | private/* final */ClusterPrx cluster; |
| 100 | | |
| | 102 | |
| 101 | 103 | /** |
| 102 | 104 | * Direct proxy value to the {@link SessionManager} in this blitz instance. |
| … |
… |
|
| 119 | 121 | /** |
| 120 | 122 | * Passed to the {@link Discovery} instance for signalling completion from |
| 121 | | * its background {@link Thread} |
| | 123 | * its background {@link Thread} |
| 122 | 124 | */ |
| 123 | 125 | public void setApplicationEventPublisher(ApplicationEventPublisher arg0) { |
| 124 | 126 | this.publisher = arg0; |
| 125 | 127 | } |
| 126 | | |
| | 128 | |
| 127 | 129 | // Configuration and cluster usage |
| 128 | 130 | // ========================================================================= |
| … |
… |
|
| 139 | 141 | // The cluster we belong to. |
| 140 | 142 | Ice.ObjectPrx prx = this.communicator.propertyToProxy("ClusterProxy"); |
| | 143 | if (prx == null) { |
| | 144 | throw new RuntimeException("Could not obtain ClusterProxy. " |
| | 145 | + "Is multicast property properly set?"); |
| | 146 | } |
| 141 | 147 | prx = prx.ice_datagram(); |
| | 148 | if (prx == null) { |
| | 149 | throw new RuntimeException("Could no get datagram proxy. " |
| | 150 | + "Please check your multicast configuration."); |
| | 151 | } |
| 142 | 152 | cluster = ClusterPrxHelper.uncheckedCast(prx); |
| 143 | | |
| | 153 | |
| 144 | 154 | // Before we add our self we check the validity of the cluster. |
| 145 | 155 | checkClusterAndAddSelf(); |
| … |
… |
|
| 182 | 192 | log.info("Sent cluster node uuid: " + uuid); |
| 183 | 193 | } catch (Exception e) { |
| 184 | | log.warn("Exception while sending cluster node uuid: "+uuid, e); |
| 185 | | } |
| 186 | | } |
| 187 | | |
| 188 | | /** |
| 189 | | * Called when any node goes down. First we try to remove any redirect for that |
| 190 | | * instance. Then we try to install ourselves. |
| | 194 | log.warn("Exception while sending cluster node uuid: " + uuid, e); |
| | 195 | } |
| | 196 | } |
| | 197 | |
| | 198 | /** |
| | 199 | * Called when any node goes down. First we try to remove any redirect for |
| | 200 | * that instance. Then we try to install ourselves. |
| 191 | 201 | */ |
| 192 | 202 | public void down(String downUuid, Current __current) { |
| 193 | | removeIfEquals(CONFIG+"redirect", downUuid); |
| 194 | | if (putIfAbsent(CONFIG+"redirect", this.uuid)) { |
| 195 | | log.info("Installed self as new redirect: "+uuid); |
| 196 | | } |
| 197 | | } |
| 198 | | |
| | 203 | removeIfEquals(CONFIG + "redirect", downUuid); |
| | 204 | if (putIfAbsent(CONFIG + "redirect", this.uuid)) { |
| | 205 | log.info("Installed self as new redirect: " + uuid); |
| | 206 | } |
| | 207 | } |
| | 208 | |
| 199 | 209 | public void destroy() { |
| 200 | 210 | try { |
| 201 | 211 | this.clusterAdapter.deactivate(); |
| 202 | | cluster.down(this.uuid); |
| 203 | 212 | remove(MANAGERS + uuid); |
| 204 | | removeIfEquals(CONFIG+"redirect", uuid); |
| | 213 | int count = jdbc.update("delete from session_ring where value = ?", uuid); |
| | 214 | log.info("Removed "+count+" entries for "+uuid); |
| 205 | 215 | log.info("Disconnected from OMERO.cluster"); |
| 206 | 216 | } catch (Exception e) { |
| 207 | 217 | log.error("Error stopping ring " + this, e); |
| | 218 | } finally { |
| | 219 | cluster.down(this.uuid); |
| 208 | 220 | } |
| 209 | 221 | } |
| … |
… |
|
| 330 | 342 | return rv; |
| 331 | 343 | } |
| 332 | | |
| | 344 | |
| 333 | 345 | public void assertNodes(Set<String> nodeUuids) { |
| 334 | 346 | Set<String> managers = knownManagers(); |
| 335 | 347 | for (String manager : managers) { |
| 336 | | System.out.println(manager+"?="+nodeUuids); |
| 337 | 348 | if (!nodeUuids.contains(manager)) { |
| 338 | | purgeNode(manager); |
| | 349 | // Also verify this is not ourself, since |
| | 350 | // possibly we haven't finished registration |
| | 351 | // yet |
| | 352 | if (!uuid.equals(manager)) { |
| | 353 | purgeNode(manager); |
| | 354 | } |
| 339 | 355 | } |
| 340 | 356 | } |
| … |
… |
|
| 387 | 403 | |
| 388 | 404 | public String get(String key) { |
| 389 | | String value = (String) jdbc.queryForObject("select value " |
| 390 | | + "from session_ring " + "where key = ?", String.class, key); |
| 391 | | return value; |
| | 405 | try { |
| | 406 | String value = (String) jdbc |
| | 407 | .queryForObject("select value " + "from session_ring " |
| | 408 | + "where key = ?", String.class, key); |
| | 409 | return value; |
| | 410 | } catch (EmptyResultDataAccessException erdae) { |
| | 411 | return null; |
| | 412 | } |
| 392 | 413 | } |
| 393 | 414 | |
| … |
… |
|
| 401 | 422 | log.info("Key not found for update: " + key); |
| 402 | 423 | } else { |
| 403 | | log.info(String.format("Updated key %s with value %s", key, |
| 404 | | value)); |
| | 424 | log.info(String.format("Updated key %s to %s", key, value)); |
| 405 | 425 | } |
| 406 | 426 | } |
| … |
… |
|
| 444 | 464 | */ |
| 445 | 465 | public boolean removeIfEquals(String key, String value) { |
| 446 | | int count = jdbc |
| 447 | | .update("delete from session_ring where key = ? and value = ?", key, value); |
| | 466 | int count = jdbc.update( |
| | 467 | "delete from session_ring where key = ? and value = ?", key, |
| | 468 | value); |
| 448 | 469 | if (count == 0) { |
| 449 | 470 | log.info("Key and value do not match: " + key + "=" + value); |
-
|
r3266
|
r3268
|
|
| 147 | 147 | fail(); |
| 148 | 148 | } |
| | 149 | |
| | 150 | @Test |
| | 151 | public void testIfRedirectIsDeletedAnotherHostTakesOver() throws Exception { |
| | 152 | fails(); |
| | 153 | } |
| 149 | 154 | } |
-
|
r3239
|
r3268
|
|
| 26 | 26 | <property name="Ice.MessageSizeMax" value="65536"/> <!-- 64 MB --> |
| 27 | 27 | <property name="Ice.Default.CollocationOptimized" value="0"/> |
| | 28 | <!-- used to communicate between instances. See omero/Internal.ice --> |
| | 29 | <property name="Cluster.Endpoints" value="udp -h 239.0.0.5 -p 22222"/> |
| | 30 | <property name="ClusterProxy" value="Cluster:udp -h 239.0.0.5 -p 22222"/> |
| 28 | 31 | </properties> |
| 29 | 32 | </server> |
-
|
r3266
|
r3268
|
|
| 6 | 6 | IceGridAdmin.Username=root |
| 7 | 7 | IceGridAdmin.Password=ome |
| 8 | | |
| 9 | | # |
| 10 | | # Used to communicate between instances |
| 11 | | # See: omero/Internal.ice |
| 12 | | # |
| 13 | | Cluster.Endpoints=udp -h 239.0.0.5 -p 22222 |
| 14 | | ClusterProxy=Cluster:udp -h 239.0.0.5 -p 22222 |
Download in other formats:
1.2.1-PRO © 2008-2009
agile42 all
rights reserved
(this page was served in: 0.110187 sec.)