Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Given what we have in DAX4 (middle column below), we could rewrite it to be something like what we have in the rightmost column below. Functionally it is the same, just written a little differently. At least in this example, it looks a little less cluttered and simpler to read than the middle column while saving a few lines in the process...



Code Block
languagepy
themeRDark
titleDAX3 Black Diamond
linenumberstrue
#!/usr/bin/env python
"""Build the "black diamond" example workflow with the Pegasus DAX3 API.

The workflow has four jobs arranged in a diamond: one preprocess job feeds
two findrange jobs, and both findrange jobs feed a single analyze job.
The resulting DAX is written to stdout and to the file "diamond.dax".
"""

import sys

from Pegasus.DAX3 import *

# Create a DAX
diamond = ADAG("diamond")

# Add some metadata
diamond.metadata("name", "diamond")
diamond.metadata("createdby", "Gideon Juve")

# Add input file to the DAX-level replica catalog
a = File("f.a")
a.addPFN(PFN("gsiftp://site.com/inputs/f.a","site"))
a.metadata("size", "1024")
diamond.addFile(a)

# Add executables to the DAX-level replica catalog
e_preprocess = Executable(namespace="diamond", name="preprocess", version="4.0", os="linux", arch="x86_64")
e_preprocess.metadata("size", "2048")
e_preprocess.addPFN(PFN("gsiftp://site.com/bin/preprocess","site"))
diamond.addExecutable(e_preprocess)

e_findrange = Executable(namespace="diamond", name="findrange", version="4.0", os="linux", arch="x86_64")
e_findrange.addPFN(PFN("gsiftp://site.com/bin/findrange","site"))
diamond.addExecutable(e_findrange)

e_analyze = Executable(namespace="diamond", name="analyze", version="4.0", os="linux", arch="x86_64")
e_analyze.addPFN(PFN("gsiftp://site.com/bin/analyze","site"))
diamond.addExecutable(e_analyze)

# Add a preprocess job: reads f.a, produces f.b1 and f.b2
preprocess = Job(e_preprocess)
preprocess.addProfile(Profile(Namespace.DAGMAN, "RETRY", 0))
preprocess.metadata("time", "60")
b1 = File("f.b1")
b2 = File("f.b2")
preprocess.addArguments("-a preprocess","-T60","-i",a,"-o",b1,b2)
preprocess.uses(a, link=Link.INPUT)
preprocess.uses(b1, link=Link.OUTPUT, transfer=True)
preprocess.uses(b2, link=Link.OUTPUT, transfer=True)
diamond.addJob(preprocess)

# Add left Findrange job: reads f.b1, produces f.c1
frl = Job(e_findrange)
frl.metadata("time", "60")
c1 = File("f.c1")
frl.addArguments("-a findrange","-T60","-i",b1,"-o",c1)
frl.uses(b1, link=Link.INPUT)
frl.uses(c1, link=Link.OUTPUT, transfer=True)
diamond.addJob(frl)

# Add right Findrange job: reads f.b2, produces f.c2
frr = Job(e_findrange)
frr.metadata("time", "60")
c2 = File("f.c2")
frr.addArguments("-a findrange","-T60","-i",b2,"-o",c2)
frr.uses(b2, link=Link.INPUT)
frr.uses(c2, link=Link.OUTPUT, transfer=True)
diamond.addJob(frr)

# Add Analyze job: reads f.c1 and f.c2, produces f.d
analyze = Job(e_analyze)
analyze.metadata("time", "60")
d = File("f.d")
analyze.addArguments("-a analyze","-T60","-i",c1,c2,"-o",d)
analyze.uses(c1, link=Link.INPUT)
analyze.uses(c2, link=Link.INPUT)
analyze.uses(d, link=Link.OUTPUT, transfer=True, register=True)
diamond.addJob(analyze)

# Add dependencies (preprocess -> both findrange jobs -> analyze)
diamond.depends(parent=preprocess, child=frl)
diamond.depends(parent=preprocess, child=frr)
diamond.depends(parent=frl, child=analyze)
diamond.depends(parent=frr, child=analyze)

