Java 中如何模拟真正的同时并发请求?

来源:等你归去来 ,

www.cnblogs.com/yougewe/p/9745198.html

 

有时需要测试一下某个功能的并发性能,又不要想借助于其他工具,索性就自己的开发语言,来一个并发请求就最方便了。

Java 中模拟并发请求,自然是很方便的,只要多开几个线程,发起请求就好了。但是,这种请求,一般会存在启动的先后顺序了,算不得真正的同时并发!怎么样才能做到真正的同时并发呢?是本文想说的点,Java 中提供了闭锁 CountDownLatch, 刚好就用来做这种事就最合适了。

只需要:

  1. 开启n个线程,加一个闭锁,开启所有线程;
  2. 待所有线程都准备好后,按下开启按钮,就可以真正的发起并发请求了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package com.test;
 
 
 
import java.io.BufferedReader;
 
import java.io.IOException;
 
import java.io.InputStream;
 
import java.io.InputStreamReader;
 
import java.io.OutputStream;
 
import java.net.HttpURLConnection;
 
import java.net.MalformedURLException;
 
import java.net.URL;
 
import java.util.concurrent.CountDownLatch;
 
 
 
public class LatchTest {
 
 
 
    public static void main(String[] args) throws InterruptedException {
 
        Runnable taskTemp = new Runnable() {
 
 
 
       // 注意,此处是非线程安全的,留坑
 
            private int iCounter;
 
 
 
            @Override
 
            public void run() {
 
                for(int i = 0; i < 10; i++) {
 
                    // 发起请求
 
//                    HttpClientOp.doGet("https://www.baidu.com/");
 
                    iCounter++;
 
                    System.out.println(System.nanoTime() + " [" + Thread.currentThread().getName() + "] iCounter = " + iCounter);
 
                    try {
 
                        Thread.sleep(100);
 
                    } catch (InterruptedException e) {
 
                        e.printStackTrace();
 
                    }
 
                }
 
            }
 
        };
 
 
 
        LatchTest latchTest = new LatchTest();
 
        latchTest.startTaskAllInOnce(5, taskTemp);
 
    }
 
 
 
    public long startTaskAllInOnce(int threadNums, final Runnable task) throws InterruptedException {
 
        final CountDownLatch startGate = new CountDownLatch(1);
 
        final CountDownLatch endGate = new CountDownLatch(threadNums);
 
        for(int i = 0; i < threadNums; i++) {
 
            Thread t = new Thread() {
 
                public void run() {
 
                    try {
 
                        // 使线程在此等待,当开始门打开时,一起涌入门中
 
                        startGate.await();
 
                        try {
 
                            task.run();
 
                        } finally {
 
                            // 将结束门减1,减到0时,就可以开启结束门了
 
                            endGate.countDown();
 
                        }
 
                    } catch (InterruptedException ie) {
 
                        ie.printStackTrace();
 
                    }
 
                }
 
            };
 
            t.start();
 
        }
 
        long startTime = System.nanoTime();
 
        System.out.println(startTime + " [" + Thread.currentThread() + "] All thread is ready, concurrent going...");
 
        // 因开启门只需一个开关,所以立马就开启开始门
 
        startGate.countDown();
 
        // 等等结束门开启
 
        endGate.await();
 
        long endTime = System.nanoTime();
 
        System.out.println(endTime + " [" + Thread.currentThread() + "] All thread is completed.");
 
        return endTime - startTime;
 
    }
 
}

其执行效果如下图所示:

Java 中如何模拟真正的同时并发请求?

HttpClientOp  工具类,可以使用 成熟的工具包,也可以自己写一个简要的访问方法,参考如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
class HttpClientOp {
 
