Chapter 7. Task executors

Table of Contents

Closures
Dependencies for tasks with subtasks
Task executors and future entities

Modern computers with multiple processor cores, hyperthreading and whatnot are designed for running several threads in parallel. It would be a shame to let all that processing power be wasted by running just single-threaded builds, wouldn't it? Using several build threads may even give a speed boost on a uniprocessor system, since different tasks have different performance profiles: some tasks are CPU bound, others are I/O bound.

Schmant comes with the TaskExecutor for distributing task execution over a collection of threads. Tasks are added to the task executor and it executes them in parallel using a fixed number of threads from a thread pool.

This performance boost does come with added complexity, though. When tasks run in parallel, the dependencies between them have to be dealt with: task B may need the results from task A before it can run. In Schmant, the fact that task B depends on task A is expressed by adding B to the task executor together with an object that implements the TaskDependency interface. Task implements that interface, but TaskFactory does not, so the task A can itself be used to express the dependency when adding B. When a task factory is added instead of a task, the TaskDependency object returned by TaskExecutor.add can be used.
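The idea of adding a task together with the things it depends on can be sketched outside Schmant. The Python block below is only an illustration under stated assumptions: MiniExecutor, its add method and the step helper are hypothetical names invented for this sketch, not part of the Schmant API, and the standard library Future object plays the role that a TaskDependency plays in Schmant.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class MiniExecutor:
    """Toy executor for illustration only; not the Schmant TaskExecutor."""

    def __init__(self, threads=2):
        self._pool = ThreadPoolExecutor(max_workers=threads)

    def add(self, fn, deps=()):
        """Schedule fn and return a handle that later tasks can depend on."""
        deps = list(deps)

        def run():
            # Block until every dependency has finished. result() also
            # re-raises a dependency's exception, so a failure propagates.
            for dep in deps:
                dep.result()
            return fn()

        return self._pool.submit(run)

    def shutdown(self):
        self._pool.shutdown(wait=True)


log = []
log_lock = threading.Lock()


def step(name):
    """Build a task that records its name when it runs."""
    def task():
        with log_lock:
            log.append(name)
    return task


ex = MiniExecutor(threads=2)
try:
    a = ex.add(step("A"))
    b = ex.add(step("B"), deps=[a])  # B is added together with its dependency
    b.result()                       # wait for B, and therefore for A
finally:
    ex.shutdown()
```

The sketch lets a worker thread block while it waits for its dependencies, which is a simplification. It relies on dependencies always being added before the tasks that depend on them.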

A TaskExecutor is used by

  1. Creating it.

  2. Adding tasks to it.

  3. Calling waitFor() on it to wait for all tasks to be run.

  4. Calling shutdown() to release all of its resources.

A script can also call waitFor(TaskDependency) to wait for a specific dependency to be met.
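The four steps above follow the usual life cycle of a thread pool. As a rough analogy only, the same life cycle looks like this with Python's standard concurrent.futures module; the square function is a hypothetical placeholder task, and none of the calls below are Schmant calls.

```python
from concurrent.futures import ThreadPoolExecutor, wait


def square(n):
    """Placeholder task used only for this illustration."""
    return n * n


# 1. Create the executor with a fixed number of worker threads.
pool = ThreadPoolExecutor(max_workers=2)
try:
    # 2. Add tasks. Each submit() call returns a handle for the added task.
    handles = [pool.submit(square, n) for n in range(5)]

    # Wait for one specific task, comparable to waitFor(TaskDependency).
    third = handles[2].result()

    # 3. Wait for all tasks to be run.
    done, not_done = wait(handles)
    results = [h.result() for h in handles]
finally:
    # 4. Release the worker threads, even if a task failed.
    pool.shutdown()
```

Putting the shutdown call in a finally block mirrors the try/finally structure used by the Schmant examples in this chapter.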

Example 7.1. Compile Java files and build a Jar file, using a task executor

Groovy

import java.io.File
import java.text.SimpleDateFormat
import java.util.Date
import org.entityfs.util.*
import org.entityfs.util.filter.entity.*
import org.schmant.run.TaskExecutor
import org.schmant.support.entityfs.*
import org.schmant.support.io.*
import org.schmant.task.io.TreeCopyTF
import org.schmant.task.jdk.jar.JarTF
import org.schmant.task.jdk.javac.jdk6.Jdk6JavacTF

// A temporary directory for the compiled classes. This, along with all of its
// contents, will be automatically deleted when Schmant exits (unless the -k
// flag is used).
def ctarget = TempFileUtil.createTempDir()

// The lib directory that contains Jar files that the code to compile uses.
def libDir = SchmantFileSystems.getEntityForDirectory(
  new File("/home/me/myproject/lib"), true)

// Get all jar files from the lib directory. The jar files are returned as a
// Set of EntityFS EFile:s
def depjars = Directories.getAllFilesMatching(libDir, "*.jar")

// Add a dependency from the optlib directory. This is a Java File
depjars.add(new File("/home/me/myproject/optlib/opt.jar"))

// Create a read only, locking Directory object for the source directory.
// A locking file system eliminates the risk of accidental concurrent
// modification of files by parallel build threads.
def src = SchmantFileSystems.getEntityForDirectory(
  new File("/home/me/myproject/src"), true)

// Create a task executor that, by default, will use two build threads, unless
// the noOfBuildThreads property is set to another value.
def te = new TaskExecutor().
  setNumberOfThreads(props.getIntValue("noOfBuildThreads", 2)).
  start()

try {
  // A task for compiling all source files
  // TaskExecutor.add returns a TaskDependency object for the added task.
  def javacDep = te.add(
    new Jdk6JavacTF().
      addSource(src).
      addClasspathEntries(depjars).
      setTarget(ctarget))

  // A task for copying all non-Java files from the source hierarchy to the
  // target hierarchy.
  def copyDep = te.add(
    new TreeCopyTF().
      // Create a view of the source hierarchy that hides all .java files.
      setSource(
        src.newView(
          // Groovy lets us use the ~ operator to negate the filter.
          ~ new EFileNameExtensionFilter("java"))).
      setTarget(ctarget))

  // A timestamp for the built archive
  def timestamp = new SimpleDateFormat("yyyyMMddHHmm").
    format(new Date())

  // Build the Jar file. Put it in /home/me/myproject
  // The Jar build task depends on both the compiling and the copying tasks.
  te.add(
    new JarTF().
      addSource(ctarget).
      setTarget(new File("/home/me/myproject/myproject" + timestamp + ".jar")),
    // Use an array for the two dependencies.
    [javacDep, copyDep])

  // Wait for all tasks to complete
  te.waitFor()
} finally {
  te.shutdown()
}

JavaScript

// A temporary directory for the compiled classes. This, along with all of its
// contents, will be automatically deleted when Schmant exits (unless the -k
// flag is used).
ctarget = TempFileUtil.createTempDir();

// The lib directory that contains Jar files that the code to compile uses.
libDir = SchmantFileSystems.getEntityForDirectory(
  new File("/home/me/myproject/lib"), true);

// Get all jar files from the lib directory. The jar files are returned as a
// Set of EntityFS EFile:s
depjars = Directories.getAllFilesMatching(libDir, "*.jar");

// Add a dependency from the optlib directory. This is a Java File
depjars.add(new File("/home/me/myproject/optlib/opt.jar"));

// Create a read only, locking Directory object for the source directory.
// A locking file system eliminates the risk of accidental concurrent
// modification of files by parallel build threads.
src = SchmantFileSystems.getEntityForDirectory(
  new File("/home/me/myproject/src"), true);

// Create a task executor that, by default, will use two build threads, unless
// the noOfBuildThreads property is set to another value.
te = new TaskExecutor().
  setNumberOfThreads(props.getIntValue("noOfBuildThreads", 2)).
  start();

try {
  // A task for compiling all source files
  // TaskExecutor.add returns a TaskDependency object for the added task.
  javacDep = te.add(
    new Jdk6JavacTF().
      addSource(src).
      addClasspathEntries(depjars).
      setTarget(ctarget));

  // A task for copying all non-Java files from the source hierarchy to the
  // target hierarchy.
  copyDep = te.add(
    new TreeCopyTF().
      // Create a view of the source hierarchy that hides all .java files.
      setSource(
        src.newView(
          new EFileNameExtensionFilter("java").not())).
      setTarget(ctarget));

  // A timestamp for the built archive
  timestamp = new Packages.java.text.SimpleDateFormat("yyyyMMddHHmm").
    format(new Date());

  // Build the Jar file. Put it in /home/me/myproject
  // The Jar build task depends on both the compiling and the copying tasks.
  te.add(
    new JarTF().
      addSource(ctarget).
      setTarget(new File("/home/me/myproject/myproject" + timestamp + ".jar")),
    // Use an array for the two dependencies
    [javacDep, copyDep]);

  // Wait for all tasks to complete
  te.waitFor();
} finally {
  te.shutdown();
}

JRuby

# A temporary directory for the compiled classes. This, along with all of its
# contents, will be automatically deleted when Schmant exits (unless the -k
# flag is used).
ctarget = Schmant::TempFileUtil.createTempDir

# The lib directory that contains Jar files that the code to compile uses.
libDir = Schmant::SchmantFileSystems.getEntityForDirectory(
  Java::JavaIo::File.new("/home/me/myproject/lib"), true)

# Get all jar files from the lib directory. The jar files are returned as a
# Set of EntityFS EFile:s
depjars = Schmant::Directories.getAllFilesMatching(libDir, "*.jar")

# Add a dependency from the optlib directory. This is a Java File
depjars.add Java::JavaIo::File.new("/home/me/myproject/optlib/opt.jar")

# Create a read only, locking Directory object for the source directory.
# A locking file system eliminates the risk of accidental concurrent
# modification of files by parallel build threads.
src = Schmant::SchmantFileSystems.getEntityForDirectory(
  Java::JavaIo::File.new("/home/me/myproject/src"), true)

# Create a task executor that, by default, will use two build threads, unless
# the noOfBuildThreads property is set to another value.
te = Schmant::TaskExecutor.new.
  setNumberOfThreads($props.getIntValue("noOfBuildThreads", 2)).
  start

begin
  # A task for compiling all source files
  # TaskExecutor.add returns a TaskDependency object for the added task.
  javacDep = te.add(
    Schmant::Jdk6JavacTF.new.
      addSource(src).
      addClasspathEntries(depjars).
      setTarget(ctarget))

  # A task for copying all non-Java files from the source hierarchy to the
  # target hierarchy.
  copyDep = te.add(
    Schmant::TreeCopyTF.new.
      # Create a view of the source hierarchy that hides all .java files.
      setSource(
        src.newView(
          Schmant::EFileNameExtensionFilter.new("java").not)).
      setTarget(ctarget))

  # A timestamp for the built archive
  # SimpleDateFormat and Date are not included in the Schmant module, so we have
  # to access them through the Java module instead.
  ts = Java::JavaText::SimpleDateFormat.new("yyyyMMddHHmm").
    format(Java::JavaUtil::Date.new)

  # Build the Jar file. Put it in /home/me/myproject
  # The Jar build task depends on both the compiling and the copying tasks.
  te.add(
    Schmant::JarTF.new.
      addSource(ctarget).
      setTarget(
        Java::JavaIo::File.new("/home/me/myproject/myproject" + ts + ".jar")),
    # Use an array for the two dependencies
    [javacDep, copyDep])

  # Wait for all tasks to complete
  te.waitFor
ensure
  te.shutdown
end

Jython

# A temporary directory for the compiled classes. This, along with all of its
# contents, will be automatically deleted when Schmant exits (unless the -k
# flag is used).
ctarget = TempFileUtil.createTempDir()

# The lib directory that contains Jar files that the code to compile uses.
libDir = SchmantFileSystems.getEntityForDirectory(
  File("/home/me/myproject/lib"), True)

# Get all jar files from the lib directory. The jar files are returned as a
# Set of EntityFS EFile:s
depjars = Directories.getAllFilesMatching(libDir, "*.jar")

# Add a dependency from the optlib directory. This is a Java File
depjars.add(File("/home/me/myproject/optlib/opt.jar"))

# Create a read only, locking Directory object for the source directory.
# A locking file system eliminates the risk of accidental concurrent
# modification of files by parallel build threads.
src = SchmantFileSystems.getEntityForDirectory(
  File("/home/me/myproject/src"), \
  True)

# Create a task executor that, by default, will use two build threads, unless
# the noOfBuildThreads property is set to another value.
te = TaskExecutor(). \
  setNumberOfThreads(props.getIntValue("noOfBuildThreads", 2)). \
  start()

try:
  # A task for compiling all source files
  # TaskExecutor.add returns a TaskDependency object for the added task.
  javacDep = te.add(
    Jdk6JavacTF(). \
      addSource(src). \
      addClasspathEntries(depjars). \
      setTarget(ctarget))

  # A task for copying all non-Java files from the source hierarchy to the
  # target hierarchy.
  #
  # Create a view of the source hierarchy that hides all .java files.
  copyDep = te.add(
    TreeCopyTF(). \
      setSource(
        src.newView(
          EFileNameExtensionFilter("java").not())). \
      setTarget(ctarget))

  # A timestamp for the built archive
  #
  # SimpleDateFormat is not automatically imported by the preparation
  # script.
  from java.text import SimpleDateFormat
  timestamp = SimpleDateFormat("yyyyMMddHHmm"). \
    format(Date())

  # Build the Jar file. Put it in /home/me/myproject
  # The Jar build task depends on both the compiling and the copying
  # tasks.
  #
  # Use a list for the two dependencies
  te.add(
    JarTF(). \
      addSource(ctarget). \
      setTarget(
        File("/home/me/myproject/myproject" + timestamp + ".jar")), \
    [javacDep, copyDep])

  # Wait for all tasks to complete
  te.waitFor()
finally:
  te.shutdown()

When scheduling a task that depends on the results from several other tasks, its dependencies can be given in more than one way. Example 7.1, for instance, passes them as an array (a list in Jython).

The task executor behaves in the same way regardless of how the dependencies are given. Just use the way that is most convenient for the task being scheduled.

Task factories may be reused to create several tasks:

Example 7.2. Reusing a task factory

Groovy

import org.schmant.run.TaskExecutor
import org.schmant.task.jdk.java.JavaTF
import org.schmant.task.jdk.javac.jdk6.Jdk6JavacTF

// Create a task executor with two parallel build threads
def te = new TaskExecutor().
  setNumberOfThreads(2).
  start()

try {
  // Compile the source files in the src directory (hierarchy) and put the
  // classes in bin
  javacTask = new Jdk6JavacTF().
    setSource(src).
    setTarget(bin).create()
  te.add(javacTask)

  // Create a task factory for running a Java program
  tf = new JavaTF().
    setClassToRun("Test1").
    // This is not configurable. Use the global variable bin.
    addClasspathEntry(bin)

  // Configure the task factory for the first program run, create a task and add
  // it to the task executor
  te.add(
    tf.addJvmOption("-Dexample.config=first"),
    javacTask)

  // Configure the task factory for the second program run, create a task and
  // add it to the task executor
  te.add(
    tf.addJvmOption("-Dexample.config=second"),
    javacTask)

  // Wait for all tasks to finish
  te.waitFor()
} finally {
  te.shutdown()
}

JavaScript

// Create a task executor with two parallel build threads
te = new TaskExecutor().
  setNumberOfThreads(2).
  start();

try {
  // Compile the source files in the src directory (hierarchy) and put the
  // classes in bin
  javacTask = new Jdk6JavacTF().
    setSource(src).
    setTarget(bin).create();
  te.add(javacTask);

  // Create a task factory for running a Java program
  tf = new JavaTF().
    setClassToRun("Test1").
    // This is not configurable. Use the global variable bin.
    addClasspathEntry(bin);

  // Configure the task factory for the first program run, create a task and add
  // it to the task executor
  te.add(
    tf.addJvmOption("-Dexample.config=first"),
    javacTask);

  // Configure the task factory for the second program run, create a task and
  // add it to the task executor
  te.add(
    tf.addJvmOption("-Dexample.config=second"),
    javacTask);

  // Wait for all tasks to finish
  te.waitFor();
} finally {
  te.shutdown();
}

JRuby

# Create a task executor with two parallel build threads
te = Schmant::TaskExecutor.new.
  setNumberOfThreads(2).
  start

begin
  # Compile the source files in the src directory (hierarchy) and put the
  # classes in bin
  javacTask = Schmant::Jdk6JavacTF.new.
    setSource($src).
    setTarget($bin).create
  te.add(javacTask)

  # Create a task factory for running a Java program
  tf = Schmant::JavaTF.new.
    setClassToRun("Test1").
    # This is not configurable. Use the global variable bin.
    addClasspathEntry($bin)

  # Configure the task factory for the first program run, create a task and add
  # it to the task executor
  te.add(
    tf.addJvmOption("-Dexample.config=first"),
    javacTask)

  # Configure the task factory for the second program run, create a task and
  # add it to the task executor
  te.add(
    tf.addJvmOption("-Dexample.config=second"),
    javacTask)

  # Wait for all tasks to finish
  te.waitFor
ensure
  te.shutdown
end

Jython

# Create a task executor with two parallel build threads
te = TaskExecutor(). \
  setNumberOfThreads(2). \
  start()

try:
  # Compile the source files in the src directory (hierarchy) and put the
  # classes in bin
  javacTask = Jdk6JavacTF(). \
    setSource(src). \
    setTarget(bin).create()
  te.add(javacTask)

  # Create a task factory for running a Java program
  #
  # Use the classpath entries from the global variable "bin"
  tf = JavaTF(). \
    setClassToRun("Test1"). \
    addClasspathEntry(bin)

  # Configure the task factory for the first program run, create a task and add
  # it to the task executor
  te.add( \
    tf.addJvmOption("-Dexample.config=first"), \
    javacTask)

  # Configure the task factory for the second program run, create a task and
  # add it to the task executor
  te.add( \
    tf.addJvmOption("-Dexample.config=second"), \
    javacTask)

  # Wait for all tasks to finish
  te.waitFor()
finally:
  te.shutdown()

Closures

Instead of registering a task with a TaskExecutor, a script may register a closure. A closure is a block of code that has access to the variables of the surrounding scope, but that may be executed at a later time. See the Wikipedia article on closures.
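The defining property of a closure, access to variables from the scope where it was defined even though it runs later, can be shown without any Schmant classes. The following Python sketch is only an illustration: make_size_reporter and the sizes dictionary are hypothetical names made up for this example, and the standard library thread pool stands in for a task executor.

```python
from concurrent.futures import ThreadPoolExecutor


def make_size_reporter(sizes, name):
    """Return a zero-argument closure that reads sizes when it is called."""
    def report():
        # sizes and name come from the enclosing scope, not from arguments.
        return "%s: %d bytes" % (name, sizes[name])
    return report


sizes = {}
reporter = make_size_reporter(sizes, "d.tar")

# The value is filled in after the closure was created, but before it runs.
sizes["d.tar"] = 10240

with ThreadPoolExecutor(max_workers=1) as pool:
    # The pool calls the closure later, on another thread, without arguments.
    message = pool.submit(reporter).result()
```

The closure takes no arguments, as in the examples in this section, and still sees the value that was stored after it was created.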

Example 7.3. Running a closure with a task executor

Groovy

// enableTaskPackage org.at4j
import org.entityfs.util.*
import org.schmant.run.TaskExecutor
import org.schmant.support.FutureFile
import org.schmant.task.at4j.tar.TarTF

// After tar:ing the directory d to a file in the directory targetD, create an
// info file containing the size of the tar file.
def te = new TaskExecutor().
  start()

try {
  def targetFile = new FutureFile(targetD, "d.tar")
  def tarTask = new TarTF().
    setSource(d).
    setTarget(targetFile).create()
  te.add(tarTask)

  // Add a closure for creating the information file. This closure does not have
  // any arguments, but it has access to the variables in the scope where it is
  // defined.
  // Note the curly braces.
  te.add(
    {
      def sz = Files.getSize(targetFile.file)
      def infoFile = Directories.newFile(targetD, "d.txt")
      Files.writeText(infoFile, "" + sz)
    },
    // The tar task must be run before the closure.
    tarTask)

  te.waitFor()
} finally {
  te.shutdown()
}

JavaScript

// After tar:ing the directory d to a file in the directory targetD, create an
// info file containing the size of the tar file.
enableTaskPackage("org.at4j");

te = new TaskExecutor().
  start();

try {
  targetFile = new FutureFile(targetD, "d.tar");
  tarTask = new TarTF().
    setSource(d).
    setTarget(targetFile).create();
  te.add(tarTask);

  // Add a closure for creating the information file. This closure does not have
  // any arguments, but it has access to the variables in the scope where it is
  // defined.
  te.add(function() {
      var sz = Files.getSize(targetFile.getFile());
      var infoFile = Directories.newFile(targetD, "d.txt");
      Files.writeText(infoFile, "" + sz);
    },
    // The tar task must be run before the closure.
    tarTask);

  te.waitFor();
} finally {
  te.shutdown();
}

JRuby

# After tar:ing the directory d to a file in the directory targetD, create an
# info file containing the size of the tar file.
enableTaskPackage "org.at4j"

te = Schmant::TaskExecutor.new.
  start();

begin
  targetFile = Schmant::FutureFile.new($targetD, "d.tar")
  tarTask = Schmant::TarTF.new.
    setSource($d).
    setTarget(targetFile).create
  te.add tarTask

  # Bind the global variable to a variable defined in this scope. The closure
  # does not have access to the global variable.
  targetDir = $targetD

  # Add a closure for creating the information file. This closure does not have
  # any arguments, but it has access to the variables from the scope where it is
  # defined.
  te.add(
    Proc.new {
      sz = Schmant::Files.getSize targetFile.file
      infoFile = Schmant::Directories.newFile(targetDir, "d.txt")
      Schmant::Files.writeText(infoFile, sz.to_s)
    },
    # The tar task must be run before the closure.
    tarTask)

  te.waitFor
ensure
  te.shutdown
end

Jython

# After tar:ing the directory d to a file in the directory targetD, create an
# info file containing the size of the tar file.
enableTaskPackage("org.at4j")

te = TaskExecutor().start()
try:
  targetFile = FutureFile(targetD, "d.tar")
  tarTask = TarTF(). \
    setSource(d). \
    setTarget(targetFile).create()
  te.add(tarTask)

  # Jython has no anonymous multi-statement closures. Function variables and
  # lambda functions may be used instead
  #
  # The function that will create the info file. It uses global variables and
  # the targetFile variable that is defined in this scope.
  def createInfoFile():
    sz = Files.getSize(targetFile.getFile())
    infoFile = Directories.newFile(targetD, "d.txt")
    Files.writeText(infoFile, str(sz))

  # Add the function that creates the info file to the task executor. It must be
  # run after the tar task.
  #
  # The function cannot take any arguments.
  te.add(createInfoFile, tarTask)

  te.waitFor()
finally:
  te.shutdown()

Closures may be used in several places in Schmant. The next example shows how a closure may be used instead of a task factory when running a RecursiveActionTF.

Example 7.4. Using a closure instead of a task factory

Groovy

// enableTaskPackage org.at4j
import org.at4j.comp.CompressionLevel
import org.entityfs.util.*
import org.entityfs.util.filter.entity.EFileNameExtensionFilter
import org.schmant.arg.DirectoryAndFilter
import org.schmant.run.TaskExecutor
import org.schmant.support.FutureFile
import org.schmant.task.at4j.bzip2.BZip2TF
import org.schmant.task.meta.RecursiveActionTF

// For each text file in the directory hierarchy under the directory d, bzip2 it
// with medium compression if it is less than 4096 bytes, and with high
// compression if it is larger

// Use this task executor to run the tasks.
def te = new TaskExecutor().
  setNumberOfThreads(2).
  start()

try {
  // This constant will be used in the closure, which demonstrates that the
  // closure has access to the surrounding scope
  def MIN_SIZE_FOR_MAX_COMPRESSION = 4096 // bytes

  new RecursiveActionTF().
    // Use a filter that will only return text files
    setSource(new DirectoryAndFilter(
      d,
      new EFileNameExtensionFilter(".txt"))).
    // Instead of running the entire RecursiveActionTF in the task executor, use
    // it to run each task that the RecursiveActionTF creates instead.
    setTaskExecutor(te).
    // Use a closure instead of a task. Since this is run in a
    // RecursiveActionTF, the closure will be called with a ClosureParameters
    // object containing a source entity. If the task would have been run in a
    // RecursiveProcessTF instead, the ClosureParameters object would
    // have contained both a source and a target entity.
    //
    // Note the curly braces
    setTaskFactory{ cParams ->
      def textFile = cParams.source
      def targetName = Entities.getName(textFile) + ".bz2"
      def targetDir = Entities.getParent(textFile)

      // Medium or high compression?
      def compressionLevel
      if (Files.getSize(textFile) < MIN_SIZE_FOR_MAX_COMPRESSION) {
        compressionLevel = CompressionLevel.DEFAULT
      } else {
        compressionLevel = CompressionLevel.BEST
      }

      new BZip2TF().
        setSource(textFile).
        setTarget(new FutureFile(targetDir, targetName)).
        setCompressionLevel(compressionLevel).run()
    }.run()

  te.waitFor()
} finally {
  te.shutdown()
}

JavaScript

// For each text file in the directory hierarchy under the directory d, bzip2 it
// with medium compression if it is less than 4096 bytes, and with high
// compression if it is larger
enableTaskPackage("org.at4j");

// Use this task executor to run the tasks.
te = new TaskExecutor().
  setNumberOfThreads(2).
  start();

try {
  // This constant will be used in the closure, which demonstrates that the
  // closure has access to the surrounding scope
  var MIN_SIZE_FOR_MAX_COMPRESSION = 4096; // bytes

  new RecursiveActionTF().
    // Use a filter that will only return text files
    setSource(new DirectoryAndFilter(
      d,
      new EFileNameExtensionFilter(".txt"))).
    // Instead of running the entire RecursiveActionTF in the task executor, use
    // it to run each task that the RecursiveActionTF creates instead.
    setTaskExecutor(te).
    // Use a closure instead of a task. Since this is run in a
    // RecursiveActionTF, the closure will be called with a ClosureParameters
    // object containing a source entity. If the task would have been run in a
    // RecursiveProcessTF instead, the ClosureParameters object would
    // have contained both a source and a target entity.
    setTaskFactory(function(cParams) {
      var textFile = cParams.source;
      var targetName = Entities.getName(textFile) + ".bz2";
      var targetDir = Entities.getParent(textFile);

      // Medium or high compression?
      if (Files.getSize(textFile) < MIN_SIZE_FOR_MAX_COMPRESSION) {
        var compressionLevel = CompressionLevel.DEFAULT;
      } else {
        var compressionLevel = CompressionLevel.BEST;
      }

      new BZip2TF().
        setSource(textFile).
        setTarget(new FutureFile(targetDir, targetName)).
        setCompressionLevel(compressionLevel).run();
    }).run();

  te.waitFor();
} finally {
  te.shutdown();
}

JRuby

# For each text file in the directory hierarchy under the directory d, bzip2 it
# with medium compression if it is less than 4096 bytes, and with high
# compression if it is larger
enableTaskPackage "org.at4j"

# Use this task executor to run the tasks.
te = Schmant::TaskExecutor.new.
  setNumberOfThreads(2).
  start

begin
  # This constant will be used in the closure, which demonstrates that the
  # closure has access to the surrounding scope
  MIN_SIZE_FOR_MAX_COMPRESSION = 4096; # bytes

  Schmant::RecursiveActionTF.new.
    # Use a filter that will only return text files
    setSource(Schmant::DirectoryAndFilter.new(
      $d,
      Schmant::EFileNameExtensionFilter.new(".txt"))).
    # Instead of running the entire RecursiveActionTF in the task executor, use
    # it to run each task that the RecursiveActionTF creates instead.
    setTaskExecutor(te).
    # Use a closure instead of a task. Since this is run in a
    # RecursiveActionTF, the closure will be called with a ClosureParameters
    # object containing a source entity. If the task would have been run in a
    # RecursiveProcessTF instead, the ClosureParameters object would
    # have contained both a source and a target entity.
    setTaskFactory(
      Proc.new { |cParams|
        textFile = cParams.source
        targetName = Schmant::Entities.getName(textFile) + ".bz2"
        targetDir = Schmant::Entities.getParent(textFile)

        # Medium or high compression?
        if Schmant::Files.getSize(textFile) < MIN_SIZE_FOR_MAX_COMPRESSION
          compressionLevel = Schmant::CompressionLevel::DEFAULT
        else
          compressionLevel = Schmant::CompressionLevel::BEST
        end

        Schmant::BZip2TF.new.
          setSource(textFile).
          setTarget(Schmant::FutureFile.new(targetDir, targetName)).
          setCompressionLevel(compressionLevel).run
      }).run

  te.waitFor
ensure
  te.shutdown
end

Jython

# For each text file in the directory hierarchy under the directory d, bzip2 it
# with medium compression if it is less than 4096 bytes, and with high
# compression if it is larger

# Jython has no anonymous multi-statement closures. The closest that we can get
# is using lambda expressions or function variables
#
# Define the function that will be called by the task
def compressFile(cParams):
  textFile = cParams.getSource()
  targetName = Entities.getName(textFile) + ".bz2"
  targetDir = Entities.getParent(textFile)

  # Medium or high compression?
  if Files.getSize(textFile) < MIN_SIZE_FOR_MAX_COMPRESSION:
    compressionLevel = CompressionLevel.DEFAULT
  else:
    compressionLevel = CompressionLevel.BEST

  BZip2TF(). \
    setSource(textFile). \
    setTarget(FutureFile(targetDir, targetName)). \
    setCompressionLevel(compressionLevel).run()

enableTaskPackage("org.at4j")

# Use this task executor to run the tasks.
te = TaskExecutor(). \
  setNumberOfThreads(2). \
  start()

try:
  # This constant will be used in the function, which demonstrates that the
  # function has access to the surrounding scope
  MIN_SIZE_FOR_MAX_COMPRESSION = 4096; # bytes

  # Create the recursive action task
  # * Use a filter that will only return text files
  # * Instead of running the entire RecursiveActionTF in the task executor, use
  #   it to run each task that the RecursiveActionTF creates instead.
  # * Use a function variable instead of a task. Since this is run in a
  #   RecursiveActionTF, the function will be called with a ClosureParameters
  #   object containing a source entity. If the task would have been run in a
  #   RecursiveProcessTF instead, the ClosureParameters object would
  #   have contained both a source and a target entity.
  RecursiveActionTF(). \
    setSource(DirectoryAndFilter( \
      d, \
      EFileNameExtensionFilter(".txt"))). \
    setTaskExecutor(te). \
    setTaskFactory(compressFile).run()

  te.waitFor()
finally:
  te.shutdown()

Dependencies for tasks with subtasks

Some tasks, such as the RecursiveActionTF or the JavaWorkspaceBuilderTF, create and schedule their own tasks when they are run. When such a task is used as a task dependency, its subtasks are not included in that dependency, which is probably not what the programmer intended. Use the dependency returned by the task's getDependencyForTasksScheduledByThisTask method instead. That dependency is not satisfied until all scheduled tasks have been run.
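Why the parent task alone is the wrong thing to depend on can be seen in a small sketch that does not use Schmant at all. In the Python block below, schedule_all and compress are hypothetical functions written for this illustration; the list of handles that the parent returns plays a role comparable to the dependency from getDependencyForTasksScheduledByThisTask.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor, wait

pool = ThreadPoolExecutor(max_workers=3)
finished = []
lock = threading.Lock()


def compress(name):
    """Subtask: pretend to compress one file."""
    time.sleep(0.05)
    with lock:
        finished.append(name)


def schedule_all(names):
    """Parent task: only schedules the subtasks and returns their handles."""
    return [pool.submit(compress, n) for n in names]


try:
    parent = pool.submit(schedule_all, ["a.txt", "b.txt", "c.txt"])

    # The parent is finished as soon as it has scheduled its subtasks.
    # The subtasks themselves may still be running at this point.
    subtasks = parent.result()

    # A follow-up step must wait for the subtasks, not just for the parent.
    wait(subtasks)
    with lock:
        finished_after_wait = sorted(finished)
finally:
    pool.shutdown()
```

Waiting on the parent handle only guarantees that scheduling has happened. Waiting on the subtask handles guarantees that the work itself is done.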

Example 7.5. Dependencies for tasks with subtasks

Groovy

// enableTaskPackage org.at4j
import org.entityfs.util.Entities
import org.schmant.run.TaskExecutor
import org.schmant.support.FutureFile
import org.schmant.task.at4j.tar.TarTF
import org.schmant.task.io.gzip.GZipTF
import org.schmant.task.meta.RecursiveActionTF

// Gzip all files in a directory hierarchy under d, and then build a tar archive
// containing all files. The tar archive is put in the directory targetD.

// A task executor
def te = new TaskExecutor().start()
try {
  gzipTask = new RecursiveActionTF().
    setSource(d).
    // Let the recursive action task schedule all tasks that it creates instead
    // of running them right away.
    setTaskExecutor(te).
    // Use a closure that compresses a file and then deletes the source file.
    // Note the curly braces.
    setTaskFactory{ cParams ->
      def targetName = Entities.getName(cParams.source) + ".gz"
      def targetDir = Entities.getParent(cParams.source)
      new GZipTF().
        setSource(cParams.source).
        setTarget(new FutureFile(targetDir, targetName)).
        run()
      // Delete the original file.
      Entities.delete(cParams.source)
    // Run the recursive action task now. This will make it schedule all of its
    // tasks in the task executor.
    }.run()

  tarTask = new TarTF().
    setSource(d).
    setTarget(new FutureFile(targetD, "d.tar")).
    create()

  // The tar task depends on all tasks scheduled by the recursive action task
  // above
  te.add(tarTask, gzipTask.dependencyForTasksScheduledByThisTask)

  te.waitFor()
} finally {
  te.shutdown()
}

JavaScript

// Gzip all files in a directory hierarchy under d, and then build a tar archive
// containing all files. The tar archive is put in the directory targetD.
enableTaskPackage("org.at4j");

// A task executor
te = new TaskExecutor().start()
try {
  gzipTask = new RecursiveActionTF().
    setSource(d).
    // Let the recursive action task schedule all tasks that it creates instead
    // of running them right away.
    setTaskExecutor(te).
    // Use a closure that compresses a file and then deletes the source file.
    setTaskFactory(
      function(cParams) {
        var targetName = Entities.getName(cParams.source) + ".gz";
        var targetDir = Entities.getParent(cParams.source);
        new GZipTF().
          setSource(cParams.source).
          setTarget(new FutureFile(targetDir, targetName)).
          run();
        // Delete the original file.
        // "delete" is a reserved word in JavaScript. Use the deleteEntity
        // method instead.
        Entities.deleteEntity(cParams.source);
      }
    // Run the recursive action task now. This will make it schedule all of its
    // tasks in the task executor.
    ).run();

  tarTask = new TarTF().
    setSource(d).
    setTarget(new FutureFile(targetD, "d.tar")).
    create();

  // The tar task depends on all tasks scheduled by the recursive action task
  // above
  te.add(tarTask, gzipTask.getDependencyForTasksScheduledByThisTask());

  te.waitFor();
} finally {
  te.shutdown();
}

JRuby

# Gzip all files in a directory hierarchy under d, and then build a tar archive
# containing all files. The tar archive is put in the directory targetD.
enableTaskPackage "org.at4j"

# A task executor
te = Schmant::TaskExecutor.new.start
begin
  gzipTask = Schmant::RecursiveActionTF.new.
    setSource($d).
    # Let the recursive action task schedule all tasks that it creates instead
    # of running them right away.
    setTaskExecutor(te).
    # Use a closure that compresses a file and then deletes the source file.
    setTaskFactory(
      Proc.new { |cParams|
        targetName = Schmant::Entities.getName(cParams.source) + ".gz"
        targetDir = Schmant::Entities.getParent(cParams.source)
        Schmant::GZipTF.new.
          setSource(cParams.source).
          setTarget(Schmant::FutureFile.new(targetDir, targetName)).
          run
        # Delete the original file.
        Schmant::Entities.delete cParams.source
      }
    # Run the recursive action task now. This will make it schedule all of its
    # tasks in the task executor.
    ).run

  tarTask = Schmant::TarTF.new.
    setSource($d).
    setTarget(Schmant::FutureFile.new($targetD, "d.tar")).
    create

  # The tar task depends on all tasks scheduled by the recursive action task
  # above
  te.add(tarTask, gzipTask.dependencyForTasksScheduledByThisTask)

  te.waitFor
ensure
  te.shutdown
end

Jython

# Gzip all files in a directory hierarchy under d, and then build a tar archive
# containing all files. The tar archive is put in the directory targetD.
enableTaskPackage("org.at4j")

# A task executor
te = TaskExecutor().start()
try:
  # The function that will be used for compressing the file
  def gzipFile(cParams):
    targetName = Entities.getName(cParams.source) + ".gz"
    targetDir = Entities.getParent(cParams.source)
    GZipTF(). \
      setSource(cParams.source). \
      setTarget(FutureFile(targetDir, targetName)). \
      run()
    # Delete the original file.
    Entities.delete(cParams.source)

  # Create the recursive gzip task
  # * Let the recursive action task schedule all tasks that it creates instead
  #   of running them right away.
  # * Use a closure that compresses a file and then deletes the source file.
  # * Run the recursive action task now. This will make it schedule all of its
  #   tasks in the task executor.
  gzipTask = RecursiveActionTF(). \
    setSource(d). \
    setTaskExecutor(te). \
    setTaskFactory(gzipFile).run()

  tarTask = TarTF(). \
    setSource(d). \
    setTarget(FutureFile(targetD, "d.tar")). \
    create()

  # The tar task depends on all tasks scheduled by the recursive action task
  # above
  te.add(tarTask, gzipTask.getDependencyForTasksScheduledByThisTask())

  te.waitFor()
finally:
  te.shutdown()

A FutureEntity, such as a FutureFile, represents an entity that does not yet exist when a task is scheduled. The example below uses a FutureFile to represent a file that is created by one task and then used by another.

Example 7.6. Using future files to represent files that do not yet exist

Groovy

import org.entityfs.util.Directories
import org.schmant.support.FutureFile
import org.schmant.task.io.gzip.GZipTF
import org.schmant.task.text.TextReplaceTF

// te is a task executor.
// d is a Directory where the file f.txt to be processed is.

// Note: in this example, we could have fed the result from TextReplaceTF right
// into GZipTF's source property instead since TextReplaceTF is a Producer.

// A FutureFile representing the result of the text processing task
// created below.
def fp = new FutureFile(d, "fp.txt")

// Create a task for processing the template file f.txt
def pt = new TextReplaceTF().
  setSource(Directories.getFile(d, "f.txt")).
  setTarget(fp).
  addReplace("!!!VERSION!!!", "1.0").create()
te.add(pt)

// Create a task for GZip'ping the processed file
te.add(
  new GZipTF().
    // Reuse the future file, this time as the source parameter. Since this task
    // depends on the task creating the file, the future file will exist when
    // this task is run.
    setSource(fp).
    setTarget(new FutureFile(d, "fp.txt.gz")),
  // This task must depend on the processing task
  pt)

JavaScript

// te is a task executor.
// d is a Directory where the file f.txt to be processed is.

// Note: in this example, we could have fed the result from TextReplaceTF right
// into GZipTF's source property instead since TextReplaceTF is a Producer.

// A FutureFile representing the result of the text processing task
// created below.
fp = new FutureFile(d, "fp.txt");

// Create a task for processing the template file f.txt
pt = new TextReplaceTF().
  setSource(Directories.getFile(d, "f.txt")).
  setTarget(fp).
  addReplace("!!!VERSION!!!", "1.0").create();
te.add(pt);

// Create a task for GZip'ping the processed file
te.add(
  new GZipTF().
    // Reuse the future file, this time as the source parameter. Since this task
    // depends on the task creating the file, the file will exist when this task
    // is run.
    setSource(fp).
    setTarget(new FutureFile(d, "fp.txt.gz")),
  // This task must depend on the processing task
  pt);

JRuby

# te is a task executor.
# d is a Directory where the file f.txt to be processed is.

# Note: in this example, we could have fed the result from TextReplaceTF right
# into GZipTF's source property instead since TextReplaceTF is a Producer.

# A FutureFile representing the result of the text processing task
# created below.
fp = Schmant::FutureFile.new($d, "fp.txt")

# Create a task for processing the template file f.txt
pt = Schmant::TextReplaceTF.new.
  setSource(Schmant::Directories.getFile($d, "f.txt")).
  setTarget(fp).
  addReplace("!!!VERSION!!!", "1.0").create()
$te.add(pt)

# Create a task for GZip'ping the processed file
$te.add(
  Schmant::GZipTF.new.
    # Reuse the future file, this time as the source parameter. Since this task
    # depends on the task creating the file, the file will exist when this task
    # is run.
    setSource(fp).
    setTarget(Schmant::FutureFile.new($d, "fp.txt.gz")),
  # This task must depend on the processing task
  pt)

Jython

# te is a task executor.
# d is a Directory where the file f.txt to be processed is.

# Note: in this example, we could have fed the result from TextReplaceTF right
# into GZipTF's source property instead since TextReplaceTF is a Producer.

# A FutureFile representing the result of the text processing task
# created below.
fp = FutureFile(d, "fp.txt")

# Create a task for processing the template file f.txt
pt = TextReplaceTF(). \
  setSource(Directories.getFile(d, "f.txt")). \
  setTarget(fp). \
  addReplace("!!!VERSION!!!", "1.0").create()
te.add(pt)

# Create a task for GZip'ping the processed file.
# This task must depend on the processing task.
# Reuse the future file, this time as the source parameter. Since this task
# depends on the task that creates the file, the file will exist when this task
# is run.
te.add( \
  GZipTF(). \
    setSource(fp). \
    setTarget(FutureFile(d, "fp.txt.gz")), \
  pt)

In the example below, an XML file is first preprocessed, then parsed, and finally "statistics" are computed for it. The XML parser uses an XML catalog for resolving external entities. All tasks are run with a TaskExecutor[2].

Example 7.7. Computing statistics for an XML file

Groovy

import java.io.File
import org.entityfs.util.*
import org.schmant.run.TaskExecutor
import org.schmant.support.*
import org.schmant.support.entityfs.*
import org.schmant.support.xml.*
import org.schmant.task.meta.RecursiveActionTF
import org.schmant.task.proxy.ReplaceSourceFileTF
import org.schmant.task.script.ScriptTF
import org.schmant.task.text.TextReplaceTF
import org.schmant.task.xml.catalog.AddSystemIdToCatalogTF
import org.schmant.task.xml.dom.DomParseXmlTF

// Create a task executor with two threads and start it
def te = new TaskExecutor().
  setNumberOfThreads(2).
  start()
try {
  // An XML catalog to use for resolving external entities
  def cr = new XmlCatalogResolver()

  // Add all DTD files in the directory dtd to the catalog
  def buildCatalogTask = new RecursiveActionTF().
    setSource(dtd).
    setTaskFactory(
      new AddSystemIdToCatalogTF().
        setXmlCatalog(cr)).create()

  // Add the task to the executor
  te.add buildCatalogTask

  // Create a FutureFile representing the file f. This is
  // done because the preprocessing task below will invalidate f by deleting it.
  // This is how EntityFS entities work; even though the preprocess task will
  // put a new file in the same location as f, that new file is not f.
  def ff = new FutureFile(f)

  // The XML document is in the EFile f. Preprocess it.
  // This makes the f variable invalid since the original file is replaced.
  def preprocessTask = new ReplaceSourceFileTF().
    setSource(f).
    setTaskFactory(
      new TextReplaceTF().
        addReplace("!!!VERSION!!!", "1.0")).create()

  // Add the task to the executor. This can be run in parallel with the XML
  // catalog task defined above.
  te.add preprocessTask

  // Define the XML parsing task.
  def parseTask = new DomParseXmlTF().
    setSource(ff).
    setEntityResolver(cr).create()

  // Schedule the task and inform the task executor that it depends on the
  // preprocessing task and the XML catalog task.
  te.add(parseTask, [preprocessTask, buildCatalogTask])

  // Write the XML "statistics" to this file
  def targetf = new File(props.getStringValue("java.io.tmpdir"), "out.txt")
  targetf.createNewFile()

  // Add a closure that calculates statistics from the XML file. Adding the
  // closure to the task executor will make it execute only when all of its
  // dependencies have executed.
  te.add(
    {
      // The closure does not have any arguments
      //
      // The closure has access to the variables in the surrounding scope.
      // Convert the target File targetf to an EFile
      def target = SchmantFileSystems.getEntityForFile(targetf, false)

      // Write the text to the target file
      // The parse task is a Producer of a Document object.
      Files.writeText(target, parseTask.get().toString())
    },
    // This depends on the parse task
    parseTask)

  // Wait for the task executor to finish
  te.waitFor()
} finally {
  // Make sure that the task executor is stopped.
  te.shutdown()
}

JavaScript

// Create a task executor with two threads and start it
te = new TaskExecutor().
  setNumberOfThreads(2).
  start();
try {
  // An XML catalog to use for resolving external entities
  cr = new XmlCatalogResolver();

  // Add all DTD files in the directory dtd to the catalog
  buildCatalogTask = new RecursiveActionTF().
    setSource(dtd).
    setTaskFactory(
      new AddSystemIdToCatalogTF().
        setXmlCatalog(cr)).create();

  // Add the task to the executor
  te.add(buildCatalogTask);

  // Create a FutureFile representing the file f. This is done because
  // the preprocessing task below will invalidate f by deleting it. This is how
  // EntityFS entities work; even though the preprocess task will put a new file
  // in the same location as f, that new file is not f.
  ff = new FutureFile(f);

  // The XML document is in the EFile f. Preprocess it.
  // This makes the f variable invalid since the original file is replaced.
  preprocessTask = new ReplaceSourceFileTF().
    setSource(f).
    setTaskFactory(
      new TextReplaceTF().
        addReplace("!!!VERSION!!!", "1.0")).create();

  // Add the task to the executor. This can be run in parallel with the XML
  // catalog task defined above.
  te.add(preprocessTask);

  // Define the XML parsing task.
  parseTask = new DomParseXmlTF().
    setSource(ff).
    setEntityResolver(cr).create();

  // Schedule the task and inform the task executor that it depends on the
  // preprocessing task and the XML catalog task.
  te.add(parseTask, [preprocessTask, buildCatalogTask]);

  // Write the XML "statistics" to this file
  targetf = new File(props.getStringValue("java.io.tmpdir"), "out.txt");
  targetf.createNewFile();

  // Add a closure that calculates statistics from the XML file. Adding the
  // closure to the task executor will make it execute only when all of its
  // dependencies have executed.
  te.add(
    // The closure does not have any arguments
    function() {
      // The closure has access to the variables in the surrounding scope.
      // Convert the target File targetf to an EFile
      var target = SchmantFileSystems.getEntityForFile(targetf, false);

      // Write the text to the target file
      // The parse task is a Producer of a Document object.
      Files.writeText(target, parseTask.get().toString());
    },
    // This depends on the parse task.
    parseTask);

  // Wait for the task executor to finish
  te.waitFor();
} finally {
  // Make sure that the task executor is stopped.
  te.shutdown();
}

JRuby

# Create a task executor with two threads and start it
te = Schmant::TaskExecutor.new.
  setNumberOfThreads(2).
  start
begin
  # An XML catalog to use for resolving external entities
  cr = Schmant::XmlCatalogResolver.new

  # Add all DTD files in the directory dtd to the catalog
  buildCatalogTask = Schmant::RecursiveActionTF.new.
    setSource($dtd).
    setTaskFactory(
      Schmant::AddSystemIdToCatalogTF.new.
        setXmlCatalog(cr)).create

  # Add the task to the executor
  te.add buildCatalogTask

  # Create a FutureFile representing the file f. This is done because
  # the preprocessing task below will invalidate f by deleting it. This is how
  # EntityFS entities work; even though the preprocess task will put a new file
  # in the same location as f, that new file is not f.
  ff = Schmant::FutureFile.new $f

  # The XML document is in the EFile f. Preprocess it.
  # This makes the f variable invalid since the original file is replaced.
  preprocessTask = Schmant::ReplaceSourceFileTF.new.
    setSource($f).
    setTaskFactory(
      Schmant::TextReplaceTF.new.
        addReplace("!!!VERSION!!!", "1.0")).create

  # Add the task to the executor. This can be run in parallel with the XML
  # catalog task defined above.
  te.add preprocessTask

  # Define the XML parsing task.
  parseTask = Schmant::DomParseXmlTF.new.
    setSource(ff).
    setEntityResolver(cr).create

  # Schedule the task and inform the task executor that it depends on the
  # preprocessing task and the XML catalog task.
  te.add(parseTask, [preprocessTask, buildCatalogTask])

  # Write the XML "statistics" to this file
  targetf = Java::JavaIo::File.new(
    $props.getStringValue("java.io.tmpdir"), "out.txt")
  targetf.createNewFile

  # Add a closure that calculates statistics from the XML file. Adding the
  # closure to the task executor will make it execute only when all of its
  # dependencies have executed.
  #
  # Create the closure using Proc.new. We cannot use a lambda expression here
  # since our closure spans several statements.
  te.add(
    Proc.new { ||
      # The closure has access to the variables in the surrounding scope.
      # Convert the target File targetf to an EFile
      target = Schmant::SchmantFileSystems.getEntityForFile(targetf, false)

      # Write the "statistics" to the target file
      Schmant::Files.writeText(target, parseTask.get.toString)
    },
    parseTask)

  # Wait for the task executor to finish
  te.waitFor
ensure
  # Make sure that the task executor is stopped.
  te.shutdown
end

Jython

# Create a task executor with two threads and start it
te = TaskExecutor(). \
  setNumberOfThreads(2). \
  start()
try:
  # An XML catalog to use for resolving external entities
  cr = XmlCatalogResolver()

  # Add all DTD files in the directory DTD to the catalog
  buildCatalogTask = RecursiveActionTF(). \
    setSource(dtd). \
    setTaskFactory(
      AddSystemIdToCatalogTF(). \
        setXmlCatalog(cr)).create()

  # Add the task to the executor
  te.add(buildCatalogTask)

  # Create a FutureFile representing the file f. This is done because
  # the preprocessing task below will invalidate f by deleting it. This is
  # how EntityFS entities work; even though the preprocess task will put a
  # new file in the same location as f, that new file is not f.
  ff = FutureFile(f)

  # The XML document is in the EFile f. Preprocess it.
  # This makes the f variable invalid since the original file is replaced.
  preprocessTask = ReplaceSourceFileTF(). \
    setSource(f). \
    setTaskFactory(
      TextReplaceTF(). \
        addReplace("!!!VERSION!!!", "1.0")).create()

  # Add the task to the executor. This can be run in parallel with the XML
  # catalog task defined above.
  te.add(preprocessTask)

  # Define the XML parsing task.
  parseTask = DomParseXmlTF(). \
    setSource(ff). \
    setEntityResolver(cr).create()

  # Schedule the task and inform the task executor that it depends on the
  # preprocessing task and the XML catalog task.
  te.add(parseTask, [preprocessTask, buildCatalogTask])

  # Write the XML "statistics" to this file
  targetf = File(props.getStringValue("java.io.tmpdir"), "out.txt")
  targetf.createNewFile()

  # This function will be used for creating the XML "statistics". Since it
  # spans more than one statement, we cannot simply use a lambda for this
  def calculateXmlStatistics():
    # This function has access to the variables in the surrounding scope.
    # Convert the target File targetf to an EFile.
    target = SchmantFileSystems.getEntityForFile(targetf, False)

    # Write the text to the target file
    # The parse task is a Producer of a Document object.
    Files.writeText(target, parseTask.get().toString())

  # Add the function to the task executor.
  # It depends on the parse task
  te.add(
    calculateXmlStatistics,
    parseTask)

  # Wait for the task executor to finish
  te.waitFor()
finally:
  # Make sure that the task executor is stopped.
  te.shutdown()
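
The dependency structure of Example 7.7 may be easier to see in isolation. The following is a minimal sketch in plain Java that uses the JDK's CompletableFuture as a stand-in; it does not use the Schmant API, and the class name DependencyGraphSketch and the step names are made up for illustration. The catalog and preprocessing steps have no dependencies and may run in parallel, the parse step waits for both of them, and the statistics step waits for the parse step.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration class; not part of Schmant.
class DependencyGraphSketch {

    /** Runs the four steps and returns the order in which they ran. */
    static List<String> run() {
        // Two worker threads, like setNumberOfThreads(2) in the example.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            // Two steps without dependencies; they may run in parallel.
            CompletableFuture<Void> buildCatalog =
                CompletableFuture.runAsync(() -> log.add("buildCatalog"), pool);
            CompletableFuture<Void> preprocess =
                CompletableFuture.runAsync(() -> log.add("preprocess"), pool);

            // The parse step starts only after both dependencies are done.
            CompletableFuture<Void> parse = CompletableFuture
                .allOf(buildCatalog, preprocess)
                .thenRunAsync(() -> log.add("parse"), pool);

            // The statistics step depends on the parse step.
            CompletableFuture<Void> statistics =
                parse.thenRunAsync(() -> log.add("statistics"), pool);

            // Wait for the last step, and thereby for all steps.
            statistics.join();
            return new ArrayList<>(log);
        } finally {
            // Always release the threads.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The first two entries of the returned list can come in either order, which is the only place where the two threads can actually work in parallel in this graph.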

A Java File object can be used just like a FutureEntity because, unlike an EntityFS entity, it does not require the file or directory that it references to exist.
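
A small sketch in plain Java of the File behavior described above. It uses only JDK classes; the class name FileAsFutureReference and the file name out.txt are chosen for illustration. The File object is created before the file exists, and the same object refers to the file once something has written it.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical illustration class; not part of Schmant.
class FileAsFutureReference {

    /** Returns { exists before writing, exists after writing }. */
    static boolean[] demo() {
        try {
            File dir = Files.createTempDirectory("file-sketch").toFile();

            // Creating the File object does not touch the file system,
            // so the referenced file does not have to exist yet.
            File target = new File(dir, "out.txt");
            boolean before = target.exists();

            // Later, some other step creates the file.
            Files.write(target.toPath(), "1.0".getBytes(StandardCharsets.UTF_8));
            boolean after = target.exists();

            // Clean up the temporary file and directory.
            target.delete();
            dir.delete();
            return new boolean[] { before, after };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("exists before: " + r[0] + ", exists after: " + r[1]);
    }
}
```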



[1] Varargs are Java syntactic sugar; the compiler converts a varargs argument list to an array at compile time.

[2] In this simple example, using a TaskExecutor does not give much of a performance boost since most tasks depend on each other.