Skip to content

Commit

Permalink
Fixed spill memory stream bugs. Added tests to verify spilling.
Browse files Browse the repository at this point in the history
  • Loading branch information
Laszlo Dobos committed Oct 31, 2014
1 parent fd002be commit bb1ba17
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 66 deletions.
209 changes: 143 additions & 66 deletions src/Jhu.SharpFitsIO/SpillMemoryStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,51 +10,70 @@ namespace Jhu.SharpFitsIO
/// Implements a write-only memory stream that spills onto the
/// disk when a given size is reached.
/// </summary>
internal class SpillMemoryStream : Stream
/// <remarks>
/// This class only implements the write functions of a standard
/// stream. When all data is written to the stream, it can be
/// written to another using the WriteTo function.
/// </remarks>
internal class SpillMemoryStream : Stream, IDisposable
{
#region Private member varibles

private long position;
private long spillLimit;
private string spillPath;
private MemoryStream memory;
private FileStream spill;

#endregion
/// <summary>
/// Internal memory buffer to store data temporarily until the
/// spill limit is reached.
/// </summary>
private MemoryStream memoryBuffer;

#region Constructors and initializers
/// <summary>
/// External file buffer to store data temporarily after the
/// spill limit is reached.
/// </summary>
private FileStream spillBuffer;

public SpillMemoryStream()
{
InitializeMembers();
}
#endregion
#region Properties

public SpillMemoryStream(long spillLimit)
/// <summary>
/// Gets the current position of the stream.
/// </summary>
public override long Position
{
InitializeMembers();

this.spillLimit = spillLimit;
get { return position; }
set { throw new InvalidOperationException(); }
}

public SpillMemoryStream(long spillLimit, string spillPath)
/// <summary>
/// Gets or sets the size limit at which data is spilled to the disk.
/// </summary>
public long SpillLimit
{
InitializeMembers();

this.spillLimit = spillLimit;
this.spillPath = spillPath;
get { return spillLimit; }
set
{
EnsureNotOpen();
spillLimit = value;
}
}

private void InitializeMembers()
/// <summary>
/// Gets or sets the path of temporary file used when data is spilled
/// to the disk.
/// </summary>
public string SpillPath
{
this.position = 0;
this.spillLimit = long.MaxValue; // 1MB
this.spillPath = null;
this.memory = new MemoryStream();
this.spill = null;
get { return spillPath; }
set
{
EnsureNotOpen();
spillPath = value;
}
}

#endregion

public override bool CanRead
{
get { return false; }
Expand All @@ -80,51 +99,85 @@ public override long Length
get { return position; }
}

public override long Position
#endregion
#region Constructors and initializers

public SpillMemoryStream()
{
get { return position; }
set { throw new InvalidOperationException(); }
InitializeMembers();
}

public override void Close()
public SpillMemoryStream(long spillLimit)
{
if (spill != null)
{
spill.Close();
}
InitializeMembers();

if (memory != null)
{
memory.Close();
}
this.spillLimit = spillLimit;
}

public SpillMemoryStream(long spillLimit, string spillPath)
{
InitializeMembers();

this.spillLimit = spillLimit;
this.spillPath = spillPath;
}

private void InitializeMembers()
{
this.position = 0;
this.spillLimit = 0x100000; // 1MB
this.spillPath = null;
this.memoryBuffer = null;
this.spillBuffer = null;
}

protected override void Dispose(bool disposing)
{
if (disposing)
{
if (spill != null)
if (spillBuffer != null)
{
spillBuffer.Dispose();
}

if (memoryBuffer != null)
{
spill.Dispose();
memoryBuffer.Dispose();
}

if (memory != null)
if (spillPath != null && File.Exists(spillPath))
{
memory.Dispose();
File.Delete(spillPath);
}
}
}

if (spillPath != null && File.Exists(spillPath))
public void Dispose()
{
Dispose(true);
}

#endregion
#region Stream implementation

public override void Close()
{
if (spillBuffer != null)
{
spillBuffer.Close();
}

if (memoryBuffer != null)
{
File.Delete(spillPath);
memoryBuffer.Close();
}
}

public override void Flush()
{
if (spill != null)
if (spillBuffer != null)
{
spill.Flush();
spillBuffer.Flush();
}
}

Expand All @@ -150,77 +203,101 @@ public override void SetLength(long value)

public override void Write(byte[] buffer, int offset, int count)
{
if (spill == null && position + count < spillLimit)
if (position + count < spillLimit)
{
memory.Write(buffer, offset, count);
OpenMemoryBuffer();
memoryBuffer.Write(buffer, offset, count);
}
else
{
OpenSpillFile();

spill.Write(buffer, offset, count);
OpenSpillBuffer();
spillBuffer.Write(buffer, offset, count);
}

position += count;
}

public override void WriteByte(byte value)
{
if (spill == null && position + 1 < spillLimit)
if (position + 1 < spillLimit)
{
memory.WriteByte(value);
OpenMemoryBuffer();
memoryBuffer.WriteByte(value);
}
else
{
OpenSpillFile();

spill.WriteByte(value);
OpenSpillBuffer();
spillBuffer.WriteByte(value);
}

position++;
}

#endregion

/// <summary>
/// Writes the content of both buffers to an output stream.
/// </summary>
/// <param name="stream"></param>
public void WriteTo(Stream stream)
{
// TODO: this function could use async copy but that just
// overcomplicates things

// Flush memory to stream
if (memory != null)
if (memoryBuffer != null)
{
memory.WriteTo(stream);
memoryBuffer.WriteTo(stream);
}

if (spill != null)
if (spillBuffer != null)
{
// Rewind file but remember position
var pos = spill.Position;
spill.Seek(0, SeekOrigin.Begin);
var pos = spillBuffer.Position;
spillBuffer.Seek(0, SeekOrigin.Begin);

// Copy file to output stream
var i = 0;
var buffer = new byte[0x10000]; // 64k
while (i < pos)
{
var count = spill.Read(buffer, 0, buffer.Length);
var count = spillBuffer.Read(buffer, 0, buffer.Length);
stream.Write(buffer, 0, count);

i += count;
}

spill.Seek(pos, SeekOrigin.Begin);
spillBuffer.Seek(pos, SeekOrigin.Begin);
}
}

private void OpenSpillFile()
private void OpenMemoryBuffer()
{
if (spill != null)
if (memoryBuffer == null && spillLimit > 0)
{
memoryBuffer = new MemoryStream();
}
}

private void OpenSpillBuffer()
{
if (spillBuffer == null)
{
// If path is not set use temp
if (spillPath == null)
{
spillPath = Path.GetTempFileName();
}

spill = new FileStream(spillPath, FileMode.CreateNew, FileAccess.ReadWrite, FileShare.None);
spillBuffer = new FileStream(spillPath, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.None);
}
}

private void EnsureNotOpen()
{
if (memoryBuffer != null || spillBuffer != null)
{
throw new InvalidOperationException("Stream is already open."); // TODO ***
}
}
}
Expand Down
1 change: 1 addition & 0 deletions test/Jhu.SharpFitsIO.Test/Jhu.SharpFitsIO.Test.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="StraightBitConverterTest.cs" />
<Compile Include="SwapBitConverterTest.cs" />
<Compile Include="SpillMemoryStreamTest.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\src\Jhu.SharpFitsIO\Jhu.SharpFitsIO.csproj">
Expand Down
46 changes: 46 additions & 0 deletions test/Jhu.SharpFitsIO.Test/SpillMemoryStreamTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
using System;
using System.IO;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Jhu.SharpFitsIO
{
[TestClass]
public class SpillMemoryStreamTest
{
[TestMethod]
public void MemoryOnlyTest()
{
var buffer = new byte[0x10000]; // 64k

using (var sms = new SpillMemoryStream())
{
sms.Write(buffer, 0, buffer.Length);

var ms = new MemoryStream();
sms.WriteTo(ms);

Assert.AreEqual(0x10000, ms.Position);
}
}

[TestMethod]
public void SpillToTempTest()
{
var buffer = new byte[0x10000]; // 64k

using (var sms = new SpillMemoryStream())
{
// Write 2M
for (int i = 0; i < 32; i++)
{
sms.Write(buffer, 0, buffer.Length);
}

var ms = new MemoryStream();
sms.WriteTo(ms);

Assert.AreEqual(0x200000, ms.Position);
}
}
}
}

0 comments on commit bb1ba17

Please sign in to comment.