The lifecycle of a job begins when the application contained in the job’s package is executed with the run command and a config. After the application starts, a service container is configured and the job listed in the config is instantiated. The stage is now set for the job to begin running.

Execution

When building a job, it is useful to envision the execution implemented as a for-each loop with a block of code before and after the loop. The following code shows the order in which the methods on a job are called.

var job = new MyJob();

await job.InitializeAsync();

await foreach (var item in job.GetItemsAsync())
	await job.ProcessAsync(item);

await job.FinalizeAsync();

In every job there are two methods that must be implemented: GetItemsAsync and ProcessAsync. These two methods make up the core of the job’s execution. All of the other methods in a job have a default implementation that can be overridden when needed.

GetItemsAsync

GetItemsAsync returns a collection of items to pass to ProcessAsync, as an IAsyncEnumerable<TItem>. Usually this method will run a database query, make an API call, read records from a file, or read from a queue. Wherever the data comes from, there are some important aspects of how the data is used that can impact how you implement this method.

Serializability

The items returned from GetItemsAsync should be JSON serializable. This makes it possible to provide detailed reporting and advanced processing scenarios such as distributing items to be processed on other nodes.
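
As a rough illustration, an item type made up of simple public data round-trips through JSON without losing anything. The sketch below assumes System.Text.Json as the serializer and uses a hypothetical OrgItem record; neither the type nor the choice of serializer comes from Runly itself.

```csharp
using System;
using System.Text.Json;

// Round-trip the item through JSON to confirm nothing is lost.
var item = new OrgItem(42, "Contoso");
var json = JsonSerializer.Serialize(item);
var copy = JsonSerializer.Deserialize<OrgItem>(json);

Console.WriteLine(json);
Console.WriteLine(copy == item); // records compare by value

// Hypothetical item type: plain data, no streams, connections, or delegates.
public record OrgItem(int Id, string Name);
```

Item types that hold open connections, streams, or delegates generally will not survive this kind of round trip, so it is usually safer to return plain data or identifiers.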

Yielding Items

A feature that IAsyncEnumerable<T> enables is the ability to yield items from an async method. This makes it very easy to write code that allows the job to begin processing items before the entire collection of items has been read, avoiding the need to keep a large number of items in memory.

public override async IAsyncEnumerable<Environment> GetItemsAsync()
{
	foreach (var orgId in await api.GetOrgs())
		yield return await api.GetOrgDetails(orgId);
}

Using IEnumerable<T>

Although no cast from IEnumerable<T> to IAsyncEnumerable<T> exists, Runly provides an extension method with two overloads to wrap IEnumerable<T> and Task<IEnumerable<T>> in an IAsyncEnumerable<T>.

Use ToAsyncEnumerable() to convert an IEnumerable<T>, such as an array, List<T>, or other in-memory enumerable collection:

public override IAsyncEnumerable<string> GetItemsAsync() =>
	new string[] { "Item1", "Item2" }.ToAsyncEnumerable();

ToAsyncEnumerable() also converts async methods that return Task<IEnumerable<T>>, such as database queries:

public override IAsyncEnumerable<User> GetItemsAsync() =>
	db.GetUsersInRole("Owner").ToAsyncEnumerable();

When used in conjunction with a Task<IEnumerable<T>>, the Task is awaited when the first item in the collection is accessed by the enumerator.
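
The deferred await can be pictured with a small async iterator. This is only a sketch of the general technique, not Runly's implementation: Wrap and GetNamesAsync are hypothetical names, and GetNamesAsync stands in for a database call.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Stand-in for a database call that returns Task<IEnumerable<T>>.
static async Task<IEnumerable<string>> GetNamesAsync()
{
	await Task.Delay(10);
	return new[] { "Ada", "Grace" };
}

// Hypothetical wrapper: the body of an async iterator does not run until
// the first MoveNextAsync call, so the task is awaited lazily.
static async IAsyncEnumerable<T> Wrap<T>(Task<IEnumerable<T>> task)
{
	foreach (var item in await task)
		yield return item;
}

var items = Wrap(GetNamesAsync()); // the task has started, but is not awaited yet

await foreach (var name in items)  // the first iteration awaits the task
	Console.WriteLine(name);
```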

Reading Files

When possible, avoid using methods that will read a large amount of data into memory before it is returned from GetItemsAsync. The following code reads a file into memory, parsing rows into FileRecords and adding them to a list before returning the list.

// Don't use this approach, there's a better way!
public override IAsyncEnumerable<FileRecord> GetItemsAsync() 
{
	using var reader = new StreamReader(File.OpenRead("a-big-file.txt"));

	var records = new List<FileRecord>();
	string line;

	while ((line = reader.ReadLine()) != null)
		records.Add(new FileRecord(line));

	return records.ToAsyncEnumerable();
}

The previous example fails to take advantage of IAsyncEnumerable and reads the entire file into memory before returning it. A better solution is to read the data asynchronously using ReadLineAsync on the StreamReader and yield a FileRecord for each line read. This allows the job to start processing items while the file is still being read. Eliminating the list and yielding items makes the code simpler and lets it scale to large files:

public override async IAsyncEnumerable<FileRecord> GetItemsAsync() 
{
	using var reader = new StreamReader(File.OpenRead("a-big-file.txt"));
	string line;

	while ((line = await reader.ReadLineAsync()) != null)
		yield return new FileRecord(line);
}

Total Count

IAsyncEnumerable is inherently different from IEnumerable. An IAsyncEnumerable is a stream of data in which the operation to get the next item is awaited. In general, streams should only be enumerated once, and some, like messages from a queue, can only be enumerated once. Because of this, IAsyncEnumerable doesn’t lend itself to operations like Count, so Runly doesn’t attempt to count the items in an IAsyncEnumerable to determine the percentage of items that have been processed in a job.

The exception to this is when ToAsyncEnumerable is used to wrap an IEnumerable in an AsyncEnumerableWrapper. When this is the case, Runly performs a Count on the underlying collection to get a total item count. This behavior can be turned off by passing false into ToAsyncEnumerable(bool canBeCounted). This should be done when the IEnumerable is reading from an underlying stream that can’t be enumerated more than once.

public override IAsyncEnumerable<FileRecord> GetItemsAsync() 
{
	// Set canBeCounted to false for IEnumerables with underlying streams
	return new MyFileReader("some-file.txt").ToAsyncEnumerable(canBeCounted: false);
}
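
To see why counting is a problem for an IEnumerable that reads from an underlying stream, consider the following sketch. ReadLines is a hypothetical helper, and a StringReader stands in for a file; the point is only that counting consumes the data, leaving nothing to process afterwards.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// An IEnumerable backed by a reader: each enumeration continues from
// wherever the reader currently is, so the data can be consumed only once.
static IEnumerable<string> ReadLines(TextReader reader)
{
	string? line;
	while ((line = reader.ReadLine()) != null)
		yield return line;
}

using var reader = new StringReader("a\nb\nc");
var lines = ReadLines(reader);

Console.WriteLine(lines.Count()); // 3: counting consumed the reader
Console.WriteLine(lines.Count()); // 0: nothing is left to process
```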

Identifying Items

Items are identified in the results of a run using a human-readable string that’s created by passing the item to GetItemIdAsync. Overriding this method in your job is optional. The default implementation calls the item’s ToString method, so if TItem.ToString returns a string that identifies the item, no override is needed. If TItem has a name, ID, or similar data that ToString does not return, you should override GetItemIdAsync to return this data instead.
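
When you control the item type, overriding ToString is often the simplest way to get a useful identifier. A minimal sketch, using a hypothetical User type that is not part of Runly:

```csharp
using System;

var user = new User(7, "ada@example.com");

// The default identifier described above comes from ToString.
Console.WriteLine(user.ToString()); // ada@example.com

// Hypothetical item type for illustration.
public class User
{
	public User(int id, string email) => (Id, Email) = (id, email);

	public int Id { get; }
	public string Email { get; }

	// Without this override, ToString would return the type name "User",
	// which would not distinguish one item from another.
	public override string ToString() => Email;
}
```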

ProcessAsync

ProcessAsync is the method where the job does its work. It is called once for each item returned from GetItemsAsync. When the method is complete, it returns a Result indicating success or failure and optionally includes output.

Parallel Execution

By default, ProcessAsync can be called in parallel using multiple asynchronous Tasks. Access to the collection of items is synchronized, so even if the enumerator is not thread-safe it will be accessed in a thread-safe manner, making it possible to call ProcessAsync concurrently. This behavior can be prevented by setting Options.CanProcessInParallel to false in the constructor.

The number of tasks used is determined by the config parameter Execution.ParallelTaskCount.
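
The general idea of several tasks sharing one synchronized enumerator can be sketched as follows. This is an illustration of the technique under stated assumptions, not Runly's actual implementation: TakeNextAsync, WorkerAsync, and the parallelTaskCount constant (which plays the role of Execution.ParallelTaskCount) are all hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for a job's GetItemsAsync.
static async IAsyncEnumerable<int> GetItemsAsync()
{
	for (var i = 1; i <= 10; i++)
	{
		await Task.Yield();
		yield return i;
	}
}

// Stand-in for a job's ProcessAsync.
static async Task ProcessAsync(int item, ConcurrentBag<int> processed)
{
	await Task.Delay(5); // simulate work
	processed.Add(item);
}

const int parallelTaskCount = 3;
var processed = new ConcurrentBag<int>();
var gate = new SemaphoreSlim(1, 1);
var done = false;

await using var enumerator = GetItemsAsync().GetAsyncEnumerator();

// Only one task at a time may advance the enumerator.
async Task<(bool HasItem, int Item)> TakeNextAsync()
{
	await gate.WaitAsync();
	try
	{
		if (done || !await enumerator.MoveNextAsync())
		{
			done = true;
			return (false, default);
		}
		return (true, enumerator.Current);
	}
	finally
	{
		gate.Release();
	}
}

// Each worker takes an item under the lock, then processes it outside the lock.
async Task WorkerAsync()
{
	while (true)
	{
		var (hasItem, item) = await TakeNextAsync();
		if (!hasItem)
			return;
		await ProcessAsync(item, processed);
	}
}

await Task.WhenAll(Enumerable.Range(0, parallelTaskCount).Select(_ => WorkerAsync()));

Console.WriteLine(processed.Count); // 10
```

Because processing happens outside the lock, anything ProcessAsync touches must itself be safe to use from several tasks at once.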

Dependency injection

ProcessAsync can accept between zero and sixteen dependencies in addition to TItem, depending on which Process base class is extended. Dependencies that are not thread-safe, such as a database connection, should be taken through the ProcessAsync method and not the constructor. A dependency taken through the constructor would be shared across multiple parallel executions of ProcessAsync, likely leading to hard-to-diagnose errors.

Read more about dependency injection in jobs.

Transactions

Transactions should begin and end within a single call to ProcessAsync. A transaction should never start in InitializeAsync and end in FinalizeAsync, or span different calls to ProcessAsync. Doing so can create long-running transactions, unpredictable behavior, and bugs that are hard to diagnose.

Take a scenario in which a job sends an alert as an email to a group of users:

public override IAsyncEnumerable<User> GetItemsAsync() 
{
	return db.GetUsersToAlert();
}

public override async Task<Result> ProcessAsync(User user, IDbConnection db)
{
	using (var tx = db.BeginTransaction())
	{
		await mailApi.SendAlertToUser(user);

		user.MarkUserAsAlerted();
		await db.SaveUser(user);

		tx.Commit();

		return Result.Success();
	}
}

While this code will work, it may break down in some situations.

Using Object-Relational Mappers

When using an object-relational mapper (ORM) within a transaction, it may be necessary to fetch the entity being modified in the same transaction in which it is modified. In this case, that would mean getting the User from the database in ProcessAsync rather than GetItemsAsync.

Changing Conditions

Generally, as the number of items returned from GetItemsAsync increases, the length of time that the job runs will increase. The conditions that existed at the beginning of the job may change before a particular user is sent an alert. When ProcessAsync is called for a user that no longer needs an alert, the alert may simply be sent a second time, but if db.SaveUser() checks for concurrency, an exception would be thrown.

Concurrency in ProcessAsync

A more resilient approach is to keep the items returned from GetItemsAsync lightweight and query the entity in ProcessAsync. Returning user IDs instead of full User objects allows you to keep the transaction isolated to ProcessAsync. By fetching the User and checking the status of the user’s alert in the transaction, any changes to the User in the time between GetItemsAsync and ProcessAsync are accounted for.

public override IAsyncEnumerable<Guid> GetItemsAsync() 
{
	return db.GetUserIdsToAlert();
}

public override async Task<Result> ProcessAsync(Guid userId, IDbConnection db)
{
	using (var tx = db.BeginTransaction())
	{
		var user = await db.GetUserById(userId);

		if (!user.HasBeenAlerted)
		{
			await mailApi.SendAlertToUser(user);

			user.MarkUserAsAlerted();
			await db.SaveUser(user);

			tx.Commit();

			return Result.Success();
		}
		else
		{
			return Result.Success("No alert sent");
		}
	}
}

Cancellation

The Job class contains a CancellationToken that should be used in ProcessAsync. When calling methods that accept a CancellationToken, simply pass the CancellationToken property to the method, like the CopyDirectory job in Examples.GettingStarted does when copying a file:

public override async Task<Result> ProcessAsync(string file)
{
	// ...
	using (var source = File.Open(sourceFile, FileMode.Open))
	{
		var destDir = Path.GetDirectoryName(destFile);

		if (!Directory.Exists(destDir))
			Directory.CreateDirectory(destDir);

		using (var dest = File.Create(destFile))
		{
			await source.CopyToAsync(dest, CancellationToken);
		}
	}
	// ...
}

When a job is cancelled, the CancellationToken property’s IsCancellationRequested flag will be set to true, enabling the job to abort work and return a Result indicating the method was cancelled. Once cancellation has been requested, all currently executing ProcessAsync tasks are allowed to complete and no further items will be processed. FinalizeAsync will then be called with a disposition of Disposition.Cancelled.
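
For work that does not accept a CancellationToken, the flag can be checked between steps. The following is a minimal sketch using only .NET types: the ProcessAsync shown here is a stand-in, and the strings it returns take the place of a Runly Result.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Stand-in for the body of ProcessAsync: work in small steps and stop
// early once cancellation has been requested.
static async Task<string> ProcessAsync(int steps, CancellationToken token)
{
	for (var i = 0; i < steps; i++)
	{
		if (token.IsCancellationRequested)
			return "Cancelled";

		await Task.Delay(1); // one unit of work
	}

	return "Successful";
}

using var cts = new CancellationTokenSource();

Console.WriteLine(await ProcessAsync(3, cts.Token)); // Successful

cts.Cancel();
Console.WriteLine(await ProcessAsync(3, cts.Token)); // Cancelled
```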

Result

When ProcessAsync is complete, it must return a Result which indicates success or failure. If the job can produce multiple types of success or failure a category can be provided to group the results. For instance, if an item has been previously processed it could be put in a “Skipped” category so that there is a “Successful” (the default category name for successfully processed items) and a “Skipped” category.

A Result should be created using one of the static methods on the class, such as Success or Failure. If ProcessAsync uses the CancellationToken, then SuccessOrCancelled can be used to create a successful result unless the cancellation flag has been raised.

public override async Task<Result> ProcessAsync(string file)
{
	var sourceFile = Path.Combine(Config.Source, file);
	var destFile = Path.Combine(Config.Destination, file);

	if (File.Exists(destFile))
		return Result.Success("Already Copied");

	try
	{
		using (var source = File.Open(sourceFile, FileMode.Open))
		{
			var destDir = Path.GetDirectoryName(destFile);

			if (!Directory.Exists(destDir))
				Directory.CreateDirectory(destDir);

			using (var dest = File.Create(destFile))
			{
				await source.CopyToAsync(dest, CancellationToken);
			}
		}
	}
	catch (UnauthorizedAccessException) when (Config.IgnoreUnauthorizedAccessException)
	{
		return Result.Success("Skipped - Unauthorized", "Skipping file copy due to an UnauthorizedAccessException being thrown. Set IgnoreUnauthorizedAccessException = false to treat as an error.");
	}

	return Result.SuccessOrCancelled(CancellationToken);
}

Exceptions

If an unhandled exception occurs in ProcessAsync the exception is caught and recorded in a failed Result with the exception type name as the category. The full exception is stored with the result so that it can be inspected from the dashboard.
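
The behavior described above can be pictured roughly as a try/catch around each call to ProcessAsync. This sketch is not Runly's code: Outcome and RunItemAsync are hypothetical names, with Outcome standing in for Result.

```csharp
using System;
using System.Threading.Tasks;

// Runs one item's work and converts an unhandled exception into a failed
// outcome whose category is the exception's type name.
static async Task<Outcome> RunItemAsync(Func<Task> process)
{
	try
	{
		await process();
		return new Outcome(true, "Successful", null);
	}
	catch (Exception ex)
	{
		return new Outcome(false, ex.GetType().Name, ex);
	}
}

var outcome = await RunItemAsync(() => throw new InvalidOperationException("boom"));

Console.WriteLine(outcome.IsSuccessful); // False
Console.WriteLine(outcome.Category);     // InvalidOperationException

// Hypothetical stand-in for a result; the full exception is kept for inspection.
public record Outcome(bool IsSuccessful, string Category, Exception? Error);
```

Grouping failures by exception type name means that distinct failure modes show up as distinct categories, which is a good reason to let unexpected exceptions propagate rather than catching them broadly inside ProcessAsync.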

InitializeAsync

As the name suggests, InitializeAsync is called at the beginning of a job’s lifetime, before GetItemsAsync. This method gives an opportunity for the job to do any async initialization that it might need to do. In general, any work that needs to be done before processing starts should be done here instead of in the constructor, including inspection and validation of the config and other constructor parameters. If the job cannot start due to invalid config, an exception should be thrown in this method.

Constructor vs. InitializeAsync

The constructor should only set fields using arguments passed into the constructor. An exception thrown in the constructor should indicate a design-time error, such as an ArgumentNullException being thrown because a dependency was not supplied. Validation and initialization should occur in InitializeAsync. When in doubt, do the work in InitializeAsync.

public class MyJob : Process<MyConfig, string>
{
	readonly IDownloader downloader;

	public MyJob(MyConfig config, IDownloader downloader)
		: base(config)
	{
		// validate dependencies in constructor
		this.downloader = downloader ?? throw new ArgumentNullException(nameof(downloader));
	}

	public override async Task InitializeAsync()
	{
		// validate configuration in InitializeAsync
		if (String.IsNullOrWhiteSpace(Config.DownloadUrl))
			throw new ArgumentException("Missing download URL", nameof(Config.DownloadUrl));

		var data = await downloader.DownloadFile(Config.DownloadUrl);

		// ...
	}

	// ...
}

FinalizeAsync

FinalizeAsync gives jobs an opportunity to do something after all items have been processed, such as cleanup or publishing an event to alert another application that the job’s task is finished. A Disposition parameter indicates whether the job was successful, failed or cancelled by a user. This information can be used to determine how FinalizeAsync behaves and can also determine whether other jobs run in response to the job completing.

public override async Task<object> FinalizeAsync(Disposition disposition)
{
	await db.CleanUpAfterJob();

	if (disposition == Disposition.Successful)
	{
		return new { Message = "Job completed successfully!" };
	}
	else
	{
		return new { Message = "Job failed to complete successfully." };
	}
}