整理了一下 Cassandra 3 的简单通信代码，包括增删改查。
CassandraTest.java
package com.neohope.cassandra.test; import com.datastax.driver.core.*; import com.datastax.driver.core.querybuilder.*; import java.util.Iterator; public class CassandraTest { private static Session session = null; private static Cluster cluster = null; static { QueryOptions options = new QueryOptions(); options.setConsistencyLevel(ConsistencyLevel.QUORUM); cluster = Cluster.builder() .addContactPoint("172.16.172.23") .addContactPoint("172.16.172.24") .addContactPoint("172.16.172.25") .withPort(9042) //.withCredentials("username", "password") .withQueryOptions(options) .build(); } /* * 获取未指定keyspace的session */ protected static Session GetSession() { return GetSession(null); } /* * 获取指定keyspace的Session * * @keySpace keyspace名称 * */ protected static Session GetSession(String keySpace) { if(session==null || session.isClosed()) { if(keySpace!=null) { session = cluster.connect(keySpace); } else { session = cluster.connect(); } } return session; } /* * 关闭Session */ protected static void CloseSession() { if(session!=null ) { if(!session.isClosed()) { session.close(); } session = null; } } /* * 关闭连接 */ protected static void CloseConnection() { CloseSession(); if(cluster!=null ) { if(!cluster.isClosed()) { cluster.close(); } cluster = null; } } /* * 执行SQL * * @ss session实例 * * @sql 语句 */ public static void ExecuteNoQuery(Session ss, String sql) { ss.execute(sql); } /* * 执行BoundStatement * * @ss session实例 * * @bindStatement BoundStatement */ public static void ExecuteNoQuery(Session ss, BoundStatement bindStatement) { ss.execute(bindStatement); } /* * 执行BBatchStatement * * @ss session实例 * * @batchStatement BatchStatement */ public static void ExecuteNoQuery(Session ss, BatchStatement batchStatement) { ss.execute(batchStatement); } /* * 新建keyspace * * @ss session实例 * * @keySpaceName keyspace名称 * * @strategyClass Strategy Class名称 * * @replicationFactor replication系数 * */ public static void CreateKeySpace(Session ss, String keySpaceName, String strategyClass, int 
replicationFactor){ String sql = "Create keyspace "+keySpaceName+" WITH replication = {'class': '"+strategyClass+"', 'replication_factor': "+replicationFactor+"};;"; ExecuteNoQuery(ss, sql); } /* * 删除keyspace * * @ss session实例 * * @keySpaceName keyspace名称 */ public static void DropKeySpace(Session ss, String keySpaceName) { String sql = "Drop keyspace "+keySpaceName+";"; ExecuteNoQuery(ss, sql); } /* * 新建表 * * @ss session实例 * * @tableName 表名 */ public static void CreateTable(Session ss, String tableName, String[] colName, String[] colType, String primaryKey){ String sql = "Create table " + tableName + "("; for(int i = 0;i<colName.length && i<colType.length;i++) { sql+=" "+colName[i] + " " + colType[i] + ","; } sql+=" Primary Key(" + primaryKey + "));"; ExecuteNoQuery(ss, sql); } /* * 删除表 * * @ss session实例 * * @tableName 表名 */ public static void DropTable(Session ss, String tableName) { String sql = "Drop table "+tableName+";"; ExecuteNoQuery(ss, sql); } /* * 新增或更新多列数据 * * @ss session实例 * * @tableName 表名 * * @colNames 列名 * * @values 列值 * */ public static void PutData(Session ss, String tableName,String[] colNames, Object[] values) { QueryBuilder builder = new QueryBuilder(cluster); Insert insert = builder.insertInto(tableName).values(colNames,values); ss.execute(insert); System.out.println("put data is succeed!"); } /* * 更新多列数据 * * @ss session实例 * * @tableName 表名 * * @setColValue Set列名 * * @setColValue Set列值 * * @whereCloName 列名 * * @whereColValue 列值 * */ public static void UpdateData(Session ss, String tableName,String setCloName,Object setColValue, String whereCloName, Object whereColValue) { QueryBuilder builder = new QueryBuilder(cluster); Update.Where update = builder.update(tableName).with((QueryBuilder.set(setCloName, setColValue))).where(QueryBuilder.eq(whereCloName, whereColValue)); ss.execute(update); System.out.println("update data is succeed!"); } /* * 根据列名与列值进行查询 * * @ss session实例 * * @tableName 表名 * * @whereCloName 列名 * * @whereColValue 列值 */ public 
static void GetData(Session ss, String tableName,String whereCloName, Object whereColValue) { QueryBuilder builder = new QueryBuilder(cluster); Select.Where select = builder.select().from(tableName).where(QueryBuilder.eq(whereCloName, whereColValue)); ResultSet rs = ss.execute(select); Iterator it = rs.iterator(); while (it.hasNext()) { Row row = (Row)it.next(); System.out.println("col01 is "+row.get("col01",int.class)); System.out.println("col02 is "+row.get("col02",String.class)); System.out.println("col03 is "+row.get("col03",String.class)); } System.out.println("put data is succeed!"); } /* * 根据列名与列值进行删除操作 * * @ss session实例 * * @tableName 表名 * * @cloName 列名 * * @colValue 列值 */ public static void DeleteData(Session ss, String tableName,String whereCloName, Object whereColValue) { QueryBuilder builder = new QueryBuilder(cluster); Delete.Where delete = builder.delete().from(tableName).where(QueryBuilder.eq(whereCloName, whereColValue)); ss.execute(delete); System.out.println("delete data is succeed!"); } /* * 获取带Keyspace的表名 * * @keySpaceName keyspace name * * @tableName 表名 * */ public static String GetTableNameWithKeyspace(String keySpaceName, String tableName) { return keySpaceName+"."+tableName; } public static void main(String[] args) throws Exception { String keySpaceName = "neoks"; String tableName = "neotb"; String colNames[] = {"col01","col02","col03"}; String colTypes[] = {"int","text","text"}; String primaryKey = "col01"; //新建Keyspace //Session ss = GetSession(); //CreateKeySpace(ss, keySpaceName,"SimpleStrategy", 2); //新建表 Session ss = GetSession(keySpaceName); //CreateTable(ss, tableName, colNames, colTypes, primaryKey); //插入数据 Object values01[] = {1,"r01c02","r01c03"}; Object values02[] = {2,"r02c02","r02c03"}; Object values03[] = {3,"r02c02","r02c03"}; //PutData(ss,tableName,colNames,values01); //PutData(ss,tableName,colNames,values02); //PutData(ss,tableName,colNames,values03); //查询数据 //GetData(ss,tableName,primaryKey,1); //更新数据 
//UpdateData(ss,tableName,"col02","I am updated",primaryKey,2); //GetData(ss,tableName,primaryKey,2); //删除数据 //DeleteData(ss,tableName,primaryKey,3); //删除表 //DropTable(ss, tableName); //删除keyspace DropKeySpace(ss, keySpaceName); CloseConnection(); } }