# Write the DAX to stdout
diamond.writeXML(sys.stdout)

# Write the DAX to a file; the context manager closes the handle even if
# writeXML raises part-way through.
with open("diamond.dax", "w") as f:
    diamond.writeXML(f)



Code Block
languagepy
themeRDark
titleDAX4
(low level of abstraction)
Black Diamond
linenumberstrue
#!/usr/bin/env python
"""Build the "black diamond" example workflow with the proposed DAX4 API.

This is the low-level-of-abstraction variant: every object is created
explicitly and then added to the workflow with a separate call.  The
workflow has four jobs arranged in a diamond: one preprocess job feeds two
findrange jobs, and both findrange jobs feed a single analyze job.  The
result is written as YAML to stdout and to the file "diamond.yml".
"""

import sys

from Pegasus.DAX4 import *

# Create a DAX
diamond = Workflow("diamond")

# Add some metadata
diamond.add_metadata("name", "diamond")
diamond.add_metadata("createdby", "Gideon Juve")

# Add input file to the DAX-level replica catalog
a = File("f.a")
a.add_PFN("gsiftp://site.com/inputs/f.a","site")
a.add_metadata("size", "1024")
diamond.add_file(a)

# Add executables (Transformation objects in DAX4) to the DAX-level replica catalog
e_preprocess = Transformation(namespace="diamond", name="preprocess", version="4.0", os="linux", arch="x86_64")
e_preprocess.add_metadata("size", "2048")
e_preprocess.add_PFN("gsiftp://site.com/bin/preprocess","site")
diamond.add_transformation(e_preprocess)

e_findrange = Transformation(namespace="diamond", name="findrange", version="4.0", os="linux", arch="x86_64")
e_findrange.add_PFN("gsiftp://site.com/bin/findrange","site")
diamond.add_transformation(e_findrange)

e_analyze = Transformation(namespace="diamond", name="analyze", version="4.0", os="linux", arch="x86_64")
e_analyze.add_PFN("gsiftp://site.com/bin/analyze","site")
diamond.add_transformation(e_analyze)

# Add a preprocess job: reads f.a, produces f.b1 and f.b2
preprocess = Job(e_preprocess)
preprocess.add_profile(Namespace.DAGMAN, "RETRY", 0)
preprocess.add_metadata("time", "60")
b1 = File("f.b1")
b2 = File("f.b2")
preprocess.add_arguments("-a preprocess","-T60","-i",a,"-o",b1,b2)
preprocess.add_inputs(a)
preprocess.add_outputs(b1, b2, stage_out=True, register_replica=False)
diamond.add_job(preprocess)

# Add left Findrange job: reads f.b1, produces f.c1
frl = Job(e_findrange)
frl.add_metadata("time", "60")
c1 = File("f.c1")
frl.add_arguments("-a findrange","-T60","-i",b1,"-o",c1)
frl.add_inputs(b1)
frl.add_outputs(c1, stage_out=True)
diamond.add_job(frl)

# Add right Findrange job: reads f.b2, produces f.c2
frr = Job(e_findrange)
frr.add_metadata("time", "60")
c2 = File("f.c2")
frr.add_arguments("-a findrange","-T60","-i",b2,"-o",c2)
frr.add_inputs(b2)
# The output is c2 (the file named in the arguments above); the earlier
# text referenced an undefined name `c3` here.
frr.add_outputs(c2, stage_out=True)
diamond.add_job(frr)

# Add Analyze job: reads f.c1 and f.c2, produces f.d
analyze = Job(e_analyze)
analyze.add_metadata("time", "60")
d = File("f.d")
analyze.add_arguments("-a analyze","-T60","-i",c1,c2,"-o",d)
analyze.add_inputs(c1, c2)
analyze.add_outputs(d, stage_out=True, register_replica=True)
diamond.add_job(analyze)

# Add dependencies (preprocess -> both findrange jobs -> analyze)
diamond.add_dependency(preprocess, frl, frr)  # Workflow.add_dependency(parent, *children)
diamond.add_dependency(frl, analyze)  # Workflow.add_dependency(parent, *children)
diamond.add_dependency(frr, analyze)  # Workflow.add_dependency(parent, *children)

