Cassandra3简单通讯代码

整理了一下Cassandra3的简单通信代码,包括增删改查。

CassandraTest.java

package com.neohope.cassandra.test;


import com.datastax.driver.core.*;
import com.datastax.driver.core.querybuilder.*;

import java.util.Iterator;

public class CassandraTest {

    private static Session session = null;
    private static Cluster cluster = null;
    static {
        QueryOptions options = new QueryOptions();
        options.setConsistencyLevel(ConsistencyLevel.QUORUM);

        cluster = Cluster.builder()
                .addContactPoint("172.16.172.23")
                .addContactPoint("172.16.172.24")
                .addContactPoint("172.16.172.25")
                .withPort(9042)
                //.withCredentials("username", "password")
                .withQueryOptions(options)
                .build();
    }

    /*
    * 获取未指定keyspace的session
    */
    protected static Session GetSession() {
        return GetSession(null);
    }

    /*
    * 获取指定keyspace的Session
    *
    *  @keySpace keyspace名称
    *
    */
    protected static Session GetSession(String keySpace) {
        if(session==null || session.isClosed())
        {
            if(keySpace!=null) {
                session = cluster.connect(keySpace);
            }
            else
            {
                session = cluster.connect();
            }
        }

        return session;
    }

    /*
   * 关闭Session
   */
    protected static void CloseSession() {
        if(session!=null )
        {
            if(!session.isClosed())
            {
                session.close();
            }

            session = null;
        }
    }

    /*
   * 关闭连接
   */
    protected static void CloseConnection()
    {
        CloseSession();
        if(cluster!=null )
        {
            if(!cluster.isClosed())
            {
                cluster.close();
            }
            cluster = null;
        }

    }

    /*
    * 执行SQL
    *
    * @ss session实例
    *
    * @sql 语句
    */
    public static void ExecuteNoQuery(Session ss, String sql) {
        ss.execute(sql);
    }

    /*
   * 执行BoundStatement
   *
   * @ss session实例
   *
   * @bindStatement BoundStatement
   */
    public static void ExecuteNoQuery(Session ss, BoundStatement bindStatement) {
        ss.execute(bindStatement);
    }

    /*
   * 执行BBatchStatement
   *
   * @ss session实例
   *
   * @batchStatement BatchStatement
   */
    public static void ExecuteNoQuery(Session ss, BatchStatement batchStatement) {
        ss.execute(batchStatement);
    }

    /*
    * 新建keyspace
    *
    * @ss session实例
    *
    * @keySpaceName keyspace名称
    *
    * @strategyClass Strategy Class名称
    *
    * @replicationFactor replication系数
    *
    */
    public static void CreateKeySpace(Session ss, String keySpaceName, String strategyClass, int replicationFactor){
        String sql = "Create keyspace "+keySpaceName+" WITH replication = {'class': '"+strategyClass+"', 'replication_factor': "+replicationFactor+"};;";
        ExecuteNoQuery(ss, sql);
    }

    /*
     * 删除keyspace
     *
     * @ss session实例
     *
     * @keySpaceName keyspace名称
     */
    public static void DropKeySpace(Session ss, String keySpaceName) {
        String sql = "Drop keyspace "+keySpaceName+";";
        ExecuteNoQuery(ss, sql);
    }

    /*
     * 新建表
     *
     * @ss session实例
     *
     * @tableName 表名
     */
    public static void CreateTable(Session ss, String tableName, String[] colName, String[] colType, String primaryKey){
        String sql = "Create table " + tableName + "(";
        for(int i = 0;i<colName.length && i<colType.length;i++)
        {
            sql+=" "+colName[i] + " " + colType[i] + ",";
        }
        sql+=" Primary Key(" + primaryKey + "));";

        ExecuteNoQuery(ss, sql);
    }

    /*
     * 删除表
     *
     * @ss session实例
     *
     * @tableName 表名
     */
    public static void DropTable(Session ss, String tableName) {
        String sql = "Drop table "+tableName+";";
        ExecuteNoQuery(ss, sql);
    }

