Spring Boot 3 OpenAPI 2.0

Spring Boot 3 Using SpringDoc OpenAPI 2.0.0-M5

To use Swagger on Spring Boot 3, io.springfox is not an option: it does not support Spring Boot 3 and currently only goes up to Spring Boot 2.x. For Spring Boot 3 you need io.springdoc's OpenAPI 2.x. At the time of writing the usual Maven search only surfaces springdoc-openapi 1.6.11; the 2.0 line (springdoc-openapi v2, here 2.0.0-M5) has to be looked up separately.


Add the libraries to pom.xml, with the property ${com.swagger.spring.boot.V3.version} = 2.0.0-M5:

<dependency>
    <groupId>org.springdoc</groupId>
    <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
    <version>${com.swagger.spring.boot.V3.version}</version>
</dependency>
<dependency>
    <groupId>org.springdoc</groupId>
    <artifactId>springdoc-openapi-starter-webmvc-api</artifactId>
    <version>${com.swagger.spring.boot.V3.version}</version>
</dependency>
<dependency>
    <groupId>org.springdoc</groupId>
    <artifactId>springdoc-openapi-starter-common</artifactId>
    <version>${com.swagger.spring.boot.V3.version}</version>
</dependency>

Create SwaggerConfig.java.

Annotate it with @Configuration so that Spring loads the settings.

Add default header parameters to every Swagger operation (for example a token, user and version header) and configure the Swagger grouping.


import io.swagger.v3.oas.annotations.enums.ParameterIn;
import io.swagger.v3.oas.models.*;
import io.swagger.v3.oas.models.info.Info;
import io.swagger.v3.oas.models.media.NumberSchema;
import io.swagger.v3.oas.models.media.StringSchema;
import io.swagger.v3.oas.models.parameters.Parameter;
import org.springdoc.core.customizers.OperationCustomizer;
import org.springdoc.core.models.GroupedOpenApi;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.config.annotation.EnableWebMvc;

import java.util.ArrayList;
import java.util.List;


/**
 * @author neillin
 * @since 2022/10/02 - 9:34 AM
 */
@Configuration
@EnableWebMvc
public class SwaggerConfig {

    @Value("${app.version}")
    private String apVersion;

    /**
     * Default header parameters: "X-Auth-Token", "userId", "version"
     * @return List<OperationCustomizer>
     */
    private List<OperationCustomizer> setDefaultHeader() {
        List<OperationCustomizer> operationCustomizers = new ArrayList<>();
        // default header names
        String[] header = {"X-Auth-Token", "userId", "version"};
        String[] headerName = {"授權驗証", "user_id", "版本號"};

        for (int i = 0; i < header.length; i++) {
            String setHeader = header[i];
            String setHeaderName = headerName[i];
            OperationCustomizer operationCustomizer = new OperationCustomizer() {
                @Override
                public Operation customize(Operation operation, HandlerMethod handlerMethod) {
                    Parameter customHeaderVersion = new Parameter();

                    if (setHeader.equals("userId")) {
                        customHeaderVersion.in(ParameterIn.HEADER.toString())
                                .name(setHeader)
                                .description(setHeaderName)
                                .schema(new NumberSchema()) // long
                                .required(false);
                    } else {
                        customHeaderVersion.in(ParameterIn.HEADER.toString())
                                .name(setHeader)
                                .description(setHeaderName)
                                .schema(new StringSchema()) // string
                                .required(false);
                    }
                    operation.addParametersItem(customHeaderVersion);
                    return operation;
                }
            };
            operationCustomizers.add(operationCustomizer);
        }
        return operationCustomizers;
    }

    /**
     * @return GroupedOpenApi
     */
    @Bean
    public GroupedOpenApi publicApi() {
        return GroupedOpenApi.builder()
                .group("OpenApi") // shown under "Select a definition"; lets you split the API into groups
                .pathsToMatch("/**") // match every controller
                .build()
                .addAllOperationCustomizer(this.setDefaultHeader());
    }

    @Bean
    public OpenAPI springShopOpenAPI() {
        return new OpenAPI()
                .components(new Components())
                .info(new Info()
                        .title("OpenApi-API服務")
                        .version(apVersion)
                        .description("API服務"))
                .externalDocs(new ExternalDocumentation()
                        .description("Spring-boot v3 + OpenAPI 3")
                        .url("https://springdoc.org/v2"));
    }

}


Add Swagger to the controllers and document the title of each API (see the official site for reference).

If you have been using the SpringFox Swagger annotations, they need to be adjusted to the SpringDoc OpenAPI ones:

@Api → @Tag

@ApiIgnore → @Parameter(hidden = true) or @Operation(hidden = true) or @Hidden

@ApiImplicitParam → @Parameter

@ApiImplicitParams → @Parameters

@ApiModel → @Schema

@ApiModelProperty(hidden = true) → @Schema(accessMode = READ_ONLY)

@ApiModelProperty → @Schema

@ApiOperation(value = "foo", notes = "bar") → @Operation(summary = "foo", description = "bar")

@ApiParam → @Parameter

@ApiResponse(code = 404, message = "foo") → @ApiResponse(responseCode = "404", description = "foo")


@RestController
@RequestMapping("/api")
@Slf4j
@Tag(name = "api", description = "測試")
public class TestController {

    @Resource
    private TestDBService testDBService;

    @Operation(summary = "測試db data", description = "測試es ", parameters = {@Parameter(name = "userId", description = "用戶id")})
    @ApiResponse(responseCode = "2xx", description = "成功")
    @GetMapping(value = "/db/{userId}", produces = APPLICATION_JSON_VALUE)
    public Object getDB(@PathVariable Long userId) {
        Object data = testDBService.findByUserId(userId);
        return success(data);
    }
}


The Swagger UI is served at swagger-ui.html by default; you can also define whatever path you prefer.
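If you want a different path, springdoc exposes the springdoc.swagger-ui.path property; a minimal application.yml sketch (the path below is just an example):

springdoc:
  swagger-ui:
    path: /api-docs-ui   # requests to this path are redirected to the Swagger UI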

Open http://localhost:8080/swagger-ui.html and the Swagger UI appears; you can see the X-Auth-Token header configured above together with the parameters each API expects.


Github:

https://github.com/neiltw/springboot3-openapi-2.0

References:

https://springdoc.org/v2/

Spring Boot MyBatis-Plus Dynamic Connection (automatic datasource switching)


Spring Boot MyBatis-Plus Dynamically Switching Connections


Overview:

    Spring Boot can switch the database connection automatically by routing through Spring JDBC's AbstractRoutingDataSource. If you want to dig into how it actually works, have a look at the Spring JDBC source code. Below is how to use AbstractRoutingDataSource to switch datasources automatically.


The databases are a master and a slave, with reads and writes split and switched automatically.

Define two annotations, @EnableMybatisCluster and @MybatisClusterMaster (a usage sketch for @EnableMybatisCluster follows its definition below).

@Retention(RetentionPolicy.RUNTIME)
@Documented
@Target(ElementType.TYPE)
@Import({ MultiDataSourceConfig.class, MybatisPlusConfig.class})
public @interface EnableMybatisCluster {
}
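
A minimal usage sketch: put @EnableMybatisCluster on the Spring Boot application class (DemoApplication is a placeholder name) so that the @Import above pulls in MultiDataSourceConfig and MybatisPlusConfig described below.

@SpringBootApplication
@EnableMybatisCluster
public class DemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}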

Use @MybatisClusterMaster when a query has to run on the master; add it directly to the ServiceImpl method (a usage sketch follows the definition below).
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MybatisClusterMaster {
}
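
A minimal sketch of forcing a read onto the master (OrderServiceImpl and OrderMapper are hypothetical names):

@Service
public class OrderServiceImpl implements OrderService {

    @Resource
    private OrderMapper orderMapper; // hypothetical MyBatis-Plus mapper

    // find* would normally be routed to the slave by the aspect shown later;
    // @MybatisClusterMaster pulls it out of the read pointcut so it runs on the master
    // (useful for read-after-write consistency).
    @MybatisClusterMaster
    @Override
    public Order findOrderById(Long id) {
        return orderMapper.selectById(id);
    }
}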

Create the constants that hold the datasource property prefixes read from the YAML configuration (a sample of the matching configuration follows the interface below):
public interface DataSourceConstant {

    interface Master {
        String JDBC_VALUE = "spring.dynamic.datasource.master";

        String DATA_SOURCE = "dataSourceMaster";
    }

    interface Slave {
        String JDBC_VALUE = "spring.dynamic.datasource.slave";

        String DATA_SOURCE = "dataSourceSlave";
    }

    interface ResourceMapper {
        String CLASSPATH_PATH = "classpath:mapper/*.xml";
    }

}
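
A minimal sketch of the matching YAML under the prefixes above (hosts, schema and credentials are placeholders; since DataSourceBuilder typically builds a HikariDataSource, the URL key bound by @ConfigurationProperties is jdbc-url rather than url):

spring:
  dynamic:
    datasource:
      master:
        jdbc-url: jdbc:mysql://master-host:3306/demo
        username: root
        password: secret
        driver-class-name: com.mysql.cj.jdbc.Driver
      slave:
        jdbc-url: jdbc:mysql://slave-host:3306/demo
        username: root
        password: secret
        driver-class-name: com.mysql.cj.jdbc.Driver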

Create the enum; if you run several slaves you can add more slave entries yourself (a variant with a second slave is sketched after the enum).
public enum DBTypeEnum {
    /**
     * MASTER, SLAVE
     */
    MASTER, SLAVE;
}
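
A hypothetical variant with a second read replica; DBContextHolder.slave() (shown later) would then rotate between SLAVE and SLAVE2 using its counter:

public enum DBTypeEnum {
    MASTER, SLAVE, SLAVE2;
}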


Create the DataSourceConfig and register the master and slave DataSource beans. The routing DataSource defaults to the master, so anything that is not explicitly routed goes to the master.

@Configuration
@Slf4j
public class MultiDataSourceConfig {

    /**
     * DataSource
     *
     * @return Master DataSource
     */
    @Bean(name = DataSourceConstant.Master.DATA_SOURCE)
    @ConfigurationProperties(prefix = DataSourceConstant.Master.JDBC_VALUE)
    @Primary
    public DataSource dataSourceMaster() {
        return DataSourceBuilder.create().build();
    }

    /**
     * DataSource
     *
     * @return Slave1 DataSource
     */
    @Bean(name = DataSourceConstant.Slave.DATA_SOURCE)
    @ConfigurationProperties(prefix = DataSourceConstant.Slave.JDBC_VALUE)
    public DataSource dataSourceSlave() {
        return DataSourceBuilder.create().build();
    }

    /**
     * default master
     * @param masterDataSource primary (master) database
     * @param slaveDataSource  read replica (slave) database
     * @return DataSource
     */
    @Bean
    public DataSource myRoutingDataSource(@Qualifier(DataSourceConstant.Master.DATA_SOURCE) DataSource masterDataSource,
                                          @Qualifier(DataSourceConstant.Slave.DATA_SOURCE) DataSource slaveDataSource) {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(DBTypeEnum.MASTER, masterDataSource);
        targetDataSources.put(DBTypeEnum.SLAVE, slaveDataSource);
        MyRoutingDataSource myRoutingDataSource = new MyRoutingDataSource();
        myRoutingDataSource.setDefaultTargetDataSource(masterDataSource); // master is the default
        myRoutingDataSource.setTargetDataSources(targetDataSources);
        return myRoutingDataSource;
    }

}

Create the MyBatis-Plus SqlSessionFactory. It registers a MyBatis plugin interceptor; why that interceptor is needed is explained further below.

@EnableTransactionManagement
@Configuration
public class MybatisPlusConfig {

    @Resource(name = "myRoutingDataSource")
    private DataSource myRoutingDataSource;

    @Resource
    private MybatisPluginInterceptor mybatisPluginInterceptor;

    /**
     * @return sqlSessionFactory
     * @throws Exception
     */
    @Bean
    public SqlSessionFactory sqlSessionFactory() throws Exception {
        SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();
        sqlSessionFactoryBean.setDataSource(myRoutingDataSource);
        sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(DataSourceConstant.ResourceMapper.CLASSPATH_PATH));

        sqlSessionFactoryBean.setPlugins(mybatisPluginInterceptor);
        // MybatisConfiguration mybatisConfiguration = new MybatisConfiguration();
        // sqlSessionFactoryBean.setConfiguration(mybatisConfiguration);
        return sqlSessionFactoryBean.getObject();
    }

    /**
     * Transaction configuration
     * @return transaction manager
     */
    @Bean
    public DataSourceTransactionManager transactionManager() {
        DataSourceTransactionManager tx = new DataSourceTransactionManager();
        tx.setDataSource(myRoutingDataSource);
        return tx;
    }
}


Create MyRoutingDataSource, which extends AbstractRoutingDataSource:
@Slf4j
public class MyRoutingDataSource extends AbstractRoutingDataSource {
    @Nullable
    @Override
    protected Object determineCurrentLookupKey() {
        log.debug("線程[{}],切换到的資料庫為:{}", Thread.currentThread().getId(), DBContextHolder.get());
        return DBContextHolder.get();
    }
}

Create DBContextHolder, which holds the datasource choice for the current thread. If you run several slaves you can extend it so that, once the counter passes a threshold, it rotates to another slave.
@Slf4j
public class DBContextHolder {

    private static final ThreadLocal<DBTypeEnum> contextHolder = new ThreadLocal<DBTypeEnum>();

    private static final AtomicInteger counter = new AtomicInteger(-1);

    public static void set(DBTypeEnum dbType) {
        contextHolder.set(dbType);
    }

    public static DBTypeEnum get() {
        return contextHolder.get();
    }

    public static void master() {
        remove();
        log.info("使用的資料庫: MASTER ...");
        set(DBTypeEnum.MASTER);
    }

    /**
     * Hook for rotating between several slaves; the default is a single slave.
     */
    public static void slave() {
        remove();
        log.info("使用的資料庫: SLAVE ....");
        int index = counter.getAndIncrement() % 2;
        if (counter.get() > 9999) { // reset the counter once it exceeds 9999
            counter.set(-1);
        }
        // With a single slave both branches resolve to SLAVE; when you add more slaves,
        // map each index to its own enum value here.
        if (index == 0) {
            set(DBTypeEnum.SLAVE);
        } else {
            set(DBTypeEnum.SLAVE);
        }
    }

    /**
     * Clear the thread-local binding.
     */
    public static void remove() {
        contextHolder.remove();
    }
}

There is more than one way to drive the switching on top of Spring's AbstractRoutingDataSource. This project combines Spring AOP with a MyBatis-Plus plugin. Why not rely on AOP alone? The plugin is there to make sure that insert, update and delete statements never run against the slave.
The AOP advice intercepts the service layer rather than the mappers, so the following can happen: the first call on a thread is a select, the connection is routed to the slave, and a subsequent insert or update does not switch back to the master because it runs on the same thread, so the write would land on the slave. The MyBatis plugin is the patch for that case. The aspect is shown below, followed by a small service sketch that illustrates the method-name convention the pointcuts rely on.
@Aspect
@Component
@Slf4j
public class DataSourceAop {

    /**
     * select using slave
     */
    @Pointcut("!@annotation(com.mybatis.dynamic.annotation.MybatisClusterMaster) " +
            "&& " +
            " (execution(* com.myProject.*.service..*.select*(..)) " +
            "|| execution(* com.myProject.*.service..*.query*(..)) " +
            "|| execution(* com.myProject.*.service..*.find*(..)) " +
            "|| execution(* com.myProject.*.service..*.get*(..)) " +
            ")")
    public void readPointcut() {
    }

    /**
     * other using master
     */
    @Pointcut("@annotation(com.mybatis.dynamic.annotation.MybatisClusterMaster) " +
            "|| execution(* com.myProject.*.service..*.insert*(..)) " +
            "|| execution(* com.myProject.*.service..*.add*(..)) " +
            "|| execution(* com.myProject.*.service..*.save*(..)) " +
            "|| execution(* com.myProject.*.service..*.update*(..)) " +
            "|| execution(* com.myProject.*.service..*.edit*(..)) " +
            "|| execution(* com.myProject.*.service..*.modify*(..)) " +
            "|| execution(* com.myProject.*.service..*.delete*(..)) " +
            "|| execution(* com.myProject.*.service..*.remove*(..))"
    )
    public void writePointcut() {
    }

    @Before("readPointcut()")
    public void read() {
        DBContextHolder.slave();
    }

    @Before("writePointcut()")
    public void write() {
        DBContextHolder.master();
    }
}
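
A hypothetical service under com.myProject.member.service illustrating the method-name convention the pointcuts above rely on (the class and mapper names are placeholders):

@Service
public class MemberServiceImpl implements MemberService {

    @Resource
    private MemberMapper memberMapper;

    // Matches readPointcut() via the find* prefix, so DBContextHolder.slave() runs first:
    // the statement is routed to the slave.
    public Member findById(Long id) {
        return memberMapper.selectById(id);
    }

    // Matches writePointcut() via the save* prefix, so DBContextHolder.master() runs first:
    // the statement is routed to the master.
    public int saveMember(Member member) {
        return memberMapper.insert(member);
    }
}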

Create MybatisPluginInterceptor.
It implements Interceptor and inspects each SQL statement before it executes: if the statement is an insert, update or delete it switches to the master automatically; selects are left alone.
@Component
@Intercepts({
        @Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class}),
        @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class, CacheKey.class, BoundSql.class}),
        @Signature(type = Executor.class, method = "query", args = {MappedStatement.class, Object.class, RowBounds.class, ResultHandler.class}),
})
public class MybatisPluginInterceptor implements Interceptor {

    /**
     * Patch: routing is per thread, so if the first statement on a thread was a select the
     * context stays on the slave and a later write would not switch to the master by itself;
     * the interceptor switches it automatically.
     * @param invocation
     * @return Object
     */
    public Object intercept(Invocation invocation) throws Throwable {
        Object[] objects = invocation.getArgs();
        MappedStatement ms = (MappedStatement) objects[0];
        boolean synchronizationActive = TransactionSynchronizationManager.isSynchronizationActive();
        if (!synchronizationActive) {
            // null-safe comparison: the thread may not have been routed yet
            if (DBTypeEnum.SLAVE.equals(DBContextHolder.get())
                    && (ms.getSqlCommandType().equals(SqlCommandType.DELETE)
                    || ms.getSqlCommandType().equals(SqlCommandType.UPDATE)
                    || ms.getSqlCommandType().equals(SqlCommandType.INSERT))) {
                DBContextHolder.master();
            }
        }
        return invocation.proceed();
    }

    public Object plugin(Object target) {
        return Plugin.wrap(target, this);
    }

    public void setProperties(Properties properties) {
    }
}


Github Code:


References

Spark Streaming for Kafka Data Pipeline

Spark Streaming for Kafka

Everyone should already know Spark, so it needs no further introduction. Today I show how to process a data stream with Spark Streaming: incoming records are written to Kafka, Spark Streaming pulls them back out through its Kafka API, and the processed results are stored in a database.

First stand up the Kafka and ZooKeeper services; then write the program that uses the Spark Streaming package to consume the Kafka data.

    The log data is first sent to the Kafka cluster; Spark Streaming then takes it out for ETL or computation, and whatever needs to be persisted is written to a DB or NoSQL store.
    The architecture diagram is below:

[architecture diagram]
GitHub Source Code

Today's example, written in Scala, uses Spark Streaming to collect the data from a topic on the Kafka cluster, run it through ETL, and store the result in an Elasticsearch database.

Create an SBT project and add the following dependencies to the build:

//Library
libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.10" % sparkVers ,
  "org.apache.spark" % "spark-streaming_2.10" % sparkVers   ,
  "org.apache.spark" % "spark-streaming-kafka_2.10" % sparkVers ,
  "org.elasticsearch" % "elasticsearch-spark_2.10" % "2.3.3",
  "com.typesafe.play" % "play-json_2.10" % "2.4.5"
)


The Kafka records have the format id,name,age,address. Spark Streaming breaks each record apart, turns each one into a Row, and once that is done a DataFrame filter keeps the data we need and stores it in the Elasticsearch database.


Create the SparkConf and set the Elasticsearch IP.
Note: the Elasticsearch server has to be on the same LAN; the connection cannot go through the Elasticsearch transport client.

class SparkConfig {
  def SparkConfToES = new SparkConf()
    .setAppName("spark streaming kafka")
    .setMaster("local")
    .set("spark.serializer",classOf[KryoSerializer].getName)
    .set("es.nodes", "192.168.1.33")
    .set("es.port", "9200")
}



Add a main class, create the Spark StreamingContext, and point it at the ZooKeeper server:
val conf = SparkConfToES
val sc = new SparkContext(conf)
val batchIntervl = 15
val ssc = new StreamingContext(sc, Seconds(batchIntervl))
val sqlContext = new SQLContext(sc)
// kafka group
val group_id = "receiveScanner"
// kafka topic
val topic = Map("testStreaming" -> 1)
// zk connect
val zkParams = Map(
  "zookeeper.connect" -> "localhost",
  "zookeeper.connection.timeout.ms" -> "10000",
  "group.id" -> group_id)



What the code below does: every 15 seconds the stream reads the Kafka consumer data, parses each record with the Play JSON parser, breaks it apart into an RDD[Row], converts the RDD[Row] into a DataFrame, uses Spark SQL to filter for age under 20, orders by age, and writes the result to the Elasticsearch NoSQL database.


// Kafka
    val kafkaConsumer = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder](ssc,zkParams,topic,StorageLevel.MEMORY_ONLY_SER)
    val receiveData = kafkaConsumer.map(_._2 )
    // print kafka data
    receiveData.print()
    receiveData.foreachRDD{ rdd=>
      val transform = rdd.map{ line =>
        val data = Json.parse(line)
        // play json parse
        val id = (data \ "id").asOpt[Int] match { case Some(x) => x; case None => 0}
        val name = ( data \ "name"  ).asOpt[String] match { case Some(x)=> x ; case None => "" }
        val age = (data \ "age").asOpt[Int] match { case Some(x) => x; case None => 0}
        val address = ( data \ "address"  ).asOpt[String] match { case Some(x)=> x ; case None => "" }
        Row(id,name,age,address)
      }
      val transfromrecive = sqlContext.createDataFrame(transform,schameType)
      import org.apache.spark.sql.functions._
      import org.elasticsearch.spark.sql._
      //filter age < 20,to ES database
      transfromrecive.where(col("age").<(20)).orderBy(col("age").asc)
        .saveToEs("member/user",Map("es.mapping.id" -> "id"))
    }



DataFrame Schema

def schameType =  StructType(
     StructField("id",IntegerType,false)::
       StructField("name",StringType,false)::
       StructField("age",IntegerType,false)::
       StructField("address",StringType,false)::
    Nil  ) 
   }



References
https://spark.apache.org/docs/1.6.2/streaming-kafka-integration.html