Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore fsync behavior in FSDirectory via P/Invoke, #933 #938

Merged
merged 17 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/Lucene.Net.Tests/Support/TestConcurrentHashSet.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using Lucene.Net.Attributes;
using Lucene.Net.Support;
using NUnit.Framework;
using System.Linq;
using System.Threading.Tasks;

namespace Lucene.Net
{
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

public class TestConcurrentHashSet
NightOwl888 marked this conversation as resolved.
Show resolved Hide resolved
{
[Test, LuceneNetSpecific]
public void TestExceptWith()
{
    // Seed values, in insertion order: 1-8, their tenfold multiples 10-80, then 99 and 0.
    var singles = Enumerable.Range(1, 8);
    var tens = singles.Select(n => n * 10);
    var seed = singles.Concat(tens).Concat(new[] { 99, 0 });

    var set = new ConcurrentHashSet<int>(seed);

    // Every parallel iteration strips one pair (n, n * 10): 1 and 10, 2 and 20, and so on.
    Parallel.ForEach(Enumerable.Range(1, 8), n => set.ExceptWith(new[] { n, n * 10 }));

    // Only the two values that no iteration targeted may be left behind.
    Assert.AreEqual(2, set.Count);
    Assert.IsTrue(set.Contains(0));
    Assert.IsTrue(set.Contains(99));
}
}
}
66 changes: 27 additions & 39 deletions src/Lucene.Net/Store/FSDirectory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ namespace Lucene.Net.Store
/// the best <see cref="FSDirectory"/> implementation given your
/// environment, and the known limitations of each
/// implementation. For users who have no reason to prefer a
/// specific implementation, it's best to simply use
/// specific implementation, it's best to simply use
/// <see cref="Open(string)"/> (or one of its overloads). For all others, you should instantiate the
/// desired implementation directly.
///
/// <para/>The locking implementation is by default
/// <para/>The locking implementation is by default
/// <see cref="NativeFSLockFactory"/>, but can be changed by
/// passing in a custom <see cref="LockFactory"/> instance.
///
Expand All @@ -94,10 +94,8 @@ public abstract class FSDirectory : BaseDirectory
public const int DEFAULT_READ_CHUNK_SIZE = 8192;

protected readonly DirectoryInfo m_directory; // The underlying filesystem directory
protected readonly ISet<string> m_staleFiles = new ConcurrentHashSet<string>(); // Files written, but not yet sync'ed

// LUCENENET specific: No such thing as "stale files" in .NET, since Flush(true) writes everything to disk before
// our FileStream is disposed.
//protected readonly ISet<string> m_staleFiles = new ConcurrentHashSet<string>(); // Files written, but not yet sync'ed
#pragma warning disable 612, 618
private int chunkSize = DEFAULT_READ_CHUNK_SIZE;
#pragma warning restore 612, 618
Expand Down Expand Up @@ -321,9 +319,8 @@ public override void DeleteFile(string name)
{
throw new IOException("Cannot delete " + file, e);
}
// LUCENENET specific: No such thing as "stale files" in .NET, since Flush(true) writes everything to disk before
// our FileStream is disposed.
//m_staleFiles.Remove(name);

m_staleFiles.Remove(name);
}

/// <summary>
Expand Down Expand Up @@ -366,35 +363,29 @@ protected virtual void EnsureCanWrite(string name)

/// <summary>
/// Records the file name of the given <see cref="FSIndexOutput"/> in <c>m_staleFiles</c>,
/// the set of files that have been written but not yet sync'ed.
/// </summary>
/// <param name="io">The output whose file name is recorded.
/// NOTE(review): presumably called when the output is closed/disposed - confirm against the caller.</param>
protected virtual void OnIndexOutputClosed(FSIndexOutput io)
{
// Remember the file until it is removed from the stale set again.
m_staleFiles.Add(io.name);
}

/// <summary>
/// Fsyncs every file in <paramref name="names"/> that is still recorded in <c>m_staleFiles</c>,
/// then fsyncs the directory itself, and finally removes the synced names from <c>m_staleFiles</c>.
/// </summary>
/// <param name="names">Names of the files to sync. Names that are not present in
/// <c>m_staleFiles</c> are skipped.</param>
public override void Sync(ICollection<string> names)
{
EnsureOpen();

// Restrict the request to the files tracked as written but not yet sync'ed.
ISet<string> toSync = new HashSet<string>(names);
toSync.IntersectWith(m_staleFiles);

foreach (var name in toSync)
{
Fsync(name);
}

// fsync the directory itself, but only if there was any file fsynced before
// (otherwise it can happen that the directory does not yet exist)!
if (toSync.Count > 0)
{
IOUtils.Fsync(m_directory, true);
}

// Reached only when none of the fsync calls above threw, so names whose sync
// failed stay in the stale set.
m_staleFiles.ExceptWith(toSync);
}