# Write the DAX to stdout
diamond.write_yaml(sys.stdout)

# Write the DAX to a file; the context manager closes the handle even if
# write_yaml raises part-way through.
with open("diamond.yml", "w") as f:
    diamond.write_yaml(f)



Code Block
languagepy
themeRDark
titleDAX4
, another way to write
Black Diamond, catalogs included
linenumberstrue
#!/usr/bin/env python
'''
Sketch of the proposed DAX4 API surface used by the example below.

------- Catalogs -------
Pegasus.DAX4.SiteCatalog
- add_site(name)
- get_site(name)
- has_site(name)
- write_catalog(path)
...

Pegasus.DAX4.ReplicaCatalog
- add_replica(lfn, iri, site, regex=false)
- get_replica(lfn)
- has_replica(lfn)
- remove_replica(lfn)
- write_catalog(path)
...

Pegasus.DAX4.TransformationCatalog
- add_transformation(namespace, name, version, os, arch, ...)
- get_transformation(namespace, name, version, os, arch, ...)
- has_transformation(namespace, name, version, os, arch, ...)
- remove_transformation(namespace, name, version, os, arch, ...)
- write_catalog(path)
...

------- Workflow -------
Pegasus.DAX4.Workflow <------ requires only 'pegasus' (version), 'name', and 'jobs' (jobs and parent-child dependencies)
- add_job(transformation, *args)
- add_dependency(parent, *children)
- add_site_catalog(sc)            # add inline OR iri if string is given
- add_replica_catalog(rc)         # add inline
- add_transformation_catalog(tc)  # add inline
...

'''

from Pegasus.DAX4 import *

# Method chains below are wrapped in parentheses rather than continued with
# backslashes: a backslash followed by a trailing comment is a syntax error.

# Create a DAX
diamond = Workflow("diamond")

# Add some metadata
(
    diamond.add_metadata("name", "diamond")
    .add_metadata("createdby", "Gideon Juve")
)

# Add input file to the replica_catalog
# NOTE(review): the API sketch above lists ReplicaCatalog.add_replica(...),
# while this example calls rc.add_file(...) -- confirm the intended name.
rc = ReplicaCatalog()
a = (
    rc.add_file("f.a", "gsiftp://site.com/inputs/f.a", "site", regex=False)
    .add_metadata("size", "1024")
)

# Add executables to the transformation catalog
tc = TransformationCatalog()
e_preprocess = (
    tc.add_transformation(namespace="diamond", name="preprocess", version="4.0", os="linux", arch="x86_64")
    .add_metadata("size", "2048")
    .add_pfn("gsiftp://site.com/bin/preprocess", "site", type="installed")
)

e_findrange = (
    tc.add_transformation(namespace="diamond", name="findrange", version="4.0", os="linux", arch="x86_64")
    .add_pfn("gsiftp://site.com/bin/findrange","site", type="installed")
)

e_analyze = (
    tc.add_transformation(namespace="diamond", name="analyze", version="4.0", os="linux", arch="x86_64")
    .add_pfn("gsiftp://site.com/bin/analyze","site", type="installed")
)

# Add a preprocess job: reads f.a, produces f.b1 and f.b2
b1 = File("f.b1")
b2 = File("f.b2")
preprocess = (
    diamond.add_job(e_preprocess, "-a preprocess", "-T60", "-i", a, "-o", b1, b2)  # Workflow.add_job(transformation, *args)
    .add_profile(Namespace.DAGMAN, "RETRY", 0)
    .add_metadata("time", "60")
    .add_inputs(a)
    .add_outputs(b1, b2, stage_out=True, register_replica=False)  # job.add_outputs(*lfns, stage_out=True, register_replica=False)
)

# Add left Findrange job: reads f.b1, produces f.c1
c1 = File("f.c1")
frl = (
    diamond.add_job(e_findrange, "-a findrange", "-T60", "-i", b1, "-o", c1)  # Workflow.add_job(transformation, *args)
    .add_metadata("time", "60")
    .add_inputs(b1)
    .add_outputs(c1, stage_out=True)
)

