Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions js/net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class ListenerImpl implements Listener {
export interface Conn extends Reader, Writer, Closer {
localAddr: string;
remoteAddr: string;
closeRead(): void;
closeWrite(): void;
}

class ConnImpl implements Conn {
Expand All @@ -80,19 +82,35 @@ class ConnImpl implements Conn {
* Most callers should just use close().
*/
closeRead(): void {
// TODO(ry) Connect to AsyncWrite::shutdown in resources.rs
return notImplemented();
shutdown(this.fd, ShutdownMode.Read);
}

/** closeWrite shuts down (shutdown(2)) the writing side of the TCP
* connection. Most callers should just use close().
*/
closeWrite(): void {
// TODO(ry) Connect to AsyncWrite::shutdown in resources.rs
return notImplemented();
shutdown(this.fd, ShutdownMode.Write);
}
}

enum ShutdownMode {
// See http://man7.org/linux/man-pages/man2/shutdown.2.html
// Corresponding to SHUT_RD, SHUT_WR, SHUT_RDWR
Read = 0,
Write,
ReadWrite // unused
}

function shutdown(fd: number, how: ShutdownMode) {
const builder = new flatbuffers.Builder();
msg.Shutdown.startShutdown(builder);
msg.Shutdown.addRid(builder, fd);
msg.Shutdown.addHow(builder, how);
const inner = msg.Shutdown.endShutdown(builder);
const baseRes = dispatch.sendSync(builder, msg.Any.Shutdown, inner);
Comment thread
ry marked this conversation as resolved.
assert(baseRes == null);
}

/** Listen announces on the local network address.
*
* The network must be "tcp", "tcp4", "tcp6", "unix" or "unixpacket".
Expand Down
115 changes: 114 additions & 1 deletion js/net_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

import * as deno from "deno";
import { testPerm, assert, assertEqual } from "./test_util.ts";
import { deferred } from "./util.ts";

@ry ry Oct 5, 2018

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool! I forgot about this when I suggested it - but this is only newly possible because of your work in #859.

js/net_test.ts runs inside of Deno, but js/util.ts runs inside of Node/rollup. But now we are able to seamlessly load util.ts in Deno too.

cc @kitsonk - can compiler_test.ts work like this too?


testPerm({ net: true }, function netListenClose() {
const listener = deno.listen("tcp", "127.0.0.1:4500");
listener.close();
});

testPerm({ net: true }, async function netDialListen() {
let addr = "127.0.0.1:4500";
const addr = "127.0.0.1:4500";
const listener = deno.listen("tcp", addr);
listener.accept().then(async conn => {
await conn.write(new Uint8Array([1, 2, 3]));
Expand All @@ -35,3 +36,115 @@ testPerm({ net: true }, async function netDialListen() {
listener.close();
conn.close();
});

testPerm({ net: true }, async function netCloseReadSuccess() {
const addr = "127.0.0.1:4500";
const listener = deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(async conn => {
await conn.write(new Uint8Array([1, 2, 3]));
const buf = new Uint8Array(1024);
const readResult = await conn.read(buf);
assertEqual(3, readResult.nread);
assertEqual(4, buf[0]);
assertEqual(5, buf[1]);
assertEqual(6, buf[2]);
conn.close();
closeDeferred.resolve();
});
const conn = await deno.dial("tcp", addr);
conn.closeRead(); // closing read
const buf = new Uint8Array(1024);
const readResult = await conn.read(buf);
assertEqual(0, readResult.nread); // No error, read nothing
assertEqual(true, readResult.eof); // with immediate EOF
// Ensure closeRead does not impact write
await conn.write(new Uint8Array([4, 5, 6]));
await closeDeferred.promise;
listener.close();
conn.close();
});

testPerm({ net: true }, async function netDoubleCloseRead() {
const addr = "127.0.0.1:4500";
const listener = deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(async conn => {
await conn.write(new Uint8Array([1, 2, 3]));
await closeDeferred.promise;
conn.close();
});
const conn = await deno.dial("tcp", addr);
conn.closeRead(); // closing read
let err;
try {
// Duplicated close should throw error
conn.closeRead();
} catch (e) {
err = e;
}
assert(!!err);
assertEqual(err.kind, deno.ErrorKind.NotConnected);
assertEqual(err.name, "NotConnected");
closeDeferred.resolve();
listener.close();
conn.close();
});

testPerm({ net: true }, async function netCloseWriteSuccess() {
const addr = "127.0.0.1:4500";
const listener = deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(async conn => {
await conn.write(new Uint8Array([1, 2, 3]));
await closeDeferred.promise;
conn.close();
});
const conn = await deno.dial("tcp", addr);
conn.closeWrite(); // closing write
const buf = new Uint8Array(1024);
// Check read not impacted
const readResult = await conn.read(buf);
assertEqual(3, readResult.nread);
assertEqual(1, buf[0]);
assertEqual(2, buf[1]);
assertEqual(3, buf[2]);
// Check write should be closed
let err;
try {
await conn.write(new Uint8Array([1, 2, 3]));
} catch (e) {
err = e;
}
assert(!!err);
assertEqual(err.kind, deno.ErrorKind.BrokenPipe);
assertEqual(err.name, "BrokenPipe");
closeDeferred.resolve();
listener.close();
conn.close();
});

testPerm({ net: true }, async function netDoubleCloseWrite() {
const addr = "127.0.0.1:4500";
const listener = deno.listen("tcp", addr);
const closeDeferred = deferred();
listener.accept().then(async conn => {
await closeDeferred.promise;
conn.close();
});
const conn = await deno.dial("tcp", addr);
conn.closeWrite(); // closing write
let err;
try {
// Duplicated close should throw error
conn.closeWrite();
} catch (e) {
err = e;
}
assert(!!err);
assertEqual(err.kind, deno.ErrorKind.NotConnected);
assertEqual(err.name, "NotConnected");

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. Thank you.

closeDeferred.resolve();
listener.close();
conn.close();
});
Comment thread
kevinkassimo marked this conversation as resolved.
9 changes: 4 additions & 5 deletions js/testing/testing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ const FG_RED = "\x1b[31m";
const FG_GREEN = "\x1b[32m";

function red_failed() {
return FG_RED + "FAILED" + RESET
return FG_RED + "FAILED" + RESET;
}


function green_ok() {
return FG_GREEN + "ok" + RESET
return FG_GREEN + "ok" + RESET;
}

async function runTests() {
Expand Down Expand Up @@ -96,8 +95,8 @@ async function runTests() {
const result = failed > 0 ? red_failed() : green_ok();
console.log(
`\ntest result: ${result}. ${passed} passed; ${failed} failed; ` +
`${ignored} ignored; ${measured} measured; ${filtered} filtered out\n`);

`${ignored} ignored; ${measured} measured; ${filtered} filtered out\n`
);

if (failed === 0) {
// All good.
Expand Down
26 changes: 26 additions & 0 deletions js/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,29 @@ export function containsOnlyASCII(str: string): boolean {
}
return /^[\x00-\x7F]*$/.test(str);
}

// @internal
export interface Deferred {
promise: Promise<void>;
resolve: Function;
reject: Function;
}

/**
* Create a wrapper around a promise that could be
* resolved externally.
* @internal
*/
export function deferred(): Deferred {
let resolve: Function | undefined;
let reject: Function | undefined;
const promise = new Promise<void>((res, rej) => {
resolve = res;
reject = rej;
});
return {
promise,
resolve: resolve!,
reject: reject!
};
}
6 changes: 6 additions & 0 deletions src/msg.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ union Any {
Write,
WriteRes,
Close,
Shutdown,
Listen,
ListenRes,
Accept,
Expand Down Expand Up @@ -290,6 +291,11 @@ table Close {
rid: int;
}

table Shutdown {
rid: int;
how: uint;
}

table Listen {
network: string;
address: string;
Expand Down
35 changes: 33 additions & 2 deletions src/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use isolate::Isolate;
use isolate::IsolateState;
use isolate::Op;
use msg;
use resources;
use resources::Resource;
use tokio_util;

use flatbuffers::FlatBufferBuilder;
Expand All @@ -19,10 +21,9 @@ use hyper;
use hyper::rt::{Future, Stream};
use hyper::Client;
use remove_dir_all::remove_dir_all;
use resources;
use std;
use std::fs;
use std::net::SocketAddr;
use std::net::{Shutdown, SocketAddr};
#[cfg(any(unix))]
use std::os::unix::fs::PermissionsExt;
use std::path::Path;
Expand Down Expand Up @@ -84,6 +85,7 @@ pub fn dispatch(
msg::Any::Read => op_read,
msg::Any::Write => op_write,
msg::Any::Close => op_close,
msg::Any::Shutdown => op_shutdown,
msg::Any::Remove => op_remove,
msg::Any::ReadFile => op_read_file,
msg::Any::ReadDir => op_read_dir,
Expand Down Expand Up @@ -614,6 +616,35 @@ fn op_close(
}
}

fn op_shutdown(
_state: Arc<IsolateState>,
base: &msg::Base,
data: &'static mut [u8],
) -> Box<Op> {
assert_eq!(data.len(), 0);
let inner = base.inner_as_shutdown().unwrap();
let rid = inner.rid();
let how = inner.how();
match resources::lookup(rid) {
None => odd_future(errors::new(
errors::ErrorKind::BadFileDescriptor,
String::from("Bad File Descriptor"),
)),
Some(mut resource) => {
let shutdown_mode = match how {
0 => Shutdown::Read,
1 => Shutdown::Write,
_ => unimplemented!(),
};
blocking!(base.sync(), || {
// Use UFCS for disambiguation
Resource::shutdown(&mut resource, shutdown_mode)?;
Ok(empty_buf())
})
}
}
}

fn op_read(
_state: Arc<IsolateState>,
base: &msg::Base,
Expand Down
18 changes: 17 additions & 1 deletion src/resources.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@
// descriptors". This module implements a global resource table. Ops (AKA
// handlers) look up resources by their integer id here.

use errors::DenoError;

use futures;
use futures::Poll;
use std;
use std::collections::HashMap;
use std::io::Error;
use std::io::{Read, Write};
use std::net::SocketAddr;
use std::net::{Shutdown, SocketAddr};
use std::sync::atomic::AtomicIsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;
Expand Down Expand Up @@ -79,6 +81,20 @@ impl Resource {
let r = table.remove(&self.rid);
assert!(r.is_some());
}

pub fn shutdown(&mut self, how: Shutdown) -> Result<(), DenoError> {
let mut table = RESOURCE_TABLE.lock().unwrap();
let maybe_repr = table.get_mut(&self.rid);
match maybe_repr {
None => panic!("bad rid"),
Some(repr) => match repr {
Repr::TcpStream(ref mut f) => {
TcpStream::shutdown(f, how).map_err(|err| DenoError::from(err))
}
_ => panic!("Cannot shutdown"),

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good - nice.

},
}
}
}

impl Read for Resource {
Expand Down