BlankLin

lazy and boring

0%

背景介绍

1
ClickHouse exception, code: 517, host: xx.xx.xx.xx, port: xxxx; Code: 517, e.displayText() = DB::Exception: Metadata on replica is not up to date with common metadata in Zookeeper. Cannot alter: Bad version

我们在修改表结构(例如alter table drop column xxx)时经常会遇到以上报错,原因副本上的元数据和在zookeeper上的元数据不一致,无法更新,因为版本号不一样。

查找system.replication_queue

  • 表介绍
    链接地址包含了所有ReplicatedMergeTree复制表家族在zookeeper上存储的副本任务队列的相关信息。
  • 列信息

    • database (String) — 数据库名称.
    • table (String) — 表名称.
    • replica_name (String) — zookeeper上副本名称,同表不同副本有不同名字.
    • position (UInt32) — 当前任务队列的位置.
    • node_name (String) — ZooKeeper上节点名称.
    • type (String) — 任务队列的名称,分别是:
      • GET_PART — 从另一个副本拿到part.
      • ATTACH_PART — 加载part, 有可能来自我们自己的副本 (如果是在detached目录上发现). 您可以认为它是带有一些优化的GET_PART,因为他们接近相似.
      • MERGE_PARTS — 合并part.
      • DROP_RANGE — 删除指定范围内的指定分区列表.
      • CLEAR_COLUMN — 注意:从指定分区删除特殊列(已弃用).
      • CLEAR_INDEX — 注意:从指定分区删除指定索引(已弃用).
      • REPLACE_RANGE — 删除一定范围内的part,并用新part替换.
      • MUTATE_PART — 对part应用用一个或多个mutation.
      • ALTER_METADATA — 根据/metadata和/columns路径应用alter修改.
    • create_time (Datetime) — 任务被提交执行后的时间.
    • required_quorum (UInt32) — 等待任务完成并且确认完成的副本数. 这个字段仅跟GET_PARTS任务相关.
    • source_replica (String) — 源副本的名称.
    • new_part_name (String) — 新part的名称.
    • parts_to_merge (Array (String)) — 要更新或者合并的part名称.
    • is_detach (UInt8) — DETACH_PARTS任务是否在任务队列里的标示位.
    • is_currently_executing (UInt8) — 当然任务是否正在执行的标示位.
    • num_tries (UInt32) — 尝试完成任务失败的次数.
    • last_exception (String) — 上次错误发生的详情.
    • last_attempt_time (Datetime) — 任务最后一次尝试的时间
    • num_postponed (UInt32) — 延期任务数.
    • postpone_reason (String) — 任务延期时间.
    • last_postpone_time (Datetime) — 任务上次延期的时间.
    • merge_type (String) — 当前合并的类型. 如果是mutation则为空.
  • 通过该表捞出zookeeper的节点副本信息

    1
    select node_name from system.replication_queue where database = 'xx' and table = 'xx' and is_currently_executing = 1

    system.replcas

system.mutations

处理方式

+

const

friend

友邻函数

extern

vitrual

基类希望其派生类进行覆盖(override)的函数。这种函数,基类通常将其定义为虚函数(加virtual)。当我们使用基类的指针或者引用调用虚函数时,该调用将被动态绑定

用法解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Base {
public:
virtual void print() {
cout << "==BASE==";
}
}
class Derived: public Base {
public:
void print(){
cout << "==DERIVED=="
}
}
int main() {
Base *pointer = new Derived();
// ==DERIVED==
pointer->print();
}

Base中加了Virtual关键字的函数就是虚拟函数(例如函数print),于是在Base的派生类Derived中就可以通过重写虚拟函数来实现对基类虚拟函数的覆盖。当基类Base的指针point指向派生类Derived的对象时,对pointprint函数的调用实际上是调用了Derivedprint函数而不是Baseprint函数。这是面向对象中的多态性的体现

纯虚函数

纯虚函数声明如下: virtual void funtion1()=0; 纯虚函数一定没有定义,纯虚函数用来规范派生类的行为,即接口。包含纯虚函数的类是抽象类,抽象类不能定义实例,但可以声明指向实现该抽象类的具体类的指针或引用。

注意点

  • 定义一个函数为虚函数,不代表函数为不被实现的函数。
  • 定义一个虚函数是为了允许用基类的指针或引用来调用子类的这个函数。
  • 定义一个函数为纯虚函数,才代表函数没有被实现。
  • 定义纯虚函数是为了实现一个接口,起到一个规范的作用,规范继承这个类的程序员必须实现这个函数。
  • 任何友元(friend)/构造(construct)/static静态函数之外的函数都可以是虚函数。
  • 关键字virtual只能出现在类内部的声明语句之前而不能用于类外部的函数定义
  • 基类定义了virtual,继承类的该函数也具有了virtual属性

static

template

inline

auto

在声明变量时根据变量初始值的类型自动为此变量选择匹配的类型。

用法解释:

1
2
3
auto f = 3.14;  //double
auto s("hello"); //const char*
auto z = new auto(9); //int *

注意点

  • 可以用valatile,pointer(*),reference(&),rvalue reference(&&) 来修饰auto
    1
    2
    3
    4
    auto k = 5;  
    auto* pK = new auto(k);
    auto** ppK = new auto(&k);
    const auto n = 6;
  • 用auto声明的变量必须初始化
  • auto不能与其他类型组合连用
  • 函数和模板参数不能被声明为auto
  • 定义在堆上的变量,使用了auto的表达式必须被初始化
  • 以为auto是一个占位符,并不是一个他自己的类型,因此不能用于类型转换或其他一些操作,如sizeof和typeid
  • 定义在一个auto序列的变量必须始终推导成同一类型
    1
    2
    // 错误,必须是初始化为同一类型
    auto x1 = 5, x2 = 5.0, x3='r';
  • auto不能自动推导成CV-qualifiers (constant & volatile qualifiers)
  • auto会退化成指向数组的指针,除非被声明为引用

template

左值和右职

左值和右值的概念

  • 左值是可以放在赋值号左边可以被赋值的值;左值必须要在内存中有实体;
  • 右值当在赋值号右边取出值赋给其他变量的值;右值可以在内存也可以在CPU寄存器。
  • 一个对象被用作右值时,使用的是它的内容(值),被当作左值时,使用的是它的地址。

引用

  • 引用是C++语法做的优化,引用的本质还是靠指针来实现的。引用相当于变量的别名。
  • 引用可以改变指针的指向,还可以改变指针所指向的值。
  • 引用的基本规则:
    • 声明引用的时候必须初始化,且一旦绑定,不可把引用绑定到其他对象;即引用必须初始化,不能对引用重定义;
    • 对引用的一切操作,就相当于对原对象的操作。

左值引用和右值引用

  • 左值引用:
    • 左值引用的基本语法:type &引用名 = 左值表达式;
  • 右值引用:
    • 右值引用的基本语法type &&引用名 = 右值表达式;
    • 右值引用在企业开发人员在代码优化方面会经常用到。
    • 右值引用的“&&”中间不可以有空格。

operator

operator是C++的关键字,它和运算符一起使用,表示一个运算符函数,理解时应将operator=整体上视为一个函数名。

1、只有C++预定义的操作符才可以被重载;

2、对于内置类型的操作符,它的预定义不能改变,即不能改变操作符原来的功能;

3、重载操作符不能改变他们的操作符优先级;

4、重载操作符不能改变操作数的个数;

5、除了对()操作符外,对其他重载操作符提供缺省实参都是非法的;

explicit

放构造函数前,防止隐式转换,普通构造函数能够被隐式调用,而explicit构造函数只能被显式调用。例子如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Test1 {
public:
Test1(int n) {
num = n;
}
private:
int num;
}

class Test2 {
public:
explicit Test2(int n) {
num = n;
}
private:
int num;
}

int main(){
Test1 t1 = 12; // 隐私调用其构造函数Test1,等同于Test1 t1(12)
Test2 t2 = 100; //编译错误,不能隐式调用其构造函数
Test2 t3(323); //显示调用成功
}

dynamic_cast

将基类的指针或引用安全地转换成派生类的指针或引用,并用派生类的指针或引用调用非虚函数。如果是基类指针或引用调用的是虚函数无需转换就能在运行时调用派生类的虚函数

前提条件:当我们将dynamic_cast用于某种类型的指针或引用时,只有该类型含有虚函数时,才能进行这种转换。否则,编译器会报错。

dynamic_cast运算符的调用形式如下所示:

dynamic_cast(e) //e是指针

dynamic_cast(e) //e是左值

dynamic_cast(e)//e是右值

e能成功转换为type*类型的情况有三种:

1)e的类型是目标type的公有派生类:派生类向基类转换一定会成功。

2)e的类型是目标type的基类,当e是指针指向派生类对象,或者基类引用引用派生类对象时,类型转换才会成功,当e指向基类对象,试图转换为派生类对象时,转换失败。

3)e的类型就是type的类型时,一定会转换成功。

reinterpt_cast

该函数将一个类型的指针转换为另一个类型的指针,这种转换不用修改指针变量值存放格式(不改变指针变量值),只需在编译时重新解释指针的类型就可做到。

1
2
int* a = 1;
double* b = reinterpret_cast<double*>(a)

const_cast

该函数用于去除指针变量的常量属性,将它转换为一个对应指针类型的普通变量。反过来说,也可以将一个非常量的指针变量转换为一个常量指针变量。这种转换是在编译期间作出的类型更改。

1
2
const int* a = 1;
int* pk = const_cast<int*>(a); // 相当于 int* pk = (int*) a

static_cast

该函数主要用于基本类型之间和具有继承关系的类型之间的转换。这种转换一般会更改变量的内部表示方式,因此,static_cast应用于指针类型转换没有太大意义。

主要有如下几种用法:

  • 用于类层次结构中基类和子类之间指针或引用的转换。
  • 进行上行转换(把子类的指针或引用转换成基类表示)是安全的;
  • 进行下行转换(把基类指针或引用转换成子类指针或引用)时,由于没有动态类型检查,所以是不安全的。
  • 用于基本数据类型之间的转换,如把int转换成char,把int转换成enum。这种转换的安全性也要开发人员来保证。
  • 把void指针转换成目标类型的指针(不安全!!)
  • 把任何类型的表达式转换成void类型。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    //基本类型转换
    int i=0;
    double d = static_cast<double>(i); //相当于 double d = (double)i;

    //转换继承类的对象为基类对象
    class Base{};
    class Derived : public Base{};
    Derived d;
    Base b = static_cast<Base>(d); //相当于 Base b = (Base)d;

mutable

mutable的作用有两点:
(1)保持常量对象中大部分数据成员仍然是“只读”的情况下,实现对个别数据成员的修改;
(2)使类的const函数可以修改对象的mutable数据成员。

使用mutable的注意事项:
(1)mutable只能作用于类的非静态和非常量数据成员。
(2)在一个类中,应尽量或者不用mutable,大量使用mutable表示程序设计存在缺陷。

using

    1. 命名空间的使用

      一般是为了代码的冲突,都会用命名空间,

      1
      2
      3
      4
      5
      6
      7
      8
      9
      // 命名空间名称
      namespace android
      class Test {

      }
      // 直接使用该命名空间
      using namespace android;
      // 使用该命名空间的Test类
      using android::Test;
    1. 在子类汇总引用基类的成员

      注意,using只是引用,不参与形参的指定

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      class Base {
      public:
      Base() {}
      virtual ~Base() {}
      void hello(){ std::cout << "hello world" << std::endl; }
      protected:
      int value;
      }
      // 私有继承,无法使用基类的public/protected属性的变量和函数
      class Common: private Base {
      public:
      // 使用using方法是来引用,这样Common类就能直接用了
      using Base::hello;
      using Base:value;

      void test() { std::cout << "value is: " << value << std::endll }
      }
    1. 别名指定

      从可读性上说,typedef 要比 using 好理解,另外typedef是无法使用模版的,而using可以使用

      1
      2
      3
      4
      5
      6
      7
      // 这样之后就可以使用value_type xx 去代表type xx
      using value_type = type;

      template<typename T>
      using Vec = MyVector<T, MyAlloc<T>>;
      //usage
      Vec<int> vec;