    /*
    * 新增或更新多列数据
    *
    * @ss session实例
    *
    * @tableName 表名
    *
    * @colNames 列名
    *
    *  @values 列值
    *
    */
    public static void PutData(Session ss, String tableName,String[] colNames, Object[] values) {
        QueryBuilder builder = new QueryBuilder(cluster);
        Insert insert = builder.insertInto(tableName).values(colNames,values);
        ss.execute(insert);
        System.out.println("put data is succeed!");
    }

    /*
    * 更新多列数据
    *
    * @ss session实例
    *
    * @tableName 表名
    *
    * @setColValue Set列名
    *
    * @setColValue Set列值
    *
    * @whereCloName 列名
    *
    * @whereColValue 列值
    *
    */
    public static void UpdateData(Session ss, String tableName,String setCloName,Object setColValue, String whereCloName, Object whereColValue) {
        QueryBuilder builder = new QueryBuilder(cluster);
        Update.Where update = builder.update(tableName).with((QueryBuilder.set(setCloName, setColValue))).where(QueryBuilder.eq(whereCloName, whereColValue));
        ss.execute(update);
        System.out.println("update data is succeed!");
    }

    /*
     * 根据列名与列值进行查询
     *
     * @ss session实例
     *
     * @tableName 表名
     *
     * @whereCloName 列名
     *
     * @whereColValue 列值
     */
    public static void GetData(Session ss, String tableName,String whereCloName, Object whereColValue) {
        QueryBuilder builder = new QueryBuilder(cluster);
        Select.Where select = builder.select().from(tableName).where(QueryBuilder.eq(whereCloName, whereColValue));
        ResultSet rs  = ss.execute(select);
        Iterator it = rs.iterator();
        while (it.hasNext())
        {
            Row row = (Row)it.next();
            System.out.println("col01 is "+row.get("col01",int.class));
            System.out.println("col02 is "+row.get("col02",String.class));
            System.out.println("col03 is "+row.get("col03",String.class));
        }
        System.out.println("put data is succeed!");
    }

    /*
    * 根据列名与列值进行删除操作
    *
    * @ss session实例
    *
    * @tableName 表名
    *
    * @cloName 列名
    *
    * @colValue 列值
    */
    public static void DeleteData(Session ss, String tableName,String whereCloName, Object whereColValue) {
        QueryBuilder builder = new QueryBuilder(cluster);
        Delete.Where delete = builder.delete().from(tableName).where(QueryBuilder.eq(whereCloName, whereColValue));
        ss.execute(delete);
        System.out.println("delete data is succeed!");
    }

    /*
     * 获取带Keyspace的表名
     *
     * @keySpaceName keyspace name
     *
     * @tableName 表名
     *
     */
    public static String GetTableNameWithKeyspace(String keySpaceName, String tableName)
    {
        return keySpaceName+"."+tableName;
    }


    public static void main(String[] args) throws Exception {
        String keySpaceName = "neoks";
        String tableName = "neotb";
        String colNames[] = {"col01","col02","col03"};
        String colTypes[] = {"int","text","text"};
        String primaryKey = "col01";

        //新建Keyspace
        //Session ss =  GetSession();
        //CreateKeySpace(ss, keySpaceName,"SimpleStrategy", 2);

        //新建表
        Session ss =  GetSession(keySpaceName);
        //CreateTable(ss, tableName, colNames, colTypes, primaryKey);

        //插入数据
        Object values01[] = {1,"r01c02","r01c03"};
        Object values02[] = {2,"r02c02","r02c03"};
        Object values03[] = {3,"r02c02","r02c03"};
        //PutData(ss,tableName,colNames,values01);
        //PutData(ss,tableName,colNames,values02);
        //PutData(ss,tableName,colNames,values03);

        //查询数据
        //GetData(ss,tableName,primaryKey,1);

        //更新数据
        //UpdateData(ss,tableName,"col02","I am updated",primaryKey,2);
        //GetData(ss,tableName,primaryKey,2);

        //删除数据
        //DeleteData(ss,tableName,primaryKey,3);

        //删除表
        //DropTable(ss, tableName);

        //删除keyspace
        DropKeySpace(ss, keySpaceName);

        CloseConnection();
    }
}

Leave a Reply

Your email address will not be published. Required fields are marked *

*