root/trunk/components/tools/OmeroPy/test/integration/ping.py
| Revision 3014, 3.7 kB (checked in by jmoore, 2 months ago) |
|---|
| Line | |
|---|---|
| 1 | #!/usr/bin/env python |
| 2 | |
| 3 | """ |
| 4 | Integration test testing distributed processing via |
| 5 | ServiceFactoryI.acquireProcessor(). |
| 6 | |
| 7 | Copyright 2008 Glencoe Software, Inc. All rights reserved. |
| 8 | Use is subject to license terms supplied in LICENSE.txt |
| 9 | |
| 10 | """ |
| 11 | |
| 12 | import test.integration.library as lib |
| 13 | import omero, tempfile, unittest, os, sys |
| 14 | import omero_api_IScript_ice |
| 15 | from omero.rtypes import * |
| 16 | |
# Source text of the "ping" script used by the tests below. It is never
# executed in this module; the tests only upload it (as a file or through
# the script service) and then run it via a processor.
#
# As written, the script:
#   * registers itself under a freshly generated UUID name with two
#     in/out parameters, a Long "a" and a String "b";
#   * copies every input key it receives to the output of the same name;
#   * prints its environment to stdout and one line to stderr.
#
# NOTE: the embedded code uses the Python 2 ``print`` statement.
PINGFILE = """
#!/usr/bin/env python

import os, uuid
from omero_ext import pysys
import omero, omero.scripts as s
from omero.rtypes import *

#
# Unique name so that IScript does not reject us
# based on duplicate file names.
#
uuid = str(uuid.uuid4())
print "I am the script named %s" % uuid

#
# Creation
#
client = s.client(uuid, "simple ping script", s.Long("a").inout(), s.String("b").inout())
client.createSession()
print "Session", client.getSession()

#
# Echo'ing input to output
#
keys = client.getInputKeys()
print "Keys found:"
print keys
for key in keys:
    client.setOutput(key, client.getInput(key))

#
# Env
#
print "This was my environment:"
for k,v in os.environ.items():
    print "%s => %s" %(k,v)

#
# Must use pysys because of a naming clash with
# with the omero.sys package.
#
pysys.stderr.write("Oh, and this is stderr.");




"""
| 65 | |
class TestPing(lib.ITest):
    """
    Integration tests which hand the PINGFILE script to a processor
    obtained from ``sf.acquireProcessor()`` and inspect what comes back.
    """

    def testUploadAndPing(self):
        """
        Upload the ping script as a regular file, execute it with inputs
        "a" and "b", and check that "a" is echoed back in the results.
        """
        pingfile = tempfile.NamedTemporaryFile(mode='w+t')
        try:
            pingfile.write(PINGFILE)
            # The upload is given the file *name*, so the buffered text
            # has to be flushed to disk before it is read back.
            pingfile.flush()
            ofile = self.root.upload(pingfile.name, type="text/x-python")
            job = omero.model.ScriptJobI()
            job.linkOriginalFile(ofile)

            p = self.client.sf.acquireProcessor(job, 100)
            jp = p.params()
            self.assertTrue(jp, "Non-zero params")

            inputs = rmap({})
            inputs.val["a"] = rint(1)
            inputs.val["b"] = rstring("c")
            process = p.execute(inputs)
            rc = process.wait()
            # Any truthy (i.e. non-zero) return code is a failure.
            self.assertFalse(rc, "Non-zero return code")
            output = p.getResults(process)
            self.assertEqual(1, output.val["a"].val)
        finally:
            pingfile.close()

    def _getProcessor(self):
        """
        Upload PINGFILE through the script service and return a processor
        acquired for a job linked to the resulting original file.
        """
        scripts = self.root.getSession().getScriptService()
        script_id = scripts.uploadScript(PINGFILE)
        job = omero.model.ScriptJobI()
        # Link an unloaded OriginalFileI which only carries the id
        # returned by uploadScript().
        job.linkOriginalFile(
            omero.model.OriginalFileI(rlong(script_id), False))
        return self.client.sf.acquireProcessor(job, 100)

    def testPingViaISCript(self):
        """
        Execute the script uploaded via the script service and check that
        input "a" is echoed back in the results.
        """
        p = self._getProcessor()
        inputs = rmap({})
        inputs.val["a"] = rint(2)
        inputs.val["b"] = rstring("d")
        process = p.execute(inputs)
        process.wait()
        output = p.getResults(process)
        self.assertEqual(2, output.val["a"].val)

    def testPingParametersViaISCript(self):
        """
        Check that the processor reports params with truthy entries for
        "a" and "b" in both its inputs and its outputs.
        """
        p = self._getProcessor()
        params = p.params()
        self.assertTrue(params)
        self.assertTrue(params.inputs["a"])
        self.assertTrue(params.inputs["b"])
        self.assertTrue(params.outputs["a"])
        self.assertTrue(params.outputs["b"])

    def _checkstd(self, output, which):
        """
        Assert that ``output.val[which]`` wraps a truthy value and that
        downloading that value yields a non-empty local file.

        output -- results object as returned by ``getResults()``
        which  -- key to look up in ``output.val`` ("stdout" or "stderr"
                  in the calls made by this class)
        """
        rfile = output.val[which]
        ofile = rfile.val
        self.assertTrue(ofile)

        tmpfile = tempfile.NamedTemporaryFile(mode='w+t')
        try:
            self.client.download(ofile, tmpfile.name)
            # A size of zero is falsy and therefore fails the test.
            self.assertTrue(os.path.getsize(tmpfile.name))
        finally:
            tmpfile.close()

    def testPingStdout(self):
        """
        Execute the script without inputs and check that both the
        "stdout" and "stderr" results can be downloaded and are non-empty.
        """
        p = self._getProcessor()
        params = p.params()
        self.assertTrue(params.stdoutFormat)

        process = p.execute(rmap({}))
        process.wait()
        output = p.getResults(process)

        self._checkstd(output, "stdout")
        self._checkstd(output, "stderr")
| 143 | |
# Run the tests in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()
Note: See TracBrowser
for help on using the browser.
