-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathemployees_sql.py
197 lines (179 loc) · 6.61 KB
/
employees_sql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
import cx_Oracle
from cx_Oracle import DatabaseError
# Connect to Oracle database
def create_connection(user='user name', password='password', dsn='localhost:1521/xe'):
    """Open a connection to the Oracle database.

    The connection settings used to be hard-coded inside the body; they are
    now keyword parameters whose defaults are the original values, so
    existing zero-argument callers behave exactly as before.

    Args:
        user: Database user name.
        password: Password for ``user``.
        dsn: Connect string passed to ``cx_Oracle.connect``.

    Returns:
        The open connection, or ``None`` if ``cx_Oracle`` raised a
        ``DatabaseError`` (the error is printed, not re-raised).
    """
    try:
        # connect() either returns a connection or raises, so no None check
        # on its result is needed.
        return cx_Oracle.connect(user=user, password=password, dsn=dsn)
    except DatabaseError as e:
        print(f"Error while connecting to Oracle: {e}")
        return None
# Create the employees1 table
def create_table():
    """Create the ``employees1`` table.

    Opens its own connection via ``create_connection()`` and does nothing if
    that returns ``None``. A ``DatabaseError`` raised while creating the
    table is printed rather than propagated.
    """
    connection = create_connection()
    if connection is None:
        return

    # Bind the name before the try block: if connection.cursor() itself
    # raises, the cleanup below must not fail on an unbound local (which
    # would also skip connection.close() and leak the connection).
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("""
            CREATE TABLE employees1 (
                id NUMBER GENERATED BY DEFAULT ON NULL AS IDENTITY PRIMARY KEY,
                name VARCHAR2(25) NOT NULL,
                salary FLOAT NOT NULL,
                phone VARCHAR2(20) NOT NULL
            )
        """)
        print("employees1 table created successfully")
    except DatabaseError as e:
        print(f"Error while creating table: {e}")
    finally:
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Always release the connection, even if closing the cursor fails.
            connection.close()
# Create a new employee
def create_employee(employee):
    """Insert one employee row and print the generated ``id``.

    Args:
        employee: Bind values for the INSERT. Either a dict with the keys
            ``name``, ``salary`` and ``phone``, or a sequence holding those
            three values in that order.

    Opens its own connection via ``create_connection()`` and does nothing if
    that returns ``None``. A ``DatabaseError`` is printed rather than
    propagated.
    """
    connection = create_connection()
    if connection is None:
        return

    # Bound up front so the finally block is safe even if cursor() raises.
    cursor = None
    try:
        cursor = connection.cursor()

        # cursor.lastrowid is the Oracle ROWID of the inserted row, not the
        # value of the identity column, so the generated id is fetched with
        # a RETURNING clause into an output bind variable instead.
        new_id_var = cursor.var(int)
        if isinstance(employee, dict):
            binds = {**employee, "new_id": new_id_var}
        else:
            # Positional binds: the output variable is the 4th placeholder.
            binds = [*employee, new_id_var]

        cursor.execute("""
            INSERT INTO employees1 (name, salary, phone)
            VALUES (:name, :salary, :phone)
            RETURNING id INTO :new_id
        """, binds)
        connection.commit()

        # For DML RETURNING the driver reports one value per affected row,
        # as a list; tolerate a scalar in case of an older driver version.
        returned = new_id_var.getvalue()
        if isinstance(returned, (list, tuple)):
            new_id = returned[0] if returned else None
        else:
            new_id = returned
        print(f"Employee created with ID: {new_id}")
    except DatabaseError as e:
        print(f"Error while creating employee: {e}")
    finally:
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Always release the connection, even if closing the cursor fails.
            connection.close()
# Create multiple employees
def create_multiple_employees(employees):
    """Insert several employee rows in one batch and print how many were created.

    Args:
        employees: Sequence of bind-value sets for the INSERT, one per row
            (for example dicts with ``name``, ``salary`` and ``phone`` keys).

    Opens its own connection via ``create_connection()`` and does nothing if
    that returns ``None``. A ``DatabaseError`` is printed rather than
    propagated.
    """
    connection = create_connection()
    if connection is None:
        return

    # Bound up front so the finally block is safe even if cursor() raises
    # (an unbound name there would also skip connection.close()).
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.executemany("""
            INSERT INTO employees1 (name, salary, phone)
            VALUES (:name, :salary, :phone)
        """, employees)
        connection.commit()
        print(f"{cursor.rowcount} employees created successfully")
    except DatabaseError as e:
        print(f"Error while creating multiple employees: {e}")
    finally:
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Always release the connection, even if closing the cursor fails.
            connection.close()
# Read one employee by ID
def read_one_employee(id):
    """Print the ``employees1`` row whose ``id`` matches, or a not-found message.

    Args:
        id: Value bound to ``:id`` in the WHERE clause. (The name shadows the
            builtin but is kept because it is part of the call signature.)

    Opens its own connection via ``create_connection()`` and does nothing if
    that returns ``None``. A ``DatabaseError`` is printed rather than
    propagated.
    """
    connection = create_connection()
    if connection is None:
        return

    # Bound up front so the finally block is safe even if cursor() raises
    # (an unbound name there would also skip connection.close()).
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM employees1 WHERE id = :id", {'id': id})
        employee = cursor.fetchone()
        if employee:
            print(employee)
        else:
            print(f"No employee found with ID: {id}")
    except DatabaseError as e:
        print(f"Error while reading employee: {e}")
    finally:
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Always release the connection, even if closing the cursor fails.
            connection.close()
