add move/delete permission flags
This commit is contained in:
@@ -90,7 +90,7 @@ class TestHttpCli(unittest.TestCase):
|
||||
if not vol.startswith(top):
|
||||
continue
|
||||
|
||||
mode = vol[-2]
|
||||
mode = vol[-2].replace("a", "rw")
|
||||
usr = vol[-1]
|
||||
if usr == "a":
|
||||
usr = ""
|
||||
@@ -99,7 +99,7 @@ class TestHttpCli(unittest.TestCase):
|
||||
vol += "/"
|
||||
|
||||
top, sub = vol.split("/", 1)
|
||||
vcfg.append("{0}/{1}:{1}:{2}{3}".format(top, sub, mode, usr))
|
||||
vcfg.append("{0}/{1}:{1}:{2},{3}".format(top, sub, mode, usr))
|
||||
|
||||
pprint.pprint(vcfg)
|
||||
|
||||
|
||||
@@ -68,6 +68,11 @@ class TestVFS(unittest.TestCase):
|
||||
def log(self, src, msg, c=0):
|
||||
pass
|
||||
|
||||
def assertAxs(self, dct, lst):
|
||||
t1 = list(sorted(dct.keys()))
|
||||
t2 = list(sorted(lst))
|
||||
self.assertEqual(t1, t2)
|
||||
|
||||
def test(self):
|
||||
td = os.path.join(self.td, "vfs")
|
||||
os.mkdir(td)
|
||||
@@ -88,53 +93,53 @@ class TestVFS(unittest.TestCase):
|
||||
self.assertEqual(vfs.nodes, {})
|
||||
self.assertEqual(vfs.vpath, "")
|
||||
self.assertEqual(vfs.realpath, td)
|
||||
self.assertEqual(vfs.uread, ["*"])
|
||||
self.assertEqual(vfs.uwrite, ["*"])
|
||||
self.assertAxs(vfs.axs.uread, ["*"])
|
||||
self.assertAxs(vfs.axs.uwrite, ["*"])
|
||||
|
||||
# single read-only rootfs (relative path)
|
||||
vfs = AuthSrv(Cfg(v=["a/ab/::r"]), self.log).vfs
|
||||
self.assertEqual(vfs.nodes, {})
|
||||
self.assertEqual(vfs.vpath, "")
|
||||
self.assertEqual(vfs.realpath, os.path.join(td, "a", "ab"))
|
||||
self.assertEqual(vfs.uread, ["*"])
|
||||
self.assertEqual(vfs.uwrite, [])
|
||||
self.assertAxs(vfs.axs.uread, ["*"])
|
||||
self.assertAxs(vfs.axs.uwrite, [])
|
||||
|
||||
# single read-only rootfs (absolute path)
|
||||
vfs = AuthSrv(Cfg(v=[td + "//a/ac/../aa//::r"]), self.log).vfs
|
||||
self.assertEqual(vfs.nodes, {})
|
||||
self.assertEqual(vfs.vpath, "")
|
||||
self.assertEqual(vfs.realpath, os.path.join(td, "a", "aa"))
|
||||
self.assertEqual(vfs.uread, ["*"])
|
||||
self.assertEqual(vfs.uwrite, [])
|
||||
self.assertAxs(vfs.axs.uread, ["*"])
|
||||
self.assertAxs(vfs.axs.uwrite, [])
|
||||
|
||||
# read-only rootfs with write-only subdirectory (read-write for k)
|
||||
vfs = AuthSrv(
|
||||
Cfg(a=["k:k"], v=[".::r:ak", "a/ac/acb:a/ac/acb:w:ak"]),
|
||||
Cfg(a=["k:k"], v=[".::r:rw,k", "a/ac/acb:a/ac/acb:w:rw,k"]),
|
||||
self.log,
|
||||
).vfs
|
||||
self.assertEqual(len(vfs.nodes), 1)
|
||||
self.assertEqual(vfs.vpath, "")
|
||||
self.assertEqual(vfs.realpath, td)
|
||||
self.assertEqual(vfs.uread, ["*", "k"])
|
||||
self.assertEqual(vfs.uwrite, ["k"])
|
||||
self.assertAxs(vfs.axs.uread, ["*", "k"])
|
||||
self.assertAxs(vfs.axs.uwrite, ["k"])
|
||||
n = vfs.nodes["a"]
|
||||
self.assertEqual(len(vfs.nodes), 1)
|
||||
self.assertEqual(n.vpath, "a")
|
||||
self.assertEqual(n.realpath, os.path.join(td, "a"))
|
||||
self.assertEqual(n.uread, ["*", "k"])
|
||||
self.assertEqual(n.uwrite, ["k"])
|
||||
self.assertAxs(n.axs.uread, ["*", "k"])
|
||||
self.assertAxs(n.axs.uwrite, ["k"])
|
||||
n = n.nodes["ac"]
|
||||
self.assertEqual(len(vfs.nodes), 1)
|
||||
self.assertEqual(n.vpath, "a/ac")
|
||||
self.assertEqual(n.realpath, os.path.join(td, "a", "ac"))
|
||||
self.assertEqual(n.uread, ["*", "k"])
|
||||
self.assertEqual(n.uwrite, ["k"])
|
||||
self.assertAxs(n.axs.uread, ["*", "k"])
|
||||
self.assertAxs(n.axs.uwrite, ["k"])
|
||||
n = n.nodes["acb"]
|
||||
self.assertEqual(n.nodes, {})
|
||||
self.assertEqual(n.vpath, "a/ac/acb")
|
||||
self.assertEqual(n.realpath, os.path.join(td, "a", "ac", "acb"))
|
||||
self.assertEqual(n.uread, ["k"])
|
||||
self.assertEqual(n.uwrite, ["*", "k"])
|
||||
self.assertAxs(n.axs.uread, ["k"])
|
||||
self.assertAxs(n.axs.uwrite, ["*", "k"])
|
||||
|
||||
# something funky about the windows path normalization,
|
||||
# doesn't really matter but makes the test messy, TODO?
|
||||
@@ -173,24 +178,24 @@ class TestVFS(unittest.TestCase):
|
||||
|
||||
# admin-only rootfs with all-read-only subfolder
|
||||
vfs = AuthSrv(
|
||||
Cfg(a=["k:k"], v=[".::ak", "a:a:r"]),
|
||||
Cfg(a=["k:k"], v=[".::rw,k", "a:a:r"]),
|
||||
self.log,
|
||||
).vfs
|
||||
self.assertEqual(len(vfs.nodes), 1)
|
||||
self.assertEqual(vfs.vpath, "")
|
||||
self.assertEqual(vfs.realpath, td)
|
||||
self.assertEqual(vfs.uread, ["k"])
|
||||
self.assertEqual(vfs.uwrite, ["k"])
|
||||
self.assertAxs(vfs.axs.uread, ["k"])
|
||||
self.assertAxs(vfs.axs.uwrite, ["k"])
|
||||
n = vfs.nodes["a"]
|
||||
self.assertEqual(len(vfs.nodes), 1)
|
||||
self.assertEqual(n.vpath, "a")
|
||||
self.assertEqual(n.realpath, os.path.join(td, "a"))
|
||||
self.assertEqual(n.uread, ["*"])
|
||||
self.assertEqual(n.uwrite, [])
|
||||
self.assertEqual(vfs.can_access("/", "*"), [False, False])
|
||||
self.assertEqual(vfs.can_access("/", "k"), [True, True])
|
||||
self.assertEqual(vfs.can_access("/a", "*"), [True, False])
|
||||
self.assertEqual(vfs.can_access("/a", "k"), [True, False])
|
||||
self.assertAxs(n.axs.uread, ["*"])
|
||||
self.assertAxs(n.axs.uwrite, [])
|
||||
self.assertEqual(vfs.can_access("/", "*"), [False, False, False, False])
|
||||
self.assertEqual(vfs.can_access("/", "k"), [True, True, False, False])
|
||||
self.assertEqual(vfs.can_access("/a", "*"), [True, False, False, False])
|
||||
self.assertEqual(vfs.can_access("/a", "k"), [True, False, False, False])
|
||||
|
||||
# breadth-first construction
|
||||
vfs = AuthSrv(
|
||||
@@ -247,26 +252,26 @@ class TestVFS(unittest.TestCase):
|
||||
./src
|
||||
/dst
|
||||
r a
|
||||
a asd
|
||||
rw asd
|
||||
"""
|
||||
).encode("utf-8")
|
||||
)
|
||||
|
||||
au = AuthSrv(Cfg(c=[cfg_path]), self.log)
|
||||
self.assertEqual(au.user["a"], "123")
|
||||
self.assertEqual(au.user["asd"], "fgh:jkl")
|
||||
self.assertEqual(au.acct["a"], "123")
|
||||
self.assertEqual(au.acct["asd"], "fgh:jkl")
|
||||
n = au.vfs
|
||||
# root was not defined, so PWD with no access to anyone
|
||||
self.assertEqual(n.vpath, "")
|
||||
self.assertEqual(n.realpath, None)
|
||||
self.assertEqual(n.uread, [])
|
||||
self.assertEqual(n.uwrite, [])
|
||||
self.assertAxs(n.axs.uread, [])
|
||||
self.assertAxs(n.axs.uwrite, [])
|
||||
self.assertEqual(len(n.nodes), 1)
|
||||
n = n.nodes["dst"]
|
||||
self.assertEqual(n.vpath, "dst")
|
||||
self.assertEqual(n.realpath, os.path.join(td, "src"))
|
||||
self.assertEqual(n.uread, ["a", "asd"])
|
||||
self.assertEqual(n.uwrite, ["asd"])
|
||||
self.assertAxs(n.axs.uread, ["a", "asd"])
|
||||
self.assertAxs(n.axs.uwrite, ["asd"])
|
||||
self.assertEqual(len(n.nodes), 0)
|
||||
|
||||
os.unlink(cfg_path)
|
||||
|
||||
Reference in New Issue
Block a user