くまくまーZ

HBaseへデータ投入(importtsvの使い方)

この記事の対象者

  • Hadoop/HBaseの初心者
  • HBaseが動く環境はできた
  • 実験用のデータを入れたい
  • 規模は100万か1億件ぐらい

インポート方法

  • a) hbase shell
  • b) importtsv
  • c) importtsv.bulk.output

a) hbase shell の使い方

hbase shell を使って登録する方法。rubyコードで実行できるので柔軟。

1
2
3
create 'sample', 'data'
('a'..'z').each {|i| put 'sample', i, 'data:alpha', i}
scan 'samples'
ROW                         COLUMN+CELL
 a                          column=data:alpha, timestamp=1333387516755, value=a
 b                          column=data:alpha, timestamp=1333387516772, value=b
...

手ごろなfixture作成にはよいが、これで1億件やる気はしない

b) importtsv の使い方

既存のインポートツールを利用する方法。 入力データはtsv,csvに限定されるが、hbase.jarに含まれるImportTsvが目的に合致する。 実行にはhbase.jarファイルのフルパスが必要なので、まずはjarをlocateなどで探す。

% locate /hbase-0. | grep .jar
/usr/local/lib/hbase-0.92.1/hbase-0.92.1-tests.jar
/usr/local/lib/hbase-0.92.1/hbase-0.92.1.jar
...

hadoopでのjarファイルの実行

あとは、以下の書式でjarファイル内の機能を実行できる。

hadoop jar <HBASE_JAR_FILE> <COMMAND>

ImportTsvのコマンド名は importtsv なので、以下のコマンドになる。

% hadoop jar /usr/local/lib/hbase-0.92.1/hbase-0.92.1.jar importtsv

HADOOP_CLASSPATH

このとき、HADOOP_CLASSPATH が設定されていないと以下のようなエラーが出るので注意。

% hadoop jar /usr/local/lib/hbase-0.92.1/hbase-0.92.1.jar importtsv
Exception in thread "main" java.lang.NoClassDefFoundError: com/google/common/collect/Multimap
    at org.apache.hadoop.hbase.mapreduce.Driver.main(Driver.java:43)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

その場合、例えばzshであれば、以下のコマンドで HADOOP_CLASSPATH を設定できる。

export HADOOP_CLASSPATH=`hbase classpath`

importtsvのヘルプ

設定に問題がなければ、以下のようなヘルプが表示される。

% hadoop jar /usr/local/lib/hbase-0.92.1/hbase-0.92.1.jar importtsv
ERROR: Wrong number of arguments: 0
Usage: importtsv -Dimporttsv.columns=a,b,c <tablename> <inputdir>

引数: tablename

データ投入先のHBaseテーブル名を指定する。 テーブルは、実行前に作っておく必要がある。 サンプルとして以下のsongsテーブルをhbase shellから作成する。 CF名は何でもいいので適当にdataにしておく。

create 'songs', 'data'

引数: inputdir

データのソース(tsv,csv)を指定する。fileでなくdirとなっているのは、 bulk.outputモードでは、ディレクトリを指定するからだと思う(多分)。 サンプルとして以下のsongs.csvを利用する。

20090805,ももいろパンチ,ももいろクローバー
20100505,怪盗少女,ももいろクローバー
20101110,ピンキージョーンズ,ももいろクローバー
20110706,Z伝説,ももいろクローバーZ

HBaseがローカル(Standalone)モードならば直接OS上のファイルを指定できるが、 分散モードであれば入力ファイルはhdfs上に存在している必要がある。

hdfsへのファイル転送

以下のコマンドでsongs.csvをhdfs上に配置できる。引数はlocal、dstの順。面倒なので同名がよい。 コマンドはdfsfsのどちらでもよい(多分同じ)。

hadoop dfs -put songs.csv songs.csv

hdfs上のファイルは-lsで確認できる。

% hadoop dfs -ls
-rw-r--r--   1 maiha supergroup         107 2012-04-02 03:14 /user/maiha/songs.csv

もしputでエラーが出た場合、 高確率でcore-site.xmlhadoop.tmp.dirのパスに書き込み権限がでていないので、 以下のようにwritableにする。(PATHは環境依存)

chmod a+rwxt /var/lib/hadoop/tmp

引数: -Dimporttsv.columns=a,b,c

