S-JIS[2021-06-10] 変更履歴
PySparkのDataFrameのwithColumnメソッドについて。
withColumnメソッドはDataFrameに新しいカラムを追加するのに使う。
もしくは、既存のカラムを新しい値で置換するのに使う。
df = df.withColumn('カラム名', 値)
from pyspark.sql.types import IntegerType from pyspark.sql.functions import lit df = df.withColumn('column_name', lit(0).cast(IntegerType())) # 固定値0
withColumnの第1引数はカラム名(文字列(str型))、第2引数には値(Column型)を指定する。
Column型は、主にpyspark.sql.functionsで定義されている関数を使って生成する。
PySparkもSparkなので、実行時には各Executorで分散処理される。
すなわち、withColumnで指定した第2引数の内容はシリアライズされて各Excecutorに配布されることになる。
(Pythonにはシリアライズという概念は無いかもしれないが)
たぶん、Column型はシリアライズ用の仕組みを備えているのだと思う。
from pyspark.sql.types import IntegerType from pyspark.sql.functions import lit
df = df.withColumn('column_name', lit(0).cast(IntegerType())) # 固定値0
lit関数(関数名は、たぶんliteralの略)で固定値を指定する。
データ型を明示したいときは、後ろにcastを付ける。
from pyspark.sql.functions import col
df = df.withColumn('new_column_name', col('column_name') + 1) df = df.withColumn('new_column_name', df['column_name'] + 1) df = df.withColumn('new_column_name', df.column_name + 1)
col関数(この関数の別名はcolumn)でDataFrame内のカラムの値を使用できる。
DataFrameの変数(上記のdf)から配列形式でカラム名を指定することも出来るし、フィールド名形式で指定することも出来る。
from pyspark.sql.types import LongType from pyspark.sql.functions import col
df = df.withColumn('column_name', col('column_name').cast(LongType())) # long型に変換
元のデータ型がStringTypeでも、LongTypeにキャストすれば数値に変換される。
(数値に変換できない文字列だった場合は、エラーが発生するはず)
from pyspark.sql.functions import when
df = df.withColumn('column_name', when(df.column_name == '', 0).otherwise(1))
条件に応じて設定する値を変えたい場合はwhenを使う。
whenの第1引数は判定条件(Column型)。
第2引数は条件を満たした場合の値(リテラル(定数)もしくはColumn型)。
条件を満たさなかった場合の値はotherwiseで指定する。(otherwiseが無い場合はNoneになる)
whenを多重に指定することも出来る。
df = df.withColumn('column_name', when(df.age > 4, 1).when(df.age < 3, -1).otherwise(0))
なお、null(None)を別の値に変換したい場合は、DataFrameのfillnaメソッドを使う方がシンプルだと思う。
withColumnの第2引数はColumn型でなければならない。(でないとシリアライズして各Executorに配布できない)
このため、Sparkの実行(分散処理)中に通常のPythonの関数を呼び出したり、withColumnの外で定義した変数を参照したりすることは出来ない。
しかし、PySpark用のユーザー定義関数を作る手段が用意されている。
from pyspark.sql.functions import udf
# 判定用の集合(タプルのset) check_set = { ("a", 1), ("a", 2), ("b", 1), } check_udf = udf(lambda c1, c2: 1 if (c1, c2) in check_set else 0, returnType=IntegerType()) df = df.withColumn('column_name', check_udf(df.column_name1, df.column_name2))
udf関数にラムダを渡すと、その戻り値がwithColumnで使えるユーザー定義関数(UDF)になる。
ラムダからは、ラムダの外側で定義された変数を参照することも出来る。
Pythonのラムダは式がひとつしか書けないので、上記の例では条件式(真のときの値 if 条件 else 偽のときの値
)を指定している。
ちなみに集合(set型)はin演算子で「集合に含まれているかどうか」チェックできるので、上記の例では、(c1, c2)
でタプルに変換した後で「タプル
in check_set
」(タプルが集合に存在しているかどうか)という判定を行っている。
作られたUDFの引数(withColumnでUDFを使用する際の引数)にはColumn型しか渡せない。
ラムダの引数には、Column型が評価された値(DataFrameに実際に入っている値)が渡されてくる。
上記の例だと、df.column_name1はStringType、df.column_name2はIntegerTypeという想定で、ラムダの引数c1は文字列(str型)、c2は数値(int型)が渡ってくる。
ラムダを使わず、Pythonのデコレーター構文を使ってユーザー定義関数(UDF)を定義することも出来る。
from pyspark.sql.functions import udf
# 判定用の集合(タプルのset) check_set = { ("a", 1), ("a", 2), ("b", 1), } @udf(returnType=IntegerType()) def check_udf(c1, c2): return 1 if (c1, c2) in check_set else 0 df = df.withColumn('column_name', check_udf(df.column_name1, df.column_name2))
withColumn内で呼び出すときはユーザー定義関数の引数はColumn型なのに、
関数定義(def)の引数(呼び出される側)はColumn型ではない(いわば実際の値の型になる)のは、
変数の型を定義しないPythonならではの技巧だと思う^^;