ZeroMQ 初学 Java Binding验证代码
学习ZeroMQ使用,根据官方文档介绍,写了如下Java验证代码。仅供参考。需要依赖jzmq的jar包和本地库。
1、请求-响应模式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.coderli.zeromq.requestreplay;
import org.zeromq.ZMQ;
import com.coderli.zeromq.JZMQBase;
/**
* ZeroMQ 请求响应模式验证代码 <br>
* 此为服务端
*
* @author OneCoder
* @date 2014年1月13日 下午11:28:47
* @website http://www.coderli.com
*/
public class ReplayServer extends JZMQBase {
public static void main(String[] args) {
// 参数代表使用多少线程,大多数情况下,1个线程已经足够。
ZMQ.Context context = ZMQ. context(1);
// 指定模式为响应模式
ZMQ.Socket socket = context.socket(ZMQ. REP);
socket.bind( LOCAL_ADDRESS); // 绑定服务地址及端口
for (;;) {
System. out.println( "Server start.");
socket.recv();
String str = "Ok, I'm server";
socket.send(str);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.coderli.zeromq.requestreplay;
import org.zeromq.ZMQ;
import com.coderli.zeromq.JZMQBase;
/**
* ZeroMQ 请求响应模式验证代码 <br>
* 此为客户端
*
* @author OneCoder
* @date 2014年1月11日 上午10:34:18
* @website http://www.coderli.com
*/
public class RequestClient extends JZMQBase {
/**
* @param args
* @author OneCoder
* @date 2014年1月11日 上午10:34:18
*/
public static void main(String[] args) {
ZMQ.Context context = ZMQ. context(1);
// 指定模式为请求模式
ZMQ.Socket socket = context.socket(ZMQ. REQ);
// 创建链接
socket.connect( LOCAL_ADDRESS);
int count = 1;
for (;;) {
try {
long time = System. nanoTime();
socket.send( "Hello, currentTime: " + count);
byte[] recs = socket.recv();
long end = System. nanoTime();
System. out.println( new String(recs) + " Cost time: "
+ (end - time));
count++;
Thread. sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
测试结果,单线程请求-相应一次的耗时大概在450us。
2、Publish-subscribe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.coderli.zeromq.pubsub;
import java.util.Random;
import org.zeromq.ZMQ;
import com.coderli.zeromq.JZMQBase;
/**
* ZeroMQ 发布订阅模式Java验证代码 <br>
* 此为发布者
*
* @author OneCoder
* @date 2014年1月14日 上午11:16:03
* @blog http://www.coderli.com
*/
public class Publisher extends JZMQBase {
public static void main(String[] args) throws InterruptedException {
// 参数代表使用多少线程,大多数情况下,1个线程已经足够。
ZMQ.Context context = ZMQ. context(1);
// 指定模式为发布者
ZMQ.Socket socket = context.socket(ZMQ. PUB);
socket.bind( LOCAL_ADDRESS); // 绑定服务地址及端口
for (;;) {
int i = ( int) ( new Random().nextDouble() * 2 + 1);
String s = String. valueOf(i);
long time = System. nanoTime();
socket.send(s + String. valueOf(time));
System. out.println( "发布了新消息,时间:" + time + " 类型:" + s);
Thread. sleep(2000);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.coderli.zeromq.pubsub;
import org.zeromq.ZMQ;
import com.coderli.zeromq.JZMQBase;
/**
* ZeroMQ 发布订阅模式Java验证代码 <br>
* 此为订阅者1号
*
* @author OneCoder
* @date 2014年1月14日 上午11:16:03
* @blog http://www.coderli.com
*/
public class SubscriberOne extends JZMQBase {
public static void main(String[] args) throws InterruptedException {
ZMQ.Context context = ZMQ. context(1);
// 指定模式为请求模式
ZMQ.Socket socket = context.socket(ZMQ. SUB);
// 创建订阅者,必须要过主题过滤器
byte[] filter = "1".getBytes();
socket.subscribe(filter);
socket.connect( LOCAL_ADDRESS);
for (;;) {
byte[] recs = socket.recv();
long receiveTime = System. nanoTime();
String oriMsg = new String(recs);
String msg = new String(recs,1,recs.length-1);
long pubTime = Long. valueOf(msg);
long costTime = receiveTime - pubTime;
System. out.println( "Receive: " + oriMsg + " Cost time: " + costTime);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.coderli.zeromq.pubsub;
import org.zeromq.ZMQ;
import com.coderli.zeromq.JZMQBase;
/**
* ZeroMQ 发布订阅模式Java验证代码 <br>
* 此为订阅者2号
*
* @author OneCoder
* @date 2014年1月14日 上午11:16:03
* @blog http://www.coderli.com
*/
public class SubscriberTwo extends JZMQBase{
public static void main(String[] args) throws InterruptedException {
ZMQ.Context context = ZMQ. context(1);
// 指定模式为请求模式
ZMQ.Socket socket = context.socket(ZMQ. SUB);
// 创建订阅者,必须要过主题过滤器
byte[] filter = "2".getBytes();
socket.subscribe(filter);
socket.connect( LOCAL_ADDRESS);
for (;;) {
byte[] recs = socket.recv();
long receiveTime = System. nanoTime();
String oriMsg = new String(recs);
String msg = new String(recs,1,recs.length-1);
long pubTime = Long. valueOf(msg);
long costTime = receiveTime - pubTime;
System. out.println( "Receive: " + oriMsg + " Cost time: " + costTime);
}
}
}
* 发布者中随机发布开头为1或者2的消息。
* 订阅者中必须有过滤器,从前向后匹配发布者发送的消息,完全匹配则接收消息。这里1/2号订阅者分别过滤消息开头是1/2的数据。
* 如果没有发布者,则订阅者阻塞,直到有发布者发送消息。如果订阅者掉线,消息会丢失。
* 如果要订阅多个filter只需多次调用subscribe方法即可。
* 从发布到订阅收到消息,大约耗时300us。
3、PipeLine模式
想象一下这样的场景,如果需要统计各个机器的日志,我们需要将统计任务分发到各个节点机器上,最后收集统计结果,做一个汇总。PipeLine比较适合于这种场景,他的结构图,如图3所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.coderli.zeromq.pipeline;
import org.zeromq.ZMQ;
import com.coderli.zeromq.JZMQBase;
/**
* ZeroMQ Pipeline模式Java验证代码 <br>
* 此为主Pusher
*
* @author OneCoder
* @date 2014年1月14日 上午11:16:03
* @blog http://www.coderli.com
*/
public class MainPusher extends JZMQBase {
public static void main(String[] args) throws InterruptedException {
// 参数代表使用多少线程,大多数情况下,1个线程已经足够。
ZMQ.Context context = ZMQ. context(1);
// 指定模式为Pusher
ZMQ.Socket socket = context.socket(ZMQ. PUSH);
socket.bind( LOCAL_ADDRESS); // 绑定服务地址及端口
for (;;) {
long time = System. nanoTime();
socket.send(String. valueOf(time));
System. out.println( "发布了新消息,时间:" + time);
Thread. sleep(2000);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.coderli.zeromq.pipeline;
import org.zeromq.ZMQ;
import com.coderli.zeromq.JZMQBase;
/**
* ZeroMQ Pipeline模式Java验证代码 <br>
* 此为中转worker
*
* @author OneCoder
* @date 2014年1月14日 上午11:16:03
* @blog http://www.coderli.com
*/
public class WorkerOne extends JZMQBase {
public static void main(String[] args) {
// 指定模式为pull模式
ZMQ.Socket receiver = ZMQ.context(1).socket(ZMQ.PULL);
receiver.connect( LOCAL_ADDRESS);
// 指定模式为push模式
ZMQ.Socket sender = ZMQ.context(1).socket(ZMQ.PUSH);
sender.connect( LOCAL_ADDRESS_PUSHER);
for (;;) {
byte[] recs = receiver.recv();
long receiveTime = System. nanoTime();
String oriMsg = new String(recs);
long pubTime = Long. valueOf(oriMsg);
long costTime = receiveTime - pubTime;
System. out.println( "Receive: " + oriMsg + " Cost time: " + costTime);
sender.send( "1" + oriMsg);
System. out.println( "Send to sinker.");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.coderli.zeromq.pipeline;
import org.zeromq.ZMQ;
import com.coderli.zeromq.JZMQBase;
/**
* ZeroMQ Pipeline模式Java验证代码 <br>
* 此为中转worker
*
* @author OneCoder
* @date 2014年1月14日 上午11:16:03
* @blog http://www.coderli.com
*/
public class WorkerTwo extends JZMQBase {
public static void main(String[] args) {
// 指定模式为pull模式
ZMQ.Socket receiver = ZMQ.context(1).socket(ZMQ.PULL);
receiver. connect(LOCAL_ADDRESS);
// 指定模式为push模式
ZMQ.Socket sender = ZMQ.context(1).socket(ZMQ.PUSH);
sender. connect(LOCAL_ADDRESS_PUSHER);
for (;;) {
byte[] recs = receiver.recv();
long receiveTime = System. nanoTime();
String oriMsg = new String(recs);
long pubTime = Long. valueOf(oriMsg);
long costTime = receiveTime - pubTime;
System. out
.println( "Receive: " + oriMsg + " Cost time: " + costTime);
sender.send( "2" + oriMsg);
System. out.println( "Send to sinker.");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.coderli.zeromq.pipeline;
import org.zeromq.ZMQ;
import com.coderli.zeromq.JZMQBase;
/**
* ZeroMQ Pipeline模式Java验证代码 <br>
* 此为最终sinker
*
* @author OneCoder
* @date 2014年1月14日 上午11:16:03
* @blog http://www.coderli.com
*/
public class Sinker extends JZMQBase {
public static void main(String[] args) {
ZMQ.Context context = ZMQ. context(1);
// 指定模式为pull模式
ZMQ.Socket receiver = context.socket(ZMQ. PULL);
receiver. bind(LOCAL_ADDRESS_PUSHER);
for (;;) {
byte[] recs = receiver.recv();
long receiveTime = System. nanoTime();
String oriMsg = new String(recs);
String msg = new String(recs,1,recs.length-1);
long pubTime = Long. valueOf(msg);
long costTime = receiveTime - pubTime;
System. out.println( "Receive: " + oriMsg + " Cost time: " + costTime);
}
}
}
以上只是一些初级结构的初步使用,对于我来说重点还是研究router模式,实现N对M集群的定向通信。随后会公布研究代码
本文由作者按照 CC BY 4.0 进行授权