Passing Multiple Parameters in PySpark MapPartitions
Alternate title: k-Nearest Neighbours (kNN) in PySpark
You can follow the story of what I wanted to do and how I did it, or jump straight to the solution.
Situation
- PySpark's MLlib (both the DataFrame-based and the RDD-based APIs) does not support the kNN algorithm.
- The multiple SO questions I found (1, 2, 3) did not help.
- I did not get a chance to try the available open-source code samples (1, 2).
- There were two stable-ish solutions available:
- Use Spotify’s library called Annoy.
- Use Scikit-learn’s implementation of kNN.
- Both methods run on a single node, so the benefit of Spark's distributed processing goes out of the window.
- That's a deal breaker because I have a large data set (~20+ million records) with long vectors.
- I picked Annoy because I found it first. I discuss at the end why Scikit-learn could be more performant.
The task is to parallelise the Annoy code across multiple nodes of the Spark cluster.
Solution
- A hint about the solution is present in this SO Answer.
- For both Annoy and Scikit, the approach is as follows:
- Build the index or fit the model on a single node. Nothing is distributed here.
- Broadcast the index (or model) across the cluster to find the nearest neighbours of a given vector.
Building the index
- I first tried to use spark-annoy, which is written in Scala. Its benefit is that it can build the index in a distributed manner. Unfortunately, I could not figure it out.
- So I fell back to the default: building the index iteratively on a single node.
- The following is the method to build the index:
```python
import pandas as pd
from annoy import AnnoyIndex


def build_annoy_index(vectors: pd.Series, dim: int, num_trees: int = 100):
    # the Series index becomes the Annoy item id
    t = AnnoyIndex(dim, metric='angular')
    for index, vector in vectors.items():
        t.add_item(index, vector)
    t.build(num_trees)
    return t
```
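For a quick sanity check, the builder can be exercised with toy data (a minimal sketch; the values are made up):

```python
import pandas as pd

# toy 3-dimensional vectors keyed by integer ids
vectors = pd.Series({0: [1.0, 0.0, 0.0], 1: [0.0, 1.0, 0.0], 2: [0.9, 0.1, 0.0]})
index = build_annoy_index(vectors, dim=3, num_trees=10)
print(index.get_nns_by_item(0, 2))  # likely [0, 2]: an item is nearest to itself
```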
- The `for` loop makes it a long-running process if the data is large. Sadly, it is unavoidable.
- Note that the type of `vectors` is `pd.Series`. I used pandas to get the goodness of indexes, but it can be a `list` or any other iterable. To produce one from Spark, do either of the following (both routes are sketched below):
  - Run `.collect()` on the Spark DataFrame; or
  - Turn the Spark DataFrame into a pandas DataFrame.
- Either way brings all the data to a single node, so it can potentially lead to an OOM error.
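Both routes might look like this (a sketch, assuming a hypothetical `spark_df` with `id` and `vector` columns):

```python
import pandas as pd

# Option 1: collect the rows to the driver and build a Series keyed by id
rows = spark_df.select('id', 'vector').collect()
vectors = pd.Series({row.id: row.vector for row in rows})

# Option 2: convert to pandas and use the id column as the Series index
vectors = spark_df.select('id', 'vector').toPandas().set_index('id').vector
```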
Finding Nearest Neighbours
- We can parallelise this step.
- We have to distribute the Annoy index to all the nodes of the Spark cluster.
- The Annoy indexes are memory mapped. As the Annoy documentation puts it:
  > It also creates large read-only file-based data structures that are mmapped into memory so that many processes may share the same data.
- Broadcasting it with `sc.broadcast(t)` will fail (the index object is not picklable). This SO answer discusses the issue.
- The solution: write the index to a file and ship the file to all the workers to load.
- Use `sc.addFile()` to send the file to the workers (a driver-side sketch follows the loader below).
- Use `SparkFiles.get()` on the worker node to resolve the local file path and load the index there.
- Here is the method to load the Annoy index on the worker nodes:
```python
from pyspark import SparkFiles


def load_annoy_index(index_file: str, dim: int):
    # import inside the function so it is available in the worker process
    from annoy import AnnoyIndex

    index = AnnoyIndex(dim, metric='angular')
    index.load(SparkFiles.get(index_file))
    return index
```
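For completeness, the driver-side counterpart might look like this (a sketch, not the post's verbatim code; `vectors`, `dim`, and `sc` are assumed to be in scope):

```python
# driver side: persist the index and register it for distribution
index_path = '/tmp/vectors.ann'  # hypothetical location
t = build_annoy_index(vectors, dim)
t.save(index_path)
sc.addFile(index_path)
# workers then call load_annoy_index('vectors.ann', dim);
# SparkFiles.get() resolves the bare file name to the worker-local copy
```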
- I call the below method to get the nearest neighbours of a set of index ids:
```python
def find_neighbours(index_file, top_n, dim, item_batch):
    index = load_annoy_index(index_file, dim)
    # get similar items
    sim_items = []
    for item in item_batch:
        top_n_items = index.get_nns_by_item(i=item, n=top_n)
        sim_items.append((item, list(enumerate(top_n_items))))
    return sim_items
```
- The `item_batch` is the list of Annoy index ids.
- The function returns a list of `(annoy_item_index, [(rank_1, sim_item_1), ..., (rank_n, sim_item_n)])` tuples.
- For example, here is one such item from the list:
  `(248, [[0, 248], [9, 284764], [3, 86148], [6, 265812], [7, 508155], [2, 48388], [10, 58786], [1, 154653], [5, 364419], [4, 4444], [8, 89955]])`
- To validate the function, the most similar item to a query item should be the item itself. In the above example, the query index `248` ranks `0` in its own top similar items.
- To get the nearest neighbours by vector, pass the vectors in the `item_batch` and use the `get_nns_by_vector` method (a sketch follows this list).
- I keep my `find_neighbours` method generic through the following parameters:
  - Annoy index file name: I can use any name based on my use case.
  - Top n items: the number of top items I want to retrieve.
  - Dim: the dimension of the vector, which can vary depending on the ML technique (LDA, DL, etc.).
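A by-vector variant might look like this (a hypothetical sketch mirroring `find_neighbours`; the batch is assumed to yield `(item_id, vector)` pairs):

```python
def find_neighbours_by_vector(index_file, top_n, dim, vector_batch):
    index = load_annoy_index(index_file, dim)
    sim_items = []
    for item_id, vector in vector_batch:
        # query by raw vector instead of an existing Annoy item id
        top_n_items = index.get_nns_by_vector(vector, n=top_n)
        sim_items.append((item_id, list(enumerate(top_n_items))))
    return sim_items
```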
- We have only written the method that finds the nearest neighbours. How do we call it in a distributed manner? This SO answer covers that too.
- The answer is `mapPartitions`. This method applies the passed function to each RDD partition. Go through the answers of this SO question for more detail.
- I will pass `find_neighbours` to `mapPartitions`, and it will return an RDD with the nearest-neighbours lists.
- But my `find_neighbours` implementation takes four parameters, and there is no way to pass the extra arguments through `mapPartitions`.
- I use the built-in Python `partial()` function from the `functools` module:
  > The `partial()` is used for partial function application which "freezes" some portion of a function's arguments and/or keywords resulting in a new object with a simplified signature.
- Here is how my final function looks:
```python
from functools import partial


def build_index_get_similar_items(vector_df, index_file, top_n):
    vectors = vector_df.vector
    spark_vector_ids = sc.parallelize(vector_df.index.values)
    # build the index on the driver and save it to a file
    index_file = build_save_annoy_index(vectors, dim, index_file, BASE_DIR)
    print(index_file)
    # distribute the index file to all worker nodes
    sc.addFile(index_file)
    # get similar items, one partition at a time
    find_neighbours_ = partial(find_neighbours, index_file, top_n, dim)
    similar_items = spark_vector_ids.mapPartitions(find_neighbours_).collect()
    return similar_items
```
- The `build_save_annoy_index()` method builds the index, saves it to a file, and returns the file path (a plausible sketch of it follows this list).
- Finally, we see the use of `sc.addFile(index_file)`.
- The `find_neighbours_()` is the partial function: we froze `index_file`, `top_n`, and `dim`. It now expects a single argument, the iterator over a partition's items, which is exactly what the `mapPartitions()` method supplies.
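The post does not list `build_save_annoy_index()` itself; judging by how it is called above, a hypothetical reconstruction could be:

```python
import os


def build_save_annoy_index(vectors, dim, index_file, base_dir, num_trees=100):
    # build on the driver, then persist so sc.addFile() can distribute the file
    index = build_annoy_index(vectors, dim, num_trees)
    index_path = os.path.join(base_dir, index_file)
    index.save(index_path)
    return index_path
```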
Saving Results
- I take the `similar_items` list and convert it into a pandas DataFrame.
- Map ALL the Annoy index ids to the actual item ids, including the ids inside the top-n similar-items lists.
- Convert the pandas DataFrame to a PySpark DataFrame.
- Save the PySpark DataFrame into a Delta table. (A sketch of this pipeline follows.)
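A minimal sketch of that pipeline, assuming a hypothetical `annoy_id_to_item_id` dict and an active `spark` session:

```python
import pandas as pd

# flatten (annoy_id, [(rank, sim_annoy_id), ...]) into one row per pair
rows = [
    (annoy_id_to_item_id[item], rank, annoy_id_to_item_id[sim_item])
    for item, ranked in similar_items
    for rank, sim_item in ranked
]
pdf = pd.DataFrame(rows, columns=['item_id', 'rank', 'similar_item_id'])

# pandas -> PySpark -> Delta
spark.createDataFrame(pdf).write.format('delta').mode('overwrite').saveAsTable('similar_items')
```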
Result
- I was able to parallelise the Annoy-based kNN search using `mapPartitions`.
- On ~500k records, the run time was down from 8 minutes to 2 minutes.
- On ~10 million records (with an index built from ~500k records), the run time was ~1 hour.
- On ~10 million records (with an index built from ~10 million records), I got an OOM error. 🥲
What’s Next
- Find out why it runs out of memory.
- Converting the pandas DataFrame to a PySpark DataFrame is expensive. I want to explore whether I can go directly from the pandas DataFrame to Delta. Ref: 1, 2.
- Since Scikit-learn has vectorised training and inference, its kNN would likely be faster. This post shows how to do it. I would probably replace the `map` with `mapPartitions` (a rough sketch below).
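That direction might look roughly like this (an untested sketch; `all_vectors`, `top_n`, `id_vector_rdd`, and `sc` are assumptions, not code from the post):

```python
from functools import partial
from sklearn.neighbors import NearestNeighbors


def find_neighbours_sklearn(model_bc, top_n, vector_batch):
    # one partition's worth of (item_id, vector) pairs
    batch = list(vector_batch)
    if not batch:
        return []
    ids, vecs = zip(*batch)
    # vectorised query over the whole partition in a single call
    _, nbr_idx = model_bc.value.kneighbors(list(vecs), n_neighbors=top_n)
    return [(i, list(enumerate(idx))) for i, idx in zip(ids, nbr_idx)]


# driver side: fit once, broadcast (scikit-learn models pickle, unlike Annoy indexes)
model = NearestNeighbors().fit(all_vectors)  # all_vectors: 2-D array of training vectors
model_bc = sc.broadcast(model)
find_neighbours_ = partial(find_neighbours_sklearn, model_bc, top_n)
similar_items = id_vector_rdd.mapPartitions(find_neighbours_).collect()
```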