S-JIS[2026-06-14]

Arrow Java VectorSchemaRoot

JavaのArrowのVectorSchemaRootクラスのメモ。

 

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
    public static void main(String[] args) throws IOException {
        try (BufferAllocator allocator = new RootAllocator()) {
            try (VectorSchemaRoot root = createData(allocator)) {
                printVector(root);
                printGeneral(root);

                var bytes = serialize(root);
                deserialize(bytes, allocator);
            }

            List<byte[]> list = serializeUnloader(allocator);
            deserializeLoader(list, allocator);
        }
    }

ArrowのデータはVectorSchemaRootで操作するが、メモリーはBufferAllocatorで管理する。


VectorSchemaRootを作成する例

    static VectorSchemaRoot createData(BufferAllocator allocator) {
        var idVector = new IntVector("id", allocator);
        var nameVector = new VarCharVector("name", allocator);

        int SIZE = 10;
        idVector.allocateNew(SIZE);
        nameVector.allocateNew(SIZE);

        for (int i = 0; i < SIZE; i++) {
            idVector.set(i, i + 1);
            if (i % 2 == 0) {
                nameVector.setNull(i);
            } else {
                nameVector.set(i, ("name" + i).getBytes(StandardCharsets.UTF_8));
            }
        }

        idVector.setValueCount(SIZE);
//      System.out.println("idVector.size=" + idVector.getValueCount());
        nameVector.setValueCount(SIZE);
//      System.out.println("nameVector.size=" + nameVector.getValueCount());

        var root = new VectorSchemaRoot(List.of(idVector, nameVector));
        System.out.println("root.size=" + root.getRowCount());

        System.out.println("----");
        System.out.print(root.contentToTSVString()); // 内容の簡易表示
        System.out.println("----");
        return root;
    }

Arrowはカラムナー形式でデータを持つので、列毎にデータを作る。

各列のVectorは、まずallocateNew()で領域を確保する。

そして、set(index, value)で値をセットしていく。(setメソッドは、indexがキャパシティーを越えるとエラーになる)
setSafe(index, value)で値をセットすると、キャパシティーを超えたら自動的に拡張される。

最後に、setValueCount(個数)でデータ数をセットする。(これを呼ばないと、いくら値をセットしても0個の扱いになってしまう)

VectorSchemaRootのレコード数は、先頭のVectorのデータ数と同じになる模様。
(普通は、各列のデータ数は合わせる)


VectorSchemaRootのVectorを取得する例

    static void printVector(VectorSchemaRoot root) {
        var idVector = (IntVector) root.getVector("id");
        var nameVector = (VarCharVector) root.getVector("name");

        for (int i = 0; i < root.getRowCount(); i++) {
            int id = idVector.get(i);
            byte[] bytes = nameVector.get(i);
            String name = (bytes != null) ? new String(bytes, StandardCharsets.UTF_8) : null;

            System.out.printf("row=%d, id=%d, name=%s, date=%s%n", i, id, name, date);
        }
    }

VectorSchemaRootのVectorを汎用的に取得する例

    static void printGeneral(VectorSchemaRoot root) {
        for (int row = 0; row < root.getRowCount(); row++) {

            System.out.print("row=" + row);

            for (var vector : root.getFieldVectors()) {
                Object value = vector.getObject(row);

                System.out.print(", " + vector.getName() + "=" + value);
            }

            System.out.println();
        }
    }

ArrowStreamWriterでシリアライズする例

    static byte[] serialize(VectorSchemaRoot root) throws IOException {
        System.out.println("==== serialize start ====");

        try (var os = new ByteArrayOutputStream();
                var writer = new ArrowStreamWriter(root, null, os)) {
            writer.start();
            writer.writeBatch();
            writer.end();

            System.out.println("==== serialize end ====");
            return os.toByteArray();
        }
    }

ArrowStreamWriterのstart()〜end()で、OutputStreamに書き込まれる。

複数のBatch(ブロック)に分ける場合は、ブロック毎にroot.allocateNew()でクリアして各Vectorに値を入れ直し、writer.writeBatch()を呼ぶ。


