Python 连接 ClickHouse 的两种方法
摘要:本文介绍 Python 通过 clickhouse_driver 连接 ClickHouse 的两种方法:1. 使用 Client 类(封装为数据库连接的公共类 CkClient,初始化连接,并结合 apscheduler 的 BlockingScheduler 定时执行 SQL);2. 使用 connect 函数。
·
1.使用Client类
import logging
from apscheduler.schedulers.blocking import BlockingScheduler
from clickhouse_driver import Client
logger = logging.getLogger(__name__) # 操作日志对象
class CkClient():
    """Shared ClickHouse access class built on ``clickhouse_driver.Client``.

    ``__init__`` creates the client from the settings hard-coded below, and
    ``executeSql`` runs one or more ``;``-separated statements through it.
    """

    def __init__(self):
        self.host = "127.0.0.1"
        self.port = 40009
        self.user = "default"
        self.password = ""
        self.db = "warehouse_main_ck"
        self.client = Client(user=self.user, password=self.password, host=self.host, port=self.port, database=self.db)
        # Result of the most recently executed statement.
        self.resultlist = []

    def spliteSql(self, sql):
        """Split a ``;``-separated SQL script into individual statements.

        Each statement is stripped of surrounding whitespace and empty
        fragments are dropped, so the final statement is kept whether or not
        the script ends with ``;``.

        NOTE: the split is purely textual; a ``;`` inside a string literal
        would also be treated as a separator.

        :param sql: one or more SQL statements joined by ``;``
        :return: list of non-empty statements, in order
        """
        return [stmt.strip() for stmt in sql.split(';') if stmt.strip()]

    def executeSql(self, sql: str) -> list:
        """Execute every statement in ``sql`` in order.

        :param sql: one or more SQL statements joined by ``;``
        :return: result of the last executed statement, or an empty list
            when ``sql`` contains no statement
        """
        sqllist = self.spliteSql(sql)
        logger.info("开始执行sql语句")
        # Reset first so an empty script does not return a previous call's result.
        self.resultlist = []
        for statement in sqllist:
            self.resultlist = self.client.execute(statement)
        return self.resultlist
def dojob():
    """Rebuild ``dim_customer_statictis`` from ``dws_buyer_tag``.

    Sends a two-statement script to a fresh ``CkClient``: the first statement
    deletes every row of ``dim_customer_statictis``, the second re-inserts
    per-province buyer statistics computed from ``dws_buyer_tag``. Whatever
    ``executeSql`` returns is printed.
    """
    # Statement 1: clear the target table.
    clear_stmt = "alter table dim_customer_statictis delete where 1=1;"
    # Statement 2: recompute buyer count, buyer share, average order price and
    # sales share per province (rows with an empty province are ignored).
    rebuild_stmt = """insert into dim_customer_statictis
with (
select sum(buyer_cnt)
from (
select count(distinct buyer_id) as buyer_cnt
from dws_buyer_tag
where province != ''
group by province
)
) as total_cnt,
(
select sum(order_total_amt)
from dws_buyer_tag
where province != ''
) as total_amt
select province,
buyer_cnt,
buyer_percent,
order_per_price,
sales_percent
from (
select province,
total_cnt,
total_amt,
count(distinct buyer_id) as buyer_cnt,
round(buyer_cnt / total_cnt, 6) as buyer_percent,
sum(order_total_amt) as order_total_amt,
round(avg(order_per_price), 2) as order_per_price,
round(order_total_amt / total_amt, 6) as sales_percent
from dws_buyer_tag
where province != ''
group by province
order by buyer_cnt desc
);"""
    # The two statements are sent as one script, separated by a newline.
    script = "\n".join([clear_stmt, rebuild_stmt])
    rows = CkClient().executeSql(script)
    print("res:", rows)
def data_main():
    """Schedule ``dojob`` with a cron trigger at 09:49 and start the scheduler."""
    blocking_scheduler = BlockingScheduler()
    # Cron trigger with hour=9, minute=49; no day-of-week restriction is set.
    blocking_scheduler.add_job(dojob, trigger='cron', hour=9, minute=49)
    # NOTE(review): BlockingScheduler.start() is expected to block the calling
    # thread until the scheduler is shut down - confirm against APScheduler docs.
    blocking_scheduler.start()
# Script entry point: start the scheduled job only when this file is run
# directly, not when it is imported as a module.
if __name__ == "__main__":
    data_main()