const char , char const , char * const的区别

把一个声明从右向左读。( * 读成 pointer to ),C++标准规定,const关键字放在类型或变量名之前等价的。

1
2
3
4
5
6
7
8
9
10
// cp is a const pointer to char
// 定义一个指向字符的指针常数,即const指针
char * const cp;

// p is a pointer to const char
// 定义一个指向字符常数的指针
const char * p;

// 同上因为C++里面没有const*的运算符,所以const只能属于前面的类型。
char const * p;

背景介绍

  • 首先解释下为什么要写入分布式表,而不是MergeTree
    分布式表链接,分布式表引擎不会存储任务数据,但是允许分布式查询可以路由到多台机器,读取都是并行的,每一个读操作,如果有配置的话,远程机器上的表索引都会被使用到,所以我们知道分布式表是不存储数据的,只是数据的搬运工而已,我们正常做法应该是直接写入数据到MergeTree引擎表,然后通过分布式做路由分发查询而已。那么问题来了,什么场景要写入到分布式表呢?
    因为分布式表建表语句支持指定分片索引sharding_key,这个sharding_key可以把通过分布式表写入的数据转发到同一个shard,那么在同一个shard里的数据才能满足排序主键和去重
    1
    2
    3
    4
    5
    6
    7
    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
    (
    name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1],
    name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2],
    ...
    ) ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
    [SETTINGS name=value, ...]
  • 可以使用xxHash32函数来对要去重的字段进行哈希计算后路由到同一个分片上
    1
    ENGINE = Distributed('cluster', 'database', 'table_local', xxHash32(logid))

system.metrics

  • 介绍

    地址,该表包含了被直接算计的指标,或者当前值,举例就是同时查询的进程数,或者当前副本延迟时间,这个表一直被实时更新。
    该表包含3个元素,如下

    • metric (String) — 指标名.
    • value (Int64) — 指标值.
    • description (String) — 指标描述.
  • 找出当前每个节点分布式文件写入的总数,找出最高的那个节点

    1
    select * from system.metrics where metric = 'DistributedFilesToInsert'

    clickhouse

system.distribution_queue

  • 介绍

    地址,该表包含了关于本地文件等待传递到shard的队列信息,这些本地文件包含的用异步方式通过写入分布式表创建的新part文件。

该表包含字段如下:

  • database (String) — 数据库名称.

  • table (String) — 表名称.

  • data_path (String) — 本地文件的路径.

  • is_blocked (UInt8) — 标示位,是否传递文件到服务器被锁住.

  • error_count (UInt64) — 错误数.

  • data_files (UInt64) — 一个目录下文件个数.

  • data_compressed_bytes (UInt64) — 单位字节,被压缩的数据大小.

  • broken_data_files (UInt64) — 由于错误而标示损坏的文件数.

  • broken_data_compressed_bytes (UInt64) — 单位字节,所损文件的压缩字节大小.

  • last_exception (String) — 上次发生错误的内容.

  • 去最高的那个节点查找正在写入的队列
    1
    select * from system.distribution_queue where data_files > 0
    clickhouse

    通过data_files可以看到当前在等待写入的文件数,通过data_compressed_bytes可以看到当前等待写入的总文件大小,通过error_count可以看到错误次数,通过last_exception可以看到上次发生错误的内容

若是clickhosue19版本,则使用以下命令查询

  • 查出分布式表存储的路径配置
    1
    2
    3
    cd config.d
    cat storage.xml
    <path>/data1/default/clickhouse-data</path>

    如果不是这样的配置,直接查询 storage_configuration的配置

  • 进入分布式表的存储目录,计算出每个库表的总文件数

    1
    2
    3
    4
    5
    6
    7
    cd /data1/default/clickhouse-data/data
    # find -type f 表示查找出所有类型是文件的格式
    # awk -F ',' '{print $1}' 表示按照,切割后取第一个
    # uniq -c 表示聚合后去重
    # sort -h -r 表示逆序排序
    # awk -F '/default' '{print $1}' 表示按/default切割后取第一条
    find -type f | awk -F ',' '{print $1}' | uniq -c | sort -h -r | awk -F '/default' '{print $1}'

    clickhouse

    文件数目最多的就是写入积压的

  • 根据flush命令获取到写入错误的原因

    1
    SYSTEM FLUSH DISTRIBUTED db.tb

    clickhouse

    根据错误能直接解决就直接去解决,无法解决的话看下面

    解决分布式表写入积压

  • 删除分布式表

    则意味着这个分布式表的所有写入任务都会自动清理,谨慎操作,别把底表删了

    1
    drop table if exists db.tb
  • 清空分布式表数据

    这个方式只是清除掉积压的数据,不用删表,用户通过分布式表来查询时不会报表不存在,但是如果数据量非常大就不要用truncate方式了,因为会导致整个表被锁非常久,应该用alter table xx drop partititon xx方式,一个一个分区删。

    1
    truncate table db.tb

问题背景

最近经常遇到Read timed out报错,具体内容看下面,从字面意思判断我们知道是读超时,那为什么会发生读超时呢?

1
ru.yandex.clickhouse.except.ClickHouseException: ClickHouse exception, code: 159, host: bigdata-clickhouse-xxxx.ys, port: 8023; Read timed out	at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.getException(ClickHouseExceptionSpecifier.java:86)	at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:56)	at ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier.specify(ClickHouseExceptionSpecifier.java:25)	at ru.yandex.clickhouse.ClickHouseStatementImpl.getInputStream(ClickHouseStatementImpl.java:797)	at ru.yandex.clickhouse.ClickHouseStatementImpl.getLastInputStream(ClickHouseStatementImpl.java:691)	at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:340)	at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:324)	at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:319)	at ru.yandex.clickhouse.ClickHouseStatementImpl.executeQuery(ClickHouseStatementImpl.java:314)	at

源码解读

  • 从抛出的堆栈入手,先读ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
     /**
    * Here we expect the ClickHouse error message to be of the following format:
    * "Code: 10, e.displayText() = DB::Exception: ...".
    */
    private static ClickHouseException specify(String clickHouseMessage, Throwable cause, String host, int port) {
    if (Utils.isNullOrEmptyString(clickHouseMessage) && cause != null) {
    return getException(cause, host, port);
    }

    try {
    int code;
    if (clickHouseMessage.startsWith("Poco::Exception. Code: 1000, ")) {
    code = 1000;
    } else {
    // Code: 175, e.displayText() = DB::Exception:
    code = getErrorCode(clickHouseMessage);
    }
    // ошибку в изначальном виде все-таки укажем
    Throwable messageHolder = cause != null ? cause : new Throwable(clickHouseMessage);
    if (code == -1) {
    return getException(messageHolder, host, port);
    }

    return new ClickHouseException(code, messageHolder, host, port);
    } catch (Exception e) {
    log.error("Unsupported ClickHouse error format, please fix ClickHouseExceptionSpecifier, message: {}, error: {}", clickHouseMessage, e.getMessage());
    return new ClickHouseUnknownException(clickHouseMessage, cause, host, port);
    }
    }
    private static ClickHouseException getException(Throwable cause, String host, int port) {
    if (cause instanceof SocketTimeoutException)
    // if we've got SocketTimeoutException, we'll say that the query is not good. This is not the same as SOCKET_TIMEOUT of clickhouse
    // but it actually could be a failing ClickHouse
    {
    return new ClickHouseException(ClickHouseErrorCode.TIMEOUT_EXCEEDED.code, cause, host, port);
    } else if (cause instanceof ConnectTimeoutException || cause instanceof ConnectException)
    // couldn't connect to ClickHouse during connectTimeout
    {
    return new ClickHouseException(ClickHouseErrorCode.NETWORK_ERROR.code, cause, host, port);
    } else {
    return new ClickHouseUnknownException(cause, host, port);
    }
    }
    注意到上文中异常类型是SocketTimeoutException,为啥会是这个异常呢?
  • 继续阅读ru.yandex.clickhouse.ClickHouseStatementImpl
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    private InputStream getInputStream(
    ClickHouseSqlStatement parsedStmt,
    Map<ClickHouseQueryParam, String> additionalClickHouseDBParams,
    List<ClickHouseExternalData> externalData,
    Map<String, String> additionalRequestParams
    ) throws ClickHouseException {
    .............................
    HttpEntity entity = null;
    try {
    uri = followRedirects(uri);
    HttpPost post = new HttpPost(uri);
    post.setEntity(requestEntity);

    if (parsedStmt.isIdemponent()) {
    httpContext.setAttribute("is_idempotent", Boolean.TRUE);
    } else {
    httpContext.removeAttribute("is_idempotent");
    }

    HttpResponse response = client.execute(post, httpContext);
    entity = response.getEntity();
    checkForErrorAndThrow(entity, response);

    InputStream is;
    if (entity.isStreaming()) {
    is = entity.getContent();
    } else {
    FastByteArrayOutputStream baos = new FastByteArrayOutputStream();
    entity.writeTo(baos);
    is = baos.convertToInputStream();
    }

    // retrieve response summary
    if (isQueryParamSet(ClickHouseQueryParam.SEND_PROGRESS_IN_HTTP_HEADERS, additionalClickHouseDBParams, additionalRequestParams)) {
    Header summaryHeader = response.getFirstHeader("X-ClickHouse-Summary");
    currentSummary = summaryHeader != null ? Jackson.getObjectMapper().readValue(summaryHeader.getValue(), ClickHouseResponseSummary.class) : null;
    }

    return is;
    } catch (ClickHouseException e) {
    throw e;
    } catch (Exception e) {
    log.info("Error during connection to {}, reporting failure to data source, message: {}", properties, e.getMessage());
    EntityUtils.consumeQuietly(entity);
    log.info("Error sql: {}", sql);
    throw ClickHouseExceptionSpecifier.specify(e, properties.getHost(), properties.getPort());
    }
    }
    注意到这是建立一个http连接,使用post方式,等到数据传输完成,返回执行结果,如果有任何ClickHouseException则直接抛掉,否则捕获未知异常Exception,格式化成固定文本的错误信息,这个错误信息就是在ClickHouseExceptionSpecifier.specify方法里被处理成Code: 10, e.displayText() = DB::Exception: ...这种结构。
    所以回到上面的结论,就是说这个http连接出现异常,未等到结果,抛出了SocketTimeoutException,那么根据这个异常,我们可以定位到根本原因了
  • 接着看java.net.SocketTimeoutException
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /**
    * Signals that a timeout has occurred on a socket read or accept.
    *
    * @since 1.4
    */

    public class SocketTimeoutException extends java.io.InterruptedIOException {
    private static final long serialVersionUID = -8846654841826352300L;

    /**
    * Constructs a new SocketTimeoutException with a detail
    * message.
    * @param msg the detail message
    */
    public SocketTimeoutException(String msg) {
    super(msg);
    }

    /**
    * Construct a new SocketTimeoutException with no detailed message.
    */
    public SocketTimeoutException() {}
    }
    这个类非常简短,但是很清晰告诉我们,会抛出SocketTimeoutException异常的原因只能是一个socket读取或者接收时发生了超时
  • 题外话什么是socket
    socket的含义就是两个应用程序通过一个双向的通信连接实现数据的交换,连接的一段就是一个socket,又称为套接字。实现一个socket连接通信至少需要两个套接字,一个运行在服务端(插孔),一个运行在客户端(插头)。套接字用于描述IP地址和端口,是一个通信链的句柄。应用程序通过套接字向网络发出请求或应答网络请求。注意的是套接字既不是程序也不是协议,只是操作系统提供给通信层的一组抽象API接口。
    socket是应用层与TCP/IP协议簇通信的中间抽象层,是一组接口。在设计模式中其实就是门面模式。Socket将复杂的TCP/IP协议簇隐藏在接口后面,对于用户而言,一组接口即可让Socket去组织数据,以符合指定的协议。
    1

