I am using Spark 2.0 with the Python API.
I have a dataframe with a column of type DateType(). I would like to add a column to the dataframe containing the most recent Monday.
I can do it like this:
reg_schema = pyspark.sql.types.StructType([
    pyspark.sql.types.StructField('AccountCreationDate', pyspark.sql.types.DateType(), True),
    pyspark.sql.types.StructField('UserId', pyspark.sql.types.LongType(), True)
])
reg = spark.read.schema(reg_schema).option('header', True).csv(path_to_file)
reg = reg.withColumn('monday',
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate,'E') == 'Mon',
        reg.AccountCreationDate).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate,'E') == 'Tue',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 1)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Wed',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 2)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Thu',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 3)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Fri',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 4)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Sat',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 5)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Sun',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 6))
        )))))))
However, this seems like a lot of code for something that should be rather simple. Is there a more concise way of doing this?
Spark SQL provides the last_day() function, which returns the last day of the month for a date given in yyyy-MM-dd format. For example, the input date 2019-01-25 returns 2019-01-31, since 31 is the last day of January.
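As a rough cross-check outside Spark, the same month-end arithmetic can be sketched in plain Python. This uses only the standard library; `last_day_of_month` is a made-up helper name for illustration, not a Spark function.

```python
import calendar
import datetime

def last_day_of_month(d):
    # calendar.monthrange returns (weekday of the 1st, number of days in the month)
    days_in_month = calendar.monthrange(d.year, d.month)[1]
    return d.replace(day=days_in_month)

print(last_day_of_month(datetime.date(2019, 1, 25)))  # 2019-01-31
```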
To get the week number of the year from a date in PySpark, use the weekofyear() function. Note that pyspark.sql.functions has no built-in weekofmonth() function, so the week of the month has to be computed separately (for example from dayofmonth()).
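To see what week numbers to expect, here is a small plain-Python sketch. It assumes that weekofyear() follows ISO 8601 numbering (weeks start on Monday, and week 1 is the first week with more than 3 days), which is how I understand the Spark documentation; `iso_week_of_year` is a hypothetical helper name.

```python
import datetime

def iso_week_of_year(d):
    # isocalendar() returns (ISO year, ISO week number, ISO weekday)
    return d.isocalendar()[1]

print(iso_week_of_year(datetime.date(2016, 10, 26)))  # 43
print(iso_week_of_year(datetime.date(2016, 1, 1)))    # 53 (still the last ISO week of 2015)
```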
current_date: Spark SQL accepts two spellings for the current date. You can write it with parentheses, as current_date(), or without, as current_date. Both return the current date as a DateType value, displayed in the default yyyy-MM-dd format.
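I found out that PySpark's trunc function also works: with the "week" format it truncates each date to the Monday of its week. As far as I know, the "week" format for trunc was only added in Spark 3.0, so this may not work on the Spark 2.0 mentioned in the question.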
import datetime
import pyspark.sql.functions as f
df = spark.createDataFrame([
    (datetime.date(2020, 10, 27), ),
    (datetime.date(2020, 12, 21), ),
    (datetime.date(2020, 10, 13), ),
    (datetime.date(2020, 11, 11), ),
], ["date_col"])
df = df.withColumn("first_day_of_week", f.trunc("date_col", "week"))
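To check which values to expect in first_day_of_week for the four sample dates, the same "Monday of the week" logic can be sketched in plain Python. This is only a cross-check with the standard library, not Spark code; `monday_of_week` is a hypothetical helper name.

```python
import datetime

def monday_of_week(d):
    # date.weekday() is 0 for Monday through 6 for Sunday,
    # so subtracting that many days lands on the Monday of the same week
    return d - datetime.timedelta(days=d.weekday())

sample_dates = [
    datetime.date(2020, 10, 27),
    datetime.date(2020, 12, 21),
    datetime.date(2020, 10, 13),
    datetime.date(2020, 11, 11),
]
for d in sample_dates:
    print(d, monday_of_week(d))
# 2020-10-27 2020-10-26
# 2020-12-21 2020-12-21
# 2020-10-13 2020-10-12
# 2020-11-11 2020-11-09
```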
You can find the next occurrence of the day with next_day and then subtract a week. The required functions can be imported as follows:
from pyspark.sql.functions import next_day, date_sub
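Then define a helper:
def previous_day(date, dayOfWeek):
    # next_day returns the first date strictly after `date` that falls on dayOfWeek;
    # going back 7 days gives the most recent such day (or `date` itself if it matches)
    return date_sub(next_day(date, dayOfWeek), 7)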
Finally an example:
from pyspark.sql.functions import to_date
df = sc.parallelize([
    ("2016-10-26", )
]).toDF(["date"]).withColumn("date", to_date("date"))
df.withColumn("last_monday", previous_day("date", "monday")).show()
With result:
+----------+-----------+
|      date|last_monday|
+----------+-----------+
|2016-10-26| 2016-10-24|
+----------+-----------+
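One detail worth checking is what happens when the input date is already a Monday. Since next_day returns the first matching date strictly after the input, subtracting a week should give back the input itself. Here is a plain-Python sketch of that arithmetic; `next_day_py` and `previous_day_py` are hypothetical stand-ins that only emulate the Spark functions for illustration.

```python
import datetime

DAYS = ["monday", "tuesday", "wednesday", "thursday",
        "friday", "saturday", "sunday"]

def next_day_py(d, day_of_week):
    # First date strictly after d that falls on day_of_week
    target = DAYS.index(day_of_week.lower())
    delta = (target - d.weekday() - 1) % 7 + 1  # always between 1 and 7
    return d + datetime.timedelta(days=delta)

def previous_day_py(d, day_of_week):
    # Mirrors date_sub(next_day(date, dayOfWeek), 7)
    return next_day_py(d, day_of_week) - datetime.timedelta(days=7)

print(previous_day_py(datetime.date(2016, 10, 26), "monday"))  # 2016-10-24
print(previous_day_py(datetime.date(2016, 10, 24), "monday"))  # 2016-10-24
```

So a Monday maps to itself, which matches the "most recent Monday" asked for in the question.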