Spring boot Mybatis-Plus Dynamic Connection (自動切換資料源)


Spring boot Mybatis Plus Dynamic change Connection


說明:

    Spring boot 自動切換連線Databases 資料源,可以透過 Spring jdbc AbstractRoutingDataSource 切換資料源,如果想要深入了解怎麼執行運做可以看一下Spring jdbc source code ,接下來我們來說說如何運用 AbstractRoutingDataSource 做到自動切換資料源。


首先Databases是Master與Slave,自動切換讀寫分離

定義兩個 annotation @EnableMybatisCluster @MybatisClusterMaster  

@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target(ElementType.TYPE)
@Import({ MultiDataSourceConfig.class, MybatisPlusConfig.class})
public @interface EnableMybatisCluster {
}

當Query 需要指定為Master執行時使用,直接加在ServiceImpl mehtod 
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MybatisClusterMaster {
}

創建 Constant , 取讀Databases yaml  名稱
public interface DataSourceConstant {
interface Master {
String JDBC_VALUE = "spring.dynamic.datasource.master";

String DATA_SOURCE = "dataSourceMaster";
}

interface Slave {
String JDBC_VALUE = "spring.dynamic.datasource.slave";

String DATA_SOURCE = "dataSourceSlave";
}

interface ResourceMapper {
String CLASSPATH_PATH = "classpath:mapper/*.xml";
}

}

創建 Enums , 如果本身有多台Slave,可自行加入多台Slave名稱。
public enum DBTypeEnum {
/**
* MASTER, SLAVE
*/
MASTER, SLAVE;
}


創建DataSourceConfig ,建立Bean Master , Slave DataSource , DataSource 預設值為Master,未指定時都以Master為主。

@Configuration
@Slf4j
public class MultiDataSourceConfig {

/**
* DataSource
*
* @return Master DataSource
*/
@Bean(name = DataSourceConstant.Master.DATA_SOURCE)
@ConfigurationProperties(prefix = DataSourceConstant.Master.JDBC_VALUE)
@Primary
public DataSource dataSourceMaster() {
return DataSourceBuilder.create().build();
}

/**
* DataSource
*
* @return Slave1 DataSource
*/
@Bean(name = DataSourceConstant.Slave.DATA_SOURCE)
@ConfigurationProperties(prefix = DataSourceConstant.Slave.JDBC_VALUE)
public DataSource dataSourceSlave() {
return DataSourceBuilder.create().build();
}


/**
* default master
* @param masterDataSource 主要資料庫
* @param slaveDataSource 備用資料庫
* @return DataSource
*/
@Bean
public DataSource myRoutingDataSource(@Qualifier(DataSourceConstant.Master.DATA_SOURCE) DataSource masterDataSource,
@Qualifier(DataSourceConstant.Slave.DATA_SOURCE) DataSource slaveDataSource) {
Map<Object, Object> targetDataSources = new HashMap<Object, Object>();
targetDataSources.put(DBTypeEnum.MASTER, masterDataSource);
targetDataSources.put(DBTypeEnum.SLAVE, slaveDataSource);
MyRoutingDataSource myRoutingDataSource = new MyRoutingDataSource();
myRoutingDataSource.setDefaultTargetDataSource(masterDataSource);//設置預設為master
myRoutingDataSource.setTargetDataSources(targetDataSources);
return myRoutingDataSource;
}

}

創建 Mybatis-Plus SqlSessionFactory, 其中有使用到Mybatis Plugin Interceptor , 後續會說明為什麼會需要。

@EnableTransactionManagement
@Configuration
public class MybatisPlusConfig {

@Resource(name = "myRoutingDataSource")
private DataSource myRoutingDataSource;

@Resource
private MybatisPluginInterceptor mybatisPluginInterceptor;

/**
* @return sqlSessionFactory
* @throws Exception
*/
@Bean
public SqlSessionFactory sqlSessionFactory() throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(myRoutingDataSource);
sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(DataSourceConstant.ResourceMapper.CLASSPATH_PATH));

sqlSessionFactoryBean.setPlugins(mybatisPluginInterceptor );
// MybatisConfiguration mybatisConfiguration = new MybatisConfiguration();
// sqlSessionFactoryBean.setConfiguration(mybatisConfiguration);
return sqlSessionFactoryBean.getObject();
}


/**
* 事務配置
* @return 事務管理器
*/
@Bean
public DataSourceTransactionManager transactionManager() {
DataSourceTransactionManager tx = new DataSourceTransactionManager();
tx.setDataSource(myRoutingDataSource);
return tx;
}
}


