spark 输出写入kafka[如何在spark中使用socket输出]

spark 输出写入kafka[如何在spark中使用socket输出]

spark提供了丰富的source源,我们可以从flume、kafka、各类文件系统中读取数据进行处理,但spark的输出组件提供的并不像输入那样丰富。spark就没有提供写入kafka的输出组件。 在spark streaming中,spark的输出结果很可能会写入一个队列供其他消费者消费,而无论这个消息服务是什么,大多情况下我们都需要使用socket与之进行通信。而在spark streaming中使用socket,我们需要考虑两个问题: socket在对象序列化后会关闭,反序列化时,需要重新激活(由于java中没有反序列化事件,这条很难实现) 重复创建链接会给server端带来压力,同时造成资源浪费 那么,我们需要解决的就是,在不能复用连接到情况下,如何控制socket数量。使用懒汉式单例模式,可以延迟初始化socket链接,对象创建时并不会初始化socket,真正调用的时候才会初始化。 一个使用redis的示例如下: public class SpiderCache implements Serializable { protected String host; protected int port; protected Jedis jedis; public SpiderCache(String host, int port) { this.host = host; this.port = port; } public void lazyInit() { if (jedis == null) { jedis = new Jedis(this.host, this.port); } } public String get(String
Continue reading spark 输出写入kafka[如何在spark中使用socket输出]