JavaのArrowのVectorSchemaRootクラスのメモ。
import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.ipc.ArrowStreamWriter;
public static void main(String[] args) throws IOException {
try (BufferAllocator allocator = new RootAllocator()) {
try (VectorSchemaRoot root = createData(allocator)) {
printVector(root);
printGeneral(root);
var bytes = serialize(root);
deserialize(bytes, allocator);
}
List<byte[]> list = serializeUnloader(allocator);
deserializeLoader(list, allocator);
}
}
ArrowのデータはVectorSchemaRootで操作するが、メモリーはBufferAllocatorで管理する。
static VectorSchemaRoot createData(BufferAllocator allocator) {
var idVector = new IntVector("id", allocator);
var nameVector = new VarCharVector("name", allocator);
int SIZE = 10;
idVector.allocateNew(SIZE);
nameVector.allocateNew(SIZE);
for (int i = 0; i < SIZE; i++) {
idVector.set(i, i + 1);
if (i % 2 == 0) {
nameVector.setNull(i);
} else {
nameVector.set(i, ("name" + i).getBytes(StandardCharsets.UTF_8));
}
}
idVector.setValueCount(SIZE);
// System.out.println("idVector.size=" + idVector.getValueCount());
nameVector.setValueCount(SIZE);
// System.out.println("nameVector.size=" + nameVector.getValueCount());
var root = new VectorSchemaRoot(List.of(idVector, nameVector));
System.out.println("root.size=" + root.getRowCount());
System.out.println("----");
System.out.print(root.contentToTSVString()); // 内容の簡易表示
System.out.println("----");
return root;
}
Arrowはカラムナー形式でデータを持つので、列毎にデータを作る。
各列のVectorは、まずallocateNew()で領域を確保する。
そして、set(index, value)で値をセットしていく。(setメソッドは、indexがキャパシティーを越えるとエラーになる)
setSafe(index, value)で値をセットすると、キャパシティーを超えたら自動的に拡張される。
最後に、setValueCount(個数)でデータ数をセットする。(これを呼ばないと、いくら値をセットしても0個の扱いになってしまう)
VectorSchemaRootのレコード数は、先頭のVectorのデータ数と同じになる模様。
(普通は、各列のデータ数は合わせる)
static void printVector(VectorSchemaRoot root) {
var idVector = (IntVector) root.getVector("id");
var nameVector = (VarCharVector) root.getVector("name");
for (int i = 0; i < root.getRowCount(); i++) {
int id = idVector.get(i);
byte[] bytes = nameVector.get(i);
String name = (bytes != null) ? new String(bytes, StandardCharsets.UTF_8) : null;
System.out.printf("row=%d, id=%d, name=%s, date=%s%n", i, id, name, date);
}
}
static void printGeneral(VectorSchemaRoot root) {
for (int row = 0; row < root.getRowCount(); row++) {
System.out.print("row=" + row);
for (var vector : root.getFieldVectors()) {
Object value = vector.getObject(row);
System.out.print(", " + vector.getName() + "=" + value);
}
System.out.println();
}
}
static byte[] serialize(VectorSchemaRoot root) throws IOException {
System.out.println("==== serialize start ====");
try (var os = new ByteArrayOutputStream();
var writer = new ArrowStreamWriter(root, null, os)) {
writer.start();
writer.writeBatch();
writer.end();
System.out.println("==== serialize end ====");
return os.toByteArray();
}
}
ArrowStreamWriterのstart()〜end()で、OutputStreamに書き込まれる。
複数のBatch(ブロック)に分ける場合は、ブロック毎にroot.allocateNew()でクリアして各Vectorに値を入れ直し、writer.writeBatch()を呼ぶ。
static void deserialize(byte[] bytes, BufferAllocator allocator) throws IOException {
try (var reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), allocator)) {
try (var root = reader.getVectorSchemaRoot()) {
for (var field : root.getFieldVectors()) {
System.out.printf("field %s%n", field.getField());
}
var idVector = (IntVector) root.getVector("id");
var nameVector = (VarCharVector) root.getVector("name");
while (reader.loadNextBatch()) {
int rowCount = root.getRowCount();
for (int i = 0; i < rowCount; i++) {
int id = idVector.get(i);
String name = nameVector.isNull(i) ? null : new String(nameVector.get(i), StandardCharsets.UTF_8);
System.out.println("id=" + id + ", name=" + name);
}
}
}
}
}
ArrowStreamReaderでデシリアライズする際は、Batch(複数レコードのブロック)毎に扱う。
reader.loadNextBatch()で、rootの中身はそのブロックになる。
すなわち、root.getRowCount()はそのブロックのレコード数になる。
import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorLoader; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.VectorUnloader; import org.apache.arrow.vector.ipc.ReadChannel; import org.apache.arrow.vector.ipc.WriteChannel; import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; import org.apache.arrow.vector.ipc.message.MessageSerializer; import org.apache.arrow.vector.types.pojo.Schema; import org.apache.arrow.vector.util.Text;
static List<byte[]> serializeUnloader(BufferAllocator allocator) throws IOException {
var result = new ArrayList<byte[]>();
var idVector = new IntVector("id", allocator);
var nameVector = new VarCharVector("name", allocator);
try (var root = new VectorSchemaRoot(List.of(idVector, nameVector))) {
{
var bytes = serializeSchema(root.getSchema());
result.add(bytes);
}
var unloader = new VectorUnloader(root);
int SIZE = 10;
idVector.allocateNew(SIZE);
nameVector.allocateNew(SIZE);
int row = 0;
for (int i = 0; i < 15; i++) {
idVector.setSafe(row, i + 1);
nameVector.setSafe(row, new Text("name" + i));
root.setRowCount(row);
int bufferSize = root.getFieldVectors().stream().mapToInt(vec -> vec.getBufferSize()).sum();
System.out.println(i + ": bufferSize: " + bufferSize);
if ((++row % 5) == 0) {
try (ArrowRecordBatch batch = unloader.getRecordBatch()) {
var bytes = serializeBatch(batch);
result.add(bytes);
}
row = 0;
}
}
if (row != 0) {
root.setRowCount(row);
try (ArrowRecordBatch batch = unloader.getRecordBatch()) {
var bytes = serializeBatch(batch);
result.add(bytes);
}
}
}
return result;
}
static byte[] serializeSchema(Schema schema) throws IOException {
try (var out = new ByteArrayOutputStream()) {
var channel = new WriteChannel(Channels.newChannel(out));
MessageSerializer.serialize(channel, schema);
return out.toByteArray();
}
}
static byte[] serializeBatch(ArrowRecordBatch batch) throws IOException {
try (var out = new ByteArrayOutputStream()) {
var channel = new WriteChannel(Channels.newChannel(out));
MessageSerializer.serialize(channel, batch);
return out.toByteArray();
}
}
→MessageSerializerを使ったときに発生することがあるUnsupportedOperationException
static void deserializeLoader(List<byte[]> list, BufferAllocator allocator) throws IOException {
VectorSchemaRoot root = null;
try {
VectorLoader loader = null;
for (byte[] bytes : list) {
if (root == null) {
var schema = deserializeSchema(bytes);
root = VectorSchemaRoot.create(schema, allocator);
loader = new VectorLoader(root);
continue;
}
try (ArrowRecordBatch batch = deserializeBatch(bytes, allocator)) {
loader.load(batch);
print(root);
}
}
} finally {
if (root != null) {
root.close();
}
}
}
static Schema deserializeSchema(byte[] bytes) throws IOException {
var channel = new ReadChannel(Channels.newChannel(new ByteArrayInputStream(bytes)));
return MessageSerializer.deserializeSchema(channel);
}
static ArrowRecordBatch deserializeBatch(byte[] bytes, BufferAllocator allocator) throws IOException {
var channel = new ReadChannel(Channels.newChannel(new ByteArrayInputStream(bytes)));
return MessageSerializer.deserializeRecordBatch(channel, allocator);
}
→MessageSerializerを使ったときに発生することがあるUnsupportedOperationException