N聚合

🔒
❌ 关于 FreshRSS
发现新文章,点击刷新页面。
昨天以前首页

Tableau BI工具对接 AnalyticDB for PostgreSQL数据源

AnalyticDB for PostgreSQL(原HybridDB for PostgreSQL)作为高性能分析型数据库,可以支持用户对其业务数据进行实时分析,能够让企业敏锐感知市场动态,做出必要决策。
Tableau是一款数据分析与可视化工具,它支持连接本地或云端数据,不管是电子表格,还是数据库数据,都能进行无缝连接。本文介绍Tableau以AnalyticDB for PostgreSQL作为数据源,如何进行有效的数据分析。

使用AnalyticDB for PostgreSQL

AnalyticDB for PostgreSQL基于Greenplum,所以在选择连接器的时候选择Greenplum连接器:



点开出现登录页面,填上DB的连接信息完成登录。

登录后页面:



根据指导操作,可以将任意表进行统计分析,并进行报表展示。

例如使用TPCH数据中的lineitem,点开一张工作表可以进行任意维度的数据展示了:



每从度量或者维度中选择一个字段,放到工作表区时,Tableau都会发送一个query到AnalyticDB for PostgreSQL进行数据查询,例如上述图表发送的query:

BEGIN;declare "SQL_CUR0x7fdabf04ca00" cursor with hold for SELECT "lineitem"."l_linestatus" AS "l_linestatus",
          "lineitem"."l_shipmode" AS "l_shipmode",
          SUM("lineitem"."l_orderkey") AS "sum_l_orderkey_ok",
          ((CAST("lineitem"."l_shipdate" AS DATE) + CAST(TRUNC((-1 * (EXTRACT(DAY FROM "lineitem"."l_shipdate") - 1))) AS INTEGER) * INTERVAL '1 DAY') + CAST(TRUNC((-1 * (EXTRACT(MONTH FROM "lineitem"."l_shipdate") - 1))) AS INTEGER) * INTERVAL '1 MONTH') AS "tyr_l_shipdate_ok"
        FROM "public"."lineitem" "lineitem"
        GROUP BY 1,
          2,
          4;fetch 10000 in "SQL_CUR0x7fdabf04ca00

一些注意事项

关掉cursor

默认情况下Tableau使用cursor模式从AnalyticDB for PostgreSQL拉取数据:

FETCH 10000 in “SQL_CUR0x7fe678049e00”

如果提取的数据量很大,并且Tableau服务器的内存足够放下所有的查询数据,可以通过关闭cursor的模式进行性能调优。
通过TDC文件关闭cursor模式:

<?xml version='1.0' encoding='utf-8' ?>  
<connection-customization class='greenplum' enabled='true' version='4.3'>  
<vendor name='greenplum'/>  
<driver name='greenplum'/>  
<customizations>  
<customization name='odbc-connect-string-extras' value='UseDeclareFetch=0' />
</customizations>  
</connection-customization>

将该文件以tdc为后缀名,Desktop版本的Tableau放到DocumentsMy Tableau RepositoryDatasources下面,其他版本的同样放置到对应的Datasources目录下,重启Tableau即可生效。
也可以修改fetch的size,让其每次fetch更多的数据:

<?xml version='1.0' encoding='utf-8' ?>  
<connection-customization class='greenplum' enabled='true' version='4.3'>  
<vendor name='greenplum'/>  
<driver name='greenplum'/>  
<customizations>  
<customization name='odbc-connect-string-extras' value='Fetch=100000' />  
</customizations>  
</connection-customization>

初始化sql

连接建立时可以通过初始化SQL设置特定参数,例如:



SQL后面不要带‘;’,否则执行会报错,因为Tableau会将该SQL封装执行,中间如果有分号会报语法错误。同样在自定义SQL时,SQL结尾也不能加‘;’。



本文作者:陆封

原文链接

更多技术干货敬请关注云栖社区知乎机构号:阿里云云栖社区 - 知乎

本文为云栖社区原创内容,未经允许不得转载。

GMTC2019|闲鱼-基于Flutter的架构演进与创新

2012年应届毕业加入阿里巴巴,主导了闲鱼基于Flutter的新混合架构,同时推进了Flutter在闲鱼各业务线的落地。未来将持续关注终端技术的演变及趋势

Flutter的优势与挑战



Flutter是Google开源的跨端便携UI工具包,除了具有非常优秀的跨端渲染一致性,还具备非常高效的研发体验,丰富的开箱即用的UI组件,以及跟Native媲美的性能体验。由于它的众多优势,也使得Flutter成为了近些年来热门的新技术。

通过以上的特点可以看出,Flutter可以极大的加速客户端的研发效率,与此同时得到优秀的性能体验,基于我的思考,Flutter会为以下团队带来较大的收益:

  • 中小型的客户端团队非常适合Flutter开发,不仅一端编写双端产出,还有效的解决了小团队需要双端人员(iOS:Android)占比接近1:1的限制,在项目快速推进过程中,能让整个团队的产能最大化。
  • App在Android市场占比远高于iOS的团队,比如出海东南亚的一些App,Android市场整体占比在90%以上,通过Flutter可以将更多的人力Focus在Android市场上,同时通过在iOS端较小的投入,在结果上达到买一送一的效果。
  • 以量产App为主要策略的团队,不论是量产ToB的企业App,还是有针对性的产出不同领域的ToC的App的公司,都可以通过一端开发多端产出的Flutter得到巨大的产能提升。



闲鱼在以上的场景中属于第一种场景,服务3亿用户的闲鱼App的背后,开发资源投入很少,与竞对相比,我们是一只再小不过的团队,在这种场景下,Flutter为闲鱼业务的稳定发展以及提供更多的创新产品给予了很大的帮助。

但与此同时,Flutter在设计上带来的优势同时又会带来新的问题。所有的新技术都是脱胎于老技术的,Flutter也不例外,其身上带有很多Chrome的影子。我们再做一层简化,如果我们认为Flutter是一个使用Dart语言的浏览器容器,请大家思考一下两个问题如何解决。

  • 如果在一个已经存在的App中加入Flutter,如何让Native与Flutter进行无缝的衔接,同时保证相互开发之间的隔离性
  • 如果在Flutter的容器中,使用已有的Native UI组件,在Flutter与Native渲染机制不同的情况下,怎么保证两者的无缝衔接以及高性能。

闲鱼的架构演进与创新

带着上面两个问题,我们来到闲鱼场景下的具体Case以及解决方案的演进过程。

已有App+Flutter容器



在这种情况下,闲鱼需要考虑的是首先要考虑引入Flutter容器后的内存压力,保证不要产生更多的内存溢出。与此同时我们希望能让Flutter和Native之间的页面切换是顺畅的,对不同技术栈之间的同学透明。因此我们有针对性的进行了多次迭代。

在没有任何改造的情况下以iOS为例,你可以通过创建新的FlutterViewController来创建一个新的Flutter容器,这个方案下,当创建多个FlutterViewController时会同时在内存中创建多个Flutter Engine的Runtime(虽然底层Dart VM依然只有一个),这对内存消耗是相当大的,同时多个Flutter Engine的Runtime会造成每个Runtime内的数据无法直接共享,造成数据同步困难。

这种情况下,闲鱼选择了全局共享同一个FlutterViewController的方式保证了内存占用的最小化,同时通过基础框架Flutter Boost提供了Native栈与Flutter栈的通信与管理,保证了当Native打开或关闭一个新的Flutter页面时,Dart侧的Navigator也做到自动的打开或关闭一个新的Widget。目前Google官方的提供的方案上就是参考闲鱼早先的这个版本进行的实现的。

然而在这种情况下,如果出现如闲鱼图中所示多个Tab的场景下,整个堆栈逻辑就会产生混乱,因此闲鱼在这个基础上对Flutter Boost的方案进行了升级并开源,通过在Dart侧提供一个BoostContainerManager的方式,提供了对多个Navigator的管理能力,如果打比方来看这件事,就相当于,针对Flutter的容器提供了一个类似WebView的OpenWindow的能力,每做一次OpenWindow的调用,就会产生一个新的Navigator,这样开发者就可以自由的选择是在Navigator里进行Push和Pop,还是直接通过Flutter Boost新开一个Navigator进行独立管理。

Flutter Boost目前已在github开源,由于闲鱼目前线上版本只支持Flutter 1.2的版本,因此需要支持1.5的同学等稍等,我们会在近期更新支持1.5的Flutter Boost版本。

Flutter页面+Native UI



由于闲鱼是一个闲置交易社区,因此图片和视频相对较多,对图片视频的线上性能以及内存占用有较严格的要求。目前Flutter已提供的几种方案中(Platform View以及Flutter Plugin),不论是对内存的占用还是整个的线上流畅度上还存在一定的问题,这就造成了当大部分同学跟闲鱼一样实现一个复杂的图文Feed推荐场景的时候,非常容易产生内存溢出。而实际上,闲鱼在以上的场景下有针对性的做出了较大的优化。

在整个的Native UI到Flutter渲染引擎桥接的过程中,我们选用了Flutter Plugin中提供的FlutterTextureRegistry的能力,在去年上半年我们优先针对视频的场景进行了优化,优化的思路主要是针对Flutter Engine底层的外接纹理接口进行修改,将原有接口中必须传入一个PixelBuffer的内存对象这一限制做了扩展,增加一个新的接口保证其可以传入一个GPU对象的TextureID。

如图中所示,优化后的整个链路Flutter Engine可以直接通过Native端已经生成好的TextureID进行Flutter侧的渲染,这样就将链路从Native侧生成的TextureID->copy的内存对象PixelBuffer->生成新的TextureID->渲染,转变为Native侧生成的TextureID->渲染。整个链路极大的缩短,保证了整个的渲染效率以及更小的内存消耗。闲鱼在将这套方案上线后,又尝试将该方案应用于图片渲染的场景下,使得图片的缓存,CDN优化,图片裁切等方案与Native归一,在享受已有集团中间件的性能优化的同时,也得到了更小的内存消耗,方案落地后,内存溢出大幅减少。

目前该方案由于需要配合Flutter Engine的修改,因此暂时无法提供完整的方案至开源社区,我们正在跟google积极沟通整个修改方案,相信在这一两个月内会将试验性的Engine Patch开源至社区,供有兴趣的同学参考。

复杂业务场景的架构创新实践



将以上两个问题解决以后,闲鱼开始了Flutter在业务侧的全面落地,然而很快又遇到新的问题,在多人协作过程中:

  • 如何提供一些标准供大家进行参考保证代码的一致性
  • 如何将复杂业务进行有效的拆解变成子问题
  • 如何保证更多的同学可以快速上手并写出性能和稳定性都不错的代码



在方案的前期,我们使用了社区的Flutter Redux方案,由于最先落地的详情,发布等页面较为复杂,因此我们有针对性的对View进行了组件化的拆分,但由于业务的复杂性,很快这套方案就出现了问题,对于单个页面来说,State的属性以及Reducer的数量都非常多,当产生新需求堆叠的时候,修改困难,容易产生线上问题。

针对以上的情况,我们进行了整个方案的第二个迭代,在原有Page的基础上提供了Component的概念,使得每个Component具备完整的Redux元素,保证了UI,逻辑,数据的完整隔离,每个Component单元下代码相对较少,易于维护和开发,但随之而来的问题是,当页面需要产生数据同步时,整个的复杂性飙升,在Page的维度上失去了统一状态管理的优势。



在这种情况下闲鱼换个角度看端侧的架构设计,我们参考React Redux框架中的Connect的思想,移除掉在Component的Store,随之而来的是新的Connector作为Page和Component的数据联通的桥梁,我们基于此实现了Page State到Component State的转换,以及Component State变化后对Page State的自动同步,从而保证了将复杂业务有效的拆解成子问题,同时享受到统一状态管理的优势。与此同时基于新的框架,在统一了大家的开发标准的情况下,新框架也在底层有针对性的提供了对长列表,多列表拼接等case下的一些性能优化,保证了每一位同学在按照标准开发后,可以得到相对目前市面上其他的Flutter业务框架相比更好的性能。

目前这套方案Fish Redux已经在github开源,目前支持1.5版本,感兴趣的同学可以去github进行了解。

研发智能化在闲鱼的应用

闲鱼在去年经历了业务的快速成长,在这个阶段上,我们同时进行了大量的Flutter的技术改造和升级,在尝试新技术的同时,如何能保证线上的稳定,线下的有更多的时间进行新技术的尝试和落地,我们需要一些新的思路和工作方式上的改变。



以我们日常工作为例,Flutter的研发同学,在每次开发完成后,需要在本地进行Flutter产物的编译并上传到远端Repo,以便对Native同学透明,保证日常的研发不受Flutter改造的干扰。在这个过程中,Flutter侧的业务开发同学面临着很多打包上传更新同步等繁琐的工作,一不小心就会出错,后续的排查等让Flutter前期的开发变成了开发5分钟,打包测试2小时。同时Flutter到底有没有解决研发效率快的问题,以及同学们在落地过程中有没有Follow业务架构的标准,这一切都是未知的。



在痛定思痛以后,我们认为数据化+自动化是解决这些问题的一个较好的思路。因此我们首先从源头对代码进行管控,通过commit,将代码与后台的需求以及bug一一关联,对于不符合要求的commit信息,不允许进行代码合并,从而保证了后续数据报表分析的数据源头是健康的。

在完成代码和任务关联后,通过webhook就可以比较轻松的完成后续的工作,将每次的commit有效的关联到我们的持续集成平台的任务上来,通过闲鱼CI工作平台将日常打包自动化测试等流程变为自动化的行为,从而极大的减少了日常的工作。粗略统计下来,在去年自动化体系落地的过程中单就自动打Flutter包上传以及触发最终的App打包这一流程就让每位同学每天节省一个小时以上的工作量,效果非常明显。另外,基于代码关联需求的这套体系,可以相对容易的构建后续的数据报表对整个过程和结果进行细化的分析,用数据驱动过程改进,保证新技术的落地过程的收益有理有据。

总结与展望

