How to get a function to run in parallel with a pyspark.mllib.linalg.distributed matrix

Keywords: apache-spark pyspark apache-spark-mllib


The following reproducible code does what I want, but it is slow. I am not sure whether I am invoking the function map_simScore() in a way that achieves the right level of parallelism.

Initializing the test DataFrame with spark.range(0, 25000, 1) results in a DataFrame with around 76 MB distributed over 3 partitions.

My cluster has 3 worker nodes, each with 16 cores and 62 GB of memory.

from pyspark.sql import SparkSession,
from pyspark.sql import functions as F,
from pyspark.sql import types as T
from import RegexTokenizer
from import Word2Vec    

# NOTE(review): the SparkSession builder chain that should follow `spark = (`
# appears to have been lost here (its closing parenthesis is missing too);
# presumably `SparkSession.builder ... .getOrCreate()` — confirm.
spark = (

# Sample text from which each test row takes a random substring.
# The adjacent string literals are concatenated with no separator, so every
# piece except the last ends in a space to keep words apart.
# NOTE(review): the closing parenthesis of this grouped literal is missing.
placeholder = (
    r"Lorem ipsum dolor sit amet consectetur adipiscing elit "
    r"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua "
    r"Ut enim ad minim veniam quis nostrud exercitation ullamco laboris "
    r"nisi ut aliquip ex ea commodo consequat Duis aute irure dolor in "
    r"reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
    r"pariatur Excepteur sint occaecat cupidatat non proident sunt in "
    r"culpa qui officia deserunt mollit anim id est laborum"

# Frame from the first row of the window partition up to the current row.
# Summing a constant 1 over it (the `line_id` column below) yields a running
# row number.
# NOTE(review): the window specification that should precede
# `.rowsBetween` (presumably W.partitionBy(...).orderBy(...)) is missing, as
# are the import of `W` and the closing parenthesis.
win = (
    .rowsBetween(W.unboundedPreceding, W.currentRow)

# Synthetic test data built from ids 0..24999:
# - doc_id: one of at most 13 documents (floor of 0..49 / 4);
# - line_id: a running row number over `win`;
# - text: a random substring of `placeholder`, split into words.
# NOTE(review): the closing parenthesis of this chain is missing.
df_SO = (
    spark.range(0, 25000, 1)
    # F.rand is uniform in [0, 1), so the cast gives an integer in 0..49.
    .withColumn('rand1', (F.rand(seed=12345) * 50).cast(T.IntegerType()))
    .withColumn('doc_id', F.floor(F.col('rand1')/4) )
    # Constant 1 so that summing it over `win` counts rows.
    .withColumn('int', F.lit(1))
    .withColumn('line_id', F.sum('int').over(win))
    # rand2 is the substring start position; rand3 is the substring length.
    .withColumn('rand2', (F.rand(seed=54321) * 50).cast(T.IntegerType()))
    .withColumn('rand3', (F.rand(seed=51432) * 100).cast(T.IntegerType()))
    .withColumn('text', F.lit(placeholder))
    .withColumn('text', F.expr("substring(text, rand2, rand3)" ))
    .withColumn('text', F.split(F.col('text'), ' '))
    # Keeps only rows whose substring starts at position 4 or later.
    # This filter runs after line_id is assigned, so the surviving line_id
    # values are not contiguous.
    .where(F.col('rand2') > 3)
    .select('doc_id', 'line_id', 'text')

# NOTE(review): the Word2Vec configuration and the model-fitting call are
# missing from these two lines; presumably
# `word2Vec = Word2Vec(inputCol='text', outputCol='vector', ...)` and
# `model_SO = word2Vec.fit(df_SO)` — confirm. A `vector` column is what
# map_simScore later reads via `row.vector`.
word2Vec = (

model_SO =

# Applies the (presumably fitted Word2Vec) model to every row of df_SO.
df_SO2 = model_SO.transform(df_SO)


# Repartitions by doc_id into 3 partitions, so all rows of one document share
# a partition.
# NOTE(review): stages that read df_SO2 will have only 3 tasks, while the
# cluster described in the question has 3 x 16 cores.
df_SO2 = df_SO2.repartition(3, 'doc_id')


# Per-document line counts (`numLines`), used as the denominator in
# map_simScore / map_simScore2.
# NOTE(review): the groupBy('doc_id') that should precede `.agg` is missing.
# Judging by the pandas-style indexing in map_simScore (`.eq`, `.values`),
# a `.toPandas()` conversion is presumably missing as well — confirm.
doc_ids = (
    .agg(F.count(F.col('doc_id')).alias('numLines') )

def map_simScore(id):
    """Compute a similarity score for the document whose doc_id is `id`.

    Selects the document's rows from df_SO2 and builds an IndexedRowMatrix
    from (line_id, vector) pairs. The score is the number of entries of
    `stats.max()` greater than 0.8, divided by the document's line count.

    Returns:
        tuple: (id, SimScore).

    NOTE(review): this reads the module-level DataFrame df_SO2 and the
    module-level doc_ids. It can therefore only run on the driver, not inside
    an RDD/DataFrame transformation. If it is invoked from a driver-side loop
    or `map` over doc ids, the documents are processed one after another,
    which may explain the lack of parallelism. `id` also shadows the builtin.
    """
    dftmp = df_SO2.filter(F.col('doc_id') == id)
    # Line count for this document, taken from doc_ids. The `.eq` / `.values`
    # indexing suggests doc_ids is a pandas DataFrame here — confirm.
    dfcnt = float(doc_ids[doc_ids.doc_id.eq(id)]['numLines'].values[0])

    # NOTE(review): this expression is truncated. The `dftmp.rdd.map(lambda`
    # prefix, and whatever chain turns the matrix into `stats` (e.g.
    # similarity / summary-statistics calls), appear to be missing.
    stats = (
        IndexedRowMatrix( row: (row.line_id, row.vector.toArray())))

    # Counts elements of stats.max() above 0.8 and normalises by line count.
    # Assumes stats.max() returns a numpy array and `np` is imported
    # elsewhere — confirm. stats.max() is evaluated twice here.
    SimScore = len(stats.max()[np.where(stats.max() > 0.8)]) / dfcnt

    return (id, SimScore)

I also tried the following, which gives the same results but does not run noticeably faster. It is invoked the same way as map_simScore(), which may be the problem.

def map_simScore2(id):
    """Alternative similarity score for the document whose doc_id is `id`.

    Same inputs and return shape as map_simScore, i.e. (id, SimScore). It
    counts rows whose `value` column exceeds 0.80 with a DataFrame filter and
    `count()`, then divides by the document's line count.

    NOTE(review): like map_simScore, this reads the module-level df_SO2 and
    doc_ids, so it must run on the driver; calling it per doc id from a
    driver-side loop processes documents sequentially. `id` shadows the
    builtin.
    """
    dftmp = df_SO2.filter(F.col('doc_id') == id)
    # Line count for this document. The pandas-style indexing suggests
    # doc_ids is a pandas DataFrame — confirm.
    dfcnt = float(doc_ids[doc_ids.doc_id.eq(id)]['numLines'].values[0])

    # NOTE(review): this expression is truncated and its parentheses do not
    # balance. The `dftmp.rdd.map(lambda` prefix, and the steps that turn the
    # matrix into a DataFrame with a `value` column, appear to be missing.
    SimScore = (
            IndexedRowMatrix( row: (row.line_id, row.vector.toArray())))
        .filter(F.col('value') > 0.80)
        .count() / dfcnt

    return (id, SimScore)