ArrowStreamReaderでデシリアライズする例

    static void deserialize(byte[] bytes, BufferAllocator allocator) throws IOException {
        try (var reader = new ArrowStreamReader(new ByteArrayInputStream(bytes), allocator)) {
            try (var root = reader.getVectorSchemaRoot()) {
                for (var field : root.getFieldVectors()) {
                    System.out.printf("field %s%n", field.getField());
                }

                var idVector = (IntVector) root.getVector("id");
                var nameVector = (VarCharVector) root.getVector("name");

                while (reader.loadNextBatch()) {
                    int rowCount = root.getRowCount();

                    for (int i = 0; i < rowCount; i++) {
                        int id = idVector.get(i);
                        String name = nameVector.isNull(i) ? null : new String(nameVector.get(i), StandardCharsets.UTF_8);

                        System.out.println("id=" + id + ", name=" + name);
                    }
                }
            }
        }
    }

ArrowStreamReaderでデシリアライズする際は、Batch(複数レコードのブロック)毎に扱う。
reader.loadNextBatch()で、rootの中身はそのブロックになる。
すなわち、root.getRowCount()はそのブロックのレコード数になる。


VectorUnloaderでシリアライズする例

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.WriteChannel;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.Text;
    static List<byte[]> serializeUnloader(BufferAllocator allocator) throws IOException {
        var result = new ArrayList<byte[]>();

        var idVector = new IntVector("id", allocator);
        var nameVector = new VarCharVector("name", allocator);
        try (var root = new VectorSchemaRoot(List.of(idVector, nameVector))) {
            {
                var bytes = serializeSchema(root.getSchema());
                result.add(bytes);
            }

            var unloader = new VectorUnloader(root);

            int SIZE = 10;
            idVector.allocateNew(SIZE);
            nameVector.allocateNew(SIZE);

            int row = 0;
            for (int i = 0; i < 15; i++) {
                idVector.setSafe(row, i + 1);
                nameVector.setSafe(row, new Text("name" + i));

                root.setRowCount(row);
                int bufferSize = root.getFieldVectors().stream().mapToInt(vec -> vec.getBufferSize()).sum();
                System.out.println(i + ": bufferSize: " + bufferSize);

                if ((++row % 5) == 0) {
                    try (ArrowRecordBatch batch = unloader.getRecordBatch()) {
                        var bytes = serializeBatch(batch);
                        result.add(bytes);
                    }
                    row = 0;
                }
            }

            if (row != 0) {
                root.setRowCount(row);

                try (ArrowRecordBatch batch = unloader.getRecordBatch()) {
                    var bytes = serializeBatch(batch);
                    result.add(bytes);
                }
            }
        }

        return result;
    }
    static byte[] serializeSchema(Schema schema) throws IOException {
        try (var out = new ByteArrayOutputStream()) {
            var channel = new WriteChannel(Channels.newChannel(out));

            MessageSerializer.serialize(channel, schema);

            return out.toByteArray();
        }
    }
    static byte[] serializeBatch(ArrowRecordBatch batch) throws IOException {
        try (var out = new ByteArrayOutputStream()) {
            var channel = new WriteChannel(Channels.newChannel(out));

            MessageSerializer.serialize(channel, batch);

            return out.toByteArray();
        }
    }

MessageSerializerを使ったときに発生することがあるUnsupportedOperationException


VectorLoaderでデシリアライズする例

    static void deserializeLoader(List<byte[]> list, BufferAllocator allocator) throws IOException {
        VectorSchemaRoot root = null;
        try {
            VectorLoader loader = null;

            for (byte[] bytes : list) {
                if (root == null) {
                    var schema = deserializeSchema(bytes);
                    root = VectorSchemaRoot.create(schema, allocator);
                    loader = new VectorLoader(root);

                    continue;
                }

                try (ArrowRecordBatch batch = deserializeBatch(bytes, allocator)) {
                    loader.load(batch);
                    print(root);
                }
            }
        } finally {
            if (root != null) {
                root.close();
            }
        }
    }
    static Schema deserializeSchema(byte[] bytes) throws IOException {
        var channel = new ReadChannel(Channels.newChannel(new ByteArrayInputStream(bytes)));

        return MessageSerializer.deserializeSchema(channel);
    }
    static ArrowRecordBatch deserializeBatch(byte[] bytes, BufferAllocator allocator) throws IOException {
        var channel = new ReadChannel(Channels.newChannel(new ByteArrayInputStream(bytes)));

        return MessageSerializer.deserializeRecordBatch(channel, allocator);
    }

MessageSerializerを使ったときに発生することがあるUnsupportedOperationException


Arrow Javaへ戻る / Arrow目次へ戻る / 技術メモへ戻る
メールの送信先:ひしだま