回顾一下上下文

  • Flutter的特性非常适合中小型客户端团队/Android市场占比较高的团队/量产App的团队。同时由于Flutter的特性导致其在混合开发的场景下面存在一定劣势。
  • 闲鱼团队针对混合开发上的几个典型问题提供了对应的解决方案,使整个方案达到上线要求,该修改会在后续开放给google及社区。
  • 为全面推动Flutter在业务场景下的落地,闲鱼团队通过多次迭代演进出Fish Redux框架,保证了每位同学可以快速写出相对优秀的Flutter代码。
  • 新技术的落地过程中,在过程中通过数据化和自动化的方案极大的提升了过程中的效率,为Flutter在闲鱼的落地打下了坚实的基础。

除了本文提及的各种方案外,闲鱼目前还在多个方向上发力,并对针对Flutter生态的未来进行持续的关注,分享几个现在在做的事情

  • Flutter整个上层基础设施的标准化演进,混合工程体系是否可以在上层完成类似Spring-boot的完整体系构架,帮助更多的Flutter团队解决上手难,无行业标准的问题。
  • 动态性能力的扩展,在符合各应用商店标准的情况下,助力业务链路的运营效率提升,保证业务效果。目前闲鱼已有的动态化方案会后续作为Fish-Redux的扩展能力提供动态化组件能力+工具链体系。
  • Fish-Redux + UI2Code,打通代码生成链路和业务框架,保证在团队标准统一的情况下,将UI工作交由机器生成。
  • Flutter + FaaS,让客户端同学可以成为全栈工程师,通过前后端一体的架构设计,极大的减少协同,提升效率。

让工程师去从事更多创造性的工作,是我们一直努力的目标。闲鱼团队也会在新的一年更多的完善Flutter体系的建设,将更多已有的沉淀回馈给社区,帮助Flutter社区一起健康成长。



本文作者:闲鱼技术-宗心

原文链接

更多技术干货敬请关注云栖社区知乎机构号:阿里云云栖社区 - 知乎

本文为云栖社区原创内容,未经允许不得转载。

原理解析 | 深入了解 Apache Flink 的网络协议栈

作者:Nico Kruber;翻译:曹英杰

Flink 的网络协议栈是组成 flink-runtime 模块的核心组件之一,是每个 Flink 作业的核心。它连接所有 TaskManager 的各个子任务(Subtask),因此,对于 Flink 作业的性能包括吞吐与延迟都至关重要。与 TaskManager 和 JobManager 之间通过基于 Akka 的 RPC 通信的控制通道不同,TaskManager 之间的网络协议栈依赖于更加底层的 Netty API。

本文将首先介绍 Flink 暴露给流算子(Stream operator)的高层抽象,然后详细介绍 Flink 网络协议栈的物理实现和各种优化、优化的效果以及 Flink 在吞吐量和延迟之间的权衡。

1.逻辑视图

Flink 的网络协议栈为彼此通信的子任务提供以下逻辑视图,例如在 A 通过 keyBy() 操作进行数据 Shuffle :



这一过程建立在以下三种基本概念的基础上:

▼ 子任务输出类型(ResultPartitionType):
Pipelined(有限的或无限的):一旦产生数据就可以持续向下游发送有限数据流或无限数据流。
Blocking:仅在生成完整结果后向下游发送数据。

▼ 调度策略:
同时调度所有任务(Eager):同时部署作业的所有子任务(用于流作业)。
上游产生第一条记录部署下游(Lazy):一旦任何生产者生成任何输出,就立即部署下游任务。
上游产生完整数据部署下游:当任何或所有生产者生成完整数据后,部署下游任务。

▼ 数据传输:
高吞吐:Flink 不是一个一个地发送每条记录,而是将若干记录缓冲到其网络缓冲区中并一次性发送它们。这降低了每条记录的发送成本因此提高了吞吐量。
低延迟:当网络缓冲区超过一定的时间未被填满时会触发超时发送,通过减小超时时间,可以通过牺牲一定的吞吐来获取更低的延迟。

我们将在下面深入 Flink 网络协议栈的物理实现时看到关于吞吐延迟的优化。对于这一部分,让我们详细说明输出类型与调度策略。首先,需要知道的是子任务的输出类型和调度策略是紧密关联的,只有两者的一些特定组合才是有效的。

Pipelined 结果是流式输出,需要目标 Subtask 正在运行以便接收数据。因此需要在上游 Task 产生数据之前或者产生第一条数据的时候调度下游目标 Task 运行。批处理作业生成有界结果数据,而流式处理作业产生无限结果数据。

批处理作业也可能以阻塞方式产生结果,具体取决于所使用的算子和连接模式。在这种情况下,必须等待上游 Task 先生成完整的结果,然后才能调度下游的接收 Task 运行。这能够提高批处理作业的效率并且占用更少的资源。

下表总结了 Task 输出类型以及调度策略的有效组合:



注释:

[1]目前 Flink 未使用
[2]批处理 / 流计算统一完成后,可能适用于流式作业

此外,对于具有多个输入的子任务,调度以两种方式启动:当所有或者任何上游任务产生第一条数据或者产生完整数据时调度任务运行。要调整批处理作业中的输出类型和调度策略,可以参考 ExecutionConfig#setExecutionMode()——尤其是 ExecutionMode,以及 ExecutionConfig#setDefaultInputDependencyConstraint()。

2.物理数据传输

为了理解物理数据连接,请回想一下,在 Flink 中,不同的任务可以通过 Slotsharing group 共享相同 Slot。TaskManager 还可以提供多个 Slot,以允许将同一任务的多个子任务调度到同一个 TaskManager 上。

对于下图所示的示例,我们假设 2 个并发为 4 的任务部署在 2 个 TaskManager 上,每个 TaskManager 有两个 Slot。TaskManager 1 执行子任务 A.1,A.2,B.1 和 B.2,TaskManager 2 执行子任务 A.3,A.4,B.3 和 B.4。在 A 和 B 之间是 Shuffle 连接类型,比如来自于 A 的 keyBy() 操作,在每个 TaskManager 上会有 2x4 个逻辑连接,其中一些是本地的,另一些是远程的:



不同任务(远程)之间的每个网络连接将在 Flink 的网络堆栈中获得自己的 TCP 通道。但是,如果同一任务的不同子任务被调度到同一个 TaskManager,则它们与同一个 TaskManager 的网络连接将多路复用并共享同一个 TCP 信道以减少资源使用。在我们的例子中,这适用于 A.1→B.3,A.1→B.4,以及 A.2→B.3 和 A.2→B.4,如下图所示:



每个子任务的输出结果称为 ResultPartition,每个 ResultPartition 被分成多个单独的 ResultSubpartition- 每个逻辑通道一个。Flink 的网络协议栈在这一点的处理上,不再处理单个记录,而是将一组序列化的记录填充到网络缓冲区中进行处理。每个子任务本地缓冲区中最多可用 Buffer 数目为(每个发送方和接收方各一个):

#channels  * buffers-per-channel + floating-buffers-per-gate

单个 TaskManager 上的网络层 Buffer 总数通常不需要配置。有关如何在需要时进行配置的详细信息,请参阅配置网络缓冲区的文档。

▼ 造成反压(1)

每当子任务的数据发送缓冲区耗尽时——数据驻留在 Subpartition 的缓冲区队列中或位于更底层的基于 Netty 的网络堆栈内,生产者就会被阻塞,无法继续发送数据,而受到反压。接收端以类似的方式工作:Netty 收到任何数据都需要通过网络 Buffer 传递给 Flink。如果相应子任务的网络缓冲区中没有足够可用的网络 Buffer,Flink 将停止从该通道读取,直到 Buffer 可用。这将反压该多路复用上的所有发送子任务,因此也限制了其他接收子任务。下图说明了过载的子任务 B.4,它会导致多路复用的反压,也会导致子任务 B.3 无法接受和处理数据,即使是 B.3 还有足够的处理能力。



为了防止这种情况发生,Flink 1.5 引入了自己的流量控制机制。

3.Credit-based 流量控制

Credit-based 流量控制可确保发送端已经发送的任何数据,接收端都具有足够的能力(Buffer)来接收。新的流量控制机制基于网络缓冲区的可用性,作为 Flink 之前机制的自然延伸。每个远程输入通道(RemoteInputChannel)现在都有自己的一组独占缓冲区(Exclusive buffer),而不是只有一个共享的本地缓冲池(LocalBufferPool)。与之前不同,本地缓冲池中的缓冲区称为流动缓冲区(Floating buffer),因为它们会在输出通道间流动并且可用于每个输入通道。

数据接收方会将自身的可用 Buffer 作为 Credit 告知数据发送方(1 buffer = 1 credit)。每个 Subpartition 会跟踪下游接收端的 Credit(也就是可用于接收数据的 Buffer 数目)。只有在相应的通道(Channel)有 Credit 的时候 Flink 才会向更底层的网络协议栈发送数据(以 Buffer 为粒度),并且每发送一个 Buffer 的数据,相应的通道上的 Credit 会减 1。除了发送数据本身外,数据发送端还会发送相应 Subpartition 中有多少正在排队发送的 Buffer 数(称之为 Backlog)给下游。数据接收端会利用这一信息(Backlog)去申请合适数量的 Floating buffer 用于接收发送端的数据,这可以加快发送端堆积数据的处理。接收端会首先申请和 Backlog 数量相等的 Buffer,但可能无法申请到全部,甚至一个都申请不到,这时接收端会利用已经申请到的 Buffer 进行数据接收,并监听是否有新的 Buffer 可用。



Credit-based 的流控使用 Buffers-per-channel 来指定每个 Channel 有多少独占的 Buffer,使用 Floating-buffers-per-gate 来指定共享的本地缓冲池(Local buffer pool)大小(可选3),通过共享本地缓冲池,Credit-based 流控可以使用的 Buffer 数目可以达到与原来非 Credit-based 流控同样的大小。这两个参数的默认值是被精心选取的,以保证新的 Credit-based 流控在网络健康延迟正常的情况下至少可以达到与原策略相同的吞吐。可以根据实际的网络 RRT (round-trip-time)和带宽对这两个参数进行调整。

注释3:如果没有足够的 Buffer 可用,则每个缓冲池将获得全局可用 Buffer 的相同份额(±1)。

▼ 造成反压(2)

与没有流量控制的接收端反压机制不同,Credit 提供了更直接的控制:如果接收端的处理速度跟不上,最终它的 Credit 会减少成 0,此时发送端就不会在向网络中发送数据(数据会被序列化到 Buffer 中并缓存在发送端)。由于反压只发生在逻辑链路上,因此没必要阻断从多路复用的 TCP 连接中读取数据,也就不会影响其他的接收者接收和处理数据。

▼ Credit-based 的优势与问题

由于通过 Credit-based 流控机制,多路复用中的一个信道不会由于反压阻塞其他逻辑信道,因此整体资源利用率会增加。此外,通过完全控制正在发送的数据量,我们还能够加快 Checkpoint alignment:如果没有流量控制,通道需要一段时间才能填满网络协议栈的内部缓冲区并表明接收端不再读取数据了。在这段时间里,大量的 Buffer 不会被处理。任何 Checkpoint barrier(触发 Checkpoint 的消息)都必须在这些数据 Buffer 后排队,因此必须等到所有这些数据都被处理后才能够触发 Checkpoint(“Barrier 不会在数据之前被处理!”)。

但是,来自接收方的附加通告消息(向发送端通知 Credit)可能会产生一些额外的开销,尤其是在使用 SSL 加密信道的场景中。此外,单个输入通道( Input channel)不能使用缓冲池中的所有 Buffer,因为存在无法共享的 Exclusive buffer。新的流控协议也有可能无法做到立即发送尽可能多的数据(如果生成数据的速度快于接收端反馈 Credit 的速度),这时则可能增长发送数据的时间。虽然这可能会影响作业的性能,但由于其所有优点,通常新的流量控制会表现得更好。可能会通过增加单个通道的独占 Buffer 数量,这会增大内存开销。然而,与先前实现相比,总体内存使用可能仍然会降低,因为底层的网络协议栈不再需要缓存大量数据,因为我们总是可以立即将其传输到 Flink(一定会有相应的 Buffer 接收数据)。

在使用新的 Credit-based 流量控制时,可能还会注意到另一件事:由于我们在发送方和接收方之间缓冲较少的数据,反压可能会更早的到来。然而,这是我们所期望的,因为缓存更多数据并没有真正获得任何好处。如果要缓存更多的数据并且保留 Credit-based 流量控制,可以考虑通过增加单个输入共享 Buffer 的数量。



注意:如果需要关闭 Credit-based 流量控制,可以将这个配置添加到 flink-conf.yaml 中:taskmanager.network.credit-model:false。但是,此参数已过时,最终将与非 Credit-based 流控制代码一起删除。

4.序列号与反序列化

下图从上面的扩展了更高级别的视图,其中包含网络协议栈及其周围组件的更多详细信息,从发送算子发送记录(Record)到接收算子获取它:



在生成 Record 并将其传递出去之后,例如通过 Collector#collect(),它被传递给 RecordWriter,RecordWriter 会将 Java 对象序列化为字节序列,最终存储在 Buffer 中按照上面所描述的在网络协议栈中进行处理。RecordWriter 首先使用 SpanningRecordSerializer 将 Record 序列化为灵活的堆上字节数组。然后,它尝试将这些字节写入目标网络 Channel 的 Buffer 中。我们将在下面的章节回到这一部分。

在接收方,底层网络协议栈(Netty)将接收到的 Buffer 写入相应的输入通道(Channel)。流任务的线程最终从这些队列中读取并尝试在 RecordReader 的帮助下通过 SpillingAdaptiveSpanningRecordDeserializer 将累积的字节反序列化为 Java 对象。与序列化器类似,这个反序列化器还必须处理特殊情况,例如跨越多个网络 Buffer 的 Record,或者因为记录本身比网络缓冲区大(默认情况下为32KB,通过 taskmanager.memory.segment-size 设置)或者因为序列化 Record 时,目标 Buffer 中已经没有足够的剩余空间保存序列化后的字节数据,在这种情况下,Flink 将使用这些字节空间并继续将其余字节写入新的网络 Buffer 中。

4.1 将网络 Buffer 写入 Netty

