Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streaming support and doc update #4

Open
chris-twiner opened this issue Jun 5, 2023 · 13 comments
Open

streaming support and doc update #4

chris-twiner opened this issue Jun 5, 2023 · 13 comments
Milestone

Comments

@chris-twiner
Copy link

The FileSystem implementation doesn't support streaming (must be an AbstractFileSystem). Simply wrapping DelegateToFileSystem seems to be enough.

chris-twiner added a commit to chris-twiner/hadoop-bare-naked-local-fs that referenced this issue Jun 5, 2023
@chris-twiner
Copy link
Author

pr - [#5 ]

@chris-twiner
Copy link
Author

@garretwilson - hihi, pinging in case you haven't seen this

@garretwilson
Copy link
Member

garretwilson commented Jul 5, 2023

I had seen it, but lots of things got in front of it on the priority. Thanks for the ping. The more people want things, the higher the things get on my priority list. 😆

I'll try to address this before July 15, 2023, how's that? If I miss that feel free to ping me again.

@garretwilson garretwilson added this to the v0.2.0 milestone Jul 13, 2023
@garretwilson
Copy link
Member

OK, @chris-twiner . I set aside some time for this. Let's dig into this a little further. When you say "it doesn't support streaming", what are you wanting to do exactly? I assume you're referring to Spark Streaming. Bear with me a second—I haven't touched Hadoop and Spark in a year, and I wasn't working with streaming when I was working with Spark. So what exactly about Spark (or whatever you're working with) requires an AbstractFileSystem?

If Spark is using some sort of dependency on AbstractFileSystem to get something done, then that doesn't sound like a good thing to begin with. A better approach would be to fix the FileSystem APIs to handle whatever Spark needs. (Then again, if Apache were committed to keeping the FileSystem APIs up-to-date, then we wouldn't need this project to begin with, huh? 😅) Anyway, please just give me a bit more context so that I can decide the best path to take to address your needs while not creating a monstrous hack.

Secondly, if I understand the proposal correctly, I'm against creating a new class named BareStreamingLocalFileSystem that simply wraps BareLocalFileSystem. In fact this new class BareStreamingLocalFileSystem would have nothing at all related to streaming. Its only reason for existence would be to get AbstractFileSystem into the hierarchy. So a better name would be BareLocalFileSystemDecoratorSoSparkSeesAnAbstractFileSystemSystem. That reflects its purpose. (It also illustrates the hackiness.)

So explain a little more about exactly what other component is needing AbstractFileSystem, and what this other component is attempting to do. Thanks.

@chris-twiner
Copy link
Author

chris-twiner commented Jul 13, 2023

Hi, correct it's for spark structured streaming which uses AbstractFileSystem under the hood not FileSystem and as you note Hadoop/Spark aren't likely to change org.apache.hadoop.fs.FileContext.

The use case is to be able to stream from and to files on windows using spark streaming, in particular it's the checkpointing code which requires it, This is the kind of stack you get, there may be others but if you use checkpointing it's definitely hit:

at org.apache.hadoop.fs.AbstractFileSystem.create(AbstractFileSystem.java:584)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:686)
at org.apache.hadoop.fs.FileContext$3.next(FileContext.java:682)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext.create(FileContext.java:688)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createTempFile(CheckpointFileManager.scala:311)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:133)
at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.<init>(CheckpointFileManager.scala:136)
at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.createAtomic(CheckpointFileManager.scala:318)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatchToFile(HDFSMetadataLog.scala:123)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply$mcZ$sp(HDFSMetadataLog.scala:112)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$$anonfun$add$1.apply(HDFSMetadataLog.scala:110)

This was discovered in migrating frameless to use bare-naked. The code added was needed by these tests which use checkpoints.

@garretwilson
Copy link
Member

garretwilson commented Jul 13, 2023

Huh, it's just creating a temporary file. I wonder why it needs an AbstractFileSystem. (The documentation of the FileContext doesn't seem to provide any answers.)

Before just slapping on kludges, let's try to get to the heart of what's going on. @chris-twiner , where does the actual error show up for you? Do you tell it to use the BareLocalFileSystem as the fs.file.impl, and then it throws a ClassCastException or something? Please let me know exactly what happens, and where it's happening, and in response to what. A stack trace of the actual error would be helpful.

The other question is: what file system is it using to begin with, which works with Winutils? It can't be the LocalFileSystem, because that's not an AbstractFileSystem. What file system implementation does it use if Winutils is installed? (This last question is important.)

@garretwilson
Copy link
Member

garretwilson commented Jul 13, 2023

I'm guessing (just from browsing through the code) that this method in AbstractFileSystem may be related:

  /**
   * Create a file system instance for the specified uri using the conf. The
   * conf is used to find the class name that implements the file system. The
   * conf is also passed to the file system for its configuration.
   *
   * @param uri URI of the file system
   * @param conf Configuration for the file system
   * 
   * @return Returns the file system for the given URI
   *
   * @throws UnsupportedFileSystemException file system for <code>uri</code> is
   *           not found
   */
  public static AbstractFileSystem createFileSystem(URI uri, Configuration conf)
      throws UnsupportedFileSystemException {
    final String fsImplConf = String.format("fs.AbstractFileSystem.%s.impl",
        uri.getScheme());

    Class<?> clazz = conf.getClass(fsImplConf, null);
    if (clazz == null) {
      throw new UnsupportedFileSystemException(String.format(
          "%s=null: %s: %s",
          fsImplConf, NO_ABSTRACT_FS_ERROR, uri.getScheme()));
    }
    return (AbstractFileSystem) newInstance(clazz, uri, conf);
  }