回归正题,什么是Read Timed Out

所谓read timed out就是读超时,http连接已创建,客户端等待服务端返回结果,等待时间超过了超时时间,所以客户端连接断开,抛出SocketTimeoutException异常。我们可以模拟出这个过程,代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class ReadTimedOutTest {

public static void main(String[] args) {
try {
ServerSocket serverSocket = new ServerSocket(8888, 200);
Thread.sleep(66666666);
} catch (Exception e) {
e.printStackTrace();
}
}

@Test
public void tetReadTimedOut() {
Socket socket = new Socket();
long starTime = 0;
try {
socket.connect(new InetSocketAddress("127.0.0.1", 8888), 10000);
System.out.println("socket连接成功。。。。");
socket.setSoTimeout(2000);
starTime = System.currentTimeMillis();
int read = socket.getInputStream().read();
} catch (Exception e) {
e.printStackTrace();
} finally {
long endTime = System.currentTimeMillis();
System.out.println("执行时间:"+(endTime - starTime));
}
}
}

报错信息如下截图:
2
意思是说已连接服务端的8888端口,握手是正常的,然后开始传输数据,因为限制了服务端Thread.sleep,让服务端无法给客户端传输数据

解决方法

clickhouse-jdbc默认给SOCKET_TIMEOUT设置的时间是30s,由于服务端不知道何时能返回结果(此时间受settings.max_execution_time影响),所以我们最好给jdbc设置一个socket_timeout=max_execution_time+10s,防止服务端还在处理,而客户端已经超时断开连接。
3
下面这个方法,主要是解析jdbc:clickhouse://127.0.0.1:8023?socket_timeout=500000&connection_timeout=500000

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static Properties parseUriQueryPart(String query, Properties defaults) {
if (query == null) {
return defaults;
}
Properties urlProps = new Properties(defaults);
String queryKeyValues[] = query.split("&");
for (String keyValue : queryKeyValues) {
String keyValueTokens[] = keyValue.split("=");
if (keyValueTokens.length == 2) {
urlProps.put(keyValueTokens[0], keyValueTokens[1]);
} else {
logger.warn("don't know how to handle parameter pair: {}", keyValue);
}
}
return urlProps;
}

4
5

clickhouse-jdbc执行过程

实例化ClickHouseDataSource对象

1
ClickHouseDataSource dataSource = new ClickHouseDataSource(jdbc, properties);

该构造函数主要是将jdbc:clickhouse//ip:port/database?key1=value1&key2=value2这个url进行解析,若涉及clickhouse默认参数,则会覆盖clickhouse默认配置,返回ClickHousePropertiesd对象
6

获取ClickHouseConnectionImpl连接

1
ClickHouseConnectionImpl connection = (ClickHouseConnectionImpl) dataSource.getConnection();
  • 1、实例化ClickHouseConnectionImpl对象

    1
    ClickHouseConnectionImpl connection = new ClickHouseConnectionImpl(url, properties);

    7
    8
    这里主要是干2件事,1、首先要实例化出ClickHouseHttpClientBuilder客户端对象构造器,这个构造器会把我们在jdbc:clickhouse//ip:port/database?key1=value1&key2=value2这个url里的key1=value1&key2=value2传入给CloseableHttpClient类型的httpclient,2、再去初始化连接,获取clickhouse服务端的时区和版本,所以我们每次连接clickhouse-jdbc,都会看到发出一条sqlselect timezone(), version()

  • 2、把连接存到Collections.synchronizedMap线程安全的map

    1
    registerConnection(connection);
  • 3、返回连接对象

执行sql过程

1
connection.createStatement().executeQuery(sql);
  • 先初始化ClickHouseStatementImpl对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public ClickHouseStatement createStatement(int resultSetType) throws SQLException {
    return LogProxy.wrap(
    ClickHouseStatement.class,
    new ClickHouseStatementImpl(
    httpclient,
    this,
    properties,
    resultSetType));
    }
  • 再执行sql
    1、方法getLastInputStream,支持multi-query用法,多条sql去查
    9
    2、调用的getInputStream方法其实就是构造http的post请求,去执行和返回结果,出错则抛出异常
    10
    3、获取最后的执行结果
    11
    4、返回最后sql的结果集ResultSet,select查询类按配置取到最后结果,其他为null

总结

  • Read timed out表示已经连接成功(即三次握手已经完成),但是服务器没有及时返回数据(没有在设定的时间内返回数据),导致读超时。
  • javalinux中的 Read timed out 并不是通过C函数setSockOpt(SO_RCVTIMEO)来设置的,而是通过select(s, timeout)来实现定时器,并抛出JNI异常来控制的
  • java socket读超时的设置是在read()方法被调用的时候传入的,所以只要在read()调用之前设置即可

背景

SpringBootschedule模块可以支持定时脚本,原理其实就是SchedulingTaskExecutor类,它实现了java.util.concurrent.Executor接口,这个接口主要是定义了线程的执行,例如我们日常常用的线程池执行器ThreadPoolExecutor类就是实现了Executor接口。此文重点不是介绍SpringBootschedule模块,所以具体实现逻辑及源码部分解析,在此略过。但问题是schedule模块不支持分布式部署,而我们当前的业务需要部署在多个节点上,为了实现多个节点上在某个时刻只执行某个定时脚本,其他节点不重复执行,我们调研了MYSQL的锁,用以实现分布式锁场景。

mysql锁

