S-JIS[2021-06-10] 変更履歴

PySpark Column

PySparkのColumnクラスについて。


概要

PySparkのColumnクラスは、SQLのカラムに相当するクラス。

このカラムとは、テーブルのカラム名のことだけではなく、それらを使った演算(式)や単なる定数(リテラル)も含む。

	col1 = df.column_name
	col2 = df.column_name + 1
	col3 = lit(123)

Columnオブジェクトは、DataFrameのwithColumnメソッドやwhen関数の引数等、いろいろな所で引数として使われる。

PySparkもSparkなので、実際にデータを処理する際は、各Executorにプログラムが配布されて分散処理される。
そのため、プログラムはシリアライズ・デシリアライズ可能である必要があるが、その仕組みがColumnクラスに用意されているのだと思う。

例えばDataFrameのwithColumnはカラムを追加するメソッドだが、その値をColumnオブジェクトで指定する。
これは、カラムを追加する処理が分散処理されるため、シリアライズする必要があるからだと思う。


Columnオブジェクトの生成

Columnオブジェクトは、DataFrameからカラム名を指定して生成できる。

	column = df.column_name
	column = df['column_name']

pyspark.sql.functionsのcol関数(もしくはcolumn関数)を使って生成することも出来る。

from pyspark.sql.functions import col

	column = col('column_name')	

Columnオブジェクトに対する演算

Columnオブジェクトに対する演算(計算式)もColumnオブジェクトになる。

	col1 = df.column_name + 1

一見すると普通のPythonの足し算を行っているように見えるが、Pythonでは演算子をオーバーロードすることが出来るので、その仕組みを使い、Columnオブジェクト内に「足し算という式」を保存しているのだと思う。
実際の分散処理ではそれを元に実際の値を使って足し算処理が行われるのだろう。


ただし、ブール演算子のorやand(短絡評価される演算子)およびnotはColumnでは定義されていない。(というかPythonではオーバーロードできない?)
代わりに|, &, ~を使用する。(ただし短絡評価されるかどうかは不明)
注意点として、|や&は比較演算子(==や!=等)より優先順位が高いので、通常のorやandの感覚(比較演算子より優先順位が低い)で使うとエラーになる。

	condition =  df.column_name == 'a'  or df.column_name == 'b'  # Columnではorが使えないのでエラー
	condition =  df.column_name == 'a'  |  df.column_name == 'b'  # |の方が==より優先度が高いので、('a' | df.colulmn_name)という扱いとなり、'a'(str型)に|が定義されていないのでエラー
	condition = (df.column_name == 'a') | (df.column_name == 'b') # OK

pyspark.sql.functionsの関数

pyspark.sql.functionsパッケージの関数も、結果としてColumnオブジェクトを返す。

from pyspark.sql.functions import lit

	column = lit(123)
from pyspark.sql import functions as F

	column = F.lit(123)

pyspark.sql.functionsパッケージには、文字列操作や算術演算・日付変換など、色々な関数が揃っている。

pyspark.sql.functionsの例
関数名 説明
col
column
DataFrameのカラム col('column_name')
lit 定数(リテラル) lit(123).cast(IntegerType())
isnull null判定 isnull(df.column_name)
df.column_name.isNull()
df.column_name.isNotNull()
size サイズ size(df.column_name)
length 文字列型の場合、文字数 length(df.column_name)
instr 文字列が含まれている場合、その位置 instr(df.column_name, 'abc')
trim 文字列型の前後の空白を除去 trim(df.column_name)
when 条件に応じた値
when(df.column_name.isNotNull(), 1).otherwise(0)
udf ユーザー定義関数を作る
 

Sparkへ戻る / 技術メモへ戻る
メールの送信先:ひしだま