Get function to run in parallel with pyspark.mllib.linalg.distributed matrix

Keywords: apache-spark pyspark apache-spark-mllib

Question: 

The following reproducible code does what I want, but it is slow. I am not sure whether I am invoking the function map_simScore() in a way that gives me the right level of parallelism.

Initializing the test DataFrame with spark.range(0, 25000, 1) results in a DataFrame with around 76 MB distributed over 3 partitions.

The cluster has 3 worker nodes with 16 cores and 62 GB of memory each.

import numpy as np

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window as W
from pyspark.ml.feature import RegexTokenizer
from pyspark.ml.feature import Word2Vec
from pyspark.mllib.linalg.distributed import IndexedRowMatrix

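# NOTE: no explicit executor sizing is set below, so the YARN defaults apply;
# spark.executor.instances / spark.executor.cores / spark.executor.memory
# could be pinned with .config(...) if needed.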
spark = (
    SparkSession.builder
    .master('yarn')
    .appName("linalg_test")
    .getOrCreate()
)    

placeholder = (
    r"Lorem ipsum dolor sit amet consectetur adipiscing elit "
    r"sed do eiusmod tempor incididunt ut labore et dolore magna aliqua "
    r"Ut enim ad minim veniam quis nostrud exercitation ullamco laboris "
    r"nisi ut aliquip ex ea commodo consequat Duis aute irure dolor in "
    r"reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla "
    r"pariatur Excepteur sint occaecat cupidatat non proident sunt in "
    r"culpa qui officia deserunt mollit anim id est laborum"
)

win = (
    W.partitionBy(F.col('doc_id'))
    .orderBy(F.col('id'))  # order by the range id so line_id is deterministic
    .rowsBetween(W.unboundedPreceding, W.currentRow)
)

df_SO = (
    spark.range(0, 25000, 1)
    .withColumn('rand1', (F.rand(seed=12345) * 50).cast(T.IntegerType()))
    .withColumn('doc_id', F.floor(F.col('rand1')/4) )
    .withColumn('int', F.lit(1))
    .withColumn('line_id', F.sum('int').over(win))
    .withColumn('rand2', (F.rand(seed=54321) * 50).cast(T.IntegerType()))
    .withColumn('rand3', (F.rand(seed=51432) * 100).cast(T.IntegerType()))
    .withColumn('text', F.lit(placeholder))
    .withColumn('text', F.expr("substring(text, rand2, rand3)" ))
    .withColumn('text', F.split(F.col('text'), ' '))
    .where(F.col('rand2') > 3)
    .select('doc_id', 'line_id', 'text')
)       
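As a quick sanity check on the figures above (a sketch):

df_SO.rdd.getNumPartitions()           # 3, matching the partition count above
spark.sparkContext.defaultParallelism  # task slots Spark derives from the executors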

word2Vec = (
    Word2Vec()
    .setInputCol("text")
    .setOutputCol("vector")
    .setMinCount(1)
    .setNumPartitions(5)
    .setStepSize(0.1)
    .setWindowSize(10)
    .setVectorSize(400)
    .setMaxSentenceLength(1)
)

model_SO = word2Vec.fit(df_SO)

df_SO2 = model_SO.transform(df_SO)


df_SO2.rdd.getNumPartitions()

df_SO2 = df_SO2.repartition(3, 'doc_id')
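# NOTE: there are only ~13 distinct doc_id values (rand1 is 0-49, floored /4),
# so hashing them into 3 partitions can leave the partitions uneven.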

spark.catalog.clearCache()
df_SO2.createOrReplaceTempView("df_SO2")
spark.catalog.cacheTable("df_SO2")
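# NOTE: caching is lazy and per-partition; limit(1).count() may materialize
# only the partition it touches. df_SO2.count() would populate the full cache.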
df_SO2.limit(1).count()


doc_ids = (
    df_SO
    .groupBy('doc_id')
    .agg(F.count(F.col('doc_id')).alias('numLines') )
    .toPandas()
)
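Since rand1 falls in 0-49, doc_id takes the 13 values 0-12, so doc_ids holds roughly 13 rows and the driver-side map below submits roughly 13 Spark jobs, one per document, in sequence.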


def map_simScore(id):
    dftmp = df_SO2.filter(F.col('doc_id') == id)
    dfcnt = float(doc_ids[doc_ids.doc_id.eq(id)]['numLines'].values[0])

    stats = (
        IndexedRowMatrix(dftmp.rdd.map(lambda row: (row.line_id, row.vector.toArray())))
        .toBlockMatrix()
        .transpose()
        .toIndexedRowMatrix()
        .columnSimilarities()
        .toRowMatrix()
        .computeColumnSummaryStatistics()
    )

    # stats.max() gives the per-column maxima of the similarity matrix;
    # count the columns whose max exceeds 0.8, normalized by the line count
    mx = stats.max()
    SimScore = len(mx[np.where(mx > 0.8)]) / dfcnt

    return (id, SimScore)


doc_ids.doc_id.map(map_simScore)

I also tried the following, which produces the same results but does not run noticeably faster. It is invoked the same way, which may be the problem (see the sketch after this second version).

def map_simScore2(id):
    dftmp = df_SO2.filter(F.col('doc_id') == id)
    dfcnt = float(doc_ids[doc_ids.doc_id.eq(id)]['numLines'].values[0])

    SimScore = (
        spark.createDataFrame(
            IndexedRowMatrix(dftmp.rdd.map(lambda row: (row.line_id, row.vector.toArray())))
            .toBlockMatrix()
            .transpose()
            .toIndexedRowMatrix()
            .columnSimilarities()
            .entries)
        .filter(F.col('value') > 0.80)
        .groupBy('j')
        .agg(F.count('j'))
        .count() / dfcnt
    )

    return (id, SimScore)


doc_ids.doc_id.map(map_simScore2)
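In both versions the invocation is a plain pandas .map on the driver, so the per-document Spark jobs are submitted strictly one after another. A minimal sketch of submitting them concurrently from driver threads (a SparkSession can be shared across threads; the pool size of 8 here is an arbitrary assumption):

from concurrent.futures import ThreadPoolExecutor

# submit one Spark job per doc_id from a pool of driver threads so the
# scheduler can overlap them instead of running them back to back
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(map_simScore, doc_ids.doc_id.tolist()))

Each job still pays the full BlockMatrix conversion cost; only the scheduling overlaps, so I am not sure this addresses the underlying slowness.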

Answers: