KafkaConnectでTwitterデータを取り込む

参考

メモ

ここでは、 confluent-hub コマンドでインストールする。

1
$ confluent-hub install jcustenborder/kafka-connect-twitter

以下の設定ファイルを作る。

/etc/kafka/connect-twitter-source.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector

# Set these required values
twitter.oauth.accessTokenSecret=hoge
process.deletes=false
filter.keywords=kafka
kafka.status.topic=twitter-status
kafka.delete.topic=twitter-delete
twitter.oauth.consumerSecret=hoge
twitter.oauth.accessToken=hoge
twitter.oauth.consumerKey=hoge

キーのところは、適宜TwitterのDeveloper向けページで生成して記載すること。

スタンドアローンモードで実行する。

1
$ connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka/connect-twitter-source.properties

なお、もし分散モードだったら、以下のようにする。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
curl -H "Content-Type: application/json" -X POST http://localhost:8083/connectors -d '
{
"name": "twitter",
"config": {
"tasks.max":1,
"connector.class":"com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
"twitter.oauth.accessTokenSecret":"hoge",
"process.deletes":"false",
"filter.keywords":"kafka",
"kafka.status.topic":"twitter-status",
"kafka.delete.topic":"twitter-delete",
"twitter.oauth.consumerSecret":"hoge",
"twitter.oauth.accessToken":"hoge",
"twitter.oauth.consumerKey":"hoge",
}
}
'

最後に、入力されるメッセージを確認する。

1
$ kafka-console-consumer --bootstrap-server broker:9092 --topic twitter-status | jq .

結果は以下のような形式である。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"doc": "Return the created_at",
"field": "CreatedAt"
},

(snip)

"payload": {
"CreatedAt": XXXXXXXXXXXXX,
"Id": XXXXXXXXXXXXXXXXXXX,
"Text": "hoge",
"Source": "<a href=\"http://twitter.com/download/android\" rel=\"nofollow\">Twitter for Android</a>",
"Truncated": false,

(snip)

共有