在上图中,Credit-based 流控制机制实际上位于“Netty Server”(和“Netty Client”)组件内部,RecordWriter 写入的 Buffer 始终以空状态(无数据)添加到 Subpartition 中,然后逐渐向其中填写序列化后的记录。但是 Netty 在什么时候真正的获取并发送这些 Buffer 呢?显然,不能是 Buffer 中只要有数据就发送,因为跨线程(写线程与发送线程)的数据交换与同步会造成大量的额外开销,并且会造成缓存本身失去意义(如果是这样的话,不如直接将将序列化后的字节发到网络上而不必引入中间的 Buffer)。

在 Flink 中,有三种情况可以使 Netty 服务端使用(发送)网络 Buffer:

  • 写入 Record 时 Buffer 变满,或者
  • Buffer 超时未被发送,或
  • 发送特殊消息,例如 Checkpoint barrier。

在 Buffer 满后发送

RecordWriter 将 Record 序列化到本地的序列化缓冲区中,并将这些序列化后的字节逐渐写入位于相应 Result subpartition 队列中的一个或多个网络 Buffer中。虽然单个 RecordWriter 可以处理多个 Subpartition,但每个 Subpartition 只会有一个 RecordWriter 向其写入数据。另一方面,Netty 服务端线程会从多个 Result subpartition 中读取并像上面所说的那样将数据写入适当的多路复用信道。这是一个典型的生产者 - 消费者模式,网络缓冲区位于生产者与消费者之间,如下图所示。在(1)序列化和(2)将数据写入 Buffer 之后,RecordWriter 会相应地更新缓冲区的写入索引。一旦 Buffer 完全填满,RecordWriter 会(3)为当前 Record 剩余的字节或者下一个 Record 从其本地缓冲池中获取新的 Buffer,并将新的 Buffer 添加到相应 Subpartition 的队列中。这将(4)通知 Netty服务端线程有新的数据可发送(如果 Netty 还不知道有可用的数据的话4)。每当 Netty 有能力处理这些通知时,它将(5)从队列中获取可用 Buffer 并通过适当的 TCP 通道发送它。



注释4:如果队列中有更多已完成的 Buffer,我们可以假设 Netty 已经收到通知。

在 Buffer 超时后发送

为了支持低延迟应用,我们不能只等到 Buffer 满了才向下游发送数据。因为可能存在这种情况,某种通信信道没有太多数据,等到 Buffer 满了在发送会不必要地增加这些少量 Record 的处理延迟。因此,Flink 提供了一个定期 Flush 线程(the output flusher)每隔一段时间会将任何缓存的数据全部写出。可以通过 StreamExecutionEnvironment#setBufferTimeout 配置 Flush 的间隔,并作为延迟5的上限(对于低吞吐量通道)。下图显示了它与其他组件的交互方式:RecordWriter 如前所述序列化数据并写入网络 Buffer,但同时,如果 Netty 还不知道有数据可以发送,Output flusher 会(3,4)通知 Netty 服务端线程数据可读(类似与上面的“buffer已满”的场景)。当 Netty 处理此通知(5)时,它将消费(获取并发送)Buffer 中的可用数据并更新 Buffer 的读取索引。Buffer 会保留在队列中——从 Netty 服务端对此 Buffer 的任何进一步操作将在下次从读取索引继续读取。



注释5:严格来说,Output flusher 不提供任何保证——它只向 Netty 发送通知,而 Netty 线程会按照能力与意愿进行处理。这也意味着如果存在反压,则 Output flusher 是无效的。

特殊消息后发送

一些特殊的消息如果通过 RecordWriter 发送,也会触发立即 Flush 缓存的数据。其中最重要的消息包括 Checkpoint barrier 以及 end-of-partition 事件,这些事件应该尽快被发送,而不应该等待 Buffer 被填满或者 Output flusher 的下一次 Flush。

进一步的讨论

与小于 1.5 版本的 Flink 不同,请注意(a)网络 Buffer 现在会被直接放在 Subpartition 的队列中,(b)网络 Buffer 不会在 Flush 之后被关闭。这给我们带来了一些好处:

  • 同步开销较少(Output flusher 和 RecordWriter 是相互独立的)
  • 在高负荷情况下,Netty 是瓶颈(直接的网络瓶颈或反压),我们仍然可以在未完成的 Buffer 中填充数据
  • Netty 通知显著减少

但是,在低负载情况下,可能会出现 CPU 使用率和 TCP 数据包速率的增加。这是因为,Flink 将使用任何可用的 CPU 计算能力来尝试维持所需的延迟。一旦负载增加,Flink 将通过填充更多的 Buffer 进行自我调整。由于同步开销减少,高负载场景不会受到影响,甚至可以实现更高的吞吐。

4.2 BufferBuilder 和 BufferConsumer

更深入地了解 Flink 中是如何实现生产者 - 消费者机制,需要仔细查看 Flink 1.5 中引入的 BufferBuilder 和 BufferConsumer 类。虽然读取是以 Buffer 为粒度,但写入它是按 Record 进行的,因此是 Flink 中所有网络通信的核心路径。因此,我们需要在任务线程(Task thread)和 Netty 线程之间实现轻量级连接,这意味着尽量小的同步开销。你可以通过查看源代码获取更加详细的信息。

5. 延迟与吞吐

引入网络 Buffer 的目是获得更高的资源利用率和更高的吞吐,代价是让 Record 在 Buffer 中等待一段时间。虽然可以通过 Buffer 超时给出此等待时间的上限,但可能很想知道有关这两个维度(延迟和吞吐)之间权衡的更多信息,显然,无法两者同时兼得。下图显示了不同的 Buffer 超时时间下的吞吐,超时时间从 0 开始(每个 Record 直接 Flush)到 100 毫秒(默认值),测试在具有 100 个节点每个节点 8 个 Slot 的群集上运行,每个节点运行没有业务逻辑的 Task 因此只用于测试网络协议栈的能力。为了进行比较,我们还测试了低延迟改进(如上所述)之前的 Flink 1.4 版本。



如图,使用 Flink 1.5+,即使是非常低的 Buffer 超时(例如1ms)(对于低延迟场景)也提供高达超时默认参数(100ms)75% 的最大吞吐,但会缓存更少的数据。

6.结论

了解 Result partition,批处理和流式计算的不同网络连接以及调度类型,Credit-Based 流量控制以及 Flink 网络协议栈内部的工作机理,有助于更好的理解网络协议栈相关的参数以及作业的行为。后续我们会推出更多 Flink 网络栈的相关内容,并深入更多细节,包括运维相关的监控指标(Metrics),进一步的网络调优策略以及需要避免的常见错误等。

via:
https://flink.apache.org/2019/06/05/flink-network-stack.html



本文作者:Ververica

原文链接

更多技术干货敬请关注云栖社区知乎机构号:阿里云云栖社区 - 知乎

本文为云栖社区原创内容,未经允许不得转载。

Express 的使用

以下内容,基于 Express 4.x 版本

Node.js 的 Express

Express 估计是那种你第一次接触,就会喜欢上用它的框架。因为它真的非常简单,直接。

在当前版本上,一共才这么几个文件:

lib/
├── application.js
├── express.js
├── middleware
│   ├── init.js
│   └── query.js
├── request.js
├── response.js
├── router
│   ├── index.js
│   ├── layer.js
│   └── route.js
├── utils.js
└── view.js

这种程度,说它是一个“框架”可能都有些过了,几乎都是工具性质的实现,只限于 Web 层。

当然,直接了当地实现了 Web 层的基本功能,是得益于 Node.js 本身的 API 中,就提供了 nethttp 这两层, Expresshttp 的方法包装一下即可。

不过,本身功能简单的东西,在 package.json 中却有好长一串 dependencies 列表。

Hello World

在跑 Express 前,你可能需要初始化一个 npm 项目,然后再使用 npm 安装 Express

mkdir p
cd p
npm init
npm install express --save

新建一个 app.js

const express = require('express');
const app = express();
app.all('/', (req, res) => res.send('hello') );
app.listen(8888);

调试信息是通过环境变量 DEBUG 控制的:

const process = require('process');
process.env['DEBUG'] = 'express:*';

这样就可以在终端看到带颜色的输出了,嗯,是的,带颜色控制字符,vim 中直接跑就 SB 了。

应用 Application

Application 是一个上层统筹的概念,整合“请求-响应”流程。 express() 的调用会返回一个 application ,一个项目中,有多个 app 是没问题的:

const express = require('express');

const app = express();
app.all('/', (req, res) => res.send('hello'));
app.listen(8888);

const app2 = express();
app2.all('/', (req, res) => res.send('hello2'));
app2.listen(8889);

多个 app 的另一个用法,是直接把某个 path 映射到整个 app

const express = require('express');

const app = express();

app.all('/', (req, res) => {
    res.send('ok');
});

const app2 = express();
app2.get('/xx', (req, res, next) => res.send('in app2') )
app.use('/2', app2)

app.listen(8888);

这样,当访问 /2/xx 时,就会看到 in app2 的响应。

前面说了 app 实际上是一个上层调度的角色,在看后面的内容之前,先说一下 Express 的特点,整体上来说,它的结构基本上是“回调函数串行”,无论是 app ,或者 routehandlemiddleware这些不同的概念,它们的形式,基本是一致的,就是 (res, req, next) => {} ,串行的流程依赖 next() 的显式调用。

我们把 app 的功能,分成五个部分来说。

路由 - Handler 映射

app.all('/', (req, res, next) => {});
app.get('/', (req, res, next) => {});
app.post('/', (req, res, next) => {});
app.put('/', (req, res, next) => {});
app.delete('/', (req, res, next) => {});

上面的代码就是基本的几个方法,路由的匹配是串行的,可以通过 next() 控制:

const express = require('express');

const app = express();

app.all('/', (req, res, next) => {
    res.send('1 ');
    console.log('here');
    next();
});

app.get('/', (req, res, next) => {
    res.send('2 ');
    console.log('get');
    next();
});

app.listen(8888);

对于上面的代码,因为重复调用 send() 会报错。

同样的功能,也可以使用 app.route() 来实现:

const express = require('express');

const app = express();

app.route('/').all( (req, res, next) => {
    console.log('all');
    next();
}).get( (req, res, next) => {
    res.send('get');
    next();
}).all( (req, res, next) => {
    console.log('tail');
    next();
});

app.listen(8888);

app.route() 也是一种抽象通用逻辑的形式。

还有一个方法是 app.params ,它把“命名参数”的处理单独拆出来了(我个人不理解这玩意儿有什么用):

const express = require('express');

const app = express();

app.route('/:id').all( (req, res, next) => {
    console.log('all');
    next();
}).get( (req, res, next) => {
    res.send('get');
    next()
}).all( (req, res, next) => {
    console.log('tail');
});

app.route('/').all( (req, res) => {res.send('ok')});

app.param('id', (req, res, next, value) => {
    console.log('param', value);
    next();
});

app.listen(8888);

app.params 中的对应函数会先行执行,并且,记得显式调用 next()

Middleware

其实前面讲了一些方法,要实现 Middleware 功能,只需要 app.all(/.*/, () => {}) 就可以了, Express 还专门提供了 app.use() 做通用逻辑的定义:

const express = require('express');

const app = express();

app.all(/.*/, (req, res, next) => {
    console.log('reg');
    next();
});

app.all('/', (req, res, next) => {
    console.log('pre');
    next();
});

app.use((req, res, next) => {
    console.log('use');
    next();
});

app.all('/', (req, res, next) => {
    console.log('all');
    res.send('/ here');
    next();
});

app.use((req, res, next) => {
    console.log('use2');
    next();
});

app.listen(8888);

注意 next() 的显式调用,同时,注意定义的顺序, use()all() 顺序上是平等的。

Middleware 本身也是 (req, res, next) => {} 这种形式,自然也可以和 app 有对等的机制——接受路由过滤, Express 提供了 Router ,可以单独定义一组逻辑,然后这组逻辑可以跟 Middleware一样使用。

const express = require('express');
const app = express();
const router = express.Router();

app.all('/', (req, res) => {
    res.send({a: '123'});
});

router.all('/a', (req, res) => {
    res.send('hello');
});

app.use('/route', router);

app.listen(8888);

功能开关,变量容器

app.set()app.get() 可以用来保存 app 级别的变量(对, app.get() 还和 GET 方法的实现名字上还冲突了):

const express = require('express');

const app = express();

app.all('/', (req, res) => {
    app.set('title', '标题123');
    res.send('ok');
});

app.all('/t', (req, res) => {
    res.send(app.get('title'));
});

app.listen(8888);

上面的代码,启动之后直接访问 /t 是没有内容的,先访问 / 再访问 /t 才可以看到内容。

对于变量名, Express 预置了一些,这些变量的值,可以叫 settings ,它们同时也影响整个应用的行为:

  • case sensitive routing
  • env
  • etag
  • jsonp callback name
  • json escape
  • json replacer
  • json spaces
  • query parser
  • strict routing
  • subdomain offset
  • trust proxy
  • views
  • view cache
  • view engine
  • x-powered-by

具体的作用,可以参考 https://expressjs.com/en/4x/api.html#app.set

(上面这些值中,干嘛不放一个最基本的 debug 呢……)

除了基本的 set() / get() ,还有一组 enable() / disable() / enabled() / disabled() 的包装方法,其实就是 set(name, false) 这种。 set(name) 这种只传一个参数,也可以获取到值,等于 get(name)

模板引擎

Express 没有自带模板,所以模板引擎这块就被设计成一个基础的配置机制了。

const process = require('process');
const express = require('express');
const app = express();

app.set('views', process.cwd() + '/template');

app.engine('t2t', (path, options, callback) => {
    console.log(path, options);
    callback(false, '123');
});

app.all('/', (req, res) => {
    res.render('demo.t2t', {title: "标题"}, (err, html) => {
        res.send(html)
    });
});

app.listen(8888);

app.set('views', ...) 是配置模板在文件系统上的路径, app.engine() 是扩展名为标识,注册对应的处理函数,然后, res.render() 就可以渲染指定的模板了。 res.render('demo') 这样不写扩展名也可以,通过 app.set('view engine', 't2t') 可以配置默认的扩展名。

这里,注意一下 callback() 的形式,是 callback(err, html)

端口监听

app 功能的最后一部分, app.listen() ,它完成的形式是:

app.listen([port[, host[, backlog]]][, callback])