csvのカラム名を指定する。 HTableのROW_KEYになる項目はHBASE_ROW_KEYという予約語を指定する。 それ以外は、対象テーブルのカラム名をcf:qualifier形式で指定する。

20090805,ももいろパンチ,ももいろクローバー

の最初の日付をROW_KEYに、2つ目の曲名をdataのtitleに、 3つ目の歌手名をdataのsingerに入れたいので、 以下のような指定方法になる。

-Dimporttsv.columns=HBASE_ROW_KEY,data:title,data:singer

入力ファイルの種別(csv,tsv)に関係なく、ここはカンマで区切る。 途中にスペースが入ってもいけないので(分かりづらいエラーが起きる)、 実行時は改行等にも注意する。

引数: -Dimporttsv.separator=X

tsvの場合は不要だが、ここではcsvを扱うので、セパレータを”,”として指定する。 ちなみに、セパレータには1バイトしか指定できない。

-Dimporttsv.separator=,

実行

改めてまとめると、インポートのコマンドは以下のようになる。

% hadoop jar /usr/local/lib/hbase-0.92.1/hbase-0.92.1.jar importtsv \
   -Dimporttsv.separator=, \
   -Dimporttsv.columns=HBASE_ROW_KEY,data:title,data:singer \
   songs songs.csv

確認

hbase shellで確認。

> count 'songs'
4 row(s) in 0.0170 seconds

> scan 'songs'
ROW               COLUMN+CELL
 20090805         column=data:singer, timestamp=1333393153254, value=...

c) importtsv.bulk.output の使い方

b)のimporttsvは、Reducerを使わずに直接HTableに随時書き込んでいく方式であるが、 ImportTsvにはimporttsv.bulk.outputオプションによる別のインポート方式がある。 これは、以下の2コマンドによって確実でより高速なbulkインサートを提供する。

  1. importtsv: M/RによりHBaseの内部ストレージ形式の中間データをHFileとして作成
  2. completebulkload: 作成したHFileを各クラスタに転送

c1: importtsv.bulk.output

bulk.outputモードでimporttsvを実行するには、 b) importtsv の引数に -Dimporttsv.bulk.output=XXX を追加する。 ここで XXX には、中間データを保存するhdfsのディレクトリ(存在してはいけない)を指定する。 名前は何でもよいが、「テーブル名-bulk」が後から見たときにわかりやすい(消しやすい)。

実行例

具体的には、b) importtsv で実行したコマンド

% hadoop jar /usr/local/lib/hbase-0.92.1/hbase-0.92.1.jar importtsv \
   -Dimporttsv.separator=, \
   -Dimporttsv.columns=HBASE_ROW_KEY,data:title,data:singer \
   songs songs.csv

に、-Dimporttsv.bulk.output=songs-bulk を追加して、bulk.outputを実行する。 もちろん、b)同様、入力ファイルはhdfsにputされている必要がある。 よって、bulk.outputモードでのimporttsvコマンドは以下のようになる。

% hadoop jar /usr/local/lib/hbase-0.92.1/hbase-0.92.1.jar importtsv \
   -Dimporttsv.bulk.output=songs-bulk \
   -Dimporttsv.separator=, \
   -Dimporttsv.columns=HBASE_ROW_KEY,data:title,data:singer \
   songs songs.csv

中間ファイルの確認

b) では直ちに songs テーブルに反映されたが、 bulk.output ではこの時点ではまだテーブルにwriteは発生していない。 変わりに、指定したパスに中間データが作成されている。

% hadoop dfs -ls songs-bulk
Found 3 items
-rw-r--r--   1 maiha supergroup          0 2012-04-02 04:57 /user/maiha/songs-bulk/_SUCCESS
drwxr-xr-x   - maiha supergroup          0 2012-04-02 04:57 /user/maiha/songs-bulk/_logs
drwxr-xr-x   - maiha supergroup          0 2012-04-02 04:57 /user/maiha/songs-bulk/data

c2: completebulkload

hadoop jar <HBASE_JAR> completebulkload <HDFS_BULK_OUTPUT_PATH> <TABLE_NAME>

中間データ(bulk.output)は、 completebulkloadコマンドによって各クラスタの実テーブルに投入される。

% hadoop jar /usr/local/lib/hbase-0.92.1/hbase-0.92.1.jar completebulkload songs-bulk songs

