
Pitfalls of reading and writing Elasticsearch with Spark

// Writing to Elasticsearch
ds.write
      .format("org.elasticsearch.spark.sql")
      .option("es.nodes.wan.only", "true")
      .option("es.mapping.id", "_id")             // use the _id column as the document id
      .option("es.mapping.exclude", "_id")        // keep the _id column out of the document body
      .option("es.nodes", host)
      .option("es.port", port)
      .option("es.update.script.lang", "painless")
      .option("es.update.script.inline", script)  // es.update.script.inline for 6.0 and later; es.update.script before 6.0
      .option("es.update.script.params", params)
      .option("es.write.operation", "upsert")
      .option("es.batch.write.retry.count", 3)
      .option("es.update.retry.on.conflict", 3)
      .option("es.batch.write.refresh", "false")  // whether to refresh after each bulk operation; default true
      .option("es.batch.size.bytes", "1mb")       // max data volume per bulk request (the default)
      .option("es.batch.size.entries", "1000")    // max number of documents per bulk request (the default)
      .mode("append")
      .save(index_name)

// Reading from Elasticsearch
val ds = spark.read
      .format("org.elasticsearch.spark.sql")
      .option("es.read.metadata", "true")         // also read document metadata such as _id
      .option("es.read.metadata.field", "_metadata") // column the metadata is stored in
      .option("es.nodes.wan.only", "true")        // required when connecting over the public internet
      .option("pushdown", "true")                 // push filters down to Elasticsearch
      .option("es.port", port)
      .option("es.net.ssl", "false")
      .option("es.nodes", host)
      .option("query", query)                     // a query DSL string to filter with
      .option("es.read.field.include", includeField) // restrict which fields are read
      .option("es.read.field.as.array.include", arrIncludeField) // required for array fields, otherwise they are not recognized as arrays
      .option("es.mapping.date.rich", "false")    // return dates as plain strings instead of rich date objects
      .option("es.scroll.size", "10000")          // max docs per scroll request; default 50
      .option("es.input.max.docs.per.partition", "100000") // max docs per partition; default 100000
      .load(index_name)

1. es.nodes.wan.only

If the Spark job and the Elasticsearch cluster sit on the same network segment, omitting this option is fine. But when they are on different networks, for example when ES is reached over the public internet, requests fail without it.

2. es.update.retry.on.conflict

With concurrent updates, writes that land on the same document raise version conflicts, so this retry parameter has to be set.

3. es.update.script.inline

Before 6.0 the elasticsearch-spark plugin used the es.update.script parameter. Since I was on 5.x I used es.update.script, but updating nested data structures with it fails with an error about being unable to convert to scala.Tuple2. Switching to es.update.script.inline resolves that; however, even with this parameter, updates against ES 7.0 still raise the same error.
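
For concreteness, here is a minimal scripted-upsert sketch against an array/nested field. The field name tags and the source column newTag are invented for illustration; ds, host, port and index_name are the same as in the write snippet above.

// Minimal scripted-upsert sketch; "tags" and "newTag" are illustrative names.
val script =
  "if (ctx._source.tags == null) { ctx._source.tags = [params.t] } " +
  "else { ctx._source.tags.add(params.t) }"

ds.write
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", host)
  .option("es.port", port)
  .option("es.nodes.wan.only", "true")
  .option("es.mapping.id", "_id")
  .option("es.write.operation", "upsert")
  .option("es.update.script.lang", "painless")
  .option("es.update.script.inline", script)      // on 5.x, plain es.update.script fails for nested structures
  .option("es.update.script.params", "t:newTag")  // parameter t is filled from the newTag column of each row
  .mode("append")
  .save(index_name)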

For reference, the Spark-on-Elasticsearch configuration options are documented here:

4. Reading a broadcast variable inside a Spark SQL UDF

Today I hit a problem much like one others have reported: calling .value on a broadcast variable inside a UDF kept throwing a NullPointerException. The fix was to hold the broadcast variable as a member of the UDF's class. Since the class file of the UDF function is shipped to every executor for invocation, a broadcast variable stored as a member at construction time can be resolved on each executor as well.
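
A minimal sketch of that workaround, with hypothetical names (DictLookup, dictBc, and the someKey column): the broadcast handle is held as a constructor member of a serializable class, so it is serialized with the function and resolvable on every executor.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.functions.{col, udf}

// The broadcast handle is a member field, serialized together with the instance.
class DictLookup(dictBc: Broadcast[Map[String, String]]) extends Serializable {
  def lookup(key: String): String = dictBc.value.getOrElse(key, "unknown")
}

val dictBc = spark.sparkContext.broadcast(Map("k1" -> "v1"))
val lookupUdf = udf(new DictLookup(dictBc).lookup _)

// "someKey" is a placeholder column name
val mapped = ds.withColumn("mappedValue", lookupUdf(col("someKey")))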

Update 2022-01-10

Problem: the cluster ran into the situation shown above. A metrics-computation job was upserting 2 million+ rows into Elasticsearch; it hung for more than six hours without finishing, and the Elasticsearch CPU spiked to around 100%.

After asking around, the cause appeared to be Elasticsearch's default 1s refresh policy. The fix: set the index's refresh_interval to -1 before the write, and once all the data is written, force a single refresh (refresh=true).

Also, in the elasticsearch-spark 5.6 version the option es.batch.write.refresh defaults to true, which forces a refresh after each bulk write completes, and that may be what drove the CPU up.
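
As a minimal sketch of that tuning with the REST high-level client (the index name, port and client construction are illustrative, not from the original job):

import org.apache.http.HttpHost
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.settings.Settings

val client = new RestHighLevelClient(RestClient.builder(new HttpHost(host, 9200, "http")))

// Turn off periodic refresh before the heavy upsert job starts.
client.indices().putSettings(
  new UpdateSettingsRequest(index_name)
    .settings(Settings.builder().put("index.refresh_interval", "-1")),
  RequestOptions.DEFAULT)

// ... run the Spark write here, with es.batch.write.refresh = false ...

// Restore the interval and force one refresh once the write is done.
client.indices().putSettings(
  new UpdateSettingsRequest(index_name)
    .settings(Settings.builder().put("index.refresh_interval", "1s")),
  RequestOptions.DEFAULT)
client.indices().refresh(new RefreshRequest(index_name), RequestOptions.DEFAULT)
client.close()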

Update 2022-03-29:

Recently I needed to migrate data from an Elasticsearch 5.x cluster to Elasticsearch 7.10. Both clusters live on Alibaba Cloud, which supports neither logstash nor reindex for this, so the only option was to write the synchronization code myself.

I read from ES with the Spark RDD API (elasticsearch-spark-20_2.11) and bulk-wrote into the target cluster with elasticsearch-rest-high-level-client.

The Spark-side read settings were:

.set("es.scroll.size", 5000)
.set("es.input.max.docs.per.partition", 5000000)
.set("es.input.use.sliced.partitions", false)
.set("es.scroll.keepalive", "10m")

Here es.input.max.docs.per.partition determines how many partitions, and therefore how many tasks, Spark generates, while es.scroll.size sets how many documents each scroll request fetches.
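
Putting those settings together, a sketch of the read side (host, port and index_name are placeholders; the bulk write into the 7.10 cluster would go inside foreachPartition):

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._   // brings in the esRDD implicit

val conf = new SparkConf()
  .setAppName("es5-to-es7-migration")
  .set("es.nodes", host)
  .set("es.port", port)
  .set("es.nodes.wan.only", "true")
  .set("es.scroll.size", "5000")
  .set("es.input.max.docs.per.partition", "5000000")
  .set("es.input.use.sliced.partitions", "false")
  .set("es.scroll.keepalive", "10m")

val sc = new SparkContext(conf)

// Each element is (documentId, sourceMap); one task per generated partition.
sc.esRDD(index_name).foreachPartition { docs =>
  // build a RestHighLevelClient pointing at the 7.10 cluster here
  // and index the documents in batches via bulk requests
}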

Partition generation happens in RestService.findPartitions, which builds the list of partitions for the job:

public static List<PartitionDefinition> findPartitions(Settings settings, Log log) {
    Version.logVersion();

    InitializationUtils.validateSettings(settings);
    InitializationUtils.validateSettingsForReading(settings);

    EsMajorVersion version = InitializationUtils.discoverEsVersion(settings, log);
    List<NodeInfo> nodes = InitializationUtils.discoverNodesIfNeeded(settings, log);
    InitializationUtils.filterNonClientNodesIfNeeded(settings, log);
    InitializationUtils.filterNonDataNodesIfNeeded(settings, log);
    InitializationUtils.filterNonIngestNodesIfNeeded(settings, log);

    RestRepository client = new RestRepository(settings);
    try {
        boolean indexExists = client.indexExists(true);

        List<List<Map<String, Object>>> shards = null;

        if (!indexExists) {
            if (settings.getIndexReadMissingAsEmpty()) {
                log.info(String.format("Index [%s] missing - treating it as empty", settings.getResourceRead()));
                shards = Collections.emptyList();
            } else {
                throw new EsHadoopIllegalArgumentException(
                        String.format("Index [%s] missing and settings [%s] is set to false", settings.getResourceRead(), ConfigurationOptions.ES_INDEX_READ_MISSING_AS_EMPTY));
            }
        } else {
            shards = client.getReadTargetShards();
            if (log.isTraceEnabled()) {
                log.trace("Creating splits for shards " + shards);
            }
        }

        log.info(String.format("Reading from [%s]", settings.getResourceRead()));

        MappingSet mapping = null;
        if (!shards.isEmpty()) {
            mapping = client.getMappings();
            if (log.isDebugEnabled()) {
                log.debug(String.format("Discovered resolved mapping {%s} for [%s]", mapping.getResolvedView(), settings.getResourceRead()));
            }
            // validate if possible
            FieldPresenceValidation validation = settings.getReadFieldExistanceValidation();
            if (validation.isRequired()) {
                MappingUtils.validateMapping(SettingsUtils.determineSourceFields(settings), mapping.getResolvedView(), validation, log);
            }
        }
        final Map<String, NodeInfo> nodesMap = new HashMap<String, NodeInfo>();
        if (nodes != null) {
            for (NodeInfo node : nodes) {
                nodesMap.put(node.getId(), node);
            }
        }
        final List<PartitionDefinition> partitions;
        // For ES 5.x and later, each shard yields (shard doc count / es.input.max.docs.per.partition) partitions
        if (version.onOrAfter(EsMajorVersion.V_5_X)) {
            partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);
        } else {
            // Before 5.x, one partition is created per shard
            partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);
        }
        Collections.shuffle(partitions);
        return partitions;
    } finally {
        client.close();
    }
}

/**
 * Create one {@link PartitionDefinition} per shard for each requested index.
 * In other words, one partition is created per shard.
 */
static List<PartitionDefinition> findShardPartitions(Settings settings, MappingSet mappingSet, Map<String, NodeInfo> nodes,
                                                     List<List<Map<String, Object>>> shards, Log log) {
    Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();
    List<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
    for (List<Map<String, Object>> group : shards) {
        String index = null;
        int shardId = -1;
        List<String> locationList = new ArrayList<String> ();
        for (Map<String, Object> replica : group) {
            ShardInfo shard = new ShardInfo(replica);
            index = shard.getIndex();
            shardId = shard.getName();
            if (nodes.containsKey(shard.getNode())) {
                locationList.add(nodes.get(shard.getNode()).getPublishAddress());
            }
        }
        if (index == null) {
            // Could not find shards for this partition. Continue anyway?
            if (settings.getIndexReadAllowRedStatus()) {
                log.warn("Shard information is missing from an index and will not be reached during job execution. " +
                        "Assuming shard is unavailable and cluster is red! Continuing with read operation by " +
                        "skipping this shard! This may result in incomplete data retrieval!");
            } else {
                throw new IllegalStateException("Could not locate shard information for one of the read indices. " +
                        "Check your cluster status to see if it is unstable!");
            }
        } else {
            PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,
                    locationList.toArray(new String[0]));
            partitions.add(partition);
        }
    }
    return partitions;
}

/**
 * Partitions the query based on the max number of documents allowed per partition {@link Settings#getMaxDocsPerPartition()}.
 */
static List<PartitionDefinition> findSlicePartitions(RestClient client, Settings settings, MappingSet mappingSet,
                                                     Map<String, NodeInfo> nodes, List<List<Map<String, Object>>> shards, Log log) {
    QueryBuilder query = QueryUtils.parseQueryAndFilters(settings);
    int maxDocsPerPartition = settings.getMaxDocsPerPartition();
    String types = new Resource(settings, true).type();
    Mapping resolvedMapping = mappingSet == null ? null : mappingSet.getResolvedView();

    List<PartitionDefinition> partitions = new ArrayList<PartitionDefinition>(shards.size());
    for (List<Map<String, Object>> group : shards) {
        String index = null;
        int shardId = -1;
        List<String> locationList = new ArrayList<String> ();
        for (Map<String, Object> replica : group) {
            ShardInfo shard = new ShardInfo(replica);
            index = shard.getIndex();
            shardId = shard.getName();
            if (nodes.containsKey(shard.getNode())) {
                locationList.add(nodes.get(shard.getNode()).getPublishAddress());
            }
        }
        String[] locations = locationList.toArray(new String[0]);
        if (index == null) {
            // Could not find shards for this partition. Continue anyway?
            if (settings.getIndexReadAllowRedStatus()) {
                log.warn("Shard information is missing from an index and will not be reached during job execution. " +
                        "Assuming shard is unavailable and cluster is red! Continuing with read operation by " +
                        "skipping this shard! This may result in incomplete data retrieval!");
            } else {
                throw new IllegalStateException("Could not locate shard information for one of the read indices. " +
                        "Check your cluster status to see if it is unstable!");
            }
        } else {
            StringBuilder indexAndType = new StringBuilder(index);
            if (StringUtils.hasLength(types)) {
                indexAndType.append("/");
                indexAndType.append(types);
            }
            // TODO applyAliasMetaData should be called in order to ensure that the count are exact (alias filters and routing may change the number of documents)
            // First, get the document count for this shard of the index
            long numDocs = client.count(indexAndType.toString(), Integer.toString(shardId), query);
            // shard doc count / es.input.max.docs.per.partition gives the number of partitions for this shard
            int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);
            // create the computed number of slice partitions for this shard
            for (int i = 0; i < numPartitions; i++) {
                PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
                partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));
            }
        }
    }
    return partitions;
}
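
Note the integer division in numPartitions above: a shard holding, say, 23,000,000 documents with es.input.max.docs.per.partition = 5,000,000 yields Math.max(1, 23000000 / 5000000) = 4 slices of roughly 5.75 million documents each, so an individual partition can end up somewhat larger than the configured maximum.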