# Add right Findrange job: reads f.b2, produces f.c2
c2 = File("f.c2")
frr = (
    diamond.add_job(e_findrange, "-a findrange", "-T60", "-i", b2, "-o", c2)  # Workflow.add_job(transformation, *args)
    .add_metadata("time", "60")
    .add_inputs(b2)
    .add_outputs(c2, stage_out=True)
)

# Add Analyze job: reads f.c1 and f.c2, produces f.d
d = File("f.d")
analyze = (
    diamond.add_job(e_analyze, *["-a analyze", "-T60", "-i", c1, c2, "-o", d])  # Workflow.add_job(transformation, *args)
    .add_metadata("time", "60")
    .add_inputs(c1, c2)
    .add_outputs(d, stage_out=True, register_replica=True)
)

# Add dependencies (preprocess -> both findrange jobs -> analyze)
(
    diamond.add_dependency(preprocess, frl, frr)
    .add_dependency(frl, analyze)
    .add_dependency(frr, analyze)
)


# If you wanted the catalogs inline, then:
diamond.add_replica_catalog(rc)
diamond.add_transformation_catalog(tc)
# No SiteCatalog `sc` is defined above; uncomment once one exists.
# diamond.add_site_catalog(sc)


# Write the DAX to a file; the context manager closes the handle even if
# write_yaml raises part-way through.
with open("diamond.yml", "w") as f:
    diamond.write_yaml(f)

'''
# Additionally, we could use contextlib and rewrite like the following:
with ReplicaCatalog("rc.yml") as rc, \
     SiteCatalog("sc.yml") as sc, \
     TransformationCatalog("tc.yml") as tc, \
     Workflow("diamond") as wf:

     # do stuff

# with block ends and all the catalogs + workflow are written to separate files
'''




Code Block
languagepy
themeRDark
titleDAX3 Invokes
linenumberstrue
from Pegasus.DAX3 import *

# Start from an empty DAX3 workflow
adag = ADAG("workflow")

# Build the invoke for the "start" event first, then attach it to the workflow
start_invoke = Invoke("start", "/usr/bin/echo hello")
adag.addInvoke(start_invoke)
# Shorthand alternative: adag.invoke("start", "/usr/bin/echo hello")



Code Block
languagepy
themeRDark
titleDAX4 Hooks
linenumberstrue
from Pegasus.DAX4 import *

# Create a DAX (the DAX3 class ADAG is named Workflow in DAX4)
adag = Workflow("workflow")

# Add shell hook for the "start" event
adag.add_shell_hook("start", "/usr/bin/echo hello")

# Add webhook hook for the "start" event; the remaining arguments are
# left elided (`...`) in this sketch
adag.add_webhook_hook("start", ...)


...


Code Block
languagepy
themeRDark
titleDAX3 Transformations
linenumberstrue
from Pegasus.DAX3 import *

# Start from an empty DAX3 workflow
adag = ADAG("workflow")

# Three executables, created in order
e1, e2, e3 = map(Executable, ("executable_1", "executable_2", "executable_3"))

# A transformation built around e3 that uses all three executables
t = Transformation(e3)
for executable in (e1, e2, e3):
    t.uses(executable)



Code Block
languagepy
themeRDark
titleDAX4 Transformations
linenumberstrue
from Pegasus.DAX4 import *

# Create a DAX (the DAX3 class ADAG is named Workflow in DAX4)
adag = Workflow("workflow")

# Create some transformations
e1 = Transformation("executable_1")
e2 = Transformation("executable_2")
# NOTE(review): this line still uses the DAX3 name `Executable`, unlike e1/e2.
# It may be a leftover from the previous revision -- confirm whether it should
# become Transformation("executable_3") or be dropped (e3 is passed below).
e3 = Executable("executable_3")

# Create a transformation that requires other transformations
t = Transformation("executable_3")
t.requires(e1, e2, e3)

'''
same as
t.uses(e1, e2)
t.uses(e3)
'''