How to Recurse Data Lake Folders with Synapse Spark Pools
mssparkutils file listing capabilities don't come with a recursion flag - probably by design.
One of the gotchas I ran into during a recent OpenHack was manually clicking through a Data Lake with a hefty number of file sources and partitions. I was in my Synapse notebook, crunched for time, wishing there was a way to recursively list all files from a root directory: to figure out what DataFrames I needed to import, what the schemas looked like, what the transformation pipeline was going to look like, and so on.
The mssparkutils.fs.ls('path') command returns a raw list of FileInfo objects, one per immediate child of the path, each exposing path, name, and size.
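A minimal sketch of the call, with a made-up ADLS Gen2 account and container standing in for the real ones:

```python
from notebookutils import mssparkutils  # pre-loaded in Synapse notebooks; explicit import shown for clarity

# Hypothetical path - substitute your own storage account and container
root = 'abfss://mycontainer@myaccount.dfs.core.windows.net/raw'

# Only the immediate children of root come back - no recursion
for f in mssparkutils.fs.ls(root):
    print(f.path, f.name, f.size)
```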
We're also left with the default formatting above, as the display() command is geared towards DataFrames. Calling display() on the listing throws:

```
AssertionError: dataType <py4j.java_gateway.JavaMember object at 0x7fa481050358> should be an instance of <class 'pyspark.sql.types.DataType'>
Traceback (most recent call last):
  File "/home/trusted-service-user/cluster-env/env/lib/python3.6/site-packages/notebookutils/visualization/display.py", line 96, in display
    df = _convert(data)
```

That's because mssparkutils.fs.ls(root) returns a plain Python list object instead.
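A quick way to confirm the type mismatch for yourself:

```python
res = mssparkutils.fs.ls(root)
print(type(res))       # <class 'list'> - not something display() can convert
print(type(res[0]))    # the entries carry path, name, and size attributes
```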
The following functions achieve both recursion and pretty printing in Synapse Spark Pool notebooks:
```python
import pandas as pd

def deep_ls(path: str, max_depth=1):
    """
    List all files and folders in specified path and
    subfolders within maximum recursion depth.
    """
    # List all files in path
    li = mssparkutils.fs.ls(path)
    # Return all files
    for x in li:
        if x.size != 0:
            yield x
    # If the max_depth has not been reached, start
    # listing files and folders in subdirectories
    if max_depth > 1:
        for x in li:
            if x.size != 0:
                continue
            for y in deep_ls(x.path, max_depth - 1):
                yield y
    # If max_depth has been reached,
    # return the folders
    else:
        for x in li:
            if x.size == 0:
                yield x


def convertfiles2df(files):
    """
    Converts FileInfo objects into a Pandas DataFrame to enable display.
    """
    # Disable Arrow-based transfers since the Pandas DataFrame is tiny
    spark.conf.set("spark.sql.execution.arrow.enabled", "false")
    schema = ['path', 'name', 'size']
    df = pd.DataFrame(
        [[getattr(i, j) for j in schema] for i in files],
        columns=schema
    ).sort_values('path')
    return df
```
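One thing worth calling out in deep_ls: it tells files and folders apart purely by size, since folders come back from mssparkutils.fs.ls with a size of 0. A genuinely empty file would therefore be treated as a folder, so consider it a pragmatic shortcut rather than a guarantee.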
With convertfiles2df, we're basically taking the list returned by mssparkutils.fs.ls and converting it into a Pandas DataFrame, so it works with the notebook's display() command.
```python
# Get files
files = list(deep_ls(root, max_depth=20))

# Display with Pretty Printing
display(convertfiles2df(files))
```
The example call above returns:
Much easier on the eye, and saves going through all the folders manually!
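As a follow-on, the flat file list feeds straight into the schema exploration mentioned at the top. A sketch, assuming the lake holds Parquet files (the extension filter and the five-file cap are illustrative choices, not part of the snippets above):

```python
# Hypothetical next step: inspect the schema of each discovered Parquet file
parquet_files = [f for f in files if f.name.endswith('.parquet')]

for f in parquet_files[:5]:  # peek at the first few to keep notebook output short
    print(f.path)
    spark.read.parquet(f.path).printSchema()
```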