1. 使用 clickhouse_driver 的 Client 类

import logging
from apscheduler.schedulers.blocking import BlockingScheduler
from clickhouse_driver import Client

logger = logging.getLogger(name=__name__)  # module-level logger; CkClient.executeSql writes its progress messages here


class CkClient():
    """Shared ClickHouse helper: opens a client and runs ad-hoc SQL scripts.

    Connection parameters default to the values that used to be hard-coded
    here, so ``CkClient()`` behaves as before; pass arguments to target a
    different server or database.
    """

    def __init__(self, host="127.0.0.1", port=40009, user="default",
                 password="", db="warehouse_main_ck"):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.db = db
        self.client = Client(user=self.user, password=self.password,
                             host=self.host, port=self.port, database=self.db)
        # Result rows of the most recent statement run by executeSql().
        self.resultlist = []

    def spliteSql(self, sql):
        """Split a ';'-separated SQL script into individual statements.

        Each fragment is stripped and empty fragments are discarded, so the
        trailing ';' is optional: "a; b" and "a; b;" both yield ["a", "b"].

        NOTE: this is a plain text split -- a ';' inside a string literal or
        a SQL comment will break the statement apart.

        :param sql: SQL text, statements separated by ';'.
        :return: list of non-empty statements, in order.
        """
        return [statement.strip() for statement in sql.split(';') if statement.strip()]

    def executeSql(self, sql: str) -> list:
        """Execute one or more ';'-separated statements sequentially.

        :param sql: SQL script; multiple statements are separated by ';'.
        :return: the rows returned by the LAST statement executed, or ``[]``
            when the script contains no statements.
        """
        statements = self.spliteSql(sql)
        logger.info("开始执行sql语句")
        # Reset first so an empty script cannot return a previous call's rows.
        self.resultlist = []
        for statement in statements:
            self.resultlist = self.client.execute(statement)
        return self.resultlist


def dojob():
    """Scheduled job: rebuild dim_customer_statictis from dws_buyer_tag.

    Sends a two-statement script through CkClient.executeSql: an
    ``ALTER TABLE ... DELETE WHERE 1=1`` that clears the target table,
    followed by an ``INSERT ... SELECT`` computing, per province, the buyer
    count, buyer share, average order price and sales share. The rows
    returned by the last statement are printed.
    """
    # NOTE(review): ALTER ... DELETE is a ClickHouse mutation; confirm it
    # cannot interfere with the INSERT that follows (TRUNCATE may be simpler).
    script = """alter table  dim_customer_statictis delete where 1=1;
    insert into dim_customer_statictis
    with (
    select sum(buyer_cnt)
    from (
          select count(distinct buyer_id) as buyer_cnt
          from dws_buyer_tag
          where province != ''
          group by province
             )
) as total_cnt,
    (
        select sum(order_total_amt)
        from dws_buyer_tag
        where province != ''
    ) as total_amt
select province,
       buyer_cnt,
       buyer_percent,
       order_per_price,
       sales_percent
from (
      select province,
             total_cnt,
             total_amt,
             count(distinct buyer_id)              as buyer_cnt,
             round(buyer_cnt / total_cnt, 6)       as buyer_percent,
             sum(order_total_amt)                  as order_total_amt,
             round(avg(order_per_price), 2)        as order_per_price,
             round(order_total_amt / total_amt, 6) as sales_percent
      from dws_buyer_tag
      where province != ''
      group by province
      order by buyer_cnt desc
         );"""
    runner = CkClient()
    outcome = runner.executeSql(script)
    print("res:", outcome)


def data_main(hour=9, minute=49):
    """Run ``dojob`` once a day on a blocking scheduler.

    :param hour: hour of day at which to fire (default 9).
    :param minute: minute of the hour (default 49, i.e. 09:49 daily as before).

    ``BlockingScheduler.start()`` does not return until the scheduler is shut
    down, so this call blocks the current thread.
    """
    scheduler = BlockingScheduler()
    # Daily cron trigger. To restrict to weekdays add day_of_week='mon-fri';
    # do NOT use '1-5' -- in APScheduler 0 is Monday, so '1-5' means Tue-Sat.
    scheduler.add_job(dojob, 'cron', hour=hour, minute=minute)
    scheduler.start()

if __name__ == "__main__":
    # Without configuring logging, INFO records (e.g. from executeSql) are dropped.
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(levelname)s %(name)s: %(message)s")
    data_main()

2. 使用 clickhouse_driver 的 connect 函数

import logging
import traceback
from config.ckConfig import ck_configs    # 这个是自定义的模块,可以是多个clickhouse连接配置,ck_configs = {data={"host":xxx,"port":xxx,"user":xxx...},data1={"host":xxx,"port":xxx,"user":xxx...}}

from clickhouse_driver import connect

