ch08
This commit is contained in:
@@ -0,0 +1 @@
|
||||
This is content1.txt
|
||||
@@ -0,0 +1 @@
|
||||
This is content2.txt
|
||||
@@ -0,0 +1 @@
|
||||
This is subfolder/content3.txt
|
||||
@@ -0,0 +1 @@
|
||||
This is subfolder/content4.txt
|
||||
@@ -0,0 +1,11 @@
|
||||
# files/compression/tar.py
|
||||
import tarfile
|
||||
|
||||
with tarfile.open('example.tar.gz', 'w:gz') as tar:
|
||||
tar.add('content1.txt')
|
||||
tar.add('content2.txt')
|
||||
tar.add('subfolder/content3.txt')
|
||||
tar.add('subfolder/content4.txt')
|
||||
|
||||
with tarfile.open('example.tar.gz', 'r:gz') as tar:
|
||||
tar.extractall('extract_tar')
|
||||
@@ -0,0 +1,14 @@
|
||||
# files/compression/zip.py
|
||||
from zipfile import ZipFile
|
||||
|
||||
|
||||
with ZipFile('example.zip', 'w') as zp:
|
||||
zp.write('content1.txt')
|
||||
zp.write('content2.txt')
|
||||
zp.write('subfolder/content3.txt')
|
||||
zp.write('subfolder/content4.txt')
|
||||
|
||||
|
||||
with ZipFile('example.zip') as zp:
|
||||
zp.extract('content1.txt', 'extract_zip')
|
||||
zp.extract('subfolder/content3.txt', 'extract_zip')
|
||||
@@ -0,0 +1,12 @@
|
||||
# files/existence.py
|
||||
from pathlib import Path
|
||||
|
||||
p = Path('fear.txt')
|
||||
path = p.parent.absolute()
|
||||
|
||||
print(p.is_file()) # True
|
||||
print(path) # /Users/fab/srv/lpp3e/ch08/files
|
||||
print(path.is_dir()) # True
|
||||
|
||||
q = Path('/Users/fab/srv/lpp3e/ch08/files')
|
||||
print(q.is_dir()) # True
|
||||
@@ -0,0 +1,7 @@
|
||||
An excerpt from Fear - By Thich Nhat Hanh
|
||||
|
||||
The Present Is Free from Fear
|
||||
|
||||
When we are not fully present, we are not really living. We’re not really there, either for our loved ones or for ourselves. If we’re not there, then where are we? We are running, running, running, even during our sleep. We run because we’re trying to escape from our fear.
|
||||
|
||||
We cannot enjoy life if we spend our time and energy worrying about what happened yesterday and what will happen tomorrow. If we’re afraid all the time, we miss out on the wonderful fact that we’re alive and can be happy right now. In everyday life, we tend to believe that happiness is only possible in the future. We’re always looking for the “right” conditions that we don’t yet have to make us happy. We ignore what is happening right in front of us. We look for something that will make us feel more solid, more safe, more secure. But we’re afraid all the time of what the future will bring—afraid we’ll lose our jobs, our possessions, the people around us whom we love. So we wait and hope for that magical moment—always sometime in the future—when everything will be as we want it to be. We forget that life is available only in the present moment. The Buddha said, “It is possible to live happily in the present moment. It is the only moment we have.”
|
||||
@@ -0,0 +1,30 @@
|
||||
# files/listing.py
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
p = Path('.')
|
||||
|
||||
# use pattern "*.*" to exclude directories
|
||||
for entry in p.glob('*'):
|
||||
print('File:' if entry.is_file() else 'Folder:', entry)
|
||||
|
||||
|
||||
"""
|
||||
File: fixed_amount.py
|
||||
File: existence.py
|
||||
File: manipulation.py
|
||||
File: print_file.py
|
||||
File: read_write_bin.py
|
||||
File: paths.py
|
||||
File: open_with.py
|
||||
File: walking.py
|
||||
File: tmp.py
|
||||
File: read_write.py
|
||||
File: listing.py
|
||||
File: write_not_exists.py
|
||||
File: buffer.py
|
||||
Folder: compression
|
||||
File: ops_create.py
|
||||
File: fear.txt
|
||||
File: open_try.py
|
||||
"""
|
||||
@@ -0,0 +1,35 @@
|
||||
# files/manipulation.py
|
||||
from collections import Counter
|
||||
from string import ascii_letters
|
||||
|
||||
|
||||
chars = ascii_letters + ' '
|
||||
|
||||
|
||||
def sanitize(s, chars):
|
||||
return ''.join(c for c in s if c in chars)
|
||||
|
||||
|
||||
def reverse(s):
|
||||
return s[::-1]
|
||||
|
||||
|
||||
with open('fear.txt') as stream:
|
||||
lines = [line.rstrip() for line in stream]
|
||||
|
||||
|
||||
# let's write the mirrored version of the file
|
||||
with open('raef.txt', 'w') as stream:
|
||||
stream.write('\n'.join(reverse(line) for line in lines))
|
||||
|
||||
|
||||
# now we can calculate some statistics
|
||||
lines = [sanitize(line, chars) for line in lines]
|
||||
whole = ' '.join(lines)
|
||||
|
||||
|
||||
# we perform comparisons on the lowercased version of `whole`
|
||||
cnt = Counter(whole.lower().split())
|
||||
|
||||
# we can print the N most common words
|
||||
print(cnt.most_common(3))
|
||||
@@ -0,0 +1,29 @@
|
||||
# files/open_try.py
|
||||
fh = open('fear.txt', 'rt') # r: read, t: text
|
||||
|
||||
for line in fh.readlines():
|
||||
print(line.strip()) # remove whitespace and print
|
||||
|
||||
fh.close()
|
||||
|
||||
|
||||
# secured by try/finally
|
||||
fh = open('fear.txt', 'rt')
|
||||
|
||||
try:
|
||||
for line in fh.readlines():
|
||||
print(line.strip())
|
||||
|
||||
finally:
|
||||
fh.close()
|
||||
|
||||
|
||||
# equivalent to:
|
||||
fh = open('fear.txt') # rt is default
|
||||
|
||||
try:
|
||||
for line in fh: # we can iterate directly on fh
|
||||
print(line.strip())
|
||||
|
||||
finally:
|
||||
fh.close()
|
||||
@@ -0,0 +1,4 @@
|
||||
# files/open_with.py
|
||||
with open('fear.txt') as fh:
|
||||
for line in fh:
|
||||
print(line.strip())
|
||||
@@ -0,0 +1,35 @@
|
||||
# files/ops_create.py
|
||||
import shutil
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
base_path = Path('ops_example')
|
||||
|
||||
# let's perform an initial cleanup just in case
|
||||
if base_path.exists() and base_path.is_dir():
|
||||
shutil.rmtree(base_path)
|
||||
|
||||
# now we create the directory
|
||||
base_path.mkdir()
|
||||
|
||||
path_b = base_path / 'A' / 'B'
|
||||
path_c = base_path / 'A' / 'C'
|
||||
path_d = base_path / 'A' / 'D'
|
||||
|
||||
path_b.mkdir(parents=True)
|
||||
path_c.mkdir() # no need for parents now, as 'A' has been created
|
||||
|
||||
# we add three files in `ops_example/A/B`
|
||||
for filename in ('ex1.txt', 'ex2.txt', 'ex3.txt'):
|
||||
with open(path_b / filename, 'w') as stream:
|
||||
stream.write(f'Some content here in {filename}\n')
|
||||
|
||||
|
||||
shutil.move(path_b, path_d)
|
||||
|
||||
|
||||
# we can also rename files
|
||||
ex1 = path_d / 'ex1.txt'
|
||||
ex1.rename(ex1.parent / 'ex1.renamed.txt')
|
||||
|
||||
# now call $ tree ops_example
|
||||
@@ -0,0 +1,29 @@
|
||||
# files/paths.py
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
p = Path('fear.txt')
|
||||
|
||||
print(p.absolute())
|
||||
print(p.name)
|
||||
print(p.parent.absolute())
|
||||
print(p.suffix)
|
||||
|
||||
print(p.parts)
|
||||
print(p.absolute().parts)
|
||||
|
||||
readme_path = p.parent / '..' / '..' / 'README.rst'
|
||||
print(readme_path.absolute())
|
||||
print(readme_path.resolve())
|
||||
|
||||
|
||||
"""
|
||||
/Users/fab/srv/lpp3e/ch08/files/fear.txt
|
||||
fear.txt
|
||||
/Users/fab/srv/lpp3e/ch08/files
|
||||
.txt
|
||||
('fear.txt',)
|
||||
('/', 'Users', 'fab', 'srv', 'lpp3e', 'ch08', 'files', 'fear.txt')
|
||||
/Users/fab/srv/lpp3e/ch08/files/../../README.rst
|
||||
/Users/fab/srv/lpp3e/README.rst
|
||||
"""
|
||||
@@ -0,0 +1,3 @@
|
||||
# files/print_file.py
|
||||
with open('print_example.txt', 'w') as fw:
|
||||
print('Hey I am printing into a file!!!', file=fw)
|
||||
@@ -0,0 +1,7 @@
|
||||
# files/read_write.py
|
||||
with open('fear.txt') as f:
|
||||
lines = [line.rstrip() for line in f]
|
||||
|
||||
|
||||
with open('fear_copy.txt', 'w') as fw: # w - write
|
||||
fw.write('\n'.join(lines))
|
||||
@@ -0,0 +1,7 @@
|
||||
# files/read_write_bin.py
|
||||
with open('example.bin', 'wb') as fw:
|
||||
fw.write(b'This is binary data...')
|
||||
|
||||
|
||||
with open('example.bin', 'rb') as f:
|
||||
print(f.read()) # prints: b'This is binary data...'
|
||||
@@ -0,0 +1,9 @@
|
||||
# files/tmp.py
|
||||
from tempfile import NamedTemporaryFile, TemporaryDirectory
|
||||
|
||||
|
||||
with TemporaryDirectory(dir='.') as td:
|
||||
print('Temp directory:', td)
|
||||
with NamedTemporaryFile(dir=td) as t:
|
||||
name = t.name
|
||||
print(name)
|
||||
@@ -0,0 +1,37 @@
|
||||
# files/walking.pathlib.py
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
p = Path('.')
|
||||
|
||||
for entry in p.rglob('*'):
|
||||
print('File:' if entry.is_file() else 'Folder:', entry)
|
||||
|
||||
|
||||
"""
|
||||
File: fixed_amount.py
|
||||
File: existence.py
|
||||
File: manipulation.py
|
||||
File: print_file.py
|
||||
File: read_write_bin.py
|
||||
File: paths.py
|
||||
File: open_with.py
|
||||
File: walking.py
|
||||
File: tmp.py
|
||||
File: read_write.py
|
||||
File: listing.py
|
||||
File: write_not_exists.py
|
||||
File: buffer.py
|
||||
Folder: compression
|
||||
File: ops_create.py
|
||||
File: fear.txt
|
||||
File: open_try.py
|
||||
File: walking.pathlib.py
|
||||
Folder: compression/subfolder
|
||||
File: compression/tar.py
|
||||
File: compression/content1.txt
|
||||
File: compression/content2.txt
|
||||
File: compression/zip.py
|
||||
File: compression/subfolder/content3.txt
|
||||
File: compression/subfolder/content4.txt
|
||||
"""
|
||||
@@ -0,0 +1,19 @@
|
||||
# files/walking.py
|
||||
import os
|
||||
|
||||
|
||||
for root, dirs, files in os.walk('.'):
|
||||
abs_root = os.path.abspath(root)
|
||||
print(abs_root)
|
||||
|
||||
if dirs:
|
||||
print('Directories:')
|
||||
for dir_ in dirs:
|
||||
print(dir_)
|
||||
print()
|
||||
|
||||
if files:
|
||||
print('Files:')
|
||||
for filename in files:
|
||||
print(filename)
|
||||
print()
|
||||
@@ -0,0 +1,7 @@
|
||||
# files/write_not_exists.py
|
||||
with open('write_x.txt', 'x') as fw: # this succeeds
|
||||
fw.write('Writing line 1')
|
||||
|
||||
|
||||
with open('write_x.txt', 'x') as fw: # this fails
|
||||
fw.write('Writing line 2')
|
||||
@@ -0,0 +1,83 @@
|
||||
# io_examples/reqs.py
|
||||
import requests
|
||||
|
||||
|
||||
urls = {
|
||||
"get": "https://httpbin.org/get?t=learn+python+programming",
|
||||
"headers": "https://httpbin.org/headers",
|
||||
"ip": "https://httpbin.org/ip",
|
||||
"user-agent": "https://httpbin.org/user-agent",
|
||||
"UUID": "https://httpbin.org/uuid",
|
||||
"JSON": "https://httpbin.org/json",
|
||||
}
|
||||
|
||||
|
||||
def get_content(title, url):
|
||||
resp = requests.get(url)
|
||||
print(f"Response for {title}")
|
||||
print(resp.json())
|
||||
|
||||
|
||||
for title, url in urls.items():
|
||||
get_content(title, url)
|
||||
print("-" * 40)
|
||||
|
||||
|
||||
"""
|
||||
$ python reqs.py
|
||||
|
||||
Response for get
|
||||
{
|
||||
"args": {"t": "learn python programming"},
|
||||
"headers": {
|
||||
"Accept": "*/*",
|
||||
"Accept-Encoding": "gzip, deflate",
|
||||
"Host": "httpbin.org",
|
||||
"User-Agent": "python-requests/2.25.1",
|
||||
"X-Amzn-Trace-Id": "Root=1-60a42902-3b6093e26ae375244478",
|
||||
},
|
||||
"origin": "86.8.174.15",
|
||||
"url": "https://httpbin.org/get?t=learn+python+programming",
|
||||
}
|
||||
----------------------------------------
|
||||
Response for headers
|
||||
{
|
||||
"headers": {
|
||||
"Accept": "*/*",
|
||||
"Accept-Encoding": "gzip, deflate",
|
||||
"Host": "httpbin.org",
|
||||
"User-Agent": "python-requests/2.25.1",
|
||||
"X-Amzn-Trace-Id": "Root=1-60a42902-69f2988c72187527c7aa",
|
||||
}
|
||||
}
|
||||
----------------------------------------
|
||||
Response for ip
|
||||
{'origin': '86.8.174.15'}
|
||||
----------------------------------------
|
||||
Response for user-agent
|
||||
{'user-agent': 'python-requests/2.25.1'}
|
||||
----------------------------------------
|
||||
Response for UUID
|
||||
{'uuid': '008df02c-6e62-4c88-97ad-3c3604998273'}
|
||||
----------------------------------------
|
||||
Response for JSON
|
||||
{
|
||||
"slideshow": {
|
||||
"author": "Yours Truly",
|
||||
"date": "date of publication",
|
||||
"slides": [
|
||||
{"title": "Wake up to WonderWidgets!", "type": "all"},
|
||||
{
|
||||
"items": [
|
||||
"Why <em>WonderWidgets</em> are great",
|
||||
"Who <em>buys</em> WonderWidgets",
|
||||
],
|
||||
"title": "Overview",
|
||||
"type": "all",
|
||||
},
|
||||
],
|
||||
"title": "Sample Slide Show",
|
||||
}
|
||||
}
|
||||
----------------------------------------
|
||||
"""
|
||||
@@ -0,0 +1,36 @@
|
||||
# io_examples/reqs_post.py
|
||||
import requests
|
||||
|
||||
|
||||
url = "https://httpbin.org/post"
|
||||
data = dict(title="Learn Python Programming")
|
||||
|
||||
|
||||
resp = requests.post(url, data=data)
|
||||
print("Response for POST")
|
||||
print(resp.json())
|
||||
|
||||
|
||||
"""
|
||||
$ python reqs_post.py
|
||||
|
||||
Response for POST
|
||||
{
|
||||
"args": {},
|
||||
"data": "",
|
||||
"files": {},
|
||||
"form": {"title": "Learn Python Programming"},
|
||||
"headers": {
|
||||
"Accept": "*/*",
|
||||
"Accept-Encoding": "gzip, deflate",
|
||||
"Content-Length": "30",
|
||||
"Content-Type": "application/x-www-form-urlencoded",
|
||||
"Host": "httpbin.org",
|
||||
"User-Agent": "python-requests/2.25.1",
|
||||
"X-Amzn-Trace-Id": "Root=1-60a43131-5032cdbc14db751fe775",
|
||||
},
|
||||
"json": None,
|
||||
"origin": "86.8.174.15",
|
||||
"url": "https://httpbin.org/post",
|
||||
}
|
||||
"""
|
||||
@@ -0,0 +1,21 @@
|
||||
# io_examples/string_io.py
|
||||
import io
|
||||
|
||||
|
||||
stream = io.StringIO()
|
||||
stream.write('Learning Python Programming.\n')
|
||||
print('Become a Python ninja!', file=stream)
|
||||
|
||||
contents = stream.getvalue()
|
||||
print(contents)
|
||||
|
||||
stream.close()
|
||||
|
||||
|
||||
# better alternative, using a context manager
|
||||
with io.StringIO() as stream:
|
||||
stream.write('Learning Python Programming.\n')
|
||||
print('Become a Python ninja!', file=stream)
|
||||
|
||||
contents = stream.getvalue()
|
||||
print(contents)
|
||||
@@ -0,0 +1,32 @@
|
||||
# json_examples/json_basic.py
|
||||
import sys
|
||||
import json
|
||||
|
||||
|
||||
data = {
|
||||
'big_number': 2 ** 3141,
|
||||
'max_float': sys.float_info.max,
|
||||
'a_list': [2, 3, 5, 7],
|
||||
}
|
||||
|
||||
|
||||
json_data = json.dumps(data)
|
||||
data_out = json.loads(json_data)
|
||||
|
||||
assert data == data_out # json and back, data matches
|
||||
|
||||
|
||||
# let's see how passing indent affects dumps.
|
||||
|
||||
info = {
|
||||
'full_name': 'Sherlock Holmes',
|
||||
'address': {
|
||||
'street': '221B Baker St',
|
||||
'zip': 'NW1 6XE',
|
||||
'city': 'London',
|
||||
'country': 'UK',
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
print(json.dumps(info, indent=2, sort_keys=True))
|
||||
@@ -0,0 +1,37 @@
|
||||
# json_examples/json_cplx.py
|
||||
import json
|
||||
|
||||
|
||||
class ComplexEncoder(json.JSONEncoder):
|
||||
def default(self, obj):
|
||||
print(f"ComplexEncoder.default: {obj=}")
|
||||
if isinstance(obj, complex):
|
||||
return {
|
||||
'_meta': '_complex',
|
||||
'num': [obj.real, obj.imag],
|
||||
}
|
||||
return super().default(obj)
|
||||
|
||||
|
||||
data = {
|
||||
'an_int': 42,
|
||||
'a_float': 3.14159265,
|
||||
'a_complex': 3 + 4j,
|
||||
}
|
||||
|
||||
json_data = json.dumps(data, cls=ComplexEncoder)
|
||||
|
||||
print(json_data)
|
||||
|
||||
|
||||
def object_hook(obj):
|
||||
print(f"object_hook: {obj=}")
|
||||
try:
|
||||
if obj['_meta'] == '_complex':
|
||||
return complex(*obj['num'])
|
||||
except KeyError:
|
||||
return obj
|
||||
|
||||
|
||||
data_out = json.loads(json_data, object_hook=object_hook)
|
||||
print(data_out)
|
||||
@@ -0,0 +1,57 @@
|
||||
# json_examples/json_datetime.py
|
||||
# exercise: do the same for date
|
||||
import json
|
||||
from datetime import datetime, timedelta, timezone
|
||||
|
||||
|
||||
now = datetime.now()
|
||||
now_tz = datetime.now(tz=timezone(timedelta(hours=1)))
|
||||
|
||||
|
||||
class DatetimeEncoder(json.JSONEncoder):
|
||||
def default(self, obj):
|
||||
if isinstance(obj, datetime):
|
||||
try:
|
||||
off = obj.utcoffset().seconds
|
||||
except AttributeError:
|
||||
off = None
|
||||
|
||||
return {
|
||||
'_meta': '_datetime',
|
||||
'data': obj.timetuple()[:6] + (obj.microsecond, ),
|
||||
'utcoffset': off,
|
||||
}
|
||||
return super().default(obj)
|
||||
|
||||
|
||||
data = {
|
||||
'an_int': 42,
|
||||
'a_float': 3.14159265,
|
||||
'a_datetime': now,
|
||||
'a_datetime_tz': now_tz,
|
||||
}
|
||||
|
||||
json_data = json.dumps(data, cls=DatetimeEncoder)
|
||||
|
||||
print(json_data)
|
||||
|
||||
|
||||
def object_hook(obj):
|
||||
try:
|
||||
if obj['_meta'] == '_datetime':
|
||||
if obj['utcoffset'] is None:
|
||||
tz = None
|
||||
else:
|
||||
tz = timezone(timedelta(seconds=obj['utcoffset']))
|
||||
return datetime(*obj['data'], tzinfo=tz)
|
||||
except KeyError:
|
||||
return obj
|
||||
|
||||
|
||||
data_out = json.loads(json_data, object_hook=object_hook)
|
||||
from pprint import pprint
|
||||
pprint(data_out, indent=2)
|
||||
print(data_out)
|
||||
|
||||
assert data_out['a_datetime'] == data['a_datetime']
|
||||
assert data_out['a_datetime_tz'] == data['a_datetime_tz']
|
||||
@@ -0,0 +1,13 @@
|
||||
# json_examples/json_tuple.py
|
||||
import json
|
||||
|
||||
|
||||
data_in = {
|
||||
'a_tuple': (1, 2, 3, 4, 5),
|
||||
}
|
||||
|
||||
|
||||
json_data = json.dumps(data_in)
|
||||
print(json_data) # {"a_tuple": [1, 2, 3, 4, 5]}
|
||||
data_out = json.loads(json_data)
|
||||
print(data_out) # {'a_tuple': [1, 2, 3, 4, 5]}
|
||||
@@ -0,0 +1,75 @@
|
||||
# persistence/alchemy.py
|
||||
from alchemy_models import Person, Address, engine
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
|
||||
Session = sessionmaker(bind=engine)
|
||||
session = Session()
|
||||
|
||||
|
||||
# Create a couple of people
|
||||
anakin = Person(name='Anakin Skywalker', age=32)
|
||||
obi1 = Person(name='Obi-Wan Kenobi', age=40)
|
||||
|
||||
# Add email addresses for both of them
|
||||
obi1.addresses = [
|
||||
Address(email='obi1@example.com'),
|
||||
Address(email='wanwan@example.com'),
|
||||
]
|
||||
|
||||
# another way: we can simply append
|
||||
anakin.addresses.append(Address(email='ani@example.com'))
|
||||
anakin.addresses.append(Address(email='evil.dart@example.com'))
|
||||
anakin.addresses.append(Address(email='vader@example.com'))
|
||||
|
||||
# Add people to the session. This adds addresses too.
|
||||
session.add(anakin)
|
||||
session.add(obi1)
|
||||
session.commit()
|
||||
|
||||
|
||||
# Query and display both
|
||||
obi1 = session.query(Person).filter(
|
||||
Person.name.like('Obi%')
|
||||
).first()
|
||||
print(obi1, obi1.addresses)
|
||||
|
||||
anakin = session.query(Person).filter(
|
||||
Person.name=='Anakin Skywalker'
|
||||
).first()
|
||||
print(anakin, anakin.addresses)
|
||||
|
||||
# capture anakin.id
|
||||
anakin_id = anakin.id
|
||||
|
||||
# then remove the var
|
||||
del anakin
|
||||
|
||||
|
||||
def display_info():
|
||||
# get all addresses first
|
||||
addresses = session.query(Address).all()
|
||||
|
||||
# display results
|
||||
for address in addresses:
|
||||
print(f'{address.person.name} <{address.email}>')
|
||||
|
||||
# display how many objects we have in total
|
||||
print('people: {}, addresses: {}'.format(
|
||||
session.query(Person).count(),
|
||||
session.query(Address).count())
|
||||
)
|
||||
|
||||
|
||||
display_info()
|
||||
|
||||
# Fetch anakin directly by its id
|
||||
anakin = session.query(Person).get(anakin_id)
|
||||
|
||||
# Delete anakin
|
||||
session.delete(anakin)
|
||||
session.commit()
|
||||
|
||||
|
||||
# let's do it again and see the changes
|
||||
display_info()
|
||||
@@ -0,0 +1,47 @@
|
||||
# persistence/alchemy_models.py
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
from sqlalchemy import (
|
||||
Column, Integer, String, ForeignKey, create_engine)
|
||||
from sqlalchemy.orm import relationship
|
||||
|
||||
|
||||
# swap these lines to work with an actual DB file
|
||||
# engine = create_engine('sqlite:///example.db')
|
||||
engine = create_engine('sqlite:///:memory:')
|
||||
|
||||
|
||||
Base = declarative_base()
|
||||
|
||||
|
||||
class Person(Base):
|
||||
__tablename__ = 'person'
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
name = Column(String)
|
||||
age = Column(Integer)
|
||||
|
||||
addresses = relationship(
|
||||
'Address',
|
||||
back_populates='person',
|
||||
order_by='Address.email',
|
||||
cascade='all, delete-orphan'
|
||||
)
|
||||
|
||||
def __repr__(self):
|
||||
return f'{self.name}(id={self.id})'
|
||||
|
||||
|
||||
class Address(Base):
|
||||
__tablename__ = 'address'
|
||||
|
||||
id = Column(Integer, primary_key=True)
|
||||
email = Column(String)
|
||||
person_id = Column(ForeignKey('person.id'))
|
||||
person = relationship('Person', back_populates='addresses')
|
||||
|
||||
def __str__(self):
|
||||
return self.email
|
||||
__repr__ = __str__
|
||||
|
||||
|
||||
Base.metadata.create_all(engine)
|
||||
@@ -0,0 +1,34 @@
|
||||
# persistence/pickler.py
|
||||
import pickle
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class Person:
|
||||
first_name: str
|
||||
last_name: str
|
||||
id: int
|
||||
|
||||
def greet(self):
|
||||
print(f'Hi, I am {self.first_name} {self.last_name}'
|
||||
f' and my ID is {self.id}')
|
||||
|
||||
|
||||
people = [
|
||||
Person('Obi-Wan', 'Kenobi', 123),
|
||||
Person('Anakin', 'Skywalker', 456),
|
||||
]
|
||||
|
||||
|
||||
# save data in binary format to a file
|
||||
with open('data.pickle', 'wb') as stream:
|
||||
pickle.dump(people, stream)
|
||||
|
||||
|
||||
# load data from a file
|
||||
with open('data.pickle', 'rb') as stream:
|
||||
peeps = pickle.load(stream)
|
||||
|
||||
|
||||
for person in peeps:
|
||||
person.greet()
|
||||
@@ -0,0 +1,40 @@
|
||||
# persistence/shelf.py
|
||||
import shelve
|
||||
|
||||
|
||||
class Person:
|
||||
def __init__(self, name, id):
|
||||
self.name = name
|
||||
self.id = id
|
||||
|
||||
|
||||
with shelve.open('shelf1.shelve') as db:
|
||||
db['obi1'] = Person('Obi-Wan', 123)
|
||||
db['ani'] = Person('Anakin', 456)
|
||||
db['a_list'] = [2, 3, 5]
|
||||
db['delete_me'] = 'we will have to delete this one...'
|
||||
|
||||
print(list(db.keys())) # ['ani', 'delete_me', 'a_list', 'obi1']
|
||||
|
||||
del db['delete_me'] # gone!
|
||||
|
||||
print(list(db.keys())) # ['ani', 'a_list', 'obi1']
|
||||
|
||||
print('delete_me' in db) # False
|
||||
print('ani' in db) # True
|
||||
|
||||
a_list = db['a_list']
|
||||
a_list.append(7)
|
||||
db['a_list'] = a_list
|
||||
|
||||
print(db['a_list']) # [2, 3, 5, 7]
|
||||
|
||||
|
||||
# this way allows writeback:
|
||||
# working with lists is easier, but consumes more memory and
|
||||
# closing the file takes longer.
|
||||
with shelve.open('shelf2.shelve', writeback=True) as db:
|
||||
db['a_list'] = [11, 13, 17]
|
||||
db['a_list'].append(19) # in-place append!
|
||||
|
||||
print(db['a_list']) # [11, 13, 17, 19]
|
||||
@@ -0,0 +1,2 @@
|
||||
requests~=2.25.1
|
||||
sqlalchemy~=1.4.15
|
||||
@@ -0,0 +1,20 @@
|
||||
#
|
||||
# This file is autogenerated by pip-compile
|
||||
# To update, run:
|
||||
#
|
||||
# pip-compile requirements.in
|
||||
#
|
||||
certifi==2020.12.5
|
||||
# via requests
|
||||
chardet==4.0.0
|
||||
# via requests
|
||||
greenlet==1.1.0
|
||||
# via sqlalchemy
|
||||
idna==2.10
|
||||
# via requests
|
||||
requests==2.25.1
|
||||
# via -r requirements.in
|
||||
sqlalchemy==1.4.15
|
||||
# via -r requirements.in
|
||||
urllib3==1.26.4
|
||||
# via requests
|
||||
Reference in New Issue
Block a user