Child pages
  • Moving From DAX3 to DAX4

Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.
Comment: workflow "adds"

...

Code Block
languagepy
themeRDark
titleJobs
linenumberstrue
'''
Option 1: fluent/chained style. Trying to be declarative, but then
workflow.add(...) might need to be workflow.jobs(Job(...)) to be consistent.
Don't like how the function names aren't verbs.
'''
# Each chained call presumably returns the Job so settings stack fluently
# (profiles, metadata, hooks, inputs, outputs all hang off the one add()).
job = workflow.add(Job(transformation, arguments))\
    .profiles(namespace, "key", "value")\
    .metadata({"time": "60"})\
    .metadata({"key": "value"})\
    .hooks(ShellHook(args))\
    .inputs(a)\
    .outputs(b1, b2, stage_out=True)

'''
Option 2: verbose style, explicitly stating operations on job via add_*
verb methods. The following two assignments are the same.
'''
# Variant A: scalar values passed straight to each add_* method.
# NOTE: "arguments..." is sketch shorthand, not valid Python.
job = workflow.add_job(transformation, arguments...)\
    .add_profile(namespace, "key", "value")\
    .add_metadata(**{"time": "60", "key": "value"})\
    .add_shell_hook(args)\
    .add_inputs(a)\
    .add_outputs(b1, b2, stage_out=True)

# Variant B: same chain as Variant A, but wrapping values in explicit
# Job/Profile/Metadata objects instead of bare scalars.
job = workflow.add_job(Job(transformation, args))\
    .add_profile(Profile(namespace, "key", "value"))\
    .add_metadata(*[Metadata("time", "60"), Metadata("key", "value")])\
    .add_shell_hook(args)\
    .add_inputs(a)\
    .add_outputs(b1, b2, stage_out=True)

'''
Option 3: List/Set semantics. Can't chain as job.<member>.add(something)
won't return back a ref to the job (unless we make it, but that would look
awkward).
'''
job = workflow.add(Job(transformation, arguments))
# List semantics for job.arguments
job.arguments.append("A") 
job.arguments.extend( ["A", "B"] )
# Set semantics for job.profiles
job.profiles.add(namespace, "key", "value")
# Dict semantics for job.{<namespace>,metadata}.*
job.env.key = "value"
job.env["key"] = "value" 
job.env.update( {"key1": "value1", "key2": "value2"} )
job.metadata.add({"time": "60"})
job.metadata.add({"key": "value"})
job.hooks.add(ShellHook(args))
job.inputs.add(a)
job.outputs.add(b1, b2, stage_out=True)



Code Block
languagepy
themeRDark
titleDAX4 Workflow "adds"
linenumberstrue
#!/usr/bin/env python
# Sketch of the "named adds" API: one add_<thing> method per kind of object.
from Pegasus.DAX4 import *

# Create a DAX
diamond = Workflow("diamond")

# Add some metadata
diamond.add_metadata("name", "diamond")

# Add input file to the replica_catalog
rc = ReplicaCatalog()
sc = SiteCatalog()
# say that we've added some replicas and sites to the catalogs

tc = TransformationCatalog()
e_preprocess = tc.add_transformation(name="preprocess")\
                .add_site(site="site", uri="gsiftp://site.com/bin/preprocess", type="installed")

e_findrange = tc.add_transformation(name="findrange")\
                .add_site(site="site", uri="gsiftp://site.com/bin/findrange", type="installed")

e_analyze = tc.add_transformation(name="analyze")\
                .add_site(site="site", uri="gsiftp://site.com/bin/analyze", type="installed")
# Add jobs
job1 = diamond.add_job(e_preprocess, args)
job2 = diamond.add_job(e_findrange, args)
job3 = diamond.add_job(e_findrange, args)
job4 = diamond.add_job(e_analyze, args)

# Add dependencies: add_dependency(parent, *children) — chained because each
# call returns the Workflow.
diamond.add_dependency(job1, job2, job3)\
    .add_dependency(job2, job4)\
    .add_dependency(job3, job4)

diamond.add_replica_catalog(rc)
diamond.add_transformation_catalog(tc)
diamond.add_site_catalog(sc)

# Write the DAX to a file; the context manager closes the handle even if
# write_yaml raises (the original open/close pair leaked on error).
with open("diamond.yml", "w") as f:
    diamond.write_yaml(f)



Code Block
languagepy
themeRDark
titleDAX4 Workflow "generic add"
linenumberstrue
#!/usr/bin/env python
# Sketch of the "generic add" API: a single overloaded Workflow.add(...)
# dispatching on the type of its first argument.
from Pegasus.DAX4 import *

# Create a DAX
diamond = Workflow("diamond")

# Add some metadata
diamond.add({"name": "diamond"})

# Add input file to the replica_catalog
rc = ReplicaCatalog()
sc = SiteCatalog()
# say that we've added some replicas and sites to the catalogs

tc = TransformationCatalog()
e_preprocess = tc.add_transformation(name="preprocess")\
                .add_site(site="site", uri="gsiftp://site.com/bin/preprocess", type="installed")

e_findrange = tc.add_transformation(name="findrange")\
                .add_site(site="site", uri="gsiftp://site.com/bin/findrange", type="installed")

e_analyze = tc.add_transformation(name="analyze")\
                .add_site(site="site", uri="gsiftp://site.com/bin/analyze", type="installed")
# Add jobs
job1 = diamond.add(e_preprocess, args)
job2 = diamond.add(e_findrange, args)
job3 = diamond.add(e_findrange, args)
job4 = diamond.add(e_analyze, args)

# Add dependencies
diamond.add(job1, job2, job3)
diamond.add(job2, job4)
diamond.add(job3, job4)

diamond.add(rc)
diamond.add(tc)
diamond.add(sc)

# Write the DAX to a file; the context manager closes the handle even if
# write_yaml raises (the original open/close pair leaked on error).
with open("diamond.yml", "w") as f:
    diamond.write_yaml(f)

'''
Using Workflow.add(*args) as opposed to:
- Workflow.add_job(transformation, *args)
- Workflow.add_dependency(parent, *children)
- Workflow.add_metadata(dict or key, value)
- Workflow.add_transformation_catalog(tc)
- Workflow.add_site_catalog(sc)
- Workflow.add_replica_catalog(rc)

Could work as functools.singledispatch makes the dispatch happen
based only on the first argument, and in our function signatures above,
the first arguments are all unique.

If we go with this route, then chaining things together may not be ideal
as it is not obvious what Workflow.add(...) will return. When we say
Workflow.add_job(transformation, *args), it might be more apparent that
a Job would be returned, and therefore you can chain job related functions
together.
'''