
Elasticsearch Analyze() Not Compatible With Spark In Python?

I'm using the elasticsearch-py client within PySpark (Python 3), and I'm running into a problem using the analyze() function against Elasticsearch in conjunction with an RDD.

Solution 1:

Essentially, the Elasticsearch client is not serializable, so it cannot be captured in a closure and shipped to the Spark workers. What you need to do instead is create an instance of the client inside each partition and process the partition's elements there:

from elasticsearch import Elasticsearch

def get_tokens(part):
    # Instantiate the client inside the partition; it cannot be pickled
    # and shipped from the driver.
    es = Elasticsearch()
    yield [es.indices.analyze(text=x)['tokens'][0] for x in part]

rdd = sc.parallelize([['the quick brown fox'], ['brown quick dog']], numSlices=2)
rdd.mapPartitions(lambda p: get_tokens(p)).collect()

Should give the following result:

Out[17]:
[[{u'end_offset': 3,
   u'position': 1,
   u'start_offset': 0,
   u'token': u'the',
   u'type': u'<ALPHANUM>'}],
 [{u'end_offset': 5,
   u'position': 1,
   u'start_offset': 0,
   u'token': u'brown',
   u'type': u'<ALPHANUM>'}]]

Note that for large data sets this approach is going to be very inefficient, since it makes one REST call to Elasticsearch per element in the dataset.
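If those per-element round trips become a bottleneck, one possible mitigation is to batch all of a partition's texts into a single _analyze call, since the _analyze API accepts a list of strings for text (in Elasticsearch 2.x and later). The following is a minimal sketch, not part of the original answer: get_tokens_batched is a hypothetical helper, and it assumes a client/server version where passing a list of texts in the request body is supported.

from elasticsearch import Elasticsearch

def get_tokens_batched(part):
    # Hypothetical sketch: one client and one _analyze REST call per
    # partition instead of one call per element. Assumes the _analyze
    # API accepts a list of strings for 'text' (Elasticsearch 2.x+).
    es = Elasticsearch()
    texts = [t for x in part for t in x]  # each RDD element is a list of strings
    if texts:
        yield es.indices.analyze(body={'text': texts})['tokens']

rdd = sc.parallelize([['the quick brown fox'], ['brown quick dog']], numSlices=2)
rdd.mapPartitions(get_tokens_batched).collect()

With only one text per partition, as in this toy example, the savings are nil; the benefit shows up when partitions hold many texts. Be aware that the response returns the tokens for all inputs as a single flat list (with position gaps between inputs), so mapping tokens back to individual texts takes extra bookkeeping.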