注意, host 是第二个参数。

backlog 是一个数字,配置可等待的最大连接数。这个值同时受操作系统的配置影响。默认是 512 。

请求 Request

这一块倒没有太多可以说的,一个请求你想知道的信息,都被包装到 req 的属性中的。除了,头。头的信息,需要使用 req.get(name) 来获取。

GET 参数

使用 req.query 可以获取 GET 参数:

const express = require('express');
const app = express();

app.all('/', (req, res) => {
    console.log(req.query);
    res.send('ok');
});

app.listen(8888);

请求:

# -*- coding: utf-8 -*-
import requests
requests.get('http://localhost:8888', params={"a": '中文'.encode('utf8')})

POST 参数

POST 参数的获取,使用 req.body ,但是,在此之前,需要专门挂一个 Middleware , req.body才有值:

const express = require('express');
const app = express();

app.use(express.urlencoded({ extended: true }));
app.all('/', (req, res) => {
    console.log(req.body);
    res.send('ok');
});

app.listen(8888);
# -*- coding: utf-8 -*-

import requests

requests.post('http://localhost:8888', data={"a": '中文'})

如果你是整块扔的 json 的话:

# -*- coding: utf-8 -*-

import requests
import json

requests.post('http://localhost:8888', data=json.dumps({"a": '中文'}),
              headers={'Content-Type': 'application/json'})

Express 中也有对应的 express.json() 来处理:

const express = require('express');
const app = express();

app.use(express.json());
app.all('/', (req, res) => {
    console.log(req.body);
    res.send('ok');
});

app.listen(8888);

Express 中处理 body 部分的逻辑,是单独放在 body-parser 这个 npm 模块中的。 Express 也没有提供方法,方便地获取原始 raw 的内容。另外,对于 POST 提交的编码数据, Express 只支持 UTF-8 编码。

如果你要处理文件上传,嗯, Express 没有现成的 Middleware ,额外的实现在 https://github.com/expressjs/multer 。( Node.js 天然没有“字节”类型,所以在字节级别的处理上,就会感觉很不顺啊)

Cookie

Cookie 的获取,也跟 POST 参数一样,需要外挂一个 cookie-parser 模块才行:

const express = require('express');
const cookieParser = require('cookie-parser');
const app = express();
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use(cookieParser())
app.all('/', (req, res) => {
    console.log(req.cookies);
    res.send('ok');
});

app.listen(8888);

请求:

# -*- coding: utf-8 -*-

import requests
import json

requests.post('http://localhost:8888', data={'a': '中文'},
              headers={'Cookie': 'a=1'})

如果 Cookie 在响应时,是配置 res 做了签名的,则在 req 中可以通过 req.signedCookies 处理签名,并获取结果。

来源 IP

ExpressX-Forwarded-For 头,做了特殊处理,你可以通过 req.ips 获取这个头的解析后的值,这个功能需要配置 trust proxy 这个 settings 来使用:

const express = require('express');
const cookieParser = require('cookie-parser');
const app = express();
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use(cookieParser())
app.set('trust proxy', true);
app.all('/', (req, res) => {
    console.log(req.ips);
    console.log(req.ip);
    res.send('ok');
});

app.listen(8888);

请求:

# -*- coding: utf-8 -*-

import requests
import json

#requests.get('http://localhost:8888', params={"a": '中文'.encode('utf8')})
requests.post('http://localhost:8888', data={'a': '中文'},
              headers={'X-Forwarded-For': 'a, b, c'})

如果 trust proxy 不是 true ,则 req.ip 会是一个 ipv4 或者 ipv6 的值。

响应 Response

Express 的响应,针对不同类型,本身就提供了几种包装了。

普通响应

使用 res.send 处理确定性的内容响应:

res.send({ some: 'json' });
res.send('<p>some html</p>');
res.status(404); res.end();
res.status(500); res.end();

res.send() 会自动 res.end() ,但是,如果只使用 res.status() 的话,记得加上 res.end()

模板渲染

模板需要预先配置,在 Request 那节已经介绍过了。

const process = require('process');
const express = require('express');
const cookieParser = require('cookie-parser');
const app = express();
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use(cookieParser())

app.set('trust proxy', false);
app.set('views', process.cwd() + '/template');
app.set('view engine', 'html');
app.engine('html', (path, options, callback) => {
    callback(false, '<h1>Hello</h1>');
});

app.all('/', (req, res) => {
    res.render('index', {}, (err, html) => {
        res.send(html);
    });
});

app.listen(8888);

这里有一个坑点,就是必须在对应的目录下,有对应的文件存在,比如上面例子的 template/index.html ,那么 app.engine() 中的回调函数才会执行。都自定义回调函数了,这个限制没有任何意义, path, options 传入就好了,至于是不是要通过文件系统读取内容,怎么读取,又有什么关系呢。

Cookie

res.cookie 来处理 Cookie 头:

const process = require('process');
const express = require('express');
const cookieParser = require('cookie-parser');
const app = express();
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use(cookieParser("key"))

app.set('trust proxy', false);
app.set('views', process.cwd() + '/template');
app.set('view engine', 'html');
app.engine('html', (path, options, callback) => {
    callback(false, '<h1>Hello</h1>');
});
app.all('/', (req, res) => {
    res.render('index', {}, (err, html) => {
        console.log('cookie', req.signedCookies.a);
        res.cookie('a', '123', {signed: true});
        res.cookie('b', '123', {signed: true});
        res.clearCookie('b');
        res.send(html);
    });
});

app.listen(8888);

请求:

# -*- coding: utf-8 -*-

import requests
import json

res = requests.post('http://localhost:8888', data={'a': '中文'},
                    headers={'X-Forwarded-For': 'a, b, c',
                             'Cookie': 'a=s%3A123.p%2Fdzmx3FtOkisSJsn8vcg0mN7jdTgsruCP1SoT63z%2BI'})
print(res, res.text, res.headers)

注意三点:

  • app.use(cookieParser("key")) 这里必须要有一个字符串做 key ,才可以正确使用签名的 cookie 。
  • clearCookie() 仍然是用“设置过期”的方式来达到删除目的,cookie()clearCookie() 并不会整合,会写两组 b=xx 进头。
  • res.send() 会在连接上完成一个响应,所以,与头相关的操作,都必须放在 res.send() 前面。

头和其它

res.set() 可以设置指定的响应头, res.rediect(301, 'http://www.zouyesheng.com') 处理重定向, res.status(404); res.end() 处理非 20 响应。

const process = require('process');
const express = require('express');
const cookieParser = require('cookie-parser');
const app = express();
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use(cookieParser("key"))

app.set('trust proxy', false);
app.set('views', process.cwd() + '/template');
app.set('view engine', 'html');
app.engine('html', (path, options, callback) => {
    callback(false, '<h1>Hello</h1>');
});

app.all('/', (req, res) => {
    res.render('index', {}, (err, html) => {
        res.set('X-ME', 'zys');
        //res.redirect('back');
        //res.redirect('http://www.zouyesheng.com');
        res.status(404);
        res.end();
    });
});

app.listen(8888);

res.redirect('back') 会自动获取 referer 头作为 Location 的值,使用这个时,注意 referer为空的情况,会造成循环重复重定向的后果。

Chunk 响应

Chunk 方式的响应,指连接建立之后,服务端的响应内容是不定长的,会加个头: Transfer-Encoding: chunked ,这种状态下,服务端可以不定时往连接中写入内容(不排除服务端的实现会有缓冲区机制,不过我看 Express 没有)。

const process = require('process');
const express = require('express');
const cookieParser = require('cookie-parser');
const app = express();
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use(cookieParser("key"))

app.set('trust proxy', false);
app.set('views', process.cwd() + '/template');
app.set('view engine', 'html');
app.engine('html', (path, options, callback) => {
    callback(false, '<h1>Hello</h1>');
});

app.all('/', (req, res) => {
    const f = () => {
        const t = new Date().getTime() + '\n';
        res.write(t);
        console.log(t);
        setTimeout(f, 1000);
    }

    setTimeout(f, 1000);
});

app.listen(8888);

上面的代码,访问之后,每过一秒,都会收到新的内容。

大概是 res 本身是 Node.js 中的 stream 类似对象,所以,它有一个 write() 方法。

要测试这个效果,比较方便的是直接 telet:

zys@zys-alibaba:/home/zys/temp >>> telnet localhost 8888
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
GET / HTTP/1.1
Host: localhost

HTTP/1.1 200 OK
X-Powered-By: Express
Date: Thu, 20 Jun 2019 08:11:40 GMT
Connection: keep-alive
Transfer-Encoding: chunked

e
1561018300451

e
1561018301454

e
1561018302456

e
1561018303457

e
1561018304458

e
1561018305460

e
1561018306460

每行前面的一个字节的 e ,为 16 进制的 14 这个数字,也就是后面紧跟着的内容的长度,是 Chunk 格式的要求。具体可以参考 HTTP 的 RFC , https://tools.ietf.org/html/rfc2616#page-2

Tornado 中的类似实现是:

# -*- coding: utf-8 -*-

import tornado.ioloop
import tornado.web
import tornado.gen
import time

class MainHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        while True:
            yield tornado.gen.sleep(1)
            s = time.time()
            self.write(str(s))
            print(s)
            yield self.flush()

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

Express 中的实现,有个大坑,就是:

app.all('/', (req, res) => {
    const f = () => {
        const t = new Date().getTime() + '\n';
        res.write(t);
        console.log(t);
        setTimeout(f, 1000);
    }

    setTimeout(f, 1000);
});

这段逻辑,在连接已经断了的情况下,并不会停止,还是会永远执行下去。所以,你得自己处理好:

const process = require('process');
const express = require('express');
const cookieParser = require('cookie-parser');
const app = express();
app.use(express.urlencoded({ extended: true }));
app.use(express.json());
app.use(cookieParser("key"))

app.set('trust proxy', false);
app.set('views', process.cwd() + '/template');
app.set('view engine', 'html');
app.engine('html', (path, options, callback) => {
    callback(false, '<h1>Hello</h1>');
});

app.all('/', (req, res) => {
    let close = false;
    const f = () => {
        const t = new Date().getTime() + '\n';
        res.write(t);
        console.log(t);
        if(!close){
            setTimeout(f, 1000);
        }
    }

    req.on('close', () => {
        close = true;
    });

    setTimeout(f, 1000);
});

app.listen(8888);

req 挂了一些事件的,可以通过 close 事件来得到当前连接是否已经关闭了。

req 上直接挂连接事件,从 net http Express 这个层次结构上来说,也很,尴尬了。 Web 层不应该关心到网络连接这么底层的东西的。

我还是习惯这样:

app.all('/', (req, res) => {
    res.write('<h1>123</h1>');
    res.end();
});

不过 res.write() 是不能直接处理 json 对象的,还是老老实实 res.send() 吧。

我会怎么用 Express

先说一下,我自己,目前在 Express 运用方面,并没有太多的时间和复杂场景的积累。

即使这样,作为技术上相对传统的人,我会以我以往的 web 开发的套路,来使用 Express

我不喜欢日常用 app.all(path, callback) 这种形式去组织代码。

首先,这会使 path 定义散落在各处,方便了开发,麻烦了维护。

其次,把 path 和具体实现逻辑 callback 绑在一起,我觉得也是反思维的。至少,对于我个人来说,开发的过程,先是想如何实现一个 handler ,最后,再是考虑要把这个 handle 与哪些 path 绑定。

再次,单纯的 callback 缺乏层次感,用 app.use(path, callback) 这种来处理共用逻辑的方式,我觉得完全是扯谈。共用逻辑是代码之间本身实现上的关系,硬生生跟网络应用层 HTTP 协议的 path 概念抽上关系,何必呢。当然,对于 callback 的组织,用纯函数来串是可以的,不过我在这方面并没有太多经验,所以,我还是选择用类继承的方式来作层次化的实现。

我自己要用 Express ,大概会这样组件项目代码(不包括关系数据库的 Model 抽象如何组织这部分):

./
├── config.conf
├── config.js
├── handler
│   ├── base.js
│   └── index.js
├── middleware.js
├── server.js
└── url.js
  • config.conf 是 ini 格式的项目配置。
  • config.js 处理配置,包括日志,数据库连接等。
  • middleware.js 是针对整体流程的扩展机制,比如,给每个请求加一个 UUID ,每个请求都记录一条日志,日志内容有请求的细节及本次请求的处理时间。
  • server.js 是主要的服务启动逻辑,整合各种资源,命令行参数 port 控制监听哪个端口。不需要考虑多进程问题,(正式部署时 nginx 反向代理到多个应用实例,多个实例及其它资源统一用 supervisor 管理)。
  • url.js 定义路径与 handler 的映射关系。
  • handler ,具体逻辑实现的地方,所有 handler 都从 BaseHandler 继承。

BaseHandler 的实现:

class BaseHandler {
    constructor(req, res, next){
        this.req = req;
        this.res = res;
        this._next = next;
        this._finised = false;
    }

    run(){
        this.prepare();
        if(!this._finised){
            if(this.req.method === 'GET'){
                this.get();
                return;
            }
            if(this.req.method === 'POST'){
                this.post();
                return;
            }
            throw Error(this.req.method + ' this method had not been implemented');
        }
    }

    prepare(){}
    get(){
        throw Error('this method had not been implemented');
    }
    post(){
        throw Error('this method had not been implemented');
    }

    render(template, values){
        this.res.render(template, values, (err, html) => {
            this.finish(html);
        });
    }

    write(content){
        if(Object.prototype.toString.call(content) === '[object Object]'){
            this.res.write(JSON.stringify(content));
        } else {
            this.res.write(content);
        }
    }

    finish(content){
        if(this._finised){
            throw Error('this handle was finished');
        }
        this.res.send(content);
        this._finised = true;
        if(this._next){ this._next() }
    }

}

module.exports = {BaseHandler};

if(module === require.main){
    const express = require('express');
    const app = express();
    app.all('/', (req, res, next) => new BaseHandler(req, res, next).run() );
    app.listen(8888);
}

要用的话,比如 index.js

const BaseHandler = require('./base').BaseHandler;

class IndexHandler extends BaseHandler {
    get(){
        this.finish({a: 'hello'});
    }
}

