From e2f7aee1b14b5fb7bf4dd550f7d0e3b36592d234 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Sun, 18 Feb 2024 16:22:22 +0800 Subject: [PATCH 01/30] Hadoop: make libjfs singleton to avoid possible jnr weakhashmap infinity loop (#4407) --- .../java/io/juicefs/JuiceFileSystemImpl.java | 111 +++++++++--------- .../java/io/juicefs/JuiceFileSystemTest.java | 23 ++++ 2 files changed, 80 insertions(+), 54 deletions(-) diff --git a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java index 719946dbd135..1196126a05c2 100644 --- a/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java +++ b/sdk/java/src/main/java/io/juicefs/JuiceFileSystemImpl.java @@ -57,12 +57,11 @@ import java.net.*; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardCopyOption; import java.util.*; -import java.util.concurrent.*; +import java.util.concurrent.TimeUnit; import java.util.jar.JarFile; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; @@ -84,7 +83,8 @@ public class JuiceFileSystemImpl extends FileSystem { private int minBufferSize; private int cacheReplica; private boolean fileChecksumEnabled; - private Libjfs lib; + private Libjfs lib = loadLibrary(); + private long handle; private UserGroupInformation ugi; private String homeDirPrefix = "/user"; @@ -347,7 +347,6 @@ public void initialize(URI uri, Configuration conf) throws IOException { String supergroup = getConf(conf, "supergroup", conf.get("dfs.permissions.superusergroup", "supergroup")); String mountpoint = getConf(conf, "mountpoint", ""); - lib = loadLibrary(); synchronized (JuiceFileSystemImpl.class) { if (callBack == null) { callBack = new LogCallBackImpl(lib); @@ -568,7 +567,7 @@ private static void initStubLoader() { } } - public static Libjfs loadLibrary() throws IOException { + public static Libjfs loadLibrary() { 
initStubLoader(); LibraryLoader libjfsLibraryLoader = LibraryLoader.create(Libjfs.class); @@ -607,69 +606,73 @@ public static Libjfs loadLibrary() throws IOException { } URLConnection con; try { - con = location.openConnection(); - } catch (FileNotFoundException e) { - // jar may changed - return loadExistLib(libjfsLibraryLoader, dir, name, libFile); - } - if (location.getProtocol().equals("jar") && (con instanceof JarURLConnection)) { - LOG.debug("juicefs-hadoop.jar is a nested jar"); - JarURLConnection connection = (JarURLConnection) con; - JarFile jfsJar = connection.getJarFile(); - ZipEntry entry = jfsJar.getJarEntry(resource); - soTime = entry.getLastModifiedTime().toMillis(); - ins = jfsJar.getInputStream(entry); - } else { - URI locationUri; try { - locationUri = location.toURI(); - } catch (URISyntaxException e) { + con = location.openConnection(); + } catch (FileNotFoundException e) { + // jar may changed return loadExistLib(libjfsLibraryLoader, dir, name, libFile); } - if (Files.isDirectory(Paths.get(locationUri))) { // for debug: sdk/java/target/classes - soTime = con.getLastModified(); - ins = JuiceFileSystemImpl.class.getClassLoader().getResourceAsStream(resource); + if (location.getProtocol().equals("jar") && (con instanceof JarURLConnection)) { + LOG.debug("juicefs-hadoop.jar is a nested jar"); + JarURLConnection connection = (JarURLConnection) con; + JarFile jfsJar = connection.getJarFile(); + ZipEntry entry = jfsJar.getJarEntry(resource); + soTime = entry.getLastModifiedTime().toMillis(); + ins = jfsJar.getInputStream(entry); } else { - JarFile jfsJar; + URI locationUri; try { - jfsJar = new JarFile(locationUri.getPath()); - } catch (FileNotFoundException fne) { + locationUri = location.toURI(); + } catch (URISyntaxException e) { return loadExistLib(libjfsLibraryLoader, dir, name, libFile); } - ZipEntry entry = jfsJar.getJarEntry(resource); - soTime = entry.getLastModifiedTime().toMillis(); - ins = jfsJar.getInputStream(entry); + if 
(Files.isDirectory(Paths.get(locationUri))) { // for debug: sdk/java/target/classes + soTime = con.getLastModified(); + ins = JuiceFileSystemImpl.class.getClassLoader().getResourceAsStream(resource); + } else { + JarFile jfsJar; + try { + jfsJar = new JarFile(locationUri.getPath()); + } catch (FileNotFoundException fne) { + return loadExistLib(libjfsLibraryLoader, dir, name, libFile); + } + ZipEntry entry = jfsJar.getJarEntry(resource); + soTime = entry.getLastModifiedTime().toMillis(); + ins = jfsJar.getInputStream(entry); + } } - } - synchronized (JuiceFileSystemImpl.class) { - if (!libFile.exists() || libFile.lastModified() < soTime) { - // try the name for current user - libFile = new File(dir, System.getProperty("user.name") + "-" + name); + synchronized (JuiceFileSystemImpl.class) { if (!libFile.exists() || libFile.lastModified() < soTime) { - InputStream reader = new GZIPInputStream(ins); - File tmp = File.createTempFile(name, null, dir); - FileOutputStream writer = new FileOutputStream(tmp); - byte[] buffer = new byte[128 << 10]; - int bytesRead = 0; - while ((bytesRead = reader.read(buffer)) != -1) { - writer.write(buffer, 0, bytesRead); - } - writer.close(); - reader.close(); - tmp.setLastModified(soTime); - tmp.setReadable(true, false); - try { - File org = new File(dir, name); - Files.move(tmp.toPath(), org.toPath(), StandardCopyOption.ATOMIC_MOVE); - libFile = org; - } catch (Exception ade) { - Files.move(tmp.toPath(), libFile.toPath(), StandardCopyOption.ATOMIC_MOVE); + // try the name for current user + libFile = new File(dir, System.getProperty("user.name") + "-" + name); + if (!libFile.exists() || libFile.lastModified() < soTime) { + InputStream reader = new GZIPInputStream(ins); + File tmp = File.createTempFile(name, null, dir); + FileOutputStream writer = new FileOutputStream(tmp); + byte[] buffer = new byte[128 << 10]; + int bytesRead = 0; + while ((bytesRead = reader.read(buffer)) != -1) { + writer.write(buffer, 0, bytesRead); + } + 
writer.close(); + reader.close(); + tmp.setLastModified(soTime); + tmp.setReadable(true, false); + try { + File org = new File(dir, name); + Files.move(tmp.toPath(), org.toPath(), StandardCopyOption.ATOMIC_MOVE); + libFile = org; + } catch (Exception ade) { + Files.move(tmp.toPath(), libFile.toPath(), StandardCopyOption.ATOMIC_MOVE); + } } } } + ins.close(); + } catch (Exception e) { + throw new RuntimeException("Init libjfs failed", e); } - ins.close(); return libjfsLibraryLoader.load(libFile.getAbsolutePath()); } diff --git a/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java b/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java index 8a059661695d..102a817805a5 100644 --- a/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java +++ b/sdk/java/src/test/java/io/juicefs/JuiceFileSystemTest.java @@ -35,6 +35,9 @@ import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -921,4 +924,24 @@ public void testUserWithMultiGroups() throws Exception { tomFs.close(); superFs.close(); } + + public void testConcurrentCreate() throws Exception { + int threads = 100; + ExecutorService pool = Executors.newFixedThreadPool(threads); + for (int i = 0; i < threads; i++) { + pool.submit(() -> { + JuiceFileSystem jfs = new JuiceFileSystem(); + try { + jfs.initialize(URI.create("jfs://dev/"), cfg); + jfs.listStatus(new Path("/")); + jfs.close(); + } catch (IOException e) { + fail("concurrent create failed"); + System.exit(1); + } + }); + } + pool.shutdown(); + pool.awaitTermination(1, TimeUnit.MINUTES); + } } From d46cb5b98bb931b8c9afb33b441c0696984a53b2 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Wed, 21 Feb 2024 14:52:14 +0800 Subject: [PATCH 02/30] Hadoop: use auto increment fs handle to avoid possible 
conflict (#4411) --- sdk/java/libjfs/main.go | 77 +++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 38 deletions(-) diff --git a/sdk/java/libjfs/main.go b/sdk/java/libjfs/main.go index 24fe5e43cca4..a06a08ff5f32 100644 --- a/sdk/java/libjfs/main.go +++ b/sdk/java/libjfs/main.go @@ -65,14 +65,15 @@ var ( openFiles = make(map[int]*fwrapper) nextHandle = 1 - fslock sync.Mutex - handlers = make(map[uintptr]*wrapper) - activefs = make(map[string][]*wrapper) - logger = utils.GetLogger("juicefs") - bOnce sync.Once - bridges []*Bridge - pOnce sync.Once - pushers []*push.Pusher + fslock sync.Mutex + handlers = make(map[int64]*wrapper) + nextFsHandle int64 = 0 + activefs = make(map[string][]*wrapper) + logger = utils.GetLogger("juicefs") + bOnce sync.Once + bridges []*Bridge + pOnce sync.Once + pushers []*push.Pusher ) const ( @@ -312,7 +313,7 @@ type javaConf struct { PushGraphite string `json:"pushGraphite"` } -func getOrCreate(name, user, group, superuser, supergroup string, f func() *fs.FileSystem) uintptr { +func getOrCreate(name, user, group, superuser, supergroup string, f func() *fs.FileSystem) int64 { fslock.Lock() defer fslock.Unlock() ws := activefs[name] @@ -340,9 +341,9 @@ func getOrCreate(name, user, group, superuser, supergroup string, f func() *fs.F w.ctx = meta.NewContext(uint32(os.Getpid()), w.lookupUid(user), w.lookupGids(group)) } activefs[name] = append(ws, w) - h := uintptr(unsafe.Pointer(w)) & 0x7fffffff // low 32bits - handlers[h] = w - return h + nextFsHandle = nextFsHandle + 1 + handlers[nextFsHandle] = w + return nextFsHandle } func push2Gateway(pushGatewayAddr, pushAuth string, pushInterVal time.Duration, registry *prometheus.Registry, commonLabels map[string]string) { @@ -401,7 +402,7 @@ func push2Graphite(graphite string, pushInterVal time.Duration, registry *promet } //export jfs_init -func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintptr { +func jfs_init(cname, jsonConf, user, group, 
superuser, supergroup *C.char) int64 { name := C.GoString(cname) debug.SetGCPercent(50) object.UserAgent = "JuiceFS-SDK " + version.Version() @@ -576,14 +577,14 @@ func jfs_init(cname, jsonConf, user, group, superuser, supergroup *C.char) uintp }) } -func F(p uintptr) *wrapper { +func F(p int64) *wrapper { fslock.Lock() defer fslock.Unlock() return handlers[p] } //export jfs_update_uid_grouping -func jfs_update_uid_grouping(h uintptr, uidstr *C.char, grouping *C.char) { +func jfs_update_uid_grouping(h int64, uidstr *C.char, grouping *C.char) { w := F(h) if w == nil { return @@ -647,7 +648,7 @@ func jfs_update_uid_grouping(h uintptr, uidstr *C.char, grouping *C.char) { } //export jfs_term -func jfs_term(pid int, h uintptr) int { +func jfs_term(pid int, h int64) int { w := F(h) if w == nil { return 0 @@ -705,7 +706,7 @@ func jfs_term(pid int, h uintptr) int { } //export jfs_open -func jfs_open(pid int, h uintptr, cpath *C.char, lenPtr uintptr, flags int) int { +func jfs_open(pid int, h int64, cpath *C.char, lenPtr uintptr, flags int) int { w := F(h) if w == nil { return EINVAL @@ -728,7 +729,7 @@ func jfs_open(pid int, h uintptr, cpath *C.char, lenPtr uintptr, flags int) int } //export jfs_access -func jfs_access(pid int, h uintptr, cpath *C.char, flags int) int { +func jfs_access(pid int, h int64, cpath *C.char, flags int) int { w := F(h) if w == nil { return EINVAL @@ -737,7 +738,7 @@ func jfs_access(pid int, h uintptr, cpath *C.char, flags int) int { } //export jfs_create -func jfs_create(pid int, h uintptr, cpath *C.char, mode uint16, umask uint16) int { +func jfs_create(pid int, h int64, cpath *C.char, mode uint16, umask uint16) int { w := F(h) if w == nil { return EINVAL @@ -755,7 +756,7 @@ func jfs_create(pid int, h uintptr, cpath *C.char, mode uint16, umask uint16) in } //export jfs_mkdir -func jfs_mkdir(pid int, h uintptr, cpath *C.char, mode uint16, umask uint16) int { +func jfs_mkdir(pid int, h int64, cpath *C.char, mode uint16, umask uint16) int { w := 
F(h) if w == nil { return EINVAL @@ -769,7 +770,7 @@ func jfs_mkdir(pid int, h uintptr, cpath *C.char, mode uint16, umask uint16) int } //export jfs_delete -func jfs_delete(pid int, h uintptr, cpath *C.char) int { +func jfs_delete(pid int, h int64, cpath *C.char) int { w := F(h) if w == nil { return EINVAL @@ -778,7 +779,7 @@ func jfs_delete(pid int, h uintptr, cpath *C.char) int { } //export jfs_rmr -func jfs_rmr(pid int, h uintptr, cpath *C.char) int { +func jfs_rmr(pid int, h int64, cpath *C.char) int { w := F(h) if w == nil { return EINVAL @@ -787,7 +788,7 @@ func jfs_rmr(pid int, h uintptr, cpath *C.char) int { } //export jfs_rename -func jfs_rename(pid int, h uintptr, oldpath *C.char, newpath *C.char) int { +func jfs_rename(pid int, h int64, oldpath *C.char, newpath *C.char) int { w := F(h) if w == nil { return EINVAL @@ -796,7 +797,7 @@ func jfs_rename(pid int, h uintptr, oldpath *C.char, newpath *C.char) int { } //export jfs_truncate -func jfs_truncate(pid int, h uintptr, path *C.char, length uint64) int { +func jfs_truncate(pid int, h int64, path *C.char, length uint64) int { w := F(h) if w == nil { return EINVAL @@ -805,7 +806,7 @@ func jfs_truncate(pid int, h uintptr, path *C.char, length uint64) int { } //export jfs_setXattr -func jfs_setXattr(pid int, h uintptr, path *C.char, name *C.char, value uintptr, vlen int, mode int) int { +func jfs_setXattr(pid int, h int64, path *C.char, name *C.char, value uintptr, vlen int, mode int) int { w := F(h) if w == nil { return EINVAL @@ -821,7 +822,7 @@ func jfs_setXattr(pid int, h uintptr, path *C.char, name *C.char, value uintptr, } //export jfs_getXattr -func jfs_getXattr(pid int, h uintptr, path *C.char, name *C.char, buf uintptr, bufsize int) int { +func jfs_getXattr(pid int, h int64, path *C.char, name *C.char, buf uintptr, bufsize int) int { w := F(h) if w == nil { return EINVAL @@ -838,7 +839,7 @@ func jfs_getXattr(pid int, h uintptr, path *C.char, name *C.char, buf uintptr, b } //export jfs_listXattr -func 
jfs_listXattr(pid int, h uintptr, path *C.char, buf uintptr, bufsize int) int { +func jfs_listXattr(pid int, h int64, path *C.char, buf uintptr, bufsize int) int { w := F(h) if w == nil { return EINVAL @@ -855,7 +856,7 @@ func jfs_listXattr(pid int, h uintptr, path *C.char, buf uintptr, bufsize int) i } //export jfs_removeXattr -func jfs_removeXattr(pid int, h uintptr, path *C.char, name *C.char) int { +func jfs_removeXattr(pid int, h int64, path *C.char, name *C.char) int { w := F(h) if w == nil { return EINVAL @@ -864,7 +865,7 @@ func jfs_removeXattr(pid int, h uintptr, path *C.char, name *C.char) int { } //export jfs_readlink -func jfs_readlink(pid int, h uintptr, link *C.char, buf uintptr, bufsize int) int { +func jfs_readlink(pid int, h int64, link *C.char, buf uintptr, bufsize int) int { w := F(h) if w == nil { return EINVAL @@ -898,7 +899,7 @@ func fill_stat(w *wrapper, wb *utils.Buffer, st *fs.FileStat) int { } //export jfs_stat1 -func jfs_stat1(pid int, h uintptr, cpath *C.char, buf uintptr) int { +func jfs_stat1(pid int, h int64, cpath *C.char, buf uintptr) int { w := F(h) if w == nil { return EINVAL @@ -911,7 +912,7 @@ func jfs_stat1(pid int, h uintptr, cpath *C.char, buf uintptr) int { } //export jfs_lstat1 -func jfs_lstat1(pid int, h uintptr, cpath *C.char, buf uintptr) int { +func jfs_lstat1(pid int, h int64, cpath *C.char, buf uintptr) int { w := F(h) if w == nil { return EINVAL @@ -924,7 +925,7 @@ func jfs_lstat1(pid int, h uintptr, cpath *C.char, buf uintptr) int { } //export jfs_summary -func jfs_summary(pid int, h uintptr, cpath *C.char, buf uintptr) int { +func jfs_summary(pid int, h int64, cpath *C.char, buf uintptr) int { w := F(h) if w == nil { return EINVAL @@ -947,7 +948,7 @@ func jfs_summary(pid int, h uintptr, cpath *C.char, buf uintptr) int { } //export jfs_statvfs -func jfs_statvfs(pid int, h uintptr, buf uintptr) int { +func jfs_statvfs(pid int, h int64, buf uintptr) int { w := F(h) if w == nil { return EINVAL @@ -960,7 +961,7 @@ func 
jfs_statvfs(pid int, h uintptr, buf uintptr) int { } //export jfs_chmod -func jfs_chmod(pid int, h uintptr, cpath *C.char, mode C.mode_t) int { +func jfs_chmod(pid int, h int64, cpath *C.char, mode C.mode_t) int { w := F(h) if w == nil { return EINVAL @@ -974,7 +975,7 @@ func jfs_chmod(pid int, h uintptr, cpath *C.char, mode C.mode_t) int { } //export jfs_utime -func jfs_utime(pid int, h uintptr, cpath *C.char, mtime, atime int64) int { +func jfs_utime(pid int, h int64, cpath *C.char, mtime, atime int64) int { w := F(h) if w == nil { return EINVAL @@ -988,7 +989,7 @@ func jfs_utime(pid int, h uintptr, cpath *C.char, mtime, atime int64) int { } //export jfs_setOwner -func jfs_setOwner(pid int, h uintptr, cpath *C.char, owner *C.char, group *C.char) int { +func jfs_setOwner(pid int, h int64, cpath *C.char, owner *C.char, group *C.char) int { w := F(h) if w == nil { return EINVAL @@ -1015,7 +1016,7 @@ func setOwner(w *wrapper, ctx meta.Context, path string, owner, group string) in } //export jfs_listdir -func jfs_listdir(pid int, h uintptr, cpath *C.char, offset int, buf uintptr, bufsize int) int { +func jfs_listdir(pid int, h int64, cpath *C.char, offset int, buf uintptr, bufsize int) int { var ctx meta.Context var f *fs.File var w *wrapper @@ -1073,7 +1074,7 @@ func toBuf(s uintptr, sz int) []byte { } //export jfs_concat -func jfs_concat(pid int, h uintptr, _dst *C.char, buf uintptr, bufsize int) int { +func jfs_concat(pid int, h int64, _dst *C.char, buf uintptr, bufsize int) int { w := F(h) if w == nil { return EINVAL From b2b13205bad09feccb2d9609e46bb5681f51a583 Mon Sep 17 00:00:00 2001 From: zhijian Date: Fri, 23 Feb 2024 15:39:19 +0800 Subject: [PATCH 03/30] cmd/sync: execute chmod after chown to avoid setuid setgid failure (#4418) --- pkg/sync/sync.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 6b3c9721a3fa..de83b2458a45 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -214,14 +214,15 
@@ func copyPerms(dst object.ObjectStorage, obj object.Object, config *Config) { start := time.Now() key := obj.Key() fi := obj.(object.File) + // chmod needs to be executed after chown, because chown will change setuid setgid to be invalid. + if err := dst.(object.FileSystem).Chown(key, fi.Owner(), fi.Group()); err != nil { + logger.Warnf("Chown %s to (%s,%s): %s", key, fi.Owner(), fi.Group(), err) + } if !fi.IsSymlink() || !config.Links { if err := dst.(object.FileSystem).Chmod(key, fi.Mode()); err != nil { logger.Warnf("Chmod %s to %o: %s", key, fi.Mode(), err) } } - if err := dst.(object.FileSystem).Chown(key, fi.Owner(), fi.Group()); err != nil { - logger.Warnf("Chown %s to (%s,%s): %s", key, fi.Owner(), fi.Group(), err) - } logger.Debugf("Copied permissions (%s:%s:%s) for %s in %s", fi.Owner(), fi.Group(), fi.Mode(), key, time.Since(start)) } From f642c9181efd83350c8a53f7d80dd813f969cf26 Mon Sep 17 00:00:00 2001 From: Sandy Xu Date: Wed, 28 Feb 2024 13:20:56 +0800 Subject: [PATCH 04/30] vfs: fix file length after fallocate & copyfilerange (#4427) Co-authored-by: Davies Liu --- pkg/fs/fs.go | 2 +- pkg/meta/base_test.go | 32 ++++++++++++++++---------------- pkg/meta/interface.go | 4 ++-- pkg/meta/redis.go | 18 ++++++++++++++---- pkg/meta/sql.go | 18 ++++++++++++++---- pkg/meta/tkv.go | 18 ++++++++++++++---- pkg/vfs/vfs.go | 41 ++++++++++++++++++++++++++++++----------- 7 files changed, 91 insertions(+), 42 deletions(-) diff --git a/pkg/fs/fs.go b/pkg/fs/fs.go index 35dbf1895f8a..3dfb1a508bf0 100644 --- a/pkg/fs/fs.go +++ b/pkg/fs/fs.go @@ -575,7 +575,7 @@ func (fs *FileSystem) CopyFileRange(ctx meta.Context, src string, soff uint64, d if err != 0 { return } - err = fs.m.CopyFileRange(ctx, sfi.inode, soff, dfi.inode, doff, size, 0, &written) + err = fs.m.CopyFileRange(ctx, sfi.inode, soff, dfi.inode, doff, size, 0, &written, nil) return } diff --git a/pkg/meta/base_test.go b/pkg/meta/base_test.go index 521985ff0a8a..b6a1ec7338f8 100644 --- a/pkg/meta/base_test.go 
+++ b/pkg/meta/base_test.go @@ -476,25 +476,25 @@ func testMetaClient(t *testing.T, m Meta) { if len(slices) != 2 || slices[0].Id != 0 || slices[0].Size != 100 || slices[1].Id != sliceId || slices[1].Size != 100 { t.Fatalf("slices: %v", slices) } - if st := m.Fallocate(ctx, inode, fallocPunchHole|fallocKeepSize, 100, 50); st != 0 { + if st := m.Fallocate(ctx, inode, fallocPunchHole|fallocKeepSize, 100, 50, nil); st != 0 { t.Fatalf("fallocate: %s", st) } - if st := m.Fallocate(ctx, inode, fallocPunchHole|fallocCollapesRange, 100, 50); st != syscall.EINVAL { + if st := m.Fallocate(ctx, inode, fallocPunchHole|fallocCollapesRange, 100, 50, nil); st != syscall.EINVAL { t.Fatalf("fallocate: %s", st) } - if st := m.Fallocate(ctx, inode, fallocPunchHole|fallocInsertRange, 100, 50); st != syscall.EINVAL { + if st := m.Fallocate(ctx, inode, fallocPunchHole|fallocInsertRange, 100, 50, nil); st != syscall.EINVAL { t.Fatalf("fallocate: %s", st) } - if st := m.Fallocate(ctx, inode, fallocCollapesRange, 100, 50); st != syscall.ENOTSUP { + if st := m.Fallocate(ctx, inode, fallocCollapesRange, 100, 50, nil); st != syscall.ENOTSUP { t.Fatalf("fallocate: %s", st) } - if st := m.Fallocate(ctx, inode, fallocPunchHole, 100, 50); st != syscall.EINVAL { + if st := m.Fallocate(ctx, inode, fallocPunchHole, 100, 50, nil); st != syscall.EINVAL { t.Fatalf("fallocate: %s", st) } - if st := m.Fallocate(ctx, inode, fallocPunchHole|fallocKeepSize, 0, 0); st != syscall.EINVAL { + if st := m.Fallocate(ctx, inode, fallocPunchHole|fallocKeepSize, 0, 0, nil); st != syscall.EINVAL { t.Fatalf("fallocate: %s", st) } - if st := m.Fallocate(ctx, parent, fallocPunchHole|fallocKeepSize, 100, 50); st != syscall.EPERM { + if st := m.Fallocate(ctx, parent, fallocPunchHole|fallocKeepSize, 100, 50, nil); st != syscall.EPERM { t.Fatalf("fallocate dir: %s", st) } if st := m.Read(ctx, inode, 0, &slices); st != 0 { @@ -1413,7 +1413,7 @@ func testCopyFileRange(t *testing.T, m Meta) { m.Write(ctx, iin, 3, 0, Slice{12, 
63 << 20, 10 << 20, 30 << 20}, time.Now()) m.Write(ctx, iout, 2, 10<<20, Slice{13, 50 << 20, 10 << 20, 30 << 20}, time.Now()) var copied uint64 - if st := m.CopyFileRange(ctx, iin, 150, iout, 30<<20, 200<<20, 0, &copied); st != 0 { + if st := m.CopyFileRange(ctx, iin, 150, iout, 30<<20, 200<<20, 0, &copied, nil); st != 0 { t.Fatalf("copy file range: %s", st) } var expected uint64 = 200 << 20 @@ -1964,17 +1964,17 @@ func testAttrFlags(t *testing.T, m Meta) { if st := m.SetAttr(ctx, fallocFile, SetAttrFlag, 0, attr); st != 0 { t.Fatalf("setattr f: %s", st) } - if st := m.Fallocate(ctx, fallocFile, fallocKeepSize, 0, 1024); st != 0 { + if st := m.Fallocate(ctx, fallocFile, fallocKeepSize, 0, 1024, nil); st != 0 { t.Fatalf("fallocate f: %s", st) } - if st := m.Fallocate(ctx, fallocFile, fallocKeepSize|fallocZeroRange, 0, 1024); st != syscall.EPERM { + if st := m.Fallocate(ctx, fallocFile, fallocKeepSize|fallocZeroRange, 0, 1024, nil); st != syscall.EPERM { t.Fatalf("fallocate f: %s", st) } attr.Flags = FlagImmutable if st := m.SetAttr(ctx, fallocFile, SetAttrFlag, 0, attr); st != 0 { t.Fatalf("setattr f: %s", st) } - if st := m.Fallocate(ctx, fallocFile, fallocKeepSize, 0, 1024); st != syscall.EPERM { + if st := m.Fallocate(ctx, fallocFile, fallocKeepSize, 0, 1024, nil); st != syscall.EPERM { t.Fatalf("fallocate f: %s", st) } @@ -1985,21 +1985,21 @@ func testAttrFlags(t *testing.T, m Meta) { if st := m.Create(ctx, 1, "copydstfile", 0644, 022, 0, &copydstFile, nil); st != 0 { t.Fatalf("create f: %s", st) } - if st := m.Fallocate(ctx, copysrcFile, 0, 0, 1024); st != 0 { + if st := m.Fallocate(ctx, copysrcFile, 0, 0, 1024, nil); st != 0 { t.Fatalf("fallocate f: %s", st) } attr.Flags = FlagAppend if st := m.SetAttr(ctx, copydstFile, SetAttrFlag, 0, attr); st != 0 { t.Fatalf("setattr f: %s", st) } - if st := m.CopyFileRange(ctx, copysrcFile, 0, copydstFile, 0, 1024, 0, nil); st != syscall.EPERM { + if st := m.CopyFileRange(ctx, copysrcFile, 0, copydstFile, 0, 1024, 0, nil, 
nil); st != syscall.EPERM { t.Fatalf("copy_file_range f: %s", st) } attr.Flags = FlagImmutable if st := m.SetAttr(ctx, copydstFile, SetAttrFlag, 0, attr); st != 0 { t.Fatalf("setattr f: %s", st) } - if st := m.CopyFileRange(ctx, copysrcFile, 0, copydstFile, 0, 1024, 0, nil); st != syscall.EPERM { + if st := m.CopyFileRange(ctx, copysrcFile, 0, copydstFile, 0, 1024, 0, nil, nil); st != syscall.EPERM { t.Fatalf("copy_file_range f: %s", st) } } @@ -2182,7 +2182,7 @@ func testDirStat(t *testing.T, m Meta) { checkResult(0, align4K(0), 1) // test dir with file and fallocate - if st := m.Fallocate(Background, fileInode, 0, 0, 4097); st != 0 { + if st := m.Fallocate(Background, fileInode, 0, 0, 4097, nil); st != 0 { t.Fatalf("fallocate: %s", st) } time.Sleep(500 * time.Millisecond) @@ -2313,7 +2313,7 @@ func testClone(t *testing.T, m Meta) { if eno := m.Mknod(Background, dir3, "file3", TypeFile, 0777, 022, 0, "", &file3, nil); eno != 0 { t.Fatalf("mknod: %s", eno) } - if eno := m.Fallocate(Background, file3, 0, 0, 67108864); eno != 0 { + if eno := m.Fallocate(Background, file3, 0, 0, 67108864, nil); eno != 0 { t.Fatalf("fallocate: %s", eno) } diff --git a/pkg/meta/interface.go b/pkg/meta/interface.go index 61dac6fca516..f31b9fb56c29 100644 --- a/pkg/meta/interface.go +++ b/pkg/meta/interface.go @@ -354,7 +354,7 @@ type Meta interface { // Truncate changes the length for given file. Truncate(ctx Context, inode Ino, flags uint8, attrlength uint64, attr *Attr, skipPermCheck bool) syscall.Errno // Fallocate preallocate given space for given file. - Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size uint64) syscall.Errno + Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size uint64, length *uint64) syscall.Errno // ReadLink returns the target of a symlink. ReadLink(ctx Context, inode Ino, path *[]byte) syscall.Errno // Symlink creates a symlink in a directory with given name. 
@@ -391,7 +391,7 @@ type Meta interface { // InvalidateChunkCache invalidate chunk cache InvalidateChunkCache(ctx Context, inode Ino, indx uint32) syscall.Errno // CopyFileRange copies part of a file to another one. - CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, offOut uint64, size uint64, flags uint32, copied *uint64) syscall.Errno + CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, offOut uint64, size uint64, flags uint32, copied, outLength *uint64) syscall.Errno // GetParents returns a map of node parents (> 1 parents if hardlinked) GetParents(ctx Context, inode Ino) map[Ino]int // GetDirStat returns the space and inodes usage of a directory. diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 34295c775362..b01ed125ce40 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -1032,7 +1032,7 @@ func (m *redisMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, return errno(err) } -func (m *redisMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size uint64) syscall.Errno { +func (m *redisMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size uint64, flength *uint64) syscall.Errno { if mode&fallocCollapesRange != 0 && mode != fallocCollapesRange { return syscall.EINVAL } @@ -1090,6 +1090,9 @@ func (m *redisMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, si if err := m.checkQuota(ctx, newSpace, 0, m.getParents(ctx, tx, inode, t.Parent)...); err != 0 { return err } + if flength != nil { + *flength = length + } t.Length = length now := time.Now() t.Mtime = now.Unix() @@ -2281,7 +2284,7 @@ func (m *redisMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice return errno(err) } -func (m *redisMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, offOut uint64, size uint64, flags uint32, copied *uint64) syscall.Errno { +func (m *redisMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, offOut uint64, size uint64, flags uint32, copied, outLength 
*uint64) syscall.Errno { defer m.timeit("CopyFileRange", time.Now()) f := m.of.find(fout) if f != nil { @@ -2306,7 +2309,9 @@ func (m *redisMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, return syscall.EINVAL } if offIn >= sattr.Length { - *copied = 0 + if copied != nil { + *copied = 0 + } return nil } size := size @@ -2336,6 +2341,9 @@ func (m *redisMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, attr.Mtimensec = uint32(now.Nanosecond()) attr.Ctime = now.Unix() attr.Ctimensec = uint32(now.Nanosecond()) + if outLength != nil { + *outLength = attr.Length + } var vals [][]string for i := offIn / ChunkSize; i <= (offIn+size)/ChunkSize; i++ { @@ -2402,7 +2410,9 @@ func (m *redisMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, return nil }) if err == nil { - *copied = size + if copied != nil { + *copied = size + } } return err }, m.inodeKey(fout), m.inodeKey(fin)) diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 98b5e01b1977..5ce756340b96 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -1046,7 +1046,7 @@ func (m *dbMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, at return errno(err) } -func (m *dbMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size uint64) syscall.Errno { +func (m *dbMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size uint64, flength *uint64) syscall.Errno { if mode&fallocCollapesRange != 0 && mode != fallocCollapesRange { return syscall.EINVAL } @@ -1112,6 +1112,9 @@ func (m *dbMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size if _, err := s.Cols("length", "mtime", "ctime", "mtimensec", "ctimensec").Update(&nodeAttr, &node{Inode: inode}); err != nil { return err } + if flength != nil { + *flength = length + } if mode&(fallocZeroRange|fallocPunchHole) != 0 && off < old { off, size := off, size if off+size > old { @@ -2289,7 +2292,7 @@ func (m *dbMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl return 
errno(err) } -func (m *dbMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, offOut uint64, size uint64, flags uint32, copied *uint64) syscall.Errno { +func (m *dbMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, offOut uint64, size uint64, flags uint32, copied, outLength *uint64) syscall.Errno { defer m.timeit("CopyFileRange", time.Now()) f := m.of.find(fout) if f != nil { @@ -2311,7 +2314,9 @@ func (m *dbMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, off return syscall.EINVAL } if offIn >= nin.Length { - *copied = 0 + if copied != nil { + *copied = 0 + } return nil } size := size @@ -2339,6 +2344,9 @@ func (m *dbMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, off nout.Ctime = now / 1e3 nout.Mtimensec = int16(now % 1e3) nout.Ctimensec = int16(now % 1e3) + if outLength != nil { + *outLength = nout.Length + } var cs []chunk err = s.Where("inode = ? AND indx >= ? AND indx <= ?", fin, offIn/ChunkSize, (offIn+size)/ChunkSize).ForUpdate().Find(&cs) @@ -2409,7 +2417,9 @@ func (m *dbMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, off if _, err := s.Cols("length", "mtime", "ctime", "mtimensec", "ctimensec").Update(&nout, &node{Inode: fout}); err != nil { return err } - *copied = size + if copied != nil { + *copied = size + } return nil }, fout) if err == nil { diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index c3652573f8ea..b09d9f087c83 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -965,7 +965,7 @@ func (m *kvMeta) Truncate(ctx Context, inode Ino, flags uint8, length uint64, at return errno(err) } -func (m *kvMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size uint64) syscall.Errno { +func (m *kvMeta) Fallocate(ctx Context, inode Ino, mode uint8, off uint64, size uint64, flength *uint64) syscall.Errno { if mode&fallocCollapesRange != 0 && mode != fallocCollapesRange { return syscall.EINVAL } @@ -1020,6 +1020,9 @@ func (m *kvMeta) Fallocate(ctx Context, inode Ino, 
mode uint8, off uint64, size if err := m.checkQuota(ctx, newSpace, 0, m.getParents(tx, inode, t.Parent)...); err != 0 { return err } + if flength != nil { + *flength = length + } t.Length = length now := time.Now() t.Mtime = now.Unix() @@ -1970,7 +1973,7 @@ func (m *kvMeta) Write(ctx Context, inode Ino, indx uint32, off uint32, slice Sl return errno(err) } -func (m *kvMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, offOut uint64, size uint64, flags uint32, copied *uint64) syscall.Errno { +func (m *kvMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, offOut uint64, size uint64, flags uint32, copied, outLength *uint64) syscall.Errno { defer m.timeit("CopyFileRange", time.Now()) var newLength, newSpace int64 f := m.of.find(fout) @@ -1992,7 +1995,9 @@ func (m *kvMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, off return syscall.EINVAL } if offIn >= sattr.Length { - *copied = 0 + if copied != nil { + *copied = 0 + } return nil } size := size @@ -2022,6 +2027,9 @@ func (m *kvMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, off attr.Mtimensec = uint32(now.Nanosecond()) attr.Ctime = now.Unix() attr.Ctimensec = uint32(now.Nanosecond()) + if outLength != nil { + *outLength = attr.Length + } vals := make(map[string][]byte) tx.scan(m.chunkKey(fin, uint32(offIn/ChunkSize)), m.chunkKey(fin, uint32((offIn+size)/ChunkSize)+1), @@ -2084,7 +2092,9 @@ func (m *kvMeta) CopyFileRange(ctx Context, fin Ino, offIn uint64, fout Ino, off } } tx.set(m.inodeKey(fout), m.marshal(&attr)) - *copied = size + if copied != nil { + *copied = size + } return nil }, fout) if err == nil { diff --git a/pkg/vfs/vfs.go b/pkg/vfs/vfs.go index 8f270816b62f..f335e89cb3a5 100644 --- a/pkg/vfs/vfs.go +++ b/pkg/vfs/vfs.go @@ -490,7 +490,7 @@ func (v *VFS) Truncate(ctx Context, ino Ino, size int64, fh uint64, attr *Attr) if err == 0 { v.writer.Truncate(ino, uint64(size)) v.reader.Truncate(ino, uint64(size)) - v.invalidateLength(ino) + 
v.invalidateAttr(ino) } return err } @@ -526,7 +526,7 @@ func (v *VFS) Release(ctx Context, ino Ino, fh uint64) { f.Unlock() if f.writer != nil { _ = f.writer.Flush(ctx) - v.invalidateLength(ino) + v.invalidateAttr(ino) } if locks&1 != 0 { _ = v.Meta.Flock(ctx, ino, owner, F_UNLCK, false) @@ -664,14 +664,15 @@ func (v *VFS) Write(ctx Context, ino Ino, buf []byte, off, fh uint64) (err sysca if err == 0 { writtenSizeHistogram.Observe(float64(len(buf))) - v.reader.Truncate(ino, v.writer.GetLength(ino)) + v.reader.Invalidate(ino, off, size) + v.invalidateAttr(ino) } return } -func (v *VFS) Fallocate(ctx Context, ino Ino, mode uint8, off, length int64, fh uint64) (err syscall.Errno) { - defer func() { logit(ctx, "fallocate (%d,%d,%d,%d): %s", ino, mode, off, length, strerr(err)) }() - if off < 0 || length <= 0 { +func (v *VFS) Fallocate(ctx Context, ino Ino, mode uint8, off, size int64, fh uint64) (err syscall.Errno) { + defer func() { logit(ctx, "fallocate (%d,%d,%d,%d): %s", ino, mode, off, size, strerr(err)) }() + if off < 0 || size <= 0 { err = syscall.EINVAL return } @@ -684,7 +685,7 @@ func (v *VFS) Fallocate(ctx Context, ino Ino, mode uint8, off, length int64, fh err = syscall.EBADF return } - if off >= maxFileSize || off+length >= maxFileSize { + if off >= maxFileSize || off+size >= maxFileSize { err = syscall.EFBIG return } @@ -699,7 +700,23 @@ func (v *VFS) Fallocate(ctx Context, ino Ino, mode uint8, off, length int64, fh defer h.Wunlock() defer h.removeOp(ctx) - err = v.Meta.Fallocate(ctx, ino, mode, uint64(off), uint64(length)) + err = v.writer.Flush(ctx, ino) + if err != 0 { + return + } + var length uint64 + err = v.Meta.Fallocate(ctx, ino, mode, uint64(off), uint64(size), &length) + if err == 0 { + v.writer.Truncate(ino, length) + s := size + if off+size > int64(length) { + s = int64(length) - off + } + if s > 0 { + v.reader.Invalidate(ino, uint64(off), uint64(s)) + } + v.invalidateAttr(ino) + } return } @@ -765,10 +782,12 @@ func (v *VFS) 
CopyFileRange(ctx Context, nodeIn Ino, fhIn, offIn uint64, nodeOut if err != 0 { return } - err = v.Meta.CopyFileRange(ctx, nodeIn, offIn, nodeOut, offOut, size, flags, &copied) + var length uint64 + err = v.Meta.CopyFileRange(ctx, nodeIn, offIn, nodeOut, offOut, size, flags, &copied, &length) if err == 0 { + v.writer.Truncate(nodeOut, length) v.reader.Invalidate(nodeOut, offOut, size) - v.invalidateLength(nodeOut) + v.invalidateAttr(nodeOut) } return } @@ -1006,7 +1025,7 @@ func NewVFS(conf *Config, m meta.Meta, store chunk.ChunkStore, registerer promet return v } -func (v *VFS) invalidateLength(ino Ino) { +func (v *VFS) invalidateAttr(ino Ino) { v.modM.Lock() v.modifiedAt[ino] = time.Now() v.modM.Unlock() From 6faccc9d0b0cf91a7765e56f5fa2e55b8b1a1554 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 29 Feb 2024 10:29:48 +0800 Subject: [PATCH 05/30] flush the source file of copy_file_range (#4436) --- pkg/vfs/vfs.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/vfs/vfs.go b/pkg/vfs/vfs.go index f335e89cb3a5..ceaea18fca85 100644 --- a/pkg/vfs/vfs.go +++ b/pkg/vfs/vfs.go @@ -778,6 +778,10 @@ func (v *VFS) CopyFileRange(ctx Context, nodeIn Ino, fhIn, offIn uint64, nodeOut defer hi.removeOp(ctx) } + err = v.writer.Flush(ctx, nodeIn) + if err != 0 { + return + } err = v.writer.Flush(ctx, nodeOut) if err != 0 { return From e7f99fcacebcfdf7cf2dd49c2fed1c371c57eb29 Mon Sep 17 00:00:00 2001 From: Sandy Xu Date: Wed, 6 Mar 2024 21:11:19 +0800 Subject: [PATCH 06/30] vfs: invalidate entry cache in readdir (#4453) --- pkg/fuse/fuse.go | 6 ++-- pkg/vfs/handle.go | 34 ++++++++++++++++++++++ pkg/vfs/vfs.go | 37 +++++++++++++++++++++++- pkg/vfs/vfs_test.go | 70 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 144 insertions(+), 3 deletions(-) diff --git a/pkg/fuse/fuse.go b/pkg/fuse/fuse.go index 10d11e9b609a..a4e25c3be2e1 100644 --- a/pkg/fuse/fuse.go +++ b/pkg/fuse/fuse.go @@ -354,11 +354,12 @@ func (fs *fileSystem) ReadDir(cancel <-chan struct{}, in 
*fuse.ReadIn, out *fuse defer releaseContext(ctx) entries, _, err := fs.v.Readdir(ctx, Ino(in.NodeId), in.Size, int(in.Offset), in.Fh, false) var de fuse.DirEntry - for _, e := range entries { + for i, e := range entries { de.Ino = uint64(e.Inode) de.Name = string(e.Name) de.Mode = e.Attr.SMode() if !out.AddDirEntry(de) { + fs.v.UpdateReaddirOffset(ctx, Ino(in.NodeId), in.Fh, int(in.Offset)+i) break } } @@ -371,12 +372,13 @@ func (fs *fileSystem) ReadDirPlus(cancel <-chan struct{}, in *fuse.ReadIn, out * entries, readAt, err := fs.v.Readdir(ctx, Ino(in.NodeId), in.Size, int(in.Offset), in.Fh, true) ctx.start = readAt var de fuse.DirEntry - for _, e := range entries { + for i, e := range entries { de.Ino = uint64(e.Inode) de.Name = string(e.Name) de.Mode = e.Attr.SMode() eo := out.AddDirLookupEntry(de) if eo == nil { + fs.v.UpdateReaddirOffset(ctx, Ino(in.NodeId), in.Fh, int(in.Offset)+i) break } if e.Attr.Full { diff --git a/pkg/vfs/handle.go b/pkg/vfs/handle.go index 98fe7e73461a..79aff9940c3d 100644 --- a/pkg/vfs/handle.go +++ b/pkg/vfs/handle.go @@ -33,6 +33,8 @@ type handle struct { // for dir children []*meta.Entry readAt time.Time + readOff int + index map[string]int // for file locks uint8 @@ -234,3 +236,35 @@ func (v *VFS) releaseFileHandle(ino Ino, fh uint64) { h.Close() } } + +func (v *VFS) invalidateDirHandle(parent Ino, name string, inode Ino, attr *Attr) { + v.hanleM.Lock() + hs := v.handles[parent] + v.hanleM.Unlock() + for _, h := range hs { + h.Lock() + if h.children != nil && h.index != nil { + if inode > 0 { + h.children = append(h.children, &meta.Entry{ + Inode: inode, + Name: []byte(name), + Attr: attr, + }) + h.index[name] = len(h.children) - 1 + } else { + i, ok := h.index[name] + if ok { + delete(h.index, name) + h.children[i].Inode = 0 // invalid + if i >= h.readOff { + // not read yet, remove it + h.children[i] = h.children[len(h.children)-1] + h.index[string(h.children[i].Name)] = i + h.children = h.children[:len(h.children)-1] + } + } + } 
+ } + h.Unlock() + } +} diff --git a/pkg/vfs/vfs.go b/pkg/vfs/vfs.go index ceaea18fca85..fabbe869ad63 100644 --- a/pkg/vfs/vfs.go +++ b/pkg/vfs/vfs.go @@ -172,6 +172,7 @@ func (v *VFS) Mknod(ctx Context, parent Ino, name string, mode uint16, cumask ui err = v.Meta.Mknod(ctx, parent, name, _type, mode&07777, cumask, rdev, "", &inode, attr) if err == 0 { entry = &meta.Entry{Inode: inode, Attr: attr} + v.invalidateDirHandle(parent, name, inode, attr) } return } @@ -187,6 +188,9 @@ func (v *VFS) Unlink(ctx Context, parent Ino, name string) (err syscall.Errno) { return } err = v.Meta.Unlink(ctx, parent, name) + if err == 0 { + v.invalidateDirHandle(parent, name, 0, nil) + } return } @@ -208,6 +212,7 @@ func (v *VFS) Mkdir(ctx Context, parent Ino, name string, mode uint16, cumask ui err = v.Meta.Mkdir(ctx, parent, name, mode, cumask, 0, &inode, attr) if err == 0 { entry = &meta.Entry{Inode: inode, Attr: attr} + v.invalidateDirHandle(parent, name, inode, attr) } return } @@ -219,6 +224,9 @@ func (v *VFS) Rmdir(ctx Context, parent Ino, name string) (err syscall.Errno) { return } err = v.Meta.Rmdir(ctx, parent, name) + if err == 0 { + v.invalidateDirHandle(parent, name, 0, nil) + } return } @@ -240,6 +248,7 @@ func (v *VFS) Symlink(ctx Context, path string, parent Ino, name string) (entry err = v.Meta.Symlink(ctx, parent, name, path, &inode, attr) if err == 0 { entry = &meta.Entry{Inode: inode, Attr: attr} + v.invalidateDirHandle(parent, name, inode, attr) } return } @@ -267,7 +276,14 @@ func (v *VFS) Rename(ctx Context, parent Ino, name string, newparent Ino, newnam return } - err = v.Meta.Rename(ctx, parent, name, newparent, newname, flags, nil, nil) + var inode Ino + var attr = &Attr{} + err = v.Meta.Rename(ctx, parent, name, newparent, newname, flags, &inode, attr) + if err == 0 { + v.invalidateDirHandle(parent, name, 0, nil) + v.invalidateDirHandle(newparent, newname, 0, nil) + v.invalidateDirHandle(newparent, newname, inode, attr) + } return } @@ -292,6 +308,7 @@ func 
(v *VFS) Link(ctx Context, ino Ino, newparent Ino, newname string) (entry * err = v.Meta.Link(ctx, ino, newparent, newname, attr) if err == 0 { entry = &meta.Entry{Inode: ino, Attr: attr} + v.invalidateDirHandle(newparent, newname, ino, attr) } return } @@ -357,14 +374,31 @@ func (v *VFS) Readdir(ctx Context, ino Ino, size uint32, off int, fh uint64, plu }) } } + index := make(map[string]int) + for i, e := range inodes { + index[string(e.Name)] = i + } + h.index = index } if off < len(h.children) { entries = h.children[off:] + // we don't know how much of them will be sent, assume all of them + h.readOff = len(h.children) - 1 } readAt = h.readAt return } +func (v *VFS) UpdateReaddirOffset(ctx Context, ino Ino, fh uint64, off int) { + h := v.findHandle(ino, fh) + if h == nil { + return + } + h.Lock() + defer h.Unlock() + h.readOff = off +} + func (v *VFS) Releasedir(ctx Context, ino Ino, fh uint64) int { h := v.findHandle(ino, fh) if h == nil { @@ -398,6 +432,7 @@ func (v *VFS) Create(ctx Context, parent Ino, name string, mode uint16, cumask u v.UpdateLength(inode, attr) fh = v.newFileHandle(inode, attr.Length, flags) entry = &meta.Entry{Inode: inode, Attr: attr} + v.invalidateDirHandle(parent, name, inode, attr) } return } diff --git a/pkg/vfs/vfs_test.go b/pkg/vfs/vfs_test.go index a35ae77381dd..407cacda4543 100644 --- a/pkg/vfs/vfs_test.go +++ b/pkg/vfs/vfs_test.go @@ -860,3 +860,73 @@ func TestInternalFile(t *testing.T) { t.Fatalf("result: %s", string(resp[:n])) } } + +func TestReaddirCache(t *testing.T) { + v, _ := createTestVFS() + ctx := NewLogContext(meta.Background) + entry, st := v.Mkdir(ctx, 1, "testdir", 0777, 022) + if st != 0 { + t.Fatalf("mkdir testdir: %s", st) + } + parent := entry.Inode + for i := 0; i < 100; i++ { + _, _ = v.Mkdir(ctx, parent, fmt.Sprintf("d%d", i), 0777, 022) + } + fh, _ := v.Opendir(ctx, parent, 0) + _, _ = v.Mkdir(ctx, parent, fmt.Sprintf("d%d", 100), 0777, 022) + defer v.Releasedir(ctx, parent, fh) + var off = 20 + var files = 
make(map[string]bool) + // read first 20 + entries, _, _ := v.Readdir(ctx, parent, 20, 0, fh, true) + for _, e := range entries[:off] { + files[string(e.Name)] = true + } + v.UpdateReaddirOffset(ctx, parent, fh, off) + for i := 0; i < 100; i += 10 { + name := fmt.Sprintf("d%d", i) + _ = v.Rmdir(ctx, parent, name) + delete(files, name) + } + for i := 100; i < 110; i++ { + _, _ = v.Mkdir(ctx, parent, fmt.Sprintf("d%d", i), 0777, 022) + _ = v.Rename(ctx, parent, fmt.Sprintf("d%d", i), parent, fmt.Sprintf("d%d", i+10), 0) + delete(files, fmt.Sprintf("d%d", i)) + } + for { + entries, _, _ := v.Readdir(ctx, parent, 20, off, fh, true) + if len(entries) == 0 { + break + } + if len(entries) > 20 { + entries = entries[:20] + } + for _, e := range entries { + if e.Inode > 0 { + files[string(e.Name)] = true + } else { + t.Logf("invalid entry %s", e.Name) + } + } + off += len(entries) + v.UpdateReaddirOffset(ctx, parent, fh, off) + } + for i := 0; i < 100; i += 10 { + name := fmt.Sprintf("d%d", i) + if _, ok := files[name]; ok { + t.Fatalf("dir %s should be deleted", name) + } + } + for i := 100; i < 110; i++ { + name := fmt.Sprintf("d%d", i) + if _, ok := files[name]; ok { + t.Fatalf("dir %s should be deleted", name) + } + } + for i := 110; i < 120; i++ { + name := fmt.Sprintf("d%d", i) + if _, ok := files[name]; !ok { + t.Fatalf("dir %s should be added", name) + } + } +} From 64219aab898fbbe66ade80ad28e896d83dd200d3 Mon Sep 17 00:00:00 2001 From: li1213987842 <30381726+li1213987842@users.noreply.github.com> Date: Tue, 12 Mar 2024 10:42:03 +0800 Subject: [PATCH 07/30] gateway: fix gateway head dir return 200 but expect 404 (#4486) --- pkg/gateway/gateway.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 735e94225151..7aea7e10744c 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -601,8 +601,15 @@ func (n *jfsObjects) GetObjectInfo(ctx context.Context, bucket, object string, o 
err = jfsToObjectErr(ctx, eno, bucket, object) return } - // put /dir1/key1; head /dir1 return 404; head /dir1/ return 200 - if strings.HasSuffix(object, sep) && !fi.IsDir() || !strings.HasSuffix(object, sep) && fi.IsDir() { + // put /dir1/key1; head /dir1 return 404; head /dir1/ return 404; head /dir1/key1 return 200 + // put /dir1/key1/; head /dir1/key1 return 404; head /dir1/key1/ return 200 + var isObject bool + if strings.HasSuffix(object, sep) && fi.IsDir() && fi.Atime() == 0 { + isObject = true + } else if !strings.HasSuffix(object, sep) && !fi.IsDir() { + isObject = true + } + if !isObject { err = jfsToObjectErr(ctx, syscall.ENOENT, bucket, object) return } From 59394e9178786af2bb5ac1d3ba7c114000a9ba12 Mon Sep 17 00:00:00 2001 From: Sandy Xu Date: Thu, 14 Mar 2024 18:06:36 +0800 Subject: [PATCH 08/30] gateway: add open flag when downloading an object (#4507) --- cmd/object.go | 2 +- pkg/gateway/gateway.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/object.go b/cmd/object.go index a82ba0fbeac0..9e17518eb8b8 100644 --- a/cmd/object.go +++ b/cmd/object.go @@ -96,7 +96,7 @@ func (f *jFile) Close() error { } func (j *juiceFS) Get(key string, off, limit int64) (io.ReadCloser, error) { - f, err := j.jfs.Open(ctx, j.path(key), 0) + f, err := j.jfs.Open(ctx, j.path(key), vfs.MODE_MASK_R) if err != 0 { return nil, err } diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index 7aea7e10744c..caff44dc78ee 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -468,7 +468,7 @@ func (n *jfsObjects) GetObjectNInfo(ctx context.Context, bucket, object string, if err != nil { return } - f, eno := n.fs.Open(mctx, n.path(bucket, object), 0) + f, eno := n.fs.Open(mctx, n.path(bucket, object), vfs.MODE_MASK_R) if eno != 0 { return nil, jfsToObjectErr(ctx, eno, bucket, object) } From 11fd1a28d7601f0b3bba4f30431a3f84b7c3d339 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Mon, 18 Mar 2024 15:43:13 +0800 Subject: [PATCH 09/30] fix 
guid lookupGroup (#4532) --- sdk/java/libjfs/guid.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/java/libjfs/guid.go b/sdk/java/libjfs/guid.go index db14d349879e..95f743a7756b 100644 --- a/sdk/java/libjfs/guid.go +++ b/sdk/java/libjfs/guid.go @@ -115,7 +115,7 @@ func (m *mapping) lookupGroup(name string) uint32 { } logger.Debugf("update group to %s:%d by lookup group", name, id) m.updateGroup(name, id) - return 0 + return id } func (m *mapping) lookupUserID(id uint32) string { From ecb0765ef40803f45f6f510a4597d8c040c4aaa2 Mon Sep 17 00:00:00 2001 From: tangyoupeng Date: Wed, 20 Mar 2024 10:12:23 +0800 Subject: [PATCH 10/30] Hadoop: rmr check perm only when is not empty (#4539) --- pkg/meta/utils.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/meta/utils.go b/pkg/meta/utils.go index 30a0e9c20253..a69d38f6111d 100644 --- a/pkg/meta/utils.go +++ b/pkg/meta/utils.go @@ -256,9 +256,6 @@ func updateLocks(ls []plockRecord, nl plockRecord) []plockRecord { } func (m *baseMeta) emptyDir(ctx Context, inode Ino, skipCheckTrash bool, count *uint64, concurrent chan int) syscall.Errno { - if st := m.Access(ctx, inode, MODE_MASK_W|MODE_MASK_X, nil); st != 0 { - return st - } for { var entries []*Entry if st := m.en.doReaddir(ctx, inode, 0, &entries, 10000); st != 0 && st != syscall.ENOENT { @@ -267,6 +264,9 @@ func (m *baseMeta) emptyDir(ctx Context, inode Ino, skipCheckTrash bool, count * if len(entries) == 0 { return 0 } + if st := m.Access(ctx, inode, MODE_MASK_W|MODE_MASK_X, nil); st != 0 { + return st + } var wg sync.WaitGroup var status syscall.Errno // try directories first to increase parallel From a4a58f12a2d2ce8e267bb117b931e139c0a97dc9 Mon Sep 17 00:00:00 2001 From: Sandy Xu Date: Wed, 27 Mar 2024 17:10:37 +0800 Subject: [PATCH 11/30] meta: fix negative values in quota before used in statfs (#4591) --- pkg/meta/base.go | 19 +++++++------------ pkg/meta/base_test.go | 21 +++++++++++++++++++++ pkg/meta/interface.go 
| 16 ++++++++++++++++ 3 files changed, 44 insertions(+), 12 deletions(-) diff --git a/pkg/meta/base.go b/pkg/meta/base.go index a8e582079f44..686d0fe96128 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -986,25 +986,20 @@ func (m *baseMeta) StatFS(ctx Context, ino Ino, totalspace, availspace, iused, i if q == nil { continue } + q.sanitize() if usage == nil { usage = q } if q.MaxSpace > 0 { - ls := q.MaxSpace - q.UsedSpace - if ls < 0 { - ls = 0 - } - if uint64(ls) < *availspace { - *availspace = uint64(ls) + ls := uint64(q.MaxSpace - q.UsedSpace) + if ls < *availspace { + *availspace = ls } } if q.MaxInodes > 0 { - li := q.MaxInodes - q.UsedInodes - if li < 0 { - li = 0 - } - if uint64(li) < *iavail { - *iavail = uint64(li) + li := uint64(q.MaxInodes - q.UsedInodes) + if li < *iavail { + *iavail = li } } } diff --git a/pkg/meta/base_test.go b/pkg/meta/base_test.go index b6a1ec7338f8..6df45774ef01 100644 --- a/pkg/meta/base_test.go +++ b/pkg/meta/base_test.go @@ -659,6 +659,27 @@ func testMetaClient(t *testing.T, m Meta) { t.Fatalf("total space %d, iavail %d", totalspace, iavail) } + base.loadQuotas() + base.quotaMu.RLock() + q := base.dirQuotas[subIno] + base.quotaMu.RUnlock() + q.update(4<<10, 15) // used > max + base.doFlushQuotas() + if st := m.StatFS(ctx, subIno, &totalspace, &availspace, &iused, &iavail); st != 0 { + t.Fatalf("statfs: %s", st) + } + if totalspace != 4<<10 || availspace != 0 || iused != 15 || iavail != 0 { + t.Fatalf("total space %d, availspace %d, iused %d, iavail %d", totalspace, availspace, iused, iavail) + } + q.update(-8<<10, -20) // used < 0 + base.doFlushQuotas() + if st := m.StatFS(ctx, subIno, &totalspace, &availspace, &iused, &iavail); st != 0 { + t.Fatalf("statfs: %s", st) + } + if totalspace != 1<<10 || availspace != 1<<10 || iused != 0 || iavail != 10 { + t.Fatalf("total space %d, availspace %d, iused %d, iavail %d", totalspace, availspace, iused, iavail) + } + if st := m.Rmdir(ctx, 1, "subdir"); st != 0 { t.Fatalf("rmdir 
subdir: %s", st) } diff --git a/pkg/meta/interface.go b/pkg/meta/interface.go index f31b9fb56c29..50febddccaa4 100644 --- a/pkg/meta/interface.go +++ b/pkg/meta/interface.go @@ -304,6 +304,22 @@ func (q *Quota) update(space, inodes int64) { atomic.AddInt64(&q.newInodes, inodes) } +// not thread safe +func (q *Quota) sanitize() { + if q.UsedSpace < 0 { + q.UsedSpace = 0 + } + if q.MaxSpace > 0 && q.MaxSpace < q.UsedSpace { + q.MaxSpace = q.UsedSpace + } + if q.UsedInodes < 0 { + q.UsedInodes = 0 + } + if q.MaxInodes > 0 && q.MaxInodes < q.UsedInodes { + q.MaxInodes = q.UsedInodes + } +} + // Meta is a interface for a meta service for file system. type Meta interface { // Name of database From c42509bc936045ddbd5d3f163ee4ca7a098fc194 Mon Sep 17 00:00:00 2001 From: jiefenghuang Date: Mon, 1 Apr 2024 21:42:14 +0800 Subject: [PATCH 12/30] enhance: check for link (#4625) Signed-off-by: jiefenghuang --- pkg/meta/base.go | 3 +++ pkg/meta/redis.go | 2 +- pkg/meta/sql.go | 2 +- pkg/meta/tkv.go | 2 +- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 686d0fe96128..c43396e1d26e 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -1397,6 +1397,9 @@ func (m *baseMeta) Link(ctx Context, inode, parent Ino, name string, attr *Attr) if name == "" { return syscall.ENOENT } + if name == "." || name == ".." 
{ + return syscall.EEXIST + } defer m.timeit("Link", time.Now()) if attr == nil { diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index b01ed125ce40..d5cac93817cb 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -1896,7 +1896,7 @@ func (m *redisMeta) doLink(ctx Context, inode, parent Ino, name string, attr *At if pattr.Parent > TrashInode { return syscall.ENOENT } - if st := m.Access(ctx, parent, MODE_MASK_W, &pattr); st != 0 { + if st := m.Access(ctx, parent, MODE_MASK_W|MODE_MASK_X, &pattr); st != 0 { return st } if pattr.Flags&FlagImmutable != 0 { diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 5ce756340b96..0a760112c2f2 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -1954,7 +1954,7 @@ func (m *dbMeta) doLink(ctx Context, inode, parent Ino, name string, attr *Attr) } var pattr Attr m.parseAttr(&pn, &pattr) - if st := m.Access(ctx, parent, MODE_MASK_W, &pattr); st != 0 { + if st := m.Access(ctx, parent, MODE_MASK_W|MODE_MASK_X, &pattr); st != 0 { return st } if pn.Flags&FlagImmutable != 0 { diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index b09d9f087c83..7e917730de09 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -1729,7 +1729,7 @@ func (m *kvMeta) doLink(ctx Context, inode, parent Ino, name string, attr *Attr) if pattr.Parent > TrashInode { return syscall.ENOENT } - if st := m.Access(ctx, parent, MODE_MASK_W, &pattr); st != 0 { + if st := m.Access(ctx, parent, MODE_MASK_W|MODE_MASK_X, &pattr); st != 0 { return st } if pattr.Flags&FlagImmutable != 0 { From ef24d00640d545cb97dbdfbff5f806f84d407e2b Mon Sep 17 00:00:00 2001 From: Sandy Xu Date: Mon, 1 Apr 2024 22:21:46 +0800 Subject: [PATCH 13/30] meta/sql: fix nanosecond part in Write (#4618) --- pkg/meta/sql.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index 0a760112c2f2..cac46df19cc2 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -2253,10 +2253,10 @@ func (m *dbMeta) Write(ctx Context, inode Ino, indx uint32, off 
uint32, slice Sl return err } nodeAttr.Mtime = mtime.UnixNano() / 1e3 - nodeAttr.Mtimensec = int16(mtime.Nanosecond()) + nodeAttr.Mtimensec = int16(mtime.Nanosecond() % 1e3) ctime := time.Now() nodeAttr.Ctime = ctime.UnixNano() / 1e3 - nodeAttr.Ctimensec = int16(ctime.Nanosecond()) + nodeAttr.Ctimensec = int16(ctime.Nanosecond() % 1e3) var ck = chunk{Inode: inode, Indx: indx} ok, err = s.ForUpdate().MustCols("indx").Get(&ck) From 87ba05de7582b0cf6af183466a1bc9f479e344b8 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 3 Apr 2024 15:48:08 +0800 Subject: [PATCH 14/30] pkg/object: fix storage class of BOS (#4636) Co-authored-by: zhijian --- pkg/object/bos.go | 4 +++ pkg/object/object_storage_test.go | 50 ++++++++++--------------------- 2 files changed, 20 insertions(+), 34 deletions(-) diff --git a/pkg/object/bos.go b/pkg/object/bos.go index 2c1a0b4bce87..6070d7fbff88 100644 --- a/pkg/object/bos.go +++ b/pkg/object/bos.go @@ -57,6 +57,10 @@ func (q *bosclient) Limits() Limits { } } +func (q *bosclient) SetStorageClass(sc string) { + q.sc = sc +} + func (q *bosclient) Create() error { _, err := q.c.PutBucket(q.bucket) if err == nil && q.sc != "" { diff --git a/pkg/object/object_storage_test.go b/pkg/object/object_storage_test.go index b4522ddad59e..e16d4ec71d63 100644 --- a/pkg/object/object_storage_test.go +++ b/pkg/object/object_storage_test.go @@ -73,40 +73,22 @@ func listAll(s ObjectStorage, prefix, marker string, limit int64, followLink boo } func setStorageClass(o ObjectStorage) string { - switch s := o.(type) { - case *wasb: - s.sc = string(blob2.AccessTierCool) - return s.sc - case *bosclient: - s.sc = "STANDARD_IA" - return s.sc - case *COS: - s.sc = "STANDARD_IA" - return s.sc - case *ks3: - s.sc = "STANDARD_IA" - return s.sc - case *gs: - s.sc = "NEARLINE" - return s.sc - case *obsClient: - s.sc = "STANDARD_IA" - return s.sc - case *ossClient: - s.sc = string(oss.StorageIA) - return s.sc - case *qingstor: - s.sc = "STANDARD_IA" - return s.sc - case 
*s3client: - s.sc = "STANDARD_IA" - return s.sc - case *tosClient: - s.sc = string(enum.StorageClassIa) - return s.sc - default: - return "" - } + if osc, ok := o.(SupportStorageClass); ok { + var sc = "STANDARD_IA" + switch o.(type) { + case *wasb: + sc = string(blob2.AccessTierCool) + case *gs: + sc = "NEARLINE" + case *ossClient: + sc = string(oss.StorageIA) + case *tosClient: + sc = string(enum.StorageClassIa) + } + osc.SetStorageClass(sc) + return sc + } + return "" } // nolint:errcheck From f1e6577ab3ddc3b9c71f0e4925161a9ccce59e3b Mon Sep 17 00:00:00 2001 From: zhijian Date: Mon, 15 Apr 2024 11:32:29 +0800 Subject: [PATCH 15/30] =?UTF-8?q?gateway=EF=BC=9Afix=20the=20way=20bucket?= =?UTF-8?q?=20paths=20are=20calculated=20in=20multi-bucket=20mode=20(#4687?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/gateway/gateway.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index caff44dc78ee..ce97e89c06f8 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -157,7 +157,7 @@ func (n *jfsObjects) isValidBucketName(bucket string) error { } func (n *jfsObjects) path(p ...string) string { - if len(p) > 0 && p[0] == n.conf.Format.Name { + if !n.gConf.MultiBucket && len(p) > 0 && p[0] == n.conf.Format.Name { p = p[1:] } return sep + minio.PathJoin(p...) 
From 89986f8a90704227fbc6010eadd93c3e31414972 Mon Sep 17 00:00:00 2001 From: Changxin Miao Date: Wed, 17 Apr 2024 16:53:32 +0800 Subject: [PATCH 16/30] Fix panic when reading and writing s.writer concurrently (#4710) Signed-off-by: Changxin Miao Co-authored-by: Davies Liu --- pkg/vfs/writer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/vfs/writer.go b/pkg/vfs/writer.go index aaedc00ffbb3..2ab7331cad0c 100644 --- a/pkg/vfs/writer.go +++ b/pkg/vfs/writer.go @@ -119,7 +119,6 @@ func (s *sliceWriter) flushData() { s.writer.Abort() s.err = syscall.EIO } - s.writer = nil } // protected by s.chunk.file From 4e3639b18fcd84b764925f8b21ab6b0f57ea6ef8 Mon Sep 17 00:00:00 2001 From: zhijian Date: Thu, 18 Apr 2024 18:45:36 +0800 Subject: [PATCH 17/30] postgres: fix postgres address parse (#4731) --- pkg/object/object_storage_test.go | 2 +- pkg/object/sql.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/object/object_storage_test.go b/pkg/object/object_storage_test.go index e16d4ec71d63..047b72247266 100644 --- a/pkg/object/object_storage_test.go +++ b/pkg/object/object_storage_test.go @@ -911,7 +911,7 @@ func TestPG(t *testing.T) { //skip mutate } func TestPGWithSearchPath(t *testing.T) { //skip mutate - _, err := newSQLStore("postgres", "localhost:5432/test?sslmode=disable&search_path=juicefs,public", "", "") + _, err := newSQLStore("postgres", "127.0.0.1:5432/test?sslmode=disable&search_path=juicefs,public", "", "") if !strings.Contains(err.Error(), "currently, only one schema is supported in search_path") { t.Fatalf("TestPGWithSearchPath error: %s", err) } diff --git a/pkg/object/sql.go b/pkg/object/sql.go index 21b7b1df1a3d..622f358eeeab 100644 --- a/pkg/object/sql.go +++ b/pkg/object/sql.go @@ -168,9 +168,9 @@ func newSQLStore(driver, addr, user, password string) (ObjectStorage, error) { uri = "postgres://" + uri driver = "pgx" - parse, err := url.Parse(addr) + parse, err := url.Parse(uri) if err != nil { - return nil, 
fmt.Errorf("parse url %s failed: %s", addr, err) + return nil, fmt.Errorf("parse url %s failed: %s", uri, err) } searchPath = parse.Query().Get("search_path") if searchPath != "" { From f902cc6ec6df1d0824b51a6366aa3dac8a2abeef Mon Sep 17 00:00:00 2001 From: Changxin Miao Date: Mon, 22 Apr 2024 16:09:59 +0800 Subject: [PATCH 18/30] meta: Fix error message in quota always shows inode 0 (#4741) Signed-off-by: Changxin Miao --- pkg/meta/base.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/meta/base.go b/pkg/meta/base.go index c43396e1d26e..1a8d9702566b 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -708,8 +708,9 @@ func (m *baseMeta) getQuotaParent(ctx Context, inode Ino) Ino { if inode <= RootInode { break } + lastInode := inode if inode, st = m.getDirParent(ctx, inode); st != 0 { - logger.Warnf("Get directory parent of inode %d: %s", inode, st) + logger.Warnf("Get directory parent of inode %d: %s", lastInode, st) break } } @@ -732,8 +733,9 @@ func (m *baseMeta) checkDirQuota(ctx Context, inode Ino, space, inodes int64) bo if inode <= RootInode { break } + lastInode := inode if inode, st = m.getDirParent(ctx, inode); st != 0 { - logger.Warnf("Get directory parent of inode %d: %s", inode, st) + logger.Warnf("Get directory parent of inode %d: %s", lastInode, st) break } } @@ -756,8 +758,9 @@ func (m *baseMeta) updateDirQuota(ctx Context, inode Ino, space, inodes int64) { if inode <= RootInode { break } + lastInode := inode if inode, st = m.getDirParent(ctx, inode); st != 0 { - logger.Warnf("Get directory parent of inode %d: %s", inode, st) + logger.Warnf("Get directory parent of inode %d: %s", lastInode, st) break } } From 790db4184802d93393abb9fd0ffee9ec8c1d9269 Mon Sep 17 00:00:00 2001 From: zhijian Date: Mon, 22 Apr 2024 16:12:47 +0800 Subject: [PATCH 19/30] cmd/clone: set the attr should be done after checking permissions (#4745) --- pkg/meta/sql.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git 
a/pkg/meta/sql.go b/pkg/meta/sql.go index cac46df19cc2..e9594ba89e3d 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -3907,6 +3907,12 @@ func (m *dbMeta) doCloneEntry(ctx Context, srcIno Ino, parent Ino, name string, n.Inode = ino n.Parent = parent now := time.Now() + + m.parseAttr(&n, attr) + if eno := m.Access(ctx, srcIno, MODE_MASK_R, attr); eno != 0 { + return eno + } + if cmode&CLONE_MODE_PRESERVE_ATTR == 0 { n.Uid = ctx.Uid() n.Gid = ctx.Gid() @@ -3922,10 +3928,6 @@ func (m *dbMeta) doCloneEntry(ctx Context, srcIno Ino, parent Ino, name string, if n.Type == TypeFile && n.Nlink > 1 { n.Nlink = 1 } - m.parseAttr(&n, attr) - if eno := m.Access(ctx, srcIno, MODE_MASK_R, attr); eno != 0 { - return eno - } if top { var pattr Attr From ab138028e36038171da6129b378fe3240c5a104a Mon Sep 17 00:00:00 2001 From: Sandy Xu Date: Wed, 1 May 2024 11:48:56 +0800 Subject: [PATCH 20/30] meta/redis: fix some transactions that are not wrapped by multi/exec (#4801) --- pkg/meta/redis.go | 35 +++++++++++++++++++++++++++-------- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index d5cac93817cb..ac66f373fe97 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -389,10 +389,11 @@ func (m *redisMeta) incrCounter(name string, value int64) (int64, error) { func (m *redisMeta) setIfSmall(name string, value, diff int64) (bool, error) { var changed bool + ctx := Background name = m.prefix + name - err := m.txn(Background, func(tx *redis.Tx) error { + err := m.txn(ctx, func(tx *redis.Tx) error { changed = false - old, err := tx.Get(Background, name).Int64() + old, err := tx.Get(ctx, name).Int64() if err != nil && err != redis.Nil { return err } @@ -400,7 +401,11 @@ func (m *redisMeta) setIfSmall(name string, value, diff int64) (bool, error) { return nil } else { changed = true - return tx.Set(Background, name, value, 0).Err() + _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { + pipe.Set(ctx, name, value, 0) + 
return nil + }) + return err } }, name) @@ -1184,7 +1189,11 @@ func (m *redisMeta) doReadlink(ctx Context, inode Ino, noatime bool) (atime int6 } attr.Atime = now.Unix() attr.Atimensec = uint32(now.Nanosecond()) - return tx.Set(ctx, m.inodeKey(inode), m.marshal(attr), 0).Err() + _, e = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { + pipe.Set(ctx, m.inodeKey(inode), m.marshal(attr), 0) + return nil + }) + return e }, m.inodeKey(inode)) atime = attr.Atime return @@ -3386,7 +3395,11 @@ func (m *redisMeta) doRepair(ctx Context, inode Ino, attr *Attr) syscall.Errno { attr.Nlink++ } } - return tx.Set(ctx, m.inodeKey(inode), m.marshal(attr), 0).Err() + _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { + pipe.Set(ctx, m.inodeKey(inode), m.marshal(attr), 0) + return nil + }) + return err }, m.inodeKey(inode), m.entryKey(inode))) } @@ -3435,10 +3448,13 @@ func (m *redisMeta) doSetXattr(ctx Context, inode Ino, name string, value []byte } else if !ok { return ENOATTR } - _, err := m.rdb.HSet(ctx, key, name, value).Result() + _, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { + pipe.HSet(ctx, key, name, value) + return nil + }) return err default: // XattrCreateOrReplace - _, err := m.rdb.HSet(ctx, key, name, value).Result() + _, err := tx.HSet(ctx, key, name, value).Result() return err } }, key)) @@ -4413,7 +4429,10 @@ func (m *redisMeta) doTouchAtime(ctx Context, inode Ino, attr *Attr, now time.Ti } attr.Atime = now.Unix() attr.Atimensec = uint32(now.Nanosecond()) - if err = tx.Set(ctx, m.inodeKey(inode), m.marshal(attr), 0).Err(); err == nil { + if _, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error { + pipe.Set(ctx, m.inodeKey(inode), m.marshal(attr), 0) + return nil + }); err == nil { updated = true } return err From c37e3d4cb885bf9820301c9b3f44d8981a3d1162 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 13 May 2024 15:38:19 +0800 Subject: [PATCH 21/30] sync: fix concurrent stats update in cluster mode (#4843) --- 
pkg/sync/cluster.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/sync/cluster.go b/pkg/sync/cluster.go index 656e820f4d41..e0e60d509d57 100644 --- a/pkg/sync/cluster.go +++ b/pkg/sync/cluster.go @@ -83,7 +83,11 @@ func httpRequest(url string, body []byte) (ans []byte, err error) { return io.ReadAll(resp.Body) } +var sendStatMu sync.Mutex + func sendStats(addr string) { + sendStatMu.Lock() + defer sendStatMu.Unlock() var r Stat r.Skipped = skipped.Current() r.Copied = copied.Current() From 159a402992d08d274f15b49924d8768d219709da Mon Sep 17 00:00:00 2001 From: zhijian Date: Wed, 15 May 2024 19:50:08 +0800 Subject: [PATCH 22/30] cmd/clone: fix src mode check (#4856) --- pkg/meta/redis.go | 9 +++++---- pkg/meta/tkv.go | 9 +++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index ac66f373fe97..2088e0eba40f 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -4212,14 +4212,15 @@ func (m *redisMeta) LoadMeta(r io.Reader) (err error) { return err } -func (m *redisMeta) doCloneEntry(ctx Context, srcIno Ino, parent Ino, name string, ino Ino, attr *Attr, cmode uint8, cumask uint16, top bool) syscall.Errno { +func (m *redisMeta) doCloneEntry(ctx Context, srcIno Ino, parent Ino, name string, ino Ino, originAttr *Attr, cmode uint8, cumask uint16, top bool) syscall.Errno { return errno(m.txn(ctx, func(tx *redis.Tx) error { a, err := tx.Get(ctx, m.inodeKey(srcIno)).Bytes() if err != nil { return err } - m.parseAttr(a, attr) - if eno := m.Access(ctx, srcIno, MODE_MASK_R, attr); eno != 0 { + m.parseAttr(a, originAttr) + attr := *originAttr + if eno := m.Access(ctx, srcIno, MODE_MASK_R, &attr); eno != 0 { return eno } attr.Parent = parent @@ -4268,7 +4269,7 @@ func (m *redisMeta) doCloneEntry(ctx Context, srcIno Ino, parent Ino, name strin } _, err = tx.TxPipelined(ctx, func(p redis.Pipeliner) error { - p.Set(ctx, m.inodeKey(ino), m.marshal(attr), 0) + p.Set(ctx, m.inodeKey(ino), m.marshal(&attr), 0) 
p.IncrBy(ctx, m.usedSpaceKey(), align4K(attr.Length)) p.Incr(ctx, m.totalInodesKey()) if len(srcXattr) > 0 { diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 7e917730de09..38b6d1f358e1 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -3359,14 +3359,15 @@ func (m *kvMeta) LoadMeta(r io.Reader) error { }) } -func (m *kvMeta) doCloneEntry(ctx Context, srcIno Ino, parent Ino, name string, ino Ino, attr *Attr, cmode uint8, cumask uint16, top bool) syscall.Errno { +func (m *kvMeta) doCloneEntry(ctx Context, srcIno Ino, parent Ino, name string, ino Ino, originAttr *Attr, cmode uint8, cumask uint16, top bool) syscall.Errno { return errno(m.txn(func(tx *kvTxn) error { a := tx.get(m.inodeKey(srcIno)) if a == nil { return syscall.ENOENT } - m.parseAttr(a, attr) - if eno := m.Access(ctx, srcIno, MODE_MASK_R, attr); eno != 0 { + m.parseAttr(a, originAttr) + attr := *originAttr + if eno := m.Access(ctx, srcIno, MODE_MASK_R, &attr); eno != 0 { return eno } attr.Parent = parent @@ -3416,7 +3417,7 @@ func (m *kvMeta) doCloneEntry(ctx Context, srcIno Ino, parent Ino, name string, } } - tx.set(m.inodeKey(ino), m.marshal(attr)) + tx.set(m.inodeKey(ino), m.marshal(&attr)) prefix := m.xattrKey(srcIno, "") tx.scan(prefix, nextKey(prefix), false, func(k, v []byte) bool { tx.set(m.xattrKey(ino, string(k[len(prefix):])), v) From 2cd2e016546b3e3cdd829981c82353e6ab619db5 Mon Sep 17 00:00:00 2001 From: Sandy Xu Date: Tue, 21 May 2024 10:28:31 +0800 Subject: [PATCH 23/30] vfs: fix the race of getting file length (#4876) --- pkg/vfs/reader.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/vfs/reader.go b/pkg/vfs/reader.go index 72c67240c5b5..35a14ec51b65 100644 --- a/pkg/vfs/reader.go +++ b/pkg/vfs/reader.go @@ -168,12 +168,10 @@ func (s *sliceReader) run() { inode := f.inode f.Unlock() - f.Lock() - length := f.length - f.Unlock() var slices []meta.Slice err := f.r.m.Read(meta.Background, inode, indx, &slices) f.Lock() + length := f.length if s.state != BUSY || 
f.err != 0 || f.closing { s.done(0, 0) } From 5d39bcdb2c2004b4246b08815c529f560db534ce Mon Sep 17 00:00:00 2001 From: prOOrc Date: Tue, 28 May 2024 04:55:17 +0300 Subject: [PATCH 24/30] fix: JfsObjects.CompleteMultipartUpload (#4893) Co-authored-by: zhijian --- pkg/gateway/gateway.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index ce97e89c06f8..cad4f1ce5ac0 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -932,7 +932,7 @@ func (n *jfsObjects) CompleteMultipartUpload(ctx context.Context, bucket, object var total uint64 for _, part := range parts { p := n.ppath(bucket, uploadID, strconv.Itoa(part.PartNumber)) - copied, eno := n.fs.CopyFileRange(mctx, p, 0, tmp, total, 1<<30) + copied, eno := n.fs.CopyFileRange(mctx, p, 0, tmp, total, 5<<30) if eno != 0 { err = jfsToObjectErr(ctx, eno, bucket, object, uploadID) logger.Errorf("merge parts: %s", err) From 6d5126665e13d9def3e32ca510b368d51c232339 Mon Sep 17 00:00:00 2001 From: Sandy Xu Date: Thu, 30 May 2024 16:14:03 +0800 Subject: [PATCH 25/30] meta: fix protection check for compaction (#4901) --- pkg/meta/base_test.go | 20 ++++++++++++++++++++ pkg/meta/redis.go | 32 +++++++++++++++++--------------- pkg/meta/sql.go | 30 ++++++++++++++++-------------- pkg/meta/tkv.go | 30 ++++++++++++++++-------------- 4 files changed, 69 insertions(+), 43 deletions(-) diff --git a/pkg/meta/base_test.go b/pkg/meta/base_test.go index 6df45774ef01..193e1b825f0e 100644 --- a/pkg/meta/base_test.go +++ b/pkg/meta/base_test.go @@ -1304,6 +1304,26 @@ func testCompaction(t *testing.T, m Meta, trash bool) { if len(slices) != 1 || slices[0].Len != 3<<20 { t.Fatalf("inode %d should be compacted, but have %d slices, size %d", inode, len(slices), slices[0].Len) } + + m.NewSlice(ctx, &sliceId) + _ = m.Write(ctx, inode, 2, 0, Slice{Id: sliceId, Size: 2338508, Len: 2338508}, time.Now()) + m.NewSlice(ctx, &sliceId) + _ = m.Write(ctx, inode, 2, 8829056, Slice{Id: 
sliceId, Size: 1074933, Len: 1074933}, time.Now()) + m.NewSlice(ctx, &sliceId) + _ = m.Write(ctx, inode, 2, 7663608, Slice{Id: sliceId, Size: 41480, Len: 4148}, time.Now()) + _ = m.Fallocate(ctx, inode, fallocZeroRange, 2*ChunkSize+4515328, 3152428, nil) + _ = m.Fallocate(ctx, inode, fallocZeroRange, 2*ChunkSize+4515328, 2607724, nil) + if c, ok := m.(compactor); ok { + c.compactChunk(inode, 2, true) + } + if st := m.Read(ctx, inode, 2, &slices); st != 0 { + t.Fatalf("read 1: %s", st) + } + // compact twice: 4515328+2607724-2338508 = 4784544; 8829056+1074933-2338508-4784544=2780937 + if len(slices) != 3 || slices[0].Len != 2338508 || slices[1].Len != 4784544 || slices[2].Len != 2780937 { + t.Fatalf("inode %d should be compacted, but have %d slices, size %d,%d,%d", + inode, len(slices), slices[0].Len, slices[1].Len, slices[2].Len) + } } func testConcurrentWrite(t *testing.T, m Meta) { diff --git a/pkg/meta/redis.go b/pkg/meta/redis.go index 2088e0eba40f..ecd790f3b653 100644 --- a/pkg/meta/redis.go +++ b/pkg/meta/redis.go @@ -2918,17 +2918,19 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) { return } skipped := skipSome(ss) - var first, last *slice - if skipped > 0 { - first, last = ss[0], ss[skipped-1] - } - ss = ss[skipped:] - pos, size, slices := compactChunk(ss) - if len(ss) < 2 || size == 0 { + compacted := ss[skipped:] + pos, size, slices := compactChunk(compacted) + if len(compacted) < 2 || size == 0 { return } - if first != nil && last != nil && pos+size > first.pos && last.pos+last.len > pos { - panic(fmt.Sprintf("invalid compaction: skipped slices [%+v, %+v], pos %d, size %d", *first, *last, pos, size)) + for _, s := range ss[:skipped] { + if pos+size > s.pos && s.pos+s.len > pos { + var sstring string + for _, s := range ss { + sstring += fmt.Sprintf("\n%+v", *s) + } + panic(fmt.Sprintf("invalid compaction skipped %d, pos %d, size %d; slices: %s", skipped, pos, size, sstring)) + } } var id uint64 @@ -2936,11 +2938,11 @@ func (m 
*redisMeta) compactChunk(inode Ino, indx uint32, force bool) { if st != 0 { return } - logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(ss), size) + logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(compacted), size) err = m.newMsg(CompactChunk, slices, id) if err != nil { if !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "not found") { - logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(ss), err) + logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(compacted), err) } return } @@ -2948,13 +2950,13 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) { var rs []*redis.IntCmd // trash disabled: check reference of slices trash := m.toTrash(0) if trash { - for _, s := range ss { + for _, s := range compacted { if s.id > 0 { buf = append(buf, m.encodeDelayedSlice(s.id, s.size)...) } } } else { - rs = make([]*redis.IntCmd, len(ss)) + rs = make([]*redis.IntCmd, len(compacted)) } key := m.chunkKey(inode, indx) errno := errno(m.txn(ctx, func(tx *redis.Tx) error { @@ -2983,7 +2985,7 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) { pipe.HSet(ctx, m.delSlices(), fmt.Sprintf("%d_%d", id, time.Now().Unix()), buf) } } else { - for i, s := range ss { + for i, s := range compacted { if s.id > 0 { rs[i] = pipe.HIncrBy(ctx, m.sliceRefs(), m.sliceKey(s.id, s.size), -1) } @@ -3010,7 +3012,7 @@ func (m *redisMeta) compactChunk(inode Ino, indx uint32, force bool) { m.of.InvalidateChunk(inode, indx) m.cleanupZeroRef(m.sliceKey(id, size)) if !trash { - for i, s := range ss { + for i, s := range compacted { if s.id > 0 && rs[i].Err() == nil && rs[i].Val() < 0 { m.deleteSlice(s.id, s.size) } diff --git a/pkg/meta/sql.go b/pkg/meta/sql.go index e9594ba89e3d..57d9bc929f5a 100644 --- a/pkg/meta/sql.go +++ b/pkg/meta/sql.go @@ -2774,17 +2774,19 @@ func (m *dbMeta) 
compactChunk(inode Ino, indx uint32, force bool) { return } skipped := skipSome(ss) - var first, last *slice - if skipped > 0 { - first, last = ss[0], ss[skipped-1] - } - ss = ss[skipped:] - pos, size, slices := compactChunk(ss) - if len(ss) < 2 || size == 0 { + compacted := ss[skipped:] + pos, size, slices := compactChunk(compacted) + if len(compacted) < 2 || size == 0 { return } - if first != nil && last != nil && pos+size > first.pos && last.pos+last.len > pos { - panic(fmt.Sprintf("invalid compaction: skipped slices [%+v, %+v], pos %d, size %d", *first, *last, pos, size)) + for _, s := range ss[:skipped] { + if pos+size > s.pos && s.pos+s.len > pos { + var sstring string + for _, s := range ss { + sstring += fmt.Sprintf("\n%+v", *s) + } + panic(fmt.Sprintf("invalid compaction skipped %d, pos %d, size %d; slices: %s", skipped, pos, size, sstring)) + } } var id uint64 @@ -2792,18 +2794,18 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) { if st != 0 { return } - logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(ss), size) + logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(compacted), size) err = m.newMsg(CompactChunk, slices, id) if err != nil { if !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), "not found") { - logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(ss), err) + logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(compacted), err) } return } var buf []byte trash := m.toTrash(0) if trash { - for _, s := range ss { + for _, s := range compacted { if s.id > 0 { buf = append(buf, m.encodeDelayedSlice(s.id, s.size)...) 
} @@ -2835,7 +2837,7 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) { } } } else { - for _, s_ := range ss { + for _, s_ := range compacted { if s_.id == 0 { continue } @@ -2871,7 +2873,7 @@ func (m *dbMeta) compactChunk(inode Ino, indx uint32, force bool) { } else if err == nil { m.of.InvalidateChunk(inode, indx) if !trash { - for _, s := range ss { + for _, s := range compacted { if s.id == 0 { continue } diff --git a/pkg/meta/tkv.go b/pkg/meta/tkv.go index 38b6d1f358e1..98ccf9260fd7 100644 --- a/pkg/meta/tkv.go +++ b/pkg/meta/tkv.go @@ -2399,17 +2399,19 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) { return } skipped := skipSome(ss) - var first, last *slice - if skipped > 0 { - first, last = ss[0], ss[skipped-1] - } - ss = ss[skipped:] - pos, size, slices := compactChunk(ss) - if len(ss) < 2 || size == 0 { + compacted := ss[skipped:] + pos, size, slices := compactChunk(compacted) + if len(compacted) < 2 || size == 0 { return } - if first != nil && last != nil && pos+size > first.pos && last.pos+last.len > pos { - panic(fmt.Sprintf("invalid compaction: skipped slices [%+v, %+v], pos %d, size %d", *first, *last, pos, size)) + for _, s := range ss[:skipped] { + if pos+size > s.pos && s.pos+s.len > pos { + var sstring string + for _, s := range ss { + sstring += fmt.Sprintf("\n%+v", *s) + } + panic(fmt.Sprintf("invalid compaction skipped %d, pos %d, size %d; slices: %s", skipped, pos, size, sstring)) + } } var id uint64 @@ -2417,18 +2419,18 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) { if st != 0 { return } - logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(ss), size) + logger.Debugf("compact %d:%d: skipped %d slices (%d bytes) %d slices (%d bytes)", inode, indx, skipped, pos, len(compacted), size) err = m.newMsg(CompactChunk, slices, id) if err != nil { if !strings.Contains(err.Error(), "not exist") && !strings.Contains(err.Error(), 
"not found") { - logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(ss), err) + logger.Warnf("compact %d %d with %d slices: %s", inode, indx, len(compacted), err) } return } var dsbuf []byte trash := m.toTrash(0) if trash { - for _, s := range ss { + for _, s := range compacted { if s.id > 0 { dsbuf = append(dsbuf, m.encodeDelayedSlice(s.id, s.size)...) } @@ -2450,7 +2452,7 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) { tx.set(m.delSliceKey(time.Now().Unix(), id), dsbuf) } } else { - for _, s := range ss { + for _, s := range compacted { if s.id > 0 { tx.incrBy(m.sliceKey(s.id, s.size), -1) } @@ -2480,7 +2482,7 @@ func (m *kvMeta) compactChunk(inode Ino, indx uint32, force bool) { m.cleanupZeroRef(id, size) if !trash { var refs int64 - for _, s := range ss { + for _, s := range compacted { if s.id > 0 && m.client.txn(func(tx *kvTxn) error { refs = tx.incrBy(m.sliceKey(s.id, s.size), 0) return nil From fe546ffacd0418a5a145be54d7d2ee1e16e65b41 Mon Sep 17 00:00:00 2001 From: Sandy Xu Date: Thu, 30 May 2024 20:07:25 +0800 Subject: [PATCH 26/30] vfs: fix the issue that plock may be unlocked by mistake (#4910) --- pkg/vfs/vfs.go | 2 +- pkg/vfs/vfs_unix.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/vfs/vfs.go b/pkg/vfs/vfs.go index fabbe869ad63..7adf706d07f0 100644 --- a/pkg/vfs/vfs.go +++ b/pkg/vfs/vfs.go @@ -564,7 +564,7 @@ func (v *VFS) Release(ctx Context, ino Ino, fh uint64) { v.invalidateAttr(ino) } if locks&1 != 0 { - _ = v.Meta.Flock(ctx, ino, owner, F_UNLCK, false) + _ = v.Meta.Flock(ctx, ino, owner^fh, F_UNLCK, false) } } _ = v.Meta.Close(ctx, ino) diff --git a/pkg/vfs/vfs_unix.go b/pkg/vfs/vfs_unix.go index 701d3e57099d..f2b58e5e0189 100644 --- a/pkg/vfs/vfs_unix.go +++ b/pkg/vfs/vfs_unix.go @@ -292,7 +292,7 @@ func (v *VFS) Flock(ctx Context, ino Ino, fh uint64, owner uint64, typ uint32, b } h.addOp(ctx) defer h.removeOp(ctx) - err = v.Meta.Flock(ctx, ino, owner, typ, block) + err = 
v.Meta.Flock(ctx, ino, owner^fh, typ, block) if err == 0 { h.Lock() if typ == syscall.F_UNLCK { From cf2cc3bd3ab06cd94dccbd3645b48d335527cfdf Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 30 May 2024 23:11:21 +0800 Subject: [PATCH 27/30] upgrade golang.org/x/net to 0.23 (#4911) --- go.mod | 8 ++++---- go.sum | 16 ++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 68320171ed53..e237a6b2fa44 100644 --- a/go.mod +++ b/go.mod @@ -67,12 +67,12 @@ require ( go.etcd.io/etcd/client/v3 v3.5.9 go.uber.org/automaxprocs v1.5.2 go.uber.org/zap v1.20.0 - golang.org/x/crypto v0.17.0 - golang.org/x/net v0.17.0 + golang.org/x/crypto v0.21.0 + golang.org/x/net v0.23.0 golang.org/x/oauth2 v0.7.0 golang.org/x/sync v0.2.0 - golang.org/x/sys v0.15.0 - golang.org/x/term v0.15.0 + golang.org/x/sys v0.18.0 + golang.org/x/term v0.18.0 golang.org/x/text v0.14.0 google.golang.org/api v0.94.0 google.golang.org/protobuf v1.30.0 diff --git a/go.sum b/go.sum index 07dda592e88d..8b79467f3c6a 100644 --- a/go.sum +++ b/go.sum @@ -1123,8 +1123,8 @@ golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0 golang.org/x/crypto v0.0.0-20220112180741-5e0467b6c7ce/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= -golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= -golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= +golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp 
v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -1225,8 +1225,8 @@ golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= -golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM= -golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= +golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= +golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190402181905-9f3314589c9a/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -1369,15 +1369,15 @@ golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= -golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= +golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.1.0/go.mod 
h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U= -golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= -golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= +golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8= +golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= From a18582680e83e18ec532bebc0281872ee0f623d8 Mon Sep 17 00:00:00 2001 From: Sandy Xu Date: Tue, 4 Jun 2024 18:27:48 +0800 Subject: [PATCH 28/30] meta: fix the error code of baseMeta.Open (#4920) --- pkg/meta/base.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/meta/base.go b/pkg/meta/base.go index 1a8d9702566b..db70363cdb66 100644 --- a/pkg/meta/base.go +++ b/pkg/meta/base.go @@ -1648,22 +1648,23 @@ func (m *baseMeta) touchAtime(ctx Context, inode Ino, attr *Attr) { } } -func (m *baseMeta) Open(ctx Context, inode Ino, flags uint32, attr *Attr) (rerr syscall.Errno) { +func (m *baseMeta) Open(ctx Context, inode Ino, flags uint32, attr *Attr) (st syscall.Errno) { if m.conf.ReadOnly && flags&(syscall.O_WRONLY|syscall.O_RDWR|syscall.O_TRUNC|syscall.O_APPEND) != 0 { return syscall.EROFS } defer func() { - if rerr == 0 { + if st == 0 { m.touchAtime(ctx, inode, attr) } }() if m.conf.OpenCache > 0 && m.of.OpenCheck(inode, attr) { return 0 } - var err syscall.Errno // attr may be valid, see fs.Open() if attr != nil && !attr.Full { - err = m.GetAttr(ctx, inode, attr) + if st = m.GetAttr(ctx, inode, attr); st != 0 { + return + } 
} var mmask uint8 = 0 switch flags & (syscall.O_RDONLY | syscall.O_WRONLY | syscall.O_RDWR) { @@ -1674,7 +1675,7 @@ func (m *baseMeta) Open(ctx Context, inode Ino, flags uint32, attr *Attr) (rerr case syscall.O_RDWR: mmask = MODE_MASK_R | MODE_MASK_W } - if rerr = m.Access(ctx, inode, mmask, attr); rerr != 0 { + if st = m.Access(ctx, inode, mmask, attr); st != 0 { return } @@ -1691,10 +1692,8 @@ func (m *baseMeta) Open(ctx Context, inode Ino, flags uint32, attr *Attr) (rerr return syscall.EPERM } } - if err == 0 { - m.of.Open(inode, attr) - } - return err + m.of.Open(inode, attr) + return 0 } func (m *baseMeta) InvalidateChunkCache(ctx Context, inode Ino, indx uint32) syscall.Errno { From 2647b03980ba98ff0566c7b0de56c8df345a7955 Mon Sep 17 00:00:00 2001 From: zhijian Date: Wed, 5 Jun 2024 17:48:26 +0800 Subject: [PATCH 29/30] gateway: list object follow symlink (#4919) --- .github/scripts/command/sync_minio.sh | 15 ++++++++++++++- pkg/gateway/gateway.go | 8 ++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/.github/scripts/command/sync_minio.sh b/.github/scripts/command/sync_minio.sh index db556bf2d767..80bd4b917c84 100755 --- a/.github/scripts/command/sync_minio.sh +++ b/.github/scripts/command/sync_minio.sh @@ -79,11 +79,12 @@ test_sync_external_link(){ [ -z $(./mc cat myminio/myjfs/hello) ] } +# list object should be skipped when encountering a loop symlink test_sync_loop_symlink(){ prepare_test touch hello ln -s hello /jfs/hello - ./juicefs sync minio://minioadmin:minioadmin@localhost:9005/myjfs/ minio://minioadmin:minioadmin@localhost:9000/myjfs/ && exit 1 || true + ./juicefs sync minio://minioadmin:minioadmin@localhost:9005/myjfs/ minio://minioadmin:minioadmin@localhost:9000/myjfs/ rm -rf /jfs/hello ./juicefs sync minio://minioadmin:minioadmin@localhost:9005/myjfs/ minio://minioadmin:minioadmin@localhost:9000/myjfs/ } @@ -107,6 +108,18 @@ test_sync_deep_symlink(){ done } +test_sync_list_object_symlink(){ + prepare_test + cd /jfs + mkdir 
dir1 + mkdir -p dir2/src_dir + echo abc > dir2/src_dir/afile + ln -s ./../dir2/src_dir dir1/symlink_dir + cd - + ./juicefs sync minio://minioadmin:minioadmin@localhost:9005/myjfs/dir1/ minio://minioadmin:minioadmin@localhost:9000/myjfs/dir3/ + ./mc cat myminio/myjfs/dir3/symlink_dir/afile | grep abc || (echo "content should be abc" && exit 1) +} + prepare_test(){ umount_jfs /jfs $META_URL python3 .github/scripts/flush_meta.py $META_URL diff --git a/pkg/gateway/gateway.go b/pkg/gateway/gateway.go index cad4f1ce5ac0..3cb0c7451909 100644 --- a/pkg/gateway/gateway.go +++ b/pkg/gateway/gateway.go @@ -303,6 +303,14 @@ func (n *jfsObjects) listDirFactory() minio.ListDirFunc { if root && len(fi.Name()) == len(metaBucket) && fi.Name() == metaBucket { continue } + if stat, ok := fi.(*fs.FileStat); ok && stat.IsSymlink() { + var err syscall.Errno + p := n.path(bucket, prefixDir, fi.Name()) + if fi, err = n.fs.Stat(mctx, p); err != 0 { + logger.Errorf("stat %s: %s", p, err) + continue + } + } if fi.IsDir() { entries = append(entries, fi.Name()+sep) } else { From 9abfc79a34557515a53980436aafb8129bf4fd75 Mon Sep 17 00:00:00 2001 From: Changxin Miao Date: Thu, 13 Jun 2024 18:08:06 +0800 Subject: [PATCH 30/30] Manager's IP address is never set in sync cluster mode (#4939) Signed-off-by: Changxin Miao --- pkg/sync/cluster.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/sync/cluster.go b/pkg/sync/cluster.go index e0e60d509d57..344424f427a8 100644 --- a/pkg/sync/cluster.go +++ b/pkg/sync/cluster.go @@ -193,6 +193,7 @@ func startManager(config *Config, tasks <-chan object.Object) (string, error) { if ip == "" { return "", fmt.Errorf("no local ip found") } + addr = ip } if !strings.Contains(addr, ":") {