It looks like there's a separate configuration for setting the fs.AbstractFileSystem.file.impl or something. What a mess.

What I'm getting at, though, is that we may provide the other class as just an implementation of AbstractFileSystem (not mentioning anything about streaming) and set this configuration parameter instead, as a less hacky way of doing things.

This is just after a few minutes of glancing through the code. I look forward to getting the other info.

@chris-twiner
Copy link
Author

correct, all paths lead there for streaming, even mkdir:

Caused by: java.io.IOException: Cannot run program "C:\Dev\bin\hadoop-3.2.0\bin\winutils.exe": CreateProcess error=193, %1 is not a valid Win32 application
	at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:934)
	at org.apache.hadoop.util.Shell.run(Shell.java:900)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1212)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1306)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1288)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:660)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:700)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:672)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:699)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:677)
	at org.apache.hadoop.fs.FileSystem.primitiveMkdir(FileSystem.java:1356)
	at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:185)
	at org.apache.hadoop.fs.FilterFs.mkdir(FilterFs.java:219)
	at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:809)
	at org.apache.hadoop.fs.FileContext$4.next(FileContext.java:805)
	at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
	at org.apache.hadoop.fs.FileContext.mkdir(FileContext.java:812)
	at org.apache.spark.sql.execution.streaming.AbstractFileContextBasedCheckpointFileManager.mkdirs(CheckpointFileManager.scala:324)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.<init>(HDFSMetadataLog.scala:67)
	at org.apache.spark.sql.execution.streaming.CompactibleFileStreamLog.<init>(CompactibleFileStreamLog.scala:48)
	at org.apache.spark.sql.execution.streaming.FileStreamSinkLog.<init>(FileStreamSinkLog.scala:91)
	at org.apache.spark.sql.execution.streaming.FileStreamSink.<init>(FileStreamSink.scala:139)
	at org.apache.spark.sql.execution.datasources.DataSource.createSink(DataSource.scala:322)
	at org.apache.spark.sql.streaming.DataStreamWriter.createV1Sink(DataStreamWriter.scala:439)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:404)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:231)

hence me adding that documentation change to the PR.

@garretwilson
Copy link
Member

But @chris-twiner one of the important questions I'm asking is whether this works with Winutils installed. I think the stack trace you just showed me last is a different error, saying that Winutils isn't present. Does this actually work if Winutils is installed?

My concern is wondering why we would have to do this special with my library but not with the Winutils approach, because they both extend the same classes.

Other thing I'm seeing in your latest stack trace is this:

	at org.apache.hadoop.fs.DelegateToFileSystem.mkdir(DelegateToFileSystem.java:185)

How did this DelegateToFileSystem get into the mix? It wasn't in the stack trace you gave me earlier. Any idea where it's coming from? Did you perhaps add a workaround for Winutils like you are suggesting for my library?

I'm beginning to suspect this ticket is really something like this: "Your library works great, but we notice that neither your library nor Winutils works with Spark Streaming. To make it work with Winutils we added a DelegateToFileSystem wrapper. If we do the same with your library it will make your library even better by working with Spark Streaming." Is that what happened?

Help me understand if this is something broken with my library so that it can be brought up to the same functionality as Winutils, or if you're proposing an improvement for a problem with Winutils as well. See what I'm asking?

@chris-twiner
Copy link
Author

Yes it works with winutils, it doesn't work without the code and configuration in the PR due to the createFileSystem code you note above delegating to an AbstractFileSystem.

Your library, as it currently stands, as noted in the first comment, does not support spark streaming. This PR adds that support.

@garretwilson
Copy link
Member

Yes it works with winutils

Then maybe Spark Streaming is adding something into the mix that my library is overriding. I need to find out how the DelegateToFileSystem is getting into the stack trace when you are using Spark Streaming with Winutils.

Please send me some simple instructions for what you are doing with Spark Streaming to get that stack trace. Do you just install Spark Streaming and then issue some command? Help me out a bit to know how to reproduce this, and I'll investigate further.

@chris-twiner
Copy link
Author

Spark's AbstractFileContextBasedCheckpointFileManager creates the FileContext by:

  1. calling org.apache.hadoop.fs.FileContext.getFileContext which in turn calls
  2. getAbstractFileSystem with the "file:///" type. which calls 'get', calling
  3. createFileSystem (your above snippet)
  4. getFileContext (1.) then calls getFileContext with the AbstractFileSystem creating a new FileContext

The returned FS is org.apache.hadoop.fs.local.LocalFs which extends ChecksumFs, which extends FilterFS which has the underlying "myFS", which is a RawLocalFS instance.

RawLocalFS extends DelegateToFileSystem which provides the mkdir function. That's why that's there, it's default hadoop for file:/// from FileContext.getFileContext.

I've attached a simple test project. One test runWithoutStreamingFS will fail if winutils is not present. The other will pass regardless of winutils.

StreamingExample.zip

@garretwilson
Copy link
Member

Thanks, @chris-twiner . This is exactly the sort of information I was needing to dig into it deeper.

One intriguing thing is that the two file systems are LocalFs and RawLocalFs, the latter of which extends DelegateToFileSystem — that's where it was coming from, as you mention. But the odd thing is that my library extends RawLocalFileSystem and RawLocalFileSystem instead. I don't recall the exact XXXFs vs XXXFileSystem distinction, but I seem to remember researching it at the time. Isn't the XXXFileSystem form the newer one? But it looks like they have different hierarchies, which explains what Spark Streaming doesn't like here.

I assume that I used the default classes at the time. Or maybe I used the most recent hierarchy, not knowing that anything needed AbstractFileSystem in the hierarchy. I don't remember. But now that we have these subtleties in hand, I'll look into it and see what I can dig up.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants