hive 和 elasticsearch 的整合

ElasticSearch已经可以与YARN、Hadoop、Hive、Pig、Spark、Flume等大数据技术框架整合起来使用,尤其是在添加数据的时候,可以使用分布式任务来添加索引数据,尤其是在数据平台上,很多数据存储在Hive中,使用Hive操作ElasticSearch中的数据,将极大的方便开发人员。



使用elasticsearch向hive中写数据
第一步、创建hive外部表:
hive -e”
add jar hdfs:/opt/data/jar/elasticsearch-hadoop-2.4.3.jar;
add jar hdfs:/opt/data/jar/org.apache.commons.httpclient_3.1.0.v201012070820.jar;
–org.apache.commons.httpclient_3.1.0.v201012070820.jar这个jar包一般是不用加的,除非在报httpclient相关错误的时候才加



use ods;
create external table if not exists app.test_tag(
complaint_id string,
tag_name string,
classification string,
sub_classification string
)
STORED BY ‘org.elasticsearch.hadoop.hive.EsStorageHandler’
TBLPROPERTIES(‘es.nodes’=’000.000.000.000’,’es.resource’ = ‘test_tag/defect_recommend_tag’);
–es.resource斜线前面为索引(这里索引与hive表同名),斜线后面为mapping名称。mapping可以不用事先建好(建议自定义一个),当数据插入时会自动根据数据类型创建mapping。
必须指出明确的字段 select complaint_id from app.test_tag; 不可以使用 select * from app.test_tag;



通过hive向写elasticsearch的写如数据
ES-hadoop的hive整合 : https://www.elastic.co/guide/en/elasticsearch/hadoop/current/hive.html#hive
ES-hadoop的配置说明 : https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

1、上传elasticsearh-hadoop的jar包到server1-hadoop-namenode-01上



在server1-hadoop-namenode-01上执行:

cp /home/dinpay/soft/elasticsearch-hadoop-2.3.4.jar /home/dinpay/hive/lib


2、然后修改hive-site.xml文件



    cd  /home/dinpay/hive/conf

vi hive-site.xml

增加内容:




hive.aux.jars.path

file:///home/dinpay/hive/lib/elasticsearch-hadoop-2.3.4.jar

A comma separated list (with no spaces) of the jar files


一、添加依赖包
首先确定elasticsearch的版本,若es版本为6.1.2,可用jar包:elasticsearch-hadoop-6.1.2.jar,网上有说只要高于6.1.2版本的jar包即可(自行验证)。当报httpclient相关错误时,还需要添加org.apache.commons.httpclient这个jar包。
首先进入hive,然后通过下面命令添加依赖包。



add jar yourPath/elasticsearch-hadoop-6.1.2.jar;
1
当然还有其它添加方式,具体可查看https://www.elastic.co/guide/en/elasticsearch/hadoop/current/hive.html#hive



二、创建hive外部表:
create external table if not exists es_cmb_test(
ptf_id string,
ptf_name string,
bill_date string,
acc_status string
)
STORED BY ‘org.elasticsearch.hadoop.hive.EsStorageHandler’
TBLPROPERTIES(
‘es.resource’ = ‘test/es_cmb_test’,
‘es.nodes’=’192.168.1.1’,
‘es.port’=’9200’,
‘es.mapping.id’ = ‘ptf_id’,
‘es.index.auto.create’ = ‘true’,
‘es.write.operation’=’upsert’);
TBLPROPERTIES后面为设置ES的属性,例如:
通过’es.mapping.id’ = ‘ptf_id’ 指定id。
通过’es.write.operation’=’upsert’ 来执行插入或者更新操作(如果id存在)。
详情可查https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html