乐观锁

  • 什么是乐观锁

    用数据版本(Version)记录机制实现,这是乐观锁最常用的一种实现方式。何谓数据版本?即为数据增加一个版本标识,一般是通过为数据库表增加一个数字类型的 “version” 字段来实现。当读取数据时,将version字段的值一同读出,数据每更新一次,对此version值加1。当我们提交更新的时候,判断数据库表对应记录的当前版本信息与第一次取出来的version值进行比对,如果数据库表当前版本号与第一次取出来的version值相等,则予以更新,否则认为是过期数据。

  • 实现过程
    假设表结构如下
    1
    2
    3
    4
    5
    6
    7
    CREATE TABLE `lock` (
    `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
    `name` varchar(255) NOT NULL DEFAULT '0' COMMENT '锁名称',
    `status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '锁状态:0-空闲,1-运行',
    `version` bigint(20) NOT NULL DEFAULT '0' COMMENT '版本',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
    查询方式:
    1
    select id,name,version from lock where id = #{id}
    加锁更新方式:
    1
    update lock set status = 1,version=version+1 where id = #{id} and version = #{version}
    释放锁更新方式:
    1
    update lock set status = 0 where id = #{id} and status = 1

    悲观锁

  • 什么是悲观锁

    与乐观锁相对应的就是悲观锁了。悲观锁就是在操作数据时,认为此操作会出现数据冲突,所以在进行每次操作时都要通过获取锁才能进行对相同数据的操作,这点跟java中的synchronized很相似,所以悲观锁需要耗费较多的时间。另外与乐观锁相对应的,悲观锁是由数据库自己实现了的,要用的时候,我们直接调用数据库的相关语句就可以了。
    说到这里,由悲观锁涉及到的另外两个锁概念就出来了,它们就是共享锁与排它锁。共享锁和排它锁是悲观锁的不同的实现,它俩都属于悲观锁的范畴。

  • 什么是共享锁

    共享锁又称读锁 (read lock),是读取操作创建的锁。其他用户可以并发读取数据,但任何事务都不能对数据进行修改(获取数据上的排他锁),直到已释放所有共享锁。当如果事务对读锁进行修改操作,很可能会造成死锁。

在查询语句后面增加 LOCK IN SHARE MODEMysql会对查询结果中的每行都加共享锁,当没有其他线程对查询结果集中的任何一行使用排他锁时,可以成功申请共享锁,否则会被阻塞。 其他线程也可以读取使用了共享锁的表,而且这些线程读取的是同一个版本的数据。
加上共享锁后,对于updateinsertdelete语句会自动加排它锁。

举例说明

1
2
3
4
5
6
# 在A窗口输入
select * from lock where id = 1 lock in shard mode
# 在B窗口输入
update lock set version = version + 1 where id = 1
# B窗口报错
[Err] 1205 - Lock wait timeout exceeded; try restarting transaction

  • 什么是排它锁

    排他锁 exclusive lock(也叫writer lock)又称写锁。
    若某个事物对某一行加上了排他锁,只能这个事务对其进行读写,在此事务结束之前,其他事务不能对其进行加任何锁,其他进程可以读取,不能进行写操作,需等待其释放。排它锁是悲观锁的一种实现,在上面悲观锁也介绍过。
    若事务 1 对数据对象A加上X锁,事务 1 可以读A也可以修改A,其他事务不能再对A加任何锁,直到事物 1 释放A上的锁。这保证了其他事务在事物 1 释放A上的锁之前不能再读取和修改A。排它锁会阻塞所有的排它锁和共享锁

举例说明

1
2
3
4
5
6
7
8
9
10
# 要使用排他锁,我们必须关闭mysql数据库的自动提交属性,因为MySQL默认使用autocommit模式,也就是说,当你执行一个更新操作后,MySQL会立刻将结果进行提交
# 在A窗口输入
set autocommit = 0;
begin;
select * from lock where id = 1 for update;
update lock set version = version + 1 where id = 1;
commit;

# 在B窗口输入,会看到一直在等待中,直到A窗口释放锁,B窗口才能获取结果
select * from lock where id = 1 for update;

行锁

InnoDB的行锁是针对索引加的锁,不是针对记录加的锁。并且该索引不能失效,否则都会从行锁升级为表锁。
行锁的劣势:开销大;加锁慢;会出现死锁
行锁的优势:锁的粒度小,发生锁冲突的概率低;处理并发的能力强
加锁的方式:自动加锁。对于UPDATEDELETEINSERT语句,InnoDB会自动给涉及数据集加排他锁;对于普通SELECT语句,InnoDB不会加任何锁;当然我们也可以显示的加锁

间隙锁

当我们用范围条件而不是相等条件检索数据,并请求共享或排他锁时,InnoDB会给符合条件的已有数据记录的索引项加锁;对于键值在条件范围内但并不存在的记录,叫做“间隙(GAP)”,InnoDB也会对这个“间隙”加锁,这种锁机制就是所谓的间隙锁(Next-Key锁)
例如

1
2
3
4
# 是一个范围条件的检索,InnoDB不仅会对符合条件的empid值为101的记录加锁,也会对empid大于101(这些记录并不存在)的“间隙”加锁
# InnoDB使用间隙锁的目的,一方面是为了防止幻读,以满足相关隔离级别的要求
# 指同一个事务内多次查询返回的结果集不一样。比如同一个事务 A 第一次查询时候有 n 条记录,但是第二次同等条件下查询却有 n+1 条记录,这就好像产生了幻觉。发生幻读的原因也是另外一个事务新增或者删除或者修改了第一个事务结果集里面的数据,同一个记录的数据内容被修改了,所有数据行的记录就变多或者变少了
Select * from emp where empid > 100 for update;

表锁

Innodb引擎中既支持行锁也支持表锁,那么什么时候会锁住整张表,什么时候只锁住一行呢? 只有通过索引条件检索数据,InnoDB才使用行级锁,否则,InnoDB将使用表锁,而检索条件是unique keyprimary key时,一定会是行锁,而检索条件是index时,有可能是行锁 ,也有可能是表锁,取决于当“值重复率”低时,甚至接近主键或者唯一索引的效果,“普通索引”依然是行锁;当“值重复率”高时,MySQL 不会把这个“普通索引”当做索引,即造成了一个没有索引的 SQL,此时引发表锁

死锁

死锁(Deadlock)是指两个或两个以上的进程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程。由于资源占用是互斥的,当某个进程提出申请资源后,使得有关进程在无外力协助下,永远分配不到必需的资源而无法继续运行,这就产生了一种特殊现象死锁。
解除正在死锁的状态有两种方法:
第一种:

1
2
3
4
5
6
# 查询是否锁表
show OPEN TABLES where In_use > 0;
# 查询进程(如果您有SUPER权限,您可以看到所有线程。否则,您只能看到您自己的线程)
show processlist
# 杀死进程id(就是上面命令的id列)
kill id

第二种:

1
2
3
4
5
6
7
8
# 查看当前的事务
SELECT * FROM INFORMATION_SCHEMA.INNODB_TRX;
# 查看当前锁定的事务
SELECT * FROM INFORMATION_SCHEMA.INNODB_LOCKS;
# 查看当前等锁的事务
SELECT * FROM INFORMATION_SCHEMA.INNODB_LOCK_WAITS;
# 杀死进程
kill 进程ID

MYISAM引擎 和 INNODB引擎的区别

MYISAM 读锁

读锁 影响其他进程对该表进行写操作,但不影响其他进程对该表进行读操作

1
2
3
lock myisam_table read;
select * from myisam_table where id = 1;
UNLOCK TABLES;

MYISAM 写锁

写锁 影响其他进程对该表进行读和写操作

1
2
3
lock myisam_table write;
select * from myisam_table where id = 1;
UNLOCK TABLES;

在自动加锁的情况下也基本如此,MyISAM 总是一次获得 SQL 语句所需要的全部锁。这也正是 MyISAM 表不会出现死锁(Deadlock Free)的原因

  • InnoDB支持事务(transaction);MyISAM不支持事务
  • Innodb 默认采用行锁, MyISAM 是默认采用表锁。加锁可以保证事务的一致性,可谓是有人(锁)的地方,就有江湖(事务)
  • MyISAM不适合高并发(MyISAM 在执行查询语句(SELECT)前,会自动给涉及的所有表加读锁,在执行更新操作 (UPDATE、DELETE、INSERT 等)前,会自动给涉及的表加写锁)

    MyISAM存储引擎有一个系统变量concurrent_insert,专门用以控制其并发插入的行为,其值分别可以为0、1或2。

    1
    2
    3
    当concurrent_insert设置为0时,不允许并发插入。
    当concurrent_insert设置为1时,如果MyISAM表中没有空洞(即表的中间没有被删除的 行),MyISAM允许在一个进程读表的同时,另一个进程从表尾插入记录。这也是MySQL 的默认设置。
    当concurrent_insert设置为2时,无论MyISAM表中有没有空洞,都允许在表尾并发插入记录

解决方法

我们采用乐观锁 来处理这次的定时任务多节点执行时分布式锁方案

  • 表结构设计

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    CREATE TABLE `job_lock` (
    `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
    `name` varchar(255) NOT NULL DEFAULT '0' COMMENT 'job名称',
    `timeout` bigint(20) NOT NULL DEFAULT '0' COMMENT '任务执行超时间隔,毫秒',
    `status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'job状态:0-空闲,1-运行',
    `description` varchar(255) NOT NULL DEFAULT '' COMMENT 'job描述',
    `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    `gmt_update` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
    `version` bigint(20) NOT NULL DEFAULT '0' COMMENT '版本',
    PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
  • 加锁方法

    1
    2
    3
    4
    5
    6
    7
    <update id="requireLock" parameterType="java.util.Map">
    <![CDATA[
    update job_lock
    set status = 1, version=version + 1
    where id = #{id} and version =#{version} and status = 0
    ]]>
    </update>
  • 解锁方法

    1
    2
    3
    4
    5
    6
    7
    <update id="releaseLock" parameterType="java.util.Map">
    <![CDATA[
    update job_lock
    set status = 0
    where id = #{id} and status = 1
    ]]>
    </update>
  • 尝试加锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    public JobLockDO tryLock(String name) {
    if (ValidateUtils.isNull(name)){
    return null;
    }
    JobLockDO jobLocksDO = getJob(name);
    if (ValidateUtils.isNull(jobLocksDO)) {
    return null;
    }
    // 任务一直在运行中,可能是服务重启等异常情况,造成锁状态一直未更新
    if (jobLocksDO.getStatus().equals(Constant.JOB_LOCK_RUNNING)) {
    // 先判断运行是否超时,未超时,则不处理
    if (System.currentTimeMillis() - jobLocksDO.getGmtUpdate().getTime() <= jobLocksDO.getTimeout()) {
    return null;
    } else {
    // 已超时,更新任务锁状态(释放锁)
    releaseLock(jobLocksDO.getId());
    // 重新加锁
    requireLock(jobLocksDO.getId(), jobLocksDO.getVersion());
    // 返回任务锁
    return jobLocksDO;
    }
    }
    try {
    // 加锁
    if (!requireLock(jobLocksDO.getId(), jobLocksDO.getVersion())) {
    return null;
    }
    return jobLocksDO;
    } catch (Exception e) {
    LOGGER.error("require lock by name:{} fail.", name, e);
    }
    return null;
    }

背景介绍

大数据架构olap团队对clickhouse的使用是从2019年5月开始,最开始我们只在国内永顺机房落地了4台机器做共享集群,就是最开始的cluster01集群,4台机器做2个shard,每个shard2个replica,而用户对集群的访问只是通过一个vip做简单的负载均衡,关于权限这一块,我们在当时只做到了库级别的管控,简单说就是通过同步dpp_v3.hadoop_accountdpp_v3.hive_database库去更新user.xml文件,如下图所示,我们并未对租户资源做任何隔离,虽然最开始我们就是使用的多租户模型,但每个租户可以有权限去操作自己名下的所有库,包括读和写,且无限制的进行大量耗资源(cpumemory)写入和查询后就会影响到cluster01集群下其他的租户,因为cpumemory是固定的,而租户的分摊并未做限制。
avatar

clickhouse配置详解

config.xml文件介绍

clickhouse的启动方式是通过以下命令处理的,就让 ClickHouse 按照config.xml配置文件运行,同时 ClickHouse 监听配置文件,如有变化,不需要重启就能按新的配置运行。具体介绍可以参考链接

1
./bin/clickhouse-server --config=./conf/config.xml

我们在生产环境下config.xml文件主要是如下内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<?xml version="1.0"?> 
<yandex>
<!-- 日志配置 -->
<logger>
<level>trace</level>
<log>/tmp/log/clickhouse-server.log</log>
<errorlog>/tmp/log/clickhouse-server.err.log</errorlog>
<size>1000M</size>
<count>10</count>
</logger>
<!-- 开启查询和写入相关日志配置 -->
<query_log>
<database>system</database>
<table>query_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>1000</flush_interval_milliseconds>
</query_log>
<!-- tcp端口 -->
<tcp_port>9000</tcp_port>
<!-- 运行所有ip访问 -->
<listen_host>0.0.0.0</listen_host>
<!-- 最大连接数 -->
<max_connections>4096</max_connections>
<!-- 连接超时时间 -->
<keep_alive_timeout>90</keep_alive_timeout>
<!-- 最大并发查询数 -->
<max_concurrent_queries>1000</max_concurrent_queries>
<!-- 用户配置文件 -->
<users_config>users.xml</users_config>
<!-- 默认配置名称 -->
<default_profile>default</default_profile>
<!-- 默认数据库名称 -->
<default_database>default</default_database>
<!-- zookeeper配置 -->
<zookeeper incl="zookeeper-servers" optional="true" />
<!-- 宏配置 -->
<macros incl="macros" optional="true" />
<!-- 权限相关的sql存储路径 -->
<access_control_path>/var/lib/clickhouse/access</access_control_path>
</yandex>

我们应该注意到 <users_config>users.xml</users_config> 绑定了 config.xml 当前目录的 users.xml,而 users.xml就是我们最初版本的用户配置文件

users.xml文件介绍

ClickHouse支持基于 RBAC(基于角色的访问控制权限)方法的访问控制管理。作为一个分析类型(OLAP)的数据库系统,相对于MySQL数据库在用户管理方面有很大不同,clickhouse支持使用两种方式配置访问实体:

  • 通过sql直接设置,这也是官方推荐的,但是需要至少一个用户帐户启用SQL驱动的访问控制和帐户管理,这需要使用第二种方式设置access_management
  • 通过配置文件users.xml,默认位置在/etc/clickhouse-server目录下,ClickHouse使用它来定义用户相关的配置项。

    注意,您不能同时通过两种配置方法来管理同一访问实体。(You can’t manage the same access entity by both configuration methods simultaneously.)
    users.xml有三大块进行说明,分别为:profilesquotasusers,主要配置如下所示:

  • profiles介绍
    官方链接
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    <!-- profiles相当于role角色配置 -->
    <profiles>
    <!-- 角色名称,可配置多个 -->
    <default>
    <!-- 单个服务器 最大可使用内存 -->
    <max_memory_usage>10G</max_memory_usage>
    <!-- 单个服务器 用户查询最大可使用内存 -->
    <max_memory_usage_for_user>10G</max_memory_usage_for_user>
    <!-- 所有查询可使用的最大内存 -->
    <max_memory_usage_for_all_queries>10G</max_memory_usage_for_all_queries>
    <!-- 最大查询长度 -->
    <max_query_size>1073741824</max_query_size>
    <!-- DDL查询:CREATE,ALTER,RENAME,ATTACH,DETACH,DROP TRUNCATE,0:禁止,1:允许 -->
    <allow_ddl>0</allow_ddl>
    </default>
    <readonly>
    <!-- 只读角色,枚举0:允许所有查询,1:只允许读数据的查询,3:运行读取数据和更改配置 -->
    <readonly>2</readonly>
    </readonly>
    </profiles>
    • quotas介绍
      官方链接,截取部分代码实现细节如下,从源码中我们能看出,每次查询的时候,都去检查(checkExceeded())是否超过配额
      avatar,每个 interval 都有多种资源(resource_type), 比如 `<query>1</query> 是一种 type, 检查最大库存 max,检查已经使用的配额 used, 如果 used > max, 则报错。
      avatar
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      <!-- 配额,限制用户一段时间内的资源使用,即对一段时间内运行的一组查询施加限制,而不是限制单个查询。 -->
      <quotas>
      <!-- 配额名称 -->
      <default>
      <!-- 时间间隔 -->
      <interval>
      <!-- 时间周期 以秒为单位 -->
      <duration>10</duration>
      <!-- 10 秒内只能查询一次 -->
      <queries>2</queries>
      <errors>0</errors>
      <!-- 时间周期内允许返回的行数,0表示不限制 -->
      <result_rows>0</result_rows>
      <!-- 时间周期内运行读取的行数,0表示不限制 -->
      <read_rows>0</read_rows>
      <!-- 时间周期内查询的可执行时间,0表示不限制 -->
      <execution_time>0</execution_time>
      </interval>
      </default>
      </quotas>
  • users介绍
    官方链接
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    <!-- 用户配置,定义一个新用户,必须包含以下几项属性:用户名、密码、访问ip、数据库、表等等。它还可以应用上面的profile、quota -->
    <users>
    <!-- 用户名 -->
    <default>
    <!-- 此设置为用户启用或禁用SQL驱动的访问控制和帐户管理。可能的值:0-禁用。1-启用。默认为0 -->
    <access_management>1</access_management>
    <!-- 密码 -->
    <password>password</password>
    <!-- 用户可以从中连接到ClickHouse服务器的网络列表 -->
    <networks>
    <!-- 要为来自任何网络的用户打开访问权限 -->
    <ip>::/0</ip>
    </networks>
    <!-- 指定用户的角色 -->
    <profile>default</profile>
    <!-- 指定用户的配额 -->
    <quota>default</quota>
    </default>
    </users>

    常见的权限报错

  • 超过quota限制
    1
    Code: 201. DB::Exception: Received from localhost:9000. DB::Exception: Quota for user `default` for 10s has been exceeded: queries = 4/3. Interval will end at 2020-04-02 11:29:40. Name of quota template: `default`.
    如果在至少一个时间间隔内超过了限制,则将引发一个异常,并显示一条文本:是对于哪一个间隔的,何时新间隔可以开始(何时可以再次发送查询)。
  • 超过profile限制
    1
    2
    Code: 452, e.displayText() = DB::Exception: Setting max_memory_usage should not be greater than 20000000000.
    Code: 452, e.displayText() = DB::Exception: Setting max_memory_usage should not be less than 5000000000.
    查询超过了最大使用内存
  • user限制
    1
    Code: 516, e.displayText() = DB::Exception: prod_dlap_manager: Authentication failed: password is incorrect or there is no user with such name (version 206.1.1)
    用户不存在,或者用户的密码错误

    改进方案

    经过对clickhouse的权限相关了解之后,我们在2021年8月进行了一次权限升级方案改造,通过default用户创建出超级管理员super_admin及普通管理员admin(如果在user.xml里定义了super_admin用户,之后就无法修改,若修改则直接报错Cannot update user admin in users.xml because this storage is readonly),以超级管理员的身份给超级租户和普通租户赋权,可以通过SQL的方式进行权限的CRUD,以达到动态分配集群资源的目的。
    avatar
    经过拆分后,我们单独自研了DlapManager组件,用以管控Clickhouse集群读写节点、zookeeper组件、CHProxy组件(采用开源工具快速接入以实现读写ck角色的区分,达到数据写入均衡查询均衡的 目的,链接),而CHProxy组件需要的集群读写资源则通过DlapManager角色进行实时更新和生产,而外部平台业务方直接调用DlapManager组件对外开放的api进行库表的所有DDL操作,目前我们的所有普通租户均是用户通过数据梦工厂的创建项目开通实时权限后,我们会自动创建一个租户在Clickhouse这边,当该租户进行库表DDL后,DlapManager组件就会生成相应的DDL任务队列,队列以异步线程方式实时发往对应的集群,以下的DlapManager组件的系统架构设计图:
    avatar

    赋权过程

    按照官方推荐的方式进行赋权过程,以达到不同租户使用相应的配额,实现集群内的资源隔离,保障集群的稳定性。
  • 创建超级租户
    1
    2
    3
    CREATE USER super_admin; 
    GRANT ALL ON *.* TO super_admin WITH GRANT OPTION;
    CREATE USER admin;
  • 创建profiles设置
    1
    CREATE SETTINGS PROFILE IF NOT EXISTS didi_profile SETTINGS readonly = 2 READONLY
  • 创建quota配额
    1
    2
    3
    CREATE QUOTA IF NOT EXISTS didi_quota 
    FOR INTERVAL 10 second
    MAX queries 1
  • 创建角色
    1
    CREATE ROLE IF NOT EXISTS didi_role
  • 赋予 Role 权限
    1
    2
    # 允许didi_role这个角色可以访问库名叫db的所有表的查询权限
    GRANT SELECT ON db.* TO didi_role
  • 创建一个回收角色,用以回收不使用的profile、quota
    1
    REATE ROLE IF NOT EXISTS gc_role
  • Role 绑定 Profile, Quota
    1
    2
    ALTER SETTINGS PROFILE didi_profile TO didi_role;
    ALTER QUOTA didi_quota TO didi_role;
  • 应用到租户
    1
    GRANT didi_role TO super_admin
  • 验证租户角色
    1
    SELECT * FROM system.role_grants WHERE user_name LIKE 'admin'
  • 修改用户的quota(当用户的读/写超过了限额后需要给用户扩容)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # 先创建新quota
    CREATE QUOTA IF NOT EXISTS new_quota FOR INTERVAL 5 second MAX queries 1;
    # 将原先的didi_quota绑定到gc_role
    ALTER QUOTA didi_quota TO gc_role;
    # 绑定新quota
    ALTER QUOTA new_quota TO didi_role;
    # 刷新admin的角色
    revoke didi_role from admin;
    grant didi_role to admin;
    # double check检查
    SELECT name, apply_to_list FROM system.quotas WHERE name LIKE 'new_quota'
  • 修改用户的profile(当用户的读/写超过了限额后需要给用户扩容)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # 先创建新profile
    CREATE SETTINGS PROFILE IF NOT EXISTS new_profile SETTINGS readonly = 0 READONLY;
    # 将原先的didi_profile绑定到gc_role
    ALTER SETTINGS PROFILE didi_profile TO gc_role
    # 绑定新profile
    ALTER SETTINGS PROFILE new_profile TO didi_role;
    # 刷新admin的角色
    revoke didi_role from admin;
    grant didi_role to admin;
    # double check检查
    SELECT name, apply_to_list FROM system.settings_profiles WHERE name LIKE 'new_quota'

    权限持久化

  • 存放目录配置
    根据官方文档介绍,通过sql进行的权限相关配置,可以通过config.xml里的access_control_path属性进行路径配置,若未配置则默认是/var/lib/clickhouse/access/目录,如下图所示,当clickhouse重启后,都会先从access_control_path配置的目录里根据sql恢复所有权限,如果这些文件被删除,则重启clickhouse后之前通过sql创建的这些setting都会消失,所以注意文件的备份,目前DlapManager组件是使用mysql数据存储引擎单独存储了这份RBAC数据,再发送给clickhouse进行权限相关的CRUD,这个方式也相当于是clickhouse的备份地址,如果clickhouse重启后无法刷新这些权限,则仍然可以通过DlapManager组件写的脚本工具去重新生产出新的sqlclickhouse使用。
    avatar
  • users.list介绍
    每创建一个用户,则会在users.list文件里记录到用户名对应的一串uuid,可通过uuid找到创建该用户的sql语句,如下图:
    avatar
  • uuid.sql介绍
    包含了创建用户(create user)的sql,赋权(GRANT)的sql等
    avatar
  • quotas.list介绍
    每创建一个quota,都会在quotas.list文件记录quota名称及对应的一个uuid,可通过uuid找到创建该quotasql语句
  • roles.list介绍
    每创建一个role,都会在roles.list文件记录quota名称及对应的一个uuid,可通过uuid找到创建该rolesql语句
  • row_policies.list介绍
    每创建一个row_policy,都会在row_policies.list文件记录quota名称及对应的一个uuid,可通过uuid找到创建该row_policysql语句
  • settings_profiles.list介绍
    每创建一个profile,都会在settings_profiles.list文件记录quota名称及对应的一个uuid,可通过uuid找到创建该profilesql语句

    结论

    本文从clickhouse的用户资源角度来简单介绍了clickhouse的相关配置及如何使用,而我们自研的DlapManager组件则承担起了 ClickHouse 用户管理的角色,通过对clickhouseproflequotarole等抽象层的配置来达到对clickhouse使用资源的租户隔离目的,中心思想是我们不是直接给租户赋予相关权限,而是在租户之上创建了角色的维度,和RBAC思想一致,可以达到对租户进行灵活的扩缩容配额,最终来保障我们目前200+clickhouse节点的稳定性。最后笔者从2019年6月开始接触clickhouse,到现在也已经2年+的时间,但仍然只学到了clickhouse的冰山一角,所以以上文字也只能做到抛砖引玉,但仍然希望对阅读的各位有所帮助。

  • jvm参数设置说明
    avatar

  • 并行收集器相关参数
    avatar

  • JVM CMS相关参数
    avatar

  • JVM辅助信息参数设置
    avatar

  • java进程启动时,未指定最大堆大小和默认初始值时,系统如何分配

    1
    2
    \\ 直接启动,jvm的那些参数是如何分配的
    java xx.jar

    首先可以通过jinfo -flags pid查看jvm参数,可以发现

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    -XX:CICompilerCount=15 
    -XX:InitialHeapSize=2147483648
    -XX:MaxHeapSize=32210157568
    -XX:MaxNewSize=10736369664
    -XX:MinHeapDeltaBytes=524288
    -XX:NewSize=715653120
    -XX:OldSize=1431830528
    -XX:+UseCompressedClassPointers
    -XX:+UseCompressedOops
    -XX:+UseFastUnorderedTimeStamps
    -XX:+UseParallelGC

    知道答案是-XX:InitialHeapSize=2147483648-XX:MaxHeapSize=32210157568。另外通过 jvm默认配置发现这一段Server JVM Default Initial and Maximum Heap Sizes The default initial and maximum heap sizes work similarly on the server JVM as it does on the client JVM, except that the default values can go higher. On 32-bit JVMs, the default maximum heap size can be up to 1 GB if there is 4 GB or more of physical memory. On 64-bit JVMs, the default maximum heap size can be up to 32 GB if there is 128 GB or more of physical memory. You can always set a higher or lower initial and maximum heap by specifying those values directly; see the next section.中文意思就是32位系统默认最大值可以到1GB,如果物理内存大于或者等于4GB,而在64位系统默认最大堆内存可以达到32GB或者更多,如果物理内存大袋128GB或者更多时。
    但是要注意过多的

  • JVM GC垃圾回收器参数设置
    JVM给出了3种选择:串行收集器、并行收集器、并发收集器。串行收集器只适用于小数据量的情况,所以生产环境的选择主要是并行收集器和并发收集器。
    默认情况下JDK5.0以前都是使用串行收集器,如果想使用其他收集器需要在启动时加入相应参数。JDK5.0以后,JVM会根据当前系统配置进行智能判断。
    串行收集器
    -XX:+UseSerialGC:设置串行收集器。
    并行收集器(吞吐量优先)
    -XX:+UseParallelGC:设置为并行收集器。此配置仅对年轻代有效。即年轻代使用并行收集,而年老代仍使用串行收集。
    -XX:ParallelGCThreads=20:配置并行收集器的线程数,即:同时有多少个线程一起进行垃圾回收。此值建议配置与CPU数目相等。
    -XX:+UseParallelOldGC:配置年老代垃圾收集方式为并行收集。JDK6.0开始支持对年老代并行收集。
    -XX:MaxGCPauseMillis=100:设置每次年轻代垃圾回收的最长时间(单位毫秒)。如果无法满足此时间,JVM会自动调整年轻代大小,以满足此时间。
    -XX:+UseAdaptiveSizePolicy:设置此选项后,并行收集器会自动调整年轻代Eden区大小和Survivor区大小的比例,以达成目标系统规定的最低响应时间或者收集频率等指标。此参数建议在使用并行收集器时,一直打开。
    并发收集器(响应时间优先)
    -XX:+UseConcMarkSweepGC:即CMS收集,设置年老代为并发收集。CMS收集是JDK1.4后期版本开始引入的新GC算法。它的主要适合场景是对响应时间的重要性需求大于对吞吐量的需求,能够承受垃圾回收线程和应用线程共享CPU资源,并且应用中存在比较多的长生命周期对象。CMS收集的目标是尽量减少应用的暂停时间,减少Full GC发生的几率,利用和应用程序线程并发的垃圾回收线程来标记清除年老代内存。
    -XX:+UseParNewGC:设置年轻代为并发收集。可与CMS收集同时使用。JDK5.0以上,JVM会根据系统配置自行设置,所以无需再设置此参数。
    -XX:CMSFullGCsBeforeCompaction=0:由于并发收集器不对内存空间进行压缩和整理,所以运行一段时间并行收集以后会产生内存碎片,内存使用效率降低。此参数设置运行0次Full GC后对内存空间进行压缩和整理,即每次Full GC后立刻开始压缩和整理内存。
    -XX:+UseCMSCompactAtFullCollection:打开内存空间的压缩和整理,在Full GC后执行。可能会影响性能,但可以消除内存碎片。
    -XX:+CMSIncrementalMode:设置为增量收集模式。一般适用于单CPU情况。
    -XX:CMSInitiatingOccupancyFraction=70:表示年老代内存空间使用到70%时就开始执行CMS收集,以确保年老代有足够的空间接纳来自年轻代的对象,避免Full GC的发生。
    其它垃圾回收参数
    -XX:+ScavengeBeforeFullGC:年轻代GC优于Full GC执行。
    -XX:-DisableExplicitGC:不响应 System.gc() 代码。
    -XX:+UseThreadPriorities:启用本地线程优先级API。即使 java.lang.Thread.setPriority() 生效,不启用则无效。
    -XX:SoftRefLRUPolicyMSPerMB=0:软引用对象在最后一次被访问后能存活0毫秒(JVM默认为1000毫秒)。
    -XX:TargetSurvivorRatio=90:允许90%的Survivor区被占用(JVM默认为50%)。提高对于Survivor区的使用率。

  • JVM参数疑问解答
    -Xmn,-XX:NewSize/-XX:MaxNewSize,-XX:NewRatio 3组参数都可以影响年轻代的大小,混合使用的情况下,优先级是什么?
    如下:
    高优先级:-XX:NewSize/-XX:MaxNewSize
    中优先级:-Xmn(默认等效 -Xmn=-XX:NewSize=-XX:MaxNewSize=?)
    低优先级:-XX:NewRatio
    推荐使用-Xmn参数,原因是这个参数简洁,相当于一次设定 NewSize/MaxNewSIze,而且两者相等,适用于生产环境。-Xmn 配合 -Xms/-Xmx,即可将堆内存布局完成。
    -Xmn参数是在JDK 1.4 开始支持。

  • JVM参数设置优化例子

  1. 承受海量访问的动态Web应用
    服务器配置:8 CPU, 8G MEM, JDK 1.6.X
    参数方案:
    -server -Xmx3550m -Xms3550m -Xmn1256m -Xss128k -XX:SurvivorRatio=6 -XX:MaxPermSize=256m -XX:ParallelGCThreads=8 -XX:MaxTenuringThreshold=0 -XX:+UseConcMarkSweepGC
    调优说明:
    -Xmx 与 -Xms 相同以避免JVM反复重新申请内存。-Xmx 的大小约等于系统内存大小的一半,即充分利用系统资源,又给予系统安全运行的空间。
    -Xmn1256m 设置年轻代大小为1256MB。此值对系统性能影响较大,Sun官方推荐配置年轻代大小为整个堆的3/8。
    -Xss128k 设置较小的线程栈以支持创建更多的线程,支持海量访问,并提升系统性能。
    -XX:SurvivorRatio=6 设置年轻代中Eden区与Survivor区的比值。系统默认是8,根据经验设置为6,则2个Survivor区与1个Eden区的比值为2:6,一个Survivor区占整个年轻代的1/8。
    -XX:ParallelGCThreads=8 配置并行收集器的线程数,即同时8个线程一起进行垃圾回收。此值一般配置为与CPU数目相等。
    -XX:MaxTenuringThreshold=0 设置垃圾最大年龄(在年轻代的存活次数)。如果设置为0的话,则年轻代对象不经过Survivor区直接进入年老代。对于年老代比较多的应用,可以提高效率;如果将此值设置为一个较大值,则年轻代对象会在Survivor区进行多次复制,这样可以增加对象再年轻代的存活时间,增加在年轻代即被回收的概率。根据被海量访问的动态Web应用之特点,其内存要么被缓存起来以减少直接访问DB,要么被快速回收以支持高并发海量请求,因此其内存对象在年轻代存活多次意义不大,可以直接进入年老代,根据实际应用效果,在这里设置此值为0。
    -XX:+UseConcMarkSweepGC 设置年老代为并发收集。CMS(ConcMarkSweepGC)收集的目标是尽量减少应用的暂停时间,减少Full GC发生的几率,利用和应用程序线程并发的垃圾回收线程来标记清除年老代内存,适用于应用中存在比较多的长生命周期对象的情况。
  2. 内部集成构建服务器案例
    高性能数据处理的工具应用
    服务器配置:1 CPU, 4G MEM, JDK 1.6.X
    参数方案:
    -server -XX:PermSize=196m -XX:MaxPermSize=196m -Xmn320m -Xms768m -Xmx1024m
    调优说明:
    -XX:PermSize=196m -XX:MaxPermSize=196m 根据集成构建的特点,大规模的系统编译可能需要加载大量的Java类到内存中,所以预先分配好大量的持久代内存是高效和必要的。
    -Xmn320m 遵循年轻代大小为整个堆的3/8原则。
    -Xms768m -Xmx1024m 根据系统大致能够承受的堆内存大小设置即可。

关于时间的解释

GMT时间

格林尼治平时(Greenwich Mean Time,GMT),又称为格林尼治标准时间。
格林尼治平时的正午是指当平太阳横穿格林尼治子午线时(也就是在格林尼治上空最高点时)的时间。自1924年2月5日开始,格林尼治天文台负责每隔一小时向全世界发放调时信息。由于地球每天的自转是有些不规则的,而且正在缓慢减速,因此格林尼治平时基于天文观测本身的缺陷,已经被原子钟报时的协调世界时(UTC)所取代。

UT时间

世界时(Universal Time,UT),是一种以格林尼治子夜起算的平太阳时。
由于1925年以前人们在天文观测中,常常把每天的起始(0时)定为正午,而不是通常民用的午夜,给格林尼治平时的意义造成含糊,人们使用世界时一词来明确表示每天从午夜开始的格林尼治平时。

时区

时区是指地球上的某一个区域使用同一个时间定义。GMT时间或者UT时间,都是表示地球自转速率的一种形式。从太阳升起到太阳落下,时刻从0到24变化。这样,不同经度的地方时间自然会不相同。为了解决这个问题,人们把地球按经度划分为不同的区域,每个区域内使用同一个时间定义,相邻的区域时间差为1个小时。时区又分为理论时区和法定时区。

UTC时间

协调世界时(Coordinated Universal Time)。是主要的世界时间标准,以原子钟所定义的秒长为基础,在时刻上尽量接近GMT时间。UTC时间认为一个太阳日总是86400秒。在大多数情况下,UTC时间能与GMT时间互换。

UTC与时区

本初子午线所在的时区的时间后面加上字符Z,表示UTC时间。Z即为0时区的标志,读做Zulu。例如09:30 UTC就写作0930Z,14:45:15 UTC则为14:45:15Z或144515Z

UTC偏移量

UTC偏移量用以下形式表示: ±[hh]:[mm]、±[hh][mm]、或者±[hh]。例如UTC时间为09:30z,此时北京时间就是1730 +0800,纽约时间是0430 -0500。
UTC时间表示的格式一般为Sat, 20 May 2018 12:45:57 +0800表示东八区(北京时间)2018年5月20号 12:45:57星期六。

时差

某个地方的时刻与0时区的时刻差称为时差,时差东正西负。以本初子午线为中心,每向东一跨过一个时区,时刻增加一个小时,每向西跨过一个时区,时刻减少一个小时。

  • 如何理解向东时区增加

    由于地球是自西向东转,在地球的某一个地方观察,东边的时间比西边的时间早(东边的人们先看到太阳升起)。
    想象一下某一个时刻,太阳在你的正上空,此时你所在的地点的时间为正午12点。这时住在你东边的人们,他们看到太阳已经在西边了,他们的时刻是下午,所以往东,时刻增加。

UTC时间与本地时间的转换

UTC时间 + 时差 = 本地时间

CST时间

CST (China Standard Time,中国标准时间) 是UTC+8时区的知名名称之一,比UTC(协调世界时)提前8个小时与UTC的时间偏差可写为+08:00.

http协议里respond的header日期

avatar

All HTTP date/time stamps MUST be represented in Greenwich Mean Time (GMT), without exception.
格林尼治标准时间。 在HTTP协议中,时间都是用格林尼治标准时间来表示的,而不是本地时间。
RFC 7231, section 7.1.1.2: Date

在java里用到的时间

avatar

SimpleDateFormat工具

  • 例子如下
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class UTCTimeFormatTest {
    public static void main(String[] args) throws ParseException {
    //Z代表UTC统一时间:2017-11-27T03:16:03.944Z
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
    Date date = new Date();
    System.out.println(date);
    String str = format.format(date);
    System.out.println(str);
    SimpleDateFormat dayformat = new SimpleDateFormat("yyyy-MM-dd");
    String source ="2018-09-18"; //先将年月日的字符串日期格式化为date类型
    Date day = dayformat.parse(source);     //然后将date类型的日期转化为UTC格式的时间
    String str2= format.format(day);
    System.out.println(str2);
    }
    }
    打印结果
    avatar

    SimpleDateFormat线程不安全

    例子

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class SimpleDateFormatTest extends Thread {
    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
    private String name;
    private String dateStr;

    public SimpleDateFormatTest(String name, String dateStr) {
    this.dateStr = dateStr;
    this.name = name;
    }

    @Override
    public void run() {
    try {
    Date date = simpleDateFormat.parse(dateStr);
    System.out.println(name + ": date : " + date);
    } catch (ParseException exception) {
    exception.printStackTrace();
    }
    }

    public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    executorService.execute(new SimpleDateFormatTest("A", "2017-01-01"));
    executorService.execute(new SimpleDateFormatTest("B", "2020-12-12"));
    executorService.shutdown();
    }
    }
    运行结果
    avatar

    原理解释

  • SimpleDateFormat构造函数
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public SimpleDateFormat(String pattern, Locale locale)
    {
    if (pattern == null || locale == null) {
    throw new NullPointerException();
    }

    initializeCalendar(locale);
    this.pattern = pattern;
    this.formatData = DateFormatSymbols.getInstanceRef(locale);
    this.locale = locale;
    initialize(locale);
    }
  • initializeCalendar初始化calendar方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    private void initializeCalendar(Locale loc) {
    if (calendar == null) {
    assert loc != null;
    // The format object must be constructed using the symbols for this zone.
    // However, the calendar should use the current default TimeZone.
    // If this is not contained in the locale zone strings, then the zone
    // will be formatted using generic GMT+/-H:MM nomenclature.
    calendar = Calendar.getInstance(TimeZone.getDefault(), loc);
    }
    }
  • calendar变量的构造过程
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    public static Calendar getInstance(TimeZone zone, Locale aLocale)
    {
    return createCalendar(zone, aLocale);
    }

    private static Calendar createCalendar(TimeZone zone, Locale aLocale)
    {
    CalendarProvider provider =
    LocaleProviderAdapter.getAdapter(CalendarProvider.class, aLocale)
    .getCalendarProvider();
    if (provider != null) {
    try {
    return provider.getInstance(zone, aLocale);
    } catch (IllegalArgumentException iae) {
    // fall back to the default instantiation
    }
    }

    Calendar cal = null;

    if (aLocale.hasExtensions()) {
    String caltype = aLocale.getUnicodeLocaleType("ca");
    if (caltype != null) {
    switch (caltype) {
    case "buddhist":
    cal = new BuddhistCalendar(zone, aLocale);
    break;
    case "japanese":
    cal = new JapaneseImperialCalendar(zone, aLocale);
    break;
    case "gregory":
    cal = new GregorianCalendar(zone, aLocale);
    break;
    }
    }
    }
    if (cal == null) {
    // If no known calendar type is explicitly specified,
    // perform the traditional way to create a Calendar:
    // create a BuddhistCalendar for th_TH locale,
    // a JapaneseImperialCalendar for ja_JP_JP locale, or
    // a GregorianCalendar for any other locales.
    // NOTE: The language, country and variant strings are interned.
    if (aLocale.getLanguage() == "th" && aLocale.getCountry() == "TH") {
    cal = new BuddhistCalendar(zone, aLocale);
    } else if (aLocale.getVariant() == "JP" && aLocale.getLanguage() == "ja"
    && aLocale.getCountry() == "JP") {
    cal = new JapaneseImperialCalendar(zone, aLocale);
    } else {
    cal = new GregorianCalendar(zone, aLocale);
    }
    }
    return cal;
    }

    通过查看源码发现,原来SimpleDateFormat类内部有一个Calendar对象引用,它用来储存和这个SimpleDateFormat相关的日期信息,例如sdf.parse(dateStr),sdf.format(date) 诸如此类的方法参数传入的日期相关String,Date等等, 都是交由Calendar引用来储存的.这样就会导致一个问题,如果你的SimpleDateFormat是个static的, 那么多个thread 之间就会共享这个SimpleDateFormat, 同时也是共享这个Calendar引用。

正确姿势

  • 将SimpleDateFormat定义成局部变量

    缺点是每次调用方法后都会实例化一个SimpleDateFormat对象,方法结束后会被垃圾回收

  • 如果要定义成静态变量,一定要加锁,保证同一个时刻就只有一个线程可以访问到SimpleDateFormat对象

    缺点是性能变差,每次都得等待锁释放后其他线程才能访问SimpleDateFormat对象

  • 使用ThreadLocal来保存SimpleDateFormat对象,每个线程拥有自己的SimpleDateFormat对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;

    public class DateUtils {
    // 单例
    private static Map<String, ThreadLocal<SimpleDateFormat>> localMap = new HashMap<>();
    private static final Object lockObject = new Object();

    public static SimpleDateFormat getSimpleDateFormat(String pattern) {
    ThreadLocal<SimpleDateFormat> threadLocal = localMap.get(pattern);
    if (threadLocal == null) {
    // 加锁,不重复初始化已经存在的pattern
    synchronized (lockObject) {
    // 再取一次是为了防止localMap被重复多次put已存在的pattern
    threadLocal = localMap.get(pattern);
    if (threadLocal == null) {
    System.out.println("put new sdf of pattern " + pattern + " to map");
    threadLocal = new ThreadLocal<SimpleDateFormat>() {
    @Override
    protected SimpleDateFormat initialValue() {
    System.out.println("thread: " + Thread.currentThread() + " init pattern: " + pattern);
    return new SimpleDateFormat(pattern);
    }
    };
    localMap.put(pattern, threadLocal);
    }
    }
    }
    return threadLocal.get();
    }

    public static String format(Date date, String pattern) {
    return getSimpleDateFormat(pattern).format(date);
    }

    public static Date parse(String dateString, String pattern) throws ParseException {
    return getSimpleDateFormat(pattern).parse(dateString);
    }
    }

    ThreadLocal解析

    avatar

    ThreadLocal是用哈希表实现的,每个线程里都有一个ThreadLocalMap,就是以Map的形式存储多个ThreadLocal对象,当线程调用ThreadLocal操作方法时,都会通过当前线程Thread对象拿到ThreadLocalMap,再通过ThreadLocal对象从ThreadLocalMap中锁定数据实体(ThreadLocal.Entry)

  • ThreadLocal.set方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public void set(T value) {
    // 取出当前线程
    Thread t = Thread.currentThread();
    // 取出Thread.ThreadLocal.ThreadLocalMap threadLocals = null;
    ThreadLocalMap map = getMap(t);
    // 已存在,则更新
    if (map != null)
    map.set(this, value);
    else
    // 否则创建map
    createMap(t, value);
    }
  • ThreadLocalMap.set方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    /**
    * Set the value associated with key.
    *
    * @param key the thread local object
    * @param value the value to be set
    */
    private void set(ThreadLocal<?> key, Object value) {

    // We don't use a fast path as with get() because it is at
    // least as common to use set() to create new entries as
    // it is to replace existing ones, in which case, a fast
    // path would fail more often than not.

    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);

    for (Entry e = tab[i];
    e != null;
    e = tab[i = nextIndex(i, len)]) {
    ThreadLocal<?> k = e.get();

    if (k == key) {
    e.value = value;
    return;
    }

    if (k == null) {
    replaceStaleEntry(key, value, i);
    return;
    }
    }

    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
    rehash();
    }
  • ThreadLocal.createMap方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    /**
    * Create the map associated with a ThreadLocal. Overridden in
    * InheritableThreadLocal.
    *
    * @param t the current thread
    * @param firstValue value for the initial entry of the map
    */
    void createMap(Thread t, T firstValue) {
    // 实例化一个新ThreadLocalMap对象
    // this就是操作的ThreadLocal对象,firstValue就是要保存的值
    t.threadLocals = new ThreadLocalMap(this, firstValue);
    }