2.使用connect函数
import logging
import traceback
from config.ckConfig import ck_configs  # Project-local module; maps a connection name to its settings and may hold several ClickHouse connections, e.g. ck_configs = {"data": {"host": ..., "port": ..., "user": ..., "pwd": ..., "db": ...}, "data1": {...}}
from clickhouse_driver import connect
class ClickhouseManger:
    """Small query helper around a ``clickhouse_driver`` ``connect()`` connection.

    Every public method opens a cursor, runs one statement and closes the
    cursor again. Ordinary errors are logged with their traceback and turned
    into an empty result instead of being raised.
    """

    def __init__(self, conn_conf):
        """
        Initialise the connection.

        :param conn_conf: mapping with the keys ``host``, ``port``, ``pwd``,
            ``user`` and ``db``
        """
        self.host = conn_conf["host"]
        self.port = conn_conf["port"]
        self.pwd = conn_conf["pwd"]
        self.user = conn_conf["user"]
        self.db = conn_conf["db"]
        self.conn = connect(user=self.user, password=self.pwd, host=self.host, port=self.port, database=self.db)

    def _get_cursor(self):
        """
        Get a cursor from the connection.

        :return: a new cursor
        """
        return self.conn.cursor()

    def _query(self, sql, fetch_all):
        """
        Run ``sql`` and return ``(column_names, data)``.

        Shared implementation of ``fetchone`` and ``fetchmany``.

        :param sql: SQL statement
        :param fetch_all: ``True`` to fetch every row, ``False`` for one row
        :return: ``(columns, data)``; ``([], None)`` when the query failed
        """
        logging.info(msg=f"------clickhouse SQL: {sql}")
        cursor = self._get_cursor()
        try:
            cursor.execute(sql)
            columns = [item[0] for item in cursor.columns_with_types]
            data = cursor.fetchall() if fetch_all else cursor.fetchone()
            return columns, data
        except Exception:
            # Only ordinary errors are swallowed; KeyboardInterrupt and
            # SystemExit must still be able to stop the program.
            logging.error("------clickhouse SQL ERROR:{}".format(traceback.format_exc()))
            return [], None
        finally:
            cursor.close()

    def fetchone(self, sql):
        """
        Query a single row.

        :param sql: SQL statement
        :return: dict of column name -> value; empty dict when there is no
            row or the query failed
        """
        columns, row = self._query(sql, fetch_all=False)
        if not columns or not row:
            return {}
        return dict(zip(columns, row))

    def fetchmany(self, sql):
        """
        Query multiple rows.

        :param sql: SQL statement
        :return: list of dicts (column name -> value), one per row; empty
            list when there are no rows or the query failed
        """
        columns, rows = self._query(sql, fetch_all=True)
        if not columns or not rows:
            return []
        return [dict(zip(columns, row)) for row in rows]

    def execute(self, sql):
        """
        Execute a SQL statement without reading any rows back.

        :param sql: SQL statement
        :return: always an empty list; failures are logged, not raised
        """
        logging.info(msg=f"------clickhouse SQL: {sql}")
        cursor = self._get_cursor()
        result = []
        try:
            cursor.execute(sql)
        except Exception:
            logging.error("------clickhouse SQL ERROR:{}".format(traceback.format_exc()))
        finally:
            cursor.close()
        return result
def multiClickhouseConn(ck_configs):
    """
    Create one ``ClickhouseManger`` per configured ClickHouse connection.

    A success message is printed and logged for each connection. The ``pwd``
    entry of the settings is masked in both messages so credentials never
    reach stdout or the log.

    :param ck_configs: mapping of connection name -> connection settings
    :return: dict mapping each connection name to its ``ClickhouseManger``
    """
    conn_dict = {}
    for name, conf_inf in ck_configs.items():
        conn_dict[name] = ClickhouseManger(conf_inf)
        # Copy of the settings with the password masked, for display only.
        safe_conf = {
            key: ("******" if key == "pwd" else value)
            for key, value in dict(conf_inf).items()
        }
        print("Connection (clickhouse: %s) : %s ==== successful " % (name, safe_conf))
        logging.info("Connection (clickhouse %s) : %s ==== successful ", name, safe_conf)
    return conn_dict
# Demo entry point: build a manager for every configured connection, then run
# the per-province statistics insert on the connection named "data".
if __name__ == "__main__":
    managers = multiClickhouseConn(ck_configs)
    # Recomputes buyer count, buyer share, average order price and sales share
    # per province from dws_buyer_tag and inserts them into dim_customer_statictis.
    insert_sql = """
insert into dim_customer_statictis
with (
select sum(buyer_cnt)
from (
select count(distinct buyer_id) as buyer_cnt
from dws_buyer_tag
where province != ''
group by province
)
) as total_cnt,
(
select sum(order_total_amt)
from dws_buyer_tag
where province != ''
) as total_amt
select province,
buyer_cnt,
buyer_percent,
order_per_price,
sales_percent
from (
select province,
total_cnt,
total_amt,
count(distinct buyer_id) as buyer_cnt,
round(buyer_cnt / total_cnt, 6) as buyer_percent,
sum(order_total_amt) as order_total_amt,
round(avg(order_per_price), 2) as order_per_price,
round(order_total_amt / total_amt, 6) as sales_percent
from dws_buyer_tag
where province != ''
group by province
order by buyer_cnt desc)"""
    outcome = managers["data"].execute(insert_sql)
    print(outcome)
更多推荐
已为社区贡献8条内容
所有评论(0)