module.exports = {IndexHandler};

url.js 中的样子:

const IndexHandler = require('./handler/index').IndexHandler;

const Handlers = [];

Handlers.push(['/', IndexHandler]);

module.exports = {Handlers};

日志

后面这几部分,都不属于 Express 本身的内容了,只是我个人,随便想到的一些东西。

找一个日志模块的实现,功能上,就看这么几点:

  • 标准的级别: DEBUG,INFO,WARN, ERROR 这些。
  • 层级的多个 logger
  • 可注册式的多种 Handler 实现,比如文件系统,操作系统的 rsyslog ,标准输出,等。
  • 格式定义,一般都带上时间和代码位置。

Node.js 中,大概就是 log4js 了, https://github.com/log4js-node/log4js-node

const log4js = require('log4js');

const layout = {
    type: 'pattern',
    pattern: '- * %p * %x{time} * %c * %f * %l * %m',
    tokens: {
        time: logEvent => {
            return new Date().toISOString().replace('T', ' ').split('.')[0];
        }
    }
};
log4js.configure({
  appenders: {
        file: { type: 'dateFile', layout: layout, filename: 'app.log', keepFileExt: true },
        stream: { type: 'stdout', layout: layout }
  },
  categories: {
      default: { appenders: [ 'stream' ], level: 'info', enableCallStack: false },
      app: { appenders: [ 'stream', 'file' ], level: 'info', enableCallStack: true }
  }
});

const logger = log4js.getLogger('app');
logger.error('xxx');

const l2 = log4js.getLogger('app.good');
l2.error('ii');

总的来说,还是很好用的,但是官网的文档不太好读,有些细节的东西没讲,好在源码还是比较简单。

说几点:

  • getLogger(name) 需要给一个名字,否则 default 的规则都匹配不到。
  • getLogger('parent.child') 中的名字,规则匹配上,可以通过 . 作父子继承的。
  • enableCallStack: true 加上,才能拿到文件名和行号。

ini 格式配置

json 作配置文件,功能上没问题,但是对人为修改是不友好的。所以,个人还是喜欢用 ini 格式作项目的环境配置文件。

Node.js 中,可以使用 ini 模块作解析:

const s = `
[database]
host = 127.0.0.1
port = 5432
user = dbuser
password = dbpassword
database = use_this_database

[paths.default]
datadir = /var/lib/data
array[] = first value
array[] = second value
array[] = third value
`

const fs = require('fs');
const ini = require('ini');

const config = ini.parse(s);
console.log(config);

它扩展了 array[] 这种格式,但没有对类型作处理(除了 true false),比如,获取 port ,结果是 "5432" 。简单够用了。

WebSocket

Node.js 中的 WebSocket 实现,可以使用 ws 模块, https://github.com/websockets/ws

要把 ws 的 WebSocket Server 和 Expressapp 整合,需要在 ExpressServer层面动手,实际上这里说的 Server 就是 Node.js 的 http 模块中的 http.createServer()

const express = require('express');
const ws = require('ws');

const app = express();

app.all('/', (req, res) => {
    console.log('/');
    res.send('hello');
});

const server = app.listen(8888);

const wss = new ws.Server({server, path: '/ws'});
wss.on('connection', conn => {
    conn.on('message', msg => {
        console.log(msg);
        conn.send(new Date().toISOString());
    });
});

对应的一个客户端实现,来自: https://github.com/ilkerkesen/tornado-websocket-client-example/blob/master/client.py

# -*- coding: utf-8 -*-

import time
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado import gen
from tornado.websocket import websocket_connect

class Client(object):
    def __init__(self, url, timeout):
        self.url = url
        self.timeout = timeout
        self.ioloop = IOLoop.instance()
        self.ws = None
        self.connect()
        PeriodicCallback(self.keep_alive, 2000).start()
        self.ioloop.start()

    @gen.coroutine
    def connect(self):
        print("trying to connect")
        try:
            self.ws = yield websocket_connect(self.url)
        except Exception:
            print("connection error")
        else:
            print("connected")
            self.run()

    @gen.coroutine
    def run(self):
        while True:
            msg = yield self.ws.read_message()
            print('read', msg)
            if msg is None:
                print("connection closed")
                self.ws = None
                break

    def keep_alive(self):
        if self.ws is None:
            self.connect()
        else:
            self.ws.write_message(str(time.time()))

if __name__ == "__main__":
    client = Client("ws://localhost:8888/ws", 5)

其它



本文作者:zephyr

原文链接

更多技术干货敬请关注云栖社区知乎机构号:阿里云云栖社区 - 知乎

本文为云栖社区原创内容,未经允许不得转载。

阿里开源!云原生应用自动化引擎 OpenKruise | 直击 KubeCon


阿里妹导读:在近期开展的 KubeCon China 2019 上,阿里云将陆续为全球用户分享阿里巴巴超大规模云原生落地实践、云原生前沿技术与应用包括 OpenKruise 开源项目、开放云原生应用中心(Cloud Native App Hub),同时将重磅发布边缘容器、云原生应用管理与交付体系等产品和服务。
接下来的三天,阿里妹将连线会场,为你带来实时报道。

2019年6月24日至26日,由 CNCF 主办的云原生技术大会 KubeCon 在中国上海盛装启幕,阿里云容器平台团队正式宣布开源重量级项目 OpenKruise,将基于阿里巴巴经济体多年大规模应用部署、发布与管理最佳实践沉淀的能力开放给业界。

OpenKruise 是阿里巴巴开源的 Kubernetes 之上云原生应用自动化的引擎。Kruise 项目源自于阿里巴巴经济体应用过去多年的大规模应用部署、发布与管理的最佳实践,源于阿里云Kubernetes服务数千客户的需求沉淀。

“云原生应用自动化引擎”加持下的阿里经济体“全面上云”

随着云原生概念的兴起,越来越多的应用开始尝试在云原生的土壤上耕耘。那么什么是云原生?简而言之,云原生就是一套能够充分利用“云”的能力,高效构建与交付应用的方法论集合,使得应用容器化的用户可以充分的利用云的弹性和“不可变基础设施”等优势专注于自身核心业务价值。

当前,阿里巴巴基础设施的云原生演进与升级也正在如火如荼的进行。而在这个阿里巴巴经济体整体云化的过程中,阿里内部在超大规模的互联网场景中,已经开始进行大量的云原生的理念落地实践,比如轻量级容器化。

阿里巴巴经济体正在大规模推进应用的轻量级容器化,从而达成利用容器的敏捷和一致等特性快速构建符合云原生理念的电商站点交付的能力,适应类似“双十一”大促的严苛技术需求。再比如说云原生应用管理,阿里巴巴经济体正在将 Kubernetes 等项目的应用编排与自动化能力,穿透到上层运维框架当中,驱动电商应用按照云原生的技术理念进行编排、交付、运行。

在阿里巴巴经济体的整体云原生化过程当中,阿里的技术团队逐渐沉淀出了一套紧贴上游社区标准,适应互联网规模化场景的技术理念与最佳实践。这其中,最重要的无疑是如何对应用进行自动化的发布、运行和管理。

OpenKruise:来自阿里经济体云原生化历程的宝贵经验与最佳实践

在 KubeCon 上海,阿里云容器平台团队正式宣布了重量级项目 OpenKruise(以下简称 Kruise)的开源。

Kruise 是 cruise 的谐音,"k" for Kubernetes。字面意义是巡航或豪华游艇,寓意 Kubernetes 上应用的自动巡航,满载阿里巴巴多年应用部署管理经验。

Kruise 的目标是 automate everything on Kubernetes ! Kruise 项目源自于阿里巴巴经济体应用过去多年的大规模应用部署、发布与管理的最佳实践,源于容器平台团队对集团应用规模化运维,规模化建站的能力,源于阿里云 Kubernetes 服务数千客户的需求沉淀。Kruise 借力于云原生社区,集成阿里巴巴云原生实践之精华,反哺社区,指引业界云原生化最佳实践,少走弯路。

OpenKruise 是阿里巴巴开源的 Kubernetes 之上云原生应用自动化的引擎。Kruise 核心在于自动化,我们将从不同维度解决 Kubernetes 之上应用的自动化,包括,部署、升级、弹性扩缩容、Qos 调节、健康检查、迁移修复等等。此次 Kruise 开源的内容主要在应用部署,升级方面,即一套增强版 controller 组件用于应用的部署、升级、运维。后续,Kruise 会依次开源智能化的弹性扩缩容组件,以及应用 Qos 自调节能力的组件等。

Kruise Controllers:将 Kubernetes 的“控制器模式”进行到底

以下内容主要介绍 Kruise Controllers 一套用于 Kubernetes 之上应用自动化部署管理的 controller 组件。

众所周知,Kubernetes 项目的核心原理就是“控制器模式”。

目前,Kubernetes 项目默认已经提供了一套 Controller 组件,例如 Deployment、Statefulset、DaemonSet 等,这些 Controller 提供了比较丰富的应用部署和管理功能。但是,随着 Kubernetes 的使用范围越来越广,真实的企业与规模性场景中的业务诉求与上游 Controller 功能不匹配的情况也越来越常见。

以阿里巴巴为例:阿里巴巴内部的 Kubernetes 集群需要服务涵盖50几个BU,上万种应用。这个体量非常庞大,对规模性和高可用性带来了巨大的挑战。与此同时,阿里云上的 Kubernetes 服务也接入了上千家企业客户,收集并支撑了各种各样的客户需求。这些诉求与最后阿里经济体的实践经验,最终促成了 Kruise 开源项目的诞生。

Kruise 第一期开源主要包含以下Controller,后续会加入更多。

Advanced StatefulSet:具备丰富发布策略、支持原地升级的 StatefulSet

Advanced StatefulSet扩展了原生的StatefulSet,加入了两个新的特性。

  • 原地升级 (In-place update strategy)

原生的 StatefulSet 在做 rolling update 的时候会销毁并且重建 pods. 这在阿里巴巴规模体量的场景下,代价巨大。

  • 首先,所有被删除的应用的 Pods 需要被重新调度一遍,由于 pod 数量大,这对调度带来了不必要的开销,更糟的是,重新调度的 pod 无法正常被调度,由于资源被占用,亲和特性等其他原因。Pod 被重新调度到新的 node上,损失了原来的本地 state, 虽然通常可以被重建,但是还是带来额外开销。
  • 重调度后的 pods 很有可能分布在不同的机器上,由于网络拓扑结构的改变,需要重新申请 IP, 有些依赖 IP 保持的应用无法正常工作,此外,对网络流量的传输带来了不确定性。
  • 针对多容器的 Pod,升级 sidecar 容器而导致主容器重建,通常是不可接受的。

Advanced StatefulSet 引入了原地升级功能,允许在不销毁 pod 的情况下,更新容器 image。这样带来的好处是效率和稳定性。效率很明显,pod 不需要被重新调度了,还是跑在原来的 node,一些本地存储 state 还是可以保留。稳定性体现在 IP 保持,网络拓扑以及流量结构基本不变,稳定性在阿里巴巴及阿里云经济体中一直以来是一个极其重要的指标。

  • 允许最大不可用实例的配置(Max Unavailable)

社区原生的 StatefulSet 在升级的过程中是不允许同时升级多个实例的,这主要是为了某些有状态应用需要依次按序升级的需求。但是,从阿里巴巴场景,以及阿里云容器平台之上的客户了解到,许多应用不需要依次按序升级的语义,这样带来的问题是效率太低。特别是像阿里巴巴一些应用实例数巨大的场景,问题尤其显著。

MaxUnavailable 的功能正是为了解决这个问题,它允许应用实例被并行升级,且保持始终保持最大不可用的实例数不超过 MaxUnavailable 的限制数。



Broadcast Job:像 DaemonSet 那样运行的一次性 Job

Broadcast Job 会在集群中每个node上面跑一个 pod 直至结束。类似于社区的DaemonSet,区别在于 DaemonSet 始终保持一个 pod 长服务在每个 node 上跑,而 BroadcastJob 中最终这个 pod 会结束。相比 DaemonSet,Broadcast 结束后不再占用资源,这在某些场景中特别适用,比如升级 node 中某些组件,检测node 上一些配置是否正确等。



SidecarSet:大规模场景下 Sidecar 管理利器

Sidecar 在 Kubernetes 中是一个辅助容器的概念,和主容器跑在同一个 pod 中。Sidecar 容器一般是一些基础服务组件如 monitoring 容器,log collection 容器等。

在一个公司中,主业务容器和基础组件容器通常由不同的团队开发和维护,多个团队同时操作和修改同一份 yaml 文件或同一个 API 资源对象,时常会产生一些冲突,且不便于管理。SidecarSet 的理念在于将主业务容器和辅助容器的运维模式解耦。当业务用户提交应用时,不需要显示指定 sidecar 容器,由 sidecar 容器相应的团队编写规则负责自动注入。并且在容器运维和升级时候,利用 Advanced Statefulset 原地升级的功能,业务团队和基础架构团队分别按照自己定义的策略升级各自相应的容器,而不需要耦合在一起升级,产生不必要的影响。Istio 其实采用类似的思想自动给业务容器注入 sidecar 容器的功能,但是其缺乏 sidecar 容器后续升级运维的能力。SidecarSet 有效地把 Sidecar 容器的部署和管理抽象出来。



OpenKruise 正在面向开源社区招募合作伙伴与子项目!

Kruise 社区的准则,是基于 Kubernetes 的核心技术理念来构建更强大的自动化能力。目前,Kruise 正在计划发布更多的 Controller 来覆盖更多的场景和功能比如丰富的发布策略、金丝雀发布、蓝绿发布、分批发布等等。

更为重要的是,OpenKruise 是一个 Umbrella 项目,OpenKruise 的维护者们,正以最开放的姿态面向全球招募合作伙伴和贡献者。没错,我们非常期待您能够为 OpenKruise 贡献和共建新的自动化能力,或者一起来共同推Kubernetes 云原生应用编排能力的演进与发展。



本文作者:未来已来

原文链接

更多技术干货敬请关注云栖社区知乎机构号:阿里云云栖社区 - 知乎

本文来自云栖社区合作伙伴“阿里技术”,如需转载请联系原作者。

云上的Growth hacking之路,打造产品的增长引擎