SimpleDateFormat挖坑自跳

坑王举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class SimpleDateFormatErrorTest {
public static void main(String[] args) {
try {
String date1 = "2021-05-01";
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
System.out.println(simpleDateFormat.parse(date1));
} catch (ParseException exception) {
exception.printStackTrace();
}
}
}

打印结果如下
avatar
这个代码预期是会走到catch的exception里,但是却正常打印输出了

SimpleDateFormat.parse源码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
@Override
public Date parse(String text, ParsePosition pos)
{
checkNegativeNumberExpression();

int start = pos.index;
int oldStart = start;
int textLength = text.length();

boolean[] ambiguousYear = {false};

CalendarBuilder calb = new CalendarBuilder();

for (int i = 0; i < compiledPattern.length; ) {
int tag = compiledPattern[i] >>> 8;
int count = compiledPattern[i++] & 0xff;
if (count == 255) {
count = compiledPattern[i++] << 16;
count |= compiledPattern[i++];
}

switch (tag) {
case TAG_QUOTE_ASCII_CHAR:
if (start >= textLength || text.charAt(start) != (char)count) {
pos.index = oldStart;
pos.errorIndex = start;
return null;
}
start++;
break;

case TAG_QUOTE_CHARS:
while (count-- > 0) {
if (start >= textLength || text.charAt(start) != compiledPattern[i++]) {
pos.index = oldStart;
pos.errorIndex = start;
return null;
}
start++;
}
break;
// 进入默认配置
default:
// Peek the next pattern to determine if we need to
// obey the number of pattern letters for
// parsing. It's required when parsing contiguous
// digit text (e.g., "20010704") with a pattern which
// has no delimiters between fields, like "yyyyMMdd".
boolean obeyCount = false;

// 在阿拉伯语中,负数的减号可以放在数字的后面(1111-)
// In Arabic, a minus sign for a negative number is put after
// the number. Even in another locale, a minus sign can be
// put after a number using DateFormat.setNumberFormat().
// If both the minus sign and the field-delimiter are '-',
// subParse() needs to determine whether a '-' after a number
// in the given text is a delimiter or is a minus sign for the
// preceding number. We give subParse() a clue based on the
// information in compiledPattern.
boolean useFollowingMinusSignAsDelimiter = false;

if (i < compiledPattern.length) {
int nextTag = compiledPattern[i] >>> 8;
if (!(nextTag == TAG_QUOTE_ASCII_CHAR ||
nextTag == TAG_QUOTE_CHARS)) {
obeyCount = true;
}

if (hasFollowingMinusSign &&
(nextTag == TAG_QUOTE_ASCII_CHAR ||
nextTag == TAG_QUOTE_CHARS)) {
int c;
if (nextTag == TAG_QUOTE_ASCII_CHAR) {
c = compiledPattern[i] & 0xff;
} else {
c = compiledPattern[i+1];
}

if (c == minusSign) {
useFollowingMinusSignAsDelimiter = true;
}
}
}
start = subParse(text, start, tag, count, obeyCount,
ambiguousYear, pos,
useFollowingMinusSignAsDelimiter, calb);
if (start < 0) {
pos.index = oldStart;
return null;
}
}
}

// At this point the fields of Calendar have been set. Calendar
// will fill in default values for missing fields when the time
// is computed.

pos.index = start;

Date parsedDate;
try {
parsedDate = calb.establish(calendar).getTime();
// If the year value is ambiguous,
// then the two-digit year == the default start year
if (ambiguousYear[0]) {
if (parsedDate.before(defaultCenturyStart)) {
parsedDate = calb.addYear(100).establish(calendar).getTime();
}
}
}
// An IllegalArgumentException will be thrown by Calendar.getTime()
// if any fields are out of range, e.g., MONTH == 17.
catch (IllegalArgumentException e) {
pos.errorIndex = start;
pos.index = oldStart;
return null;
}

return parsedDate;
}
  • 正确姿势
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    import java.text.ParseException;
    import java.text.SimpleDateFormat;

    public class SimpleDateFormatErrorTest {
    public static void main(String[] args) {
    String date1 = "2021-05-01";
    try {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
    // 设置为严格模式,Calendar类默认把lenient设置为true,意思是宽松模式解析
    simpleDateFormat.setLenient(false);
    System.out.println(simpleDateFormat.parse(date1));
    } catch (ParseException exception) {
    exception.printStackTrace();
    }
    }
    }
    运行结果,看到已经抛出异常了
    avatar

    坑爹举例

  • 代码如下
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    import java.text.ParseException;
    import java.text.SimpleDateFormat;

    public class SimpleDateFormatErrorTest {
    public static void main(String[] args) {
    String date1 = "2021-05-01";
    try {
    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM-dd");
    // 设置为严格模式
    simpleDateFormat.setLenient(false);
    System.out.println(simpleDateFormat.parse(date1));
    } catch (ParseException exception) {
    exception.printStackTrace();
    }
    }
    }
    打印结果,输出是2020-12-27
    avatar
  • 详解
    官方文档
