spark 输出写入kafka[如何在spark中使用socket输出]

大数据 Jenner 8508℃ 0评论

spark提供了丰富的source源,我们可以从flume、kafka、各类文件系统中读取数据进行处理,但spark的输出组件提供的并不像输入那样丰富。spark就没有提供写入kafka的输出组件。

在spark streaming中,spark的输出结果很可能会写入一个队列供其他消费者消费,而无论这个消息服务是什么,大多情况下我们都需要使用socket与之进行通信。而在spark streaming中使用socket,我们需要考虑两个问题:

  1. socket在对象序列化后会关闭,反序列化时,需要重新激活(由于java中没有反序列化事件,这条很难实现)
  2. 重复创建链接会给server端带来压力,同时造成资源浪费

那么,我们需要解决的就是,在不能复用连接到情况下,如何控制socket数量。使用懒汉式单例模式,可以延迟初始化socket链接,对象创建时并不会初始化socket,真正调用的时候才会初始化。

一个使用redis的示例如下:

public class SpiderCache implements Serializable {
    protected String host;
    protected int port;
    protected Jedis jedis;

    public SpiderCache(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void lazyInit() {
        if (jedis == null) {
            jedis = new Jedis(this.host, this.port);
        }
    }

    public String get(String key){
        this.lazyInit();
        String hashKey = getHashKey(key);
        return jedis.get(hashKey);
    }

    public boolean exists(String key) {
        this.lazyInit();
        return jedis.exists(key);
    }

    public boolean set(String key, String value){
        this.lazyInit();
        String hashKey = getHashKey(key);
        if (jedis.set(hashKey, value).equals("OK")) {
            return true;
        }
        return false;
    }

    protected String getHashKey(String key) {
        return DigestUtils.md5Hex(key);
    }
}

使用redis(或kafka)时,只需要将上述对象传入到map或reduce对象中即可。这样你不必重复在每个map和reduce中创建对象,同时也不需要考虑链接失效的问题。

 

 

原创文章,转载请注明: 转载自始终不够

本文链接地址: spark 输出写入kafka[如何在spark中使用socket输出]

转载请注明:始终不够 » spark 输出写入kafka[如何在spark中使用socket输出]

喜欢 (0)