增长关乎产品的存亡

增长!增长!增长!业务增长是每一个创业者每天面临的最大问题。无论你的产品是APP,还是web,或者是小程序,只能不断的维持用户的增长,才能向资本市场讲出一个好故事,融资活下去。活到最后的产品,才有机会盈利。

为了获取用户的增长,可以投放广告,也可以内容营销、社交传播、销售地推,或者持续的专注于产品优化。无论哪一种方式,我们都面临这几个问题:

  1. 运营活动,覆盖了多少用户?
  2. 多少用户,开始使用产品?
  3. 多少用户付费?
  4. 多少用户持续的活跃?
  5. 下一步,我们应该把精力放在哪些方面?是持续运营?还是开发新功能?



如果不能回答这些问题,无疑我们的运营活动或者开发就是盲人摸象,完全靠运气。为了解答这些问题,我们不妨关注一下growth hacking这种数据驱动的手段。

Growth Hacker的核心思想

传统的市场营销策略,例如投放电视广告,覆盖了多少人,有多少人看过广告后进行了购买,多少人进行了复购,没有准确的数据进行衡量,只能依赖于资深专家根据经验判断。在互联网行业,每一个产品都是新的,前所未有的。每一个产品能不能存活,每一次运营的效果如何,没有多少经验可供借鉴,结果是不确定的。



GrowthHacking是兴起于硅谷的创业公司的marketing手段,旨在使用少量预算获得巨量增长。由于其极高的性价比和有效性,非常适合于创业公司,因而得到了广泛传播。

Growth Hacker的核心思想是通过数据指标,驱动运营决策,以及优化产品。Growthacker通过关注用户获取、用户转化、用户留存、用户推荐、盈利等核心的一系列指标,以及通过各种维度拆解,分析出下一步的增长决策。通过Growth Hacking,打造一个产品增长策略的闭环。



那么我们如何才能搭建出GrowthHacking架构,为自己的产品赋能呢?

GrowthHacking之架构

Growth Hacking 包含了数据的采集、存储、分析、报表、A/B test等系统,首先我们来看,传统的解决方案,搭建出GrowthHacking有哪些痛点:

搭建运营体系的痛点

搭建运营体系的过程中,常常面临以下问题:

  1. 缺少数据,数据散落在各个地方,有的是app数据,有的是web数据,有的是小程序数据,没有一个统一的架构来把数据采集到一个地方。
  2. 缺少一个分析平台。传统的策略,需要运维团队帮助搭建hadoop集群,需要专门团队持久运维。
  3. 离线跑报表,一晚上才能拿到一次结果,周期太长。手工跑一次,几个小时过去了,有什么新的想法,不能及时验证。严重影响运营效率。

借助云服务搭建的GrowthHacking技术架构

为了解决以上问题, 日志服务提供了日志采集、存储、交互分析、可视化的一整套基础设施,可以帮助用户快速搭建出来灵活易用的Growthing Hacking的技术架构,每天的工作只需要专注于运营分析即可。



Growth Hacking首先从数据采集开始,定义清楚要采集的日志内容、格式。把各个终端、服务器的日志集中采集到云端的日志服务。后续通过日志服务提供的SQL实时分析功能,交互式的分析。定义一些常规报表,每日打开报表自动计算最新结果,也可以定义报告,自动发送最新报表。全部功能参考用户手册

此外,除了日志数据的分析,还可以为用户定义一些标签,存储在rds中,通过rds和日志的联合分析,挖掘不同标签对应的指标。

日志服务有如下特点:

  • 免运维:一次完成数据的埋点、数据接入,之后只需专注于运营分析即可,无需专门的运维团队。
  • 实时性:用SQL实时计算,秒级响应。快人一步得到分析结果。
  • 灵活性:任意调整SQL,实时获取结果,非常适合交互式分析。
  • 弹性:遇到运营活动,流量突然暴涨,动动手指快速扩容。
  • 性价比:市场上常见的分析类产品,多采用打包价格,限制使用量。日志服务按量付费,价格更低,功能更强大。

借助于日志服务提供的这套数据采集、存储、分析的基础设施。运营者可以从繁重的数据准备工作重解脱出来,专注于使用SQL去分析数据,配置报表,验证运营想法。



开始搭建GrowthHacking系统

具体而言,Growth Hacking的架构可以拆分如下:

  1. 数据收集

2. 存储

    • 选择日志服务的region。
    • 定义每一种日志存储的Project & LogStore。

3. 分析

    • 开启分析之路,定义常规报表,或者交互式分析。
    • 通过分析结果,调整运营策略,有针对性的优化产品。

基于日志服务,可以完成Growth Hacking的分析策略:

  1. 定义北极星指标。
  2. 拉新分析。
  3. 留存分析。
  4. 事件分析。
  5. 漏斗分析。
  6. 用户分群。
  7. A/B test。

在日志服务中,可以通过定义一系列仪表盘,来沉淀数据分析的结果。接下来的几篇文章中,将依次介绍如何在日志服务实现上述几种策略。



总结

本文主要介绍Growth Hacking的整体架构,之后将用一系列文章介绍step by step如何介入数据,如何分析数据。



本文作者:云雷

原文链接

更多技术干货敬请关注云栖社区知乎机构号:阿里云云栖社区 - 知乎

本文为云栖社区原创内容,未经允许不得转载。

AutoScaling 成本优化模式升级--混合实例策略

伸缩组成本优化模式以成本为目标,始终创建最低价的实例,同时,通过多可用区,多实例规格分布,以此来提高服务稳定性。但是,对于成本优势最大化的竞价实例,伸缩组难以防范竞价实例大范围回收可能导致的服务雪崩,本次升级允许用户制定更详细的成本控制策略,在成本和稳定性之间进行调整和权衡。

成本优化模式简介

当您的伸缩配置选择了多实例规格,并想以最低的价格来使用同等规模的 ECS 实例配置时,您可以选择使用 成本优化策略 的伸缩组,来降低您的 ECS 实例使用成本;当您的伸缩配置选择的实例为抢占式实例时,您可能会遇到由于价格、库存等原因导致抢占式实例创建失败场景,从而导致扩容不及时,影响到业务,您也可以选择使用 成本优化策略 的伸缩组,在抢占式实例创建失败的时候自动为您尝试创建同规格的按量实例,来保证业务的稳定性。

从上述的描述,我们可以清晰的看到,成本优化模式的核心策略:

  1. 创建实例时,以单核cpu价格价格最低来选择创建实例的 InstanceType(实例规格),ZoneId(可用区)等配置信息。
  2. 竞价实例创建失败时,调整为创建按量实例,以保证业务连续性。

我们将上述的策略称为最低价策略(LowestPrice)。

关于成本优化模式更详细的信息,请查看 AutoScaling 推出成本优化模式

成本优化模式升级

成本优化模式的升级策略主要针对竞价实例回收机制可能带来的业务雪崩情况。主要集中在以下两点:

  1. 混合实例配比。允许用户为成本优化伸缩组制定按量实例与竞价实例的混合策略。
  2. 竞价实例主动替换。在竞价实例释放前创建新实例,主动替换掉当前的竞价实例。

在下面的文章中,我们将原成本优化伸缩组称为普通成本优化伸缩组,将指定实例混合策略的成本优化伸缩组称为成本优化混合实例伸缩组。

参数详解

  • OnDemandBaseCapacity

伸缩组所需要的按量实例的最小个数,当伸缩组中按量实例个数小于该值时,将优先创建按量实例。

  • OnDemandPercentageAboveBaseCapacity

满足 OnDemandBaseCapacity 条件后,创建实例中按量实例所占的比例。

  • SpotInstancePools

SpotInstancePools 指定了最低价的多个实例规格,当创建竞价实例时,将在 SpotInstancePools 中进行均衡分布。

  • SpotInstanceRemedy

是否开启竞价实例的补偿机制。开启后在竞价实例被回收前5分钟左右,将主动替换掉当前竞价实例。

兼容性介绍

成本优化混合实例伸缩组与普通成本优化伸缩组在接口和功能方面是完全兼容的。当您不指定混合实例策略的相关参数时,您将创建出普通成本优化伸缩组。同时,对于成本优化混合实例伸缩组,通过合理的制定混合实例策略,能够具有与普通成本优化伸缩组完全相同的行为。下面举例说明:

  1. 假设普通成本优化伸缩组创建的全为按量实例。

此时,你创建的成本优化混合实例伸缩组只需要指定OnDemandBaseCapacity=0, OnDemandPercentageAboveBaseCapacity=100,spotInstancePools=1,那么将拥有完全相同的行为。

  1. 假设普通成本优化伸缩组优先创建竞价实例。

此时,你创建的成本优化混合实例伸缩组只需要指定OnDemandBaseCapacity=0, OnDemandPercentageAboveBaseCapacity=0,spotInstancePools=1,那么将拥有完全相同的行为。

扩缩容策略

成本优化混合实例伸缩组拥有一套相对独立的扩缩容策略,您在大多数情况下不需要关注实例的选择过程,如果您需要对伸缩组行为具有更详细的了解,本节中对扩缩容过程进行了详细的描述。

扩容策略

当指定了伸缩组的实例混合策略之后,伸缩组并非仅对新创建出来的实例按照混合比例进行创建,而是保证伸缩组整体的实例配比趋近目标配比。

  • 按量实例扩容策略

按量实例部分,采用了 LowestPrice 的创建方式,多实例规格与多可用区按照优先级方式依此进行选择,该部分与普通成本优化伸缩组保持一致。

  • 竞价实例扩容策略

竞价实例部分,采用了 LowestPrice 的创建方式,当配置多实例规格时,将根据 SpotInstancePools 配置,在最低价的多个实例规格之间平均分配,针对每一种实例规格,当无法成功创建时,按照价格顺序依次选取下一规格继续进行创建,当竞价实例全部不可创建,将退回到创建对应的按量实例。多可用区则按照优先级的方式依次进行选择。

下面,我们通过示例来描述成本优化混合实例伸缩组的扩容行为:

假设伸缩组组内按量实例个数为3,竞价实例为1个ecs.n1.tiny规格实例,OnDemandBaseCapacity = 5,OnDemandPercentageAboveBaseCapacity = 40,SpotInstancePools = 2,伸缩组实例规格配置为:ecs.n1.tiny, ecs.n1.small,ecs.n1.medium(价格依此上升)。

缩容策略

成本优化混合实例伸缩组的释放策略不遵循伸缩组上指定的释放策略,为了保持实例伸缩组内实例的混合配比,将采用以下描述的实例释放策略。首先,将根据伸缩组实例混合策略,确定将要释放的按量实例与竞价实例的个数,我们将在保证足够数量的实例被释放的前提下,按照伸缩组整体趋近期望配比的方式确定释放按量实例和竞价实例的个数。当按量实例个数不足时,将释放更多的竞价实例;当竞价实例个数不足时,将改为释放按量实例。

  • 按量实例缩容策略

释放按量实例时,将按照以下条件选择可释放的实例:

  1. 优先释放价格高的实例;
  2. 价格相同时,按照伸缩组指定的释放策略选取合适数量的实例进行释放。
  • 竞价实例缩容策略

释放竞价实例时,将按照以下条件选择可释放的实例:

  1. 将首先释放不属于spotInstancePools中规格类型的实例,这部分实例的释放策略与上述按量实例的缩容策略相同;
  2. 如果还需要释放规格类型属于spotInstancePools的实例,将进一步选择释放所需要的实例,选择方式如下:
    1. 选择释放的实例将使得剩余实例的实例规格在spotInstancePools中趋于均衡分布;
    2. 相同规格的多个实例可供选择时,将按照伸缩组指定的释放策略选择释放的实例。

下面,同样我们通过简单的示例来描述成本优化混合实例伸缩组在缩容时的实例选择过程:

假设伸缩组组内按量实例个数为8,竞价实例为2个ecs.n1.tiny规格实例,2个ecs.n1.small规格实例,OnDemandBaseCapacity = 5,OnDemandPercentageAboveBaseCapacity = 40,SpotInstancePools = 2,伸缩组实例规格配置为:ecs.n1.tiny, ecs.n1.small,ecs.n1.medium(价格依此上升)。


竞价实例补偿

竞价实例在系统回收之前五分钟左右将会发送系统回收消息,当您开启竞价实例主动替换功能之后,在系统发送竞价实例的回收消息之后,弹性伸缩将会为该竞价实例创建补偿任务,并在稍后通过创建新的竞价实例来替换即将释放的实例。我们将这一主动替换即将被回收的竞价实例的行为称为竞价实例补偿

竞价实例补偿是保障业务连续性的辅助保障机制,该补偿机制具有以下特点,你需要对这些特点有充分的认识,以便您配置合理的成本优化伸缩组。

  1. 竞价实例补偿的时间窗口。在收到竞价实例系统回收消息后,将为对应的实例生成补偿任务,大约五分钟时间后实例将被回收,当实例被回收后,伸缩组内的对应实例将被健康检查机制清除伸缩组(大约6分钟)。竞价实例补偿任务的有效期为:补偿任务生成到健康检查将实例移除伸缩组之间。一旦错过补偿时间窗口,对应的补偿任务将会失效和清理,意味着对应实例错过补偿期。
  2. 有限的补偿能力。一次竞价实例的补偿过程分为新实例启动和旧实例释放两个过程,补偿任务执行过程中,伸缩组将处于锁定状态。由于暂时伸缩组不支持并行伸缩活动处理,因此,在有限的补偿时间窗口内,能够进行的补偿任务次数和实例数是有限的。由于竞价实例回收通常是呈现批次状,因此,为了最大程度利用有限的补偿能力,我们将对补偿任务进行一定程度的聚合之后,按批次进行下发,最大程度的补偿更多的实例。

最佳实践

这里我们主要展示如何使用java SDK创建伸缩规则,并采用maven进行依赖管理。创建目标追踪伸缩规则,需要使用aliyun-java-sdk-ess 2.3.1及以上版本。

程序所需的maven依赖如下:

<dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-core</artifactId>
            <version>3.0.8</version>
        </dependency>
        <dependency>
            <groupId>com.aliyun</groupId>
            <artifactId>aliyun-java-sdk-ess</artifactId>
            <version>2.3.1</version>
        </dependency>

