
Data Transformation

Robin Rodricks edited this page Sep 12, 2023 · 11 revisions

Architecture

Getting Started

Transform sinks are another feature of FluentStorage that works across all the storage providers. A transform sink transforms the underlying stream of data on both upload and download. Examples of transform sinks include gzipping data transparently, encrypting it, and so on.

Let's say you would like to gzip every file that you upload to or download from a storage. You can do that in the following way:

IBlobStorage myGzippedStorage = StorageFactory.Blobs
   .AzureBlobStorageWithSharedKey("name", "key")
   .WithGzipCompression();

Then use the storage as you would before - all the data is compressed as you write it (with any WriteXXX method) and decompressed as you read it (with any ReadXXX method).

Implementation

Transforms can change both the underlying data and the stream size. This is a problem for storage providers, which need to know the size of a blob before you upload it. The matter becomes more complicated when an implementation needs to calculate other statistics of the data before uploading, e.g. a hash or a CRC. Therefore the only reliable way to stream transformed data is to perform all of the transforms first, and then upload the result. In this implementation, FluentStorage performs the transforms in memory, but does so efficiently by using the Microsoft.IO.RecyclableMemoryStream package, which performs memory pooling and reclaiming for you so that you don't need to worry about software slowdowns. You can read more about this technique here.

This also means that today a transform sink can upload a stream only as large as the amount of RAM available on your machine. I am, however, thinking of ways to go further than that, and there are some beta implementations available that might see the light soon.
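The buffering approach described above can be illustrated with a minimal sketch. It is not FluentStorage's code: it uses a plain MemoryStream instead of Microsoft.IO.RecyclableMemoryStream to stay dependency-free, and the class and method names (BufferedTransformSketch, GzipToMemory, Gunzip) are hypothetical. The point is only that the final size is known once the whole transform has run.

```csharp
using System.IO;
using System.IO.Compression;

// Hypothetical helper, not part of FluentStorage.
public static class BufferedTransformSketch
{
    // Compresses the whole source into memory and returns the buffer,
    // rewound so that an uploader could read it and query its Length.
    public static MemoryStream GzipToMemory(Stream source)
    {
        var buffer = new MemoryStream();

        // leaveOpen: true keeps the buffer usable after the GZipStream is
        // disposed; disposing is what writes the final gzip bytes.
        using (var gzip = new GZipStream(buffer, CompressionLevel.Optimal, leaveOpen: true))
        {
            source.CopyTo(gzip);
        }

        buffer.Position = 0;
        return buffer;
    }

    // Reverses GzipToMemory; included only to check the round trip.
    public static byte[] Gunzip(Stream compressed)
    {
        using var gzip = new GZipStream(compressed, CompressionMode.Decompress);
        using var result = new MemoryStream();
        gzip.CopyTo(result);
        return result.ToArray();
    }
}
```

For example, passing a 10,000-byte stream of zeros to GzipToMemory returns a buffer whose Length is far smaller than 10,000. That length could not have been reported to a storage provider before the compression finished, and the whole buffer has to fit in RAM.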

Available Sinks

Build Your Own Sink

Implementing your own transformation sink is a matter of writing a new class that implements the ITransformSink interface, which has only two methods:

   public interface ITransformSink
   {
      Stream OpenReadStream(string fullPath, Stream parentStream);

      Stream OpenWriteStream(string fullPath, Stream parentStream);
   }

The first one is called when FluentStorage opens a blob for reading, so that you can replace the original stream passed in parentStream with your own. The second one does the same when a blob is opened for writing. For instance, have a look at the implementation of the Gzip sink, as it's the easiest one:

using System.IO;
using System.IO.Compression;

public class GZipSink : ITransformSink
{
   private readonly CompressionLevel _compressionLevel;

   public GZipSink(CompressionLevel compressionLevel = CompressionLevel.Optimal)
   {
      _compressionLevel = compressionLevel;
   }

   public Stream OpenReadStream(string fullPath, Stream parentStream)
   {
      if(parentStream == null)
         return null;

      return new GZipStream(parentStream, CompressionMode.Decompress, false);
   }

   public Stream OpenWriteStream(string fullPath, Stream parentStream)
   {
      return new GZipStream(parentStream, _compressionLevel, false);
   }
}

This sink simply takes the incoming stream and wraps it in the standard built-in GZipStream from the System.IO.Compression namespace.
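The same pattern carries over to other wrapping streams. Below is a sketch of a hypothetical DeflateSink, modelled on GZipSink, together with a small round-trip check. DeflateSink and DeflateSinkDemo are illustrative names, not part of FluentStorage, and the interface is redeclared locally only so that the snippet compiles on its own.

```csharp
using System.IO;
using System.IO.Compression;
using System.Text;

// Local copy of the interface shown above, declared only so this sketch
// compiles on its own; in a real project use the one from FluentStorage.
public interface ITransformSink
{
    Stream OpenReadStream(string fullPath, Stream parentStream);
    Stream OpenWriteStream(string fullPath, Stream parentStream);
}

// Hypothetical sink that uses raw Deflate instead of gzip.
public class DeflateSink : ITransformSink
{
    public Stream OpenReadStream(string fullPath, Stream parentStream)
    {
        if (parentStream == null)
            return null;

        return new DeflateStream(parentStream, CompressionMode.Decompress, false);
    }

    public Stream OpenWriteStream(string fullPath, Stream parentStream)
    {
        return new DeflateStream(parentStream, CompressionLevel.Optimal, false);
    }
}

public static class DeflateSinkDemo
{
    // Writes text through the sink into a buffer, then reads it back through the sink.
    public static string RoundTrip(string text)
    {
        var sink = new DeflateSink();
        var stored = new MemoryStream();

        // Disposing the write stream flushes the compressed data into 'stored'.
        using (Stream write = sink.OpenWriteStream("demo.txt", stored))
        {
            byte[] bytes = Encoding.UTF8.GetBytes(text);
            write.Write(bytes, 0, bytes.Length);
        }

        // MemoryStream.ToArray still works after the stream has been closed.
        using Stream read = sink.OpenReadStream("demo.txt", new MemoryStream(stored.ToArray()));
        using var reader = new StreamReader(read, Encoding.UTF8);
        return reader.ReadToEnd();
    }
}
```

Calling DeflateSinkDemo.RoundTrip with any string should return the same string, which is a quick way to check that the read and write sides of a sink are true inverses before plugging it into a storage.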

Passing Your Sink To Storage

To use the sink, call the .WithSinks extension method and pass the sink you want to use. For instance, to enable GZipSink, do the following:

IBlobStorage storage = StorageFactory.Blobs
   .XXX()
   .WithSinks(new GZipSink());

You can also create an extension method if you use this often:

public static IBlobStorage WithGzipCompression(
   this IBlobStorage blobStorage, CompressionLevel compressionLevel = CompressionLevel.Optimal)
{
   return blobStorage.WithSinks(new GZipSink(compressionLevel));
}

Chaining Sinks

The .WithSinks extension method in fact accepts an array of sinks, which means that sinks can be chained together. This is useful when you need to apply multiple transformations at the same time. For instance, if I would like to both compress and encrypt data in the target storage, I could initialise my storage in the following way:

IBlobStorage encryptedAndCompressed =
   StorageFactory.Blobs
      .InMemory()
      .WithSinks(
         new GZipSink(),
         new SymmetricEncryptionSink("To6X5XVaNNMKFfxssJS6biREGpOVZjEIC6T7cc1rJF0="));

Note that declaration order matters here: when writing, the data is compressed first and encrypted second. Reading has to undo these steps in reverse, so the data is decrypted first and decompressed second.
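To see why the order matters, here is a sketch of one way a chain of sinks could be composed. It is an illustration under stated assumptions, not FluentStorage's actual implementation: SinkChainSketch is a hypothetical helper, the interface and GZipSink are redeclared locally so the snippet is self-contained, and Base64Sink is a hypothetical stand-in for the encryption sink, chosen because its output is easy to inspect.

```csharp
using System.IO;
using System.IO.Compression;
using System.Security.Cryptography;
using System.Text;

// Local copies, declared only so this sketch compiles on its own.
public interface ITransformSink
{
    Stream OpenReadStream(string fullPath, Stream parentStream);
    Stream OpenWriteStream(string fullPath, Stream parentStream);
}

public class GZipSink : ITransformSink
{
    public Stream OpenReadStream(string fullPath, Stream parentStream) =>
        parentStream == null ? null : new GZipStream(parentStream, CompressionMode.Decompress, false);

    public Stream OpenWriteStream(string fullPath, Stream parentStream) =>
        new GZipStream(parentStream, CompressionLevel.Optimal, false);
}

// Hypothetical stand-in for an encryption sink: Base64-encodes on write.
public class Base64Sink : ITransformSink
{
    public Stream OpenReadStream(string fullPath, Stream parentStream) =>
        new CryptoStream(parentStream, new FromBase64Transform(), CryptoStreamMode.Read);

    public Stream OpenWriteStream(string fullPath, Stream parentStream) =>
        new CryptoStream(parentStream, new ToBase64Transform(), CryptoStreamMode.Write);
}

// Hypothetical composition helper.
public static class SinkChainSketch
{
    // Wraps from the last sink outwards, so that written bytes pass
    // through the first sink first and the last sink last.
    public static Stream OpenWrite(string path, Stream target, params ITransformSink[] sinks)
    {
        Stream s = target;
        for (int i = sinks.Length - 1; i >= 0; i--)
            s = sinks[i].OpenWriteStream(path, s);
        return s;
    }

    // The last sink sits closest to the stored bytes, so its transform
    // is undone first and the first sink's transform is undone last.
    public static Stream OpenRead(string path, Stream stored, params ITransformSink[] sinks)
    {
        Stream s = stored;
        for (int i = sinks.Length - 1; i >= 0; i--)
            s = sinks[i].OpenReadStream(path, s);
        return s;
    }

    public static byte[] Write(string text, params ITransformSink[] sinks)
    {
        var target = new MemoryStream();
        using (Stream s = OpenWrite("demo.txt", target, sinks))
        {
            byte[] bytes = Encoding.UTF8.GetBytes(text);
            s.Write(bytes, 0, bytes.Length);
        }
        return target.ToArray(); // still valid after the stream is closed
    }

    public static string Read(byte[] stored, params ITransformSink[] sinks)
    {
        using Stream s = OpenRead("demo.txt", new MemoryStream(stored), sinks);
        using var reader = new StreamReader(s, Encoding.UTF8);
        return reader.ReadToEnd();
    }
}
```

With the sinks declared as new GZipSink(), new Base64Sink(), the bytes returned by Write are Base64 text starting with "H4sI", which is the Base64 form of the gzip header, so compression ran first. Declaring them the other way round produces raw gzip bytes starting with 0x1F 0x8B instead.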