'''
------- Catalogs -------
Pegasus.DAX4.SiteCatalog
- add_site(name)
- get_site(name)
- has_site(name)
- write_catalog(path)
...
Pegasus.DAX4.ReplicaCatalog
- add_replica(lfn, iri, site, regex=False)
- get_replica(lfn)
- has_replica(lfn)
- remove_replica(lfn)
- write_catalog(path)
...
Pegasus.DAX4.TransformationCatalog
- add_transformation(namespace, name, version, os, arch, ...)
- get_transformation(namespace, name, version, os, arch, ...)
- has_transformation(namespace, name, version, os, arch, ...)
- remove_transformation(namespace, name, version, os, arch, ...)
- write_catalog(path)
...
------- Workflow -------
Pegasus.DAX4.Workflow <------ requires only 'pegasus' (version), 'name', and 'jobs' (jobs and parent-child dependencies)
- add_job(transformation, *args)
- add_dependency(parent, *children)
- add_site_catalog(sc) # add inline OR iri if string is given
- add_replica_catalog(rc) # add inline
- add_transformation_catalog(tc) # add inline
...
'''
#!/usr/bin/env python
from Pegasus.DAX4 import *
# Build the top-level workflow object, named "diamond".
diamond = Workflow("diamond")
# Attach descriptive metadata. Each add_metadata call is chained on the result
# of the previous one.
(
    diamond
    .add_metadata("name", "diamond")
    .add_metadata("createdby", "Gideon Juve")
)
# Register the workflow's input file "f.a" in a replica catalog and tag it with
# a size. The returned object is kept as `a` and reused below as a job input.
# NOTE(review): the API outline at the top of this file lists
# ReplicaCatalog.add_replica(...), not add_file -- confirm which name is intended.
rc = ReplicaCatalog()
a = (
    rc.add_file("f.a", "gsiftp://site.com/inputs/f.a", "site", regex=False)
    .add_metadata("size", "1024")
)
# Add executables to the transformation catalog.
# Each transformation is created with add_transformation(...) and then given a
# physical location through a chained add_pfn(site=..., uri=..., type=...) call.
# NOTE(review): the earlier draft spelled this call inconsistently
# (add_pfnsite / add_pfn(site(...)) and passed a stray positional "site" after
# keyword arguments, which is a SyntaxError. It is normalised to add_pfn with
# keyword arguments here -- confirm the final method name against the API.
tc = TransformationCatalog()
e_preprocess = (
    tc.add_transformation(
        namespace="diamond", name="preprocess", version="4.0",
        os="linux", arch="x86_64"
    )
    .add_metadata("size", "2048")
    .add_pfn(site="site", uri="gsiftp://site.com/bin/preprocess", type="installed")
)
e_findrange = (
    tc.add_transformation(
        namespace="diamond", name="findrange", version="4.0",
        os="linux", arch="x86_64"
    )
    .add_pfn(site="site", uri="gsiftp://site.com/bin/findrange", type="installed")
)
e_analyze = (
    tc.add_transformation(
        namespace="diamond", name="analyze", version="4.0",
        os="linux", arch="x86_64"
    )
    .add_pfn(site="site", uri="gsiftp://site.com/bin/analyze", type="installed")
)
# Add a preprocess job: reads f.a and produces f.b1 and f.b2.
b1 = File("f.b1")
b2 = File("f.b2")
# The chain is wrapped in parentheses because a comment may not follow a
# line-continuation backslash (that is a SyntaxError).
preprocess = (
    # Workflow.add_job(transformation, *args)
    diamond.add_job(e_preprocess, "-a preprocess", "-T60", "-i", a, "-o", b1, b2)
    .add_profile(Namespace.DAGMAN, "RETRY", 0)
    .add_metadata("time", "60")
    .add_inputs(a)
    # job.add_outputs(*lfns, stage_out=True, register_replica=False)
    .add_outputs(b1, b2, stage_out=True, register_replica=False)
)
# Add left Findrange job: reads f.b1 and produces f.c1.
c1 = File("f.c1")
# Parenthesised chain: a comment after a line-continuation backslash is a
# SyntaxError, so the signature hint lives inside the parentheses instead.
frl = (
    # Workflow.add_job(transformation, *args)
    diamond.add_job(e_findrange, "-a findrange", "-T60", "-i", b1, "-o", c1)
    .add_metadata("time", "60")
    .add_inputs(b1)
    .add_outputs(c1, stage_out=True)
)
# Add right Findrange job: reads f.b2 and produces f.c2.
c2 = File("f.c2")
# Parenthesised chain: a comment after a line-continuation backslash is a
# SyntaxError, and the chain no longer ends with a dangling backslash.
frr = (
    # Workflow.add_job(transformation, *args)
    diamond.add_job(e_findrange, "-a findrange", "-T60", "-i", b2, "-o", c2)
    .add_metadata("time", "60")
    .add_inputs(b2)
    .add_outputs(c2, stage_out=True)
)
# Add Analyze job: reads f.c1 and f.c2 and produces f.d.
d = File("f.d")
# Parenthesised chain: a comment after a line-continuation backslash is a
# SyntaxError. The arguments are passed directly rather than through an
# unpacked list literal, which is equivalent.
analyze = (
    # Workflow.add_job(transformation, *args)
    diamond.add_job(e_analyze, "-a analyze", "-T60", "-i", c1, c2, "-o", d)
    .add_metadata("time", "60")
    .add_inputs(c1, c2)
    .add_outputs(d, stage_out=True, register_replica=True)
)
# Wire up the diamond shape with add_dependency(parent, *children):
# preprocess fans out to both findrange jobs, and each of them feeds analyze.
(
    diamond
    .add_dependency(preprocess, frl, frr)
    .add_dependency(frl, analyze)
    .add_dependency(frr, analyze)
)
# If you wanted the catalogs inline, then:
diamond.add_replica_catalog(rc)
diamond.add_transformation_catalog(tc)
# No SiteCatalog is built in this example, so `sc` is undefined and calling
# add_site_catalog would raise NameError. With a site catalog it would be:
# diamond.add_site_catalog(sc)
# Write the DAX to a file. The with-statement closes the file even if
# write_yaml raises, which the manual open()/close() pair did not guarantee.
with open("diamond.yml", "w") as f:
    diamond.write_yaml(f)
'''
# Additionally, we could use contextlib and rewrite the example like the following:
with ReplicaCatalog("rc.yml") as rc, \
     SiteCatalog("sc.yml") as sc, \
     TransformationCatalog("tc.yml") as tc, \
     Workflow("diamond") as wf:
    # do stuff
# When the with block ends, all the catalogs and the workflow are written to
# separate files.
'''