整理了一下Cassandra3的简单通信代码,包括增删改查。
CassandraTest.java
package com.neohope.cassandra.test;
import com.datastax.driver.core.*;
import com.datastax.driver.core.querybuilder.*;
import java.util.Iterator;
public class CassandraTest {
private static Session session = null;
private static Cluster cluster = null;
static {
QueryOptions options = new QueryOptions();
options.setConsistencyLevel(ConsistencyLevel.QUORUM);
cluster = Cluster.builder()
.addContactPoint("172.16.172.23")
.addContactPoint("172.16.172.24")
.addContactPoint("172.16.172.25")
.withPort(9042)
//.withCredentials("username", "password")
.withQueryOptions(options)
.build();
}
/*
* 获取未指定keyspace的session
*/
protected static Session GetSession() {
return GetSession(null);
}
/*
* 获取指定keyspace的Session
*
* @keySpace keyspace名称
*
*/
protected static Session GetSession(String keySpace) {
if(session==null || session.isClosed())
{
if(keySpace!=null) {
session = cluster.connect(keySpace);
}
else
{
session = cluster.connect();
}
}
return session;
}
/*
* 关闭Session
*/
protected static void CloseSession() {
if(session!=null )
{
if(!session.isClosed())
{
session.close();
}
session = null;
}
}
/*
* 关闭连接
*/
protected static void CloseConnection()
{
CloseSession();
if(cluster!=null )
{
if(!cluster.isClosed())
{
cluster.close();
}
cluster = null;
}
}
/*
* 执行SQL
*
* @ss session实例
*
* @sql 语句
*/
public static void ExecuteNoQuery(Session ss, String sql) {
ss.execute(sql);
}
/*
* 执行BoundStatement
*
* @ss session实例
*
* @bindStatement BoundStatement
*/
public static void ExecuteNoQuery(Session ss, BoundStatement bindStatement) {
ss.execute(bindStatement);
}
/*
* 执行BBatchStatement
*
* @ss session实例
*
* @batchStatement BatchStatement
*/
public static void ExecuteNoQuery(Session ss, BatchStatement batchStatement) {
ss.execute(batchStatement);
}
/*
* 新建keyspace
*
* @ss session实例
*
* @keySpaceName keyspace名称
*
* @strategyClass Strategy Class名称
*
* @replicationFactor replication系数
*
*/
public static void CreateKeySpace(Session ss, String keySpaceName, String strategyClass, int replicationFactor){
String sql = "Create keyspace "+keySpaceName+" WITH replication = {'class': '"+strategyClass+"', 'replication_factor': "+replicationFactor+"};;";
ExecuteNoQuery(ss, sql);
}
/*
* 删除keyspace
*
* @ss session实例
*
* @keySpaceName keyspace名称
*/
public static void DropKeySpace(Session ss, String keySpaceName) {
String sql = "Drop keyspace "+keySpaceName+";";
ExecuteNoQuery(ss, sql);
}
/*
* 新建表
*
* @ss session实例
*
* @tableName 表名
*/
public static void CreateTable(Session ss, String tableName, String[] colName, String[] colType, String primaryKey){
String sql = "Create table " + tableName + "(";
for(int i = 0;i<colName.length && i<colType.length;i++)
{
sql+=" "+colName[i] + " " + colType[i] + ",";
}
sql+=" Primary Key(" + primaryKey + "));";
ExecuteNoQuery(ss, sql);
}
/*
* 删除表
*
* @ss session实例
*
* @tableName 表名
*/
public static void DropTable(Session ss, String tableName) {
String sql = "Drop table "+tableName+";";
ExecuteNoQuery(ss, sql);
}
/*
* 新增或更新多列数据
*
* @ss session实例
*
* @tableName 表名
*
* @colNames 列名
*
* @values 列值
*
*/
public static void PutData(Session ss, String tableName,String[] colNames, Object[] values) {
QueryBuilder builder = new QueryBuilder(cluster);
Insert insert = builder.insertInto(tableName).values(colNames,values);
ss.execute(insert);
System.out.println("put data is succeed!");
}
/*
* 更新多列数据
*
* @ss session实例
*
* @tableName 表名
*
* @setColValue Set列名
*
* @setColValue Set列值
*
* @whereCloName 列名
*
* @whereColValue 列值
*
*/
public static void UpdateData(Session ss, String tableName,String setCloName,Object setColValue, String whereCloName, Object whereColValue) {
QueryBuilder builder = new QueryBuilder(cluster);
Update.Where update = builder.update(tableName).with((QueryBuilder.set(setCloName, setColValue))).where(QueryBuilder.eq(whereCloName, whereColValue));
ss.execute(update);
System.out.println("update data is succeed!");
}
/*
* 根据列名与列值进行查询
*
* @ss session实例
*
* @tableName 表名
*
* @whereCloName 列名
*
* @whereColValue 列值
*/
public static void GetData(Session ss, String tableName,String whereCloName, Object whereColValue) {
QueryBuilder builder = new QueryBuilder(cluster);
Select.Where select = builder.select().from(tableName).where(QueryBuilder.eq(whereCloName, whereColValue));
ResultSet rs = ss.execute(select);
Iterator it = rs.iterator();
while (it.hasNext())
{
Row row = (Row)it.next();
System.out.println("col01 is "+row.get("col01",int.class));
System.out.println("col02 is "+row.get("col02",String.class));
System.out.println("col03 is "+row.get("col03",String.class));
}
System.out.println("put data is succeed!");
}
/*
* 根据列名与列值进行删除操作
*
* @ss session实例
*
* @tableName 表名
*
* @cloName 列名
*
* @colValue 列值
*/
public static void DeleteData(Session ss, String tableName,String whereCloName, Object whereColValue) {
QueryBuilder builder = new QueryBuilder(cluster);
Delete.Where delete = builder.delete().from(tableName).where(QueryBuilder.eq(whereCloName, whereColValue));
ss.execute(delete);
System.out.println("delete data is succeed!");
}
/*
* 获取带Keyspace的表名
*
* @keySpaceName keyspace name
*
* @tableName 表名
*
*/
public static String GetTableNameWithKeyspace(String keySpaceName, String tableName)
{
return keySpaceName+"."+tableName;
}
public static void main(String[] args) throws Exception {
String keySpaceName = "neoks";
String tableName = "neotb";
String colNames[] = {"col01","col02","col03"};
String colTypes[] = {"int","text","text"};
String primaryKey = "col01";
//新建Keyspace
//Session ss = GetSession();
//CreateKeySpace(ss, keySpaceName,"SimpleStrategy", 2);
//新建表
Session ss = GetSession(keySpaceName);
//CreateTable(ss, tableName, colNames, colTypes, primaryKey);
//插入数据
Object values01[] = {1,"r01c02","r01c03"};
Object values02[] = {2,"r02c02","r02c03"};
Object values03[] = {3,"r02c02","r02c03"};
//PutData(ss,tableName,colNames,values01);
//PutData(ss,tableName,colNames,values02);
//PutData(ss,tableName,colNames,values03);
//查询数据
//GetData(ss,tableName,primaryKey,1);
//更新数据
//UpdateData(ss,tableName,"col02","I am updated",primaryKey,2);
//GetData(ss,tableName,primaryKey,2);
//删除数据
//DeleteData(ss,tableName,primaryKey,3);
//删除表
//DropTable(ss, tableName);
//删除keyspace
DropKeySpace(ss, keySpaceName);
CloseConnection();
}
}