創建 MyRoutingDataSource ,繼承AbstractRoutingDataSource 
@Slf4j
public class MyRoutingDataSource extends AbstractRoutingDataSource {
@Nullable
@Override
protected Object determineCurrentLookupKey() {
log.debug("線程[{}],切换到的資料庫為:{}", Thread.currentThread().getId(), DBContextHolder.get());
return DBContextHolder.get();
}
}

創建 DBContextHolder , 自動切換目前資料源,假設有多台Slave可自行定義調整連線數超過一定數量後更換其他台Slave。
@Slf4j
public class DBContextHolder {

private static final ThreadLocal<DBTypeEnum> contextHolder = new ThreadLocal<DBTypeEnum>();

private static final AtomicInteger counter = new AtomicInteger(-1);

public static void set(DBTypeEnum dbType) {
contextHolder.set(dbType);
}

public static DBTypeEnum get() {
return contextHolder.get();
}

public static void master() {
remove();
log.info("使用的資料庫: MASTER ...");
set(DBTypeEnum.MASTER);
}

/**
* 如果有多台slave可切換,default 1台slave
*/
public static void slave() {
remove();
log.info("使用的資料庫: SLAVE ....");
int index = counter.getAndIncrement() % 2;
if (counter.get() > 9999) { //當連線數超過9999時切換其他台Slave
counter.set(-1);
}
if (index == 0) {
set(DBTypeEnum.SLAVE);
} else {
set(DBTypeEnum.SLAVE);
}
}

/**
* remove 線程
*/
public static void remove(){
contextHolder.remove();
}
}

透過Spring AbstractRoutingDataSource 自動切換資料源,可以有很多個方法,
本身是使用 Spring AOP 及Mybatis-plus 的Plugin 做自動切換功能。為什麼不直接用Spring AOP就好,還需要使用Mybatis-Plus Plugin ,就是要確保在執行insert update , delete 時, 不使用到Slave 而是使用Master。
因本身AOP是直接攔截Service, 而不是 Mapper , 會遇到的問題是,當第一個執行為Select 時,Connection是連線Slave,在執行第二個 insert  or  update是不會切換成Master,因為是同一個線程,所以導致在 insert , update時會寫入Slave而不是Master ,後續才會用 Mybatis Plugin做補強。
@Aspect
@Component
@Slf4j
public class DataSourceAop {
/**
* select using slave
*/
@Pointcut("" +
"!@annotation(com.mybatis.dynamic.annotation.MybatisClusterMaster) " +
"&& " +
" (execution(* com.myProject.*.service..*.select*(..)) " +
"|| execution(* com.myProject.*.service..*.query*(..)) " +
"|| execution(* com.myProject.*.service..*.find*(..)) " +
"|| execution(* com.myProject.*.service..*.get*(..)) " +
")")
public void readPointcut() {
}

/**
* other using master
*/
@Pointcut("@annotation(com.mybatis.dynamic.annotation.MybatisClusterMaster) " +
"|| execution(* com.myProject.*.service..*.insert*(..)) " +
"|| execution(* com.myProject.*.service..*.add*(..)) " +
"|| execution(* com.myProject.*.service..*.save*(..)) " +
"|| execution(* com.myProject.*.service..*.update*(..)) " +
"|| execution(* com.myProject.*.service..*.edit*(..)) " +
"|| execution(* com.myProject.*.service..*.modify*(..)) " +
"|| execution(* com.myProject.*.service..*.delete*(..)) " +
"|| execution(* com.myProject.*.service..*.remove*(..))"
)
public void writePointcut() {
}

@Before("readPointcut()")
public void read() {
DBContextHolder.slave();
}

@Before("writePointcut()")
public void write() {
DBContextHolder.master();
}
}

創建 MybatisPluginInterceptor 
繼承了Interceptor ,攔截SQL在執行之前,判斷該SQL為 insert , update , delete 自動切換成master , 如果是select 不做轉換。
@Component
@Intercepts({
@Signature(type = Executor.class, method = "update", args = { MappedStatement.class , Object.class }),
@Signature(type = Executor.class, method = "query", args = { MappedStatement.class , Object.class , RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class}),
@Signature(type = Executor.class, method = "query", args = { MappedStatement.class , Object.class , RowBounds.class, ResultHandler.class }),

})
public class MybatisPluginInterceptor implements Interceptor {


/**
* 補仃 線程是一致性的,當第一次執行為select 無法切換成master , 透過攔截器自動切換
* @param invocation
* @return Object
*/
public Object intercept(Invocation invocation) throws Throwable {
Object[] objects = invocation.getArgs();
MappedStatement ms = (MappedStatement) objects[0];
boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();
if(!synchronizationActive){
if( DBContextHolder.get().equals(DBTypeEnum.SLAVE) && ( ms.getSqlCommandType().equals(SqlCommandType.DELETE) || ms.getSqlCommandType().equals(SqlCommandType.UPDATE) || ms.getSqlCommandType().equals(SqlCommandType.INSERT) ) ){
DBContextHolder.master();
}
}
return invocation.proceed();
}

public Object plugin(Object target) {
return Plugin.wrap(target, this);
}

public void setProperties(Properties properties) {
}
}