    public static String doGet(String httpurl) {
 
        HttpURLConnection connection = null;
 
        InputStream is = null;
 
        BufferedReader br = null;
 
        String result = null;// 返回结果字符串
 
        try {
 
            // 创建远程url连接对象
 
            URL url = new URL(httpurl);
 
            // 通过远程url连接对象打开一个连接,强转成httpURLConnection类
 
            connection = (HttpURLConnection) url.openConnection();
 
            // 设置连接方式:get
 
            connection.setRequestMethod("GET");
 
            // 设置连接主机服务器的超时时间:15000毫秒
 
            connection.setConnectTimeout(15000);
 
            // 设置读取远程返回的数据时间:60000毫秒
 
            connection.setReadTimeout(60000);
 
            // 发送请求
 
            connection.connect();
 
            // 通过connection连接,获取输入流
 
            if (connection.getResponseCode() == 200) {
 
                is = connection.getInputStream();
 
                // 封装输入流is,并指定字符集
 
                br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
 
                // 存放数据
 
                StringBuffer sbf = new StringBuffer();
 
                String temp = null;
 
                while ((temp = br.readLine()) != null) {
 
                    sbf.append(temp);
 
                    sbf.append("rn");
 
                }
 
                result = sbf.toString();
 
            }
 
        } catch (MalformedURLException e) {
 
            e.printStackTrace();
 
        } catch (IOException e) {
 
            e.printStackTrace();
 
        } finally {
 
            // 关闭资源
 
            if (null != br) {
 
                try {
 
                    br.close();
 
                } catch (IOException e) {
 
                    e.printStackTrace();
 
                }
 
            }
 
 
 
            if (null != is) {
 
                try {
 
                    is.close();
 
                } catch (IOException e) {
 
                    e.printStackTrace();
 
                }
 
            }
 
 
 
            connection.disconnect();// 关闭远程连接
 
        }
 
 
 
        return result;
 
    }
 
 
 
    public static String doPost(String httpUrl, String param) {
 
 
 
        HttpURLConnection connection = null;
 
        InputStream is = null;
 
        OutputStream os = null;
 
        BufferedReader br = null;
 
        String result = null;
 
        try {
 
            URL url = new URL(httpUrl);
 
            // 通过远程url连接对象打开连接
 
            connection = (HttpURLConnection) url.openConnection();
 
            // 设置连接请求方式
 
            connection.setRequestMethod("POST");
 
            // 设置连接主机服务器超时时间:15000毫秒
 
            connection.setConnectTimeout(15000);
 
            // 设置读取主机服务器返回数据超时时间:60000毫秒
 
            connection.setReadTimeout(60000);
 
 
 
            // 默认值为:false,当向远程服务器传送数据/写数据时,需要设置为true
 
            connection.setDoOutput(true);
 
            // 默认值为:true,当前向远程服务读取数据时,设置为true,该参数可有可无
 
            connection.setDoInput(true);
 
            // 设置传入参数的格式:请求参数应该是 name1=value1&name2=value2 的形式。
 
            connection.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
 
            // 设置鉴权信息:Authorization: Bearer da3efcbf-0845-4fe3-8aba-ee040be542c0
 
            connection.setRequestProperty("Authorization", "Bearer da3efcbf-0845-4fe3-8aba-ee040be542c0");
 
            // 通过连接对象获取一个输出流
 
            os = connection.getOutputStream();
 
            // 通过输出流对象将参数写出去/传输出去,它是通过字节数组写出的
 
            os.write(param.getBytes());
 
            // 通过连接对象获取一个输入流,向远程读取
 
            if (connection.getResponseCode() == 200) {
 
 
 
                is = connection.getInputStream();
 
                // 对输入流对象进行包装:charset根据工作项目组的要求来设置
 
                br = new BufferedReader(new InputStreamReader(is, "UTF-8"));
 
 
 
                StringBuffer sbf = new StringBuffer();
 
                String temp = null;
 
                // 循环遍历一行一行读取数据
 
                while ((temp = br.readLine()) != null) {
 
                    sbf.append(temp);
 
                    sbf.append("rn");
 
                }
 
                result = sbf.toString();
 
            }
 
        } catch (MalformedURLException e) {
 
            e.printStackTrace();
 
        } catch (IOException e) {
 
            e.printStackTrace();
 
        } finally {
 
            // 关闭资源
 
            if (null != br) {
 
                try {
 
                    br.close();
 
                } catch (IOException e) {
 
                    e.printStackTrace();
 
                }
 
            }
 
            if (null != os) {
 
                try {
 
                    os.close();
 
                } catch (IOException e) {
 
                    e.printStackTrace();
 
                }
 
            }
 
            if (null != is) {
 
                try {
 
                    is.close();
 
                } catch (IOException e) {
 
                    e.printStackTrace();
 
                }
 
            }
 
            // 断开与远程地址url的连接
 
            connection.disconnect();
 
        }
 
        return result;
 
    }
 
}