三、插入数据
insert overwrite table es_cmb_test
select
ptf_id,
ptf_name,
bill_date,
acc_status
from test.cmb_test;
1.elasticsearch字段较多时,可以建立多个hive映射表分别进行写入
2.hive无法删除elasticsearch中的记录,只能插入和更新
3.hive的insert into和insert overwrite操作elasticsearch时结果是一样的




  1. 新建一个hive表es_goods_order
    将该hive表的数据存储指定到ES上,指定索引的ID列是goods_order_id(’es.mapping.id’ = ‘goods_order_id’,);
    指定数据写入的方式是upsert(‘es.write.operation’=’upsert’),如果id不存在就插入,如果存在就执行更新操作。
    add jar file:///home/hadoop/lib/elasticsearch-hadoop-5.1.1.jar;
    set username=fxin.zhao
    use temp;
    CREATE EXTERNAL TABLE es_goods_order(
    goods_order_id string,
    sale_place string,
    station_place string,
    multi_channel_id string,
    business_date string,
    discount string,
    discount_type string,
    payment_amouunt string,
    refun_amount string
    )
    STORED BY ‘org.elasticsearch.hadoop.hive.EsStorageHandler’
    TBLPROPERTIES(
    ‘es.resource’ = ‘test_crm/es_goods_order’,
    ‘es.nodes’=’10.10.110.125’,
    ‘es.port’=’9200’,
    ‘es.mapping.id’ = ‘goods_order_id’,
    ‘es.write.operation’=’upsert’
    );
    向es_goods_order表中插入数据:3分钟启用1个maper写入80万数据。Es中的index是在导入数据的时候检查的,如果不存在,则会创建。
    add jar file:///home/hadoop/lib/elasticsearch-hadoop-5.1.1.jar;
    use temp;
    insert into table es_goods_order
    select goods_order_id,
    sale_place,
    station_place,
    multi_channel_id,
    business_date,
    discount,
    discount_type,
    payment_amouunt,
    refun_amount
    from ods.goods_order
    where dt >= ‘2016-10-01’
    and dt <= ‘2016-10-04’;
    指定ID问题: 通过’es.mapping.id’ = ‘goods_order_id’ 指定id。
    数据更新问题: 通过’es.write.operation’=’upsert’ 来执行插入或者更新操作(如果id存在)。
    ES 的hive表基于json存储。
    hadoop fs -put 20170111202237 /tmp/fuxin.zhao/es_json
    通过Hive读取与统计分析ElasticSearch中的数据
    ElasticSearch中已有的数据
    _index:lxw1234
    _type:tags
    _id:用户ID(cookieid)
    字段:area、media_view_tags、interest
    Hive建表
    由于我用的ElasticSearch版本为2.1.0,因此必须使用elasticsearch-hadoop-2.2.0才能支持,如果ES版本低于2.1.0,可以使用elasticsearch-hadoop-2.1.2.
    下载地址:https://www.elastic.co/downloads/hadoop
    add jar file:///home/liuxiaowen/elasticsearch-hadoop-2.2.0-beta1/dist/elasticsearch-hadoop-hive-2.2.0-beta1.jar;
    CREATE EXTERNAL TABLE lxw1234_es_tags (
    cookieid string,
    area string,
    media_view_tags string,
    interest string
    )
    STORED BY ‘org.elasticsearch.hadoop.hive.EsStorageHandler’
    TBLPROPERTIES(
    ‘es.nodes’ = ‘172.16.212.17:9200,172.16.212.102:9200’,
    ‘es.index.auto.create’ = ‘false’,
    ‘es.resource’ = ‘lxw1234/tags’,
    ‘es.read.metadata’ = ‘true’,
    ‘es.mapping.names’ = ‘cookieid:_metadata._id, area:area, media_view_tags:media_view_tags, interest:interest’);
    注意:因为在ES中,lxw1234/tags的_id为cookieid,要想把_id映射到Hive表字段中,必须使用这种方式:
    ‘es.read.metadata’ = ‘true’,
    ‘es.mapping.names’ = ‘cookieid:_metadata._id,…’
    数据已经可以正常查询。
    执行SELECT COUNT(1) FROM lxw1234_es_tags;Hive还是通过MapReduce来执行,每个分片使用一个Map任务:
    可以通过在Hive外部表中指定search条件,只查询过滤后的数据。比如,下面的建表语句会从ES中搜索_id=98E5D2DE059F1D563D8565的记录:
    如果数据量不大,可以使用Hive的Local模式来执行,这样不必提交到Hadoop集群
    通过Hive向ElasticSearch中写数据
    Hive建表
    这里要注意下:如果是往_id中插入数据,需要设置’es.mapping.id’ = ‘cookieid’参数,表示Hive中的cookieid字段对应到ES中的_id,而es.mapping.names中不需要再映射,这点和读取时候的配置不一样。



关闭Hive推测执行,执行INSERT:
注意:如果ES集群规模小,而source_table数据量特别大、Map任务数太多的时候,会引发错误:
原因是Map任务数太多,并发发送至ES的请求数过多。
这个和ES集群规模以及bulk参数设置有关,目前还没弄明白。
减少source_table数据量(即减少Map任务数)之后,没有出现这个错误。



执行完成后,在ES中查询lxw1234/user_tags的数据:
使用Hive将数据添加到ElasticSearch中还是非常实用的,因为我们的数据都是在HDFS上,通过Hive可以查询的。



另外,通过Hive可以查询ES数据,并在其上做复杂的统计与分析,但性能一般,比不上使用ES原生API,亦或是还没有掌握使用技巧,后面继续研究


Category elasticsearch