# The four classes below sketch a simple DataPipeline; the method bodies are left for the user to fill in.
from typing import Any, Dict

class WordDoc:
    ...

class PDF:
    ...

class SQLDatabase(DataSource, DataSink):
    @get.register(WordDoc)  # Tells the DataPipeline that this SQL database can provide a WordDoc
    def get_word_doc(self, query: Dict[str, Any]) -> WordDoc:
        """Returns a WordDoc from the SQL database based on the `filename` in the query."""

    @put.register(WordDoc)  # Tells the DataPipeline that this SQL database can store a WordDoc
    def put_word_doc(self, doc: WordDoc, query: Dict[str, Any]):
        """Stores the document in the SQL database using the query as an identifier."""

class DocumentTransformer(Transformer):
    @transform.register(WordDoc, PDF)  # Tells the DataPipeline that we know how to convert a WordDoc to a PDF
    def Word_to_PDF(self, doc: WordDoc) -> PDF:
        """Converts a WordDoc to a PDF and returns the PDF."""


# The line of code below can now be used to request a PDF.
# The WordDoc with the filename `find_me` is pulled from the SQL database, converted to a PDF, and returned to the user.
my_pdf = pipeline.get(PDF, query={"filename": "find_me"})

# Note also that because SQLDatabase implements a `put` method for WordDoc, it will also store any WordDoc
# that passes through it via the pipeline and is not already in the database.
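
# The request above can be pictured with a minimal, self-contained sketch of how a
# pipeline might resolve `get(PDF, ...)` by chaining a source and a transformer.
# ASSUMPTION: everything below (ToyDoc, ToyPDF, ToyPipeline, add_source,
# add_transformer) is hypothetical and for illustration only. It is not the
# library's implementation or API, and it leaves out data sinks entirely.
from typing import Any, Callable, Dict, Tuple


class ToyDoc:
    def __init__(self, filename: str) -> None:
        self.filename = filename


class ToyPDF:
    def __init__(self, filename: str) -> None:
        self.filename = filename


class ToyPipeline:
    def __init__(self) -> None:
        # Maps a type to a function that fetches an instance of it for a query.
        self._sources: Dict[type, Callable[[Dict[str, Any]], Any]] = {}
        # Maps a target type to a (source type, converter function) pair.
        self._transformers: Dict[type, Tuple[type, Callable[[Any], Any]]] = {}

    def add_source(self, provides: type, getter: Callable[[Dict[str, Any]], Any]) -> None:
        self._sources[provides] = getter

    def add_transformer(self, source: type, target: type, convert: Callable[[Any], Any]) -> None:
        self._transformers[target] = (source, convert)

    def get(self, wanted: type, query: Dict[str, Any]) -> Any:
        # Use a source directly if one provides the requested type.
        if wanted in self._sources:
            return self._sources[wanted](query)
        # Otherwise fetch a type that can be converted into the requested one.
        if wanted in self._transformers:
            source_type, convert = self._transformers[wanted]
            return convert(self.get(source_type, query))
        raise LookupError(f"no source or transformer provides {wanted.__name__}")


toy = ToyPipeline()
toy.add_source(ToyDoc, lambda query: ToyDoc(query["filename"]))
toy.add_transformer(ToyDoc, ToyPDF, lambda doc: ToyPDF(doc.filename + ".pdf"))
toy_pdf = toy.get(ToyPDF, query={"filename": "find_me"})
print(toy_pdf.filename)  # find_me.pdf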