本文共 2201 字,大约阅读时间需要 7 分钟。
通道服务Tunnel Service是基于Table Store数据接口之上的全增量一体化服务,它通过一组Tunnel Service API和SDK为用户提供了增量、全量和增量加全量三种类型的分布式数据实时消费通道。通过为数据表建立Tunnel Service数据通道,用户可以简单地实现对表中历史存量和新增数据的消费处理。
Table Store NoSQL数据库非常适合元数据管理、时序数据监控、消息系统等服务应用,这些应用的一个常见设计模式就是利用增量数据流或者先全量后增量的数据流来触发一些附加的操作逻辑,这些附加操作包括:
利用Tunnel Service,我们可以针对这些模式轻松构建高效、弹性的解决方案,如下图:
Tunnel Service是Table Store在stream功能之上推出的更强大的数据通道功能,Tunnel Service提供了:
我们可以使用table store控制台快速体验tunnel service功能:
// 用户自定义数据消费Callback, 即实现IChannelProcessor接口(process和shutdown)private static class SimpleProcessor implements IChannelProcessor { @Override public void process(ProcessRecordsInput input) { System.out.println("Default record processor, would print records count"); System.out.println( String.format("Process %d records, NextToken: %s", input.getRecords().size(), input.getNextToken())); try { // Mock Record Process. Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } @Override public void shutdown() { System.out.println("Mock shutdown"); }} // TunnelWorkerConfig里面还有更多的高级参数,这里不做展开,会有专门的文档介绍。TunnelWorkerConfig config = new TunnelWorkerConfig(new SimpleProcessor());// 配置TunnelWorker,并启动自动化的数据处理任务。TunnelWorker worker = new TunnelWorker($tunnelId, tunnelClient, config);try { worker.connectAndWorking();} catch (Exception e) { e.printStackTrace(); worker.shutdown(); tunnelClient.shutdown();}
转载地址:http://rrzja.baihongyu.com/