Child pages
  • python api & client code (to discuss @ 3/9 dev meeting)


Simple workflow  (input_file) → [TAR] → (arch.tar.gz)
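
To make the single step concrete, here is a rough local equivalent of what the [TAR] job is expected to do, sketched with Python's standard `tarfile` module rather than Pegasus. The helper names `compress` and `demo` are hypothetical and only illustrate the input/output relationship; the file names `input` and `arch.tar.gz` come from the example on this page.

```python
import tarfile
import tempfile
from pathlib import Path


def compress(input_path, archive_path):
    """Rough local equivalent of `tar -czf <archive_path> <input_path>`."""
    input_path = Path(input_path)
    with tarfile.open(archive_path, "w:gz") as tar:
        # Store the file under its bare name, as tar would for a relative path
        tar.add(input_path, arcname=input_path.name)
    return Path(archive_path)


def demo():
    """Create a throwaway input file, compress it, and list the archive members."""
    with tempfile.TemporaryDirectory() as tmp:
        src = Path(tmp) / "input"
        src.write_text("hello\n")
        archive = compress(src, Path(tmp) / "arch.tar.gz")
        with tarfile.open(archive, "r:gz") as tar:
            return tar.getnames()
```

In the workflow itself, the same relationship is declared through the job's inputs and outputs so that the planner can stage the files.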

  • default filenames could be (doesn't matter for the catalogs in the example below because everything is inlined):
    • SiteCatalog.yml
    • TransformationCatalog.yml
    • ReplicaCatalog.yml
    • pegasus.properties
    • Workflow.yml
  • default location for submit_dir? <cwd>/<username>/pegasus/<dag name>/runXXXX
  • default location for outputs? <cwd>/wf-output (with defaults for the above, we can set defaults for CLI args in the client code)
    • applies when no "local" site is given and one is auto-generated
    • --output-site=local must be specified in plan
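
The defaults proposed above could be captured in the client code roughly as follows. This is only a sketch under stated assumptions: the names `DEFAULT_FILENAMES`, `default_submit_dir`, and `default_output_dir` are hypothetical and not part of the Pegasus API, the properties filename is an assumed reading of the list above, and the four-digit run number is one possible interpretation of runXXXX.

```python
import getpass
from pathlib import Path

# Proposed default filenames from the list above (hypothetical constant, not a Pegasus API)
DEFAULT_FILENAMES = {
    "site_catalog": "SiteCatalog.yml",
    "transformation_catalog": "TransformationCatalog.yml",
    "replica_catalog": "ReplicaCatalog.yml",
    "properties": "pegasus.properties",  # assumed reading of the properties filename
    "workflow": "Workflow.yml",
}


def default_submit_dir(dag_name, run_number, cwd=None, username=None):
    """Build <cwd>/<username>/pegasus/<dag name>/runXXXX.

    Assumes XXXX is a zero-padded, four-digit run counter.
    """
    base = Path(cwd) if cwd is not None else Path.cwd()
    user = username if username is not None else getpass.getuser()
    return base / user / "pegasus" / dag_name / f"run{run_number:04d}"


def default_output_dir(cwd=None):
    """Build <cwd>/wf-output, the proposed default location for outputs."""
    base = Path(cwd) if cwd is not None else Path.cwd()
    return base / "wf-output"
```

For example, `default_submit_dir("compress", 1, cwd="/tmp/work", username="alice")` yields `/tmp/work/alice/pegasus/compress/run0001`. Keeping these as plain functions would let the client code use them as fallbacks whenever the corresponding CLI argument is omitted.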


#!/usr/bin/env python3
 
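from pathlib import Path

# Directory containing this script; used below to build the input file's PFN
TOP_DIR = Path(__file__).resolve().parent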
 
from Pegasus.api import *
 
# --- Pegasus Properties -------------------------------------------------------
props = Properties()
props["pegasus.data.configuration"] = "condorio"
props.write()
 
# --- Sites --------------------------------------------------------------------
LOCAL = "local"
CONDORPOOL = "condorpool"
sc = SiteCatalog()\
        .add_sites(
            Site(CONDORPOOL, arch=Arch.X86_64, os_type=OS.LINUX)
                .add_profile_pegasus(style="condor")
                .add_profile_condor(universe="vanilla")
        )
 
# --- Replicas -----------------------------------------------------------------
input_file = File("input")
rc = ReplicaCatalog()\
        .add_replica(LOCAL, input_file, "file://" + str(TOP_DIR / "input"))
 
# --- Transformations ----------------------------------------------------------
tar = Transformation("tar", site=CONDORPOOL, pfn="/usr/bin/tar", is_stageable=False)
 
tc = TransformationCatalog()\
        .add_transformations(tar)
 
# --- Workflow -----------------------------------------------------------------
output_file = File("arch.tar.gz")
 
Workflow("compress", infer_dependencies=True)\
    .add_jobs(
        Job(tar)
            # tar writes the archive named by output_file, so the declared output matches
            .add_args("-czvf", output_file, input_file)
            .add_inputs(input_file)
            .add_outputs(output_file)
    ).add_site_catalog(sc)\
    .add_replica_catalog(rc)\
    .add_transformation_catalog(tc)\
    .plan(output_site=LOCAL, submit=True)\
    .wait()


'''[########------------------------------------------] 16.7% ..Running (Completed: 1, Queued: 0, Running: 2, Failed: 0)'''

...