# PySpark Snippets

# Basic Code Snippets

Count distinct values in a column:

```python
df.select('col_name').distinct().show()    # display the distinct values
df.select('col_name').distinct().count()   # or count them
```

Fill NA:

```python
df = df.fillna({'col': 'fill_value'})
```

Pivot a DataFrame:

```python
from pyspark.sql.functions import count

df.groupBy("column").pivot("column2").agg(count("column3"))
```
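
A toy example (the data and column names are invented) to show the resulting shape: one row per group value and one aggregated column per pivoted value.

```python
from pyspark.sql.functions import count

data = [("alice", "open", 1), ("alice", "closed", 2), ("bob", "open", 3)]
orders = spark.createDataFrame(data, ["customer", "status", "order_id"])

# One row per customer, one count column per distinct status ("open", "closed")
orders.groupBy("customer").pivot("status").agg(count("order_id")).show()
```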

# Window Function

A useful Stack Overflow question covers the details; the general structure is:

```python
from pyspark.sql import Window
from pyspark.sql.functions import col, count

w = (Window
     .partitionBy(col('partition_col'))   # one or more partitioning columns
     .orderBy(col('order_col'))           # one or more ordering columns
     .rowsBetween(Window.unboundedPreceding, 0))

df = df.withColumn('new_name', count('col_name').over(w))
```
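
As a concrete sketch (the toy data and names are invented): a running count of events per user, ordered by timestamp.

```python
from pyspark.sql import Window
from pyspark.sql.functions import col, count

events = spark.createDataFrame(
    [("u1", 1), ("u1", 2), ("u1", 3), ("u2", 1)],
    ["user", "ts"],
)

w = (Window
     .partitionBy(col("user"))
     .orderBy(col("ts"))
     .rowsBetween(Window.unboundedPreceding, 0))

# running_count is 1, 2, 3 for u1's rows and 1 for u2's row
events.withColumn("running_count", count("ts").over(w)).show()
```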

# Forward Filling in PySpark

Taken from this link:

```python
import sys

from pyspark.sql import Window
from pyspark.sql.functions import last

# define the window
# (-sys.maxsize is effectively the same as Window.unboundedPreceding here)
window = Window.partitionBy('location')\
               .orderBy('time')\
               .rowsBetween(-sys.maxsize, 0)

# define the forward-filled column: the last non-null temperature seen so far
filled_column = last(spark_df['temperature'], ignorenulls=True).over(window)

# do the fill
spark_df_filled = spark_df.withColumn('temp_filled_spark', filled_column)

# show off our glorious achievements
spark_df_filled.orderBy('time', 'location').show(10)
```
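
A backward fill (not part of the linked example; this mirrored variant is my own sketch) uses `first` with a window that looks forward instead:

```python
from pyspark.sql.functions import first

# look from the current row forward to the end of the partition
backward_window = Window.partitionBy('location')\
                        .orderBy('time')\
                        .rowsBetween(0, sys.maxsize)

backfilled_column = first(spark_df['temperature'], ignorenulls=True).over(backward_window)
spark_df_backfilled = spark_df.withColumn('temp_backfilled_spark', backfilled_column)
```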

# Applying a mapping from a dictionary

```python
from itertools import chain
from pyspark.sql.functions import create_map, lit, col

# build a literal map expression from a Python dict, then look up each row's value in it
mapping_expr = create_map([lit(x) for x in chain(*mapping_dict.items())])
df = df.withColumn("result", mapping_expr.getItem(col("column")))
```
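
A toy end-to-end example (the dict, DataFrame, and column names are invented):

```python
from itertools import chain
from pyspark.sql.functions import create_map, lit, col

mapping_dict = {"NY": "New York", "CA": "California"}
mapping_expr = create_map([lit(x) for x in chain(*mapping_dict.items())])

states = spark.createDataFrame([("NY",), ("CA",), ("TX",)], ["state"])
# "TX" has no entry in the dict, so its mapped value is null
states.withColumn("state_name", mapping_expr.getItem(col("state"))).show()
```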

# Rolling Window Structure

```python
from pyspark.sql.window import Window

windowSpec = \
  Window \
    .partitionBy(...) \
    .orderBy(...)
```
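
For an actual rolling computation the frame is also bounded with `rowsBetween`; here is a sketch of a 3-row rolling average (the column names are placeholders):

```python
from pyspark.sql import Window
from pyspark.sql.functions import avg, col

rolling_3 = (Window
             .partitionBy("group_col")
             .orderBy("time_col")
             .rowsBetween(-2, 0))   # the current row and the two rows before it

df = df.withColumn("rolling_avg", avg(col("value_col")).over(rolling_3))
```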

# UDFs

As a decorator:

```python
from pyspark.sql.functions import udf

@udf("string")
def decoratorUDF(input_value: str) -> str:
    return input_value[0]   # first character of the input string
```
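
Usage (`df` and the column name are placeholders, as elsewhere in these notes):

```python
from pyspark.sql.functions import col

df.select(decoratorUDF(col("my_column_here")))
```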

As a function:

```python
from pyspark.sql.functions import udf, col

def functionUDF(input_value: str) -> str:
    return input_value[0]

# Wrap it for use
myUDF = udf(functionUDF)

# Use it
df.select(myUDF(col("my_column_here")))
```

For use from SQL:

```python
def functionUDF(input_value: str) -> str:
    return input_value[0]

# Register it for use in SQL
spark.udf.register("sql_udf", functionUDF)
```

```sql
-- Use it
SELECT sql_udf(my_column_here) AS udf_column_name FROM <table>
```
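
The registered function can also be called from Python via `spark.sql`; `my_table` below is a hypothetical temp view standing in for the placeholder table:

```python
# Hypothetical: expose a DataFrame as a temp view, then call the registered UDF in SQL
df.createOrReplaceTempView("my_table")
spark.sql("SELECT sql_udf(my_column_here) AS udf_column_name FROM my_table").show()
```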

Vectorized (pandas) UDF:

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("string")
def vectorizedUDF(input_value: pd.Series) -> pd.Series:
    return input_value.str[0]   # first character of each value, computed on the whole Series
```
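
Usage is the same as for the other UDFs; pandas UDFs exchange data with the JVM in batches via Apache Arrow, which is why they generally outperform row-at-a-time UDFs:

```python
from pyspark.sql.functions import col

df.select(vectorizedUDF(col("my_column_here")))
```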

# Configs

```python
# Cores in the cluster
sc.defaultParallelism
spark.sparkContext.defaultParallelism

# Default shuffle partitions
spark.conf.get("spark.sql.shuffle.partitions")
spark.conf.set("spark.sql.shuffle.partitions", "8")
```
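
One common heuristic (an assumption on my part, not something stated above) is to size the shuffle partition count relative to the available cores:

```python
cores = spark.sparkContext.defaultParallelism
spark.conf.set("spark.sql.shuffle.partitions", str(cores * 2))   # e.g. 2x the core count
```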

# Partitions

```python
# Get the number of partitions
df.rdd.getNumPartitions()

# Reduce the number of partitions: a narrow transformation that avoids a full shuffle,
# but can't guarantee evenly sized partitions
df = df.coalesce(8)

# Repartition: a wide transformation that shuffles all the data,
# but produces evenly sized partitions
df = df.repartition(8)
```
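
`repartition` can also hash-partition on one or more columns; the column name below is a placeholder:

```python
df = df.repartition(8, "partition_col")   # 8 partitions, hashed on partition_col
```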

# Structured Streaming

```python
(spark.readStream
     <input config>         # e.g. .format(...), .option(...), .load()
     .filter()
     .groupBy()
     .writeStream
     <input sink config>    # e.g. .format(...), .outputMode(...)
     .start())
```
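
A runnable sketch, assuming only a live SparkSession: it uses the built-in `rate` source and `console` sink as stand-ins for the real input and sink configs.

```python
stream_df = (spark.readStream
             .format("rate")                  # generates rows of (timestamp, value)
             .option("rowsPerSecond", 5)
             .load())

query = (stream_df
         .filter("value % 2 = 0")             # keep only even values
         .writeStream
         .format("console")
         .outputMode("append")
         .start())
```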

Monitor a Stream

```python
# Get the stream's unique ID
streamQuery.id

# Get the current stream status
streamQuery.status

# Block until the query terminates, or for at most 5 seconds, whichever comes first
streamQuery.awaitTermination(5)

# Stop the stream
streamQuery.stop()
```