注意点

bulk.outputモードで作られる中間データファイルは、 最終的なHTableのサイズの4~5倍のサイズになるため、 十分なディスク容量が必要になる。

具体値として、5千万行(11GB)のcsvを入れた場合、 最終的なHTableのサイズは22GBだが、 中間ファイルの作成時に最大で100GBほどディスクを消費していた。 (環境はサーバ1の擬似分散の場合)

事例: 郵便番号(12万件)

この記事の想定規模を「100万か1億件」としておきながら、 実際に入れたのが4件というのは、いくらそれで理論的に大丈夫だと言われても釈然としないのも事実。 と言いつつも100万にも届かないが、ある程度の量の分かりやすい事例として 「郵便番号データ」を入れてみる。以下、手順のみで説明は省略。

データ取得

wget http://www.post.japanpost.jp/zipcode/dl/kogaki/lzh/ken_all.lzh
lha e ken_all.lzh
nkf -Sud ken_all.csv | sed -e 's/[ "]//g' > zips.csv

データ確認

% wc -l zips.csv
123218 zips.csv
% head zips.csv
01101,060,0600000,ホッカイドウ,サッポロシチュウオウク,イカニケイサイガナイバアイ,北海道,札幌市中央区,以 下に掲載がない場合,0,0,0,0,0,0
01101,064,0640941,ホッカイドウ,サッポロシチュウオウク,アサヒガオカ,北海道,札幌市中央区,旭ケ丘,0,0,1,0,0,0
...

hdfs転送

% hadoop dfs -put zips.csv zips.csv

テーブル作成

create 'zips', 'd'

データ投入

% hadoop jar /usr/local/lib/hbase-0.92.1/hbase-0.92.1.jar importtsv \
   -Dimporttsv.separator=, \
   -Dimporttsv.columns=d:c1,d:c2,HBASE_ROW_KEY,d:c4,d:c5,d:c6,d:c7,d:c8,d:c9,d:c10,d:c11,d:c12,d:c13,d:c14,d:c15 \
   zips zips.csv

確認

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable,Get}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConversions._     // for entrySet:java.util.Set

val conf  = new HBaseConfiguration
val songs = new HTable(conf, "zips")
val get   = new Get("9000005".getBytes)
val row   = songs.get(get)
val cf    = row.getFamilyMap("d".getBytes)

for (entry <- cf.entrySet) {
  val key   = Bytes.toString(entry.getKey)
  val value = Bytes.toString(entry.getValue)
  printf("%s: %s\n", key, value)
}
c1: 47201
c10: 0
c11: 0
c12: 0
c13: 0
c14: 0
c15: 0
c2: 900
c4: オキナワケン
c5: ナハシ
c6: アメク
c7: 沖縄県
c8: 那覇市
c9: 天久

importtsvとbulk.outputはどちらがいいのか?

  • importtsvは中間ファイルも作らないので手軽
  • bulk.outputはHTableを作成して置換するイメージなので安心 (importの原子性が保障されている)

importtsvはお手軽だが、落ちているリージョンサーバがあったりすると、 データの欠損や二重登録などの危険性がある。 データの正確性を考慮すると、bulk.outputを使うべき、という結論になる。

ハマリ所

参考

ScalaでHbase (基本操作)

versions

  • Scala-2.9.1.final
  • xsbt-0.11.2
  • Hbase-0.92.0
  • Hadoop-1.0.1

build.sbt

1
2
3
4
5
6
7
8
resolvers += "Apache HBase" at "https://repository.apache.org/content/repositories/releases"

resolvers += "Thrift" at "http://people.apache.org/~rawson/repo/"

libraryDependencies ++= Seq(
    "org.apache.hadoop" % "hadoop-core" % "1.0.1",
    "org.apache.hbase" % "hbase" % "0.92.0"
)

使用例 (共通部分)

1
2
3
4
5
6
import org.apache.hadoop.hbase.{HBaseConfiguration,HTableDescriptor,HColumnDescriptor}
import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get,Delete,Scan}
import org.apache.hadoop.hbase.util.Bytes

val conf  = new HBaseConfiguration
val admin = new HBaseAdmin(conf)

既存テーブルの情報を取得

hbase shelllist

