-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathtest_data_access.py
More file actions
140 lines (127 loc) · 5.17 KB
/
test_data_access.py
File metadata and controls
140 lines (127 loc) · 5.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# pyodbc
import pyodbc

# Show every ODBC data source pyodbc reports, one " name [description]"
# entry per line, so a driver/Python bitness mismatch is easy to spot.
sources = pyodbc.dataSources()
dsns = list(sources)
sl = []
for dsn in dsns:
    sl.append(' %s [%s]' % (dsn, sources[dsn]))
provider_lines = '\n'.join(sl)
print("pyodbc Providers: (beware 32/64 bit driver and python version must match)\n", provider_lines)
# odbc to EXCEL .xls via pyodbc (beware 32/64 bit driver and python version must match)
# Reads every row of Sheet1 from ./test.xls and prints the column names and rows.
# The section is skipped when the file does not exist.
import os
import pyodbc

filename = os.path.join(os.getcwd(), 'test.xls')
todo = "select * from [Sheet1$]"
print("\nusing pyodbc to read an Excel .xls file:\n\t", filename)
if os.path.exists(filename):
    CNXNSTRING = 'Driver={Microsoft Excel Driver (*.xls, *.xlsx, *.xlsm, *.xlsb)};DBQ=%s;READONLY=FALSE' % filename
    try:
        cnxn = pyodbc.connect(CNXNSTRING, autocommit=True)
        try:
            cursor = cnxn.cursor()
            try:
                rows = cursor.execute(todo).fetchall()
                print([column[0] for column in cursor.description])
                print(rows)
            finally:
                # Release the cursor even when the query or printing fails.
                cursor.close()
        finally:
            # Release the connection even when the query or printing fails.
            cnxn.close()
    except Exception as exc:
        # Best-effort probe: report why it failed and let the script carry on
        # with the next section. Ctrl-C / SystemExit are no longer swallowed.
        print("\n *** failed ***\n", exc)
# odbc to ACCESS .mdb via pyodbc (beware 32/64 bit driver and python version must match)
# Reads every row of the "users" table from ./test.mdb and prints the column
# names and rows. The section is skipped when the file does not exist.
import os
import pyodbc

filename = os.path.join(os.getcwd(), 'test.mdb')
print("\nusing pyodbc to read an ACCESS .mdb file:\n\t", filename)
if os.path.exists(filename):
    CNXNSTRING = 'Driver={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=%s;READONLY=FALSE' % filename
    try:
        cnxn = pyodbc.connect(CNXNSTRING, autocommit=False)
        try:
            cursor = cnxn.cursor()
            try:
                rows = cursor.execute("select * from users").fetchall()
                print([column[0] for column in cursor.description])
                print(rows)
            finally:
                # Release the cursor even when the query or printing fails.
                cursor.close()
        finally:
            # Release the connection even when the query or printing fails.
            cnxn.close()
    except Exception as exc:
        # Best-effort probe: report why it failed and let the script carry on
        # with the next section. Ctrl-C / SystemExit are no longer swallowed.
        print("\n *** failed ***\n", exc)
# pythonnet
import clr
clr.AddReference("System.Data")
import System.Data.OleDb as ADONET
import System.Data.Odbc as ODBCNET
import System.Data.Common as DATACOM

# Print the table returned by DbProviderFactories.GetFactoryClasses():
# for each row, the first column's value on one line, then the values of
# the remaining columns as a list on the next line.
table = DATACOM.DbProviderFactories.GetFactoryClasses()
print("\n .NET Providers: (beware 32/64 bit driver and pytho version must match)")
first_column = table.Columns[0]
for row in table.Rows:
    print(" %s" % row[first_column])
    remaining = [row[column] for column in table.Columns if column != first_column]
    print(" ", remaining)
# odbc to EXCEL .xls via pythonnet
# Reads every row of Sheet1 from ./test.xls through System.Data.Odbc and prints
# the column names followed by each row. Skipped when the file does not exist.
import os
import clr
clr.AddReference("System.Data")
import System.Data.OleDb as ADONET
import System.Data.Odbc as ODBCNET
import System.Data.Common as DATACOM

