Performance Booster with System.IO.Pipelines in C#

As our industry has embraced new strategies for running production workloads, such as containers (read: Kubernetes) and serverless (read: Functions as a Service), developers no longer have the luxury of unlimited computing resources in production. Gone are the days when it was easy to acquire a large virtual machine with many cores and plenty of memory for every deployment. As a .NET developer you work with managed code and rely on the GC (garbage collector) to clean up after you, but the onus is now on you to write highly performant code that can run anywhere, from Docker containers to IoT devices. With C# 8 and .NET Core, the Microsoft .NET team has been very conscious of memory allocations, and each new version ships modern APIs that can be considerably faster and leaner than the traditional ones. In this blog post I will showcase file I/O with three different techniques and benchmark each one. In the benchmark results, System.IO.Pipelines wins by a considerable margin in both execution time and memory allocation.

The Challenge

We will experiment with reading a large CSV file (100,000 records with 5 fields each) of employee data. I’m sure you have run into this many times in your career: you have to parse a large CSV file. A task like this puts plenty of pressure on the GC, and garbage collection matters a great deal for performance. Garbage collection takes up CPU time, reducing the time spent on the actual data processing. Not only that, but each time a collection is triggered, much of the application’s work is suspended while the remaining references are evaluated. This can drastically affect the time taken to process the data. I’ve chosen three techniques for this challenge.

  1. Using CsvHelper: a popular library for parsing CSV files in the .NET ecosystem.
  2. Using IAsyncEnumerable: introduced with C# 8, this API lets you process a file as a stream of data (in chunks) instead of loading it whole.
  3. Using System.IO.Pipelines: this API shipped with .NET Core 2.1 and is used internally by Kestrel, the ASP.NET Core web server, to process many requests per second coming off the socket. It is available as a NuGet package. David Fowler, who architected these APIs, has an excellent post introducing them.
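
The listing in this post covers only the third technique. For contrast, here is a minimal sketch of what the second one can look like. It is not the implementation from the repo: AsyncLineReader, ReadLinesAsync and CountDataRowsAsync are hypothetical names, and the sketch only counts rows instead of building Employee objects.

```csharp
#nullable enable
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class AsyncLineReader
{
    // Hypothetical helper: yields one line at a time instead of loading the whole file.
    public static async IAsyncEnumerable<string> ReadLinesAsync(TextReader reader)
    {
        string? line;
        while ((line = await reader.ReadLineAsync()) != null)
        {
            yield return line;
        }
    }

    // Counts the data rows (the first line is treated as the header and skipped).
    public static async Task<int> CountDataRowsAsync(TextReader reader)
    {
        var count = 0;
        var isHeader = true;

        await foreach (var line in ReadLinesAsync(reader))
        {
            if (isHeader)
            {
                isHeader = false;
                continue;
            }

            if (line.Length > 0) count++; // ignore blank lines
        }

        return count;
    }
}
```

For a file you would pass a StreamReader, for example new StreamReader(path). Each line still becomes a string allocation here, which is one plausible reason this style puts more pressure on the GC than working on raw bytes.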

The source code for this post is available on my GitHub repo. The repo also contains the tests that produced the benchmark results.

So let’s dive straight into the code

#nullable enable
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

namespace FileIO
{
    public class WithPipeLines
    {
        /// <summary>
        /// Process the file using System.IO.Pipelines
        /// </summary>
        /// <param name="filePath">The file path </param>
        /// <param name="employeeRecords">The Employee Array in which file data will be processed.</param>
        /// <returns>The number of employee records parsed into the array</returns>
        public async Task<int> ProcessFileAsync(string filePath, Employee[] employeeRecords)
        {
            var position = 0;
            if (!File.Exists(filePath)) return position;
            await using var fileStream = File.OpenRead(filePath);
            var pipeReader = PipeReader.Create(fileStream);
            while (true)
            {
                var fileData = await pipeReader.ReadAsync();

                // the data read so far, exposed as a ReadOnlySequence<byte>
                var fileDataBuffer = fileData.Buffer;

                var sequencePosition = ParseLines(employeeRecords, ref fileDataBuffer, ref position);

                pipeReader.AdvanceTo(sequencePosition, fileDataBuffer.End);

                if (fileData.IsCompleted)
                {
                    break;
                }
            }

            await pipeReader.CompleteAsync(); // marking pipereader as Completed
            return position;
        }
        
        private static SequencePosition ParseLines(Employee[] employeeRecords, ref ReadOnlySequence<byte> buffer, ref int position)
        {
            var reader = new SequenceReader<byte>(buffer);

            while (!reader.End)
            {
                // Read up to the next '\n'. Splitting on '\n' alone (rather than on any byte of
                // Environment.NewLine) avoids an empty line between '\r' and '\n' on Windows
                if (!reader.TryReadTo(out ReadOnlySpan<byte> line, (byte) '\n', advancePastDelimiter: true))
                {
                    break; // no complete line in the current data, wait for more
                }

                if (!line.IsEmpty && line[^1] == (byte) '\r')
                    line = line[..^1]; // drop the '\r' of a Windows line ending

                if (line.IsEmpty) continue; // skip blank lines

                var parsedLine = LineParser.ParseLine(line); // we have a line to parse

                if (parsedLine is { }) // skip rows the parser rejected (such as the header)
                    employeeRecords[position++] = (Employee) parsedLine;
            }

            // Everything before this position has been consumed. Note that a final line
            // without a trailing newline is never handed to the parser.
            return reader.Position;
        }

        private static class LineParser
        {
            private const byte Coma = (byte) ',';
            private const string ColumnHeaders = "Name,Email,DateOfJoining,Salary,Age";
            public static Employee? ParseLine(ReadOnlySpan<byte> line)
            {
                if (Encoding.UTF8.GetString(line).Contains(ColumnHeaders)) // Ignore the Header row
                {
                    return null;
                }
                var fieldCount = 1;

                var record = new Employee();

                while (fieldCount <= 5) // we have five fields in csv file
                {
                     var comaAt = line.IndexOf(Coma);
                     if (comaAt < 0)
                     {
                         comaAt = line.Length;
                     }

                     switch (fieldCount)
                     {
                         case 1:
                         {
                             var value = Encoding.UTF8.GetString(line[..comaAt]);
                             record.Name = value;
                             break;
                         }
                         case 2:
                         {
                             var value = Encoding.UTF8.GetString(line[..comaAt]);
                             record.Email = value;
                             break;
                         }
                         case 3:
                         {
                             var value = Encoding.UTF8.GetString(line[..comaAt]);
                             record.DateOfJoining = Convert.ToDateTime(value);
                             break;
                         }
                        
                         case 4:
                         {
                             var value = Encoding.UTF8.GetString(line[..comaAt]);
                             record.Salary = Convert.ToDouble(value);
                             break;
                         }
                        
                         case 5:
                         {
                             var value = Encoding.UTF8.GetString(line[..comaAt]);
                             record.Age = Convert.ToInt16(value);
                             return record;
                         }
                     }

                     if (comaAt >= line.Length)
                         return null; // malformed row: fewer than five fields

                     line = line[(comaAt + 1)..]; // slice past field

                     fieldCount++;
                }

                return record;
            }
        }
    }
}

The public entry point is ProcessFileAsync. It creates a PipeReader over the file stream, reads the next chunk of data, and exposes it as a buffer of type ReadOnlySequence<byte>. The buffer is passed by ref to the ParseLines method together with position, the index of the next free slot in the employee array, which starts at 0. ParseLines reads one line at a time from the buffer, using the newline as the delimiter, and stops when no complete line is left. The reader is then advanced with AdvanceTo: everything up to the end of the last parsed line is marked as consumed, and the whole buffer is marked as examined, so the next read returns the leftover partial line together with new data. The loop continues until the PipeReader reports that the data is complete, after which the reader is marked as completed with CompleteAsync.
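
To make the AdvanceTo step more concrete, here is a minimal sketch of the same read loop, reduced to counting lines. PipeLineCounter and CountLinesAsync are hypothetical names, and the sketch assumes '\n'-terminated lines and the System.IO.Pipelines package being referenced. It takes any Stream, so it can be tried on a MemoryStream as easily as on a file.

```csharp
using System;
using System.Buffers;
using System.IO;
using System.IO.Pipelines;
using System.Threading.Tasks;

public static class PipeLineCounter
{
    // Counts '\n'-terminated lines using the ReadAsync / AdvanceTo loop.
    public static async Task<int> CountLinesAsync(Stream stream)
    {
        var reader = PipeReader.Create(stream);
        var lines = 0;

        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            // Walk over every complete line currently in the buffer.
            SequencePosition consumed = buffer.Start;
            while (true)
            {
                ReadOnlySequence<byte> rest = buffer.Slice(consumed);
                SequencePosition? newline = rest.PositionOf((byte)'\n');
                if (newline == null) break; // only a partial line is left

                lines++;
                consumed = buffer.GetPosition(1, newline.Value); // step past '\n'
            }

            // consumed: data we are finished with and the pipe may release.
            // examined: data we looked at; passing buffer.End means
            // "do not hand this back until there is more data or the stream ends".
            reader.AdvanceTo(consumed, buffer.End);

            if (result.IsCompleted) break;
        }

        await reader.CompleteAsync();
        return lines;
    }
}
```

A trailing line without a newline is left unconsumed when the stream ends, so this sketch does not count it.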

The actual parsing takes place in the ParseLine method of the static class LineParser. After skipping the header row of the CSV file, it captures each field by finding the position of the next ",", then decodes the bytes before it into a string with UTF-8 encoding, using the indices and ranges syntax to slice the span. Fields are processed one by one, tracked by the fieldCount variable.
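
ParseLine decodes every field into a string before converting it. As a possible refinement, and not what the code above does, numeric fields can be parsed straight from the UTF-8 bytes with System.Buffers.Text.Utf8Parser, which skips the intermediate string. The sketch below only handles the last two fields (Salary and Age); Utf8FieldParser, NextField and TryParseSalaryAndAge are hypothetical names.

```csharp
using System;
using System.Buffers.Text;

public static class Utf8FieldParser
{
    // Returns the bytes before the first comma and advances 'line' past that comma.
    public static ReadOnlySpan<byte> NextField(ref ReadOnlySpan<byte> line)
    {
        var commaAt = line.IndexOf((byte)',');
        if (commaAt < 0)
        {
            // No comma left: the rest of the line is the last field.
            var last = line;
            line = ReadOnlySpan<byte>.Empty;
            return last;
        }

        var field = line[..commaAt];
        line = line[(commaAt + 1)..];
        return field;
    }

    // Parses "Salary,Age" (the tail of a row) without creating any strings.
    public static bool TryParseSalaryAndAge(ReadOnlySpan<byte> tail, out double salary, out short age)
    {
        age = 0;
        var salaryField = NextField(ref tail);
        var ageField = NextField(ref tail);

        // Require that each parser consumed the whole field, so "12abc" is rejected.
        return Utf8Parser.TryParse(salaryField, out salary, out int salaryBytes)
               && salaryBytes == salaryField.Length
               && Utf8Parser.TryParse(ageField, out age, out int ageBytes)
               && ageBytes == ageField.Length;
    }
}
```

Utf8Parser always uses the invariant format, whereas Convert.ToDouble follows the current culture, so the two can disagree on machines whose culture uses a decimal comma.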

Let’s see how we can consume this ProcessFileAsync method

var pool = ArrayPool<Employee>.Shared;
var employeeRecords = pool.Rent(100000);
var pipeLinesTest = new WithPipeLines();

try
{
    // Only the first recordCount entries are valid; the rented array may be longer
    var recordCount = await pipeLinesTest.ProcessFileAsync(_filePath, employeeRecords);
}
finally
{
    pool.Return(employeeRecords, clearArray: true);
}

The first line obtains the shared ArrayPool for the Employee type. ArrayPool<T> is a high-performance pool of managed arrays; the Shared instance is thread safe, and ArrayPool<T>.Create lets you build a pool with a custom maximum array length.

Next we call the Rent method, which takes the minimum length of the buffer you need. Keep in mind that the array Rent returns might be bigger than what you asked for.

Once you are done with the array, Return it to the SAME pool. Return takes an optional clearArray argument that wipes the buffer so the next consumer to Rent it will not see the previous consumer’s content. By default the contents are left unchanged.
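
The Rent and Return contract can be seen in isolation with a small sketch. ArrayPoolDemo and RentFillReturn are hypothetical names, and an int array stands in for the Employee array.

```csharp
using System;
using System.Buffers;

public static class ArrayPoolDemo
{
    // Rents a buffer, uses the requested part of it, and reports how long the rented array was.
    public static (int rentedLength, int sum) RentFillReturn(int minimumLength)
    {
        var pool = ArrayPool<int>.Shared;
        int[] buffer = pool.Rent(minimumLength);

        try
        {
            // Only the first minimumLength slots are ours to reason about;
            // anything beyond that is slack the pool happened to hand out.
            var sum = 0;
            for (var i = 0; i < minimumLength; i++)
            {
                buffer[i] = i;
                sum += buffer[i];
            }

            return (buffer.Length, sum);
        }
        finally
        {
            // clearArray: true wipes the contents before the next Rent sees them.
            pool.Return(buffer, clearArray: true);
        }
    }
}
```

Because the rented length is only guaranteed to be at least what was requested, code that consumes the array should rely on its own count (here minimumLength) rather than on buffer.Length.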

Benchmark Results

I’ve used the BenchmarkDotNet library to measure the performance.

BenchmarkDotNet=v0.13.0, OS=macOS Big Sur 11.4 (20F71) [Darwin 20.5.0]
Intel Core i9-9880H CPU 2.30GHz, 1 CPU, 16 logical and 8 physical cores
.NET SDK=5.0.203
  [Host]     : .NET 5.0.6 (5.0.621.22011), X64 RyuJIT
  DefaultJob : .NET 5.0.6 (5.0.621.22011), X64 RyuJIT
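
|      Method |     Mean |   Error |  StdDev | Rank |      Gen 0 |     Gen 1 |     Gen 2 | Allocated |
|------------ |---------:|--------:|--------:|-----:|-----------:|----------:|----------:|----------:|
|   PipeLines | 143.1 ms | 2.81 ms | 3.24 ms |    1 |  5500.0000 | 2000.0000 |  750.0000 |     44 MB |
| AsyncStream | 223.0 ms | 4.40 ms | 8.36 ms |    2 |  8000.0000 | 3000.0000 | 1000.0000 |     64 MB |
|   CsvHelper | 228.5 ms | 4.51 ms | 7.29 ms |    3 | 11000.0000 | 5000.0000 | 3000.0000 |     77 MB |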

As the results above show, the PipeLines method is the clear winner: it took just 143.1 milliseconds to process the data, roughly 1.6 times faster than the other two approaches, and allocated only 44 MB of memory against 64 MB and 77 MB.
