0%

文件上传是一个很常见的功能,在业务场景中其又可分为单文件上传、分片上传、断点续传、秒传等。

一个小文件上传在一个http连接便可以很快的完成,其无需担心上传失败重新上传的问题。而一个大文件的上传则不能这样,试想一个场景:10G的文件直接上传,如果上传一方的网速很好,服务器的网络带宽很小,那么服务器的带宽全被这个上传连接占用,其他人上传文件则已没有带宽可用;如果在网速较差的环境下上传,快要上传完成的时候网络中断,又得重新上传,一定会抓狂。

Read more »

这里以 ubuntu18.04 LTS 系统为例

查看系统内核版本:

1
uname -a

查看已安装内核信息,确认需要安装的相应包:

1
sudo dpkg --get-selections |grep linux
Read more »

顾名思义,ThreadLocal是为线程提供私有的局部变量。它不同于其他常规的变量,需要使用自身的getset方法来获取和设置值。ThreadLocal的典型应用是在类中被申明为静态变量,用于关联用户ID、事务ID,亦或者其他需要线程独有的属性。

对于ThreadLocal,只要该线程处于活动状态并且ThreadLocal实例是可访问的,每个线程都保留对其本地线程副本的隐式引用。如果线程消失后,其所有副本线程本地实例便会受到垃圾回收(除非其他情况,即线程外存在对这些副本的引用)。

Read more »

rocketmq支持有序的发送消息,有序的消费消息,这里便来看一下如何实现?

实战方面均以RocketMQTemplate形式展现,集成方案详见《springboot中rocketmq的集成与使用》

消息的有序发送方面,我们可以直接使用syncSendOrderly(同步有序发送)和asyncSendOrderly(异步有序发送)两种类型的方法进行发送消息,他们的区别就如命名一样是同步和异步的区别。

Read more »

行为的触发时机

消费者消费偏移位置的持久化是消费客户端的行为,是在client启动的时候设定的一个定时任务,如下所示:

1
2
3
4
5
6
7
8
9
10
11
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
Read more »

rocketmq的client在启动的时候,会通过开启一个定时任务来定期刷新topic信息,这里就来看一下这个刷新的过程。

首先来看一下这个定时任务:

1
2
3
4
5
6
7
8
9
10
11
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
Read more »

producerrocketmq的作用是消息的生产者,consumerrocketmq的作用是消息的消费者,它的生命周期是跟项目相关的,即是由使用者控制的。而为什么要将这两个角色的启动关闭流程放在一起剖析呢?是因为他们都是MQ的客户端,在启动和关闭的行为上,有着很多共同的地方。接下来便将会来仔细探究其启动和关闭的过程。

Producer

DefaultMQProducer

DefaultMQProducer的启动

producer在启动的时候会做一系列的内部初始化,其启动的源码如下所示:

1
2
3
4
5
6
7
8
9
10
11
public void start() throws MQClientException {
this.setProducerGroup(withNamespace(this.producerGroup));
this.defaultMQProducerImpl.start();
if (null != traceDispatcher) {
try {
traceDispatcher.start(this.getNamesrvAddr(), this.getAccessChannel());
} catch (MQClientException e) {
log.warn("trace dispatcher start failed ", e);
}
}
}
Read more »