如上,就可以发起真正的并发请求了。

并发请求操作流程示意图如下:

 

Java 中如何模拟真正的同时并发请求?

 

此处设置了一道门,以保证所有线程可以同时生效。但是,此处的同时启动,也只是语言层面的东西,也并非绝对的同时并发。具体的调用还要依赖于CPU个数,线程数及操作系统的线程调度功能等,不过咱们也无需纠结于这些了,重点在于理解原理!

与 CountDownLatch 有类似功能的,还有个工具栅栏 CyclicBarrier, 也是提供一个等待所有线程到达某一点后,再一起开始某个动作,效果一致,不过栅栏的目的确实比较纯粹,就是等待所有线程到达,而前面说的闭锁 CountDownLatch 虽然实现的也是所有线程到达后再开始,但是他的触发点其实是最后那一个开关,所以侧重点是不一样的。

简单看一下栅栏是如何实现真正同时并发呢?示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
// 与 闭锁 结构一致
 
public class LatchTest {
 
 
 
    public static void main(String[] args) throws InterruptedException {
 
 
 
        Runnable taskTemp = new Runnable() {
 
 
 
            private int iCounter;
 
 
 
            @Override
 
            public void run() {
 
                // 发起请求
 
//              HttpClientOp.doGet("https://www.baidu.com/");
 
                iCounter++;
 
                System.out.println(System.nanoTime() + " [" + Thread.currentThread().getName() + "] iCounter = " + iCounter);
 
            }
 
        };
 
 
 
        LatchTest latchTest = new LatchTest();
 
//        latchTest.startTaskAllInOnce(5, taskTemp);
 
        latchTest.startNThreadsByBarrier(5, taskTemp);
 
    }
 
 
 
    public void startNThreadsByBarrier(int threadNums, Runnable finishTask) throws InterruptedException {
 
        // 设置栅栏解除时的动作,比如初始化某些值
 
        CyclicBarrier barrier = new CyclicBarrier(threadNums, finishTask);
 
        // 启动 n 个线程,与栅栏阀值一致,即当线程准备数达到要求时,栅栏刚好开启,从而达到统一控制效果
 
        for (int i = 0; i < threadNums; i++) {
 
            Thread.sleep(100);
 
            new Thread(new CounterTask(barrier)).start();
 
        }
 
        System.out.println(Thread.currentThread().getName() + " out over...");
 
    }
 
}
 
 
 
class CounterTask implements Runnable {
 
 
 
    // 传入栅栏,一般考虑更优雅方式
 
    private CyclicBarrier barrier;
 
 
 
    public CounterTask(final CyclicBarrier barrier) {
 
        this.barrier = barrier;
 
    }
 
 
 
    public void run() {
 
        System.out.println(Thread.currentThread().getName() + " - " + System.currentTimeMillis() + " is ready...");
 
        try {
 
            // 设置栅栏,使在此等待,到达位置的线程达到要求即可开启大门
 
            barrier.await();
 
        } catch (InterruptedException e) {
 
            e.printStackTrace();
 
        } catch (BrokenBarrierException e) {
 
            e.printStackTrace();
 
        }
 
        System.out.println(Thread.currentThread().getName() + " - " + System.currentTimeMillis() + " started...");
 
    }
 
}

其运行结果如下图:

Java 中如何模拟真正的同时并发请求?

 

各有其应用场景吧,关键在于需求。就本文示例的需求来说,个人更愿意用闭锁一点,因为更可控了。但是代码却是多了,所以看你喜欢吧!

发表评论