Elasticsearch Aggregations Max Sum for Java API

Elasticsearch Aggregations for Java


今天介紹的是使用Elasticsearch Aggregations 計算和加總的功能,什麼是Elasticsearch 的Aggregations,在ES最大的好處是可以用Aggs做計算跟加總,ES已經幫我們做好了計算的套件功能,我們只需要拿來使用就可以,非常的好用又快速。不管是Group by 或Cardinality、Sum使用Aggregations就可以幫你算出你想要的數值。
     例如現在格式為 
{ "download": 12,
  "createtime": "2016-01-01",
  "product": "3c",
  "version": "1.2.3",
  "userid": 11224 }

 我們將download 數值找出Max數值在Sum,我們要如何下ES Aggs指令如下:
curl  -XPOST http://xxx:9200/index/type
{
  "query": {
    "bool": {
      "must": [
        {
          "match_all": {}
        }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "by_group": {
      "terms": {
        "field": "product",
        "size": 0
      },
      "max_download": {
        "max": {
          "field": "download"
        }
      }
    },
    "sum_download": {
      "sum_bucket": {
        "buckets_path": "product>max_download",
        "size": 0
      }
    }
  }
}




  
透過以上指令使用curl查詢,回傳數值是正常,那我們接下來就是用ES所提供的Java API




BoolQueryBuilder query = QueryBuilders.boolQuery().must(QueryBuilders.matchAllQuery());

        TermsBuilder aggs = AggregationBuilders.terms("by_product").field("product").size(0)
                .subAggregation( AggregationBuilders.max("by_max").field("download")).size(0)
                .subAggregation(AggregationBuilders.sum("by_sum").field("by_product>by_max")).size(0);

        SearchResponse ResultSet =  ES(nodes,nodes2,nodes3).prepareSearch("index").setTypes("type")
                .setQuery(query)
                .addAggregation(aggs)
                .setSize(0)
                .get();

        Aggregations result = ResultSet.getAggregations();
        Terms groupby1 =  result.get("by_product");
        for(Terms.Bucket object : groupby1.getBuckets()) {
            Terms bymax = object.getAggregations().get("by_max");
            Terms bysum = object.getAggregations().get("by_sum");
        }



說明一下為什麼在Aggs 為什麼要使用size =0 , size =0 就是將所有的資料顯示出來,如果沒有設定,預設值為10筆資料。另外比較特別的是
Top Hits API size就反過來,需要設定為9999999將所有資料顯示。如果設定為0是不會顯示資料,這要注意一下。
在ResultSet設定size=0,因為我們不需要取得內容所以不用顯示出來。

References
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-max-aggregation.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-sum-aggregation.html

Azure swapon 記憶體


Azure swapon 設定


有使用Azure平台的虛擬機的人都知道,在azure VM會有一個mnt這個路徑 ,但是這個路徑是用來放暫時的檔案,當虛擬機重啟後/mnt資料夾內的檔案將會消失。我們今天就要來教各位不要浪費這個空間。使用Swapon增加虛擬機的記憶體,讓虛擬機可以使用更多記憶體。

       首先查詢是如有使用Swapon空間,
Stop1
swapon -s 確認目前的swanp大小,
 Stop2
設定需要的swap記憶體空間,count我們新增54G的swap或是你可以設定更多空間
1048576*54 =54525952 54G空間 
sudo dd if=/dev/zero of=/mnt/swapfile(路徑自選) bs=1024 count=54525952 
Stop3
 接下來將swap空間格式化 
sudo mkswap /mnt/swapfile 
Stop4
啟動swap空間
sudo swapon /mnt/swapfile
查詢是如有啟動swap
swapon -s
Stop5
在來就是設定重啟後自動掛載swap空間,編輯fstab 
sudo vim /etc/fstab 
加入以下文字
/mnt/swapfile swap swap defaults 0 0 

完成,當虛擬機重動後VM將會自動掛載swap空間,讓虛擬機記憶體空間更多,可以試試在azure架設Elasticsearch非常的好用,

Spark SQL Wildcard Example


Spark SQL

        最近工作上的需要使用Spark SQL連接不同的DataBase,或是連接Nosql,是個非常的實用工具,一般使用SQL語法,只能單一Search一個Database多個Table,但是在Spark SQL完成不一樣,可以跨不同的Database不同的Table在將需要的Table,Join or where,這樣少減非常多時間及麻煩。個人是使用Scala語言,Spark SQL也可以Java語言,只是個人覺得Java在編寫Code時需要編寫蠻多行數,所以改成用Scala寫簡單又快速 :)。

SQL語法與Spark SQL大約有點相識,很多都是差不多,例如用SQL "Where" ID=1Spark SQL 一樣也是"Where" 只是Spark SQL 的等於是"===" ,ID === 1 ,或是可以使用Filter 功能與 Where是一樣的。看個人使用的習實,自已常用的是Filter

在Spark SQL如何使用萬用字元search需要的資料呢?
範例一:找出大寫A-Z其他不需要方法:
SQL 語法

SELECT * FROM USER WHERE TEXT LIKE '%A-Z%'


Spark SQL 語法

.filter( col(TEXT).rlike("[^.*A-Z.*]") )

解釋一下Spark SQL rlike 是有包括[字元] ,like則是完全符合字元
大家在使用時需要注意。這也是自已摸索很久才瞭解,
.*則是包括全部字元, *.則是一個字元,分享給大家。
如果需要使用以string比對內容文字需要import spark function
    import org.apache.spark.sql.functions._

範例二:找出相同內容文字:
SQL  語法

SELECT * FROM USER WHERE substring(TEXT,2,4) = 'NEIL';


Spark SQL 語法

.filter( substring(col(TEXT),2,4).like("NEIL") )






Cassandra Nosql login model



Cassandra Nosql Database Play Framework for Scala

Cassandra是一個Open Source的Database,這幾年來越來越多人使用Nosql取代傳統式的Databse,Nosql Databse特點就是免費、分散式架構,比起一般傳統式的DB速度較快,可運用在Win Linux, Mac Os平台上 ,但缺點就是技術文件較少,需要自已研讀國外技術文件及研究API。

    今天要介紹的是使用Docker虛擬化平台,建置簡易Cassnadra Database Login model , 首先安裝Docker安裝完成Docker後至DockerHub下載Cassandra官方建置完成的Docker image。
1.下載Cassandra Docker Image

docker pull cassandra




2.啟動Cassandra,在啟動Cassandra之前記得開啟對外Port,否則無法連線至Cassandra Database

docker run -it --name cassandra -p 7199:7199 -p 8888:8888 -p 9016:9016 -p 9042:9042 -p 9160:9160 -p 50031:50031 -p 61620:61620 -p 61621:61621  cassandra





解釋:-it 執行Docker image
--name 指定docker 名稱
-p 對外port    7199:7199 前面port為docker image內部:後面port為本機對外port
啟動完成後透過DevCenter工具,DevCenter像是MySqlWorkbench介面工具。透過DevCenter可以直接使用CQL語法CURD功能。
DevCenter是一套連接Cassandra工具,由DataStax Team所開發,下載時需要先註冊,
DataStax下次有時間在介紹。

3.啟動DataStax測試連線Cassandra for Docker image






連線正常將會在左面出現 docker cassandra [1/1 Connected] 。

3. 建立Cassandra  KEYSPCE 及TABLE  User Schema
透過DevCenter工具執行CQL語法




Create login KEYSPCE

CREATE KEYSPACE IF NOT EXISTS login
WITH replication = {
 'class' : 'SimpleStrategy',
 'replication_factor' : 1
};



------------------------------------------------------------------------------------------------------------------

Create user Table 

CREATE TABLE login.user (
 id uuid,
 name text,
 password text,
 PRIMARY KEY (id)
);



------------------------------------------------------------------------------------------------------------------
在Cassandra比較特別的是如果你需要查詢欄位,是以PRIMARY KEY為主,
如果需要查詢Table內的欄位,只能建立index是否在查詢只能透過PRIMARY KEY才可以。
Create index for search user name

CREATE INDEX IF NOT EXISTS by_name ON login.user (name);



------------------------------------------------------------------------------------------------------------------

新增一筆USER資料
Insert data

insert into user(id,name,password)VALUES(now(),'neil','1234');




4.依個人使用開發工具習慣為主,本身比較習慣使用Intellij IDEA, 建立Play Framework


















開啟build.sbt,加入Cassandra Library


libraryDependencies ++= Seq(
    "com.datastax.cassandra" % "cassandra-driver-core" % "3.0.1" ,
    "com.datastax.cassandra" % "cassandra-driver-mapping" % "3.0.1"
)



加入Library後,建立兩個Package ,cassandraDAO,loingModel,
app
 |--- controllers
 |--- cassandraDAO
 |--- loginModel

Download My GitHub Cassandra_login_model

controllers資料夾建立一個Login Class
在Class 建立三個函式def ,分別為 login,logout,authenticate

登入動作

def login = Action { implicit request =>
    println("login: "+request.body.asFormUrlEncoded)
    Ok(views.html.login(loginForm))}


登出 動作

def logout = Action {
    Redirect(routes.Login.login).withNewSession.flashing(
      "success" -> "You are now logged out.")}



驗證user資料,當name與password有一項不符合無法登入

def authenticate = Action { implicit request =>
    loginForm.bindFromRequest.fold(
      formWithErrors => BadRequest(views.html.login(formWithErrors)),
      user => Redirect(routes.Application.index()).withSession("user" -> user.name ))}



建立Cassandra Connect  新增project 名稱為cassandraDAO, 這個Connect將是連接Cassandra DB
建立一個scala trait

trait Cassandra {
    protected def cassandra_session:Session = {
      val cluster = new Cluster.Builder().
         addContactPoint("192.168.99.100").         //       local docker cassandra
         withPort(9042).
         withQueryOptions(new QueryOptions()
         .setConsistencyLevel(QueryOptions.DEFAULT_CONSISTENCY_LEVEL)
       ).build
      val session = cluster.connect()
        session}
}



新增project models,建立一個檔案為loginModel object 內容為

object loginModel {
  val KEYSPACE = "login"
  val USER_TABLE = "user"

  /**
    * login page check username is exist
    * true: return index page
    * false: return login page
    */
  def authenticate(name:String,password:String,cassandra_session:Session): Boolean= {
    val query_by_user = QueryBuilder.select().from(KEYSPACE,USER_TABLE).where(QueryBuilder.eq("name",name))
    val resultByUser = cassandra_session.executeAsync(query_by_user)
    val data =  resultByUser.get().one()
    cassandra_session.closeAsync()
    if(data == null){
      println("check username is not null ")
      true
    }else{
      if( (name.equals(data.getString("name")) && password.equals(data.getString("password")) ) ){
        println("success good")
        false
      }else{
    println("username and password fail")
        true}}
  }}



新增html一個 login.scala.html,透過後端程式將資料傳送至html

login.scala.html

如果對於Play Framework 不太瞭解可以參考 Play官方Form說明
最後加入在conf/routes加入以下資訊,讓play framework可以取得到Controller資訊

# Home page
GET     /index                      controllers.Application.index
#login
GET     /                           controllers.Login.login
POST    /login                      controllers.Login.authenticate
GET     /login                      controllers.Login.logout

啟動Intellij IDE Test project

登入畫面

登入失敗


登入成功

如果有任何問題歡迎來信 討論與交流 Thanks!


References:
https://hub.docker.com/
https://hub.docker.com/_/cassandra/
http://www.datastax.com/
http://www.datastax.com/what-we-offer/products-services/devcenter

MultiBroker for Kafka Cluster on Ubuntu

              Apache Kafka 
A high-throughput distributed messaging system.
Kafka簡介
Apache Kafka 是一個傳送訊息的系統,原本開發來自Linkedln,用作Linkedln的活動資料傳,和數據處理管道的基礎,現在它已被不同類型的大公司作為多種類型的資料數據傳送系統使用。

Apache Kafka是一種分散式的,高吐量,速度快同時支持即時與離線兩種解決方案。在建置Apache Kafka Cluster需要Apache Zookeeper,建置Kafka 分散式平台時建議3台主機,Kafka也是可以使用單機平台。

Kafka Cluster安裝條件
我們使用三台虛擬機建置Kafka Cluster平台
Ubuntu 14.04版本
Java8
至官方KafkaZookeeper下載

1. Install Zookeeper
First Download Zookeeper
Zookeeper  3.4.8
使用Wget 取得Zookeeper檔案

user@host:~$ wget http://apache.stu.edu.tw/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz

將下載完的Zookeeper解壓縮

user@host:~$ tar -xzf zookeeper-3.4.8.tar.gz
user@host:~$ cd zookeeper-3.4.8

將zookeeper conf檔複製成zoo.cfg
user@host:~/zookeeper-3.4.8$ cp conf/zoo_sample.cfg con/zoo.cfg
user@host:

編輯zoo.cfg檔案

user@host:~/zookeeper-3.4.8$ vi conf/zoo.cfg
tickTime=2000 
initLimit=10 
syncLimit=5 
dataDir=/tmp/zookeeper    //Zookeeper 資料儲存路徑
clientPort=2182           //使用Zookeeper Port 
server.1=zoo1:2888:3888,zoo2:2888:3888,zoo3:2888:3888

說明一下Server.1: 是Zookeeper Cluster 我們將使用三台虛擬機以 ","格開,設定完成Zookeeper 複製zoo.cfg檔案到其他虛擬機

user@host:~/zookeeper-3.4.8$ scp conf/zoo.cfg user@zoo2:~/zookeeper-3.4.8/conf
user@host:~/zookeeper-3.4.8$ scp conf/zoo.cfg user@zoo3:~/zookeeper-3.4.8/conf

create zookeeper myid , so each node can identify itself

//Server1
user@Server1:~$ echo "1" > /var/lib/zookeeper/myid
//Server2
user@Server2:~$ echo "2" > /var/lib/zookeeper/myid
//Server3
user@Server3:~$ echo "3" > /var/lib/zookeeper/myid

2.Install Kafka
Download Kafka 0.8.2.2
使用Wget 取得Kafka檔案

user@host:~$ wget http://apache.stu.edu.tw/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
user@host:~$

解壓縮 Kafka檔案
user@host:~$ tar -xzf kafka_2.10-0.8.2.2.tgz
user@host:~$ cd kafka_2.10-0.8.2.2

編輯Kafka Servier.properties
user@host:~/kafka_2.10-0.8.2.2$ vi server.properties  
user@host:

修改zoo1虛擬機 broker.id
broker.id=1 
port=6667 
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181

修改zoo2虛擬機 broker.id
broker.id=2 
port=6667  
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181

修改zoo3虛擬機 broker.id
broker.id=3 
port=6667  
zookeeper.connect=zoo1:2181,zoo2:2181,zoo3:2181

3. Start Application 

Start Zookeeper Cluster Server1 to Server3
user@host:~$ zookeeper-3.4.8/bin/zkServer.sh start"
user@host:

Start Kafka Cluster Server1 to Server3

nohup kafka_2.10-0.8.2.2/bin/kafka-server-start.sh kafka_2.10-0.8.2.2/config/server.properties > /dev/null 2>&1 &

4. Test Kafka and Zookeeper
user@host:~$ zookeeper-3.4.8/bin/zkCli.sh -server Server1:2181,Server2:2181,Server3:2181
 
 
[zk: Server1:2181,Server2:2181,Server3:2181(CONNECTED) 0] ls /
// zookeeper data
[controller_epoch, controller, brokers, zookeeper, admin, consumers, config]

Connect Zookeeper Cluster

open  Terminal input command line
//create kafka topic  test
user@host:~$ kafka_2.10-0.8.2.2/bin/kafka-topics.sh --create --zookeeper Server1:2181,Server2:2181,Server3:2181 --partitions 3 --replication-factor 1 --topic test

Open Terminal Application
//send message

user@host:~$ kafka_2.10-0.8.2.2/bin/kafka-console-producer.sh --zookeeper Server1:6667,Server2:6667,Server3:6667 --topic test 
 
[2016-04-01 16:45:26,695] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
message,123

open Terminal Application
//accept message

user@host:~$ kafka_2.10-0.8.2.2/bin/kafka-console-consumer.sh --zookeeper Server1:2181,Server2:2181,Server3:2181 --topic test
message,123

5. Download Kafka Tool UI

Download KafkaOffsetMonitor-assembly-0.2.1.jar Website toole
KafkaFile

Start Kafka Offset Tools

nohup java -cp /user/KafkaOffsetMonitor-assembly-0.2.1.jar com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk Server1:2181,Server2:2181,Server3:2181 \
--port 9090 \
--refresh 10.seconds \
--retain 2.days > /dev/null 2>&1 &

open browser http://localhost:9090












References:
http://kafka.apache.org/
https://zookeeper.apache.org/
https://github.com/quantifind/KafkaOffsetMonitor

How to Install Spark Cluster



 Hi All 今天要介紹的是如何建置一套Spark Cluster分散式平台,Spark是一套機器學習靈活的計算分散式框架平台,適合做批次處理、資料流資理,Spark SQL,機器學習等不同的應用。因些Spark可應用在廣泛的大數據計算引擎。

● Spark 平台框架圖


1.下載Spark 
首先到Spark官方網站下載 Spark 1.6.0 ,選擇需要下載的版本及對應Hadoop版本。
**注意**Spark 官方所提供的Scala 版本為2.10,如果你想要製作在Scala 2.11需要重新Builder Source Code才可以。 
Download Page  http://spark.apache.org/downloads.html 

使用Wget 下載檔案,將檔案解壓縮
1
2
user@Master:~$ wget http://ftp.mirror.tw/pub/apache/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
user@Master:~$ tar -xzvf spark-1.6.0-bin-hadoop2.6.tgz


2. Configurations 設定 
我們將使用三台虛擬器,建置Spark Cluster分散式平台
  • 新增spark-env.sh
複製 con/spark-env.sh.template 新增一個 spark-env.sh檔案
    1
    user@Master:~$ cp con/spark-env.sh.template spark-env.sh
    

    • 編輯Spark-env.sh檔案
    1
    user@Master:~$ vi ~/spark/conf/spark-env.sh
    

    • 設定Hadoop路徑
    1
    HADOOP_CONF_DIR=~/hadoop/etc/hadoop/
    

    • 設定Spark JDBC Dirver,如果需要使用到mssql或是mysql將jar加入即可
    1
    SPARK_CLASSPATH=~/spark/extraClass/sqljdbc41.jar
    

      • 新增Spark slaves 
      複製 conf/slaves.template,改成 conf/slaves檔案
      1
      user@Master:~$ cp slaves.template slaves
      

      • 編輯Configure Slaves.
      編輯slaves檔案加入我們三台虛擬機器,我們直接使用hostname,需先設定/etc/hosts檔案,將三台虛擬器機名稱加入。
      ** 注意**如果沒有加入hostaname,直接加入hostname在Spark slave,Spark Cluster將無法找到Master與slave設備 
      1
      user@Master:~$ sudo vi /etc/hosts 
      

      1
      2
      3
      192.168.2.100 master
      192.168.2.101 slave1
      192.168.2.102 salve2 
      


      1
      user@Master:~$ vi ~/spark/conf/slaves
      

      1
      2
      3
      master
      slave1
      slave2
      


      • 編輯spark-defaults.conf,指定Spark other Port,如不需要請跳過此步至下一步 
      複製conf/spark-defaults.conf.template,改成conf/spark-defaults.conf 檔案
      1
      user@Master:~$ cp sparkdefaults.conf.template spark-defaults.conf
      

      1
      user@Master:~$ vi ~/spark/conf/spark-defaults.conf
      

      1
      2
      3
      4
      5
      6
      spark.driver.port   51810
      spark.fileserver.port   51811
      spark.broadcast.port   51812
      spark.replClassServer.port  51813
      spark.blockManager.port   51814
      spark.executor.port   51815
      




      3. 啟動 Spark Cluster
       啟動Spark Cluster有兩種方法:
      1.啟動start-all.sh,至動作自動將會啟動master及slave Cluster Server
      2.啟動start-master.sh及start-slave.sh
      1
      user@Master:~$ spark/sbin/start-all.sh
      

      透過jps查看所以Spark Cluster是否啟動成功 
      1
      user@Master:~$ jps
      

      1
      2
      3
      4
      15398 ResourceManager
      18483 NameNode
      1633 Master
      1733 Jps
      

      1
      user@slave1:~$ jps
      

      1
      2
      3
      4
      14398 Worker
      1283 NodeManager
      1433 DataNode
      1633 Jps
      







      How to Install Elasticsearch Cluster


      Elasticsearch Cluster建置



      Elasticsearch 底層為Apache Lucene 開源搜尋引擎,以分散式平台,十分高效能。
      主要優點:
          分散式儲存。
          簡單新增Cluster
          搜尋速度快
      設備:
          ubuntu server 14.04
         Java 8
         Elasticsearch 2.1.1

      1. 安裝Elasticsearch之前需要安裝JAVA
      首先使用apt-get新增JAVA Repository
      sudo add-apt-repository ppa:webupd8team/java
      sudo apt-get update
      sudo apt-get install oracle-java8-installer
      測試是否安裝成功 java -version
      設定JAVA_HOME
      sudo vi /etc/bash.bashrc
      #加入JAVA_HOME
      export JAVA_HOME=/usr/lib/jvm/java-8-oracle
      export PATH=$JAVA_HOME/bin:$PATH
      


      2.安裝Elasticsearch Cluster三台集群 使用ubuntu 14.04版本
      Elasticsearch官方下載檔案,使用的檔案為deb,後面階段要讓Elasticsearch開機時自動啟動所以選擇deb,可以選擇其他檔案格式。
      wget https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-2.1.1.deb 
      下載完畢後使用dpkg安裝deb
      sudo dpkg -i elasticsearch-2.1.1.deb
      安裝完成將 Elasticsearch設定自動啟動功能,重新開機測試是否正常
      sudo update-rc.d elasticsearch defaults
      查看是否有啟動成功
      curl -X GET 'http://localhost:9200'
      
      Elasticsearch Server將會回傳json格式
      {
        "name" : "test",
        "cluster_name" : "Neil",
        "version" : {
          "number" : "2.1.1",
          "build_hash" : "8ff36d139e16f8720f2947ef62c8167a888992fe",
          "build_timestamp" : "2015-01-27T13:32:39Z",
          "build_snapshot" : false,
          "lucene_version" : "5.4.1"
        },
        "tagline" : "You Know, for Search"
      }

      手動啟動
      sudo service elasticsearch start
      3.設定yml檔
      設定elasticsearch.yml檔案路徑,權限問題無法使用ls查看,直接修改即可
      sudo vi /etc/elasticsearch/elasticsearch.yml
      Master Server設定
      cluster.name: neil
      node.name: ${HOSTNAME}
      
      node.master: true node.data: false
      #指定10.1.0.5為Master Server
      discovery.zen.ping.unicast.hosts: ["10.1.0.5"]
      discovery.zen.ping.multicast.enabled: false
      discovery.zen.minimum_master_nodes: 1 
      
      Slave node-1 設定
      cluster.name: neil
      node.name: ${HOSTNAME}
      node.master: false
      node.data: true
      #指定10.1.0.5為Master Server
      discovery.zen.ping.unicast.hosts: ["10.1.0.5"]
      discovery.zen.ping.multicast.enabled: false
      discovery.zen.minimum_master_nodes: 1 
      


      Slave node-2 設定
      cluster.name: neil
      node.name: ${HOSTNAME}
      node.master: false
      node.data: true
      #指定10.1.0.5為Master Server
      discovery.zen.ping.unicast.hosts: ["10.1.0.5"]
      discovery.zen.ping.multicast.enabled: false
      discovery.zen.minimum_master_nodes: 1 

      Elasticsearch現在已經更新到5.x多版本了,設定大同小異沒有什麼變動,如果你需要用以上的設定安裝5.x 版本一樣也是可以使用的,差別在於plugin安裝,如果安裝elasticsearch-head不能跟舊版本一樣使用 plugin install elasticsearch-ehad會出問題 , 需要安裝grunt來當 Server,啟動Elasticsearch-head即可。

      資料來源:
       https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html