Github Code:


References

Spark Streaming for Kakfa 資料流串接

Spark Streaming for Kafka 

大家應該都知道Spark 套件,不需要在做說明,今天介紹如何透過Spark Steaming 處理資料,使用Kafka 接收資料,將接收回來資料存入kafka 中,在透過Spark Streaming API取得資料,資料處理完後儲存至DB。

首先需要先建立好Kafka 及Zookeeper Service 平台,接下來就是要寫程式利用Spark Steaming套件取得Kafka Consumer資料。
 
    先將log資料傳送到Kafka Cluster Service,在利用Spark Streaming 將資料取出做ETL或計算,把需要儲存的資料存入DB or NoSQl。
    以下為架構圖:









GithubSource Code

今天的範例為Scala程式語言透過SparkSteaming將Kafka Cluster Topic內的資料收集回來的資料ETL後儲存Elasticsearch DB

建立一個專案為SBT ,將以下的repository 加入Sbt內

//Library
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % sparkVers ,
  "org.apache.spark" % "spark-streaming_2.10" % sparkVers   ,
  "org.apache.spark" % "spark-streaming-kafka_2.10" % sparkVers ,
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.3.3",
  "com.typesafe.play" % "play-json_2.10" % "2.4.5"
)


Kafka資料格式 id,name,age,address,使用Spark Streaming RDD折解字串,將每一個字串轉成Row回傳完成後,使用DataFrame Filter把需要的Data存入Elasticsearch DB


建立 SparkConf ,設定Elasticsearch IP,
注意:Elasticsearch DB Server 需要在同一個Lan底下,不能透過transport連接Elasticsearch,

class SparkConfig {
  def SparkConfToES = new SparkConf()
    .setAppName("spark streaming kafka")
    .setMaster("local")
    .set("spark.serializer",classOf[KryoSerializer].getName)
    .set("es.nodes", "192.168.1.33")
    .set("es.port", "9200")
}



新增一個Main Class ,建立Spark Streaming, 設定Zookeeper Server
val conf =  SparkConfToES
    val sc = new SparkContext(conf)
    val batchIntervl = 15
    val ssc = new StreamingContext(sc, Seconds(batchIntervl))
    val sqlContext = new SQLContext(sc)
    //kafka group
    val group_id = "receiveScanner"
    // kafka topic
    val topic = Map("testStreaming"-> 1)
    // zk connect
    val zkParams = Map(
      "zookeeper.connect" ->"localhost",
      "zookeeper.connection.timeout.ms" -> "10000",
      "group.id" -> group_id)



說明以下codo是在做什麼動作,透過Streaming 以每15秒讀取Kafka Consumer Data,使用play Json Parse將資料轉成Json格式,折解後將資料轉成RDD[Row],RDD[Row]在轉成DataFrame,使用Spark SQL 過濾 age 小於20以下,在排序age 並將資料寫入Elasticsearch Nosql DB


// Kafka
    val kafkaConsumer = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,zkParams,topic,StorageLevel.MEMORY_ONLY_SER)
    val receiveData = kafkaConsumer.map(_._2 )
    // printer kafka data
    receiveData.print()
    receiveData.foreachRDD{ rdd=>
      val transform = rdd.map{ line =>
        val data = Json.parse(line)
        // play json parse
        val id = (data \ "id").asOpt[Int] match { case Some(x) => x; case None => 0}
        val name = ( data \ "name"  ).asOpt[String] match { case Some(x)=> x ; case None => "" }
        val age = (data \ "age").asOpt[Int] match { case Some(x) => x; case None => 0}
        val address = ( data \ "address"  ).asOpt[String] match { case Some(x)=> x ; case None => "" }
        Row(id,name,age,address)
      }
      val transfromrecive = sqlContext.createDataFrame(transform,schameType)
      import org.apache.spark.sql.functions._
      import org.elasticsearch.spark.sql._
      //filter age < 20,to ES database
      transfromrecive.where(col("age").<(20)).orderBy(col("age").asc)
        .saveToEs("member/user",Map("es.mapping.id" -> "id"))
    }



