OpenResty-4.OpenResty使用Kafka

openresty官方模块

1
https://github.com/openresty/lua-nginx-module#readme

其中没有kafka的模块,可以在google上搜索可以查到lua-resty-kafka

1
https://github.com/doujiang24/lua-resty-kafka

1.安装lua-resty-kafka

1
2
3
4
5
6
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
unzip lua-resty-kafka-master.zip -d /opt/nginx/
#拷贝lua-resty-kafka到openresty
mkdir /opt/openresty/lualib/kafka
cp -rf /opt/nginx/lua-resty-kafka-master/lib/resty /opt/openresty/lualib/kafka/

2.安装单机kafka

(1) 下载

1
2
3
cd /opt/soft
wget http://mirror.bit.edu.cn/apache/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
tar -xvf kafka_2.11-0.9.0.1.tgz

(2) 开启单机zookeeper

1
nohup sh bin/zookeeper-server-start.sh config/zookeeper.properties > ./zk.log 2>&1 &

(3) 启动kafka服务

1
2
3
#在config/servier.properties下修改host.name
#host.name={your_server_ip}
nohup sh bin/kafka-server-start.sh config/server.properties > ./server.log 2>&1 &

(4) 创建测试topic

1
sh bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test1 --partitions 1 --replication-factor 1

3.配置lua脚本

(1) 配置依赖

在server上面配置

1
lua_package_path "/opt/openresty/lualib/kafka/?.lua;;";

(2) log_by_lua_file配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
-- 引入lua所有api
local cjson = require "cjson"
local producer = require "resty.kafka.producer"
-- 定义kafka broker地址,ip需要和kafka的host.name配置一致
local broker_list = {{ host = "10.10.34.51", port = 9092 },}
-- 定义json便于日志数据整理收集
local log_json = {}
log_json["uri"]=ngx.var.uri
log_json["args"]=ngx.var.args
-- 转换json为字符串
local message = cjson.encode(log_json);
--定义kafka异步生产者
local bp = producer:new(broker_list, { producer_type = "async" })
--发送日志消息,send第二个参数key,用于kafka路由控制:
--key为nill(空)时,一段时间向同一partition写入数据
--指定key,按照key的hash写入到对应的partition
local ok, err = bp:send("openresty_test", nil, message)
if not ok then
ngx.log(ngx.ERR, "kafka send err:", err)
return
end

(3)