public override string GetLockID()
Expand Down Expand Up @@ -546,7 +537,7 @@ protected override void Dispose(bool disposing)
Exception priorE = null; // LUCENENET: No need to cast to IOExcpetion
try
{
file.Flush(flushToDisk: true);
file.Flush(flushToDisk: false);
}
catch (Exception ioe) when (ioe.IsIOException())
{
Expand Down Expand Up @@ -586,12 +577,9 @@ public override void Seek(long pos)
public override long Position => file.Position; // LUCENENET specific - need to override, since we are buffering locally, renamed from getFilePointer() to match FileStream
}

/// <summary>
/// Fsyncs a single file of this directory: combines the directory's full path with
/// <paramref name="name"/> and passes the resulting <see cref="FileInfo"/> to <c>IOUtils.Fsync</c>.
/// </summary>
/// <param name="name">Name of the file, relative to this directory.</param>
protected virtual void Fsync(string name)
{
// NOTE(review): the second argument is presumably the "is directory" flag (false = regular file) - confirm against IOUtils.Fsync.
IOUtils.Fsync(new FileInfo(Path.Combine(m_directory.FullName, name)), false);
}
}
}
}
25 changes: 17 additions & 8 deletions src/Lucene.Net/Support/ConcurrentHashSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,16 @@ public ConcurrentHashSet(IEnumerable<T> collection, IEqualityComparer<T> compare


/// <summary>
/// Initializes a new instance of the <see cref="ConcurrentHashSet{T}"/>
/// class that contains elements copied from the specified <see cref="T:System.Collections.IEnumerable"/>,
/// has the specified concurrency level, has the specified initial capacity, and uses the specified
/// Initializes a new instance of the <see cref="ConcurrentHashSet{T}"/>
/// class that contains elements copied from the specified <see cref="T:System.Collections.IEnumerable"/>,
/// has the specified concurrency level, has the specified initial capacity, and uses the specified
/// <see cref="T:System.Collections.Generic.IEqualityComparer{T}"/>.
/// </summary>
/// <param name="concurrencyLevel">The estimated number of threads that will update the
/// <param name="concurrencyLevel">The estimated number of threads that will update the
/// <see cref="ConcurrentHashSet{T}"/> concurrently.</param>
/// <param name="collection">The <see cref="T:System.Collections.IEnumerable{T}"/> whose elements are copied to the new
/// <param name="collection">The <see cref="T:System.Collections.IEnumerable{T}"/> whose elements are copied to the new
/// <see cref="ConcurrentHashSet{T}"/>.</param>
/// <param name="comparer">The <see cref="T:System.Collections.Generic.IEqualityComparer{T}"/> implementation to use
/// <param name="comparer">The <see cref="T:System.Collections.Generic.IEqualityComparer{T}"/> implementation to use
/// when comparing items.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="collection"/> is a null reference.
Expand Down Expand Up @@ -638,7 +638,7 @@ private void GrowTable(Tables tables)
// We want to make sure that GrowTable will not be called again, since table is at the maximum size.
// To achieve that, we set the budget to int.MaxValue.
//
// (There is one special case that would allow GrowTable() to be called in the future:
// (There is one special case that would allow GrowTable() to be called in the future:
// calling Clear() on the ConcurrentHashSet will shrink the table and lower the budget.)
_budget = int.MaxValue;
}
Expand Down Expand Up @@ -753,7 +753,16 @@ private void CopyToItems(T[] array, int index)

/// <summary>
/// Removes every element of <paramref name="other"/> from this set.
/// </summary>
/// <remarks>
/// Elements are removed one at a time through <c>TryRemove</c>; elements of
/// <paramref name="other"/> that are not in this set are ignored.
/// NOTE(review): nothing in this method locks across the whole loop, so the operation as a
/// whole is presumably not atomic with respect to concurrent writers - confirm if callers rely on that.
/// </remarks>
/// <param name="other">The collection of items to remove from this set.</param>
/// <exception cref="ArgumentNullException"><paramref name="other"/> is <c>null</c>.</exception>
public void ExceptWith(IEnumerable<T> other)
{
    if (other is null)
    {
        throw new ArgumentNullException(nameof(other));
    }

    // Removing a set from itself empties it. Clearing directly also avoids removing
    // items from this set while enumerating it.
    if (ReferenceEquals(this, other))
    {
        Clear();
        return;
    }

    foreach (var item in other)
    {
        // The result is deliberately ignored: an item that is absent needs no removal.
        TryRemove(item);
    }
}

public void IntersectWith(IEnumerable<T> other)
Expand Down
95 changes: 95 additions & 0 deletions src/Lucene.Net/Support/IO/PosixFsyncSupport.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using Lucene.Net.Util;
using System.IO;
using System.Runtime.InteropServices;