# Read all employees
def read_all_employees():
    """Print every row of ``employees1``, one row per line.

    Opens its own connection via ``create_connection()`` and does nothing if
    that returns ``None``. A ``DatabaseError`` is printed rather than
    propagated.
    """
    connection = create_connection()
    if connection is None:
        return

    # Bound up front so the finally block is safe even if cursor() raises
    # (an unbound name there would also skip connection.close()).
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("SELECT * FROM employees1")
        # Iterate the cursor directly rather than materialising the whole
        # table with fetchall(); the printed output is the same.
        for employee in cursor:
            print(employee)
    except DatabaseError as e:
        print(f"Error while reading all employees: {e}")
    finally:
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Always release the connection, even if closing the cursor fails.
            connection.close()
# Update an employee by ID
def update_employee(id, update_fields):
    """Update selected columns of the employee with the given ``id``.

    Args:
        id: Value bound to ``:id`` in the WHERE clause.
        update_fields: Dict mapping column name to new value. Only the
            columns ``name``, ``salary`` and ``phone`` may be updated
            (matched case-insensitively).

    Input is validated before any connection is opened: an empty
    ``update_fields`` or an unknown column name prints a message and returns
    without touching the database. A ``DatabaseError`` is printed rather
    than propagated.
    """
    # Column names cannot be bind parameters, so they are interpolated into
    # the SQL text. Restrict them to the known columns of employees1 to keep
    # arbitrary SQL out of the statement.
    allowed_columns = ("name", "salary", "phone")

    if not update_fields:
        # An empty SET clause would be invalid SQL.
        print(f"No fields provided to update for employee with ID: {id}")
        return

    invalid = [
        field for field in update_fields
        if not isinstance(field, str) or field.lower() not in allowed_columns
    ]
    if invalid:
        print(f"Error while updating employee: invalid field(s): {invalid}")
        return

    connection = create_connection()
    if connection is None:
        return

    # Bound up front so the finally block is safe even if cursor() raises
    # (an unbound name there would also skip connection.close()).
    cursor = None
    try:
        cursor = connection.cursor()
        assignments = ", ".join(f"{field} = :{field}" for field in update_fields)
        update_query = f"UPDATE employees1 SET {assignments} WHERE id = :id"
        cursor.execute(update_query, {**update_fields, 'id': id})
        connection.commit()
        if cursor.rowcount > 0:
            print(f"Employee with ID: {id} has been updated.")
        else:
            print(f"No employee found with ID: {id} or no changes made.")
    except DatabaseError as e:
        print(f"Error while updating employee: {e}")
    finally:
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Always release the connection, even if closing the cursor fails.
            connection.close()
# Delete an employee by ID
def delete_one_employee(id):
    """Delete the employee with the given ``id`` and print the outcome.

    Args:
        id: Value bound to ``:id`` in the WHERE clause.

    Opens its own connection via ``create_connection()`` and does nothing if
    that returns ``None``. A ``DatabaseError`` is printed rather than
    propagated.
    """
    connection = create_connection()
    if connection is None:
        return

    # Bound up front so the finally block is safe even if cursor() raises
    # (an unbound name there would also skip connection.close()).
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute("DELETE FROM employees1 WHERE id = :id", {'id': id})
        connection.commit()
        if cursor.rowcount > 0:
            print(f"Employee with ID: {id} has been deleted.")
        else:
            print(f"No employee found with ID: {id}")
    except DatabaseError as e:
        print(f"Error while deleting employee: {e}")
    finally:
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Always release the connection, even if closing the cursor fails.
            connection.close()
# Delete multiple employees by a filter
def delete_many_employees(salary_threshold):
    """Delete every employee whose salary is below ``salary_threshold``.

    Args:
        salary_threshold: Value bound to ``:salary``; rows with
            ``salary < :salary`` are deleted.

    Prints the number of deleted rows. Opens its own connection via
    ``create_connection()`` and does nothing if that returns ``None``. A
    ``DatabaseError`` is printed rather than propagated.
    """
    connection = create_connection()
    if connection is None:
        return

    # Bound up front so the finally block is safe even if cursor() raises
    # (an unbound name there would also skip connection.close()).
    cursor = None
    try:
        cursor = connection.cursor()
        cursor.execute(
            "DELETE FROM employees1 WHERE salary < :salary",
            {'salary': salary_threshold},
        )
        connection.commit()
        print(f"{cursor.rowcount} employees have been deleted.")
    except DatabaseError as e:
        print(f"Error while deleting multiple employees: {e}")
    finally:
        try:
            if cursor is not None:
                cursor.close()
        finally:
            # Always release the connection, even if closing the cursor fails.
            connection.close()
# Example usage. Guarded so that merely importing this module never runs the
# destructive DELETE statements below; they execute only when the file is run
# directly as a script.
if __name__ == "__main__":
    # Create the employees1 table
    # create_table()

    # Create example employee
    # employee_example = {
    #     "name": "John Doe",
    #     "salary": 50000,
    #     "phone": "123-456-7890"
    # }
    # create_employee(employee_example)

    # Create multiple employees
    # employees_list = [
    #     {"name": "Alice Smith", "salary": 60000, "phone": "555-1234"},
    #     {"name": "Bob Johnson", "salary": 70000, "phone": "555-5678"},
    #     {"name": "Charlie Brown", "salary": 55000, "phone": "555-8765"}]
    # create_multiple_employees(employees_list)

    # Read example employee
    # example_id = 1  # Replace with the actual ID of an existing employee
    # read_one_employee(example_id)

    # Read all employees
    # read_all_employees()

    # Update example employee
    # example_id = 1
    # update_fields = {"salary": 55000, "phone": "098-765-4321"}
    # update_employee(example_id, update_fields)

    # Delete example employee
    example_id = 1
    delete_one_employee(example_id)

    # Delete multiple employees (everyone with salary below 60000)
    delete_many_employees(60000)