filename = os.path.join(os.getcwd(), 'test.xls')
todo = "select * from [Sheet1$]"
print("\nusing pythonnet to read an excel .xls file:\n\t", filename , "\n\t", todo)
if os.path.exists(filename):
    CNXNSTRING = 'Driver={Microsoft Excel Driver (*.xls, *.xlsx, *.xlsm, *.xlsb)};DBQ=%s;READONLY=FALSE' % filename
    try:
        cnxn = ODBCNET.OdbcConnection(CNXNSTRING)
        try:
            cnxn.Open()
            command = cnxn.CreateCommand()
            try:
                # Run the same query that was announced above.
                command.CommandText = todo
                rows = command.ExecuteReader()
                try:
                    print([rows.GetName(i) for i in range(rows.FieldCount)])
                    for row in rows:
                        print([row[i] for i in range(rows.FieldCount)])
                finally:
                    rows.Close()
            finally:
                command.Dispose()
        finally:
            # Runs on success and on failure, so the connection is always released.
            cnxn.Close()
    except Exception as exc:
        # Best-effort probe: report why it failed and let the script carry on
        # with the next section. Ctrl-C / SystemExit are no longer swallowed.
        print("\n *** failed ***\n", exc)
# odbc to ACCESS .mdb via pythonnet
# Reads every row of the "users" table from ./test.mdb through System.Data.Odbc
# and prints the column names followed by each row. Skipped when the file does
# not exist.
import os
import clr
clr.AddReference("System.Data")
import System.Data.OleDb as ADONET
import System.Data.Odbc as ODBCNET
import System.Data.Common as DATACOM

filename = os.path.join(os.getcwd(), 'test.mdb')
todo = "select * from users"
print("\nusing odbc via pythonnet to read an ACCESS .mdb file:\n\t", filename , "\n\t", todo)
if os.path.exists(filename):
    CNXNSTRING = 'Driver={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=%s;READONLY=FALSE' % filename
    try:
        cnxn = ODBCNET.OdbcConnection(CNXNSTRING)
        try:
            cnxn.Open()
            command = cnxn.CreateCommand()
            try:
                # Run the same query that was announced above.
                command.CommandText = todo
                rows = command.ExecuteReader()
                try:
                    print([rows.GetName(i) for i in range(rows.FieldCount)])
                    for row in rows:
                        print([row[i] for i in range(rows.FieldCount)])
                finally:
                    rows.Close()
            finally:
                command.Dispose()
        finally:
            # Runs on success and on failure, so the connection is always released.
            cnxn.Close()
    except Exception as exc:
        # Best-effort probe: report why it failed and let the script carry on
        # with the next section. Ctrl-C / SystemExit are no longer swallowed.
        print("\n *** failed ***\n", exc)
# OLE DB (Microsoft.ACE.OLEDB.12.0 provider) via pythonnet; the printed banner
# still calls it "DAO". Works ONLY if you have the 32 (or 64 bit) driver.
# Reads every row of the "users" table from ./test.accdb and prints the column
# names followed by each row. Skipped when the file does not exist.
import os
import clr
clr.AddReference("System.Data")
import System.Data.OleDb as ADONET
import System.Data.Odbc as ODBCNET
import System.Data.Common as DATACOM

filename = os.path.join(os.getcwd(), 'test.accdb')
todo = "select * from users"
print("\nusing DAO via pythonnet to read an ACCESS .mdb file:\n\t", filename , "\n\t", todo)
if os.path.exists(filename):
    # needs a driver in 32 or 64 bit like your running python
    # https://www.microsoft.com/download/details.aspx?id=13255
    CNXNSTRING = 'Provider=Microsoft.ACE.OLEDB.12.0; Data Source=%s;READONLY=FALSE' % filename
    try:
        cnxn = ADONET.OleDbConnection(CNXNSTRING)
        try:
            cnxn.Open()
            command = cnxn.CreateCommand()
            try:
                command.CommandText = todo
                # Example of a parameterised query (left disabled):
                # command.CommandText = 'select id, name from people where group_id = @group_id'
                # command.Parameters.Add(SqlParameter('group_id', 23))
                rows = command.ExecuteReader()
                try:
                    print([rows.GetName(i) for i in range(rows.FieldCount)])
                    for row in rows:
                        print([row[i] for i in range(rows.FieldCount)])
                finally:
                    rows.Close()
            finally:
                command.Dispose()
        finally:
            # Runs on success and on failure, so the connection is always released.
            cnxn.Close()
    except Exception as exc:
        # Best-effort probe: report why it failed and let the script carry on
        # with the next section. Ctrl-C / SystemExit are no longer swallowed.
        print("\n *** failed ***\n", exc)