class ClickhouseManger:
    """Thin wrapper around a clickhouse_driver DB-API connection.

    Every query method logs the SQL, opens a fresh cursor, and always closes
    that cursor afterwards. Errors are logged with their traceback instead of
    being raised; the method then returns an empty result.
    """

    def __init__(self, conn_conf):
        """
        Create the DB-API connection from a config mapping.

        :param conn_conf: mapping with keys "host", "port", "pwd", "user", "db".
        :raises KeyError: if one of those keys is missing.
        """
        self.host = conn_conf["host"]
        self.port = conn_conf["port"]
        self.pwd = conn_conf["pwd"]
        self.user = conn_conf["user"]
        self.db = conn_conf["db"]
        self.conn = connect(user=self.user, password=self.pwd, host=self.host, port=self.port, database=self.db)

    def _get_cursor(self):
        """
        Return a new cursor on the shared connection.
        :return: cursor object
        """
        return self.conn.cursor()

    @staticmethod
    def _column_names(cursor):
        """Return the result column names of the cursor's last query ([] if none)."""
        # Each entry's first element is the column name; guard against None
        # for statements that produce no result set.
        return [item[0] for item in (cursor.columns_with_types or ())]

    def fetchone(self, sql):
        """
        Run a query and return its first row as a dict.

        :param sql: SQL statement
        :return: {column: value} for the first row; {} when the query returns
            no rows or fails (the failure is logged).
        """
        logging.info(msg=f"------clickhouse SQL: {sql}")
        cursor = self._get_cursor()

        result = {}
        try:
            cursor.execute(sql)
            columns = self._column_names(cursor)
            row = cursor.fetchone()
            if columns and row:
                result = dict(zip(columns, row))
        except Exception:
            # Exception (not BaseException) so Ctrl-C / SystemExit still propagate.
            logging.error("------clickhouse SQL ERROR:{}".format(traceback.format_exc()))
        finally:
            cursor.close()

        return result

    def fetchmany(self, sql):
        """
        Run a query and return ALL of its rows as a list of dicts.

        Despite the name, every row is fetched (``cursor.fetchall()``).

        :param sql: SQL statement
        :return: [{column: value}, ...]; [] when there are no rows or the
            query fails (the failure is logged).
        """
        logging.info(msg=f"------clickhouse SQL: {sql}")
        cursor = self._get_cursor()

        result = []
        try:
            cursor.execute(sql)
            columns = self._column_names(cursor)
            rows = cursor.fetchall()
            if columns:
                result = [dict(zip(columns, row)) for row in rows]
        except Exception:
            logging.error("------clickhouse SQL ERROR:{}".format(traceback.format_exc()))
        finally:
            cursor.close()
        return result

    def execute(self, sql):
        """
        Execute a statement whose result set is not needed (DDL, INSERT ...).

        :param sql: SQL statement
        :return: always [] (kept for backward compatibility). Failures are
            only logged, so callers cannot detect them from the return value.
        """
        logging.info(msg=f"------clickhouse SQL: {sql}")
        cursor = self._get_cursor()

        try:
            cursor.execute(sql)
        except Exception:
            logging.error("------clickhouse SQL ERROR:{}".format(traceback.format_exc()))
        finally:
            cursor.close()

        return []

    def close(self):
        """Close the underlying connection."""
        self.conn.close()

def multiClickhouseConn(ck_configs):
    """
    多clickhouse连接
    :param ck_configs: clickhouse连接配置
    :return:
    """
    conn_dict = {}
    for name, conf_inf in ck_configs.items():
        conn_dict[name] = ClickhouseManger(conf_inf)
        print("Connection (clickhouse: %s) : %s ==== successful " % (name, conf_inf))
        logging.info("Connection (clickhouse %s) : %s ==== successful " % (name, conf_inf))

    return conn_dict
if __name__ == "__main__":
    # Make the INFO-level SQL/connection traces from the manager visible.
    logging.basicConfig(level=logging.INFO)
    conns = multiClickhouseConn(ck_configs=ck_configs)
    sql = """
    insert into dim_customer_statictis
    with (
    select sum(buyer_cnt)
    from (
          select count(distinct buyer_id) as buyer_cnt
          from dws_buyer_tag
          where province != ''
          group by province
             )
) as total_cnt,
    (
        select sum(order_total_amt)
        from dws_buyer_tag
        where province != ''
    ) as total_amt
select province,
       buyer_cnt,
       buyer_percent,
       order_per_price,
       sales_percent
from (
      select province,
             total_cnt,
             total_amt,
             count(distinct buyer_id)              as buyer_cnt,
             round(buyer_cnt / total_cnt, 6)       as buyer_percent,
             sum(order_total_amt)                  as order_total_amt,
             round(avg(order_per_price), 2)        as order_per_price,
             round(order_total_amt / total_amt, 6) as sales_percent
      from dws_buyer_tag
      where province != ''
      group by province
      order by buyer_cnt desc)"""
    try:
        # NOTE: ClickhouseManger.execute() returns [] and only logs errors;
        # check the log output to see whether the insert succeeded.
        print(conns['data'].execute(sql))
    finally:
        for manager in conns.values():
            manager.conn.close()
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