创建混合实例的成本优化伸缩组:

CreateScalingGroupRequest request = new CreateScalingGroupRequest();
request.setScalingGroupName(name);
request.setMinSize(0);
request.setMaxSize(100);
request.setVSwitchId(vsId);
request.setMultiAZPolicy("COST_OPTIMIZED");
request.setOnDemandBaseCapacity(onDemandBaseCapacity);
request.setOnDemandPercentageAboveBaseCapacity(onDemandPercentageAboveBaseCapacity);
request.setSpotInstanceRemedy(spotInstanceRemedy);
request.setSpotInstancePools(spotInstancePools);
CreateScalingGroupResponse response = client.getAcsResponse(request);



本文作者:tongkn

原文链接

更多技术干货敬请关注云栖社区知乎机构号:阿里云云栖社区 - 知乎

本文为云栖社区原创内容,未经允许不得转载。

AnalyticDB for PG 如何作为数据源对接帆软 FineBI

AnalyticDB for PostgreSQL 基于开源数据库 Greenplum 构建,兼容Greenplum 和 PostgreSQL 的语法,接口和生态。本章节介绍如何通过FineBI连接 分析型数据库PostgreSQL版 并进行报表开发。

准备工作

开始使用FineBI之前,用户需要先完成以下准备工作。
下载并安装FineBI

操作步骤

首先进行”新建数据连接“,并选择 "Greenplum Database"。



之后将 JDBC URL,数据库名称,用户名密码等输入进行连接测试。



注意事项

对于新安装的FineBI,第一次连接 Greenplum 或 PostgreSQL 数据源时,需要先下载其 JDBC Driver,可以按操作步骤下载并将对应JDBC 驱动安装到 FineBI 目录。AnalyticDB for PostgreSQL 既支持 PostgreSQL JDBC Driver,也支持 Greenplum 社区 Driver。



本文作者:陆封

原文链接

更多技术干货敬请关注云栖社区知乎机构号:阿里云云栖社区 - 知乎

本文为云栖社区原创内容,未经允许不得转载。

支付宝玉伯:从前端到体验,如何把格局做大

国内的前端行业,是一个群星璀璨,同时又有些纷纷扰扰的圈子。很多初出茅庐的年轻人怀着改变世界的梦想,谁也不服谁。不过,有一些为前端领域做出贡献的拓荒者几乎受到所有人的尊敬,玉伯就是这些拓荒者中的一员。



如今,他已经是蚂蚁金服研究员,带领着体验技术部,打造出 Ant Design、AntV、Eggjs 等广受欢迎的开源项目,他所在的团队也成为国内前端开发者向往的地方。

在同事眼中,玉伯是一个严谨的人,同时保持着对生活的热爱,他曾以 lifesinger 为笔名写名为“岁月如歌”的博客、参与 GitHub 上的开源社区,到现在也经常在知乎上分享自己的知识和见解。

从中科院到支付宝

时间转回到 2006 年,当时在中科院物理所进行硕博连读的玉伯对前途产生了迷茫,是就这样继续深造,将来投身学术界,还是出来干一番事业?

当时,腾讯的 QQ 已经开始有所起色,在年轻人之间开始风靡,淘宝网已经成为中国最受欢迎的线上购物网站,互联网正风起云涌。这时,玉伯得知中科院软件所正在找人,一番思考之后,玉伯毅然放弃学业投身到软件行业。由于他当时年龄小,在软件所工作期间,经常闹出被误认为是学生的笑话。

中科院的生活单纯但缺乏激情,2008 年,玉伯终于离开了象牙塔,南下杭州,加入了当时正在招兵买马的淘宝 UED。虽然并非科班出身,但玉伯从 2002 年起就已经开始接触前端开发,从此与前端结下了不解之缘。

加入淘宝 UED 后,他与承玉等人一起研发了 Kissy,当时淘宝前台业务的标准前端技术栈,并将之开源,在 GitHub 上,Kissy 一度是阿里系开源项目 Star 数最多的项目。

在淘宝期间,玉伯还发起了 Sea.js,一个开源的 JavaScript 模块加载框架,它契合了前端工程化的演进趋势,也是现代大中型前端项目的基础。

2012 年,玉伯加入支付宝前端开发部,负责基础技术组。第二年,他遇到了职业生涯的另一个重大选择:阿里宣布“ALL IN 无线”,支付宝前端解体,所有人都面临选择,要么转岗去做移动端开发,要么留下来做中后台的前端开发。玉伯选择留了下来。

虽然从事后来看,无论是走的还是留的,结果都挺好的,但当时对于玉伯是一个痛苦的时刻,甚至对前端的价值产生了怀疑,他在《阿里前端的困局与突围》中写道:

一个事实:
把国内大部分公司的 UX 部门解散掉,也不会太影响产品的体验。在国内,UX 主要还是起到美工的作用,虽然我不想承认。
前端依旧是美工,而且仅仅是实现工。
在阿里,我们不得不承认一个事实:前端的确有价值,但放在全局来看,前端产生的价值并非核心价值。 在阿里,虽然前端的工作已经不可或缺,但对大公司而言,不可或缺的岗位多了去呢,不可或缺不代表有核心价值,我就不说了。

不过好在,他很快振作起来,从中后台业务中找到了前端的价值。

“后来我们发现中后台业务也是有很多事情可以去做的,无论是业务还是技术都值得深挖,只是以前前端只关注 C 端业务,但其实 To B 的业务对前端来说是一片蓝海。”玉伯说。

玉伯发现中后台的业务量其实非常大,如果没有一套系统的规范来应对,研发效率和产品体验都将面临挑战。

在这样的背景下,前端技术部改名为体验技术部,玉伯和他的小伙伴们踏上了新的征程。

冰山之下的体验

意识到中后台方面前端体验的缺失,玉伯开始带领团队做这方面的工作,他还专门招募了设计师团队,和前端工程师一起工作,开始在体验方面深挖。

设计师的加入让前端团队发生了巨大变化,也让玉伯开始思考体验的更深层含义,他在《我们是如何从前端技术进化到体验科技的》一文中表示:

前端技术再牛,都很难直接解决产品层的用户体验。对中后台产品来说,设计的价值也远远不止于让产品的颜值提升,设计的更多价值,在于深入到产品的业务逻辑里去,去帮助业务梳理产品信息架构与任务流程。用户体验是一个非常综合的事,需要各种专业人士在同一个产品上聚焦发力,一起共同努力才能真正提升产品体验。

他还引用乔布斯的话说:设计不止于好看,更关乎好用。

为了让前端工程师和设计师更好的协作,玉伯说,团队曾经开展过一个活动:任何设计师的要求都是合理的,只要设计师提出的要求都尽可能的去实现,除非技术上的确实现不了。这个活动让设计师感觉到前端工程师的尊重,增进了双方的互相了解。而且前端工程师和设计师都是视觉型动物,都关注人机交互的细节,所以相处下来很融洽。

2015 年,体验技术部推出了 Ant Design,它有别于 UI 组件库,是一种全新的设计系统,随着 Ant Design 不断的证明自己,它受到了阿里内外的广泛赞誉,也在一定程度上引领了国内业界关注中后台体验的风潮。

发展到现在,体验技术部的格局也远远超出了之前的设想,玉伯介绍,现在他们除了支持业务之外,还会关注四大块:

  • 企业级的中台设计体系,包括 Ant Design 等,随着前端技术的发展继续打磨。
  • 前端基础技术栈,包括上面的 Ant Design,以及数据可视化、图形技术等。
  • 工程产品方向,包括 Basement、云凤蝶、九色鹿等。
  • 创新业务,比如语雀等。

玉伯认为,好的技术都是源自于好的业务土壤,正是因为有业务需要,所以能逼着他们研究技术,提升效能。

硅谷知名分析师 Ben Thompson 在《神圣的不满:颠覆者之利器》中说过:“如果你的公司专注于为用户创造最好的体验,那么意味着,你的发展空间上不封顶!”这是因为,用户的期待不是静止的,当你满足用户现有的体验时,他们又会产生新的需求,这可能为公司带来新的商机。

这段话也正是体验技术部成长的最佳注解。

在这个过程中,体验技术部越来越大,玉伯的角色也更多的考虑人员和团队管理上的问题。

以开源的方式做管理

“我更多的还是用一种直觉来做管理,如果要总结的话就是两点,找到对的事,找到对的人。”玉伯说。他还指出,到底是不是对的事有时候不是那么好判断,要去不断的试错,快速花 1-3 个月尝试,如果是对的就加大投入;找到对的人,一方面是满足业务对人员的需求,一方面是找到符合技术发展的人才,需要靠积累的人脉去挖人。

前段时间,在社区知名的 Node 大牛死月被发现在蚂蚁金服的职级只有 P6,引起了大家的一些讨论,死月本人已做过回复,玉伯也发表了他对前端人才的看法:

在体验技术部,我们看一个人的成长,至少会看三个方面:能力、热情、思维模式。
......
还有一个考量维度是一个人的思维模式。具体到技术岗,很重要的一点,是去看一个同学会不会去思考事情背后的 WHY,会不会去深入了解 WHY 所处的大环境,会不会在想清楚 WHY 后,去分析思考解决路径,在有多条路可选的时候,敢不敢去做取舍权衡,能不能去找到最佳路径和实现策略。WHY - HOW - WHAT - DO 的思维框架下,DO 是最后一步,很关键,但前面三步如果思考不清,光有 DO 是很难拿到优秀结果的

体验技术部的团队文化是“简单、自由、有爱”,玉伯更愿意充分发挥团队成员的个性和聪明才智,并反映到产品中。

在团队管理上,玉伯是那种有点反流程的管理方法,他更愿意使用异步的沟通方式,通过内部语雀或者 Gitlab 的 issue 来交流,因为他认为多数人在当面沟通中难以表达复杂的思维过程,倾听者不集中精神也难以理解。

这实际上是开源社区通行的沟通方式,开源的精神和做法贯穿了玉伯的职业生涯。在他刚加入支付宝,在还没有写一行代码的时候就宣布要用开源的方式打造支付宝下一代的前端框架。后来更是带领团队一手打造出 Ant Design 这样的广受欢迎的项目。

“开源对个人的代码能力还有软技能都有非常大的帮助,在公司里可能不是每个人都有机会去写核心代码,但做开源项目你是自由的。”玉伯表示。开源也正是让体验技术部保持活力,提升对技术追求的重要手段。

开源也是体验技术部对外重要的品牌形象,正是因为有这些开源项目,大家才会对体验技术部认可,并且希望来和心目中的大牛一起工作。

最后,针对目前前端领域比较浮躁的风气,玉伯认为还是应该安下心来,脚踏实地的做些实事,才能取得真正的进步。

体验技术部仍在招兵买马中,今年是体验技术部技术产品发展的关键之年,欢迎意气相投的技术、设计、产品、运营、商业化的人才加入。简历投递邮箱为:afx-platform-talent@list.alibaba-inc.com