DataFrame Scahme

def schameType =  StructType(
     StructField("id",IntegerType,false)::
       StructField("name",StringType,false)::
       StructField("age",IntegerType,false)::
       StructField("address",StringType,false)::
    Nil  ) 
   }



References
https://spark.apache.org/docs/1.6.2/streaming-kafka-integration.html


Elasticsearch Aggregations Max Sum for Java API

Elasticsearch Aggregations for Java


今天介紹的是使用Elasticsearch Aggregations 計算和加總的功能,什麼是Elasticsearch 的Aggregations,在ES最大的好處是可以用Aggs做計算跟加總,ES已經幫我們做好了計算的套件功能,我們只需要拿來使用就可以,非常的好用又快速。不管是Group by 或Cardinality、Sum使用Aggregations就可以幫你算出你想要的數值。
     例如現在格式為 
{ "download": 12,
  "createtime": "2016-01-01",
  "product": "3c",
  "version": "1.2.3",
  "userid": 11224 }

 我們將download 數值找出Max數值在Sum,我們要如何下ES Aggs指令如下:
curl  -XPOST http://xxx:9200/index/type
{
  "query": {
    "bool": {
      "must": [
        {
          "match_all": {}
        }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "by_group": {
      "terms": {
        "field": "product",
        "size": 0
      },
      "max_download": {
        "max": {
          "field": "download"
        }
      }
    },
    "sum_download": {
      "sum_bucket": {
        "buckets_path": "product&gt;max_download",
        "size": 0
      }
    }
  }
}




  
透過以上指令使用curl查詢,回傳數值是正常,那我們接下來就是用ES所提供的Java API




BoolQueryBuilder query = QueryBuilders.boolQuery().must(QueryBuilders.matchAllQuery());

        TermsBuilder aggs = AggregationBuilders.terms("by_product").field("product").size(0)
                .subAggregation( AggregationBuilders.max("by_max").field("download")).size(0)
                .subAggregation(AggregationBuilders.sum("by_sum").field("by_product&gt;by_max")).size(0);

        SearchResponse ResultSet =  ES(nodes,nodes2,nodes3).prepareSearch("index").setTypes("type")
                .setQuery(query)
                .addAggregation(aggs)
                .setSize(0)
                .get();

        Aggregations result = ResultSet.getAggregations();
        Terms groupby1 =  result.get("by_product");
        for(Terms.Bucket object : groupby1.getBuckets()) {
            Terms bymax = object.getAggregations().get("by_max");
            Terms bysum = object.getAggregations().get("by_sum");
        }



說明一下為什麼在Aggs 為什麼要使用size =0 , size =0 就是將所有的資料顯示出來,如果沒有設定,預設值為10筆資料。另外比較特別的是
Top Hits API size就反過來,需要設定為9999999將所有資料顯示。如果設定為0是不會顯示資料,這要注意一下。
在ResultSet設定size=0,因為我們不需要取得內容所以不用顯示出來。

References
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-max-aggregation.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-metrics-sum-aggregation.html

Azure swapon 記憶體


Azure swapon 設定


有使用Azure平台的虛擬機的人都知道,在azure VM會有一個mnt這個路徑 ,但是這個路徑是用來放暫時的檔案,當虛擬機重啟後/mnt資料夾內的檔案將會消失。我們今天就要來教各位不要浪費這個空間。使用Swapon增加虛擬機的記憶體,讓虛擬機可以使用更多記憶體。

       首先查詢是如有使用Swapon空間,
Stop1
swapon -s 確認目前的swanp大小,
 Stop2
設定需要的swap記憶體空間,count我們新增54G的swap或是你可以設定更多空間
1048576*54 =54525952 54G空間 
sudo dd if=/dev/zero of=/mnt/swapfile(路徑自選) bs=1024 count=54525952 
Stop3
 接下來將swap空間格式化 
sudo mkswap /mnt/swapfile 
Stop4
啟動swap空間
sudo swapon /mnt/swapfile
查詢是如有啟動swap
swapon -s
Stop5
在來就是設定重啟後自動掛載swap空間,編輯fstab 
sudo vim /etc/fstab 
加入以下文字
/mnt/swapfile swap swap defaults 0 0 

完成,當虛擬機重動後VM將會自動掛載swap空間,讓虛擬機記憶體空間更多,可以試試在azure架設Elasticsearch非常的好用,