namespace Lucene.Net.Support.IO
{
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/// <summary>
/// Flushes a file or directory to stable storage by P/Invoking the libc functions
/// <c>open</c>, <c>fsync</c>/<c>fcntl</c> and <c>close</c>.
/// </summary>
internal static class PosixFsyncSupport
{
    // https://pubs.opengroup.org/onlinepubs/009695399/functions/fsync.html
    [DllImport("libc", SetLastError = true)]
    private static extern int fsync(int fd);

    // https://pubs.opengroup.org/onlinepubs/007904875/functions/open.html
    [DllImport("libc", SetLastError = true)]
    private static extern int open([MarshalAs(UnmanagedType.LPStr)] string pathname, int flags);

    // https://pubs.opengroup.org/onlinepubs/009604499/functions/close.html
    [DllImport("libc", SetLastError = true)]
    private static extern int close(int fd);

    // https://pubs.opengroup.org/onlinepubs/007904975/functions/fcntl.html
    // and https://developer.apple.com/library/archive/documentation/System/Conceptual/ManPages_iPhoneOS/man2/fcntl.2.html
    [DllImport("libc", SetLastError = true)]
    private static extern int fcntl(int fd, int cmd, int arg);

    private const int O_RDONLY = 0;
    private const int O_WRONLY = 1;

    // https://opensource.apple.com/source/xnu/xnu-6153.81.5/bsd/sys/fcntl.h.auto.html
    private const int F_FULLFSYNC = 51;

    // errno reported when a call was interrupted by a signal before it completed.
    // The value is 4 on Linux, macOS and the BSDs.
    private const int EINTR = 4;

    /// <summary>
    /// Opens <paramref name="path"/>, flushes it to stable storage and closes it again.
    /// </summary>
    /// <param name="path">Path of the file or directory to flush.</param>
    /// <param name="isDir"><c>true</c> to open the path read-only (used for directories);
    /// <c>false</c> to open it write-only.</param>
    /// <exception cref="IOException">Opening, flushing or closing the descriptor failed.
    /// The errno value is passed as the exception's HResult.</exception>
    public static void Fsync(string path, bool isDir)
    {
        using DescriptorWrapper handle = new DescriptorWrapper(path, isDir);
        handle.Flush();
    }

    /// <summary>
    /// Owns a raw file descriptor for the duration of a single flush and closes it on dispose.
    /// </summary>
    private readonly ref struct DescriptorWrapper
    {
        private readonly int fd;

        /// <summary>
        /// Opens <paramref name="path"/> read-only when <paramref name="isDir"/> is <c>true</c>,
        /// write-only otherwise.
        /// </summary>
        /// <exception cref="IOException">The path could not be opened.</exception>
        public DescriptorWrapper(string path, bool isDir)
        {
            int flags = isDir ? O_RDONLY : O_WRONLY;
            int result;
            int error;

            // An EINTR failure only means a signal arrived mid-call; it is not a real
            // error, so simply issue the call again.
            do
            {
                result = open(path, flags);
                error = result == -1 ? Marshal.GetLastWin32Error() : 0;
            }
            while (result == -1 && error == EINTR);

            fd = result;

            if (result == -1)
            {
                throw new IOException($"Unable to open path, error: 0x{error:x8}", error);
            }
        }

        /// <summary>
        /// Flushes the descriptor: <c>fcntl(F_FULLFSYNC)</c> when <c>Constants.MAC_OS_X</c>
        /// is set, <c>fsync</c> otherwise.
        /// </summary>
        /// <exception cref="IOException">The flush call failed for a reason other than EINTR.</exception>
        public void Flush()
        {
            // if macOS, use F_FULLFSYNC
            bool fullSync = Constants.MAC_OS_X;
            int result;
            int error;

            // Retry while the call is merely interrupted by a signal.
            do
            {
                result = fullSync ? fcntl(fd, F_FULLFSYNC, 0) : fsync(fd);
                error = result == -1 ? Marshal.GetLastWin32Error() : 0;
            }
            while (result == -1 && error == EINTR);

            if (result == -1)
            {
                throw new IOException(fullSync ? "fcntl failed" : "fsync failed", error);
            }
        }

        /// <summary>
        /// Closes the descriptor.
        /// </summary>
        /// <exception cref="IOException"><c>close</c> failed for a reason other than EINTR.</exception>
        public void Dispose()
        {
            if (close(fd) == -1)
            {
                int error = Marshal.GetLastWin32Error();

                // After an interrupted close the descriptor state is unspecified, so the call
                // must not be repeated (the number may already belong to another open file).
                // Reporting EINTR as a failure would also mask an exception thrown by Flush.
                if (error != EINTR)
                {
                    throw new IOException("close failed", error);
                }
            }
        }
    }
}
}
Loading
Loading