S-JIS[2021-06-10] 変更履歴

PySpark DataFrame.withColumn

PySparkDataFrameのwithColumnメソッドについて。


概要

withColumnメソッドはDataFrameに新しいカラムを追加するのに使う。
もしくは、既存のカラムを新しい値で置換するのに使う。

	df = df.withColumn('カラム名', 値)
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import lit

	df = df.withColumn('column_name', lit(0).cast(IntegerType())) # 固定値0

withColumnの第1引数はカラム名(文字列(str型))、第2引数には値(Column型)を指定する。
Column型は、主にpyspark.sql.functionsで定義されている関数を使って生成する。

PySparkもSparkなので、実行時には各Executorで分散処理される。
すなわち、withColumnで指定した第2引数の内容はシリアライズされて各Excecutorに配布されることになる。
(Pythonにはシリアライズという概念は無いかもしれないが)
たぶん、Column型はシリアライズ用の仕組みを備えているのだと思う。


固定値の例

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import lit
	df = df.withColumn('column_name', lit(0).cast(IntegerType())) # 固定値0

lit関数(関数名は、たぶんliteralの略)で固定値を指定する。

データ型を明示したいときは、後ろにcastを付ける。


カラムの値を使用する例

from pyspark.sql.functions import col
	df = df.withColumn('new_column_name', col('column_name') + 1)
	df = df.withColumn('new_column_name', df['column_name'] + 1)
	df = df.withColumn('new_column_name', df.column_name + 1)

col関数(この関数の別名はcolumn)でDataFrame内のカラムの値を使用できる。
DataFrameの変数(上記のdf)から配列形式でカラム名を指定することも出来るし、フィールド名形式で指定することも出来る。


データ型を変換する例

from pyspark.sql.types import LongType
from pyspark.sql.functions import col
	df = df.withColumn('column_name', col('column_name').cast(LongType())) # long型に変換

元のデータ型がStringTypeでも、LongTypeにキャストすれば数値に変換される。
(数値に変換できない文字列だった場合は、エラーが発生するはず)


条件に応じた値をセットする例

from pyspark.sql.functions import when
	df = df.withColumn('column_name', when(df.column_name == '', 0).otherwise(1))

条件に応じて設定する値を変えたい場合はwhenを使う。

whenの第1引数は判定条件(Column型)。
第2引数は条件を満たした場合の値(リテラル(定数)もしくはColumn型)。

条件を満たさなかった場合の値はotherwiseで指定する。(otherwiseが無い場合はNoneになる)


whenを多重に指定することも出来る。

	df = df.withColumn('column_name', when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0))

なお、null(None)を別の値に変換したい場合は、DataFrameのfillnaメソッドを使う方がシンプルだと思う。


自作の関数を呼び出す例

withColumnの第2引数はColumn型でなければならない。(でないとシリアライズして各Executorに配布できない)
このため、Sparkの実行(分散処理)中に通常のPythonの関数を呼び出したり、withColumnの外で定義した変数を参照したりすることは出来ない。

しかし、PySpark用のユーザー定義関数を作る手段が用意されている。

from pyspark.sql.functions import udf
	# 判定用の集合(タプルのset)
	check_set = {
		("a", 1),
		("a", 2),
		("b", 1),
	}
	check_udf = udf(lambda c1, c2: 1 if (c1, c2) in check_set else 0, returnType=IntegerType())

	df = df.withColumn('column_name', check_udf(df.column_name1, df.column_name2))

udf関数にラムダを渡すと、その戻り値がwithColumnで使えるユーザー定義関数(UDF)になる。
ラムダからは、ラムダの外側で定義された変数を参照することも出来る。

Pythonのラムダは式がひとつしか書けないので、上記の例では条件式(真のときの値 if 条件 else 偽のときの値)を指定している。
ちなみに集合(set型)はin演算子で「集合に含まれているかどうか」チェックできるので、上記の例では、(c1, c2)でタプルに変換した後で「タプル in check_set」(タプルが集合に存在しているかどうか)という判定を行っている。

作られたUDFの引数(withColumnでUDFを使用する際の引数)にはColumn型しか渡せない。
ラムダの引数には、Column型が評価された値(DataFrameに実際に入っている値)が渡されてくる。
上記の例だと、df.column_name1はStringType、df.column_name2はIntegerTypeという想定で、ラムダの引数c1は文字列(str型)、c2は数値(int型)が渡ってくる。


ラムダを使わず、Pythonのデコレーター構文を使ってユーザー定義関数(UDF)を定義することも出来る。

from pyspark.sql.functions import udf
	# 判定用の集合(タプルのset)
	check_set = {
		("a", 1),
		("a", 2),
		("b", 1),
	}

	@udf(returnType=IntegerType())
	def check_udf(c1, c2):
		return 1 if (c1, c2) in check_set else 0

	df = df.withColumn('column_name', check_udf(df.column_name1, df.column_name2))

withColumn内で呼び出すときはユーザー定義関数の引数はColumn型なのに、
関数定義(def)の引数(呼び出される側)はColumn型ではない(いわば実際の値の型になる)のは、
変数の型を定義しないPythonならではの技巧だと思う^^;


DataFrameへ戻る / PySparkへ戻る / Sparkへ戻る / 技術メモへ戻る
メールの送信先:ひしだま