最后,想关注玉伯及其团队同学最新动态的朋友,可通过语雀直接找到他们。语雀(https://yuque.com/)是一个好用的知识管理工具,不仅适合个人记录笔记学习交流,也非常适合企业做文档协同和知识沉淀。



本文作者:平生栗子

原文链接

更多技术干货敬请关注云栖社区知乎机构号:阿里云云栖社区 - 知乎

本文为云栖社区原创内容,未经允许不得转载。

一道必备面试题:系统CPU飙高和GC频繁,如何排查?

作者 chenssy

出处:http://t.cn/EI9JdBu


处理过线上问题的同学基本上都会遇到系统突然运行缓慢,CPU 100%,以及Full GC次数过多的问题。当然,这些问题的最终导致的直观现象就是系统运行缓慢,并且有大量的报警。本文主要针对系统运行缓慢这一问题,提供该问题的排查思路,从而定位出问题的代码点,进而提供解决该问题的思路。

对于线上系统突然产生的运行缓慢问题,如果该问题导致线上系统不可用,那么首先需要做的就是,导出jstack和内存信息,然后重启系统,尽快保证系统的可用性。这种情况可能的原因主要有两种:

  • 代码中某个位置读取数据量较大,导致系统内存耗尽,从而导致Full GC次数过多,系统缓慢;
  • 代码中有比较耗CPU的操作,导致CPU过高,系统运行缓慢; 相对来说,这是出现频率最高的两种线上问题,而且它们会直接导致系统不可用。另外有几种情况也会导致某个功能运行缓慢,但是不至于导致系统不可用:
  • 代码某个位置有阻塞性的操作,导致该功能调用整体比较耗时,但出现是比较随机的;
  • 某个线程由于某种原因而进入WAITING状态,此时该功能整体不可用,但是无法复现;
  • 由于锁使用不当,导致多个线程进入死锁状态,从而导致系统整体比较缓慢。

对于这三种情况,通过查看CPU和系统内存情况是无法查看出具体问题的,因为它们相对来说都是具有一定阻塞性操作,CPU和系统内存使用情况都不高,但是功能却很慢。下面我们就通过查看系统日志来一步一步甄别上述几种问题。

1. Full GC次数过多

相对来说,这种情况是最容易出现的,尤其是新功能上线时。对于Full GC较多的情况,其主要有如下两个特征:

  • 线上多个线程的CPU都超过了100%,通过jstack命令可以看到这些线程主要是垃圾回收线程
  • 通过jstat命令监控GC情况,可以看到Full GC次数非常多,并且次数在不断增加。

首先我们可以使用top命令查看系统CPU的占用情况,如下是系统CPU较高的一个示例:

top - 08:31:10 up 30 min,  0 users,  load average: 0.73, 0.58, 0.34
KiB Mem:   2046460 total,  1923864 used,   122596 free,    14388 buffers
KiB Swap:  1048572 total,        0 used,  1048572 free.  1192352 cached Mem

  PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
    9 root      20   0 2557160 288976  15812 S  98.0 14.1   0:42.60 java

可以看到,有一个Java程序此时CPU占用量达到了98.8%,此时我们可以复制该进程id9,并且使用如下命令查看呢该进程的各个线程运行情况:

top -Hp 9

该进程下的各个线程运行情况如下:

top - 08:31:16 up 30 min,  0 users,  load average: 0.75, 0.59, 0.35
Threads:  11 total,   1 running,  10 sleeping,   0 stopped,   0 zombie
%Cpu(s):  3.5 us,  0.6 sy,  0.0 ni, 95.9 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
KiB Mem:   2046460 total,  1924856 used,   121604 free,    14396 buffers
KiB Swap:  1048572 total,        0 used,  1048572 free.  1192532 cached Mem

  PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND
   10 root      20   0 2557160 289824  15872 R 79.3 14.2   0:41.49 java
   11 root      20   0 2557160 289824  15872 S 13.2 14.2   0:06.78 java

可以看到,在进程为9的Java程序中各个线程的CPU占用情况,接下来我们可以通过jstack命令查看线程id为10的线程为什么耗费CPU最高。需要注意的是,在jsatck命令展示的结果中,线程id都转换成了十六进制形式。可以用如下命令查看转换结果,也可以找一个科学计算器进行转换:

root@a39de7e7934b:/# printf "%x\n" 10
a

这里打印结果说明该线程在jstack中的展现形式为0xa,通过jstack命令我们可以看到如下信息:

"main" #1 prio=5 os_prio=0 tid=0x00007f8718009800 nid=0xb runnable [0x00007f871fe41000]
   java.lang.Thread.State: RUNNABLE
    at com.aibaobei.chapter2.eg2.UserDemo.main(UserDemo.java:9)

"VM Thread" os_prio=0 tid=0x00007f871806e000 nid=0xa runnable

这里的VM Thread一行的最后显示nid=0xa,这里nid的意思就是操作系统线程id的意思。而VM Thread指的就是垃圾回收的线程。这里我们基本上可以确定,当前系统缓慢的原因主要是垃圾回收过于频繁,导致GC停顿时间较长。我们通过如下命令可以查看GC的情况:

root@8d36124607a0:/# jstat -gcutil 9 1000 10
  S0     S1     E      O      M     CCS    YGC     YGCT    FGC    FGCT     GCT
  0.00   0.00   0.00  75.07  59.09  59.60   3259    0.919  6517    7.715    8.635
  0.00   0.00   0.00   0.08  59.09  59.60   3306    0.930  6611    7.822    8.752
  0.00   0.00   0.00   0.08  59.09  59.60   3351    0.943  6701    7.924    8.867
  0.00   0.00   0.00   0.08  59.09  59.60   3397    0.955  6793    8.029    8.984

可以看到,这里FGC指的是Full GC数量,这里高达6793,而且还在不断增长。从而进一步证实了是由于内存溢出导致的系统缓慢。那么这里确认了内存溢出,但是如何查看你是哪些对象导致的内存溢出呢,这个可以dump出内存日志,然后通过eclipse的mat工具进行查看,如下是其展示的一个对象树结构:

经过mat工具分析之后,我们基本上就能确定内存中主要是哪个对象比较消耗内存,然后找到该对象的创建位置,进行处理即可。这里的主要是PrintStream最多,但是我们也可以看到,其内存消耗量只有12.2%。也就是说,其还不足以导致大量的Full GC,此时我们需要考虑另外一种情况,就是代码或者第三方依赖的包中有显示的System.gc()调用。这种情况我们查看dump内存得到的文件即可判断,因为其会打印GC原因:

[Full GC (System.gc()) [Tenured: 262546K->262546K(349568K), 0.0014879 secs] 262546K->262546K(506816K), [Metaspace: 3109K->3109K(1056768K)], 0.0015151 secs] [Times: user=0.00 sys=0.00, real=0.01 secs]
[GC (Allocation Failure) [DefNew: 2795K->0K(157248K), 0.0001504 secs][Tenured: 262546K->402K(349568K), 0.0012949 secs] 265342K->402K(506816K), [Metaspace: 3109K->3109K(1056768K)], 0.0014699 secs] [Times: user=0.00

比如这里第一次GC是由于System.gc()的显示调用导致的,而第二次GC则是JVM主动发起的。总结来说,对于Full GC次数过多,主要有以下两种原因:

  • 代码中一次获取了大量的对象,导致内存溢出,此时可以通过eclipse的mat工具查看内存中有哪些对象比较多;
  • 内存占用不高,但是Full GC次数还是比较多,此时可能是显示的System.gc()调用导致GC次数过多,这可以通过添加-XX:+DisableExplicitGC来禁用JVM对显示GC的响应。

2. CPU过高

在前面第一点中,我们讲到,CPU过高可能是系统频繁的进行Full GC,导致系统缓慢。而我们平常也肯能遇到比较耗时的计算,导致CPU过高的情况,此时查看方式其实与上面的非常类似。首先我们通过top命令查看当前CPU消耗过高的进程是哪个,从而得到进程id;然后通过top -Hp <pid>来查看该进程中有哪些线程CPU过高,一般超过80%就是比较高的,80%左右是合理情况。这样我们就能得到CPU消耗比较高的线程id。接着通过该线程id的十六进制表示jstack日志中查看当前线程具体的堆栈信息。

在这里我们就可以区分导致CPU过高的原因具体是Full GC次数过多还是代码中有比较耗时的计算了。如果是Full GC次数过多,那么通过jstack得到的线程信息会是类似于VM Thread之类的线程,而如果是代码中有比较耗时的计算,那么我们得到的就是一个线程的具体堆栈信息。如下是一个代码中有比较耗时的计算,导致CPU过高的线程信息:

这里可以看到,在请求UserController的时候,由于该Controller进行了一个比较耗时的调用,导致该线程的CPU一直处于100%。我们可以根据堆栈信息,直接定位到UserController的34行,查看代码中具体是什么原因导致计算量如此之高。

3. 不定期出现的接口耗时现象

对于这种情况,比较典型的例子就是,我们某个接口访问经常需要2~3s才能返回。这是比较麻烦的一种情况,因为一般来说,其消耗的CPU不多,而且占用的内存也不高,也就是说,我们通过上述两种方式进行排查是无法解决这种问题的。而且由于这样的接口耗时比较大的问题是不定时出现的,这就导致了我们在通过jstack命令即使得到了线程访问的堆栈信息,我们也没法判断具体哪个线程是正在执行比较耗时操作的线程。

对于不定时出现的接口耗时比较严重的问题,我们的定位思路基本如下:首先找到该接口,通过压测工具不断加大访问力度,如果说该接口中有某个位置是比较耗时的,由于我们的访问的频率非常高,那么大多数的线程最终都将阻塞于该阻塞点,这样通过多个线程具有相同的堆栈日志,我们基本上就可以定位到该接口中比较耗时的代码的位置。如下是一个代码中有比较耗时的阻塞操作通过压测工具得到的线程堆栈日志:

"http-nio-8080-exec-2" #29 daemon prio=5 os_prio=31 tid=0x00007fd08cb26000 nid=0x9603 waiting on condition [0x00007000031d5000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at com.aibaobei.user.controller.UserController.detail(UserController.java:18)

"http-nio-8080-exec-3" #30 daemon prio=5 os_prio=31 tid=0x00007fd08cb27000 nid=0x6203 waiting on condition [0x00007000032d8000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at com.aibaobei.user.controller.UserController.detail(UserController.java:18)

"http-nio-8080-exec-4" #31 daemon prio=5 os_prio=31 tid=0x00007fd08d0fa000 nid=0x6403 waiting on condition [0x00007000033db000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:340)
    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
    at com.aibaobei.user.controller.UserController.detail(UserController.java:18)

从上面的日志可以看你出,这里有多个线程都阻塞在了UserController的第18行,说明这是一个阻塞点,也就是导致该接口比较缓慢的原因。

4. 某个线程进入WAITING状态

对于这种情况,这是比较罕见的一种情况,但是也是有可能出现的,而且由于其具有一定的“不可复现性”,因而我们在排查的时候是非常难以发现的。笔者曾经就遇到过类似的这种情况,具体的场景是,在使用CountDownLatch时,由于需要每一个并行的任务都执行完成之后才会唤醒主线程往下执行。而当时我们是通过CountDownLatch控制多个线程连接并导出用户的gmail邮箱数据,这其中有一个线程连接上了用户邮箱,但是连接被服务器挂起了,导致该线程一直在等待服务器的响应。最终导致我们的主线程和其余几个线程都处于WAITING状态。

对于这样的问题,查看过jstack日志的读者应该都知道,正常情况下,线上大多数线程都是处于TIMED_WAITING状态,而我们这里出问题的线程所处的状态与其是一模一样的,这就非常容易混淆我们的判断。解决这个问题的思路主要如下:

  • 通过grep在jstack日志中找出所有的处于TIMED_WAITING状态的线程,将其导出到某个文件中,如a1.log,如下是一个导出的日志文件示例:

    “Attach Listener” #13 daemon prio=9 os_prio=31 tid=0x00007fe690064000 nid=0xd07 waiting on condition [0x0000000000000000]
    “DestroyJavaVM” #12 prio=5 os_prio=31 tid=0x00007fe690066000 nid=0x2603 waiting on condition [0x0000000000000000]
    “Thread-0” #11 prio=5 os_prio=31 tid=0x00007fe690065000 nid=0x5a03 waiting on condition [0x0000700003ad4000]
    “C1 CompilerThread3” #9 daemon prio=9 os_prio=31 tid=0x00007fe68c00a000 nid=0xa903 waiting on condition [0x0000000000000000]

  • 等待一段时间之后,比如10s,再次对jstack日志进行grep,将其导出到另一个文件,如a2.log,结果如下所示:

    “DestroyJavaVM” #12 prio=5 os_prio=31 tid=0x00007fe690066000 nid=0x2603 waiting on condition [0x0000000000000000]
    “Thread-0” #11 prio=5 os_prio=31 tid=0x00007fe690065000 nid=0x5a03 waiting on condition [0x0000700003ad4000]
    “VM Periodic Task Thread” os_prio=31 tid=0x00007fe68d114000 nid=0xa803 waiting on condition

  • 重复步骤2,待导出3~4个文件之后,我们对导出的文件进行对比,找出其中在这几个文件中一直都存在的用户线程,这个线程基本上就可以确认是包含了处于等待状态有问题的线程。因为正常的请求线程是不会在20~30s之后还是处于等待状态的。

  • 经过排查得到这些线程之后,我们可以继续对其堆栈信息进行排查,如果该线程本身就应该处于等待状态,比如用户创建的线程池中处于空闲状态的线程,那么这种线程的堆栈信息中是不会包含用户自定义的类的。这些都可以排除掉,而剩下的线程基本上就可以确认是我们要找的有问题的线程。通过其堆栈信息,我们就可以得出具体是在哪个位置的代码导致该线程处于等待状态了。

这里需要说明的是,我们在判断是否为用户线程时,可以通过线程最前面的线程名来判断,因为一般的框架的线程命名都是非常规范的,我们通过线程名就可以直接判断得出该线程是某些框架中的线程,这种线程基本上可以排除掉。而剩余的,比如上面的Thread-0,以及我们可以辨别的自定义线程名,这些都是我们需要排查的对象。

经过上面的方式进行排查之后,我们基本上就可以得出这里的Thread-0就是我们要找的线程,通过查看其堆栈信息,我们就可以得到具体是在哪个位置导致其处于等待状态了。如下示例中则是在SyncTask的第8行导致该线程进入等待了。

"Thread-0" #11 prio=5 os_prio=31 tid=0x00007f9de08c7000 nid=0x5603 waiting on condition [0x0000700001f89000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
    at com.aibaobei.chapter2.eg4.SyncTask.lambda$main$0(SyncTask.java:8)
    at com.aibaobei.chapter2.eg4.SyncTask$$Lambda$1/1791741888.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:748)

5. 死锁

对于死锁,这种情况基本上很容易发现,因为jstack可以帮助我们检查死锁,并且在日志中打印具体的死锁线程信息。如下是一个产生死锁的一个jstack日志示例:

可以看到,在jstack日志的底部,其直接帮我们分析了日志中存在哪些死锁,以及每个死锁的线程堆栈信息。这里我们有两个用户线程分别在等待对方释放锁,而被阻塞的位置都是在ConnectTask的第5行,此时我们就可以直接定位到该位置,并且进行代码分析,从而找到产生死锁的原因。

6. 小结

本文主要讲解了线上可能出现的五种导致系统缓慢的情况,详细分析了每种情况产生时的现象,已经根据现象我们可以通过哪些方式定位得到是这种原因导致的系统缓慢。简要的说,我们进行线上日志分析时,主要可以分为如下步骤:

  • 通过 top命令查看CPU情况,如果CPU比较高,则通过top -Hp <pid>命令查看当前进程的各个线程运行情况,找出CPU过高的线程之后,将其线程id转换为十六进制的表现形式,然后在jstack日志中查看该线程主要在进行的工作。这里又分为两种情况
  • 如果是正常的用户线程,则通过该线程的堆栈信息查看其具体是在哪处用户代码处运行比较消耗CPU;
  • 如果该线程是VM Thread,则通过jstat -gcutil <pid> <period> <times>命令监控当前系统的GC状况,然后通过jmap dump:format=b,file=<filepath> <pid>导出系统当前的内存数据。导出之后将内存情况放到eclipse的mat工具中进行分析即可得出内存中主要是什么对象比较消耗内存,进而可以处理相关代码;
  • 如果通过 top 命令看到CPU并不高,并且系统内存占用率也比较低。此时就可以考虑是否是由于另外三种情况导致的问题。具体的可以根据具体情况分析:
  • 如果是接口调用比较耗时,并且是不定时出现,则可以通过压测的方式加大阻塞点出现的频率,从而通过jstack查看堆栈信息,找到阻塞点;
  • 如果是某个功能突然出现停滞的状况,这种情况也无法复现,此时可以通过多次导出jstack日志的方式对比哪些用户线程是一直都处于等待状态,这些线程就是可能存在问题的线程;
  • 如果通过jstack可以查看到死锁状态,则可以检查产生死锁的两个线程的具体阻塞点,从而处理相应的问题。

本文主要是提出了五种常见的导致线上功能缓慢的问题,以及排查思路。当然,线上的问题出现的形式是多种多样的,也不一定局限于这几种情况,如果我们能够仔细分析这些问题出现的场景,就可以根据具体情况具体分析,从而解决相应的问题。

❌