字母 日期含义 举例
y year 正常年份
Y week year 按周算的年份,比如2018年12月31日,正好是2019 Week-year的第一周第一天
D Day in year 一年中的第几天
d Day in month 正常日期的日

进程是什么

进程是一个具有独立功能的程序在一个数据集上的一次动态执行的过程,是操作系统调度和进行资源分配和调度的一个独立单位,进程一般由程序、数据集合、进程控制块三部分组成

  • 程序:用于描述进程要完成的功能,是控制进程执行的指令集
  • 数据集合:程序在执行时所需要的数据和工作区
  • 进程控制块:Program Control Block,简称PCB,包含进程的描述信息和控制信息,是进程存在的唯一标示
    进程由内存空间(代码、数据、进程空间、打开的文件)和一个或者多个线程组成

    线程是什么

    线程是程序执行流的最小单元,是处理器调度和分派的基本单位,一个进程可以有一个或多个线程,各个线程之间共享程序的内存空间(也就是所在进程的内存空间),一个标准的线程由线程ID、当前指令指针(PC)、寄存器和堆栈组成
  • 什么是时间片
    大部分操作系统(windows、linux)的任务调度都是采用时间片轮转的方式进行抢占式调度,在一个进程中,当 一个线程任务执行几毫秒后,会有操作系统的内核进行调度,通过硬件的计数器中断处理器让该线程强制暂停并将该线程的寄存器暂存到内存中,通过查看线程列表来决定下一个线程的执行,并从内存中恢复该线程的寄存器,在这个过程中,任务执行的那一小段时间就叫时间片,任务正在执行的状态就叫运行状态(RUNNABLE),被暂停的线程任务状态就叫就绪状态(WAITING),意为等待下一个时间片的到来。
    avatar

    进程和线程的关系

  • 线程是程序执行的最小单位,而进程是操作系统分配资源的最小单位;
  • 一个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线;
  • 进程之间相互独立,但同一进程下的各个线程之间共享程序的内存空间(包括代码段、数据集、堆等)及一些进程级的资源(如打开文件和信号),某进程内的线程在其它进程不可见;
  • 调度和切换:线程上下文切换比进程上下文切换要快得多
    avatar

    内核线程

    多核处理器是指一个处理器上集成多个运算核心,从而提高计算能力,每一个处理核心对应一个内核线程
    内核线程(Kernel Thread,KLT)就是直接由操作系统内核支持的线程,这种线程由内核来完成线程切换,内核通过操作调度器对线程进行调度,并负责将线程映射到各个处理器上,一般一个处理核心对应一个内核线程

    轻量级进程

    轻量级进程(Lightweight Process,LWP),轻量级进程就是我们通常意义上所讲的线程,也被叫做用户线程。由于每个轻量级进程都由一个内核线程支持,因此只有先支持内核线程,才能有轻量级进程。用户线程与内核线程的对应关系有三种模型:一对一模型、多对一模型、多对多模型,在这以4个内核线程、3个用户线程为例对三种模型进行说明。

