Creating a New Adapter From Scratch
Overview
This guide shows how to create a new adapter project and explains its building blocks. Make sure the prerequisites are configured as explained here.
Adapters can be hosted in various types of applications. The most common are a console application and a Windows Service.
The SDK provides the processing engine and other building blocks, but you decide how to host and run it.
You can use the sample projects as the base for a new adapter by copying and renaming them. The guide below explains how to build an adapter from scratch.
Console Application
A console application is the simplest hosting model for an adapter.
To create one, follow these steps:
- Open Visual Studio 2022.
- Create a new .NET Console application project that targets .NET 6.
- Add references to the latest versions of the following packages:
  - OpenText.Fusion.AdapterSdk.Api
  - OpenText.Fusion.AdapterSdk.Engine.Runtime
- Create an implementation of the IRepositoryAdapter interface and, optionally, of IChangeLogSupport.
- In the Program.cs file, add the following lines:
using OpenText.Fusion.AdapterSdk.Engine.Runtime;
using OpenText.Fusion.AdapterSdk.Samples.ConsoleApplicationExample; // Update this with the namespace of your adapter class
// Initialize the .NET application
AppInitializer.Initialize();
// Build the engine factory
var engineFactory = ProcessingEngineFactoryConfigurator.Current.BuildConfigured<MyAdapter>("MyFs");
// Engine construction with simulated requests
// var engineFactory = ProcessingEngineFactoryConfigurator.Current.BuildConfigured<MyAdapter>("MyFs", TestingSetup.ConfigureServiceCollection());
// Create the processing engine
var processingEngine = engineFactory.CreateEngine();
// Attach to Ctrl+C: cancel processing and shut down the agent when it is pressed
using var cancellationSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    // Keep the process alive so the engine can shut down cleanly instead of being terminated
    e.Cancel = true;
    cancellationSource.Cancel();
};
// Start the engine and wait until it shuts down because of a problem or Ctrl+C
await processingEngine.ExecuteAsync(cancellationSource.Token);
The code above initializes the processing engine and runs it until it shuts down.
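The Ctrl+C handling follows a common .NET cancellation pattern. The sketch below shows it in isolation using only the base class library. RunUntilCancelledAsync is a hypothetical stand-in for processingEngine.ExecuteAsync, not an SDK method, and the CancelAfter call exists only so the demo ends on its own instead of waiting for a real Ctrl+C.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for processingEngine.ExecuteAsync: does simulated work until cancelled.
async Task<int> RunUntilCancelledAsync(CancellationToken token)
{
    var count = 0;
    while (!token.IsCancellationRequested)
    {
        count++; // one unit of simulated work
        try
        {
            await Task.Delay(TimeSpan.FromMilliseconds(10), token);
        }
        catch (OperationCanceledException)
        {
            break; // cancellation is the expected way to stop, not a failure
        }
    }
    return count;
}

var cancellationSource = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;             // keep the process alive so shutdown can complete
    cancellationSource.Cancel(); // signal the work loop to stop
};

// Demo only: simulate Ctrl+C after 200 ms so the sample terminates by itself
cancellationSource.CancelAfter(TimeSpan.FromMilliseconds(200));
var iterations = await RunUntilCancelledAsync(cancellationSource.Token);
Console.WriteLine($"Stopped gracefully after {iterations} iterations");
```

Setting e.Cancel to true is what turns Ctrl+C into a cooperative shutdown request; without it, the runtime terminates the process right after the handler returns.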
Repository Adapter Implementation
The IRepositoryAdapter
represents an Fusion Source Adapter.
It contains methods to handle work items from the Fusion.
CreateConfiguration method
The CreateConfiguration method returns an object with basic information about the adapter:
- adapter type
- definitions of properties supported by this adapter
- additional capabilities of the adapter
The adapter Type property must match the Type specified in Fusion under Sources, Custom Adapters.
This method returns IAdapterDescriptor. In most cases, use the default implementation, AdapterDescriptor.
Example of a CreateConfiguration method implementation:
public IAdapterDescriptor CreateDescriptor()
{
    return new AdapterDescriptor("MyCustomAdapter1",
        new List<RepositorySettingDefinition>
        {
            new("Location", TypeCode.String, isRequired: true, isEncrypted: false),
            new("UserName", TypeCode.String, isRequired: true, isEncrypted: false),
            new("Password", TypeCode.String, isRequired: true, isEncrypted: true)
        },
        new List<string>
        {
            "ShuffleDataSupport"
        });
}
The first parameter of the AdapterDescriptor constructor is the source type name.
The second parameter lists the properties supported by the adapter; each definition also states whether the property is required.
The RepositorySettingDefinition constructor expects the following values:
public RepositorySettingDefinition(string name, TypeCode typeCode, bool isRequired, bool isEncrypted)
In this example, the following properties are expected:
- Location: required, string, not encrypted
- UserName: required, string, not encrypted
- Password: required, string, encrypted
Currently, the properties are used only for validation and for decrypting data received from Fusion.
Encrypted fields such as passwords are decrypted automatically by the SDK, so your adapter receives them as plain text.
The third parameter lists additional capabilities of the adapter.
In this example, the adapter declares ShuffleDataSupport.
The declared capabilities are interpreted by Fusion.
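To make the validation role of the setting definitions concrete, here is a minimal sketch of a required-setting check, assuming the options arrive as a name/value dictionary. Tuples stand in for RepositorySettingDefinition, and FindMissingRequired is a hypothetical helper; this is not the SDK's own validation code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Tuples stand in for RepositorySettingDefinition: (Name, TypeCode, IsRequired, IsEncrypted)
var definitions = new (string Name, TypeCode Type, bool IsRequired, bool IsEncrypted)[]
{
    ("Location", TypeCode.String, true, false),
    ("UserName", TypeCode.String, true, false),
    ("Password", TypeCode.String, true, true),
};

// Hypothetical check: returns the names of required settings that are missing or empty
List<string> FindMissingRequired(IReadOnlyDictionary<string, object?> settings) =>
    definitions
        .Where(d => d.IsRequired)
        .Where(d => !settings.TryGetValue(d.Name, out var value) || string.IsNullOrEmpty(value?.ToString()))
        .Select(d => d.Name)
        .ToList();

var options = new Dictionary<string, object?>
{
    ["Location"] = @"C:\Data",
    ["UserName"] = "svc-fusion",
    // "Password" is intentionally left out
};

var missing = FindMissingRequired(options);
Console.WriteLine(missing.Count == 0 ? "All required settings are present" : $"Missing: {string.Join(", ", missing)}"); // Missing: Password
```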
RetrieveFileListAsync method
Task RetrieveFileListAsync(RetrieveFileListRequest request, IFileListResultsHandler handler, CancellationToken cancellationToken);
This method retrieves a list of files from a source. If the source supports a change log, this method returns only changed files.
For sources that do not support a change log, this method should retrieve a list of all files in the repository.
The SDK then detects changes automatically by comparing metadata such as ModifiedTime or, if available, ContentHash.
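As an illustration of metadata-based change detection, the sketch below compares a previous and a current snapshot of a single file. HasChanged is hypothetical, and the rule that a content hash, when both snapshots have one, takes precedence over ModifiedTime is an assumption; the SDK's internal comparison may differ.

```csharp
using System;

// Hypothetical rule: when both snapshots carry a content hash, compare the (hex) hashes
// case-insensitively; otherwise fall back to comparing the last modified time.
bool HasChanged(DateTime previousModified, string? previousHash, DateTime currentModified, string? currentHash)
{
    if (!string.IsNullOrEmpty(previousHash) && !string.IsNullOrEmpty(currentHash))
    {
        return !string.Equals(previousHash, currentHash, StringComparison.OrdinalIgnoreCase);
    }
    return currentModified != previousModified;
}

var t0 = new DateTime(2024, 1, 1, 12, 0, 0, DateTimeKind.Utc);
var t1 = t0.AddMinutes(5);
Console.WriteLine(HasChanged(t0, null, t1, null));     // True: no hashes, timestamps differ
Console.WriteLine(HasChanged(t0, "AB12", t1, "ab12")); // False: same content, only the timestamp moved
```

This is also why queued files need accurate metadata: with no hash available, a wrong ModifiedTime directly causes missed or spurious changes.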
The first parameter, RetrieveFileListRequest request, provides the information required to query the repository.
Queue detected files in the IFileListResultsHandler handler by using the QueueFileAsync method.
All queued files must have their basic metadata populated; for example, ModifiedTime is used to detect changed files.
For example:
await handler.QueueFileAsync(new FileMetadata(file.Name, file.FullName)
    {
        Size = file.Length,
        ModifiedTime = file.LastWriteTimeUtc,
        AccessedTime = file.LastAccessTimeUtc,
        CreatedTime = file.CreationTimeUtc
    },
    file.DirectoryName ?? string.Empty);
This method has the following signature:
Task QueueFileAsync(IFileMetadata fileMetadata, string groupId);
Group Id
Besides fileMetadata, QueueFileAsync takes a groupId parameter.
Fusion uses it to distribute file data retrieval tasks across multiple adapters or threads to improve performance.
Different groups can be processed simultaneously.
For example, if an adapter scans file system shares, folders can come from different drives and different machines. Using the directory name as the group lets files from each folder be retrieved at the same time, which can improve performance.
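The sketch below illustrates why group ids help: groups are processed concurrently, while files within one group are processed one after another. It uses Parallel.ForEachAsync from .NET 6 as a stand-in for Fusion's scheduling, which this guide does not document; the paths and the Task.Delay call are placeholders.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Placeholder queue of (path, group id) pairs; the group id is the directory,
// mirroring file.DirectoryName in the QueueFileAsync example
var queued = new List<(string Path, string GroupId)>
{
    ("//server1/share/a.txt", "//server1/share"),
    ("//server1/share/b.txt", "//server1/share"),
    ("//server2/docs/c.txt", "//server2/docs"),
    ("//server2/docs/d.txt", "//server2/docs"),
    ("//server3/media/e.txt", "//server3/media"),
};

var processed = new ConcurrentBag<string>();

// Groups run concurrently (up to 4 at a time); files within a group run sequentially
await Parallel.ForEachAsync(
    queued.GroupBy(f => f.GroupId),
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (group, token) =>
    {
        foreach (var file in group)
        {
            await Task.Delay(10, token); // placeholder for retrieving this file's contents
            processed.Add(file.Path);
        }
    });

var groupCount = queued.Select(f => f.GroupId).Distinct().Count();
Console.WriteLine($"Retrieved {processed.Count} files from {groupCount} groups"); // Retrieved 5 files from 3 groups
```

Choosing the group so that each one maps to an independent resource (a share, a drive, a machine) is what lets this parallelism pay off.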
Failures
When implementing an adapter, wrap code that can fail in a try/catch block.
If an exception is caught, call one of the handler.RegisterFailure methods.
Depending on its type and where it occurred, a failure can relate to a specific repository file or location (such as a file system folder). In that case, call the following overload:
void RegisterFailure(string location, IFailureDetails failureDetails)
If a failure is not specific to any location or file, use this overload:
void RegisterFailure(IFailureDetails failureDetails)
The default implementation of the IFailureDetails interface is FailureDetails.
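The sketch below shows the per-file pattern in isolation: a location-specific failure is recorded and the scan continues with the next file instead of aborting. RegisterFailure and Inspect are hypothetical local functions standing in for handler.RegisterFailure and your own repository code.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for handler.RegisterFailure; a null location would mean a general failure
var failures = new List<(string? Location, string Message)>();
void RegisterFailure(string? location, string message, Exception exception) =>
    failures.Add((location, $"{message}: {exception.Message}"));

// Hypothetical repository operation that fails for one of the files
void Inspect(string fileName)
{
    if (fileName.StartsWith("locked", StringComparison.Ordinal))
    {
        throw new UnauthorizedAccessException("Access denied");
    }
}

var queued = new List<string>();
foreach (var name in new[] { "ok-1.txt", "locked.txt", "ok-2.txt" })
{
    try
    {
        Inspect(name);
    }
    catch (Exception ex)
    {
        // Location-specific failure: record it and move on to the next file
        RegisterFailure(name, "Failed to inspect file", ex);
        continue;
    }
    queued.Add(name);
}

Console.WriteLine($"Queued {queued.Count} files, {failures.Count} failure(s)"); // Queued 2 files, 1 failure(s)
```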
Complete Example of RetrieveFileListAsync Method
Here's a complete example of a RetrieveFileListAsync method that:
- scans a directory in the file system according to the Location property
- queues information about discovered files
- handles different types of failures
public async Task RetrieveFileListAsync(RetrieveFileListRequest request, IFileListResultsHandler handler, CancellationToken cancellationToken)
{
    // Get the repository options provided in the UI; Location is the path to scan
    var location = request.RepositoryProperties.RepositoryOptions["Location"].ToString();
    var userName = request.RepositoryProperties.RepositoryOptions["UserName"].ToString();
    var password = request.RepositoryProperties.RepositoryOptions["Password"].ToString();
    try
    {
        // Perform some operation, like a login (LogIn stands for your own repository code)
        LogIn(userName, password);
    }
    catch (Exception ex)
    {
        // General failure, not tied to a location
        handler.RegisterFailure(new FailureDetails("Failed to log in to the repository", ex));
        return;
    }
    var directoryInfo = new DirectoryInfo(location);
    List<DirectoryInfo> directories;
    try
    {
        // Enumerate top-level directories; this can fail
        directories = directoryInfo.EnumerateDirectories().ToList();
    }
    catch (UnauthorizedAccessException ex)
    {
        // Handle location-specific failure
        handler.RegisterFailure(location, new FailureDetails($"Current user does not have access to {location}", ex));
        return;
    }
    catch (Exception ex)
    {
        // Handle location-specific failure
        handler.RegisterFailure(location, new FailureDetails($"Exception caught when enumerating directories for {location}", ex));
        return;
    }
    // Scan the files directly under the root location, then each top-level directory recursively
    var scanTargets = new List<(DirectoryInfo Directory, SearchOption Option)> { (directoryInfo, SearchOption.TopDirectoryOnly) };
    scanTargets.AddRange(directories.Select(d => (d, SearchOption.AllDirectories)));
    try
    {
        foreach (var (directory, searchOption) in scanTargets)
        {
            foreach (var file in directory.EnumerateFiles("*", searchOption))
            {
                cancellationToken.ThrowIfCancellationRequested();
                try
                {
                    // Performing some operation on a file that can throw an exception
                    file.ResolveLinkTarget(true);
                }
                catch (Exception ex)
                {
                    // Handle a failure that is specific to a repository file
                    handler.RegisterFailure(file.FullName, new FailureDetails("Failed to ResolveLinkTarget", ex));
                    continue;
                }
                // Queue information about a discovered file; use its directory as the group id
                await handler.QueueFileAsync(new FileMetadata(file.Name, file.FullName)
                    {
                        Size = file.Length,
                        ModifiedTime = file.LastWriteTimeUtc,
                        AccessedTime = file.LastAccessTimeUtc,
                        CreatedTime = file.CreationTimeUtc
                    },
                    file.DirectoryName ?? string.Empty);
            }
        }
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        // Let cancellation propagate; report any other failure against the root location
        handler.RegisterFailure(directoryInfo.FullName, new FailureDetails(ex.Message, ex));
    }
}
See the full implementation in the sample provided in OpenText.Fusion.AdapterSdk.Samples.ConsoleApplicationExample/MyAdapter.cs.
RetrieveChangeLogAsync method
Task RetrieveChangeLogAsync(ChangeLogRequest request, IFileListResultsHandler handler, CancellationToken cancellationToken);
This method retrieves a list of files from a source that supports a change log. It returns only the files that have been updated after the LastProcessedDate passed in the request.
For sources that do not support a change log, processing falls back to the RetrieveFileListAsync method.
The first parameter, ChangeLogRequest request, provides the information required to query the repository.
Queue detected files in the IFileListResultsHandler handler by using the QueueFileAsync method, as in the RetrieveFileListAsync method.
Handle failures the same way as in the RetrieveFileListAsync method.
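The change log filter can be tried on its own. The sketch below creates a temporary directory with one old and one recently modified file and applies the same comparison as the example that follows. The lastProcessedDate local stands in for request.LastProcessedDate; treating it as a UTC DateTime is an assumption, since the request type is not shown here.

```csharp
using System;
using System.IO;
using System.Linq;

// Throwaway directory with one file before and one file after the last processed date
var root = Path.Combine(Path.GetTempPath(), "changelog-demo-" + Guid.NewGuid().ToString("N"));
Directory.CreateDirectory(root);
var oldFile = Path.Combine(root, "old.txt");
var newFile = Path.Combine(root, "new.txt");
File.WriteAllText(oldFile, "unchanged");
File.WriteAllText(newFile, "changed");

// Stand-in for request.LastProcessedDate (assumed to be UTC)
var lastProcessedDate = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
File.SetLastWriteTimeUtc(oldFile, lastProcessedDate.AddDays(-1));
File.SetLastWriteTimeUtc(newFile, lastProcessedDate.AddDays(1));

// Same filter as the sample: keep files written strictly after the last processed date
var changed = new DirectoryInfo(root)
    .EnumerateFiles("*", SearchOption.AllDirectories)
    .Where(f => lastProcessedDate < f.LastWriteTimeUtc)
    .Select(f => f.Name)
    .ToList();

Console.WriteLine(string.Join(", ", changed)); // new.txt
Directory.Delete(root, recursive: true);
```

Comparing against LastWriteTimeUtc rather than LastWriteTime avoids time zone and daylight saving shifts between runs.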
Complete Example of RetrieveChangeLogAsync Method
Here is a complete example of a RetrieveChangeLogAsync method that:
- scans a directory in the file system according to the Location property
- filters files updated since the LastProcessedDate
- queues information about discovered files
- handles different types of failures
public async Task RetrieveChangeLogAsync(ChangeLogRequest request, IFileListResultsHandler handler, CancellationToken cancellationToken)
{
    _logger.LogInformation("Executing RetrieveChangeLogAsync with the request: {@ChangeLogRequest}", request);
    // Get the repository options provided in the UI; Location is the path to scan
    var location = request.RepositoryProperties.RepositoryOptions.GetOption("Location");
    var userName = request.RepositoryProperties.RepositoryOptions.GetOption("UserName");
    var password = request.RepositoryProperties.RepositoryOptions.GetOption("Password");
    var lastProcessedDate = request.LastProcessedDate;
    try
    {
        // Perform some operation, like a login (LogIn stands for your own repository code)
        LogIn(userName, password);
    }
    catch (Exception ex)
    {
        await handler.RegisterFailureAsync(new FailureDetails("Failed to log in to the repository", ex));
        return;
    }
    var directoryInfo = new DirectoryInfo(location);
    try
    {
        foreach (var file in directoryInfo.EnumerateFiles("*", SearchOption.AllDirectories))
        {
            cancellationToken.ThrowIfCancellationRequested();
            // Only queue files written after the last processed date
            if (lastProcessedDate < file.LastWriteTimeUtc)
            {
                await handler.QueueFileAsync(new FileMetadata(file.Name, file.FullName)
                    {
                        Size = file.Length,
                        ModifiedTime = file.LastWriteTimeUtc,
                        AccessedTime = file.LastAccessTimeUtc,
                        CreatedTime = file.CreationTimeUtc
                    },
                    file.DirectoryName ?? string.Empty,
                    cancellationToken);
            }
        }
    }
    catch (Exception ex) when (ex is not OperationCanceledException)
    {
        // Let cancellation propagate; report other failures against the scanned location
        await handler.RegisterFailureAsync(directoryInfo.FullName, new FailureDetails(ex.Message, ex));
    }
}
See the full implementation in the sample provided in OpenText.Fusion.AdapterSdk.Samples.ConsoleApplicationExample/MyAdapter.cs.
RetrieveFilesDataAsync method
Task RetrieveFilesDataAsync(RetrieveFilesDataRequest request, IFileDataResultsHandler handler, CancellationToken cancellationToken);
The RetrieveFilesDataAsync method retrieves binary contents and metadata that is expensive to obtain.
During a repository update, files detected as changed after the RetrieveFileListAsync method completes are passed to this method for further processing:
- binary contents retrieval
- retrieval of metadata that is expensive to obtain
The first parameter, RetrieveFilesDataRequest request, contains the list of files to retrieve in its Files property:
public IEnumerable<IRepositoryFile> Files { get; }
Usually you iterate over each file, retrieve its contents, and optionally retrieve some additional metadata.
You then queue the contents by using the QueueFileAsync method of the IFileDataResultsHandler handler:
ValueTask QueueFileAsync(string fileId, IFileContents fileContents, IFileMetadata metadata);
The IFileContents interface represents the binary contents of a file.
The default implementation, the FileContents class, can be created from either a stream or a byte array.
When used with a stream, the class takes a function that returns a Stream. Create the stream inside this function and return it.
The easiest way is to use a lambda expression:
new FileContents(() =>
{
var file = new FileInfo(repositoryFile.Metadata.FileLocation);
return file.OpenRead();
})
The actual stream is created and disposed by the SDK.
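Passing a function rather than an open stream means no file handle is held until the data is actually needed. The sketch below simulates that contract with a hypothetical ReadContents consumer that calls the factory and disposes the stream itself, as the text describes for the SDK; it is an illustration, not the SDK's code.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical consumer standing in for the SDK: it opens the stream only when it needs
// the data and disposes it when finished.
string ReadContents(Func<Stream> openStream)
{
    using var stream = openStream();
    using var reader = new StreamReader(stream, Encoding.UTF8);
    return reader.ReadToEnd();
}

var path = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N") + ".txt");
File.WriteAllText(path, "hello from the repository");

var opened = 0;
Func<Stream> factory = () =>
{
    opened++; // counts how many times a stream is actually created
    return File.OpenRead(path);
};

Console.WriteLine(opened); // 0: creating the factory opens nothing
var text = ReadContents(factory);
Console.WriteLine(text);   // hello from the repository
Console.WriteLine(opened); // 1
File.Delete(path);
```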
Full example:
foreach (var repositoryFile in request.Files)
{
    // Optionally add some additional metadata
    repositoryFile.Metadata.AdditionalMetadata.Add("custom_property", 123);
    // Queue the data: the file contents (a stream created by the function when it is needed)
    // and the repository file metadata extended by the additional property.
    await handler.QueueFileAsync(repositoryFile.FileId,
        new FileContents(() =>
        {
            // FileLocation is a string that can be used to access / retrieve a file in a repository. In our case, it's the full path
            var file = new FileInfo(repositoryFile.Metadata.FileLocation);
            return file.OpenRead();
        }),
        repositoryFile.Metadata,
        cancellationToken);
}
Handling Failures
Failure are handled in a similar way as in the RetrieveFileListAsync
method. The IFileDataResultsHandler
contains RegisterFailure
method.
// Inside the loop over request.Files
Stream fileStream;
try
{
    // Both of these operations can fail for various reasons
    var file = new FileInfo(repositoryFile.Metadata.FileLocation);
    fileStream = file.OpenRead();
}
catch (Exception ex)
{
    // Register a failure for the current file. The exception message is automatically added to the reported failure.
    handler.RegisterFailure(repositoryFile.Metadata.FileLocation,
        new FailureDetails($"Failed to obtain file contents for {repositoryFile.Metadata.FileLocation}", ex));
    continue;
}
Example of the entire RetrieveFilesDataAsync method:
public async Task RetrieveFilesDataAsync(RetrieveFilesDataRequest request, IFileDataResultsHandler handler, CancellationToken cancellationToken)
{
    foreach (var repositoryFile in request.Files)
    {
        cancellationToken.ThrowIfCancellationRequested();
        Stream fileStream;
        try
        {
            // Both of these operations can fail for various reasons
            var file = new FileInfo(repositoryFile.Metadata.FileLocation);
            fileStream = file.OpenRead();
        }
        catch (Exception ex)
        {
            // Register a failure for the current file. The exception message is automatically added to the reported failure.
            handler.RegisterFailure(repositoryFile.Metadata.FileLocation,
                new FailureDetails($"Failed to obtain file contents for {repositoryFile.Metadata.FileLocation}", ex));
            continue;
        }
        repositoryFile.Metadata.AdditionalMetadata.Add("custom_property", 123);
        // QueueFileAsync returns a ValueTask, so await it instead of discarding it
        await handler.QueueFileAsync(repositoryFile.FileId, new FileContents(fileStream), repositoryFile.Metadata);
    }
}
Worker Service
A Worker Service is the .NET project template for long-running background services.
It is the default choice in many cases.
Creating a Worker Service is very similar to creating a console application.
The only differences are the initialization code and an additional class in which the engine is created and started.
- Open Visual Studio 2022.
- Create a new project by using the Worker Service template, targeting .NET 6.
- Add references to the latest versions of the following packages:
  - OpenText.Fusion.AdapterSdk.Api
  - OpenText.Fusion.AdapterSdk.Engine.Runtime
- Create an implementation of the IRepositoryAdapter interface, the same way as in the console application.
- Add the constructor arguments to the Worker.cs class and store them in private fields:
private readonly IProcessingEngineFactory _engineFactory;
private readonly ILogger<Worker> _logger;
public Worker(ILogger<Worker> logger, IProcessingEngineFactory engineFactory)
{
_logger = logger;
_engineFactory = engineFactory;
}
- Add the engine instantiation and startup to the Worker.cs class:
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
stoppingToken.Register(() => _logger.LogInformation("Worker - cancellation requested, service will stop"));
_logger.LogInformation("Starting Worker Service...");
try
{
var processingEngine = _engineFactory.CreateEngine();
await processingEngine.ExecuteAsync(stoppingToken).ConfigureAwait(false);
_logger.LogInformation("Stopping worker service");
}
catch (Exception ex)
{
_logger.LogError(ex, "Exception caught during execution");
throw;
}
}
- In the Program.cs file, add the following lines:
using OpenText.Fusion.AdapterSdk.Engine.Runtime;
using WorkerServiceExample; // Update this with your `Worker` namespace
AppInitializer.Initialize();
var host = Host.CreateDefaultBuilder(args)
.ConfigureAppConfiguration((context, builder) => builder.SetBasePath(AppDomain.CurrentDomain.BaseDirectory).AddJsonFile("appsettings.json"))
.ConfigureServices((context, services) =>
{
// Configure the SDK Services
services.ConfigureAdapterSdk<MyAdapter>(context.Configuration, "MyAdapter")
// Add Worker as a hosted service
.AddHostedService<Worker>();
})
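// At the moment, only the Windows platform is supported, and the service can be deployed as a Windows Service
// UseWindowsService is provided by the Microsoft.Extensions.Hosting.WindowsServices package
.UseWindowsService()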
.Build();
await host.RunAsync();
Delete files from Workbook
After files are scanned, you can delete them from the workspace by using the Delete activity. You can also debug this functionality with the sample project.
Click here to learn how to debug the code.