1
admin.listTables.foreach(println)
{NAME => 'adviewlogs', FAMILIES => [{NAME => 'ad', ...
{NAME => 'users', FAMILIES => [{NAME => 'name', ...

テーブルの新規作成

hbase shellcreate 'songs', 'data'

1
2
3
val schema = new HTableDescriptor("songs")
schema.addFamily(new HColumnDescriptor("data"))
admin.createTable(schema)

行の追加

hbase shellput 'songs', 'row1', 'data:title', '走れ'

1
2
3
4
val songs = new HTable(conf, "songs")
val put   = new Put("row1".getBytes)
put.add("data".getBytes, "title".getBytes, "走れ".getBytes)
songs.put(put)

行の追加(複数のcf)

hbase shellput 'songs', 'row2', 'data', {'title'=>'怪盗少女', 'singer'=>'ももクロ'}

1
2
3
4
5
val songs = new HTable(conf, "songs")
val put   = new Put("row2".getBytes)
put.add("data".getBytes, "title".getBytes, "怪盗少女".getBytes)
put.add("data".getBytes, "singer".getBytes, "ももクロ".getBytes)
songs.put(put)

行の取得 (cf:qualifier指定)

hbase shellget 'songs', 'row2', 'data:title'

1
2
3
4
5
val songs  = new HTable(conf, "songs")
val get    = new Get("row2".getBytes)
val row2   = songs.get(get)
val title  = row2.getValue("data".getBytes, "title".getBytes)
println(Bytes.toString(title))
怪盗少女

行の取得 (cf内全部)

hbase shellget 'songs', 'row2', 'data'

1
2
3
4
5
6
7
8
9
10
11
12
import scala.collection.JavaConversions._     // for entrySet:java.util.Set

val songs  = new HTable(conf, "songs")
val get    = new Get("row2".getBytes)
val row2   = songs.get(get)
val cf     = row2.getFamilyMap("data".getBytes)

for (entry <- cf.entrySet) {
  val key   = Bytes.toString(entry.getKey)
  val value = Bytes.toString(entry.getValue)
  printf("%s: %s\n", key, value)
}
singer: ももクロ
title: 怪盗少女

存在しない場合は getFamilyMapnull (要チェック)

行の削除 (cf指定)

hbase shelldelete 'songs', 'row1', 'data' (shellではqualifierがないと動かない?)

1
2
3
4
val songs  = new HTable(conf, "songs")
val del    = new Delete("row1".getBytes)
del.deleteFamily("data".getBytes)
songs.delete(del)

存在しない場合はnop

行の削除 (1行全体)

hbase shelldelete 'songs', 'row2'みたいなこと (これもshellではできない)

1
2
3
val songs  = new HTable(conf, "songs")
val del    = new Delete("row2".getBytes)
songs.delete(del)

存在しない場合はnop

件数取得

hbase shellcount 'songs'

自分でscanするしかない? org.apache.hadoop.hbase.mapreduce.RowCounter あたりを使う?

行の存在確認

hbase shellにはない

1
2
3
val songs  = new HTable(conf, "songs")
val get    = new Get("row3".getBytes)
songs.exists(get)

範囲取得

検索用データとして以下を hbase shell で追加 (ROW_KEY: 発売日)

1
2
3
4
put 'songs', '20090805', 'data:title', 'ももいろパンチ'
put 'songs', '20100505', 'data:title', '怪盗少女'
put 'songs', '20101110', 'data:title', 'ピンキージョーンズ'
put 'songs', '20110706', 'data:title', 'Z伝説'

2010年に発売された曲を調べる

1
2
3
val scan = new Scan("2010".getBytes, "2011".getBytes)
val iter = songs.getScanner(scan)
println(iter.size)
2
  • Scan(start, end)のendは含まれない (ruby のstart...end)
  • HbaseではRAW_KEYによる文字列ソートが保証されている (RAW_KEYの設計が重要)

テーブル削除

hbase shelldisable 'songs'deletesongs’`

1
2
admin.disableTable("songs".getBytes)
admin.deleteTable("songs".getBytes)

参考

Sbtのインストールメモ

Install

mkdir -p ~/bin
wget -O ~/bin/sbt-launch-0.11.2.jar http://typesafe.artifactoryonline.com/typesafe/ivy-releases/org.scala-tools.sbt/sbt-launch/0.11.2/sbt-launch.jar
echo 'java -Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=384M -jar `dirname $0`/sbt-launch-0.11.2.jar "$@"' > ~/bin/sbt
chmod u+x ~/bin/sbt

Check (zsh)

rehash
mkdir /tmp/aho
cd /tmp/aho
sbt
> console
scala> 1+2
res0: Int = 3

参考

Hbaseを動かしてみる

Hbaseを使ったことがない Ubuntu 11.10(oneiric) ユーザがHbaseを動かすまでのメモ

インストール

正式なパッケージはないのでppaを利用する。 (パッケージはまだ実験中でfeedback歓迎らしい [2012.3.7現在])

sudo apt-get install python-software-properties
sudo add-apt-repository ppa:hadoop-ubuntu/dev
sudo apt-get update
sudo apt-get install hbase

/etc/hosts を見て、loopback IPが127.0.0.1になってるいるかを確認する (内部的に127.0.0.1決め打ちでアクセス?)

127.0.0.1 localhost

/etc/security/limits.conf の最後に以下を追加 (“hbase” は動かすユーザ名を指定)

hbase  -       nofile  32768

/etc/pam.d/common-session の最後に以下を追加 (前述のlimits.confを有効にするための設定)

session required  pam_limits.so

設定

データファイルの保存場所を作成。(defaultは/tmp)

mkdir /var/lib/hbase/data
chown hbase:hbase /var/lib/hbase/data

/etc/hbase/conf/hbase-site.xmlconfiguration の中に以下を追加

1
2
3
4
5
6
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>file:////var/lib/hbase/data</value>
  </property>
</configuration>

サービスの管理 (hbaseユーザで実行)

hbase master で管理できる。引数なしでusageが表示

% hbase master
Usage: Master [opts] start|stop
 start  Start Master. If local mode, start Master and RegionServer in same JVM
 stop   Start cluster shutdown; Master signals RegionServer shutdown
 where [opts] are:
   --minServers=<servers>    Minimum RegionServers needed to host user tables.
   --backup                  Master should start in backup mode

サービス起動

startはforegroundで動くので、”> log &” とかがいいのかもしれない。

hbase master start > server.log 2>&1 &

サービス停止

hbase master stop

クライアントから利用

hbase shell を実行するとconsoleが開く。見た目はirbで、中身もirb。helpでヘルプが見える。 一般ユーザが認証なしで接続できる。 (TODO: 認証方法)

% hbase shell
hbase(main):001:0> self.class.ancestors
=> [Object, HBaseConstants, Java, Kernel]
hbase(main):002:0> help
HBase Shell, version 0.92.0, r1231986, Mon Jan 16 13:16:35 UTC 2012
Type 'help "COMMAND"', (e.g. 'help "get"' -- the quotes are necessary) for help on a specific command.
Commands are grouped. Type 'help "COMMAND_GROUP"', (e.g. 'help "general"') for help on a command group.
...

コマンド一覧

テーブル作成 create 't1', 'f1', 'f2',...
テーブル削除 drop 't1'
テーブル有効 enable 't1'
テーブル無効 disable 't1'
テーブル情報 describe 't1'
データ登録  put 't1', 'r1', 'c1', 'value', ts1
データ削除   delete 't1', 'r1', 'c1', ts1
全行表示   scan 't1'
一行表示   get 't1', 'r1'

実行例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
hbase(main):001:0> create 'users', 'name', 'color'
hbase(main):002:0> put 'users', '1', 'name:name', '百田夏菜子'
hbase(main):003:0> put 'users', '1', 'name:yomi', 'ももたかなこ'
hbase(main):004:0> put 'users', '1', 'color:', 'red'
hbase(main):005:0> put 'users', '2', 'name:name', 'hoge'
hbase(main):006:0> get 'users', '1', ['name:yomi', 'color']
COLUMN                          CELL
 color:                         timestamp=1331280696571, value=red
 name:yomi                      timestamp=1331280681186, value=\xE3\x82\x82\xE3 ...
hbase(main):007:0> delete 'users', '2', 'name:name'
hbase(main):008:0> drop 'users'
ERROR: Table users is enabled. Disable it first.
hbase(main):009:0> disable 'users'
hbase(main):010:0> drop 'users'

(TODO: 日本語の表示について調べる)

参考