线程状态介绍

java.lang.Thread 类定义了枚举类型State,包含Thread的所有状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW,

/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE,

/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED,

/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/
WAITING,

/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,

/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED;
}

avatar

    1. 初始状态(NEW)
      实现Runnable接口和继承Thread可以得到一个线程类,new一个实例出来,线程就进入了初始状态。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      // 通过Thread类
      Thread thread = new Thread();
      // 通过Runnable接口
      Runnable runnable = new Runnable() {
      @Override
      public void run() {

      }
      };
    1. 就绪状态(RUNNABLE)
      1. READY
        就绪状态只是说你有资格运行,调度程序没有挑选到你,你就永远是就绪状态。
        调用线程的start()方法,此线程进入就绪状态。
        1
        2
        3
        4
        Thread thread = new Thread(()->{
        System.out.println("hello");
        });
        thread.start();
        当前线程sleep()方法结束,其他线程join()结束,等待用户输入完毕,某个线程拿到对象锁,这些线程也将进入就绪状态。
        当前线程时间片用完了,调用当前线程的yield()方法,当前线程进入就绪状态。
        锁池里的线程拿到对象锁后,进入就绪状态。
      1. RUNNING
        线程调度程序从可运行池中选择一个线程作为当前线程时线程所处的状态。这也是线程进入运行状态的唯一的一种方式。
    1. 阻塞状态(BLOCKED)
      阻塞状态是线程阻塞在进入synchronized关键字修饰的方法或代码块(获取锁)时的状态。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      public class ConcurrencyTest {
      public static void main(String[] args) {
      Object lock = new Object();
      ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,0, TimeUnit.SECONDS, new LinkedBlockingDeque<>());
      for (int i = 0; i< 11; i++) {
      Thread thread = new Thread(() -> {
      try {
      synchronized (lock) {
      System.out.println("try to refresh config");
      Thread.sleep(3*60*1000);
      }
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      });
      executor.submit(thread);
      }
      executor.shutdown();;
      }
      }
    1. 等待(WAITING)
      处于这种状态的线程不会被分配CPU执行时间,它们要等待被显式地唤醒,否则会处于无限期等待的状态。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      public static void main(String[] args) {
      Thread t = new Thread(()->{
      System.out.println("hello");
      });
      System.out.println("start");
      // 调用start方法,t线程从NEW状态-》runnable状态
      t.start();
      // 调用join方法,让main线程处于waiting状态,先执行t线程的start-》hello,t线程执行结束后处于terminated,main线程从waiting状态恢复到runnable状态,执行自己的的end
      t.join();
      System.out.println("end");
      }
      avatar
    1. 超时等待(TIMED_WAITING)
      处于这种状态的线程不会被分配CPU执行时间,不过无须无限期等待被其他线程显示地唤醒,在达到一定时间后它们会自动唤醒。
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      public static void main(String[] args) {
      // 当前main线程处于RUNNABLE状态
      try {
      // 调用sleep后,main线程进入TIMED_WAITING状态
      Thread.sleep(10000);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      // 休眠结束后,恢复RUNNABLE状态
      }
      avatar
    1. 终止状态(TERMINATED)
      当线程的run()方法完成时,或者主线程的main()方法完成时,我们就认为它终止了。这个线程对象也许是活的,但是它已经不是一个单独执行的线程。线程一旦终止了,就不能复生。
      在一个终止的线程上调用start()方法,会抛出java.lang.IllegalThreadStateException异常。

线程的应用实践

  • Thread类
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    public class ThreadStatusTest {
    public static void main(String[] args) throws InterruptedException {
    Thread t = new Thread(() -> {
    System.out.println("hello");
    try {
    Thread.sleep(100000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    });
    System.out.println("start");
    System.out.println("==1=="+t.getState());
    t.start();
    System.out.println("==2=="+t.getState());
    t.join();
    System.out.println("==3=="+t.getState());
    System.out.println("end");
    }
    }
    如上图
    avatar
    avatar

如何判断程序类型

  • CPU密集型

    CPU 密集型简单理解就是利用 CPU 计算能力的任务,比如你在内存中对大量数据进行排序

  • I/O密集型

    但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

线程池设置

  • CPU 密集型任务(CPU核数 + 1):

    这种任务消耗的主要是 CPU 资源,可以将线程数设置为 NCPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。

  • I/O 密集型任务(CPU核数 * 2):

    这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N

  • 参数介绍

    • corePoolSize:核心线程数,定义了最小可以同时运行的线程数量
    • maximumPoolSize:当队列中存放的任务达到队列容量时,当前可以同时运行的线程数量变为最大线程数
    • workQueue:当新任务来的时候,先判断当前运行的线程数量是否达到核心线程数(corePoolSize),如果达到,则新任务会被存放到队列中
    • keepAliveTime:当线程池中的线程数量大于核心线程池(corePoolSize)时,如果没有新任务提交,核心线程外的线程(maximumPoolSize - corePoolSize)不会被立即销毁,而是等到keepAliveTime时间后,才会被回收销毁
    • unit:keepAliveTime的时间单位(TimeUnit类中的成员变量)
    • threadFactory:executor创建线程的工厂类
    • handler:饱和策略

      如果当前同时运行的线程数量达到最大线程数(maximumPoolSize),并且队列(workQueue)已经满了,ThreadPoolExecutory就会执行一些策略

      • AbortPolicy:抛出 RejectExecutionExeception 来拒绝新任务加入 (默认)
      • CallerRunsPolicy:调用执行自己的线程运行拒绝任务
      • DiscardPolicy:不处理新任务,直接丢弃
      • DiscardOldestPolicy:丢弃最早的未处理的任务

avatar

线程池原理分析

  • ThreadPoolExecutor

    1
    public class ThreadPoolExecutor extends AbstractExecutorService {

    ThreadPoolExecutor类 继承了 AbstractExecutorService类

    1
    public abstract class AbstractExecutorService implements ExecutorService {

    AbstractExecutorService类 实现了 ExecutorService接口

    1
    public interface ExecutorService extends Executor {

    ExecutorService接口 继承了 Executor类

  • 线程池demo

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    package blank.lin.thread;

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class ThreadPoolTest {
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    1,
    2,
    0L,
    TimeUnit.MILLISECONDS,
    new ArrayBlockingQueue<Runnable>(1)
    );

    public static void main(String[] args) {
    threadPoolExecutor.execute(new Runnable() {
    public void run() {
    try {
    Thread.sleep(500);
    System.out.println("第1个任务执行完成");
    } catch (Exception exception) {
    exception.printStackTrace();
    }
    }
    });
    printCount();
    System.out.println("加入第1个任务,线程池刚刚初始化,没有可以执行任务的核心线程,创建一个核心线程来执行任务");

    threadPoolExecutor.execute(new Runnable() {
    public void run() {
    try {
    Thread.sleep(500);
    System.out.println("第2个任务执行完成");
    } catch (Exception exception) {
    exception.printStackTrace();
    }
    }
    });
    printCount();
    System.out.println("加入第2个任务,没有可以执行任务的核心线程,且任务数大于corePoolSize,新加入任务被放在了阻塞队列中");

    threadPoolExecutor.execute(new Runnable() {
    public void run() {
    try {
    Thread.sleep(500);
    System.out.println("第3个任务执行完成");
    } catch (Exception exception) {
    exception.printStackTrace();
    }
    }
    });
    printCount();
    System.out.println("加入第3个任务,此时,阻塞队列已满,新建非核心线程执行新加入任务");

    threadPoolExecutor.execute(new Runnable() {
    public void run() {
    try {
    Thread.sleep(500);
    System.out.println("第4个任务执行完成");
    } catch (Exception exception) {
    exception.printStackTrace();
    }
    }
    });
    printCount();
    System.out.println("加入第4个任务,此时,阻塞队列已满,新建非核心线程执行新加入任务");

    try {
    Thread.sleep(600);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    printCount();
    System.out.println("第1个任务执行完毕,核心线程空闲,阻塞队列的任务被取出来,使用核心线程来执行");

    try {
    Thread.sleep(600);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    printCount();
    System.out.println("第2个任务执行完毕,核心线程空闲,非核心线程在执行第3个任务");

    try {
    Thread.sleep(600);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }

    printCount();
    System.out.println("第3个任务执行完毕,非核心线程被销毁,核心线程保留");

    }

    private static void printCount() {
    System.out.println("------------------------------------");
    System.out.println("当前活跃线程数:"+threadPoolExecutor.getActiveCount());
    System.out.println("当前核心线程数:"+threadPoolExecutor.getCorePoolSize());
    System.out.println("阻塞队列中的任务数:"+threadPoolExecutor.getQueue().size());
    }

    }
  • threadPoolExecutor.execute源码分析

    处理过程一共3步

    • 首先判断核心线程池是否满了,未满则开启新线程执行任务,addWorker调用会原子性检查线程运行状态和线程数,并且
      1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn’t, by returning false.
    • 如果一个任务被成功加入到队列里,也会重复检查是否应该新添加一个线程去执行,因为自上次检查后现存线程可以已死,或者线程池在进入这个方法后就关闭了,所以我们会重复检查状态,并且判断是否需要回滚队列,或者开启新线程
      1. If a task can be successfully queued, then we still need to double-check whether we should have added a thread (because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
    • 如果无法加入队列任务,我们会尝试新添加一个线程,如果新线程添加失败,我们知道线程被关闭了或者已经满了,所以任务会被拒绝
      1. If we cannot queue task, then we try to add a new thread. If it fails, we know we are shut down or saturated and so reject the task.
  • 源码
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    public void execute(Runnable command) {
    // 任务为空,抛异常
    if (command == null)
    throw new NullPointerException();
    // ctl中保存的线程池当前的一些任务状态
    int c = ctl.get();
    // 上面介绍的3步从这里开始
    // 第一步,判断当前线程池中的任务数量是否小于核心线程数
    // 如果小于的话通过addWorker新建一个线程,并将任务添加到线程去执行
    if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
    return;
    c = ctl.get();
    }
    // 第二步,如果当前线程数已经大于等于核心线程数了
    // 通过isRunning来判断线程池状态,线程池处于RUNNING状态,才会将任务加入到workQueue队列中
    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    // 再次获取线程池状态,如果线程池不是RUNNING状态了,就需要从workQueue中移除任务
    if (! isRunning(recheck) && remove(command))
    // 执行拒绝策略
    reject(command);
    // 当前线程池为空,则重新创建线程去执行任务
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
    }
    // 第三步
    // 创建线程执行任务失败,执行相应的拒绝任务
    else if (!addWorker(command, false))
    reject(command);
    }
    avatar

查看机器cpu信息

  • 查看物理cpu个数

    1
    cat /proc/cpuinfo| grep "physical id"| sort| uniq| wc -l

    avatar

  • 查看每个物理cpu中core的个数(即核数)

    1
    cat /proc/cpuinfo| grep "cpu cores"| uniq

    avatar

  • 查看逻辑cpu的个数

    1
    2
    3
    cat /proc/cpuinfo| grep "processor"| wc -l
    // 在java里也可以这样查询
    Runtime.getRuntime().avaliableProcessor();

    avatar

  • 查看cpu型号

    1
    cat /proc/cpuinfo | grep name | uniq | cut -f2 -d:

    avatar